 import json
 import logging
 import shutil
+import threading
 import time
 
+from collections import defaultdict
 from pathlib import Path
 from typing import TYPE_CHECKING
 from typing import Any
@@ -188,6 +190,9 @@ def _deserialize(self, data_raw: bytes) -> CacheItem[T]:
 class ArtifactCache:
     def __init__(self, *, cache_dir: Path) -> None:
         self._cache_dir = cache_dir
+        self._archive_locks: defaultdict[Path, threading.Lock] = defaultdict(
+            threading.Lock
+        )
 
     def get_cache_directory_for_link(self, link: Link) -> Path:
         key_parts = {"url": link.url_without_fragment}
@@ -253,13 +258,18 @@ def get_cached_archive_for_link(
             cache_dir, strict=strict, filename=link.filename, env=env
         )
         if cached_archive is None and strict and download_func is not None:
-            cache_dir.mkdir(parents=True, exist_ok=True)
             cached_archive = cache_dir / link.filename
-            try:
-                download_func(link.url, cached_archive)
-            except BaseException:
-                cached_archive.unlink(missing_ok=True)
-                raise
+            with self._archive_locks[cached_archive]:
+                # Check again if the archive exists (under the lock) to avoid
+                # duplicate downloads because it may have already been
+                # downloaded by another thread in the meantime
+                if not cached_archive.exists():
+                    cache_dir.mkdir(parents=True, exist_ok=True)
+                    try:
+                        download_func(link.url, cached_archive)
+                    except BaseException:
+                        cached_archive.unlink(missing_ok=True)
+                        raise
 
         return cached_archive
 
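The core of the change is pairing a per-path lock, stored in a defaultdict(threading.Lock), with a second exists() check taken only after the lock is acquired, so that concurrent threads download a given archive at most once. A minimal standalone sketch of that pattern (the download_once and fetch names here are hypothetical stand-ins, not part of the diff):

    import threading
    from collections import defaultdict
    from pathlib import Path
    from typing import Callable

    # One lock per target path; defaultdict creates a new Lock on first access.
    _locks: defaultdict[Path, threading.Lock] = defaultdict(threading.Lock)


    def download_once(url: str, target: Path, fetch: Callable[[str, Path], None]) -> Path:
        """Download url to target at most once, even across concurrent threads."""
        with _locks[target]:
            # Re-check under the lock: another thread may have completed the
            # download while this one was waiting to acquire it.
            if not target.exists():
                target.parent.mkdir(parents=True, exist_ok=True)
                try:
                    fetch(url, target)
                except BaseException:
                    # Discard a partially written file so a later retry starts clean.
                    target.unlink(missing_ok=True)
                    raise
        return target

The defaultdict means a lock is created lazily the first time a given archive path is requested, and the except BaseException handler mirrors the pre-existing behaviour of removing a partially written file before re-raising.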