Skip to content

Commit 4bae48b

Browse files
authored
Re-enable kinesis test (#7960)
* Enable kinesis test * Upgrade localstack version and change kinesis localstack port * Adding hostname provider and old localstack docker version * Fix linting: reduce line length * Rollback to 0.12.15 localstack version * remove unused import * Revert to default port
1 parent c0b3e9c commit 4bae48b

File tree

2 files changed

+24
-26
lines changed

2 files changed

+24
-26
lines changed

pinot-integration-tests/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
<properties>
3636
<pinot.root>${basedir}/..</pinot.root>
37-
<localstack-utils.version>0.2.15</localstack-utils.version>
37+
<localstack-utils.version>0.2.19</localstack-utils.version>
3838
<awaitility.version>3.0.0</awaitility.version>
3939
<aws.sdk.version>2.14.28</aws.sdk.version>
4040
</properties>

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pinot.integration.tests;
2020

2121
import cloud.localstack.Localstack;
22+
import cloud.localstack.ServiceName;
2223
import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor;
2324
import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
2425
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
@@ -83,8 +84,7 @@
8384
import software.amazon.awssdk.utils.AttributeMap;
8485

8586

86-
@LocalstackDockerProperties(services = {"kinesis"})
87-
@Test(enabled = false)
87+
@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag = "0.12.15")
8888
public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSet {
8989
private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeKinesisIntegrationTest.class);
9090

@@ -189,31 +189,30 @@ public Map<String, String> createKinesisStreamConfig() {
189189
String streamType = "kinesis";
190190
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
191191

192-
streamConfigMap
193-
.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME),
194-
STREAM_NAME);
192+
streamConfigMap.put(
193+
StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME),
194+
STREAM_NAME);
195195

196196
streamConfigMap.put(
197197
StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
198198
"30000");
199-
streamConfigMap
200-
.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_TYPES),
201-
StreamConfig.ConsumerType.LOWLEVEL.toString());
202-
streamConfigMap.put(StreamConfigProperties
203-
.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
204-
KinesisConsumerFactory.class.getName());
205-
streamConfigMap
206-
.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS),
207-
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
199+
streamConfigMap.put(
200+
StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_TYPES),
201+
StreamConfig.ConsumerType.LOWLEVEL.toString());
202+
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
203+
StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), KinesisConsumerFactory.class.getName());
204+
streamConfigMap.put(
205+
StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS),
206+
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
208207
streamConfigMap.put(KinesisConfig.REGION, REGION);
209208
streamConfigMap.put(KinesisConfig.MAX_RECORDS_TO_FETCH, String.valueOf(MAX_RECORDS_TO_FETCH));
210209
streamConfigMap.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString());
211210
streamConfigMap.put(KinesisConfig.ENDPOINT, LOCALSTACK_KINESIS_ENDPOINT);
212211
streamConfigMap.put(KinesisConfig.ACCESS_KEY, getLocalAWSCredentials().resolveCredentials().accessKeyId());
213212
streamConfigMap.put(KinesisConfig.SECRET_KEY, getLocalAWSCredentials().resolveCredentials().secretAccessKey());
214213
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, Integer.toString(200));
215-
streamConfigMap.put(StreamConfigProperties
216-
.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
214+
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
215+
StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
217216
return streamConfigMap;
218217
}
219218

@@ -225,8 +224,8 @@ public void startKinesis()
225224
_localstackDocker.startup(dockerConfig);
226225

227226
_kinesisClient = KinesisClient.builder().httpClient(new ApacheSdkHttpService().createHttpClientBuilder()
228-
.buildWithDefaults(
229-
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build()))
227+
.buildWithDefaults(
228+
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build()))
230229
.credentialsProvider(getLocalAWSCredentials()).region(Region.of(REGION))
231230
.endpointOverride(new URI(LOCALSTACK_KINESIS_ENDPOINT)).build();
232231

@@ -278,8 +277,8 @@ private void publishRecordsToKinesis() {
278277
.partitionKey(data.get("Origin").textValue()).build();
279278
PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
280279
if (putRecordResponse.sdkHttpResponse().statusCode() == 200) {
281-
if (StringUtils.isNotBlank(putRecordResponse.sequenceNumber()) && StringUtils
282-
.isNotBlank(putRecordResponse.shardId())) {
280+
if (StringUtils.isNotBlank(putRecordResponse.sequenceNumber()) && StringUtils.isNotBlank(
281+
putRecordResponse.shardId())) {
283282
_totalRecordsPushedInStream++;
284283

285284
int fieldIndex = 1;
@@ -321,9 +320,8 @@ public void testRecords()
321320
throws Exception {
322321
Assert.assertNotEquals(_totalRecordsPushedInStream, 0);
323322

324-
ResultSet pinotResultSet = getPinotConnection()
325-
.execute(new Request("sql", "SELECT * FROM " + getTableName() + " ORDER BY Origin LIMIT 10000"))
326-
.getResultSet(0);
323+
ResultSet pinotResultSet = getPinotConnection().execute(
324+
new Request("sql", "SELECT * FROM " + getTableName() + " ORDER BY Origin LIMIT 10000")).getResultSet(0);
327325

328326
Assert.assertNotEquals(pinotResultSet.getRowCount(), 0);
329327

@@ -440,8 +438,8 @@ public void createH2ConnectionAndTable()
440438
}
441439
}
442440

443-
_h2Connection.prepareCall("CREATE TABLE " + getTableName() + "(" + StringUtil
444-
.join(",", _h2FieldNameAndTypes.toArray(new String[_h2FieldNameAndTypes.size()])) + ")").execute();
441+
_h2Connection.prepareCall("CREATE TABLE " + getTableName() + "(" + StringUtil.join(",",
442+
_h2FieldNameAndTypes.toArray(new String[_h2FieldNameAndTypes.size()])) + ")").execute();
445443
}
446444

447445
@AfterClass(enabled = false)

0 commit comments

Comments
 (0)