Skip to content

Commit 137384d

Browse files
committed
Add edge crate
1 parent 2c5caa3 commit 137384d

6 files changed

Lines changed: 339 additions & 4 deletions

File tree

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ members = [
310310
"lib/api",
311311
"lib/collection",
312312
"lib/common/*",
313+
"lib/edge",
313314
"lib/gridstore",
314315
"lib/macros",
315316
"lib/posting_list",

lib/edge/Cargo.toml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[package]
name = "edge"
version = "0.1.0"
authors = ["Qdrant Team <[email protected]>"]
license = "Apache-2.0"
edition = "2024"

[dependencies]
# Sibling workspace crates.
common = { path = "../common/common" }
# Default features disabled — the edge build pulls in only what it needs.
segment = { path = "../segment", default-features = false }
shard = { path = "../shard" }

# Versions pinned by the workspace manifest.
log = { workspace = true }
parking_lot = { workspace = true }
thiserror = { workspace = true }
wal = { workspace = true }

[dev-dependencies]
# Used by the `edge-cli` example for ad-hoc error handling.
anyhow = "1.0"
# clap = { version = "4.5", features = ["derive"] }

lib/edge/examples/edge-cli.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use std::env;
2+
use std::path::Path;
3+
4+
fn main() -> anyhow::Result<()> {
5+
let args: Vec<_> = env::args().skip(1).take(2).collect();
6+
7+
let [edge_shard_path] = args
8+
.try_into()
9+
.map_err(|args| anyhow::format_err!("unexpected arguments {args:?}"))?;
10+
11+
let _edge_shard = edge::Shard::load(Path::new(&edge_shard_path), None)?;
12+
Ok(())
13+
}

lib/edge/src/lib.rs

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
use std::num::NonZero;
2+
use std::path::{Path, PathBuf};
3+
use std::sync::Arc;
4+
use std::sync::atomic::AtomicBool;
5+
use std::{fmt, fs};
6+
7+
use common::counter::hardware_accumulator::HwMeasurementAcc;
8+
use common::counter::hardware_counter::HardwareCounterCell;
9+
use parking_lot::Mutex;
10+
use segment::common::operation_error::{OperationError, OperationResult};
11+
use segment::data_types::query_context::QueryContext;
12+
use segment::data_types::vectors::QueryVector;
13+
use segment::entry::SegmentEntry;
14+
use segment::segment_constructor::load_segment;
15+
use segment::types::{DEFAULT_FULL_SCAN_THRESHOLD, ScoredPoint, SegmentConfig, WithPayload};
16+
use shard::operations::CollectionUpdateOperations;
17+
use shard::search::CoreSearchRequest;
18+
use shard::search_result_aggregator::BatchResultAggregator;
19+
use shard::segment_holder::{LockedSegmentHolder, SegmentHolder};
20+
use shard::update::*;
21+
use shard::wal::SerdeWal;
22+
use wal::WalOptions;
23+
24+
/// A single self-contained shard: a write-ahead log plus a set of segments
/// loaded from one directory on disk.
#[derive(Debug)]
pub struct Shard {
    // Shard root directory. Currently unused after load (hence the
    // underscore); kept for future operations that need the path.
    _path: PathBuf,
    // Segment config shared by all segments of this shard; validated (or
    // adopted from the first segment) in `Shard::load`.
    config: SegmentConfig,
    // Write-ahead log. The mutex serializes writers so that WAL order
    // matches the order operations are applied in `Shard::update`.
    wal: Mutex<SerdeWal<CollectionUpdateOperations>>,
    // Shared, lockable holder of all loaded segments.
    segments: LockedSegmentHolder,
}
31+
32+
// Sub-directories of a shard directory on disk.
const WAL_PATH: &str = "wal";
const SEGMENTS_PATH: &str = "segments";
34+
35+
impl Shard {
36+
pub fn load(path: &Path, mut config: Option<SegmentConfig>) -> OperationResult<Self> {
37+
let wal_path = path.join(WAL_PATH);
38+
let wal: SerdeWal<CollectionUpdateOperations> =
39+
SerdeWal::new(&wal_path, default_wal_options()).map_err(|err| {
40+
OperationError::service_error(format!(
41+
"failed to open WAL {}: {err}",
42+
wal_path.display()
43+
))
44+
})?;
45+
46+
let segments_path = path.join(SEGMENTS_PATH);
47+
let segments_dir = fs::read_dir(&segments_path).map_err(|err| {
48+
OperationError::service_error(format!(
49+
"failed to read segments directory {}: {err}",
50+
segments_path.display()
51+
))
52+
})?;
53+
54+
let mut segments = SegmentHolder::default();
55+
56+
for entry in segments_dir {
57+
let entry = entry.map_err(|err| {
58+
OperationError::service_error(format!(
59+
"failed to read entry in segments directory {}: {err}",
60+
segments_path.display()
61+
))
62+
})?;
63+
64+
let segment_path = entry.path();
65+
66+
if !segment_path.is_dir() {
67+
log::warn!(
68+
"Skipping non-directory segment entry {}",
69+
segment_path.display(),
70+
);
71+
72+
continue;
73+
}
74+
75+
if let Some(name) = segment_path.file_name()
76+
&& let Some(name) = name.to_str()
77+
&& name.starts_with(".")
78+
{
79+
log::warn!(
80+
"Skipping hidden segment directory {}",
81+
segment_path.display()
82+
);
83+
continue;
84+
}
85+
86+
let segment = load_segment(&segment_path, &AtomicBool::new(false)).map_err(|err| {
87+
OperationError::service_error(format!(
88+
"failed to load segment {}: {err}",
89+
segment_path.display()
90+
))
91+
})?;
92+
93+
let Some(mut segment) = segment else {
94+
fs::remove_dir_all(&segment_path).map_err(|err| {
95+
OperationError::service_error(format!(
96+
"failed to remove leftover segment {}: {err}",
97+
segment_path.display(),
98+
))
99+
})?;
100+
101+
continue;
102+
};
103+
104+
if let Some(config) = &config {
105+
// TODO: Is simple equality check sufficient? 🤔
106+
if config != segment.config() {
107+
return Err(OperationError::service_error(format!(
108+
"segment {} does not match expected segment config: expected {:?}, but received {:?}",
109+
segment_path.display(),
110+
config,
111+
segment.config(),
112+
)));
113+
}
114+
} else {
115+
config = Some(segment.config().clone());
116+
}
117+
118+
segment.check_consistency_and_repair().map_err(|err| {
119+
OperationError::service_error(format!(
120+
"failed to repair segment {}: {err}",
121+
segment_path.display()
122+
))
123+
})?;
124+
125+
segments.add_new(segment);
126+
}
127+
128+
let shard = Self {
129+
_path: path.into(),
130+
config: config.expect("config was provided or at least one segment was loaded"),
131+
wal: parking_lot::Mutex::new(wal),
132+
segments: Arc::new(parking_lot::RwLock::new(segments)),
133+
};
134+
135+
Ok(shard)
136+
}
137+
138+
pub fn config(&self) -> &SegmentConfig {
139+
&self.config
140+
}
141+
142+
pub fn update(&self, operation: CollectionUpdateOperations) -> OperationResult<()> {
143+
let mut wal = self.wal.lock();
144+
145+
let operation_id = wal.write(&operation).map_err(service_error)?;
146+
let hw_counter = HardwareCounterCell::disposable();
147+
148+
let result = match operation {
149+
CollectionUpdateOperations::PointOperation(point_operation) => {
150+
process_point_operation(&self.segments, operation_id, point_operation, &hw_counter)
151+
}
152+
CollectionUpdateOperations::VectorOperation(vector_operation) => {
153+
process_vector_operation(
154+
&self.segments,
155+
operation_id,
156+
vector_operation,
157+
&hw_counter,
158+
)
159+
}
160+
CollectionUpdateOperations::PayloadOperation(payload_operation) => {
161+
process_payload_operation(
162+
&self.segments,
163+
operation_id,
164+
payload_operation,
165+
&hw_counter,
166+
)
167+
}
168+
CollectionUpdateOperations::FieldIndexOperation(index_operation) => {
169+
process_field_index_operation(
170+
&self.segments,
171+
operation_id,
172+
&index_operation,
173+
&hw_counter,
174+
)
175+
}
176+
};
177+
178+
result.map(|_| ())
179+
}
180+
181+
pub fn search(&self, search: CoreSearchRequest) -> OperationResult<Vec<ScoredPoint>> {
182+
let segments: Vec<_> = self
183+
.segments
184+
.read()
185+
.non_appendable_then_appendable_segments()
186+
.collect();
187+
188+
let CoreSearchRequest {
189+
query,
190+
filter,
191+
params,
192+
limit,
193+
offset,
194+
with_payload,
195+
with_vector,
196+
score_threshold,
197+
} = search;
198+
199+
let vector_name = query.get_vector_name().to_string();
200+
let query_vector = query.into();
201+
let with_payload = WithPayload::from(with_payload.unwrap_or_default());
202+
let with_vector = with_vector.unwrap_or_default();
203+
204+
let context =
205+
QueryContext::new(DEFAULT_FULL_SCAN_THRESHOLD, HwMeasurementAcc::disposable());
206+
207+
let mut points_by_segment = Vec::with_capacity(segments.len());
208+
209+
for segment in segments {
210+
let batched_points = segment.get().read().search_batch(
211+
&vector_name,
212+
&[&query_vector],
213+
&with_payload,
214+
&with_vector,
215+
filter.as_ref(),
216+
offset + limit,
217+
params.as_ref(),
218+
&context.get_segment_query_context(),
219+
)?;
220+
221+
debug_assert_eq!(batched_points.len(), 1);
222+
223+
let [points] = batched_points
224+
.try_into()
225+
.expect("single batched search result");
226+
227+
points_by_segment.push(points);
228+
}
229+
230+
let mut aggregator = BatchResultAggregator::new([offset + limit]);
231+
aggregator.update_point_versions(points_by_segment.iter().flatten());
232+
233+
for points in points_by_segment {
234+
aggregator.update_batch_results(0, points);
235+
}
236+
237+
let [mut points] = aggregator
238+
.into_topk()
239+
.try_into()
240+
.expect("single batched search result");
241+
242+
let distance = self
243+
.config
244+
.vector_data
245+
.get(&vector_name)
246+
.expect("vector config exist")
247+
.distance;
248+
249+
match &query_vector {
250+
QueryVector::Nearest(_) => {
251+
for point in &mut points {
252+
point.score = distance.postprocess_score(point.score);
253+
}
254+
}
255+
256+
QueryVector::RecommendBestScore(_) => (),
257+
QueryVector::RecommendSumScores(_) => (),
258+
QueryVector::Discovery(_) => (),
259+
QueryVector::Context(_) => (),
260+
}
261+
262+
if let Some(score_threshold) = score_threshold {
263+
// TODO: find+truncate instead of retain
264+
points.retain(|point| distance.check_threshold(point.score, score_threshold));
265+
}
266+
267+
let _ = points.drain(..points.len().saturating_sub(offset));
268+
269+
Ok(points)
270+
}
271+
}
272+
273+
fn default_wal_options() -> WalOptions {
274+
WalOptions {
275+
segment_capacity: 32 * 1024 * 1024,
276+
segment_queue_len: 0,
277+
retain_closed: NonZero::new(1).unwrap(),
278+
}
279+
}
280+
281+
fn service_error(err: impl fmt::Display) -> OperationError {
282+
OperationError::service_error(err.to_string())
283+
}

lib/segment/src/types.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,7 +1275,7 @@ impl PayloadStorageType {
12751275
}
12761276
}
12771277

1278-
#[derive(Anonymize, Default, Debug, Deserialize, Serialize, JsonSchema, Clone)]
1278+
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema, Anonymize)]
12791279
#[serde(rename_all = "snake_case")]
12801280
pub struct SegmentConfig {
12811281
#[serde(default)]
@@ -1428,7 +1428,7 @@ impl VectorStorageType {
14281428
}
14291429

14301430
/// Config of single vector data storage
1431-
#[derive(Debug, Deserialize, Serialize, JsonSchema, Anonymize, Clone)]
1431+
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema, Anonymize)]
14321432
#[serde(rename_all = "snake_case")]
14331433
pub struct VectorDataConfig {
14341434
/// Size/dimensionality of the vectors used
@@ -1468,7 +1468,9 @@ impl VectorDataConfig {
14681468
}
14691469
}
14701470

1471-
#[derive(Debug, Deserialize, Serialize, JsonSchema, Anonymize, Clone, Copy, Default)]
1471+
#[derive(
1472+
Copy, Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize, JsonSchema, Anonymize,
1473+
)]
14721474
#[serde(rename_all = "snake_case")]
14731475
pub enum SparseVectorStorageType {
14741476
/// Storage on disk (rocksdb storage)
@@ -1493,7 +1495,7 @@ impl SparseVectorStorageType {
14931495
}
14941496

14951497
/// Config of single sparse vector data storage
1496-
#[derive(Debug, Deserialize, Serialize, JsonSchema, Anonymize, Clone, Validate)]
1498+
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema, Validate, Anonymize)]
14971499
#[serde(rename_all = "snake_case")]
14981500
pub struct SparseVectorDataConfig {
14991501
/// Sparse inverted index config

0 commit comments

Comments
 (0)