Skip to content

Commit 9dad4b5

Browse files
authored
Merge pull request #208 from taosdata/build/sunpeng/merge-from-3.0
build: merge from 3.0
2 parents b0686c2 + caaf1c8 commit 9dad4b5

File tree

3 files changed

+254
-74
lines changed

3 files changed

+254
-74
lines changed

taos/cinterface.py

Lines changed: 97 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,10 +1036,10 @@ def taos_stmt_affected_rows(stmt):
10361036

10371037

10381038
def taos_schemaless_insert(
1039-
connection: c_void_p,
1040-
lines: Union[List[str], Tuple[str]],
1041-
protocol: SmlProtocol,
1042-
precision: SmlPrecision,
1039+
connection: c_void_p,
1040+
lines: Union[List[str], Tuple[str]],
1041+
protocol: SmlProtocol,
1042+
precision: SmlPrecision,
10431043
):
10441044
_check_if_supported()
10451045
num_of_lines = len(lines)
@@ -1084,11 +1084,11 @@ def taos_schemaless_insert(
10841084

10851085

10861086
def taos_schemaless_insert_ttl(
1087-
connection: c_void_p,
1088-
lines: Union[List[str], Tuple[str]],
1089-
protocol: SmlProtocol,
1090-
precision: SmlPrecision,
1091-
ttl: int,
1087+
connection: c_void_p,
1088+
lines: Union[List[str], Tuple[str]],
1089+
protocol: SmlProtocol,
1090+
precision: SmlPrecision,
1091+
ttl: int,
10921092
):
10931093
_check_if_supported()
10941094
num_of_lines = len(lines)
@@ -1135,12 +1135,12 @@ def taos_schemaless_insert_ttl(
11351135

11361136

11371137
def taos_schemaless_insert_ttl_with_reqid(
1138-
connection: c_void_p,
1139-
lines: Union[List[str], Tuple[str]],
1140-
protocol: SmlProtocol,
1141-
precision: SmlPrecision,
1142-
ttl: int,
1143-
req_id: int,
1138+
connection: c_void_p,
1139+
lines: Union[List[str], Tuple[str]],
1140+
protocol: SmlProtocol,
1141+
precision: SmlPrecision,
1142+
ttl: int,
1143+
req_id: int,
11441144
):
11451145
_check_if_supported()
11461146
num_of_lines = len(lines)
@@ -1217,10 +1217,10 @@ def taos_schemaless_insert_with_reqid(connection, lines, protocol, precision, re
12171217

12181218

12191219
def taos_schemaless_insert_raw(
1220-
connection: c_void_p,
1221-
lines_raw: str,
1222-
protocol: SmlProtocol,
1223-
precision: SmlPrecision,
1220+
connection: c_void_p,
1221+
lines_raw: str,
1222+
protocol: SmlProtocol,
1223+
precision: SmlPrecision,
12241224
) -> int:
12251225
_check_if_supported()
12261226
length = len(lines_raw)
@@ -1264,11 +1264,11 @@ def taos_schemaless_insert_raw(
12641264

12651265

12661266
def taos_schemaless_insert_raw_with_reqid(
1267-
connection: c_void_p,
1268-
lines_raw: str,
1269-
protocol: SmlProtocol,
1270-
precision: SmlPrecision,
1271-
req_id: int,
1267+
connection: c_void_p,
1268+
lines_raw: str,
1269+
protocol: SmlProtocol,
1270+
precision: SmlPrecision,
1271+
req_id: int,
12721272
) -> int:
12731273
_check_if_supported()
12741274
length = len(lines_raw)
@@ -1322,11 +1322,11 @@ def taos_schemaless_insert_raw_with_reqid(
13221322

13231323

13241324
def taos_schemaless_insert_raw_ttl(
1325-
connection: c_void_p,
1326-
lines_raw: str,
1327-
protocol: SmlProtocol,
1328-
precision: SmlPrecision,
1329-
ttl: int,
1325+
connection: c_void_p,
1326+
lines_raw: str,
1327+
protocol: SmlProtocol,
1328+
precision: SmlPrecision,
1329+
ttl: int,
13301330
) -> int:
13311331
_check_if_supported()
13321332
length = len(lines_raw)
@@ -1381,12 +1381,12 @@ def taos_schemaless_insert_raw_ttl(
13811381

13821382

13831383
def taos_schemaless_insert_raw_ttl_with_reqid(
1384-
connection: c_void_p,
1385-
lines_raw: str,
1386-
protocol: SmlProtocol,
1387-
precision: SmlPrecision,
1388-
ttl: int,
1389-
req_id: int,
1384+
connection: c_void_p,
1385+
lines_raw: str,
1386+
protocol: SmlProtocol,
1387+
precision: SmlPrecision,
1388+
ttl: int,
1389+
req_id: int,
13901390
) -> int:
13911391
_check_if_supported()
13921392
length = len(lines_raw)
@@ -1509,7 +1509,7 @@ def tmq_consumer_new(conf, errstrlen=0):
15091509
def tmq_err2str(errno):
15101510
# type (c_int) -> c_char_p
15111511
_check_if_supported()
1512-
return c_char_p(_libtaos.tmq_err2str(errno))
1512+
return c_char_p(_libtaos.tmq_err2str(errno)).value.decode("utf-8")
15131513

15141514

15151515
try:
@@ -1669,13 +1669,28 @@ def tmq_consumer_close(tmq):
16691669

16701670

16711671
def tmq_commit_sync(tmq, offset):
1672-
# type: (c_void_p, c_void_p) -> None
1672+
# type: (c_void_p, c_void_p|None) -> None
16731673
_check_if_supported()
16741674
res = _libtaos.tmq_commit_sync(tmq, offset)
16751675
if res != 0:
16761676
raise TmqError(msg=f"failed on tmq_commit_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)
16771677

16781678

1679+
try:
1680+
_libtaos.tmq_commit_offset_sync.argtypes = (c_void_p, c_char_p, c_int32, c_int64)
1681+
_libtaos.tmq_commit_offset_sync.restype = c_int32
1682+
except Exception as err:
1683+
_UNSUPPORTED["tmq_commit_offset_sync"] = err
1684+
1685+
1686+
def tmq_commit_offset_sync(tmq, topic, vg_id, offset):
1687+
# type: (c_void_p, str, int, int) -> None
1688+
_check_if_supported()
1689+
res = _libtaos.tmq_commit_offset_sync(tmq, c_char_p(topic.encode("utf-8")), c_int32(vg_id), c_int64(offset))
1690+
if res != 0:
1691+
raise TmqError(msg=f"failed on tmq_commit_offset_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)
1692+
1693+
16791694
try:
16801695
_libtaos.tmq_get_topic_name.argtypes = (c_void_p,)
16811696
_libtaos.tmq_get_topic_name.restype = c_char_p
@@ -1777,6 +1792,18 @@ def tmq_get_res_type(message):
17771792
return _libtaos.tmq_get_res_type(message)
17781793

17791794

1795+
try:
1796+
_libtaos.tmq_get_vgroup_offset.argstype = (c_void_p,)
1797+
_libtaos.tmq_get_vgroup_offset.restype = c_int64
1798+
except Exception as err:
1799+
_UNSUPPORTED["tmq_get_vgroup_offset"] = err
1800+
1801+
1802+
def tmq_get_vgroup_offset(message):
1803+
# type: (c_void_p) -> int
1804+
return _libtaos.tmq_get_vgroup_offset(message)
1805+
1806+
17801807
class TmqTopicAssignment(Structure):
17811808
_fields_ = [
17821809
("_vg_id", c_int32),
@@ -1871,7 +1898,8 @@ def tmq_get_topic_assignment(tmq, topic_name):
18711898
code = _libtaos.tmq_get_topic_assignment(tmq, c_char_p(topic_name.encode('utf-8')), byref(assignment),
18721899
ctypes.byref(num_of_assignment))
18731900
if code != 0:
1874-
raise TmqError(msg="failed on tmq_get_topic_assignment()", errno=code)
1901+
raise TmqError(msg=f"failed on tmq_get_topic_assignment(), errno={code:X}, errmsg={tmq_err2str(code)}",
1902+
errno=code)
18751903

18761904
tmq_assignments = TmqTopicAssignments(assignment, num_of_assignment.value)
18771905

@@ -1893,7 +1921,37 @@ def tmq_get_topic_assignment(tmq, topic_name):
18931921
def tmq_offset_seek(tmq, topic_name, vgroup_id, offset):
18941922
code = _libtaos.tmq_offset_seek(tmq, c_char_p(topic_name.encode('utf-8')), c_int32(vgroup_id), c_int64(offset))
18951923
if code != 0:
1896-
raise TmqError(msg="failed on tmq_offset_seek()", errno=code)
1924+
raise TmqError(msg=f"failed on tmq_offset_seek(), errno={code:X}, errmsg={tmq_err2str(code)}", errno=code)
1925+
1926+
1927+
try:
1928+
_libtaos.tmq_committed.argstype = (c_void_p, c_char_p, c_int32)
1929+
_libtaos.tmq_committed.restype = c_int64
1930+
except Exception as err:
1931+
_UNSUPPORTED["tmq_committed"] = err
1932+
1933+
1934+
def tmq_committed(tmq, topic, vgroup_id):
1935+
# type: (c_void_p, str, int) -> int
1936+
res = _libtaos.tmq_committed(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id))
1937+
if res < 0:
1938+
raise TmqError(msg=f"failed on tmq_committed(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)
1939+
return res
1940+
1941+
1942+
try:
1943+
_libtaos.tmq_position.argstype = (c_void_p, c_char_p, c_int32)
1944+
_libtaos.tmq_position.restype = c_int64
1945+
except Exception as err:
1946+
_UNSUPPORTED["tmq_position"] = err
1947+
1948+
1949+
def tmq_position(tmq, topic, vgroup_id):
1950+
# type: (c_void_p, str, int) -> int
1951+
offset = _libtaos.tmq_position(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id))
1952+
if offset < 0:
1953+
raise TmqError(msg=f"failed on tmq_position(), errno={offset:X}, errmsg={tmq_err2str(offset)}", errno=offset)
1954+
return offset
18971955

18981956

18991957
class CTaosInterface(object):

taos/tmq.py

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ def value(self):
135135
table=tmq_get_table_name(self.msg)))
136136
return message_blocks
137137

138+
def offset(self):
139+
# type: () -> int
140+
"""
141+
:returns: message offset.
142+
:rtype: int
143+
"""
144+
return tmq_get_vgroup_offset(self.msg)
145+
138146
def __del__(self):
139147
if not self.msg:
140148
return
@@ -279,20 +287,76 @@ def close(self):
279287
tmq_consumer_close(self._tmq)
280288
self._tmq = None
281289

282-
def commit(self, message):
283-
# type (Message) -> None
290+
def commit(self, message: Message = None, offsets: [TopicPartition] = None):
291+
# type (Message, [TopicPartition], bool) -> None
284292
"""
285293
Commit a message.
286294
287-
The `message` parameters are mutually exclusive. If `message` is None, the current partition assignment's
288-
offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set to False.
295+
The `message` and `offsets` parameters are mutually exclusive. If neither is set, the current partition
296+
assignment's offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set
297+
to False.
289298
290-
:param Message message: Commit the message's offset.
299+
:param Message message: Commit the message's offset+1. Note: By convention, committed offsets reflect the next
300+
message to be consumed, **not** the last message consumed.
301+
:param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.
291302
"""
292-
if message is None and not isinstance(message, Message):
293-
tmq_commit_sync(self._tmq, None)
294-
else:
303+
if message:
304+
if not isinstance(message, Message):
305+
raise TmqError(msg='Invalid message type')
295306
tmq_commit_sync(self._tmq, message.msg)
307+
return
308+
309+
if offsets and isinstance(offsets, list):
310+
for offset in offsets:
311+
if not isinstance(offset, TopicPartition):
312+
raise TmqError(msg='Invalid offset type')
313+
tmq_commit_offset_sync(self._tmq, offset.topic, offset.partition, offset.offset)
314+
return
315+
316+
tmq_commit_sync(self._tmq, None)
317+
318+
def committed(self, partitions):
319+
# type ([TopicPartition]) -> [TopicPartition]
320+
"""
321+
Retrieve committed offsets for the specified partitions.
322+
323+
:param list(TopicPartition) partitions: List of topic+partitions to query for stored offsets.
324+
:returns: List of topic+partitions with offset and possibly error set.
325+
:rtype: list(TopicPartition)
326+
"""
327+
for partition in partitions:
328+
if not isinstance(partition, TopicPartition):
329+
raise TmqError(msg='Invalid partition type')
330+
offset = tmq_committed(self._tmq, partition.topic, partition.partition)
331+
partition.offset = offset
332+
333+
return partitions
334+
335+
def position(self, partitions):
336+
# type ([TopicPartition]) -> [TopicPartition]
337+
"""
338+
Retrieve current positions (offsets) for the specified partitions.
339+
340+
:param list(TopicPartition) partitions: List of topic+partitions to return current offsets for.
341+
:returns: List of topic+partitions with offset and possibly error set.
342+
:rtype: list(TopicPartition)
343+
"""
344+
for partition in partitions:
345+
if not isinstance(partition, TopicPartition):
346+
raise TmqError(msg='Invalid partition type')
347+
offset = tmq_position(self._tmq, partition.topic, partition.partition)
348+
partition.offset = offset
349+
350+
return partitions
351+
352+
def list_topics(self) -> [str]:
353+
# type () -> [str]
354+
"""
355+
Request subscription topics from the tmq.
356+
357+
:rtype: topics list
358+
"""
359+
return tmq_subscription(self._tmq)
296360

297361
def __del__(self):
298362
self.close()

0 commit comments

Comments
 (0)