Skip to content

Commit ee02761

Browse files
committed
Add partial snapshot integration tests
1 parent 7edb594 commit ee02761

1 file changed

Lines changed: 173 additions & 0 deletions

File tree

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
import pathlib
2+
import requests
3+
from typing import Any
4+
5+
from .assertions import *
6+
from .fixtures import *
7+
from .utils import *
8+
9+
# Single fixed collection/shard pair that every helper in this module targets.
COLLECTION = "test_collection"
SHARD = 0

@pytest.mark.parametrize(
    "bootstrap_points, recover_read, upsert_points, empty_manifest",
    [
        (0, True, 0, True),
        (100, True, 0, True),
        (0, True, 5, True),
        (100, True, 5, True),

        (0, True, 0, False),
        (100, True, 0, False),
        (0, True, 5, False),
        (100, True, 5, False),

        (0, False, 0, True),
        (100, False, 0, True),
        (0, False, 0, False),
        (100, False, 0, False),
    ]
)
def test_partial_snapshot(tmp_path: pathlib.Path, bootstrap_points: int, recover_read: bool, upsert_points: int, empty_manifest: bool):
    """End-to-end partial snapshot recovery from a write peer to a read peer.

    The matrix covers: empty vs pre-populated write peer, read peer started
    fresh vs recovered from a full snapshot, extra upserts applied after
    bootstrap, and an empty manifest vs the read peer's own manifest.
    """
    assert_project_root()

    write_peer, read_peer = bootstrap_peers(tmp_path, bootstrap_points, recover_read)

    # Optionally make the write peer diverge from the read peer before recovery.
    if upsert_points:
        upsert(write_peer, upsert_points, offset = bootstrap_points)

    manifest = {} if empty_manifest else None
    recover_partial(read_peer, write_peer, tmp_path, manifest)

def bootstrap_peers(tmp: pathlib.Path, bootstrap_points = 0, recover_read = False):
    """Start a write peer and a read peer (each a single-node cluster).

    The write peer is seeded with `bootstrap_points` random points. The read
    peer either gets a fresh empty collection, or (when `recover_read` is
    set) recovers the collection from a full snapshot of the write peer.
    """
    writer = bootstrap_write_peer(tmp, bootstrap_points)
    reader = bootstrap_read_peer(tmp, writer if recover_read else None)
    return writer, reader

def bootstrap_write_peer(tmp: pathlib.Path, bootstrap_points = 0):
    """Start the source peer, create the test collection, and seed it."""
    peer_url = bootstrap_peer(tmp / "write", 6331)
    bootstrap_collection(peer_url, bootstrap_points)
    return peer_url

def bootstrap_read_peer(tmp: pathlib.Path, recover_from_url: str | None = None):
    """Start the target peer.

    Creates an empty collection, or recovers the collection from a full
    snapshot of `recover_from_url` when one is given.
    """
    peer_url = bootstrap_peer(tmp / "read", 63331)

    if recover_from_url is not None:
        recover_collection(peer_url, recover_from_url)
    else:
        bootstrap_collection(peer_url)

    return peer_url

def bootstrap_peer(path: pathlib.Path, port: int):
    """Launch a single-node cluster rooted at `path` and return its REST URL."""
    path.mkdir()

    # Trace-level file_utils logging plus the mutable id tracker flag —
    # the code paths exercised by partial snapshots.
    extra_env = {
        "QDRANT__LOG_LEVEL": "debug,collection::common::file_utils=trace",
        "QDRANT__FEATURE_FLAGS__USE_MUTABLE_ID_TRACKER_WITHOUT_ROCKSDB": "true",
    }

    peer_uris, _, _ = start_cluster(path, 1, port_seed = port, extra_env = extra_env)

    return peer_uris[0]

def bootstrap_collection(peer_url, bootstrap_points = 0):
    """Create the test collection (single shard, no replication) and optionally seed it."""
    create_collection(peer_url, shard_number = 1, replication_factor = 1, indexing_threshold = 1000000, sparse_vectors = False)
    wait_collection_exists_and_active_on_all_peers(COLLECTION, [peer_url])

    if bootstrap_points:
        upsert(peer_url, bootstrap_points)

def recover_collection(peer_url: str, recover_from_url: str):
    """Recover the collection on `peer_url` from a fresh full snapshot of `recover_from_url`."""
    recover_collection_snapshot(peer_url, create_collection_snapshot(recover_from_url))

    # Recovered peer must hold exactly the source peer's points.
    assert_point_consistency(peer_url, recover_from_url)

def create_collection_snapshot(peer_url: str):
    """Trigger a full collection snapshot on `peer_url` and return its download URL."""
    resp = requests.post(f"{peer_url}/collections/{COLLECTION}/snapshots")
    assert_http_ok(resp)

    name = resp.json()["result"]["name"]
    return f"{peer_url}/collections/{COLLECTION}/snapshots/{name}"

def recover_collection_snapshot(peer_url: str, snapshot_url: str):
    """Make `peer_url` download and recover the collection from `snapshot_url`."""
    recover_endpoint = f"{peer_url}/collections/{COLLECTION}/snapshots/recover"
    resp = requests.put(recover_endpoint, json = { "location": snapshot_url })
    assert_http_ok(resp)

    return resp.json()["result"]

def recover_partial(peer_url: str, recover_from_url: str, tmp: pathlib.Path, manifest: Any = None):
    """Create a partial snapshot on the source peer and recover it on `peer_url`.

    When `manifest` is None, the target peer's own manifest is sent, so the
    source only ships what the target is missing; pass {} to request a
    snapshot of everything.
    """
    if manifest is None:
        manifest = get_snapshot_manifest(peer_url)

    snapshot_path = create_partial_snapshot(recover_from_url, manifest, tmp)
    recover_partial_snapshot(peer_url, snapshot_path)

    assert_consistency(recover_from_url, peer_url)

def get_snapshot_manifest(peer_url: str):
    """Fetch the partial snapshot manifest for the test shard from `peer_url`."""
    manifest_url = f"{peer_url}/collections/{COLLECTION}/shards/{SHARD}/snapshot/partial/manifest"
    resp = requests.get(manifest_url)
    assert_http_ok(resp)

    return resp.json()["result"]

def create_partial_snapshot(peer_url: str, manifest: dict[str, Any], tmp: pathlib.Path):
    """Request a partial snapshot relative to `manifest` and save the tar under `tmp`.

    Returns the path of the downloaded snapshot archive.
    """
    create_url = f"{peer_url}/collections/{COLLECTION}/shards/{SHARD}/snapshot/partial/create"
    snapshot_resp = requests.post(create_url, json = manifest)
    assert_http_ok(snapshot_resp)

    snapshot_path = tmp / "partial-snapshot.tar"
    snapshot_path.write_bytes(snapshot_resp.content)

    return snapshot_path

def recover_partial_snapshot(peer_url: str, snapshot_path: pathlib.Path):
    """Upload the partial snapshot tar to `peer_url` and recover the shard from it."""
    recover_url = f"{peer_url}/collections/{COLLECTION}/shards/{SHARD}/snapshot/partial/recover"

    with snapshot_path.open("rb") as snapshot_file:
        resp = requests.post(recover_url, files = { "snapshot": snapshot_file })

    assert_http_ok(resp)
    return resp.json()["result"]

def assert_consistency(write_peer: str, read_peer: str):
    """Assert the two peers agree on both snapshot file listings and point data."""
    assert_files_consistency(write_peer, read_peer)
    assert_point_consistency(write_peer, read_peer)

def assert_files_consistency(write_peer: str, read_peer: str):
    """Assert both peers report the same snapshot file names (file versions ignored)."""
    assert discard_file_versions(get_snapshot_manifest(write_peer)) == discard_file_versions(get_snapshot_manifest(read_peer))

def discard_file_versions(snapshot_manifest: Any):
    """Return a copy of `snapshot_manifest` with file versions collapsed to names.

    For each segment, the `file_versions` mapping (file name -> version) is
    replaced by a `files` set holding just the names, so manifests from two
    peers can be compared while ignoring version counters. All other segment
    fields are carried over unchanged.

    Unlike a `pop`-based in-place rewrite, the input manifest is NOT mutated,
    so callers may safely reuse it. Raises KeyError if a segment manifest has
    no `file_versions` entry (same as the previous `pop` behavior).
    """
    cleaned_manifest = {}
    for segment_key, segment_manifest in snapshot_manifest.items():
        # Copy everything except `file_versions`, then add the flattened set.
        segment_copy = {key: value for key, value in segment_manifest.items() if key != 'file_versions'}
        segment_copy['files'] = set(segment_manifest['file_versions'].keys())
        cleaned_manifest[segment_key] = segment_copy

    return cleaned_manifest

def assert_point_consistency(write_peer: str, read_peer: str):
    """Assert both peers hold identical points (ids, vectors, and payloads)."""
    assert scroll_points(write_peer) == scroll_points(read_peer)

def scroll_points(peer_url: str):
    """Scroll every point (with vectors and payload) from `peer_url`, keyed by point id."""
    # Limit is far above any point count used in these tests, so a single
    # scroll request returns the whole collection.
    request_body = {
        "limit": 1000000,
        "with_vectors": True,
        "with_payload": True,
    }
    resp = requests.post(f"{peer_url}/collections/{COLLECTION}/points/scroll", json = request_body)
    assert_http_ok(resp)

    return { point["id"]: point for point in resp.json()["result"]["points"] }

def upsert(peer_url: str, points: int, offset = 0):
    """Upsert `points` random dense-only points, ids starting at `offset`, in batches of 10."""
    upsert_random_points(peer_url, points, offset = offset, batch_size = 10, with_sparse_vector = False)

0 commit comments

Comments
 (0)