-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathproton.py
More file actions
49 lines (39 loc) · 1.54 KB
/
proton.py
File metadata and controls
49 lines (39 loc) · 1.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
"""Output to Timeplus Proton."""
from bytewax.outputs import DynamicSink, StatelessSinkPartition
from proton_driver import client
import logging
__all__ = [
"ProtonSink",
]
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class _ProtonSinkPartition(StatelessSinkPartition):
    """Per-worker sink partition that writes string items into a Proton stream.

    On construction it connects to the Proton server and ensures the target
    stream exists (idempotent via ``CREATE STREAM IF NOT EXISTS``); each call
    to :meth:`write_batch` inserts the batch as single-column rows.
    """

    def __init__(self, stream: str, host: str):
        """Connect to Proton and create the target stream if needed.

        :param stream: Name of the Proton stream to write to.
        :param host: Proton server host; native-protocol port 8463 is assumed.
        """
        self.client = client.Client(host=host, port=8463)
        self.stream = stream
        sql = f"CREATE STREAM IF NOT EXISTS `{stream}` (raw string)"  # noqa
        logger.debug(sql)
        self.client.execute(sql)

    def write_batch(self, items):
        """Insert a batch of items; each item becomes one row in the `raw` column.

        :param items: Iterable of string-like items consumed from the dataflow.
        """
        # Lazy %-style args so the message is only built when DEBUG is enabled.
        logger.debug("inserting data %s", items)
        rows = [[item] for item in items]  # single column in each row
        sql = f"INSERT INTO `{self.stream}` (raw) VALUES"
        logger.debug("inserting data %s", sql)
        self.client.execute(sql, rows)
class ProtonSink(DynamicSink):
    """Write each output item to Proton on that worker.

    Items consumed from the dataflow must look like a string. Use a
    preceding map step to do custom formatting.

    Workers are the unit of parallelism.

    Can support at-least-once processing. Messages from the resume
    epoch will be duplicated right after resume.
    """

    def __init__(self, stream: str, host: str):
        """Remember connection settings; each worker connects in :meth:`build`.

        :param stream: Target Proton stream name.
        :param host: Proton host; falls back to 127.0.0.1 when None or empty.
        """
        self.stream = stream
        # Default to localhost when no host is given.
        self.host = host if host is not None and host != "" else "127.0.0.1"

    def build(self, worker_index, worker_count):
        """See ABC docstring."""
        return _ProtonSinkPartition(self.stream, self.host)