2121import com .google .common .collect .ImmutableList ;
2222import com .google .common .collect .ImmutableMap ;
2323import java .util .ArrayList ;
24+ import java .util .Arrays ;
2425import java .util .List ;
2526import java .util .Map ;
2627import java .util .TreeMap ;
@@ -164,18 +165,19 @@ public void testRelocateCompletedSegments() {
164165 SegmentStateModel .OFFLINE );
165166 currentAssignment .put (offlineSegmentName , offlineSegmentInstanceStateMap );
166167
167- // Add an uploaded ONLINE segment to the consuming instances (i.e. no separation between consuming & completed)
168- List <String > uploadedSegmentNames = ImmutableList .of ("UploadedSegment1" , "UploadedSegment2" );
168+ // Add an UPLOADED segment to the CONSUMING instances (i.e. no separation between CONSUMING & COMPLETED)
169+ List <String > uploadedSegments = ImmutableList .of ("UploadedSegment1" , "UploadedSegment2" );
170+ int numUploadedSegments = uploadedSegments .size ();
169171 onlyConsumingInstancePartitionMap =
170172 ImmutableMap .of (InstancePartitionsType .CONSUMING , _instancePartitionsMap .get (InstancePartitionsType .CONSUMING ));
171- for (String uploadedSegName : uploadedSegmentNames ) {
173+ for (String segmentName : uploadedSegments ) {
172174 List <String > instancesAssigned =
173- _segmentAssignment .assignSegment (uploadedSegName , currentAssignment , onlyConsumingInstancePartitionMap );
174- currentAssignment .put (uploadedSegName ,
175+ _segmentAssignment .assignSegment (segmentName , currentAssignment , onlyConsumingInstancePartitionMap );
176+ currentAssignment .put (segmentName ,
175177 SegmentAssignmentUtils .getInstanceStateMap (instancesAssigned , SegmentStateModel .ONLINE ));
176178 }
177179 // Now there should be 103 segments assigned
178- assertEquals (currentAssignment .size (), NUM_SEGMENTS + uploadedSegmentNames . size () + 1 );
180+ assertEquals (currentAssignment .size (), NUM_SEGMENTS + numUploadedSegments + 1 );
179181 // Each segment should have 3 replicas and all assigned instances should be prefixed with consuming
180182 currentAssignment .forEach ((type , instanceStateMap ) -> {
181183 assertEquals (instanceStateMap .size (), NUM_REPLICAS );
@@ -197,25 +199,25 @@ public void testRelocateCompletedSegments() {
197199 Map <String , Map <String , String >> newAssignment =
198200 _segmentAssignment .rebalanceTable (currentAssignment , _instancePartitionsMap , null , null ,
199201 new BaseConfiguration ());
200- assertEquals (newAssignment .size (), NUM_SEGMENTS + uploadedSegmentNames . size () + 1 );
202+ assertEquals (newAssignment .size (), NUM_SEGMENTS + numUploadedSegments + 1 );
201203 for (int segmentId = 0 ; segmentId < NUM_SEGMENTS ; segmentId ++) {
202204 if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS ) {
203- // check COMPLETED (ONLINE) segments
205+ // Check the COMPLETED (ONLINE) segments
204206 newAssignment .get (_segments .get (segmentId )).forEach ((instance , state ) -> {
205207 assertTrue (instance .startsWith (COMPLETED_INSTANCE_NAME_PREFIX ));
206208 assertEquals (state , SegmentStateModel .ONLINE );
207209 });
208210 } else {
209- // check CONSUMING segments
211+ // Check the CONSUMING segments
210212 newAssignment .get (_segments .get (segmentId )).forEach ((instance , state ) -> {
211213 assertTrue (instance .startsWith (CONSUMING_INSTANCE_NAME_PREFIX ));
212214 assertEquals (state , SegmentStateModel .CONSUMING );
213215 });
214216 }
215217 }
216- // check the uploaded segments
217- for (String uploadedSegName : uploadedSegmentNames ) {
218- newAssignment .get (uploadedSegName ).forEach ((instance , state ) -> {
218+ // Check the UPLOADED (ONLINE) segments
219+ for (String segmentName : uploadedSegments ) {
220+ newAssignment .get (segmentName ).forEach ((instance , state ) -> {
219221 assertTrue (instance .startsWith (COMPLETED_INSTANCE_NAME_PREFIX ));
220222 assertEquals (state , SegmentStateModel .ONLINE );
221223 });
@@ -225,11 +227,16 @@ public void testRelocateCompletedSegments() {
225227 int [] numSegmentsAssignedPerInstance =
226228 SegmentAssignmentUtils .getNumSegmentsAssignedPerInstance (newAssignment , COMPLETED_INSTANCES );
227229 assertEquals (numSegmentsAssignedPerInstance .length , NUM_COMPLETED_INSTANCES );
228- int expectedMinNumSegmentsPerInstance =
229- (NUM_SEGMENTS - NUM_PARTITIONS ) * NUM_REPLICAS / NUM_COMPLETED_INSTANCES + 1 ;
230- for (int i = 0 ; i < NUM_COMPLETED_INSTANCES ; i ++) {
231- assertTrue (numSegmentsAssignedPerInstance [i ] >= expectedMinNumSegmentsPerInstance );
230+ int expectedTotalNumSegmentsAssigned = (NUM_SEGMENTS - NUM_PARTITIONS + numUploadedSegments ) * NUM_REPLICAS ;
231+ int expectedMinNumSegmentsPerInstance = expectedTotalNumSegmentsAssigned / NUM_COMPLETED_INSTANCES ;
232+ int totalNumSegmentsAssigned = 0 ;
233+ for (int numSegmentsAssigned : numSegmentsAssignedPerInstance ) {
234+ assertTrue (numSegmentsAssigned >= expectedMinNumSegmentsPerInstance ,
235+ String .format ("Expect at least: %d segments assigned per instance, got: %s" ,
236+ expectedMinNumSegmentsPerInstance , Arrays .toString (numSegmentsAssignedPerInstance )));
237+ totalNumSegmentsAssigned += numSegmentsAssigned ;
232238 }
239+ assertEquals (totalNumSegmentsAssigned , expectedTotalNumSegmentsAssigned );
233240
234241 // Rebalance with COMPLETED instance partitions including CONSUMING segments should give the same assignment
235242 BaseConfiguration rebalanceConfig = new BaseConfiguration ();
0 commit comments