Skip to content

Commit c29ada6

Browse files
authored
[ISSUE #4082][Task1] Support S3 file source connector (#4132)
* feat(connector): add S3 file source connector with specified format 1. add S3 file source connector with specified format Closes #4082 * test(connector): add S3SourceConnectorTest to verify 1. add S3SourceConnectorTest to verify Closes #4082 * feat(connector): add S3SourceWorker 1. add S3SourceWorker Closes #4082 * style(connector): add header license 1. add header license Closes #4082 * refactor(connector): refactor S3 source connector 1. refactor S3 source connector Closes #4082
1 parent dd6cc06 commit c29ada6

File tree

10 files changed

+496
-0
lines changed

10 files changed

+496
-0
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,8 @@ subprojects {
527527
dependency "com.github.seancfoley:ipaddress:5.3.3"
528528
dependency "javax.annotation:javax.annotation-api:1.3.2"
529529
dependency "com.alibaba:fastjson:1.2.83"
530+
531+
dependency "software.amazon.awssdk:s3:2.20.29"
530532
}
531533
}
532534
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
dependencies {
19+
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
20+
implementation 'software.amazon.awssdk:s3'
21+
compileOnly 'org.projectlombok:lombok'
22+
annotationProcessor 'org.projectlombok:lombok'
23+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.s3.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.Config;
21+
22+
import lombok.Data;
23+
import lombok.EqualsAndHashCode;
24+
25+
@Data
26+
@EqualsAndHashCode(callSuper = true)
27+
public class S3ServerConfig extends Config {
28+
29+
private boolean sourceEnable;
30+
31+
private boolean sinkEnable;
32+
33+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.s3.server;
19+
20+
import org.apache.eventmesh.connector.s3.config.S3ServerConfig;
21+
import org.apache.eventmesh.connector.s3.source.connector.S3SourceConnector;
22+
import org.apache.eventmesh.openconnect.Application;
23+
import org.apache.eventmesh.openconnect.util.ConfigUtil;
24+
25+
import lombok.extern.slf4j.Slf4j;
26+
27+
@Slf4j
28+
public class S3ConnectServer {
29+
30+
public static void main(String[] args) throws Exception {
31+
S3ServerConfig s3ServerConfig = ConfigUtil.parse(S3ServerConfig.class, "server-config.yml");
32+
if (s3ServerConfig.isSourceEnable()) {
33+
Application application = new Application();
34+
application.run(S3SourceConnector.class);
35+
}
36+
37+
if (s3ServerConfig.isSinkEnable()) {
38+
log.error("S3 sink is not supported yet.");
39+
}
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
package org.apache.eventmesh.connector.s3.source.config;
20+
21+
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
22+
23+
import lombok.Data;
24+
25+
26+
@Data
27+
public class S3SourceConfig extends SourceConfig {
28+
private SourceConnectorConfig sourceConnectorConfig;
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
package org.apache.eventmesh.connector.s3.source.config;
20+
21+
import java.util.Map;
22+
23+
import lombok.Data;
24+
25+
@Data
26+
public class SourceConnectorConfig {
27+
28+
private String connectorName;
29+
30+
private String region;
31+
32+
private String bucket;
33+
34+
private String accessKey;
35+
36+
private String secretKey;
37+
38+
private String fileName;
39+
40+
/**
41+
* The schema for the data to be read from S3.
42+
*/
43+
private Map<String/*column name*/, Integer/*bytes*/> schema;
44+
45+
/**
46+
* The maximum number of records that should be returned in each batch poll.
47+
*/
48+
private Integer batchSize = 20;
49+
50+
/**
51+
* The maximum ms to wait for request futures to complete.
52+
*/
53+
private long timeout = 3000;
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.s3.source.connector;
19+
20+
21+
import org.apache.eventmesh.connector.s3.source.config.S3SourceConfig;
22+
import org.apache.eventmesh.connector.s3.source.config.SourceConnectorConfig;
23+
import org.apache.eventmesh.openconnect.api.config.Config;
24+
import org.apache.eventmesh.openconnect.api.source.Source;
25+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
26+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
27+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
28+
29+
import java.util.ArrayList;
30+
import java.util.Collections;
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.Optional;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.TimeUnit;
37+
38+
import lombok.extern.slf4j.Slf4j;
39+
40+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
41+
import software.amazon.awssdk.core.ResponseBytes;
42+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
43+
import software.amazon.awssdk.regions.Region;
44+
import software.amazon.awssdk.services.s3.S3AsyncClient;
45+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
46+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
47+
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
48+
49+
@Slf4j
50+
public class S3SourceConnector implements Source {
51+
52+
public static final String REGION = "region";
53+
54+
public static final String BUCKET = "bucket";
55+
56+
public static final String FILE_NAME = "fileName";
57+
58+
public static final String POSITION = "position";
59+
60+
private S3SourceConfig sourceConfig;
61+
62+
private SourceConnectorConfig sourceConnectorConfig;
63+
64+
private int eachRecordSize;
65+
66+
private long fileSize;
67+
68+
private S3AsyncClient s3Client;
69+
70+
private long position;
71+
72+
73+
@Override
74+
public Class<? extends Config> configClass() {
75+
return S3SourceConfig.class;
76+
}
77+
78+
@Override
79+
public void init(Config config) throws Exception {
80+
// init config for s3 source connector
81+
this.sourceConfig = (S3SourceConfig) config;
82+
this.sourceConnectorConfig = this.sourceConfig.getSourceConnectorConfig();
83+
this.eachRecordSize = calculateEachRecordSize();
84+
AwsBasicCredentials basicCredentials = AwsBasicCredentials.create(this.sourceConnectorConfig.getAccessKey(),
85+
this.sourceConnectorConfig.getSecretKey());
86+
this.s3Client = S3AsyncClient.builder().credentialsProvider(() -> basicCredentials)
87+
.region(Region.of(this.sourceConnectorConfig.getRegion())).build();
88+
}
89+
90+
private int calculateEachRecordSize() {
91+
Optional<Integer> sum = this.sourceConnectorConfig.getSchema().values().stream().reduce((x, y) -> x + y);
92+
return sum.orElse(0);
93+
}
94+
95+
@Override
96+
public void start() throws Exception {
97+
CompletableFuture<HeadObjectResponse> headObjectResponseCompletableFuture = this.s3Client.headObject(
98+
builder -> builder.bucket(this.sourceConnectorConfig.getBucket()).key(this.sourceConnectorConfig.getFileName()));
99+
headObjectResponseCompletableFuture.get(this.sourceConnectorConfig.getTimeout(), TimeUnit.MILLISECONDS);
100+
this.fileSize = headObjectResponseCompletableFuture.get().contentLength();
101+
}
102+
103+
@Override
104+
public void commit(ConnectRecord record) {
105+
106+
}
107+
108+
@Override
109+
public String name() {
110+
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
111+
}
112+
113+
@Override
114+
public void stop() throws Exception {
115+
116+
}
117+
118+
@Override
119+
public List<ConnectRecord> poll() {
120+
if (this.position >= this.fileSize) {
121+
return Collections.EMPTY_LIST;
122+
}
123+
long startPosition = this.position;
124+
long endPosition = Math.min(this.fileSize, this.position + this.eachRecordSize * this.sourceConnectorConfig.getBatchSize()) - 1;
125+
GetObjectRequest request = GetObjectRequest.builder().bucket(this.sourceConnectorConfig.getBucket())
126+
.key(this.sourceConnectorConfig.getFileName())
127+
.range("bytes=" + startPosition + "-" + endPosition).build();
128+
ResponseBytes<GetObjectResponse> resp;
129+
try {
130+
resp = this.s3Client.getObject(request, AsyncResponseTransformer.toBytes())
131+
.get(this.sourceConnectorConfig.getTimeout(), TimeUnit.MILLISECONDS);
132+
} catch (Exception e) {
133+
log.error("poll records from S3 file, poll range {}-{}, but failed", startPosition, endPosition, e);
134+
return Collections.EMPTY_LIST;
135+
}
136+
byte[] bytes = resp.asByteArray();
137+
List<ConnectRecord> records = new ArrayList<>(bytes.length / this.eachRecordSize);
138+
for (int i = 0; i < bytes.length; i += this.eachRecordSize) {
139+
byte[] body = new byte[this.eachRecordSize];
140+
System.arraycopy(bytes, i, body, 0, this.eachRecordSize);
141+
this.position += this.eachRecordSize;
142+
ConnectRecord record = new ConnectRecord(getRecordPartition(), getRecordOffset(), System.currentTimeMillis(), body);
143+
records.add(record);
144+
}
145+
return records;
146+
}
147+
148+
private RecordPartition getRecordPartition() {
149+
Map<String, String> map = new HashMap<>();
150+
map.put(REGION, this.sourceConnectorConfig.getRegion());
151+
map.put(BUCKET, this.sourceConnectorConfig.getBucket());
152+
map.put(FILE_NAME, this.sourceConnectorConfig.getFileName());
153+
return new RecordPartition(map);
154+
}
155+
156+
private RecordOffset getRecordOffset() {
157+
Map<String, String> map = new HashMap<>();
158+
map.put(POSITION, String.valueOf(this.position));
159+
return new RecordOffset(map);
160+
}
161+
}

0 commit comments

Comments
 (0)