Skip to content

Commit 0420414

Browse files
authored
bump awssdk version for a bugfix on http conn leakage (#10898)
* bump awssdk version for a bugfix on http conn leakage * use latest s3mock and fix S3PinotFSTest
1 parent 2070369 commit 0420414

File tree

7 files changed

+82
-80
lines changed

7 files changed

+82
-80
lines changed

pinot-integration-test-base/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
<pinot.root>${basedir}/..</pinot.root>
3838
<localstack-utils.version>0.2.11</localstack-utils.version>
3939
<awaitility.version>3.0.0</awaitility.version>
40-
<aws.sdk.version>2.14.28</aws.sdk.version>
40+
<aws.sdk.version>2.20.83</aws.sdk.version>
4141
</properties>
4242

4343
<build>

pinot-integration-tests/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
<pinot.root>${basedir}/..</pinot.root>
3737
<localstack-utils.version>0.2.19</localstack-utils.version>
3838
<awaitility.version>3.0.0</awaitility.version>
39-
<aws.sdk.version>2.14.28</aws.sdk.version>
39+
<aws.sdk.version>2.20.83</aws.sdk.version>
4040
<testcontainers.version>1.17.3</testcontainers.version>
4141
</properties>
4242

pinot-plugins/pinot-file-system/pinot-s3/pom.xml

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@
3535
<url>https://pinot.apache.org</url>
3636
<properties>
3737
<pinot.root>${basedir}/../../..</pinot.root>
38-
<aws.sdk.version>2.14.28</aws.sdk.version>
38+
<aws.sdk.version>2.20.83</aws.sdk.version>
3939
<http.client.version>4.5.13</http.client.version>
4040
<http.core.version>4.4.13</http.core.version>
41-
<s3mock.version>2.1.19</s3mock.version>
41+
<s3mock.version>2.12.2</s3mock.version>
4242
<javax.version>3.1.0</javax.version>
4343
<phase.prop>package</phase.prop>
4444
</properties>
@@ -183,27 +183,9 @@
183183
</dependency>
184184
<dependency>
185185
<groupId>com.adobe.testing</groupId>
186-
<artifactId>s3mock-testng</artifactId>
186+
<artifactId>s3mock-testcontainers</artifactId>
187187
<version>${s3mock.version}</version>
188188
<scope>test</scope>
189-
<exclusions>
190-
<exclusion>
191-
<groupId>ch.qos.logback</groupId>
192-
<artifactId>logback-core</artifactId>
193-
</exclusion>
194-
<exclusion>
195-
<groupId>org.apache.logging.log4j</groupId>
196-
<artifactId>log4j-to-slf4j</artifactId>
197-
</exclusion>
198-
<exclusion>
199-
<groupId>javax.servlet</groupId>
200-
<artifactId>javax.servlet-api</artifactId>
201-
</exclusion>
202-
<exclusion>
203-
<groupId>org.apache.httpcomponents</groupId>
204-
<artifactId>httpclient</artifactId>
205-
</exclusion>
206-
</exclusions>
207189
</dependency>
208190
<dependency>
209191
<groupId>javax.servlet</groupId>

pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java

Lines changed: 75 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
*/
1919
package org.apache.pinot.plugin.filesystem;
2020

21-
import com.adobe.testing.s3mock.testng.S3Mock;
21+
import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
22+
import java.io.BufferedOutputStream;
2223
import java.io.File;
24+
import java.io.FileOutputStream;
2325
import java.io.IOException;
2426
import java.io.InputStream;
2527
import java.net.URI;
@@ -28,37 +30,45 @@
2830
import java.util.Arrays;
2931
import java.util.List;
3032
import java.util.stream.Collectors;
33+
import org.apache.commons.io.FileUtils;
3134
import org.apache.commons.io.IOUtils;
3235
import org.apache.pinot.spi.filesystem.FileMetadata;
3336
import org.testng.Assert;
3437
import org.testng.annotations.AfterClass;
3538
import org.testng.annotations.BeforeClass;
36-
import org.testng.annotations.Listeners;
3739
import org.testng.annotations.Test;
40+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
41+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
3842
import software.amazon.awssdk.core.sync.RequestBody;
43+
import software.amazon.awssdk.regions.Region;
3944
import software.amazon.awssdk.services.s3.S3Client;
45+
import software.amazon.awssdk.services.s3.S3Configuration;
4046
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
4147
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
4248
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
4349
import software.amazon.awssdk.services.s3.model.S3Object;
4450

4551

4652
@Test
47-
@Listeners(com.adobe.testing.s3mock.testng.S3MockListener.class)
4853
public class S3PinotFSTest {
54+
private static final String S3MOCK_VERSION = System.getProperty("s3mock.version", "2.12.2");
55+
private static final File TEMP_FILE = new File(FileUtils.getTempDirectory(), "S3PinotFSTest");
4956
private static final String DELIMITER = "/";
5057
private static final String BUCKET = "test-bucket";
5158
private static final String SCHEME = "s3";
5259
private static final String FILE_FORMAT = "%s://%s/%s";
5360
private static final String DIR_FORMAT = "%s://%s";
5461

62+
private S3MockContainer _s3MockContainer;
5563
private S3PinotFS _s3PinotFS;
5664
private S3Client _s3Client;
5765

5866
@BeforeClass
5967
public void setUp() {
60-
S3Mock s3Mock = S3Mock.getInstance();
61-
_s3Client = s3Mock.createS3ClientV2();
68+
_s3MockContainer = new S3MockContainer(S3MOCK_VERSION);
69+
_s3MockContainer.start();
70+
String endpoint = _s3MockContainer.getHttpEndpoint();
71+
_s3Client = createS3ClientV2(endpoint);
6272
_s3PinotFS = new S3PinotFS();
6373
_s3PinotFS.init(_s3Client);
6474
_s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
@@ -69,12 +79,14 @@ public void tearDown()
6979
throws IOException {
7080
_s3PinotFS.close();
7181
_s3Client.close();
82+
_s3MockContainer.stop();
83+
FileUtils.deleteQuietly(TEMP_FILE);
7284
}
7385

7486
private void createEmptyFile(String folderName, String fileName) {
75-
String fileNameWithFolder = folderName + DELIMITER + fileName;
76-
_s3Client
77-
.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder), RequestBody.fromBytes(new byte[0]));
87+
String fileNameWithFolder = folderName.length() == 0 ? fileName : folderName + DELIMITER + fileName;
88+
_s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder),
89+
RequestBody.fromBytes(new byte[0]));
7890
}
7991

8092
@Test
@@ -249,7 +261,7 @@ public void testDeleteFile()
249261
ListObjectsV2Response listObjectsV2Response =
250262
_s3Client.listObjectsV2(S3TestUtils.getListObjectRequest(BUCKET, "", true));
251263
String[] actualResponse =
252-
listObjectsV2Response.contents().stream().map(x -> x.key().substring(1)).filter(x -> x.contains("delete"))
264+
listObjectsV2Response.contents().stream().map(S3Object::key).filter(x -> x.contains("delete"))
253265
.toArray(String[]::new);
254266

255267
Assert.assertEquals(actualResponse.length, 2);
@@ -290,8 +302,8 @@ public void testIsDirectory()
290302

291303
boolean isBucketDir = _s3PinotFS.isDirectory(URI.create(String.format(DIR_FORMAT, SCHEME, BUCKET)));
292304
boolean isDir = _s3PinotFS.isDirectory(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)));
293-
boolean isDirChild = _s3PinotFS
294-
.isDirectory(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + childFolder)));
305+
boolean isDirChild = _s3PinotFS.isDirectory(
306+
URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + childFolder)));
295307
boolean notIsDir = _s3PinotFS.isDirectory(URI.create(
296308
String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + childFolder + DELIMITER + "a-delete.txt")));
297309

@@ -333,47 +345,46 @@ public void testExists()
333345
public void testCopyFromAndToLocal()
334346
throws Exception {
335347
String fileName = "copyFile.txt";
336-
337-
File fileToCopy = new File(getClass().getClassLoader().getResource(fileName).getFile());
338-
339-
_s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
340-
341-
HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
342-
343-
Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());
344-
345-
File fileToDownload = new File("copyFile_download.txt").getAbsoluteFile();
346-
_s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
347-
Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
348-
349-
fileToDownload.deleteOnExit();
348+
File fileToCopy = new File(TEMP_FILE, fileName);
349+
File fileToDownload = new File(TEMP_FILE, "copyFile_download.txt").getAbsoluteFile();
350+
try {
351+
createDummyFile(fileToCopy, 1024);
352+
_s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
353+
HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
354+
Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());
355+
_s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
356+
Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
357+
} finally {
358+
FileUtils.deleteQuietly(fileToCopy);
359+
FileUtils.deleteQuietly(fileToDownload);
360+
}
350361
}
351362

352363
@Test
353364
public void testMultiPartUpload()
354365
throws Exception {
355-
String fileName = "copyFile.txt";
356-
357-
File fileToCopy = new File(getClass().getClassLoader().getResource(fileName).getFile());
358-
359-
// input file size is 20
360-
_s3PinotFS.setMultiPartUploadConfigs(1, 3);
366+
String fileName = "copyFile_for_multipart.txt";
367+
File fileToCopy = new File(TEMP_FILE, fileName);
368+
File fileToDownload = new File(TEMP_FILE, "copyFile_download_multipart.txt").getAbsoluteFile();
361369
try {
362-
_s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
370+
// Make a file of 11MB to upload in parts, whose min required size is 5MB.
371+
createDummyFile(fileToCopy, 11 * 1024 * 1024);
372+
System.out.println("fileToCopy.length:" + fileToCopy.length());
373+
_s3PinotFS.setMultiPartUploadConfigs(1, 5 * 1024 * 1024);
374+
try {
375+
_s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
376+
} finally {
377+
// disable multipart upload again for the other UT cases.
378+
_s3PinotFS.setMultiPartUploadConfigs(-1, 128 * 1024 * 1024);
379+
}
380+
HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
381+
Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());
382+
_s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
383+
Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
363384
} finally {
364-
// disable multipart upload again for the other UT cases.
365-
_s3PinotFS.setMultiPartUploadConfigs(-1, 128 * 1024 * 1024);
385+
FileUtils.deleteQuietly(fileToCopy);
386+
FileUtils.deleteQuietly(fileToDownload);
366387
}
367-
368-
HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
369-
370-
Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());
371-
372-
File fileToDownload = new File("copyFile_download_multipart.txt").getAbsoluteFile();
373-
_s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
374-
Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
375-
376-
fileToDownload.deleteOnExit();
377388
}
378389

379390
@Test
@@ -396,7 +407,26 @@ public void testMkdir()
396407

397408
_s3PinotFS.mkdir(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folderName)));
398409

399-
HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, folderName));
410+
HeadObjectResponse headObjectResponse =
411+
_s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, folderName + "/"));
400412
Assert.assertTrue(headObjectResponse.sdkHttpResponse().isSuccessful());
401413
}
414+
415+
private static void createDummyFile(File file, int size)
416+
throws IOException {
417+
FileUtils.deleteQuietly(file);
418+
FileUtils.touch(file);
419+
try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(file))) {
420+
for (int i = 0; i < size; i++) {
421+
out.write((byte) i);
422+
}
423+
}
424+
}
425+
426+
private static S3Client createS3ClientV2(String endpoint) {
427+
return S3Client.builder().region(Region.of("us-east-1"))
428+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("foo", "bar")))
429+
.serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
430+
.endpointOverride(URI.create(endpoint)).build();
431+
}
402432
}

pinot-plugins/pinot-file-system/pinot-s3/src/test/resources/copyFile.txt

Lines changed: 0 additions & 10 deletions
This file was deleted.

pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
<properties>
3737
<pinot.root>${basedir}/../../..</pinot.root>
3838
<phase.prop>package</phase.prop>
39-
<aws.version>2.14.28</aws.version>
39+
<aws.version>2.20.83</aws.version>
4040
<easymock.version>4.2</easymock.version>
4141
<reactive.version>1.0.2</reactive.version>
4242
<localstack-utils.version>0.2.19</localstack-utils.version>

pinot-tools/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
<url>https://pinot.apache.org/</url>
3434
<properties>
3535
<pinot.root>${basedir}/..</pinot.root>
36-
<aws.version>2.14.28</aws.version>
36+
<aws.version>2.20.83</aws.version>
3737
<scala.version>2.12</scala.version>
3838
<spark.version>3.2.1</spark.version>
3939
<airlift.version>0.16</airlift.version>

0 commit comments

Comments
 (0)