1818
1919import com .ctrip .framework .apollo .biz .config .BizConfig ;
2020import com .google .common .collect .Lists ;
21+ import com .google .common .collect .Sets ;
2122import com .google .common .util .concurrent .SettableFuture ;
2223
2324import com .ctrip .framework .apollo .biz .AbstractUnitTest ;
2425import com .ctrip .framework .apollo .biz .entity .ReleaseMessage ;
2526import com .ctrip .framework .apollo .biz .repository .ReleaseMessageRepository ;
2627
28+ import java .util .ArrayList ;
29+ import org .awaitility .Awaitility ;
2730import org .junit .Before ;
2831import org .junit .Test ;
2932import org .mockito .Mock ;
3033import org .springframework .test .util .ReflectionTestUtils ;
3134
3235import java .util .concurrent .TimeUnit ;
3336
37+ import static org .awaitility .Awaitility .await ;
3438import static org .junit .Assert .assertEquals ;
39+ import static org .junit .Assert .assertSame ;
3540import static org .mockito .Mockito .when ;
3641
3742/**
@@ -54,6 +59,10 @@ public void setUp() throws Exception {
5459 databaseScanInterval = 100 ; //100 ms
5560 when (bizConfig .releaseMessageScanIntervalInMilli ()).thenReturn (databaseScanInterval );
5661 releaseMessageScanner .afterPropertiesSet ();
62+
63+ Awaitility .reset ();
64+ Awaitility .setDefaultTimeout (databaseScanInterval * 5 , TimeUnit .MILLISECONDS );
65+ Awaitility .setDefaultPollInterval (databaseScanInterval , TimeUnit .MILLISECONDS );
5766 }
5867
5968 @ Test
@@ -91,7 +100,86 @@ public void testScanMessageAndNotifyMessageListener() throws Exception {
91100
92101 assertEquals (anotherMessage , anotherListenerMessage .getMessage ());
93102 assertEquals (anotherId , anotherListenerMessage .getId ());
103+ }
104+
105+ @ Test
106+ public void testScanMessageWithGapAndNotifyMessageListener () throws Exception {
107+ String someMessage = "someMessage" ;
108+ long someId = 1 ;
109+ ReleaseMessage someReleaseMessage = assembleReleaseMessage (someId , someMessage );
110+
111+ String someMissingMessage = "someMissingMessage" ;
112+ long someMissingId = 2 ;
113+ ReleaseMessage someMissingReleaseMessage = assembleReleaseMessage (someMissingId , someMissingMessage );
114+
115+ String anotherMessage = "anotherMessage" ;
116+ long anotherId = 3 ;
117+ ReleaseMessage anotherReleaseMessage = assembleReleaseMessage (anotherId , anotherMessage );
118+
119+ String anotherMissingMessage = "anotherMissingMessage" ;
120+ long anotherMissingId = 4 ;
121+ ReleaseMessage anotherMissingReleaseMessage = assembleReleaseMessage (anotherMissingId , anotherMissingMessage );
122+
123+ long someRolledBackId = 5 ;
124+
125+ String yetAnotherMessage = "yetAnotherMessage" ;
126+ long yetAnotherId = 6 ;
127+ ReleaseMessage yetAnotherReleaseMessage = assembleReleaseMessage (yetAnotherId , yetAnotherMessage );
128+
129+ ArrayList <ReleaseMessage > receivedMessage = Lists .newArrayList ();
130+ SettableFuture <ReleaseMessage > someListenerFuture = SettableFuture .create ();
131+ ReleaseMessageListener someListener = (message , channel ) -> receivedMessage .add (message );
132+ releaseMessageScanner .addMessageListener (someListener );
133+
134+ when (releaseMessageRepository .findFirst500ByIdGreaterThanOrderByIdAsc (0L )).thenReturn (
135+ Lists .newArrayList (someReleaseMessage ));
136+
137+ await ().untilAsserted (() -> {
138+ assertEquals (1 , receivedMessage .size ());
139+ assertSame (someReleaseMessage , receivedMessage .get (0 ));
140+ });
141+
142+ when (releaseMessageRepository .findFirst500ByIdGreaterThanOrderByIdAsc (someId )).thenReturn (
143+ Lists .newArrayList (anotherReleaseMessage ));
94144
145+ await ().untilAsserted (() -> {
146+ assertEquals (2 , receivedMessage .size ());
147+ assertSame (someReleaseMessage , receivedMessage .get (0 ));
148+ assertSame (anotherReleaseMessage , receivedMessage .get (1 ));
149+ });
150+
151+ when (releaseMessageRepository .findAllById (Sets .newHashSet (someMissingId )))
152+ .thenReturn (Lists .newArrayList (someMissingReleaseMessage ));
153+
154+ await ().untilAsserted (() -> {
155+ assertEquals (3 , receivedMessage .size ());
156+ assertSame (someReleaseMessage , receivedMessage .get (0 ));
157+ assertSame (anotherReleaseMessage , receivedMessage .get (1 ));
158+ assertSame (someMissingReleaseMessage , receivedMessage .get (2 ));
159+ });
160+
161+ when (releaseMessageRepository .findFirst500ByIdGreaterThanOrderByIdAsc (anotherId )).thenReturn (
162+ Lists .newArrayList (yetAnotherReleaseMessage ));
163+
164+ await ().untilAsserted (() -> {
165+ assertEquals (4 , receivedMessage .size ());
166+ assertSame (someReleaseMessage , receivedMessage .get (0 ));
167+ assertSame (anotherReleaseMessage , receivedMessage .get (1 ));
168+ assertSame (someMissingReleaseMessage , receivedMessage .get (2 ));
169+ assertSame (yetAnotherReleaseMessage , receivedMessage .get (3 ));
170+ });
171+
172+ when (releaseMessageRepository .findAllById (Sets .newHashSet (anotherMissingId , someRolledBackId )))
173+ .thenReturn (Lists .newArrayList (anotherMissingReleaseMessage ));
174+
175+ await ().untilAsserted (() -> {
176+ assertEquals (5 , receivedMessage .size ());
177+ assertSame (someReleaseMessage , receivedMessage .get (0 ));
178+ assertSame (anotherReleaseMessage , receivedMessage .get (1 ));
179+ assertSame (someMissingReleaseMessage , receivedMessage .get (2 ));
180+ assertSame (yetAnotherReleaseMessage , receivedMessage .get (3 ));
181+ assertSame (anotherMissingReleaseMessage , receivedMessage .get (4 ));
182+ });
95183 }
96184
97185 private ReleaseMessage assembleReleaseMessage (long id , String message ) {
0 commit comments