@@ -275,29 +275,7 @@ impl IdTracker for MutableIdTracker {
275275 return Ok ( ( ) ) ;
276276 }
277277
278- // Open file in append mode to write new changes to the end
279- let file = File :: options ( )
280- . create ( true )
281- . append ( true )
282- . open ( & versions_path) ?;
283- let mut writer = BufWriter :: new ( file) ;
284-
285- write_versions ( & mut writer, & pending_versions) . map_err ( |err| {
286- OperationError :: service_error ( format ! (
287- "Failed to persist ID tracker point versions ({}): {err}" ,
288- versions_path. display( ) ,
289- ) )
290- } ) ?;
291-
292- // Explicitly fsync file contents to ensure durability
293- let file = writer. into_inner ( ) . unwrap ( ) ;
294- file. sync_all ( ) . map_err ( |err| {
295- OperationError :: service_error ( format ! (
296- "Failed to fsync ID tracker point mappings: {err}" ,
297- ) )
298- } ) ?;
299-
300- Ok ( ( ) )
278+ store_version_changes ( & versions_path, & pending_versions)
301279 } )
302280 }
303281
@@ -626,10 +604,36 @@ fn load_versions(
626604 Ok ( internal_to_version)
627605}
628606
629- fn write_versions < T > ( writer : & mut BufWriter < T > , changes : & [ VersionChange ] ) -> OperationResult < ( ) >
630- where
631- T : Write ,
632- {
607+ /// Store new version changes, appending them to the given file
608+ fn store_version_changes ( versions_path : & Path , changes : & [ VersionChange ] ) -> OperationResult < ( ) > {
609+ // Open file in append mode to write new changes to the end
610+ let file = File :: options ( )
611+ . create ( true )
612+ . append ( true )
613+ . open ( versions_path) ?;
614+ let mut writer = BufWriter :: new ( file) ;
615+
616+ write_version_changes ( & mut writer, changes) . map_err ( |err| {
617+ OperationError :: service_error ( format ! (
618+ "Failed to persist ID tracker point versions ({}): {err}" ,
619+ versions_path. display( ) ,
620+ ) )
621+ } ) ?;
622+
623+ // Explicitly fsync file contents to ensure durability
624+ let file = writer. into_inner ( ) . unwrap ( ) ;
625+ file. sync_all ( ) . map_err ( |err| {
626+ OperationError :: service_error ( format ! ( "Failed to fsync ID tracker point versions: {err}" ) )
627+ } ) ?;
628+
629+ Ok ( ( ) )
630+ }
631+
632+ /// Serializes pending point version changes into the given writer
633+ fn write_version_changes < W : Write > (
634+ mut writer : W ,
635+ changes : & [ VersionChange ] ,
636+ ) -> OperationResult < ( ) > {
633637 for change in changes {
634638 let entry = serde_json:: to_vec ( change) ?;
635639 debug_assert ! (
@@ -641,9 +645,7 @@ where
641645 }
642646
643647 // Explicitly flush writer to catch IO errors
644- writer
645- . flush ( )
646- . map_err ( |err| OperationError :: service_error ( format ! ( "Failed to flush: {err}" ) ) ) ?;
648+ writer. flush ( ) ?;
647649
648650 Ok ( ( ) )
649651}
0 commit comments