@@ -1036,10 +1036,10 @@ def taos_stmt_affected_rows(stmt):
10361036
10371037
10381038def taos_schemaless_insert (
1039- connection : c_void_p ,
1040- lines : Union [List [str ], Tuple [str ]],
1041- protocol : SmlProtocol ,
1042- precision : SmlPrecision ,
1039+ connection : c_void_p ,
1040+ lines : Union [List [str ], Tuple [str ]],
1041+ protocol : SmlProtocol ,
1042+ precision : SmlPrecision ,
10431043):
10441044 _check_if_supported ()
10451045 num_of_lines = len (lines )
@@ -1084,11 +1084,11 @@ def taos_schemaless_insert(
10841084
10851085
10861086def taos_schemaless_insert_ttl (
1087- connection : c_void_p ,
1088- lines : Union [List [str ], Tuple [str ]],
1089- protocol : SmlProtocol ,
1090- precision : SmlPrecision ,
1091- ttl : int ,
1087+ connection : c_void_p ,
1088+ lines : Union [List [str ], Tuple [str ]],
1089+ protocol : SmlProtocol ,
1090+ precision : SmlPrecision ,
1091+ ttl : int ,
10921092):
10931093 _check_if_supported ()
10941094 num_of_lines = len (lines )
@@ -1135,12 +1135,12 @@ def taos_schemaless_insert_ttl(
11351135
11361136
11371137def taos_schemaless_insert_ttl_with_reqid (
1138- connection : c_void_p ,
1139- lines : Union [List [str ], Tuple [str ]],
1140- protocol : SmlProtocol ,
1141- precision : SmlPrecision ,
1142- ttl : int ,
1143- req_id : int ,
1138+ connection : c_void_p ,
1139+ lines : Union [List [str ], Tuple [str ]],
1140+ protocol : SmlProtocol ,
1141+ precision : SmlPrecision ,
1142+ ttl : int ,
1143+ req_id : int ,
11441144):
11451145 _check_if_supported ()
11461146 num_of_lines = len (lines )
@@ -1217,10 +1217,10 @@ def taos_schemaless_insert_with_reqid(connection, lines, protocol, precision, re
12171217
12181218
12191219def taos_schemaless_insert_raw (
1220- connection : c_void_p ,
1221- lines_raw : str ,
1222- protocol : SmlProtocol ,
1223- precision : SmlPrecision ,
1220+ connection : c_void_p ,
1221+ lines_raw : str ,
1222+ protocol : SmlProtocol ,
1223+ precision : SmlPrecision ,
12241224) -> int :
12251225 _check_if_supported ()
12261226 length = len (lines_raw )
@@ -1264,11 +1264,11 @@ def taos_schemaless_insert_raw(
12641264
12651265
12661266def taos_schemaless_insert_raw_with_reqid (
1267- connection : c_void_p ,
1268- lines_raw : str ,
1269- protocol : SmlProtocol ,
1270- precision : SmlPrecision ,
1271- req_id : int ,
1267+ connection : c_void_p ,
1268+ lines_raw : str ,
1269+ protocol : SmlProtocol ,
1270+ precision : SmlPrecision ,
1271+ req_id : int ,
12721272) -> int :
12731273 _check_if_supported ()
12741274 length = len (lines_raw )
@@ -1322,11 +1322,11 @@ def taos_schemaless_insert_raw_with_reqid(
13221322
13231323
13241324def taos_schemaless_insert_raw_ttl (
1325- connection : c_void_p ,
1326- lines_raw : str ,
1327- protocol : SmlProtocol ,
1328- precision : SmlPrecision ,
1329- ttl : int ,
1325+ connection : c_void_p ,
1326+ lines_raw : str ,
1327+ protocol : SmlProtocol ,
1328+ precision : SmlPrecision ,
1329+ ttl : int ,
13301330) -> int :
13311331 _check_if_supported ()
13321332 length = len (lines_raw )
@@ -1381,12 +1381,12 @@ def taos_schemaless_insert_raw_ttl(
13811381
13821382
13831383def taos_schemaless_insert_raw_ttl_with_reqid (
1384- connection : c_void_p ,
1385- lines_raw : str ,
1386- protocol : SmlProtocol ,
1387- precision : SmlPrecision ,
1388- ttl : int ,
1389- req_id : int ,
1384+ connection : c_void_p ,
1385+ lines_raw : str ,
1386+ protocol : SmlProtocol ,
1387+ precision : SmlPrecision ,
1388+ ttl : int ,
1389+ req_id : int ,
13901390) -> int :
13911391 _check_if_supported ()
13921392 length = len (lines_raw )
@@ -1509,7 +1509,7 @@ def tmq_consumer_new(conf, errstrlen=0):
15091509def tmq_err2str (errno ):
15101510 # type (c_int) -> c_char_p
15111511 _check_if_supported ()
1512- return c_char_p (_libtaos .tmq_err2str (errno ))
1512+ return c_char_p (_libtaos .tmq_err2str (errno )). value . decode ( "utf-8" )
15131513
15141514
15151515try :
@@ -1669,13 +1669,28 @@ def tmq_consumer_close(tmq):
16691669
16701670
16711671def tmq_commit_sync (tmq , offset ):
1672- # type: (c_void_p, c_void_p) -> None
1672+ # type: (c_void_p, c_void_p|None ) -> None
16731673 _check_if_supported ()
16741674 res = _libtaos .tmq_commit_sync (tmq , offset )
16751675 if res != 0 :
16761676 raise TmqError (msg = f"failed on tmq_commit_sync(), errno={ res :X} , errmsg={ tmq_err2str (res )} " , errno = res )
16771677
16781678
1679+ try :
1680+ _libtaos .tmq_commit_offset_sync .argtypes = (c_void_p , c_char_p , c_int32 , c_int64 )
1681+ _libtaos .tmq_commit_offset_sync .restype = c_int32
1682+ except Exception as err :
1683+ _UNSUPPORTED ["tmq_commit_offset_sync" ] = err
1684+
1685+
1686+ def tmq_commit_offset_sync (tmq , topic , vg_id , offset ):
1687+ # type: (c_void_p, str, int, int) -> None
1688+ _check_if_supported ()
1689+ res = _libtaos .tmq_commit_offset_sync (tmq , c_char_p (topic .encode ("utf-8" )), c_int32 (vg_id ), c_int64 (offset ))
1690+ if res != 0 :
1691+ raise TmqError (msg = f"failed on tmq_commit_offset_sync(), errno={ res :X} , errmsg={ tmq_err2str (res )} " , errno = res )
1692+
1693+
16791694try :
16801695 _libtaos .tmq_get_topic_name .argtypes = (c_void_p ,)
16811696 _libtaos .tmq_get_topic_name .restype = c_char_p
@@ -1777,6 +1792,18 @@ def tmq_get_res_type(message):
17771792 return _libtaos .tmq_get_res_type (message )
17781793
17791794
1795+ try :
1796+ _libtaos .tmq_get_vgroup_offset .argstype = (c_void_p ,)
1797+ _libtaos .tmq_get_vgroup_offset .restype = c_int64
1798+ except Exception as err :
1799+ _UNSUPPORTED ["tmq_get_vgroup_offset" ] = err
1800+
1801+
1802+ def tmq_get_vgroup_offset (message ):
1803+ # type: (c_void_p) -> int
1804+ return _libtaos .tmq_get_vgroup_offset (message )
1805+
1806+
17801807class TmqTopicAssignment (Structure ):
17811808 _fields_ = [
17821809 ("_vg_id" , c_int32 ),
@@ -1871,7 +1898,8 @@ def tmq_get_topic_assignment(tmq, topic_name):
18711898 code = _libtaos .tmq_get_topic_assignment (tmq , c_char_p (topic_name .encode ('utf-8' )), byref (assignment ),
18721899 ctypes .byref (num_of_assignment ))
18731900 if code != 0 :
1874- raise TmqError (msg = "failed on tmq_get_topic_assignment()" , errno = code )
1901+ raise TmqError (msg = f"failed on tmq_get_topic_assignment(), errno={ code :X} , errmsg={ tmq_err2str (code )} " ,
1902+ errno = code )
18751903
18761904 tmq_assignments = TmqTopicAssignments (assignment , num_of_assignment .value )
18771905
@@ -1893,7 +1921,37 @@ def tmq_get_topic_assignment(tmq, topic_name):
18931921def tmq_offset_seek (tmq , topic_name , vgroup_id , offset ):
18941922 code = _libtaos .tmq_offset_seek (tmq , c_char_p (topic_name .encode ('utf-8' )), c_int32 (vgroup_id ), c_int64 (offset ))
18951923 if code != 0 :
1896- raise TmqError (msg = "failed on tmq_offset_seek()" , errno = code )
1924+ raise TmqError (msg = f"failed on tmq_offset_seek(), errno={ code :X} , errmsg={ tmq_err2str (code )} " , errno = code )
1925+
1926+
1927+ try :
1928+ _libtaos .tmq_committed .argstype = (c_void_p , c_char_p , c_int32 )
1929+ _libtaos .tmq_committed .restype = c_int64
1930+ except Exception as err :
1931+ _UNSUPPORTED ["tmq_committed" ] = err
1932+
1933+
1934+ def tmq_committed (tmq , topic , vgroup_id ):
1935+ # type: (c_void_p, str, int) -> int
1936+ res = _libtaos .tmq_committed (tmq , c_char_p (topic .encode ('utf-8' )), c_int32 (vgroup_id ))
1937+ if res < 0 :
1938+ raise TmqError (msg = f"failed on tmq_committed(), errno={ res :X} , errmsg={ tmq_err2str (res )} " , errno = res )
1939+ return res
1940+
1941+
1942+ try :
1943+ _libtaos .tmq_position .argstype = (c_void_p , c_char_p , c_int32 )
1944+ _libtaos .tmq_position .restype = c_int64
1945+ except Exception as err :
1946+ _UNSUPPORTED ["tmq_position" ] = err
1947+
1948+
1949+ def tmq_position (tmq , topic , vgroup_id ):
1950+ # type: (c_void_p, str, int) -> int
1951+ offset = _libtaos .tmq_position (tmq , c_char_p (topic .encode ('utf-8' )), c_int32 (vgroup_id ))
1952+ if offset < 0 :
1953+ raise TmqError (msg = f"failed on tmq_position(), errno={ offset :X} , errmsg={ tmq_err2str (offset )} " , errno = offset )
1954+ return offset
18971955
18981956
18991957class CTaosInterface (object ):
0 commit comments