Skip to content

Commit c1344d0

Browse files
committed
id tracker single write in flush
1 parent 08785c5 commit c1344d0

3 files changed

Lines changed: 66 additions & 8 deletions

File tree

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use std::io::{self, Write};
2+
3+
/// A writer that counts the number of bytes written to it without actually storing them.
4+
#[derive(Default)]
5+
pub struct CountingWrite {
6+
bytes: u64,
7+
}
8+
9+
impl CountingWrite {
10+
/// Returns the total number of bytes that have been "written" to this writer.
11+
pub fn bytes_written(&self) -> u64 {
12+
self.bytes
13+
}
14+
}
15+
16+
impl Write for CountingWrite {
17+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
18+
self.bytes += buf.len() as u64;
19+
Ok(buf.len()) // pretend we wrote everything
20+
}
21+
22+
fn flush(&mut self) -> io::Result<()> {
23+
Ok(())
24+
}
25+
}

lib/common/io/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod counting_write;
12
pub mod file_operations;
23
pub mod move_files;
34
pub mod safe_delete;

lib/segment/src/id_tracker/mutable_id_tracker.rs

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::io::{self, BufReader, BufWriter, Read, Seek, Write};
33
use std::path::{Path, PathBuf};
44
use std::sync::Arc;
55

6+
use ::io::counting_write::CountingWrite;
67
use bitvec::prelude::{BitSlice, BitVec};
78
use byteorder::{ReadBytesExt, WriteBytesExt};
89
use common::is_alive_lock::IsAliveLock;
@@ -16,6 +17,7 @@ use uuid::Uuid;
1617
use super::point_mappings::FileEndianess;
1718
use crate::common::Flusher;
1819
use crate::common::operation_error::{OperationError, OperationResult};
20+
use crate::common::vector_utils::TrySetCapacityExact;
1921
use crate::id_tracker::point_mappings::PointMappings;
2022
use crate::id_tracker::{DELETED_POINT_VERSION, IdTracker};
2123
use crate::types::{PointIdType, SeqNumberType};
@@ -399,23 +401,57 @@ fn store_mapping_changes(
399401
mappings_path: &Path,
400402
changes: &Vec<MappingChange>,
401403
) -> OperationResult<()> {
404+
// Estimate required buffer size
405+
let mut counting_write = CountingWrite::default();
406+
write_mapping_changes(&mut counting_write, changes).map_err(|err| {
407+
OperationError::service_error(format!(
408+
"Failed to estimate ID tracker point mappings size ({}): {err}",
409+
mappings_path.display(),
410+
))
411+
})?;
412+
413+
// Create a buffer with exact required size
414+
let mut writer: Vec<u8> = Vec::default();
415+
writer
416+
.try_set_capacity_exact(counting_write.bytes_written() as usize)
417+
.map_err(|err| {
418+
OperationError::service_error(format!(
419+
"Failed to allocate buffer for ID tracker point mappings ({}): {err}",
420+
mappings_path.display(),
421+
))
422+
})?;
423+
424+
// Collect all changes into a single buffer first
425+
write_mapping_changes(&mut writer, changes).map_err(|err| {
426+
OperationError::service_error(format!(
427+
"Failed to collect ID tracker point mappings ({}): {err}",
428+
mappings_path.display(),
429+
))
430+
})?;
431+
402432
// Create or open file in append mode to write new changes to the end
403433
let file = File::options()
404434
.create(true)
405435
.append(true)
406436
.open(mappings_path)?;
407-
let mut writer = BufWriter::new(file);
408437

409-
write_mapping_changes(&mut writer, changes).map_err(|err| {
438+
// Write all changes in one shot
439+
let mut file_writer = BufWriter::new(file);
440+
file_writer.write_all(&writer).map_err(|err| {
410441
OperationError::service_error(format!(
411442
"Failed to persist ID tracker point mappings ({}): {err}",
412443
mappings_path.display(),
413444
))
414445
})?;
415446

447+
// Drop the buffer to free memory
448+
drop(writer);
449+
416450
// Explicitly fsync file contents to ensure durability
417-
writer.flush()?;
418-
let file = writer.into_inner().unwrap();
451+
// `BufWriter::into_inner` flushes the buffer as well
452+
let file = file_writer.into_inner().map_err(|err| {
453+
OperationError::service_error(format!("Failed to flush ID tracker point mappings: {err}"))
454+
})?;
419455
file.sync_all().map_err(|err| {
420456
OperationError::service_error(format!("Failed to fsync ID tracker point mappings: {err}"))
421457
})?;
@@ -438,10 +474,6 @@ fn write_mapping_changes<W: Write>(
438474
for &change in changes {
439475
write_entry(&mut writer, change)?;
440476
}
441-
442-
// Explicitly flush writer to catch IO errors
443-
writer.flush()?;
444-
445477
Ok(())
446478
}
447479

0 commit comments

Comments
 (0)