@@ -3,6 +3,7 @@ use std::io::{self, BufReader, BufWriter, Read, Seek, Write};
33use std:: path:: { Path , PathBuf } ;
44use std:: sync:: Arc ;
55
6+ use :: io:: counting_write:: CountingWrite ;
67use bitvec:: prelude:: { BitSlice , BitVec } ;
78use byteorder:: { ReadBytesExt , WriteBytesExt } ;
89use common:: is_alive_lock:: IsAliveLock ;
@@ -16,6 +17,7 @@ use uuid::Uuid;
1617use super :: point_mappings:: FileEndianess ;
1718use crate :: common:: Flusher ;
1819use crate :: common:: operation_error:: { OperationError , OperationResult } ;
20+ use crate :: common:: vector_utils:: TrySetCapacityExact ;
1921use crate :: id_tracker:: point_mappings:: PointMappings ;
2022use crate :: id_tracker:: { DELETED_POINT_VERSION , IdTracker } ;
2123use crate :: types:: { PointIdType , SeqNumberType } ;
@@ -399,23 +401,57 @@ fn store_mapping_changes(
399401 mappings_path : & Path ,
400402 changes : & Vec < MappingChange > ,
401403) -> OperationResult < ( ) > {
404+ // Estimate required buffer size
405+ let mut counting_write = CountingWrite :: default ( ) ;
406+ write_mapping_changes ( & mut counting_write, changes) . map_err ( |err| {
407+ OperationError :: service_error ( format ! (
408+ "Failed to estimate ID tracker point mappings size ({}): {err}" ,
409+ mappings_path. display( ) ,
410+ ) )
411+ } ) ?;
412+
413+ // Create a buffer with exact required size
414+ let mut writer: Vec < u8 > = Vec :: default ( ) ;
415+ writer
416+ . try_set_capacity_exact ( counting_write. bytes_written ( ) as usize )
417+ . map_err ( |err| {
418+ OperationError :: service_error ( format ! (
419+ "Failed to allocate buffer for ID tracker point mappings ({}): {err}" ,
420+ mappings_path. display( ) ,
421+ ) )
422+ } ) ?;
423+
424+ // Collect all changes into a single buffer first
425+ write_mapping_changes ( & mut writer, changes) . map_err ( |err| {
426+ OperationError :: service_error ( format ! (
427+ "Failed to collect ID tracker point mappings ({}): {err}" ,
428+ mappings_path. display( ) ,
429+ ) )
430+ } ) ?;
431+
402432 // Create or open file in append mode to write new changes to the end
403433 let file = File :: options ( )
404434 . create ( true )
405435 . append ( true )
406436 . open ( mappings_path) ?;
407- let mut writer = BufWriter :: new ( file) ;
408437
409- write_mapping_changes ( & mut writer, changes) . map_err ( |err| {
438+ // Write all changes in one shot
439+ let mut file_writer = BufWriter :: new ( file) ;
440+ file_writer. write_all ( & writer) . map_err ( |err| {
410441 OperationError :: service_error ( format ! (
411442 "Failed to persist ID tracker point mappings ({}): {err}" ,
412443 mappings_path. display( ) ,
413444 ) )
414445 } ) ?;
415446
447+ // Drop the buffer to free memory
448+ drop ( writer) ;
449+
416450 // Explicitly fsync file contents to ensure durability
417- writer. flush ( ) ?;
418- let file = writer. into_inner ( ) . unwrap ( ) ;
451+ // `BufWriter::into_inner` flushes the buffer as well
452+ let file = file_writer. into_inner ( ) . map_err ( |err| {
453+ OperationError :: service_error ( format ! ( "Failed to flush ID tracker point mappings: {err}" ) )
454+ } ) ?;
419455 file. sync_all ( ) . map_err ( |err| {
420456 OperationError :: service_error ( format ! ( "Failed to fsync ID tracker point mappings: {err}" ) )
421457 } ) ?;
@@ -438,10 +474,6 @@ fn write_mapping_changes<W: Write>(
438474 for & change in changes {
439475 write_entry ( & mut writer, change) ?;
440476 }
441-
442- // Explicitly flush writer to catch IO errors
443- writer. flush ( ) ?;
444-
445477 Ok ( ( ) )
446478}
447479
0 commit comments