1919package org .apache .pinot .integration .tests ;
2020
2121import cloud .localstack .Localstack ;
22+ import cloud .localstack .ServiceName ;
2223import cloud .localstack .docker .annotation .LocalstackDockerAnnotationProcessor ;
2324import cloud .localstack .docker .annotation .LocalstackDockerConfiguration ;
2425import cloud .localstack .docker .annotation .LocalstackDockerProperties ;
8384import software .amazon .awssdk .utils .AttributeMap ;
8485
8586
86- @ LocalstackDockerProperties (services = {"kinesis" })
87- @ Test (enabled = false )
87+ @ LocalstackDockerProperties (services = {ServiceName .KINESIS }, imageTag = "0.12.15" )
8888public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSet {
8989 private static final Logger LOGGER = LoggerFactory .getLogger (RealtimeKinesisIntegrationTest .class );
9090
@@ -189,31 +189,30 @@ public Map<String, String> createKinesisStreamConfig() {
189189 String streamType = "kinesis" ;
190190 streamConfigMap .put (StreamConfigProperties .STREAM_TYPE , streamType );
191191
192- streamConfigMap
193- . put ( StreamConfigProperties .constructStreamProperty (STREAM_TYPE , StreamConfigProperties .STREAM_TOPIC_NAME ),
194- STREAM_NAME );
192+ streamConfigMap . put (
193+ StreamConfigProperties .constructStreamProperty (STREAM_TYPE , StreamConfigProperties .STREAM_TOPIC_NAME ),
194+ STREAM_NAME );
195195
196196 streamConfigMap .put (
197197 StreamConfigProperties .constructStreamProperty (STREAM_TYPE , StreamConfigProperties .STREAM_FETCH_TIMEOUT_MILLIS ),
198198 "30000" );
199- streamConfigMap
200- .put (StreamConfigProperties .constructStreamProperty (STREAM_TYPE , StreamConfigProperties .STREAM_CONSUMER_TYPES ),
201- StreamConfig .ConsumerType .LOWLEVEL .toString ());
202- streamConfigMap .put (StreamConfigProperties
203- .constructStreamProperty (STREAM_TYPE , StreamConfigProperties .STREAM_CONSUMER_FACTORY_CLASS ),
204- KinesisConsumerFactory .class .getName ());
205- streamConfigMap
206- .put (StreamConfigProperties .constructStreamProperty (STREAM_TYPE , StreamConfigProperties .STREAM_DECODER_CLASS ),
207- "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder" );
199+ streamConfigMap .put (
200+ StreamConfigProperties .constructStreamProperty (STREAM_TYPE , StreamConfigProperties .STREAM_CONSUMER_TYPES ),
201+ StreamConfig .ConsumerType .LOWLEVEL .toString ());
202+ streamConfigMap .put (StreamConfigProperties .constructStreamProperty (STREAM_TYPE ,
203+ StreamConfigProperties .STREAM_CONSUMER_FACTORY_CLASS ), KinesisConsumerFactory .class .getName ());
204+ streamConfigMap .put (
205+ StreamConfigProperties .constructStreamProperty (STREAM_TYPE , StreamConfigProperties .STREAM_DECODER_CLASS ),
206+ "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder" );
208207 streamConfigMap .put (KinesisConfig .REGION , REGION );
209208 streamConfigMap .put (KinesisConfig .MAX_RECORDS_TO_FETCH , String .valueOf (MAX_RECORDS_TO_FETCH ));
210209 streamConfigMap .put (KinesisConfig .SHARD_ITERATOR_TYPE , ShardIteratorType .AFTER_SEQUENCE_NUMBER .toString ());
211210 streamConfigMap .put (KinesisConfig .ENDPOINT , LOCALSTACK_KINESIS_ENDPOINT );
212211 streamConfigMap .put (KinesisConfig .ACCESS_KEY , getLocalAWSCredentials ().resolveCredentials ().accessKeyId ());
213212 streamConfigMap .put (KinesisConfig .SECRET_KEY , getLocalAWSCredentials ().resolveCredentials ().secretAccessKey ());
214213 streamConfigMap .put (StreamConfigProperties .SEGMENT_FLUSH_THRESHOLD_ROWS , Integer .toString (200 ));
215- streamConfigMap .put (StreamConfigProperties
216- . constructStreamProperty ( streamType , StreamConfigProperties .STREAM_CONSUMER_OFFSET_CRITERIA ), "smallest" );
214+ streamConfigMap .put (StreamConfigProperties . constructStreamProperty ( streamType ,
215+ StreamConfigProperties .STREAM_CONSUMER_OFFSET_CRITERIA ), "smallest" );
217216 return streamConfigMap ;
218217 }
219218
@@ -225,8 +224,8 @@ public void startKinesis()
225224 _localstackDocker .startup (dockerConfig );
226225
227226 _kinesisClient = KinesisClient .builder ().httpClient (new ApacheSdkHttpService ().createHttpClientBuilder ()
228- .buildWithDefaults (
229- AttributeMap .builder ().put (SdkHttpConfigurationOption .TRUST_ALL_CERTIFICATES , Boolean .TRUE ).build ()))
227+ .buildWithDefaults (
228+ AttributeMap .builder ().put (SdkHttpConfigurationOption .TRUST_ALL_CERTIFICATES , Boolean .TRUE ).build ()))
230229 .credentialsProvider (getLocalAWSCredentials ()).region (Region .of (REGION ))
231230 .endpointOverride (new URI (LOCALSTACK_KINESIS_ENDPOINT )).build ();
232231
@@ -278,8 +277,8 @@ private void publishRecordsToKinesis() {
278277 .partitionKey (data .get ("Origin" ).textValue ()).build ();
279278 PutRecordResponse putRecordResponse = _kinesisClient .putRecord (putRecordRequest );
280279 if (putRecordResponse .sdkHttpResponse ().statusCode () == 200 ) {
281- if (StringUtils .isNotBlank (putRecordResponse .sequenceNumber ()) && StringUtils
282- . isNotBlank ( putRecordResponse .shardId ())) {
280+ if (StringUtils .isNotBlank (putRecordResponse .sequenceNumber ()) && StringUtils . isNotBlank (
281+ putRecordResponse .shardId ())) {
283282 _totalRecordsPushedInStream ++;
284283
285284 int fieldIndex = 1 ;
@@ -321,9 +320,8 @@ public void testRecords()
321320 throws Exception {
322321 Assert .assertNotEquals (_totalRecordsPushedInStream , 0 );
323322
324- ResultSet pinotResultSet = getPinotConnection ()
325- .execute (new Request ("sql" , "SELECT * FROM " + getTableName () + " ORDER BY Origin LIMIT 10000" ))
326- .getResultSet (0 );
323+ ResultSet pinotResultSet = getPinotConnection ().execute (
324+ new Request ("sql" , "SELECT * FROM " + getTableName () + " ORDER BY Origin LIMIT 10000" )).getResultSet (0 );
327325
328326 Assert .assertNotEquals (pinotResultSet .getRowCount (), 0 );
329327
@@ -440,8 +438,8 @@ public void createH2ConnectionAndTable()
440438 }
441439 }
442440
443- _h2Connection .prepareCall ("CREATE TABLE " + getTableName () + "(" + StringUtil
444- . join ( "," , _h2FieldNameAndTypes .toArray (new String [_h2FieldNameAndTypes .size ()])) + ")" ).execute ();
441+ _h2Connection .prepareCall ("CREATE TABLE " + getTableName () + "(" + StringUtil . join ( "," ,
442+ _h2FieldNameAndTypes .toArray (new String [_h2FieldNameAndTypes .size ()])) + ")" ).execute ();
445443 }
446444
447445 @ AfterClass (enabled = false )
0 commit comments