Skip to content

Commit e205ef2

Browse files
authored
Merge 3140f37 into c3c7370
2 parents c3c7370 + 3140f37 commit e205ef2

File tree

6 files changed

+70
-47
lines changed

6 files changed

+70
-47
lines changed

eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/RocketMQAdmin.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,10 @@ public RocketMQAdmin(Properties properties) {
6565
ConfigService configService = ConfigService.getInstance();
6666
ClientConfiguration clientConfiguration = configService.buildConfigInstance(ClientConfiguration.class);
6767

68-
nameServerAddr = clientConfiguration.namesrvAddr;
69-
clusterName = clientConfiguration.clusterName;
70-
String accessKey = clientConfiguration.accessKey;
71-
String secretKey = clientConfiguration.secretKey;
68+
nameServerAddr = clientConfiguration.getNamesrvAddr();
69+
clusterName = clientConfiguration.getClusterName();
70+
String accessKey = clientConfiguration.getAccessKey();
71+
String secretKey = clientConfiguration.getSecretKey();
7272

7373
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
7474
adminExt = new DefaultMQAdminExt(rpcHook);

eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ public void init() {
4141
ConfigService configService = ConfigService.getInstance();
4242
ClientConfiguration clientConfiguration = configService.buildConfigInstance(ClientConfiguration.class);
4343

44-
nameServerAddr = clientConfiguration.namesrvAddr;
45-
clusterName = clientConfiguration.clusterName;
46-
String accessKey = clientConfiguration.accessKey;
47-
String secretKey = clientConfiguration.secretKey;
44+
nameServerAddr = clientConfiguration.getNamesrvAddr();
45+
clusterName = clientConfiguration.getClusterName();
46+
String accessKey = clientConfiguration.getAccessKey();
47+
String secretKey = clientConfiguration.getSecretKey();
4848

4949
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
5050
adminExt = new DefaultMQAdminExt(rpcHook);

eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,54 +20,79 @@
2020
import org.apache.eventmesh.common.config.Config;
2121
import org.apache.eventmesh.common.config.ConfigFiled;
2222

23+
import lombok.AllArgsConstructor;
24+
import lombok.Builder;
25+
import lombok.Data;
26+
import lombok.NoArgsConstructor;
27+
2328
@Config(prefix = "eventMesh.server.rocketmq", path = "classPath://rocketmq-client.properties")
29+
@Data
30+
@Builder
31+
@AllArgsConstructor
32+
@NoArgsConstructor
2433
public class ClientConfiguration {
2534

2635
@ConfigFiled(field = "namesrvAddr", notEmpty = true)
27-
public String namesrvAddr = "";
36+
@Builder.Default
37+
private String namesrvAddr = "";
2838

2939
@ConfigFiled(field = "username")
30-
public String clientUserName = "username";
40+
@Builder.Default
41+
private String clientUserName = "username";
3142

3243
@ConfigFiled(field = "password")
33-
public String clientPass = "password";
44+
@Builder.Default
45+
private String clientPass = "password";
3446

3547
@ConfigFiled(field = "client.consumeThreadMin")
36-
public Integer consumeThreadMin = 2;
48+
@Builder.Default
49+
private Integer consumeThreadMin = 2;
3750

3851
@ConfigFiled(field = "client.consumeThreadMax")
39-
public Integer consumeThreadMax = 2;
52+
@Builder.Default
53+
private Integer consumeThreadMax = 2;
4054

4155
@ConfigFiled(field = "client.consumeThreadPoolQueueSize")
42-
public Integer consumeQueueSize = 10000;
56+
@Builder.Default
57+
private Integer consumeQueueSize = 10000;
4358

4459
@ConfigFiled(field = "client.pullBatchSize")
45-
public Integer pullBatchSize = 32;
60+
@Builder.Default
61+
private Integer pullBatchSize = 32;
4662

4763
@ConfigFiled(field = "client.ackwindow")
48-
public Integer ackWindow = 1000;
64+
@Builder.Default
65+
private Integer ackWindow = 1000;
4966

5067
@ConfigFiled(field = "client.pubwindow")
51-
public Integer pubWindow = 100;
68+
@Builder.Default
69+
private Integer pubWindow = 100;
5270

5371
@ConfigFiled(field = "client.comsumeTimeoutInMin")
54-
public long consumeTimeout = 0L;
72+
@Builder.Default
73+
private long consumeTimeout = 0L;
5574

5675
@ConfigFiled(field = "client.pollNameServerInterval")
57-
public Integer pollNameServerInterval = 10 * 1000;
76+
@Builder.Default
77+
private Integer pollNameServerInterval = 10 * 1000;
5878

5979
@ConfigFiled(field = "client.heartbeatBrokerInterval")
60-
public Integer heartbeatBrokerInterval = 30 * 1000;
80+
@Builder.Default
81+
private Integer heartbeatBrokerInterval = 30 * 1000;
6182

6283
@ConfigFiled(field = "client.rebalanceInterval")
63-
public Integer rebalanceInterval = 20 * 1000;
84+
@Builder.Default
85+
private Integer rebalanceInterval = 20 * 1000;
6486

6587
@ConfigFiled(field = "cluster")
66-
public String clusterName = "";
88+
@Builder.Default
89+
private String clusterName = "";
6790

6891
@ConfigFiled(field = "accessKey")
69-
public String accessKey = "";
92+
@Builder.Default
93+
private String accessKey = "";
7094

7195
@ConfigFiled(field = "secretKey")
72-
public String secretKey = "";
73-
}
96+
@Builder.Default
97+
private String secretKey = "";
98+
}

eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/RocketMQConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public synchronized void init(Properties keyValue) throws Exception {
5050
consumerGroup = Constants.BROADCAST_PREFIX + consumerGroup;
5151
}
5252

53-
String namesrvAddr = clientConfiguration.namesrvAddr;
53+
String namesrvAddr = clientConfiguration.getNamesrvAddr();
5454
String instanceName = keyValue.getProperty("instanceName");
5555
Properties properties = new Properties();
5656
properties.put("ACCESS_POINTS", namesrvAddr);

eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/RocketMQProducerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class RocketMQProducerImpl implements Producer {
4848
public synchronized void init(Properties keyValue) {
4949
String producerGroup = keyValue.getProperty(Constants.PRODUCER_GROUP);
5050

51-
String omsNamesrv = clientConfiguration.namesrvAddr;
51+
String omsNamesrv = clientConfiguration.getNamesrvAddr();
5252
Properties properties = new Properties();
5353
properties.put(Constants.ACCESS_POINTS, omsNamesrv);
5454
properties.put(Constants.REGION, Constants.NAMESPACE);

eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfigurationTest.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,38 +28,36 @@ public class ClientConfigurationTest {
2828

2929
@Test
3030
public void getConfigWhenRocketMQConsumerInit() {
31-
RocketMQConsumerImpl consumer =
32-
(RocketMQConsumerImpl) ConnectorPluginFactory.getMeshMQPushConsumer("rocketmq");
31+
RocketMQConsumerImpl consumer = (RocketMQConsumerImpl) ConnectorPluginFactory.getMeshMQPushConsumer("rocketmq");
3332

3433
ClientConfiguration config = consumer.getClientConfiguration();
3534
assertConfig(config);
3635
}
3736

3837
@Test
3938
public void getConfigWhenRocketMQProducerInit() {
40-
RocketMQProducerImpl producer =
41-
(RocketMQProducerImpl) ConnectorPluginFactory.getMeshMQProducer("rocketmq");
39+
RocketMQProducerImpl producer = (RocketMQProducerImpl) ConnectorPluginFactory.getMeshMQProducer("rocketmq");
4240

4341
ClientConfiguration config = producer.getClientConfiguration();
4442
assertConfig(config);
4543
}
4644

4745
private void assertConfig(ClientConfiguration config) {
48-
Assert.assertEquals(config.namesrvAddr, "127.0.0.1:9876;127.0.0.1:9876");
49-
Assert.assertEquals(config.clientUserName, "username-succeed!!!");
50-
Assert.assertEquals(config.clientPass, "password-succeed!!!");
51-
Assert.assertEquals(config.consumeThreadMin, Integer.valueOf(1816));
52-
Assert.assertEquals(config.consumeThreadMax, Integer.valueOf(2816));
53-
Assert.assertEquals(config.consumeQueueSize, Integer.valueOf(3816));
54-
Assert.assertEquals(config.pullBatchSize, Integer.valueOf(4816));
55-
Assert.assertEquals(config.ackWindow, Integer.valueOf(5816));
56-
Assert.assertEquals(config.pubWindow, Integer.valueOf(6816));
57-
Assert.assertEquals(config.consumeTimeout, 7816);
58-
Assert.assertEquals(config.pollNameServerInterval, Integer.valueOf(8816));
59-
Assert.assertEquals(config.heartbeatBrokerInterval, Integer.valueOf(9816));
60-
Assert.assertEquals(config.rebalanceInterval, Integer.valueOf(11816));
61-
Assert.assertEquals(config.clusterName, "cluster-succeed!!!");
62-
Assert.assertEquals(config.accessKey, "accessKey-succeed!!!");
63-
Assert.assertEquals(config.secretKey, "secretKey-succeed!!!");
46+
Assert.assertEquals(config.getNamesrvAddr(), "127.0.0.1:9876;127.0.0.1:9876");
47+
Assert.assertEquals(config.getClientUserName(), "username-succeed!!!");
48+
Assert.assertEquals(config.getClientPass(), "password-succeed!!!");
49+
Assert.assertEquals(config.getConsumeThreadMin(), Integer.valueOf(1816));
50+
Assert.assertEquals(config.getConsumeThreadMax(), Integer.valueOf(2816));
51+
Assert.assertEquals(config.getConsumeQueueSize(), Integer.valueOf(3816));
52+
Assert.assertEquals(config.getPullBatchSize(), Integer.valueOf(4816));
53+
Assert.assertEquals(config.getAckWindow(), Integer.valueOf(5816));
54+
Assert.assertEquals(config.getPubWindow(), Integer.valueOf(6816));
55+
Assert.assertEquals(config.getConsumeTimeout(), 7816);
56+
Assert.assertEquals(config.getPollNameServerInterval(), Integer.valueOf(8816));
57+
Assert.assertEquals(config.getHeartbeatBrokerInterval(), Integer.valueOf(9816));
58+
Assert.assertEquals(config.getRebalanceInterval(), Integer.valueOf(11816));
59+
Assert.assertEquals(config.getClusterName(), "cluster-succeed!!!");
60+
Assert.assertEquals(config.getAccessKey(), "accessKey-succeed!!!");
61+
Assert.assertEquals(config.getSecretKey(), "secretKey-succeed!!!");
6462
}
6563
}

0 commit comments

Comments
 (0)