@@ -3,7 +3,7 @@ use std::fs::File;
33use std:: str:: FromStr ;
44
55use rand:: Rng ;
6- use segment:: data_types:: vectors:: { DEFAULT_VECTOR_NAME , VectorInternal } ;
6+ use segment:: data_types:: vectors:: { DEFAULT_VECTOR_NAME , VectorInternal , only_default_vector } ;
77use segment:: json_path:: JsonPath ;
88use segment:: payload_json;
99use segment:: segment_constructor:: simple_segment_constructor:: build_simple_segment;
@@ -576,3 +576,92 @@ fn test_snapshot_all() {
576576 // one archive produced per concrete segment in the SegmentHolder
577577 assert_eq ! ( archive_count, 2 ) ;
578578}
579+
580+ #[ test]
581+ fn test_double_proxies ( ) {
582+ let hw_counter = HardwareCounterCell :: disposable ( ) ;
583+
584+ let dir = Builder :: new ( ) . prefix ( "segment_dir" ) . tempdir ( ) . unwrap ( ) ;
585+ let segment1 = build_segment_1 ( dir. path ( ) ) ;
586+
587+ let mut holder = SegmentHolder :: default ( ) ;
588+
589+ let _sid1 = holder. add_new ( segment1) ;
590+
591+ let holder = Arc :: new ( RwLock :: new ( holder) ) ;
592+
593+ let before_segment_ids = holder
594+ . read ( )
595+ . iter ( )
596+ . map ( |( id, _) | * id)
597+ . collect :: < HashSet < _ > > ( ) ;
598+
599+ let segments_dir = Builder :: new ( ) . prefix ( "segments_dir" ) . tempdir ( ) . unwrap ( ) ;
600+ let payload_schema_file = dir. path ( ) . join ( "payload.schema" ) ;
601+ let schema: Arc < SaveOnDisk < PayloadIndexSchema > > =
602+ Arc :: new ( SaveOnDisk :: load_or_init_default ( payload_schema_file) . unwrap ( ) ) ;
603+
604+ let ( inner_proxies, inner_tmp_segment, inner_segments_lock) =
605+ SegmentHolder :: proxy_all_segments (
606+ holder. upgradable_read ( ) ,
607+ segments_dir. path ( ) ,
608+ None ,
609+ schema. clone ( ) ,
610+ )
611+ . unwrap ( ) ;
612+
613+ // check inner proxy contains points
614+ let points = inner_proxies[ 0 ]
615+ . 1
616+ . get ( )
617+ . read ( )
618+ . read_range ( Some ( 1 . into ( ) ) , None ) ;
619+ assert_eq ! ( & points, & [ 1 . into( ) , 2 . into( ) , 3 . into( ) , 4 . into( ) , 5 . into( ) ] ) ;
620+
621+ // Writing to inner proxy segment
622+ inner_proxies[ 0 ]
623+ . 1
624+ . get ( )
625+ . write ( )
626+ . delete_point ( 10 , 1 . into ( ) , & hw_counter)
627+ . unwrap ( ) ;
628+
629+ let ( outer_proxies, outer_tmp_segment, outer_segments_lock) =
630+ SegmentHolder :: proxy_all_segments ( inner_segments_lock, segments_dir. path ( ) , None , schema)
631+ . unwrap ( ) ;
632+
633+ // Writing to outer proxy segment
634+ outer_proxies[ 0 ]
635+ . 1
636+ . get ( )
637+ . write ( )
638+ . upsert_point (
639+ 100 ,
640+ 1 . into ( ) ,
641+ only_default_vector ( & [ 0.0 , 0.0 , 0.0 , 0.0 ] ) ,
642+ & hw_counter,
643+ )
644+ . unwrap ( ) ;
645+
646+ // Unproxy once
647+ SegmentHolder :: unproxy_all_segments ( outer_segments_lock, outer_proxies, outer_tmp_segment)
648+ . unwrap ( ) ;
649+
650+ // Unproxy twice
651+ SegmentHolder :: unproxy_all_segments ( holder. upgradable_read ( ) , inner_proxies, inner_tmp_segment)
652+ . unwrap ( ) ;
653+
654+ let after_segment_ids = holder
655+ . read ( )
656+ . iter ( )
657+ . map ( |( id, _) | * id)
658+ . collect :: < HashSet < _ > > ( ) ;
659+
660+ // Check that we have one new segment
661+ let diff: HashSet < _ > = after_segment_ids. difference ( & before_segment_ids) . collect ( ) ;
662+ assert_eq ! (
663+ diff. len( ) ,
664+ 1 ,
665+ "There should be one new segment after unproxying"
666+ ) ;
667+ }
0 commit comments