Skip to content

Commit c87074c

Browse files
committed
Add edge crate
1 parent 050eb1d commit c87074c

6 files changed

Lines changed: 489 additions & 4 deletions

File tree

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ members = [
310310
"lib/api",
311311
"lib/collection",
312312
"lib/common/*",
313+
"lib/edge",
313314
"lib/gridstore",
314315
"lib/macros",
315316
"lib/posting_list",

lib/edge/Cargo.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "edge"
3+
version = "0.1.0"
4+
authors = ["Qdrant Team <info@qdrant.tech>"]
5+
license = "Apache-2.0"
6+
edition = "2024"
7+
8+
9+
[dependencies]
10+
common = { path = "../common/common" }
11+
segment = { path = "../segment", default-features = false }
12+
shard = { path = "../shard" }
13+
14+
log = { workspace = true }
15+
parking_lot = { workspace = true }
16+
thiserror = { workspace = true }
17+
wal = { workspace = true }
18+
19+
20+
[dev-dependencies]
21+
anyhow = "1.0"

lib/edge/examples/edge-cli.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use std::env;
2+
use std::path::Path;
3+
4+
fn main() -> anyhow::Result<()> {
5+
let args: Vec<_> = env::args().skip(1).take(2).collect();
6+
7+
let [edge_shard_path] = args
8+
.try_into()
9+
.map_err(|args| anyhow::format_err!("unexpected arguments {args:?}"))?;
10+
11+
let _edge_shard = edge::Shard::load(Path::new(&edge_shard_path), None)?;
12+
Ok(())
13+
}

lib/edge/src/lib.rs

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
use std::num::NonZero;
2+
use std::path::{Path, PathBuf};
3+
use std::sync::Arc;
4+
use std::sync::atomic::AtomicBool;
5+
use std::{fmt, fs};
6+
7+
use common::counter::hardware_accumulator::HwMeasurementAcc;
8+
use common::counter::hardware_counter::HardwareCounterCell;
9+
use common::save_on_disk::SaveOnDisk;
10+
use parking_lot::Mutex;
11+
use segment::common::operation_error::{OperationError, OperationResult};
12+
use segment::data_types::query_context::QueryContext;
13+
use segment::data_types::vectors::QueryVector;
14+
use segment::entry::SegmentEntry;
15+
use segment::segment_constructor::load_segment;
16+
use segment::types::{DEFAULT_FULL_SCAN_THRESHOLD, ScoredPoint, SegmentConfig, WithPayload};
17+
use shard::operations::CollectionUpdateOperations;
18+
use shard::search::CoreSearchRequest;
19+
use shard::search_result_aggregator::BatchResultAggregator;
20+
use shard::segment_holder::{LockedSegmentHolder, SegmentHolder};
21+
use shard::update::*;
22+
use shard::wal::SerdeWal;
23+
use wal::WalOptions;
24+
25+
#[derive(Debug)]
26+
pub struct Shard {
27+
_path: PathBuf,
28+
config: SegmentConfig,
29+
wal: Mutex<SerdeWal<CollectionUpdateOperations>>,
30+
segments: LockedSegmentHolder,
31+
}
32+
33+
/// Name of the WAL directory, relative to the shard directory.
const WAL_PATH: &str = "wal";

/// Name of the directory holding all segments, relative to the shard directory.
const SEGMENTS_PATH: &str = "segments";
35+
36+
impl Shard {
37+
pub fn load(path: &Path, mut config: Option<SegmentConfig>) -> OperationResult<Self> {
38+
let wal_path = path.join(WAL_PATH);
39+
let wal: SerdeWal<CollectionUpdateOperations> =
40+
SerdeWal::new(&wal_path, default_wal_options()).map_err(|err| {
41+
OperationError::service_error(format!(
42+
"failed to open WAL {}: {err}",
43+
wal_path.display()
44+
))
45+
})?;
46+
47+
let segments_path = path.join(SEGMENTS_PATH);
48+
let segments_dir = fs::read_dir(&segments_path).map_err(|err| {
49+
OperationError::service_error(format!(
50+
"failed to read segments directory {}: {err}",
51+
segments_path.display()
52+
))
53+
})?;
54+
55+
let mut segments = SegmentHolder::default();
56+
57+
for entry in segments_dir {
58+
let entry = entry.map_err(|err| {
59+
OperationError::service_error(format!(
60+
"failed to read entry in segments directory {}: {err}",
61+
segments_path.display()
62+
))
63+
})?;
64+
65+
let segment_path = entry.path();
66+
67+
if !segment_path.is_dir() {
68+
log::warn!(
69+
"Skipping non-directory segment entry {}",
70+
segment_path.display(),
71+
);
72+
73+
continue;
74+
}
75+
76+
if let Some(name) = segment_path.file_name()
77+
&& let Some(name) = name.to_str()
78+
&& name.starts_with(".")
79+
{
80+
log::warn!(
81+
"Skipping hidden segment directory {}",
82+
segment_path.display()
83+
);
84+
continue;
85+
}
86+
87+
let segment = load_segment(&segment_path, &AtomicBool::new(false)).map_err(|err| {
88+
OperationError::service_error(format!(
89+
"failed to load segment {}: {err}",
90+
segment_path.display()
91+
))
92+
})?;
93+
94+
let Some(mut segment) = segment else {
95+
fs::remove_dir_all(&segment_path).map_err(|err| {
96+
OperationError::service_error(format!(
97+
"failed to remove leftover segment {}: {err}",
98+
segment_path.display(),
99+
))
100+
})?;
101+
102+
continue;
103+
};
104+
105+
if let Some(config) = &config {
106+
if !config.is_compatible(segment.config()) {
107+
return Err(OperationError::service_error(format!(
108+
"segment {} is incompatible with provided config or previously loaded segments: \
109+
expected {:?}, but received {:?}",
110+
segment_path.display(),
111+
config,
112+
segment.config(),
113+
)));
114+
}
115+
} else {
116+
config = Some(segment.config().clone());
117+
}
118+
119+
segment.check_consistency_and_repair().map_err(|err| {
120+
OperationError::service_error(format!(
121+
"failed to repair segment {}: {err}",
122+
segment_path.display()
123+
))
124+
})?;
125+
126+
segments.add_new(segment);
127+
}
128+
129+
if !segments.has_appendable_segment() {
130+
let Some(config) = &config else {
131+
return Err(OperationError::service_error(
132+
"segment config is not provided and no segments were loaded",
133+
));
134+
};
135+
136+
let payload_index_schema_path = path.join("payload_index.json");
137+
let payload_index_schema = SaveOnDisk::load_or_init_default(&payload_index_schema_path)
138+
.map_err(|err| {
139+
OperationError::service_error(format!(
140+
"failed to initialize temporary payload index schema file {}: {err}",
141+
payload_index_schema_path.display(),
142+
))
143+
})?;
144+
145+
segments.create_appendable_segment(
146+
&segments_path,
147+
config.clone(),
148+
Arc::new(payload_index_schema),
149+
)?;
150+
151+
debug_assert!(segments.has_appendable_segment());
152+
}
153+
154+
let shard = Self {
155+
_path: path.into(),
156+
config: config.expect("config was provided or at least one segment was loaded"),
157+
wal: parking_lot::Mutex::new(wal),
158+
segments: Arc::new(parking_lot::RwLock::new(segments)),
159+
};
160+
161+
Ok(shard)
162+
}
163+
164+
pub fn config(&self) -> &SegmentConfig {
165+
&self.config
166+
}
167+
168+
pub fn update(&self, operation: CollectionUpdateOperations) -> OperationResult<()> {
169+
let mut wal = self.wal.lock();
170+
171+
let operation_id = wal.write(&operation).map_err(service_error)?;
172+
let hw_counter = HardwareCounterCell::disposable();
173+
174+
let result = match operation {
175+
CollectionUpdateOperations::PointOperation(point_operation) => {
176+
process_point_operation(&self.segments, operation_id, point_operation, &hw_counter)
177+
}
178+
CollectionUpdateOperations::VectorOperation(vector_operation) => {
179+
process_vector_operation(
180+
&self.segments,
181+
operation_id,
182+
vector_operation,
183+
&hw_counter,
184+
)
185+
}
186+
CollectionUpdateOperations::PayloadOperation(payload_operation) => {
187+
process_payload_operation(
188+
&self.segments,
189+
operation_id,
190+
payload_operation,
191+
&hw_counter,
192+
)
193+
}
194+
CollectionUpdateOperations::FieldIndexOperation(index_operation) => {
195+
process_field_index_operation(
196+
&self.segments,
197+
operation_id,
198+
&index_operation,
199+
&hw_counter,
200+
)
201+
}
202+
};
203+
204+
result.map(|_| ())
205+
}
206+
207+
pub fn search(&self, search: CoreSearchRequest) -> OperationResult<Vec<ScoredPoint>> {
208+
let segments: Vec<_> = self
209+
.segments
210+
.read()
211+
.non_appendable_then_appendable_segments()
212+
.collect();
213+
214+
let CoreSearchRequest {
215+
query,
216+
filter,
217+
params,
218+
limit,
219+
offset,
220+
with_payload,
221+
with_vector,
222+
score_threshold,
223+
} = search;
224+
225+
let vector_name = query.get_vector_name().to_string();
226+
let query_vector = QueryVector::from(query);
227+
let with_payload = WithPayload::from(with_payload.unwrap_or_default());
228+
let with_vector = with_vector.unwrap_or_default();
229+
230+
let context =
231+
QueryContext::new(DEFAULT_FULL_SCAN_THRESHOLD, HwMeasurementAcc::disposable());
232+
233+
let mut points_by_segment = Vec::with_capacity(segments.len());
234+
235+
for segment in segments {
236+
let batched_points = segment.get().read().search_batch(
237+
&vector_name,
238+
&[&query_vector],
239+
&with_payload,
240+
&with_vector,
241+
filter.as_ref(),
242+
offset + limit,
243+
params.as_ref(),
244+
&context.get_segment_query_context(),
245+
)?;
246+
247+
debug_assert_eq!(batched_points.len(), 1);
248+
249+
let [points] = batched_points
250+
.try_into()
251+
.expect("single batched search result");
252+
253+
points_by_segment.push(points);
254+
}
255+
256+
let mut aggregator = BatchResultAggregator::new([offset + limit]);
257+
aggregator.update_point_versions(points_by_segment.iter().flatten());
258+
259+
for points in points_by_segment {
260+
aggregator.update_batch_results(0, points);
261+
}
262+
263+
let [mut points] = aggregator
264+
.into_topk()
265+
.try_into()
266+
.expect("single batched search result");
267+
268+
let distance = self
269+
.config
270+
.vector_data
271+
.get(&vector_name)
272+
.expect("vector config exist")
273+
.distance;
274+
275+
match &query_vector {
276+
QueryVector::Nearest(_) => {
277+
for point in &mut points {
278+
point.score = distance.postprocess_score(point.score);
279+
}
280+
}
281+
282+
QueryVector::RecommendBestScore(_) => (),
283+
QueryVector::RecommendSumScores(_) => (),
284+
QueryVector::Discovery(_) => (),
285+
QueryVector::Context(_) => (),
286+
}
287+
288+
if let Some(score_threshold) = score_threshold {
289+
debug_assert!(
290+
points.is_sorted_by(|left, right| distance.is_ordered(left.score, right.score)),
291+
);
292+
293+
let below_threshold = points
294+
.iter()
295+
.enumerate()
296+
.find(|(_, point)| !distance.check_threshold(point.score, score_threshold));
297+
298+
if let Some((below_threshold_idx, _)) = below_threshold {
299+
points.truncate(below_threshold_idx);
300+
}
301+
}
302+
303+
let _ = points.drain(..offset);
304+
305+
Ok(points)
306+
}
307+
}
308+
309+
fn default_wal_options() -> WalOptions {
310+
WalOptions {
311+
segment_capacity: 32 * 1024 * 1024,
312+
segment_queue_len: 0,
313+
retain_closed: NonZero::new(1).unwrap(),
314+
}
315+
}
316+
317+
fn service_error(err: impl fmt::Display) -> OperationError {
318+
OperationError::service_error(err.to_string())
319+
}

0 commit comments

Comments
 (0)