Skip to content

Commit 92bf3e7

Browse files
committed
Flush versions in the same way we flush mappings now
1 parent 0c997a9 commit 92bf3e7

1 file changed

Lines changed: 32 additions & 30 deletions

File tree

lib/segment/src/id_tracker/mutable_id_tracker.rs

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -275,29 +275,7 @@ impl IdTracker for MutableIdTracker {
275275
return Ok(());
276276
}
277277

278-
// Open file in append mode to write new changes to the end
279-
let file = File::options()
280-
.create(true)
281-
.append(true)
282-
.open(&versions_path)?;
283-
let mut writer = BufWriter::new(file);
284-
285-
write_versions(&mut writer, &pending_versions).map_err(|err| {
286-
OperationError::service_error(format!(
287-
"Failed to persist ID tracker point versions ({}): {err}",
288-
versions_path.display(),
289-
))
290-
})?;
291-
292-
// Explicitly fsync file contents to ensure durability
293-
let file = writer.into_inner().unwrap();
294-
file.sync_all().map_err(|err| {
295-
OperationError::service_error(format!(
296-
"Failed to fsync ID tracker point mappings: {err}",
297-
))
298-
})?;
299-
300-
Ok(())
278+
store_version_changes(&versions_path, &pending_versions)
301279
})
302280
}
303281

@@ -626,10 +604,36 @@ fn load_versions(
626604
Ok(internal_to_version)
627605
}
628606

629-
fn write_versions<T>(writer: &mut BufWriter<T>, changes: &[VersionChange]) -> OperationResult<()>
630-
where
631-
T: Write,
632-
{
607+
/// Store new version changes, appending them to the given file
608+
fn store_version_changes(versions_path: &Path, changes: &[VersionChange]) -> OperationResult<()> {
609+
// Open file in append mode to write new changes to the end
610+
let file = File::options()
611+
.create(true)
612+
.append(true)
613+
.open(versions_path)?;
614+
let mut writer = BufWriter::new(file);
615+
616+
write_version_changes(&mut writer, changes).map_err(|err| {
617+
OperationError::service_error(format!(
618+
"Failed to persist ID tracker point versions ({}): {err}",
619+
versions_path.display(),
620+
))
621+
})?;
622+
623+
// Explicitly fsync file contents to ensure durability
624+
let file = writer.into_inner().unwrap();
625+
file.sync_all().map_err(|err| {
626+
OperationError::service_error(format!("Failed to fsync ID tracker point versions: {err}"))
627+
})?;
628+
629+
Ok(())
630+
}
631+
632+
/// Serializes pending point version changes into the given writer
633+
fn write_version_changes<W: Write>(
634+
mut writer: W,
635+
changes: &[VersionChange],
636+
) -> OperationResult<()> {
633637
for change in changes {
634638
let entry = serde_json::to_vec(change)?;
635639
debug_assert!(
@@ -641,9 +645,7 @@ where
641645
}
642646

643647
// Explicitly flush writer to catch IO errors
644-
writer
645-
.flush()
646-
.map_err(|err| OperationError::service_error(format!("Failed to flush: {err}")))?;
648+
writer.flush()?;
647649

648650
Ok(())
649651
}

0 commit comments

Comments
 (0)