-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathKafkaOperation.java
More file actions
82 lines (63 loc) · 2.58 KB
/
KafkaOperation.java
File metadata and controls
82 lines (63 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package dc.rmq;
import com.intersystems.enslib.pex.*;
import com.intersystems.jdbc.IRISObject;
import com.intersystems.jdbc.IRIS;
import com.intersystems.gateway.GatewayContext;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
public class KafkaOperation extends BusinessOperation {
// Connection to InterSystems IRIS
private IRIS iris;
// Connection to Kafka
private Producer<Long, String> producer;
// Kafka server address (comma separated if several)
public String SERVERS;
// Name of our Producer
public String CLIENTID;
/// Path to Config File
public String CONFIG;
public void OnInit() throws Exception {
iris = GatewayContext.getIRIS();
LOGINFO("Initialized IRIS");
LOGINFO(String.format("SERVERS: %s CLIENTID: %s", SERVERS, CLIENTID));
producer = createProducer();
LOGINFO("Initialized Kafka Producer");
return;
}
public void OnTearDown() throws Exception {
producer.flush();
producer.close();
return;
}
public Object OnMessage(Object request) throws Exception {
IRISObject req = (IRISObject) request;
LOGINFO("Received object: " + req.invokeString("%ClassName", 1));
// Create record
String value = req.getString("Text");
String topic = req.getString("Topic");
final ProducerRecord<Long, String> record = new ProducerRecord<>(topic, value);
// Send new record
RecordMetadata metadata = producer.send(record).get();
// Return record info
IRISObject response = (IRISObject)(iris.classMethodObject("Ens.StringContainer","%New",metadata.offset()));
return response;
}
private Producer<Long, String> createProducer() throws IOException {
Properties props = new Properties();
if (CONFIG == null || CONFIG.isEmpty() || CONFIG.trim().isEmpty()) {
LOGINFO("Trying settings config");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENTID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
} else {
LOGINFO("Trying file config");
FileInputStream in = new FileInputStream(CONFIG);
props.load(in);
}
return new KafkaProducer<>(props);
}
}