Architecture: Use multiple cores to run link archiving in parallel #91
Inspired by https://github.com/aurelg/linkbak, I've had this on my mind for a while since it's super easy to implement, but @aurelg inspired me to actually make an issue for it.
The relevant code is in this file:
This is actually pretty easy. In your case, the only difficulty might be handling the screen output/progress bar properly: if different workers are updating the screen at the same time, it may quickly become a bit messy.
I think I can fix the parallel process stdout multiplexing problem with one of two solutions:
some simplified pseudocode:

import fcntl
import os
import sys
from multiprocessing import Pool

def archive_links(link):
    link, stdout = run_archive_methods(link)
    # serialize writes from parallel workers with an exclusive lock on the shared log
    with open('data/logs/archive.log', 'a') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        f.write(stdout)
        fcntl.flock(f, fcntl.LOCK_UN)

if parallel > 1:
    pool = Pool(processes=parallel)
    pool_status = pool.map_async(archive_links, links)
    last_pos = 0
    # tail the shared log and echo any new output to the terminal while workers run
    while not pool_status.ready():
        with open('data/logs/archive.log', 'r') as f:
            f.seek(last_pos)
            print(f.read())
            f.seek(0, os.SEEK_END)
            last_pos = f.tell()
        sys.stdout.flush()
        pool_status.wait(0.2)
else:
    for link in links:
        link, stdout = run_archive_methods(link)
        print(stdout)
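An alternative sketch for the output multiplexing (not necessarily one of the two solutions alluded to above, just an illustration): have each worker return its captured stdout and let the parent process print results as they finish, which sidesteps the shared log file and flock entirely. It assumes run_archive_methods(), parallel, and links from the snippet above.

from multiprocessing import Pool

def archive_link_quiet(link):
    # assumed: run_archive_methods() captures and returns the extractor output as a string
    link, stdout = run_archive_methods(link)
    return stdout

if parallel > 1:
    with Pool(processes=parallel) as pool:
        # imap_unordered yields each worker's output as soon as it finishes,
        # so only the parent process ever writes to the terminal
        for stdout in pool.imap_unordered(archive_link_quiet, links):
            print(stdout, flush=True)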
Parallel downloading is tricky: you need to be nice to remote hosts and not hit them too hard. There's a per-host connection limit specified in the RFCs, which most web browsers override; most clients do 7 or 10 concurrent requests per domain, IIRC. But since you're crawling different URLs, you have a more complicated problem to deal with and can fetch more than 7-10 simultaneous requests globally. You need to be careful, though, because it requires coordination among the workers as well, or a director that does the right thing. In my feed reader, I believe I just throw about that number of threads at it (with ...). I came here because I was looking at a bug report about running multiple ...
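To make the coordination idea above concrete, here is a minimal illustrative sketch (not ArchiveBox code) of a global cap plus a per-host cap enforced with semaphores; the limits of 16 and 7 are just placeholders for "global budget" and "browser-style per-host limit", and fetch() is whatever download function the workers use.

import threading
from urllib.parse import urlsplit

GLOBAL_LIMIT = threading.BoundedSemaphore(16)   # total in-flight fetches across all workers
_domain_sems = {}                               # per-host caps, created lazily
_domain_sems_lock = threading.Lock()

def _domain_sem(domain, per_host=7):
    # one semaphore per host, roughly mirroring a browser's per-host connection limit
    with _domain_sems_lock:
        return _domain_sems.setdefault(domain, threading.BoundedSemaphore(per_host))

def polite_fetch(url, fetch):
    """Run fetch(url) without exceeding the global or per-host limits."""
    domain = urlsplit(url).netloc
    with GLOBAL_LIMIT, _domain_sem(domain):
        return fetch(url)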
Most site mirroring apps incorporate downloading through a proxy; it's a very common feature that might be implemented in ArchiveBox, hence a large ...
It's not an issue of overloading ArchiveBox, it's an issue of overloading / hitting rate limits on the content servers, which piping through a proxy won't solve.
Even more than 8 cores might make sense, because often things are blocked on IO with no throughput, e.g. pages that would time out. It might need some careful scheduling, but it would be very cool to have!
A quick update for everyone watching this: v0.5.0 is going to be released soon with improvements to how ArchiveResults are stored (we moved them into the SQLite DB). This was a necessary blocker to fix before we can get around to parallel archiving in the next version. v0.5.0 will be faster, but it won't have built-in concurrent archiving support yet; that will be the primary focus for v0.6.0. The plan is to add a background task queue handler like dramatiq or, more likely, huey (because it has sqlite3 support, so we don't need to run redis). Once we have the background task worker system in place, we can implement a worker pool for Chrome/playwright and each of the other extractor methods. Then archiving can run in parallel by default, archiving around 5-10 sites at a time depending on the system resources available and how well the worker pool system performs for each extractor type. Huey and dramatiq both have built-in rate limiting systems that will allow us to cap the number of concurrent requests going to each site or being handled by each extractor. There's still quite a bit of work left, but we're getting closer! Having a background task system will also enable many other cool things, like building the scheduled import system into the UI #578, using a single shared Chrome process instead of relaunching Chrome for each link, and many other small improvements to performance.
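For readers curious what the huey-on-SQLite plan could look like in practice, here is a rough hypothetical sketch. None of this is final ArchiveBox code: the queue path, retry values, and the run_archive_method() helper are all made up for illustration.

from huey import SqliteHuey

# store the task queue in a SQLite file next to the archive, so no redis is needed
huey = SqliteHuey(filename='data/tasks.sqlite3')

@huey.task(retries=2, retry_delay=10)
def run_extractor(extractor_name, snapshot_url):
    # only one instance of each extractor runs at a time (lock_task raises if already held)
    with huey.lock_task(f'extractor:{extractor_name}'):
        # hypothetical helper: runs one extractor (wget, chrome, git, ...) for one snapshot
        return run_archive_method(extractor_name, snapshot_url)

# enqueueing from the main process might look like:
# for url in urls:
#     for extractor in ('wget', 'screenshot', 'git'):
#         run_extractor(extractor, url)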
With v0.6 released now, we've taken another step towards the goal of using a message-passing architecture to fully support parallel archiving. v0.6 moves the last bit of ArchiveResult state into the SQLite3 DB, where it can be managed with migrations and kept ACID compliant. The next step is to implement a worker queue for DB writes and have all writes to the Snapshot/ArchiveResult models processed in a single thread, freeing up other threads to do things in parallel without locking the DB anymore. Message passing is a big change though, so expect it to come in increments, with about 3~6 months of work to go depending on how much free time I have for ArchiveBox. Side note: v0.6 is >10x faster in many other ways (web UI, indexing, management tasks, etc.); only archiving itself remains to be sped up now. You can also still attempt to run ...
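As a hypothetical illustration of the single-writer idea described above (not the actual implementation): all DB writes go through one queue consumed by one dedicated thread, so other threads never contend for the SQLite write lock. The save_archiveresult() helper in the usage comment is made up.

import queue
import threading

db_write_queue = queue.Queue()

def db_writer():
    # the only thread allowed to write to SQLite; everyone else just enqueues work
    while True:
        write_fn, args = db_write_queue.get()
        try:
            write_fn(*args)          # e.g. Snapshot.objects.update_or_create(...)
        finally:
            db_write_queue.task_done()

threading.Thread(target=db_writer, daemon=True).start()

# any worker thread can request a write without touching the DB itself:
# db_write_queue.put((save_archiveresult, (snapshot, result)))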
Sorry, quick question: I run ArchiveBox in a Docker container; would allocating it more than one CPU core or thread currently have any performance gains?
@1105420698, allocating more than 1 CPU is definitely still advised, as Django will use all available cores to handle incoming requests in parallel, and a few of the extractors already take advantage of multiple cores to render pages faster (e.g. Chrome). ArchiveBox is already fairly multicore-capable (e.g. you can run multiple ...
Ok, I'm pretty set on using Huey at this point for the job scheduler: it can use SQLite, it comes with a great Django admin dashboard, and it supports nested tasks and mutexes. Here's the approach I'm thinking of for massaging all critical operations into a message-passing / queue / worker arrangement, in rough pseudocode:
# global mutexes that workers will use to limit number of concurrent operations at a time, when they use a resource that is constrained (e.g. disk IO, network, CPU, etc.)
GLOBAL_FS_ATOMIC_MUTEX = Pool('fs:atomic', timeout=120, max_concurrent=3) # increase these on fast SSDs, decrease it on slow spinny drives
GLOBAL_FS_CHOWN_MUTEX = Pool('fs:chown', timeout=120, max_concurrent=3)
GLOBAL_DB_WRITE_MUTEX = Pool('db:write', timeout=120, max_concurrent=1) # raise this if your db can handle concurrent writes at all (i.e. not sqlite)
GLOBAL_DB_TABLE_SNAPSHOT_MUTEX = Pool('db_snapshot:write', timeout=360, max_concurrent=1) # raise these if your db can handle concurrent writes to the same table
GLOBAL_DB_TABLE_ARCHIVERESULT_MUTEX = Pool('db_archiveresult:write', timeout=360, max_concurrent=1)
DB_ROW_SNAPSHOT_MUTEX = lambda url: Pool(f'db_snapshot:write:{url}', timeout=360, max_concurrent=1) # raise these if your db can handle concurrent writes to the same row (probably a bad idea)
DB_ROW_ARCHIVERESULT_MUTEX = lambda url: Pool(f'db_archiveresult:write:{url}', timeout=360, max_concurrent=1)
GLOBAL_SNAPSHOT_PULL_MUTEX = Pool('snapshot:pull', timeout=800000, max_concurrent=4) # raise this if you want to snapshot more URLs in parallel
GLOBAL_EXTRACTOR_MUTEX = Pool('extractor:run', timeout=234234234, max_concurrent=12) # only allow 12 extractors to run at a time globally
GLOBAL_PER_DEPENDENCY_MUTEX = lambda dependency: Pool(f'extractor:run_dependency:{dependency}', timeout=234234234, max_concurrent=4) # only allow 4 of each type of extractor dependency to run at a time
PER_DOMAIN_RATELIMIT_MUTEX = lambda domain: Pool(f'domain:pull:{domain}', timeout=3600, max_concurrent=3, rate_limit=sliding_window(...)) # raise this if the domain you're archiving can handle lots of concurrent requests
GLOBAL_SHELL_CMD_MUTEX = Pool('system:shell_cmd', timeout=234234234, max_concurrent=8) # maximum number of shell cmds/external binaries to execute at once
await CLI.ArchiveboxAdd(urls_text, depth=1) {
await parsers.Parse(urls_text, depth) { ... }
await models.Snapshot.bulk_update_snapshots(snapshots) {
await parallel([
models.Snapshot.update_or_create(snapshot1, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot1.url)]),
models.Snapshot.update_or_create(snapshot2, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot2.url)]),
models.Snapshot.update_or_create(snapshot3, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot3.url)]),
])
}
await index.update_main_index(snapshots) {
await parallel(max=4, [
models.Snapshot.write(snapshot1) {
await models.Snapshot.update_or_create(snapshot1, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot1.url)])
await models.Snapshot.update_filesystem(snapshot1) {
await parallel([
system.atomic_write('index.json', acquire=[GLOBAL_FS_ATOMIC_MUTEX]),
system.atomic_write('index.html', acquire=[GLOBAL_FS_ATOMIC_MUTEX]),
])
}
},
models.Snapshot.write(snapshot2) {
...
},
...
])
}
await extractors.save_snapshots(snapshots) {
await extractors.pull_snapshot(snapshot1, acquire=[GLOBAL_SNAPSHOT_PULL_MUTEX]) {
await parallel(max_concurrency=2, [
extractors.extract(snapshot1, 'wget', acquire=[PER_DOMAIN_RATELIMIT_MUTEX(snapshot1.domain), GLOBAL_EXTRACTOR_MUTEX]) {
await extractors.run_dependency('wget', acquire=[GLOBAL_PER_DEPENDENCY_MUTEX('wget')]) {
system.run_shell_cmd('wget ...', acquire=[GLOBAL_SHELL_CMD_MUTEX])
system.chown(result.output_path, acquire=[GLOBAL_FS_CHOWN_MUTEX])
system.dir_size(result.output_path)
}
await models.ArchiveResult.write(snapshot, result) {
models.ArchiveResult.get_or_create(snapshot, result, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_ARCHIVERESULT_MUTEX])
models.Snapshot.write(snapshot, merge_results=results)
...
index.add_index_texts(result=result) &
}
},
extractors.extract(snapshot1, extractor='git', acquire=[...]) {
extractors.run_dependency('git')
system.run_shell_cmd('git ...')
system.chown(result.output_path)
system.dir_size(result.output_path)
models.ArchiveResult.write(snapshot, result)
...
},
extractors.extract(snapshot1, extractor='playwright_screenshot', acquire=[...]) {
extractors.run_dependency('playwright')
context.assert_on_url(snapshot1.url)
context.assert_loaded()
context.run_js(snapshot1, 'screenshot.js')
system.chown(result.output_path)
system.dir_size(result.output_path)
models.ArchiveResult.write(snapshot, result)
...
},
])
extractors.pull_snapshot(snapshot2, acquire=[GLOBAL_SNAPSHOT_PULL_MUTEX]) &
extractor_pool = Pool(max_concurrency=2, key=f'extractors.extract({snapshot.url})')
models.Snapshot.write(snapshot2)
...
extractors.extract(snapshot2, extractor='wget')
...
extractors.extract(snapshot1, extractor='git')
...
...
...
WIP ignore this, just toying around with different patterns / styles to find something with good ergonomics:
I'm worried that the heavy reliance on mutexes and locking will lead to difficult-to-debug deadlock scenarios where parents spawn children that eat up all the worker slots, then are unable to complete, causing the parent to time out and force-kill those workers prematurely. I also reached out to the folks who are building ...
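To make that concern concrete, here is a minimal sketch of what the hypothetical Pool(...)/acquire primitive in the pseudocode above could look like if built on a standard-library semaphore with a timeout, so a worker that can't get a slot fails fast instead of blocking its parent forever. The names mirror the pseudocode, but nothing here is actual ArchiveBox or Huey code.

import threading
from contextlib import contextmanager

class Pool:
    """A named concurrency cap: at most max_concurrent holders at a time."""
    def __init__(self, name, max_concurrent=1, timeout=120):
        self.name = name
        self.timeout = timeout
        self._sem = threading.BoundedSemaphore(max_concurrent)

    @contextmanager
    def acquire(self):
        # fail fast instead of blocking forever, so a starved child can't hang its parent
        if not self._sem.acquire(timeout=self.timeout):
            raise TimeoutError(f'could not acquire {self.name} within {self.timeout}s')
        try:
            yield
        finally:
            self._sem.release()

GLOBAL_EXTRACTOR_MUTEX = Pool('extractor:run', max_concurrent=12)

# usage inside a worker:
# with GLOBAL_EXTRACTOR_MUTEX.acquire():
#     run_extractor(...)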
Over in #781 it was stated that parallel adds don't work yet. Over at https://github.com/ArchiveBox/ArchiveBox/wiki/Usage#large-archives there is an example of doing this that should probably be removed until this is fixed.
It works better in some cases (fast SSDs) than others, so it's still worth trying; it shouldn't be dangerous to data integrity, it'll just lock up if it's on a slow filesystem. I added a note to the Usage page.
Note: I've added a new DB/filesystem troubleshooting area to the wiki that may help people arriving here from Google: https://github.com/ArchiveBox/ArchiveBox/wiki/Upgrading-or-Merging-Archives#database-troubleshooting Contributions/suggestions welcome there.
Add a --parallel=8 CLI option to enable using multiprocessing to download a large number of links in parallel. Default to the number of cores on the machine; allow --parallel=1 to override it to 1 core.