Skip to content

Commit 62b9ee5

Browse files
authored
Merge bafdfcb into 109c493
2 parents 109c493 + bafdfcb commit 62b9ee5

File tree

14 files changed

+589
-0
lines changed

14 files changed

+589
-0
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
dependencies {
19+
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
20+
compileOnly 'org.projectlombok:lombok'
21+
annotationProcessor 'org.projectlombok:lombok'
22+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
pluginType=connector
18+
pluginName=file
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.file.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.Config;
21+
22+
import lombok.Data;
23+
import lombok.EqualsAndHashCode;
24+
25+
@Data
26+
@EqualsAndHashCode(callSuper = true)
27+
public class FileServerConfig extends Config {
28+
29+
private boolean sourceEnable;
30+
31+
private boolean sinkEnable;
32+
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.file.server;
19+
20+
import org.apache.eventmesh.connector.file.config.FileServerConfig;
21+
import org.apache.eventmesh.connector.file.sink.connector.FileSinkConnector;
22+
import org.apache.eventmesh.connector.file.source.connector.FileSourceConnector;
23+
import org.apache.eventmesh.openconnect.Application;
24+
import org.apache.eventmesh.openconnect.util.ConfigUtil;
25+
26+
27+
import lombok.extern.slf4j.Slf4j;
28+
29+
@Slf4j
30+
public class FileConnectServer {
31+
32+
33+
public static void main(String[] args) throws Exception {
34+
35+
FileServerConfig serverConfig = ConfigUtil.parse(FileServerConfig.class, "server-config.yml");
36+
37+
if (serverConfig.isSourceEnable()) {
38+
Application fileSourceApp = new Application();
39+
fileSourceApp.run(FileSourceConnector.class);
40+
}
41+
42+
if (serverConfig.isSinkEnable()) {
43+
Application fileSinkApp = new Application();
44+
fileSinkApp.run(FileSinkConnector.class);
45+
}
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.file.sink.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.SinkConfig;
21+
22+
import lombok.Data;
23+
import lombok.EqualsAndHashCode;
24+
25+
@Data
26+
@EqualsAndHashCode(callSuper = true)
27+
public class FileSinkConfig extends SinkConfig {
28+
29+
public SinkConnectorConfig connectorConfig;
30+
31+
private Integer flushSize = 1000;
32+
33+
private boolean hourlyFlushEnabled = false;
34+
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.file.sink.config;
19+
20+
import lombok.Data;
21+
22+
@Data
23+
public class SinkConnectorConfig {
24+
25+
private String connectorName;
26+
27+
private String topic;
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.file.sink.connector;
19+
20+
import org.apache.eventmesh.connector.file.sink.config.FileSinkConfig;
21+
import org.apache.eventmesh.openconnect.api.config.Config;
22+
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
23+
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
24+
import org.apache.eventmesh.openconnect.api.sink.Sink;
25+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
26+
27+
import java.io.File;
28+
import java.io.IOException;
29+
import java.io.PrintStream;
30+
import java.nio.charset.StandardCharsets;
31+
import java.nio.file.Files;
32+
import java.nio.file.Paths;
33+
import java.nio.file.StandardOpenOption;
34+
import java.time.LocalDateTime;
35+
import java.util.Calendar;
36+
import java.util.List;
37+
import java.util.Locale;
38+
import java.util.concurrent.atomic.AtomicInteger;
39+
40+
import lombok.extern.slf4j.Slf4j;
41+
42+
@Slf4j
43+
public class FileSinkConnector implements Sink {
44+
45+
private static final AtomicInteger fileSize = new AtomicInteger(0);
46+
47+
private String filePath;
48+
49+
private String fileName;
50+
51+
private int flushSize;
52+
53+
private boolean hourlyFlushEnabled;
54+
55+
private FileSinkConfig sinkConfig;
56+
57+
private PrintStream outputStream;
58+
59+
@Override
60+
public Class<? extends Config> configClass() {
61+
return FileSinkConfig.class;
62+
}
63+
64+
@Override
65+
public void init(Config config) {
66+
// init config for hdfs source connector
67+
this.sinkConfig = (FileSinkConfig) config;
68+
this.filePath = buildFilePath();
69+
this.fileName = buildFileName();
70+
this.flushSize = sinkConfig.getFlushSize();
71+
this.hourlyFlushEnabled = sinkConfig.isHourlyFlushEnabled();
72+
}
73+
74+
@Override
75+
public void init(ConnectorContext connectorContext) {
76+
// init config for hdfs source connector
77+
SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
78+
this.sinkConfig = (FileSinkConfig) sinkConnectorContext.getSinkConfig();
79+
this.fileName = buildFileName();
80+
this.filePath = buildFilePath();
81+
this.flushSize = sinkConfig.getFlushSize();
82+
this.hourlyFlushEnabled = sinkConfig.isHourlyFlushEnabled();
83+
}
84+
85+
86+
@Override
87+
public void start() throws Exception {
88+
if (fileName == null || fileName.length() == 0 || filePath == null || filePath.length() == 0) {
89+
this.outputStream = System.out;
90+
} else {
91+
this.outputStream =
92+
new PrintStream(Files.newOutputStream(Paths.get(filePath + fileName), StandardOpenOption.CREATE, StandardOpenOption.APPEND),
93+
false, StandardCharsets.UTF_8.name());
94+
}
95+
}
96+
97+
@Override
98+
public void commit(ConnectRecord record) {
99+
100+
}
101+
102+
@Override
103+
public String name() {
104+
return this.sinkConfig.getConnectorConfig().getConnectorName();
105+
}
106+
107+
@Override
108+
public void stop() {
109+
outputStream.flush();
110+
outputStream.close();
111+
}
112+
113+
@Override
114+
public void put(List<ConnectRecord> sinkRecords) {
115+
for (ConnectRecord connectRecord : sinkRecords) {
116+
// the file data exceed the flushSize create the new file or
117+
// hourlyFlushEnabled && time on the hour
118+
if (fileSize.get() >= flushSize || (hourlyFlushEnabled && LocalDateTime.now().getHour() == 0)) {
119+
log.info("flush the file and open");
120+
outputStream.flush();
121+
outputStream.close();
122+
try {
123+
fileSize.set(0);
124+
this.outputStream = openWithNewFile();
125+
} catch (IOException e) {
126+
log.error("create file under path {} error", filePath);
127+
throw new RuntimeException(e);
128+
}
129+
}
130+
outputStream.println(new String((byte[]) connectRecord.getData(), StandardCharsets.UTF_8));
131+
fileSize.addAndGet(1);
132+
}
133+
}
134+
135+
private String buildFilePath() {
136+
Calendar calendar = Calendar.getInstance(Locale.CHINA);
137+
int year = calendar.get(Calendar.YEAR);
138+
int month = calendar.get(Calendar.MONTH) + 1;
139+
int day = calendar.get(Calendar.DATE);
140+
String filePath = sinkConfig.getConnectorConfig().getTopic()
141+
+ File.separator + year + File.separator + month + File.separator + day + File.separator;
142+
File path = new File(filePath);
143+
if (!path.exists()) {
144+
if (!path.mkdirs()) {
145+
log.error("make file dir {} error", filePath);
146+
}
147+
}
148+
return filePath;
149+
}
150+
151+
private String buildFileName() {
152+
Calendar calendar = Calendar.getInstance(Locale.CHINA);
153+
long currentTime = calendar.getTime().getTime();
154+
return sinkConfig.getConnectorConfig().getTopic() + "-" + calendar.get(Calendar.HOUR_OF_DAY) + "-" + currentTime;
155+
}
156+
157+
private PrintStream openWithNewFile() throws IOException {
158+
this.filePath = buildFilePath();
159+
this.fileName = buildFileName();
160+
if (fileName.length() == 0 || filePath == null || filePath.length() == 0) {
161+
return System.out;
162+
}
163+
return new PrintStream(Files.newOutputStream(Paths.get(filePath + fileName),
164+
StandardOpenOption.CREATE, StandardOpenOption.APPEND),
165+
false, StandardCharsets.UTF_8.name());
166+
}
167+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.file.source.config;
19+
20+
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
21+
22+
import lombok.Data;
23+
import lombok.EqualsAndHashCode;
24+
25+
@Data
26+
@EqualsAndHashCode(callSuper = true)
27+
public class FileSourceConfig extends SourceConfig {
28+
29+
public SourceConnectorConfig connectorConfig;
30+
}

0 commit comments

Comments
 (0)