|
import pathlib
from typing import Any

import pytest
import requests

from .assertions import *
from .fixtures import *
from .utils import *
| 8 | + |
# Collection name and shard id targeted by every request in this module.
COLLECTION = "test_collection"
SHARD = 0
| 11 | + |
@pytest.mark.parametrize(
    "bootstrap_points, recover_read, upsert_points, empty_manifest",
    [
        # Read peer recovered from a full snapshot of the write peer,
        # then partial-recovered against an empty manifest.
        (0, True, 0, True),
        (100, True, 0, True),
        (0, True, 5, True),
        (100, True, 5, True),

        # Same, but the read peer's own manifest drives the partial snapshot.
        (0, True, 0, False),
        (100, True, 0, False),
        (0, True, 5, False),
        (100, True, 5, False),

        # Read peer bootstrapped independently (no snapshot recovery first).
        (0, False, 0, True),
        (100, False, 0, True),
        (0, False, 0, False),
        (100, False, 0, False),
    ]
)
def test_partial_snapshot(tmp_path: pathlib.Path, bootstrap_points: int, recover_read: bool, upsert_points: int, empty_manifest: bool):
    """End-to-end partial snapshot transfer from a write peer to a read peer.

    Spins up two single-node peers, optionally seeds and diverges the write
    peer, performs a partial snapshot recovery on the read peer, and verifies
    file- and point-level consistency between the two.
    """
    assert_project_root()

    write_peer, read_peer = bootstrap_peers(tmp_path, bootstrap_points, recover_read)

    # Diverge the write peer after bootstrap so the partial transfer has work to do.
    if upsert_points > 0:
        upsert(write_peer, upsert_points, offset = bootstrap_points)

    manifest = {} if empty_manifest else None
    recover_partial(read_peer, write_peer, tmp_path, manifest)
| 40 | + |
| 41 | + |
def bootstrap_peers(tmp: pathlib.Path, bootstrap_points = 0, recover_read = False):
    """Start a write peer and a read peer; return their URLs as a pair.

    When `recover_read` is set, the read peer is initialized by recovering a
    full snapshot taken from the write peer; otherwise it gets a fresh,
    independently created collection.
    """
    write_peer = bootstrap_write_peer(tmp, bootstrap_points)
    recover_source = write_peer if recover_read else None
    read_peer = bootstrap_read_peer(tmp, recover_source)
    return write_peer, read_peer
| 46 | + |
def bootstrap_write_peer(tmp: pathlib.Path, bootstrap_points = 0):
    """Start the write-side peer under `tmp/write` and create the test collection on it."""
    peer_url = bootstrap_peer(tmp / "write", 6331)
    bootstrap_collection(peer_url, bootstrap_points)
    return peer_url
| 51 | + |
def bootstrap_read_peer(tmp: pathlib.Path, recover_from_url: str | None = None):
    """Start the read-side peer under `tmp/read`.

    If `recover_from_url` is given, the collection is recovered from a full
    snapshot of that peer; otherwise a fresh collection is created locally.
    """
    peer_url = bootstrap_peer(tmp / "read", 63331)

    if recover_from_url is not None:
        recover_collection(peer_url, recover_from_url)
    else:
        bootstrap_collection(peer_url)

    return peer_url
| 61 | + |
def bootstrap_peer(path: pathlib.Path, port: int):
    """Start a single-node cluster rooted at `path`; return the node's URI.

    `port` seeds the cluster's port allocation so the two peers don't collide.
    """
    path.mkdir()

    # Trace-level file_utils logging plus the rocksdb-free id tracker flag,
    # matching what the partial-snapshot code paths exercise.
    extra_env = {
        "QDRANT__LOG_LEVEL": "debug,collection::common::file_utils=trace",
        "QDRANT__FEATURE_FLAGS__USE_MUTABLE_ID_TRACKER_WITHOUT_ROCKSDB": "true",
    }

    peer_uris, _, _ = start_cluster(path, 1, port_seed = port, extra_env = extra_env)
    return peer_uris[0]
| 73 | + |
def bootstrap_collection(peer_url, bootstrap_points = 0):
    """Create the test collection on `peer_url` and optionally seed it with points.

    A high indexing threshold keeps segments unindexed; sparse vectors are disabled.
    """
    create_collection(peer_url, shard_number = 1, replication_factor = 1, indexing_threshold = 1000000, sparse_vectors = False)
    wait_collection_exists_and_active_on_all_peers(COLLECTION, [peer_url])

    if bootstrap_points > 0:
        upsert(peer_url, bootstrap_points)
| 80 | + |
def recover_collection(peer_url: str, recover_from_url: str):
    """Recover the collection on `peer_url` from a full snapshot of `recover_from_url`."""
    snapshot_url = create_collection_snapshot(recover_from_url)
    recover_collection_snapshot(peer_url, snapshot_url)

    # The recovered peer must hold exactly the source peer's points.
    assert_point_consistency(peer_url, recover_from_url)
| 86 | + |
def create_collection_snapshot(peer_url: str):
    """Trigger a full collection snapshot on `peer_url`; return its download URL."""
    resp = requests.post(f"{peer_url}/collections/{COLLECTION}/snapshots")
    assert_http_ok(resp)

    name = resp.json()["result"]["name"]
    return f"{peer_url}/collections/{COLLECTION}/snapshots/{name}"
| 94 | + |
def recover_collection_snapshot(peer_url: str, snapshot_url: str):
    """Ask `peer_url` to recover the collection from the snapshot at `snapshot_url`."""
    recover_endpoint = f"{peer_url}/collections/{COLLECTION}/snapshots/recover"
    resp = requests.put(recover_endpoint, json = { "location": snapshot_url })
    assert_http_ok(resp)
    return resp.json()["result"]
| 103 | + |
| 104 | + |
def recover_partial(peer_url: str, recover_from_url: str, tmp: pathlib.Path, manifest: Any = None):
    """Transfer a partial snapshot from `recover_from_url` onto `peer_url` and verify.

    When `manifest` is None, the receiving peer's own manifest is used, so
    only the diff is transferred; pass `{}` to force a full transfer.
    """
    if manifest is None:
        manifest = get_snapshot_manifest(peer_url)

    snapshot_path = create_partial_snapshot(recover_from_url, manifest, tmp)
    recover_partial_snapshot(peer_url, snapshot_path)

    assert_consistency(recover_from_url, peer_url)
| 110 | + |
def get_snapshot_manifest(peer_url: str):
    """Fetch the partial-snapshot manifest of the test shard from `peer_url`."""
    manifest_url = f"{peer_url}/collections/{COLLECTION}/shards/{SHARD}/snapshot/partial/manifest"
    resp = requests.get(manifest_url)
    assert_http_ok(resp)
    return resp.json()["result"]
| 116 | + |
def create_partial_snapshot(peer_url: str, manifest: dict[str, Any], tmp: pathlib.Path):
    """Create a partial snapshot on `peer_url` relative to `manifest`.

    Downloads the resulting tar archive into `tmp` and returns its path.
    """
    create_url = f"{peer_url}/collections/{COLLECTION}/shards/{SHARD}/snapshot/partial/create"
    resp = requests.post(create_url, json = manifest)
    assert_http_ok(resp)

    snapshot_path = tmp / "partial-snapshot.tar"
    snapshot_path.write_bytes(resp.content)
    return snapshot_path
| 130 | + |
def recover_partial_snapshot(peer_url: str, snapshot_path: pathlib.Path):
    """Upload the partial snapshot tar at `snapshot_path` to `peer_url` for recovery."""
    recover_url = f"{peer_url}/collections/{COLLECTION}/shards/{SHARD}/snapshot/partial/recover"

    with snapshot_path.open("rb") as snapshot_file:
        resp = requests.post(recover_url, files = { "snapshot": snapshot_file })

    assert_http_ok(resp)
    return resp.json()["result"]
| 140 | + |
| 141 | + |
def assert_consistency(write_peer: str, read_peer: str):
    """Assert both stored files and point data match between the two peers."""
    assert_files_consistency(write_peer, read_peer)
    assert_point_consistency(write_peer, read_peer)
| 145 | + |
def assert_files_consistency(write_peer: str, read_peer: str):
    """Assert both peers report the same set of segment files, ignoring file versions."""
    write_manifest = discard_file_versions(get_snapshot_manifest(write_peer))
    read_manifest = discard_file_versions(get_snapshot_manifest(read_peer))
    assert write_manifest == read_manifest
| 148 | + |
def discard_file_versions(snapshot_manifest: Any):
    """Normalize a snapshot manifest for comparison across peers.

    Replaces each segment's `file_versions` mapping (file name -> version)
    with a plain set of file names under the `files` key, so manifests can be
    compared by which files exist rather than by version numbers.

    Fix: the original rewrote the manifest in place (popping `file_versions`
    out of the caller's dicts); this builds and returns a new manifest and
    leaves the input untouched.

    Returns a dict of segment id -> normalized segment manifest.
    """
    normalized = {}
    for segment_id, segment_manifest in snapshot_manifest.items():
        segment_manifest = dict(segment_manifest)  # shallow copy; don't mutate caller's data
        segment_manifest['files'] = set(segment_manifest.pop('file_versions').keys())
        normalized[segment_id] = segment_manifest

    return normalized
| 154 | + |
def assert_point_consistency(write_peer: str, read_peer: str):
    """Assert all points (with vectors and payloads) are identical on both peers."""
    write_points = scroll_points(write_peer)
    read_points = scroll_points(read_peer)
    assert write_points == read_points
| 157 | + |
def scroll_points(peer_url: str):
    """Fetch every point in the collection from `peer_url`, keyed by point id."""
    scroll_request = {
        "limit": 1000000,
        "with_vectors": True,
        "with_payload": True,
    }
    resp = requests.post(f"{peer_url}/collections/{COLLECTION}/points/scroll", json = scroll_request)
    assert_http_ok(resp)

    return { point["id"]: point for point in resp.json()["result"]["points"] }
| 170 | + |
| 171 | + |
def upsert(peer_url: str, points: int, offset = 0):
    # Upsert `points` random points starting at id `offset`, in batches of 10,
    # with sparse vectors disabled (collection is created without them).
    upsert_random_points(peer_url, points, offset = offset, batch_size = 10, with_sparse_vector = False)
0 commit comments