Skip to content

Commit fe82c3d

Browse files
Enable the kernel to support passing topics to MQTT custom plugins (#15523)
1 parent 26289f5 commit fe82c3d

File tree

7 files changed

+57
-19
lines changed

7 files changed

+57
-19
lines changed

example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
2525

2626
import io.netty.buffer.ByteBuf;
27+
import org.apache.commons.lang3.NotImplementedException;
2728

2829
import java.util.ArrayList;
2930
import java.util.Arrays;
@@ -33,7 +34,7 @@
3334
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
3435

3536
@Override
36-
public List<Message> format(ByteBuf payload) {
37+
public List<Message> format(String topic, ByteBuf payload) {
3738
// Suppose the payload is a json format
3839
if (payload == null) {
3940
return Collections.emptyList();
@@ -54,6 +55,12 @@ public List<Message> format(ByteBuf payload) {
5455
return ret;
5556
}
5657

58+
@Override
59+
@Deprecated
60+
public List<Message> format(ByteBuf payload) {
61+
throw new NotImplementedException();
62+
}
63+
5764
@Override
5865
public String getName() {
5966
// set the value of mqtt_payload_formatter in iotdb-common.properties as the following string:

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.gson.JsonParseException;
2828
import com.google.gson.reflect.TypeToken;
2929
import io.netty.buffer.ByteBuf;
30+
import org.apache.commons.lang3.NotImplementedException;
3031
import org.apache.tsfile.enums.TSDataType;
3132

3233
import java.nio.charset.StandardCharsets;
@@ -50,7 +51,7 @@ public class JSONPayloadFormatter implements PayloadFormatter {
5051
private static final Gson GSON = new GsonBuilder().create();
5152

5253
@Override
53-
public List<Message> format(ByteBuf payload) {
54+
public List<Message> format(String topic, ByteBuf payload) {
5455
if (payload == null) {
5556
return new ArrayList<>();
5657
}
@@ -81,6 +82,12 @@ public List<Message> format(ByteBuf payload) {
8182
throw new JsonParseException("payload is invalidate");
8283
}
8384

85+
@Override
86+
@Deprecated
87+
public List<Message> format(ByteBuf payload) {
88+
throw new NotImplementedException();
89+
}
90+
8491
private List<Message> formatJson(JsonObject jsonObject) {
8592
TreeMessage message = new TreeMessage();
8693
message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.iotdb.db.protocol.mqtt;
2020

2121
import io.netty.buffer.ByteBuf;
22+
import org.apache.commons.lang3.NotImplementedException;
2223
import org.apache.tsfile.enums.TSDataType;
2324
import org.apache.tsfile.utils.Binary;
2425
import org.apache.tsfile.utils.Pair;
@@ -63,14 +64,16 @@ public LinePayloadFormatter() {
6364
}
6465

6566
@Override
66-
public List<Message> format(ByteBuf payload) {
67+
public List<Message> format(String topic, ByteBuf payload) {
6768
List<Message> messages = new ArrayList<>();
6869
if (payload == null) {
6970
return messages;
7071
}
7172

7273
String txt = payload.toString(StandardCharsets.UTF_8);
7374
String[] lines = txt.split(LINE_BREAK);
75+
// '/' previously defined as a database name
76+
String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/"));
7477
for (String line : lines) {
7578
if (line.trim().startsWith(WELL)) {
7679
continue;
@@ -83,6 +86,9 @@ public List<Message> format(ByteBuf payload) {
8386
continue;
8487
}
8588

89+
// Parsing Database Name
90+
message.setDatabase((database));
91+
8692
// Parsing Table Names
8793
message.setTable(matcher.group(TABLE));
8894

@@ -121,6 +127,12 @@ public List<Message> format(ByteBuf payload) {
121127
return messages;
122128
}
123129

130+
@Override
131+
@Deprecated
132+
public List<Message> format(ByteBuf payload) {
133+
throw new NotImplementedException();
134+
}
135+
124136
private boolean setTags(Matcher matcher, TableMessage message) {
125137
List<String> tagKeys = new ArrayList<>();
126138
List<Object> tagValues = new ArrayList<>();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void onPublish(InterceptPublishMessage msg) {
136136
topic,
137137
payload);
138138

139-
List<Message> messages = payloadFormat.format(payload);
139+
List<Message> messages = payloadFormat.format(topic, payload);
140140
if (messages == null) {
141141
return;
142142
}
@@ -146,14 +146,7 @@ public void onPublish(InterceptPublishMessage msg) {
146146
continue;
147147
}
148148
if (useTableInsert) {
149-
TableMessage tableMessage = (TableMessage) message;
150-
// '/' previously defined as a database name
151-
String database =
152-
!msg.getTopicName().contains("/")
153-
? msg.getTopicName()
154-
: msg.getTopicName().substring(0, msg.getTopicName().indexOf("/"));
155-
tableMessage.setDatabase(database);
156-
insertTable(tableMessage, session);
149+
insertTable((TableMessage) message, session);
157150
} else {
158151
insertTree((TreeMessage) message, session);
159152
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,20 @@ public interface PayloadFormatter {
4040
* @param payload
4141
* @return
4242
*/
43+
@Deprecated
4344
List<Message> format(ByteBuf payload);
4445

46+
/**
47+
* format a payload of a topic to a list of messages
48+
*
49+
* @param topic
50+
* @param payload
51+
* @return
52+
*/
53+
default List<Message> format(String topic, ByteBuf payload) {
54+
return format(payload);
55+
}
56+
4557
/**
4658
* get the formatter name
4759
*

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ public void formatJson() {
3838
+ " }";
3939

4040
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
41+
String topic = "";
4142

4243
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
43-
TreeMessage message = (TreeMessage) formatter.format(buf).get(0);
44+
TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(0);
4445

4546
assertEquals("root.sg.d1", message.getDevice());
4647
assertEquals(Long.valueOf(1586076045524L), message.getTimestamp());
@@ -59,9 +60,10 @@ public void formatBatchJson() {
5960
+ " }";
6061

6162
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
63+
String topic = "";
6264

6365
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
64-
TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
66+
TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);
6567

6668
assertEquals("root.sg.d1", message.getDevice());
6769
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
@@ -88,9 +90,10 @@ public void formatJsonArray() {
8890
+ "]";
8991

9092
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
93+
String topic = "";
9194

9295
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
93-
TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
96+
TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);
9497

9598
assertEquals("root.sg.d2", message.getDevice());
9699
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
@@ -117,9 +120,10 @@ public void formatBatchJsonArray() {
117120
+ "]";
118121

119122
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
123+
String topic = "";
120124

121125
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
122-
TreeMessage message = (TreeMessage) formatter.format(buf).get(3);
126+
TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(3);
123127

124128
assertEquals("root.sg.d2", message.getDevice());
125129
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ public void formatLine() {
3737
"test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1";
3838

3939
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
40+
String topic = "";
4041

4142
LinePayloadFormatter formatter = new LinePayloadFormatter();
42-
TableMessage message = (TableMessage) formatter.format(buf).get(0);
43+
TableMessage message = (TableMessage) formatter.format(topic, buf).get(0);
4344

4445
assertEquals("test1", message.getTable());
4546
assertEquals(Long.valueOf(1L), message.getTimestamp());
@@ -64,9 +65,10 @@ public void formatBatchLine() {
6465
+ "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 ";
6566

6667
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
68+
String topic = "";
6769

6870
LinePayloadFormatter formatter = new LinePayloadFormatter();
69-
TableMessage message = (TableMessage) formatter.format(buf).get(1);
71+
TableMessage message = (TableMessage) formatter.format(topic, buf).get(1);
7072

7173
assertEquals("test2", message.getTable());
7274
assertEquals(Long.valueOf(2L), message.getTimestamp());
@@ -82,9 +84,10 @@ public void formatLineAnnotation() {
8284
+ " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 ";
8385

8486
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
87+
String topic = "";
8588

8689
LinePayloadFormatter formatter = new LinePayloadFormatter();
87-
List<Message> message = formatter.format(buf);
90+
List<Message> message = formatter.format(topic, buf);
8891

8992
assertEquals(1, message.size());
9093
}

0 commit comments

Comments
 (0)