Guest User

Untitled

a guest
Feb 22nd, 2022
42
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Diff 11.84 KB | None | 0 0
  1. diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc
  2. index 81692849..e14bfd28 100644
  3. --- a/src/hydra-queue-runner/build-remote.cc
  4. +++ b/src/hydra-queue-runner/build-remote.cc
  5. @@ -1,19 +1,14 @@
  6.  #include <algorithm>
  7.  #include <cmath>
  8.  
  9. -#include <sys/types.h>
  10. -#include <sys/stat.h>
  11. -#include <fcntl.h>
  12. -
  13. -#include "serve-protocol.hh"
  14.  #include "state.hh"
  15.  #include "util.hh"
  16. -#include "worker-protocol.hh"
  17.  #include "finally.hh"
  18.  
  19.  using namespace nix;
  20.  
  21.  
  22. +#if 0
  23.  struct Child
  24.  {
  25.      Pid pid;
  26. @@ -21,12 +16,6 @@ struct Child
  27.  };
  28.  
  29.  
  30. -static void append(Strings & dst, const Strings & src)
  31. -{
  32. -    dst.insert(dst.end(), src.begin(), src.end());
  33. -}
  34. -
  35. -
  36.  static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Child & child)
  37.  {
  38.      string pgmName;
  39. @@ -121,6 +110,7 @@ static void copyClosureTo(std::timed_mutex & sendMutex, ref<Store> destStore,
  40.      if (readInt(from) != 1)
  41.          throw Error("remote machine failed to import closure");
  42.  }
  43. +#endif
  44.  
  45.  
  46.  void State::buildRemote(ref<Store> destStore,
  47. @@ -137,23 +127,36 @@ void State::buildRemote(ref<Store> destStore,
  48.  
  49.      createDirs(dirOf(result.logFile));
  50.  
  51. +    #if 0
  52.      AutoCloseFD logFD = open(result.logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666);
  53.      if (!logFD) throw SysError(format("creating log file ‘%1%’") % result.logFile);
  54. -
  55. -    nix::Path tmpDir = createTempDir();
  56. -    AutoDelete tmpDirDel(tmpDir, true);
  57. +    #endif
  58.  
  59.      try {
  60.  
  61.          updateStep(ssConnecting);
  62.  
  63. -        Child child;
  64. -        openConnection(machine, tmpDir, logFD.get(), child);
  65. +        Store::Params storeParams;
  66. +        if (hasPrefix(machine->sshName, "ssh://")) {
  67. +            storeParams["max-connections"] = "1";
  68. +            if (machine->sshKey != "")
  69. +                storeParams["ssh-key"] = machine->sshKey;
  70. +            // FIXME: set machine->sshPublicHostKey
  71. +        }
  72. +
  73. +        auto builderStore = openStore(machine->sshName, storeParams);
  74. +
  75. +        try {
  76. +            builderStore->connect();
  77. +        } catch (std::exception & e) {
  78. +            throw Error("cannot connect to ‘%s’: %s", machine->sshName, e.what());
  79. +        }
  80.  
  81.          {
  82.              auto activeStepState(activeStep->state_.lock());
  83.              if (activeStepState->cancelled) throw Error("step cancelled");
  84. -            activeStepState->pid = child.pid;
  85. +            // FIXME: cancellation is not implemented; there is no longer a child pid to record here
  86. +            //activeStepState->pid = child.pid;
  87.          }
  88.  
  89.          Finally clearPid([&]() {
  90. @@ -168,48 +171,20 @@ void State::buildRemote(ref<Store> destStore,
  91.                 process. Meh. */
  92.          });
  93.  
  94. -        FdSource from(child.from.get());
  95. -        FdSink to(child.to.get());
  96. -
  97. +        #if 0
  98.          Finally updateStats([&]() {
  99.              bytesReceived += from.read;
  100.              bytesSent += to.written;
  101.          });
  102. -
  103. -        /* Handshake. */
  104. -        bool sendDerivation = true;
  105. -        unsigned int remoteVersion;
  106. -
  107. -        try {
  108. -            to << SERVE_MAGIC_1 << 0x203;
  109. -            to.flush();
  110. -
  111. -            unsigned int magic = readInt(from);
  112. -            if (magic != SERVE_MAGIC_2)
  113. -                throw Error(format("protocol mismatch with ‘nix-store --serve’ on ‘%1%’") % machine->sshName);
  114. -            remoteVersion = readInt(from);
  115. -            if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200)
  116. -                throw Error(format("unsupported ‘nix-store --serve’ protocol version on ‘%1%’") % machine->sshName);
  117. -            // Always send the derivation to localhost, since it's a
  118. -            // no-op anyway but we might not be privileged to use
  119. -            // cmdBuildDerivation (e.g. if we're running in a NixOS
  120. -            // container).
  121. -            if (GET_PROTOCOL_MINOR(remoteVersion) >= 1 && !machine->isLocalhost())
  122. -                sendDerivation = false;
  123. -            if (GET_PROTOCOL_MINOR(remoteVersion) < 3 && repeats > 0)
  124. -                throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", machine->sshName);
  125. -
  126. -        } catch (EndOfFile & e) {
  127. -            child.pid.wait();
  128. -            string s = chomp(readFile(result.logFile));
  129. -            throw Error(format("cannot connect to ‘%1%’: %2%") % machine->sshName % s);
  130. -        }
  131. +        #endif
  132.  
  133.          {
  134.              auto info(machine->state->connectInfo.lock());
  135.              info->consecutiveFailures = 0;
  136.          }
  137.  
  138. +        bool sendDerivation = false;
  139. +
  140.          /* Gather the inputs. If the remote side is Nix <= 1.9, we have to
  141.             copy the entire closure of ‘drvPath’, as well as the required
  142.             outputs of the input derivations. On Nix > 1.9, we only need to
  143. @@ -236,10 +211,11 @@ void State::buildRemote(ref<Store> destStore,
  144.              }
  145.          }
  146.  
  147. -        /* Ensure that the inputs exist in the destination store. This is
  148. -           a no-op for regular stores, but for the binary cache store,
  149. -           this will copy the inputs to the binary cache from the local
  150. -           store. */
  151. +        /* Ensure that the inputs exist in the destination store (so
  152. +           that the builder can substitute them from the destination
  153. +           store). This is a no-op for regular stores, but for the
  154. +           binary cache store, this will copy the inputs to the binary
  155. +           cache from the local store. */
  156.          if (localStore != std::shared_ptr<Store>(destStore))
  157.              copyClosure(ref<Store>(localStore), destStore, step->drv.inputSrcs, NoRepair, NoCheckSigs);
  158.  
  159. @@ -252,7 +228,7 @@ void State::buildRemote(ref<Store> destStore,
  160.  
  161.              auto now1 = std::chrono::steady_clock::now();
  162.  
  163. -            copyClosureTo(machine->state->sendLock, destStore, from, to, inputs, true);
  164. +            copyPaths(destStore, builderStore, inputs, NoRepair, NoCheckSigs, Substitute);
  165.  
  166.              auto now2 = std::chrono::steady_clock::now();
  167.  
  168. @@ -261,6 +237,7 @@ void State::buildRemote(ref<Store> destStore,
  169.  
  170.          autoDelete.cancel();
  171.  
  172. +        #if 0
  173.          /* Truncate the log to get rid of messages about substitutions
  174.             etc. on the remote system. */
  175.          if (lseek(logFD.get(), SEEK_SET, 0) != 0)
  176. @@ -270,65 +247,30 @@ void State::buildRemote(ref<Store> destStore,
  177.              throw SysError("truncating log file ‘%s’", result.logFile);
  178.  
  179.          logFD = -1;
  180. +        #endif
  181.  
  182.          /* Do the build. */
  183. -        printMsg(lvlDebug, format("building ‘%1%’ on ‘%2%’") % step->drvPath % machine->sshName);
  184. +        printMsg(lvlDebug, "building ‘%s’ on ‘%s’", step->drvPath, machine->sshName);
  185.  
  186.          updateStep(ssBuilding);
  187.  
  188. -        if (sendDerivation)
  189. -            to << cmdBuildPaths << PathSet({step->drvPath});
  190. -        else
  191. -            to << cmdBuildDerivation << step->drvPath << basicDrv;
  192. -        to << maxSilentTime << buildTimeout;
  193. -        if (GET_PROTOCOL_MINOR(remoteVersion) >= 2)
  194. -            to << maxLogSize;
  195. -        if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) {
  196. -            to << repeats // == build-repeat
  197. -               << step->isDeterministic; // == enforce-determinism
  198. -        }
  199. -        to.flush();
  200. +        // FIXME: pass maxSilentTime, buildTimeout, maxLogSize, repeats and isDeterministic to the builder
  201.  
  202. -        result.startTime = time(0);
  203. -        int res;
  204.          {
  205.              MaintainCount<counter> mc(nrStepsBuilding);
  206. -            res = readInt(from);
  207. -        }
  208. -        result.stopTime = time(0);
  209. -
  210. -        if (sendDerivation) {
  211. -            if (res) {
  212. -                result.errorMsg = (format("%1% on ‘%2%’") % readString(from) % machine->sshName).str();
  213. -                if (res == 100) {
  214. -                    result.stepStatus = bsFailed;
  215. -                    result.canCache = true;
  216. -                }
  217. -                else if (res == 101) {
  218. -                    result.stepStatus = bsTimedOut;
  219. -                }
  220. -                else {
  221. -                    result.stepStatus = bsAborted;
  222. -                    result.canRetry = true;
  223. -                }
  224. -                return;
  225. +            result.startTime = time(0);
  226. +            auto buildResult = builderStore->buildDerivation(step->drvPath, basicDrv);
  227. +            result.stopTime = time(0);
  228. +            result.errorMsg = buildResult.errorMsg;
  229. +            result.timesBuilt = buildResult.timesBuilt;
  230. +            result.isNonDeterministic = buildResult.isNonDeterministic;
  231. +            if (buildResult.startTime && buildResult.stopTime) {
  232. +                /* Note: this represents the duration of a single
  233. +                   round, rather than all rounds. */
  234. +                result.startTime = buildResult.startTime;
  235. +                result.stopTime = buildResult.stopTime;;
  236.              }
  237. -            result.stepStatus = bsSuccess;
  238. -        } else {
  239. -            result.errorMsg = readString(from);
  240. -            if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) {
  241. -                result.timesBuilt = readInt(from);
  242. -                result.isNonDeterministic = readInt(from);
  243. -                auto start = readInt(from);
  244. -                auto stop = readInt(from);
  245. -                if (start && start) {
  246. -                    /* Note: this represents the duration of a single
  247. -                       round, rather than all rounds. */
  248. -                    result.startTime = start;
  249. -                    result.stopTime = stop;
  250. -                }
  251. -            }
  252. -            switch ((BuildResult::Status) res) {
  253. +            switch ((BuildResult::Status) buildResult.status) {
  254.                  case BuildResult::Built:
  255.                      result.stepStatus = bsSuccess;
  256.                      break;
  257. @@ -375,8 +317,6 @@ void State::buildRemote(ref<Store> destStore,
  258.              if (result.stepStatus != bsSuccess) return;
  259.          }
  260.  
  261. -        result.errorMsg = "";
  262. -
  263.          /* If the path was substituted or already valid, then we didn't
  264.             get a build log. */
  265.          if (result.isCached) {
  266. @@ -400,16 +340,9 @@ void State::buildRemote(ref<Store> destStore,
  267.                  outputs.insert(output.second.path);
  268.  
  269.              /* Query the size of the output paths. */
  270. -            size_t totalNarSize = 0;
  271. -            to << cmdQueryPathInfos << outputs;
  272. -            to.flush();
  273. -            while (true) {
  274. -                if (readString(from) == "") break;
  275. -                readString(from); // deriver
  276. -                readStrings<PathSet>(from); // references
  277. -                readLongLong(from); // download size
  278. -                totalNarSize += readLongLong(from);
  279. -            }
  280. +            unsigned long long totalNarSize = 0;
  281. +            for (auto & output : outputs)
  282. +                totalNarSize += builderStore->queryPathInfo(output)->narSize;
  283.  
  284.              if (totalNarSize > maxOutputSize) {
  285.                  result.stepStatus = bsNarSizeLimitExceeded;
  286. @@ -434,9 +367,8 @@ void State::buildRemote(ref<Store> destStore,
  287.                  printMsg(lvlError, format("warning: had to wait %d ms for %d memory tokens for %s")
  288.                      % resMs % totalNarSize % step->drvPath);
  289.  
  290. -            to << cmdExportPaths << 0 << outputs;
  291. -            to.flush();
  292. -            destStore->importPaths(from, result.accessor, NoCheckSigs);
  293. +            // FIXME: use result.accessor (it was previously passed to importPaths())
  294. +            copyPaths(builderStore, destStore, outputs, NoRepair, NoCheckSigs);
  295.  
  296.              /* Release the tokens pertaining to NAR
  297.                 compression. After this we only have the uncompressed
  298. @@ -448,10 +380,6 @@ void State::buildRemote(ref<Store> destStore,
  299.              result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
  300.          }
  301.  
  302. -        /* Shut down the connection. */
  303. -        child.to = -1;
  304. -        child.pid.wait();
  305. -
  306.      } catch (Error & e) {
  307.          /* Disable this machine until a certain period of time has
  308.             passed. This period increases on every consecutive
  309.  
Advertisement
Add Comment
Please, Sign In to add comment