Skip to content

Commit e004171

Browse files
authored
Merge 9621900 into 800b628
2 parents 800b628 + 9621900 commit e004171

File tree

49 files changed

+1476
-45
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1476
-45
lines changed

eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import org.apache.eventmesh.connector.kafka.sink.config.KafkaSinkConfig;
2121
import org.apache.eventmesh.openconnect.api.config.Config;
22-
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
2322
import org.apache.eventmesh.openconnect.api.sink.Sink;
23+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2424

2525
import org.apache.kafka.clients.producer.KafkaProducer;
2626
import org.apache.kafka.clients.producer.Producer;

eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/source/connector/KafkaSourceConnector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919

2020
import org.apache.eventmesh.connector.kafka.source.config.KafkaSourceConfig;
2121
import org.apache.eventmesh.openconnect.api.config.Config;
22-
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
23-
import org.apache.eventmesh.openconnect.api.data.RecordOffset;
24-
import org.apache.eventmesh.openconnect.api.data.RecordPartition;
2522
import org.apache.eventmesh.openconnect.api.source.Source;
23+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
24+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
25+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
2626

2727
import org.apache.kafka.clients.consumer.ConsumerConfig;
2828
import org.apache.kafka.clients.consumer.ConsumerRecord;

eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/service/ConsumerService.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
import org.apache.eventmesh.connector.openfunction.client.CallbackServiceGrpc.CallbackServiceBlockingStub;
2626
import org.apache.eventmesh.connector.openfunction.config.OpenFunctionServerConfig;
2727
import org.apache.eventmesh.connector.openfunction.sink.connector.OpenFunctionSinkConnector;
28-
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
28+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
29+
2930

3031
import org.apache.commons.lang3.StringUtils;
3132

@@ -46,19 +47,16 @@ public class ConsumerService extends ConsumerServiceGrpc.ConsumerServiceImplBase
4647

4748
private final BlockingQueue<ConnectRecord> queue;
4849

49-
private final OpenFunctionServerConfig config;
50-
51-
private final transient ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 10115).usePlaintext().build();
52-
53-
private CallbackServiceBlockingStub publisherClient = CallbackServiceGrpc.newBlockingStub(channel);
54-
55-
private final ExecutorService handleService = Executors.newSingleThreadExecutor();
50+
private final CallbackServiceBlockingStub publisherClient;
5651

5752

5853
public ConsumerService(OpenFunctionSinkConnector openFunctionSinkConnector, OpenFunctionServerConfig serverConfig) {
5954
this.openFunctionSinkConnector = openFunctionSinkConnector;
6055
this.queue = openFunctionSinkConnector.queue();
61-
this.config = serverConfig;
56+
ManagedChannel channel = ManagedChannelBuilder.forAddress(serverConfig.getTargetAddress(),
57+
serverConfig.getTargetPort()).usePlaintext().build();
58+
this.publisherClient = CallbackServiceGrpc.newBlockingStub(channel);
59+
ExecutorService handleService = Executors.newSingleThreadExecutor();
6260
handleService.execute(this::startHandleConsumeEvents);
6361
}
6462

eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/service/ProducerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
2626
import org.apache.eventmesh.connector.openfunction.config.OpenFunctionServerConfig;
2727
import org.apache.eventmesh.connector.openfunction.source.connector.OpenFunctionSourceConnector;
28-
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
28+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2929

3030
import java.time.Instant;
3131
import java.time.LocalDateTime;

eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/sink/connector/OpenFunctionSinkConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import org.apache.eventmesh.connector.openfunction.sink.config.OpenFunctionSinkConfig;
2121
import org.apache.eventmesh.openconnect.api.config.Config;
22-
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
2322
import org.apache.eventmesh.openconnect.api.sink.Sink;
23+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2424

2525
import java.util.List;
2626
import java.util.concurrent.BlockingQueue;

eventmesh-connectors/eventmesh-connector-openfunction/src/main/java/org/apache/eventmesh/connector/openfunction/source/connector/OpenFunctionSourceConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import org.apache.eventmesh.connector.openfunction.source.config.OpenFunctionSourceConfig;
2121
import org.apache.eventmesh.openconnect.api.config.Config;
22-
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
2322
import org.apache.eventmesh.openconnect.api.source.Source;
23+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2424

2525
import java.util.ArrayList;
2626
import java.util.List;

eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/sink/connector/PulsarSinkConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
import org.apache.eventmesh.connector.pulsar.sink.config.PulsarSinkConfig;
2323
import org.apache.eventmesh.openconnect.api.config.Config;
24-
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
2524
import org.apache.eventmesh.openconnect.api.sink.Sink;
25+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2626

2727
import org.apache.pulsar.client.api.MessageId;
2828
import org.apache.pulsar.client.api.Producer;

eventmesh-connectors/eventmesh-connector-pulsar/src/main/java/org/apache/eventmesh/connector/pulsar/source/connector/PulsarSourceConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
import org.apache.eventmesh.connector.pulsar.source.config.PulsarSourceConfig;
2222
import org.apache.eventmesh.openconnect.api.config.Config;
23-
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
24-
import org.apache.eventmesh.openconnect.api.data.RecordPartition;
2523
import org.apache.eventmesh.openconnect.api.source.Source;
24+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
25+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
2626

2727
import org.apache.pulsar.client.api.Consumer;
2828
import org.apache.pulsar.client.api.Message;

eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/sink/connector/RedisSinkConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import org.apache.eventmesh.connector.redis.cloudevent.CloudEventCodec;
2121
import org.apache.eventmesh.connector.redis.sink.config.RedisSinkConfig;
2222
import org.apache.eventmesh.openconnect.api.config.Config;
23-
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
2423
import org.apache.eventmesh.openconnect.api.sink.Sink;
24+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2525

2626
import java.net.URI;
2727
import java.net.URISyntaxException;

eventmesh-connectors/eventmesh-connector-redis/src/main/java/org/apache/eventmesh/connector/redis/source/connector/RedisSourceConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import org.apache.eventmesh.connector.redis.cloudevent.CloudEventCodec;
2121
import org.apache.eventmesh.connector.redis.source.config.RedisSourceConfig;
2222
import org.apache.eventmesh.openconnect.api.config.Config;
23-
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
2423
import org.apache.eventmesh.openconnect.api.source.Source;
24+
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
2525

2626
import java.util.ArrayList;
2727
import java.util.List;

0 commit comments

Comments
 (0)