binary_distribution: parallelize fetches in buildcache update-index #32796
Conversation
I just used this PR to update the index on …

That's still pretty disappointing, unless it was taking 21 hours previously. But I guess it finished and produced the index?

It did finish OK, yes. First time I've had an up-to-date index for …
Yes, the default concurrency is 32, so it should be attempting to do 32 at a time. But you're saying when you ran with …

It did appear that way, yes. I had debug turned on, and so it printed out …
Yeah, hard to know from that. I guess it would be better to know how long it took when run with …
Force-pushed aa787f7 to ef687b1.
@spackbot help

You can interact with me in many ways! I'll also help to label your pull request and assign reviewers!
@tgamblin You asked what kind of performance gain we could get using `aws s3 sync`. Some things to be aware of if we want to use it: …

So if we want to install the `aws` cli, … This PR currently tries to use `aws s3 sync` when it is available.
alalazo
left a comment
We can possibly improve the use of `multiprocessing.pool`. I also have a question on `ThreadPool` vs. `Pool`, and a few suggestions to improve structure and readability. Let me know what you think.
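For background on the `ThreadPool` vs. `Pool` question: `multiprocessing.pool.ThreadPool` runs workers as threads inside the current interpreter (sharing the GIL, which is fine for I/O-bound fetches), while `Pool` spawns separate processes. A minimal standalone sketch, not taken from the PR:

```python
from multiprocessing.pool import ThreadPool
import os

def worker(_):
    # ThreadPool workers are threads in the current process, so they all
    # report the parent's PID; multiprocessing.Pool would fork new processes.
    return os.getpid()

with ThreadPool(processes=4) as tp:
    pids = set(tp.map(worker, range(8)))

print(pids == {os.getpid()})  # True: threads share one process (and the GIL)
```

Since fetching spec files mostly waits on the network, threads release the GIL while blocked, so a `ThreadPool` can still overlap the downloads.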
```python
fetched_specs = []

def _fetch_spec_from_mirror(spec_url):
    spec_file_contents = None
```
The variable is reinitialized below:

```python
spec_file_contents = None
```
```python
except (URLError, web_util.SpackWebError) as url_err:
    tty.error("Error reading specfile: {0}".format(file_path))
    tty.error(url_err)
```
```python
def _read_specs_and_push_index(file_list, read_method, cache_prefix, db, db_root_dir, concurrency):
```
This might benefit from a docstring. It's really not clear what `read_method` should look like, and we have to trace it back to the implementation at the calling site.
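For illustration, the `read_method` contract the reviewer is asking to document appears to be a callable that takes one spec-file location and returns its contents as a string. A hedged sketch (the function name mirrors the PR, the body is ours):

```python
import os
import tempfile

def file_read_method(path):
    """Read one spec file from the local file system and return its contents."""
    with open(path) as f:
        return f.read()

# Quick demonstration with a throwaway spec file:
with tempfile.NamedTemporaryFile("w", suffix=".spec.json", delete=False) as f:
    f.write('{"spec": {}}')
    name = f.name

contents = file_read_method(name)
os.unlink(name)
print(contents)  # {"spec": {}}
```

A URL-based counterpart would have the same shape, taking a URL instead of a local path, which is why documenting the expected signature in a docstring helps.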
```python
if cache_prefix.startswith("s3"):
    aws = which("aws")

if aws:
    tmpspecsdir = tempfile.mkdtemp()
    sync_command_args = [
        "s3",
        "sync",
        "--exclude",
        "*",
        "--include",
        "*.spec.json.sig",
        "--include",
        "*.spec.json",
        "--include",
        "*.spec.yaml",
        cache_prefix,
        tmpspecsdir,
    ]

    try:
        tty.msg(
            "Using aws s3 sync to download specs from {0} to {1}".format(
                cache_prefix, tmpspecsdir
            )
        )
        aws(*sync_command_args, output=os.devnull, error=os.devnull)
        file_list = fsys.find(tmpspecsdir, ["*.spec.json.sig", "*.spec.json", "*.spec.yaml"])
        read_method = file_read_method
    except Exception:
        tty.msg("Failed to use aws s3 sync to retrieve specs, falling back to parallel fetch")
        shutil.rmtree(tmpspecsdir)
        tmpspecsdir = None

if not file_list:
    read_method = url_read_method
    try:
        file_list = (
            url_util.join(cache_prefix, entry)
            for entry in web_util.list_url(cache_prefix)
            if entry.endswith(".yaml")
            or entry.endswith("spec.json")
            or entry.endswith("spec.json.sig")
        )
    except KeyError as inst:
        msg = "No packages at {0}: {1}".format(cache_prefix, inst)
        tty.warn(msg)
        return
    except Exception as err:
        # If we got some kind of S3 error (access denied or other connection
        # error), the first non boto-specific class in the exception
        # hierarchy is Exception. Just print a warning and return.
        msg = "Encountered problem listing packages at {0}: {1}".format(cache_prefix, err)
        tty.warn(msg)
        return
```
Can we extract different functions here, and have code like:

```python
file_list, read_fn = spec_files_from_cache(cache_prefix)
```

That would make it clear that the important result of the first 50 lines of computation is the list of spec.json.sig files and the "read function". Also, it would be good to have a single function that retrieves the list using the aws cli, and another separate function implementing the fallback. In this way, `spec_files_from_cache` would just be responsible for deciding when to call the other two functions.
```python
tp = multiprocessing.pool.ThreadPool(processes=concurrency)
try:
    tp.map(llnl.util.lang.star(_fetch_spec_from_mirror), [(f,) for f in file_list])
```
I think this call to map has either a race condition or is serialized by the GIL:

- `_fetch_spec_from_mirror` returns `None`
- Therefore the result (which we discard) from `tp.map` is a list of `None`s
- The computations we do next are based on a global object (`fetched_specs`) that is bound to `_fetch_spec_from_mirror` as a closure

It's probably better to have `_fetch_spec_from_mirror` return a spec and turn this line into:
```diff
-    tp.map(llnl.util.lang.star(_fetch_spec_from_mirror), [(f,) for f in file_list])
+    fetched_specs = tp.map(llnl.util.lang.star(_fetch_spec_from_mirror), [(f,) for f in file_list])
```
@scottwittenburg ☝️ This is the most important comment
Yes, thanks, I'm attempting to address this now. 👍
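The return-a-value pattern the review suggests can be illustrated standalone (the worker below is a toy stand-in for `_fetch_spec_from_mirror`, not the PR's code):

```python
from multiprocessing.pool import ThreadPool

# Have each worker return its result and let map() collect them, instead of
# appending to a shared list from inside the workers.
def read_one(url):
    return "spec-for-" + url

urls = ["a", "b", "c"]
with ThreadPool(processes=2) as tp:
    fetched_specs = tp.map(read_one, urls)  # map preserves input order

print(fetched_specs)  # ['spec-for-a', 'spec-for-b', 'spec-for-c']
```

Because `map` collects the return values itself, this works identically with a process pool, where workers cannot see the parent's `fetched_specs` list at all.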
```python
if spec_url.endswith(".json.sig"):
    specfile_json = Spec.extract_json_from_clearsig(spec_file_contents)
    s = Spec.from_dict(specfile_json)
elif spec_url.endswith(".json"):
    s = Spec.from_json(spec_file_contents)
elif spec_url.endswith(".yaml"):
    s = Spec.from_yaml(spec_file_contents)
```
If we use this with a process pool, it's probably better to return the single result, instead of relying on side effects over a global variable:
```diff
-    if spec_url.endswith(".json.sig"):
-        specfile_json = Spec.extract_json_from_clearsig(spec_file_contents)
-        s = Spec.from_dict(specfile_json)
-    elif spec_url.endswith(".json"):
-        s = Spec.from_json(spec_file_contents)
-    elif spec_url.endswith(".yaml"):
-        s = Spec.from_yaml(spec_file_contents)
+    if spec_url.endswith(".json.sig"):
+        specfile_json = Spec.extract_json_from_clearsig(spec_file_contents)
+        return Spec.from_dict(specfile_json)
+    if spec_url.endswith(".json"):
+        return Spec.from_json(spec_file_contents)
+    if spec_url.endswith(".yaml"):
+        return Spec.from_yaml(spec_file_contents)
```
```python
if tmpspecsdir:
    shutil.rmtree(tmpspecsdir)
```
This is needed only if we use aws. It would be better if we could scope it with that "reading strategy", not in a higher-level function.
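One way to scope the temp-directory cleanup with the aws reading strategy is a context manager, so the higher-level function never sees `tmpspecsdir`. This is a hedged sketch, not the PR's actual refactor, and the `aws s3 sync` step is stubbed out:

```python
import contextlib
import os
import shutil
import tempfile

@contextlib.contextmanager
def synced_specs_dir():
    """Hypothetical helper: the aws strategy owns its temp directory, so
    callers never have to clean it up themselves."""
    tmpdir = tempfile.mkdtemp()
    try:
        # ... here one would run `aws s3 sync` into tmpdir ...
        yield tmpdir
    finally:
        shutil.rmtree(tmpdir)

with synced_specs_dir() as specs_dir:
    existed = os.path.isdir(specs_dir)  # True while inside the block
    saved = specs_dir

print(existed, os.path.isdir(saved))  # True False: removed on exit, even on error
```

The `finally` clause guarantees removal whether the reads succeed or raise, which is exactly the scoping the comment asks for.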
Force-pushed 9ffb41d to 6c105cd.
Thanks for your review @alalazo, I like this better after your suggested changes. I think the GIL was indeed serializing access to the closure variable, due to using …

I think I addressed all your suggestions, but let me know what you think when you get a chance.
alalazo
left a comment
A few minor comments. Once those are in, LGTM.
```python
db.add(s, None)
db.mark(s, "in_buildcache", True)

def _fetch_spec_from_mirror(spec_url):
    tty.debug("reading {0}".format(spec_url))
```
Let's remove the debug print from the worker function.
```diff
-    tty.debug("reading {0}".format(spec_url))
```
```python
"""
Use aws cli to sync all the specs into a local temp directory, returning
the list of local file paths and a function that can read each one from
the file system.
"""
```
Can we use Google style for docstrings? That would be:
```diff
-"""
-Use aws cli to sync all the specs into a local temp directory, returning
-the list of local file paths and a function that can read each one from
-the file system.
-"""
+"""Use aws cli to sync all the specs into a local temporary directory.
+
+Args:
+    cache_prefix (str): prefix of the build cache on s3
+
+Return:
+    List of the local file paths and a function that can read each one
+    from the file system.
+"""
```
```python
cache_prefix. This page contains a link for each binary package (.yaml or
.json) under cache_prefix.

try:
    tty.msg(
```
Should we demote this and the next message to debug level?
```python
aws = None
if cache_prefix.startswith("s3"):
    aws = which("aws")

read_fn = None
file_list = None

if aws:
    file_list, read_fn = _specs_from_cache_aws_cli(cache_prefix)

if not read_fn or not file_list:
    file_list, read_fn = _specs_from_cache_fallback(cache_prefix)

return file_list, read_fn
```
I think here we might do even better, and have code like:

```python
callbacks = []
if cache_prefix.startswith("s3"):
    callbacks.append(_specs_from_cache_aws_cli)
callbacks.append(_specs_from_cache_fallback)

for specs_from_cache_fn in callbacks:
    file_list, read_fn = specs_from_cache_fn(cache_prefix)
    if file_list:
        return file_list, read_fn

raise ...
```

Force-pushed 6c105cd to db80ecb.
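The callback-chain idea can be exercised as a runnable toy (all names, stub bodies, and return values below are illustrative, not the PR's code):

```python
def _specs_from_cache_aws_cli(prefix):
    # Simulate the aws cli being unavailable or the sync failing.
    return None, None

def _specs_from_cache_fallback(prefix):
    # Simulate a successful listing plus a matching read function.
    return ["foo.spec.json"], lambda url: "contents of " + url

def spec_files_from_cache(cache_prefix):
    callbacks = []
    if cache_prefix.startswith("s3"):
        callbacks.append(_specs_from_cache_aws_cli)
    callbacks.append(_specs_from_cache_fallback)

    # Try each strategy in order; the first one that yields files wins.
    for specs_from_cache_fn in callbacks:
        file_list, read_fn = specs_from_cache_fn(cache_prefix)
        if file_list:
            return file_list, read_fn
    raise RuntimeError("Unable to list specs at {0}".format(cache_prefix))

file_list, read_fn = spec_files_from_cache("s3://bucket/build_cache")
print(file_list)  # ['foo.spec.json']
```

Adding a new fetching strategy then means appending one more callable, without touching the selection loop.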
db80ecb
Compare
This change uses the aws cli, if available, to retrieve spec files from the mirror to a local temp directory, then parallelizes the reading of those files from disk using `multiprocessing.pool.ThreadPool`. If the aws cli is not available, then a `ThreadPool` is used to fetch and read the spec files from the mirror. Using the aws cli results in a ~16x speedup when recreating the binary mirror index, while just parallelizing the fetching and reading results in a ~3x speedup.
Force-pushed db80ecb to a548636.
I tried the fallback method locally; didn't try …
Parallelize fetching of spec files when updating a buildcache index, to reduce the time spent on it. Additionally, if the mirror is on `s3` and the `aws` cli program is available, use that to fetch the spec files, and still use multiprocessing to read them. Roughly, the parallelized fetching approach results in about a 3x speedup, while the `aws s3 sync` approach results in about a 16x speedup.

Below are some results comparing `develop` to the parallel spec fetching as well as to using `aws s3 sync`. This was on my development machine, a dual 8-core linux box running Ubuntu 18.04.

On a mirror with 973 specs:

- If `aws s3 sync` is available: …
- With just parallelized fetching: …
- With spack `develop`: …

On a mirror with 7209 specs:

- If `aws s3 sync` is available: …
- With just parallelized fetching: …
- With spack `develop`: …