1818 */
1919package org .apache .pinot .plugin .filesystem ;
2020
21- import com .adobe .testing .s3mock .testng .S3Mock ;
21+ import com .adobe .testing .s3mock .testcontainers .S3MockContainer ;
22+ import java .io .BufferedOutputStream ;
2223import java .io .File ;
24+ import java .io .FileOutputStream ;
2325import java .io .IOException ;
2426import java .io .InputStream ;
2527import java .net .URI ;
2830import java .util .Arrays ;
2931import java .util .List ;
3032import java .util .stream .Collectors ;
33+ import org .apache .commons .io .FileUtils ;
3134import org .apache .commons .io .IOUtils ;
3235import org .apache .pinot .spi .filesystem .FileMetadata ;
3336import org .testng .Assert ;
3437import org .testng .annotations .AfterClass ;
3538import org .testng .annotations .BeforeClass ;
36- import org .testng .annotations .Listeners ;
3739import org .testng .annotations .Test ;
40+ import software .amazon .awssdk .auth .credentials .AwsBasicCredentials ;
41+ import software .amazon .awssdk .auth .credentials .StaticCredentialsProvider ;
3842import software .amazon .awssdk .core .sync .RequestBody ;
43+ import software .amazon .awssdk .regions .Region ;
3944import software .amazon .awssdk .services .s3 .S3Client ;
45+ import software .amazon .awssdk .services .s3 .S3Configuration ;
4046import software .amazon .awssdk .services .s3 .model .CreateBucketRequest ;
4147import software .amazon .awssdk .services .s3 .model .HeadObjectResponse ;
4248import software .amazon .awssdk .services .s3 .model .ListObjectsV2Response ;
4349import software .amazon .awssdk .services .s3 .model .S3Object ;
4450
4551
4652@ Test
47- @ Listeners (com .adobe .testing .s3mock .testng .S3MockListener .class )
4853public class S3PinotFSTest {
54+ private static final String S3MOCK_VERSION = System .getProperty ("s3mock.version" , "2.12.2" );
55+ private static final File TEMP_FILE = new File (FileUtils .getTempDirectory (), "S3PinotFSTest" );
4956 private static final String DELIMITER = "/" ;
5057 private static final String BUCKET = "test-bucket" ;
5158 private static final String SCHEME = "s3" ;
5259 private static final String FILE_FORMAT = "%s://%s/%s" ;
5360 private static final String DIR_FORMAT = "%s://%s" ;
5461
62+ private S3MockContainer _s3MockContainer ;
5563 private S3PinotFS _s3PinotFS ;
5664 private S3Client _s3Client ;
5765
5866 @ BeforeClass
5967 public void setUp () {
60- S3Mock s3Mock = S3Mock .getInstance ();
61- _s3Client = s3Mock .createS3ClientV2 ();
68+ _s3MockContainer = new S3MockContainer (S3MOCK_VERSION );
69+ _s3MockContainer .start ();
70+ String endpoint = _s3MockContainer .getHttpEndpoint ();
71+ _s3Client = createS3ClientV2 (endpoint );
6272 _s3PinotFS = new S3PinotFS ();
6373 _s3PinotFS .init (_s3Client );
6474 _s3Client .createBucket (CreateBucketRequest .builder ().bucket (BUCKET ).build ());
@@ -69,12 +79,14 @@ public void tearDown()
6979 throws IOException {
7080 _s3PinotFS .close ();
7181 _s3Client .close ();
82+ _s3MockContainer .stop ();
83+ FileUtils .deleteQuietly (TEMP_FILE );
7284 }
7385
7486 private void createEmptyFile (String folderName , String fileName ) {
75- String fileNameWithFolder = folderName + DELIMITER + fileName ;
76- _s3Client
77- . putObject ( S3TestUtils . getPutObjectRequest ( BUCKET , fileNameWithFolder ), RequestBody .fromBytes (new byte [0 ]));
87+ String fileNameWithFolder = folderName . length () == 0 ? fileName : folderName + DELIMITER + fileName ;
88+ _s3Client . putObject ( S3TestUtils . getPutObjectRequest ( BUCKET , fileNameWithFolder ),
89+ RequestBody .fromBytes (new byte [0 ]));
7890 }
7991
8092 @ Test
@@ -249,7 +261,7 @@ public void testDeleteFile()
249261 ListObjectsV2Response listObjectsV2Response =
250262 _s3Client .listObjectsV2 (S3TestUtils .getListObjectRequest (BUCKET , "" , true ));
251263 String [] actualResponse =
252- listObjectsV2Response .contents ().stream ().map (x -> x . key (). substring ( 1 ) ).filter (x -> x .contains ("delete" ))
264+ listObjectsV2Response .contents ().stream ().map (S3Object :: key ).filter (x -> x .contains ("delete" ))
253265 .toArray (String []::new );
254266
255267 Assert .assertEquals (actualResponse .length , 2 );
@@ -290,8 +302,8 @@ public void testIsDirectory()
290302
291303 boolean isBucketDir = _s3PinotFS .isDirectory (URI .create (String .format (DIR_FORMAT , SCHEME , BUCKET )));
292304 boolean isDir = _s3PinotFS .isDirectory (URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , folder )));
293- boolean isDirChild = _s3PinotFS
294- . isDirectory ( URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , folder + DELIMITER + childFolder )));
305+ boolean isDirChild = _s3PinotFS . isDirectory (
306+ URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , folder + DELIMITER + childFolder )));
295307 boolean notIsDir = _s3PinotFS .isDirectory (URI .create (
296308 String .format (FILE_FORMAT , SCHEME , BUCKET , folder + DELIMITER + childFolder + DELIMITER + "a-delete.txt" )));
297309
@@ -333,47 +345,46 @@ public void testExists()
333345 public void testCopyFromAndToLocal ()
334346 throws Exception {
335347 String fileName = "copyFile.txt" ;
336-
337- File fileToCopy = new File (getClass ().getClassLoader ().getResource (fileName ).getFile ());
338-
339- _s3PinotFS .copyFromLocalFile (fileToCopy , URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , fileName )));
340-
341- HeadObjectResponse headObjectResponse = _s3Client .headObject (S3TestUtils .getHeadObjectRequest (BUCKET , fileName ));
342-
343- Assert .assertEquals (headObjectResponse .contentLength (), (Long ) fileToCopy .length ());
344-
345- File fileToDownload = new File ("copyFile_download.txt" ).getAbsoluteFile ();
346- _s3PinotFS .copyToLocalFile (URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , fileName )), fileToDownload );
347- Assert .assertEquals (fileToCopy .length (), fileToDownload .length ());
348-
349- fileToDownload .deleteOnExit ();
348+ File fileToCopy = new File (TEMP_FILE , fileName );
349+ File fileToDownload = new File (TEMP_FILE , "copyFile_download.txt" ).getAbsoluteFile ();
350+ try {
351+ createDummyFile (fileToCopy , 1024 );
352+ _s3PinotFS .copyFromLocalFile (fileToCopy , URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , fileName )));
353+ HeadObjectResponse headObjectResponse = _s3Client .headObject (S3TestUtils .getHeadObjectRequest (BUCKET , fileName ));
354+ Assert .assertEquals (headObjectResponse .contentLength (), (Long ) fileToCopy .length ());
355+ _s3PinotFS .copyToLocalFile (URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , fileName )), fileToDownload );
356+ Assert .assertEquals (fileToCopy .length (), fileToDownload .length ());
357+ } finally {
358+ FileUtils .deleteQuietly (fileToCopy );
359+ FileUtils .deleteQuietly (fileToDownload );
360+ }
350361 }
351362
352363 @ Test
353364 public void testMultiPartUpload ()
354365 throws Exception {
355- String fileName = "copyFile.txt" ;
356-
357- File fileToCopy = new File (getClass ().getClassLoader ().getResource (fileName ).getFile ());
358-
359- // input file size is 20
360- _s3PinotFS .setMultiPartUploadConfigs (1 , 3 );
366+ String fileName = "copyFile_for_multipart.txt" ;
367+ File fileToCopy = new File (TEMP_FILE , fileName );
368+ File fileToDownload = new File (TEMP_FILE , "copyFile_download_multipart.txt" ).getAbsoluteFile ();
361369 try {
362- _s3PinotFS .copyFromLocalFile (fileToCopy , URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , fileName )));
370+ // Make a file of 11MB to upload in parts, whose min required size is 5MB.
371+ createDummyFile (fileToCopy , 11 * 1024 * 1024 );
372+ System .out .println ("fileToCopy.length:" + fileToCopy .length ());
373+ _s3PinotFS .setMultiPartUploadConfigs (1 , 5 * 1024 * 1024 );
374+ try {
375+ _s3PinotFS .copyFromLocalFile (fileToCopy , URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , fileName )));
376+ } finally {
377+ // disable multipart upload again for the other UT cases.
378+ _s3PinotFS .setMultiPartUploadConfigs (-1 , 128 * 1024 * 1024 );
379+ }
380+ HeadObjectResponse headObjectResponse = _s3Client .headObject (S3TestUtils .getHeadObjectRequest (BUCKET , fileName ));
381+ Assert .assertEquals (headObjectResponse .contentLength (), (Long ) fileToCopy .length ());
382+ _s3PinotFS .copyToLocalFile (URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , fileName )), fileToDownload );
383+ Assert .assertEquals (fileToCopy .length (), fileToDownload .length ());
363384 } finally {
364- // disable multipart upload again for the other UT cases.
365- _s3PinotFS . setMultiPartUploadConfigs (- 1 , 128 * 1024 * 1024 );
385+ FileUtils . deleteQuietly ( fileToCopy );
386+ FileUtils . deleteQuietly ( fileToDownload );
366387 }
367-
368- HeadObjectResponse headObjectResponse = _s3Client .headObject (S3TestUtils .getHeadObjectRequest (BUCKET , fileName ));
369-
370- Assert .assertEquals (headObjectResponse .contentLength (), (Long ) fileToCopy .length ());
371-
372- File fileToDownload = new File ("copyFile_download_multipart.txt" ).getAbsoluteFile ();
373- _s3PinotFS .copyToLocalFile (URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , fileName )), fileToDownload );
374- Assert .assertEquals (fileToCopy .length (), fileToDownload .length ());
375-
376- fileToDownload .deleteOnExit ();
377388 }
378389
379390 @ Test
@@ -396,7 +407,26 @@ public void testMkdir()
396407
397408 _s3PinotFS .mkdir (URI .create (String .format (FILE_FORMAT , SCHEME , BUCKET , folderName )));
398409
399- HeadObjectResponse headObjectResponse = _s3Client .headObject (S3TestUtils .getHeadObjectRequest (BUCKET , folderName ));
410+ HeadObjectResponse headObjectResponse =
411+ _s3Client .headObject (S3TestUtils .getHeadObjectRequest (BUCKET , folderName + "/" ));
400412 Assert .assertTrue (headObjectResponse .sdkHttpResponse ().isSuccessful ());
401413 }
414+
415+ private static void createDummyFile (File file , int size )
416+ throws IOException {
417+ FileUtils .deleteQuietly (file );
418+ FileUtils .touch (file );
419+ try (BufferedOutputStream out = new BufferedOutputStream (new FileOutputStream (file ))) {
420+ for (int i = 0 ; i < size ; i ++) {
421+ out .write ((byte ) i );
422+ }
423+ }
424+ }
425+
426+ private static S3Client createS3ClientV2 (String endpoint ) {
427+ return S3Client .builder ().region (Region .of ("us-east-1" ))
428+ .credentialsProvider (StaticCredentialsProvider .create (AwsBasicCredentials .create ("foo" , "bar" )))
429+ .serviceConfiguration (S3Configuration .builder ().pathStyleAccessEnabled (true ).build ())
430+ .endpointOverride (URI .create (endpoint )).build ();
431+ }
402432}
0 commit comments