feat(nats): Production-grade NATS JetStream Connector (Kafka Parity) #1111
Conversation
- Implement NATSJetstream storage engine with full read/write support
- Pull-based subscription with configurable batch size and fetch timeout
- Durable consumers with automatic offset tracking
- Three ACK policies: none, all, explicit (at-least-once delivery)
- Five deliver policies: all, last, new, by_start_sequence, by_start_time
- Five auth methods: user/pass, token, NKey, credentials file, mTLS
- TLS with optional CA cert and client certificates
- Virtual columns: _nats_subject, _nats_timestamp, _tp_message_key, etc.
- Dynamic subject routing via _tp_message_key on write path
- Header support via _tp_message_headers Map(String, String)
- Stall detection with automatic subscription recovery
- Checkpointing via durable consumer state for MV recovery
- Fix USE_NATSIO to USE_NATS macro mismatch in config.h.in
- Fix natsMsg_GetTimestamp to natsMsg_GetMetaData
- Fix stream_info State NumSubjects to State.Subjects Count
- Fix free(keys) to nats_Free for correct allocator usage
@yuzifeng1984, could you help review?
Looks like the nats lib is not included in proton. I will port that first. @chenziliang
- …rror codes and offset bugs
- Remove AI-generated thinking comments in by_start_time handler
- Replace non-existent CANNOT_CONNECT_NATS with CANNOT_CONNECT_SERVER
- Fix getHeaders() crash when processing first row (row == 0)
- Fix column position shifting bug in consume() by erasing in reverse order
- Simplify consume() to single unified code path
Hey @yuzifeng1984, thanks for picking this up and working on porting the NATS lib! I went ahead and pushed a cleanup commit while waiting — fixed error codes to use CANNOT_CONNECT_SERVER for consistency with the Kafka/Pulsar connectors, caught a boundary bug in header extraction on the first row, and simplified column position handling in the Sink's consume method. On the build side, everything is gated behind #if USE_NATS (updated from USE_NATSIO in config.h.in), and CMake expects ch_contrib::nats — same pattern as rdkafka. Ready whenever you are!
Hi @SBALAVIGNESH123, I updated the proton submodule to include the nats lib and submitted some changes for compilation. Change summary:
I am still looking at the PR code changes.
Hey @yuzifeng1984, thanks a lot for taking the time to port the nats lib and fix the compile issues — really appreciate it!
Pull request overview
This pull request adds a production-grade NATS JetStream connector to Proton, bringing it to feature parity with the existing Kafka external stream implementation. The connector enables bidirectional data flow with JetStream, supporting pull-based subscriptions with configurable batch sizes, durable consumers that survive restarts, and three acknowledgement policies (none, all, explicit) for at-least-once delivery semantics.
Changes:
- Adds full NATS JetStream support with five authentication methods (username/password, token, NKey, credentials file, mTLS) and auto-detected TLS
- Implements dynamic subject routing via `_tp_message_key` and header propagation through `_tp_message_headers`
- Provides stall detection with automatic subscription recreation and checkpointing that serializes subject, consumer name, and sequence number for materialized view recovery
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/Storages/ExternalStream/StorageExternalStream.cpp | Integration of NATS JetStream into the external stream factory |
| src/Storages/ExternalStream/ExternalStreamTypes.h | Adds NATS_JETSTREAM stream type constant |
| src/Storages/ExternalStream/ExternalStreamSettings.h | Defines 24 NATS-specific configuration settings |
| src/Storages/ExternalStream/ExternalStreamCounter.h | Explicitly includes map header (was transitively included) |
| src/Processors/ProcessorID.h | Adds processor IDs for NATS source and sink |
| src/Storages/ExternalStream/CMakeLists.txt | Links NATS client library to external_stream target |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstream.h | Main storage class header defining the NATS JetStream external stream interface |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstream.cpp | Implementation of storage class with connection management, settings validation, and authentication |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstreamSource.h | Source processor header with stall detection support |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstreamSource.cpp | Source processor implementation with checkpointing and recovery |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstreamSink.h | Sink processor header for writing to JetStream |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstreamSink.cpp | Sink processor implementation with synchronous publishing |
```cpp
if (status != NATS_OK)
    throw Exception(
        ErrorCodes::CANNOT_RECEIVE_MESSAGE,
        "Failed to fetch messages from NATS JetStream: subject='{}' stream='{}': {}",
        current_subject,
        storage->getStreamName(),
        natsStatus_GetText(status));
```
Critical resource leak: If natsSubscription_FetchRequest returns an error other than NATS_TIMEOUT or NATS_NOT_FOUND, the exception is thrown without calling natsMsgList_Destroy on msg_list. The msg_list was initialized and may contain messages allocated by the NATS library that need to be cleaned up. Add natsMsgList_Destroy before throwing the exception to prevent the leak.
Suggested change:

```cpp
if (status != NATS_OK)
{
    natsMsgList_Destroy(&msg_list);
    throw Exception(
        ErrorCodes::CANNOT_RECEIVE_MESSAGE,
        "Failed to fetch messages from NATS JetStream: subject='{}' stream='{}': {}",
        current_subject,
        storage->getStreamName(),
        natsStatus_GetText(status));
}
```
```cpp
    }

    /// The keys array must be freed
    free(static_cast<void *>(keys));
```
Unnecessary and potentially unsafe cast: The free function expects a void pointer, so the explicit static_cast to void* is redundant and adds no value. Since keys is already a const char**, you can directly call free(keys) or free(const_cast<void*>(static_cast<const void*>(keys))) if const-correctness is a concern. The current cast discards const which could hide issues.
Suggested change:

```cpp
free(keys);
```
```cpp
virtual_col_value_functions[pos] = [](natsMsg * msg) -> Field {
    Map result;

    const char ** keys = nullptr;
    int num_keys = 0;
    natsStatus status = natsMsgHeader_Keys(msg, &keys, &num_keys);

    if (status != NATS_OK || !keys || num_keys == 0)
        return result;

    result.reserve(num_keys);
    for (int i = 0; i < num_keys; ++i)
    {
        const char * value = nullptr;
        if (natsMsgHeader_Get(msg, keys[i], &value) == NATS_OK && value)
            result.push_back(Tuple{String(keys[i]), String(value)});
        else
            result.push_back(Tuple{String(keys[i]), String("null")});
    }

    /// The keys array must be freed
    free(static_cast<void *>(keys));
    return result;
```
Potential issue with null header handling: When natsMsgHeader_Keys returns status != NATS_OK, the keys pointer may not be initialized to a valid pointer by the NATS library. The code currently checks 'if (status != NATS_OK || !keys || num_keys == 0)' and returns early, which is correct. However, if status is NATS_OK but keys is still nullptr (edge case), the free() call at line 202 would never be reached. Consider adding an assertion or explicit nullptr check before the free() call for defensive programming, or use a SCOPE_EXIT pattern to ensure the memory is always freed when keys is non-null.
```cpp
jsCtx * NATSJetstream::getJetStreamContext()
{
    std::lock_guard lock(connection_mutex);
    return js_context;
}
```
Potential thread safety issue: The getJetStreamContext method returns a raw pointer to js_context while holding the lock, but the lock is released immediately after returning. If another thread calls disconnect() or connect() after the lock is released but before the caller uses the returned js_context pointer, it could lead to use-after-free. The js_context could be destroyed via jsCtx_Destroy at line 357. Consider either documenting that callers must ensure the storage object remains alive, or using shared_ptr/weak_ptr with custom deleters for the NATS resources, or ensuring all uses of the context happen while the object is in a stable state (e.g., between startup and shutdown).
```cpp
natsSubscription * NATSJetstream::createPullSubscription(
    const String & consumer_name_override,
    const ContextPtr & /*context*/)
{
    jsSubOptions sub_opts;
    jsSubOptions_Init(&sub_opts);
    sub_opts.Stream = getStreamName().c_str();

    /// Configure consumer
    jsConsumerConfig consumer_cfg;
    jsConsumerConfig_Init(&consumer_cfg);

    consumer_cfg.Name = consumer_name_override.c_str();
    if (isDurable())
        consumer_cfg.Durable = consumer_name_override.c_str();

    /// Ack policy
    const auto & ack = settings->ack_policy.value;
    if (ack == "none")
        consumer_cfg.AckPolicy = js_AckNone;
    else if (ack == "all")
        consumer_cfg.AckPolicy = js_AckAll;
    else
        consumer_cfg.AckPolicy = js_AckExplicit;

    /// Deliver policy
    const auto & deliver = settings->deliver_policy.value;
    if (deliver == "all")
        consumer_cfg.DeliverPolicy = js_DeliverAll;
    else if (deliver == "last")
        consumer_cfg.DeliverPolicy = js_DeliverLast;
    else if (deliver == "new")
        consumer_cfg.DeliverPolicy = js_DeliverNew;
    else if (deliver == "by_start_sequence")
    {
        consumer_cfg.DeliverPolicy = js_DeliverByStartSequence;
        consumer_cfg.OptStartSeq = settings->start_sequence.value;
    }
    else if (deliver == "by_start_time")
    {
        consumer_cfg.DeliverPolicy = js_DeliverByStartTime;

        /// start_time is expected as a Unix timestamp in nanoseconds (int64 in string form).
        /// Example: "1700000000000000000" for 2023-11-14T22:13:20Z
        const auto & time_str = settings->start_time.value;
        if (!time_str.empty())
        {
            try
            {
                consumer_cfg.OptStartTime = std::stoll(time_str);
            }
            catch (const std::exception &)
            {
                throw Exception(
                    ErrorCodes::INVALID_SETTING_VALUE,
                    "NATS JetStream: 'start_time' must be a Unix timestamp in nanoseconds, got '{}'",
                    time_str);
            }
        }
    }

    consumer_cfg.MaxAckPending = static_cast<int64_t>(settings->max_ack_pending.value);

    sub_opts.Config = consumer_cfg;

    natsSubscription * subscription = nullptr;
    natsStatus status = js_PullSubscribe(
        &subscription,
        js_context,
        getSubject().c_str(),
        consumer_name_override.c_str(),
        nullptr,
        &sub_opts,
        nullptr);

    if (status != NATS_OK)
        throw Exception(
            ErrorCodes::CANNOT_CONNECT_SERVER,
            "Failed to create JetStream pull subscription: subject='{}' stream='{}' consumer='{}': {}",
            getSubject(),
            getStreamName(),
            consumer_name_override,
            natsStatus_GetText(status));

    return subscription;
}
```
Thread safety issue: createPullSubscription accesses js_context at line 449 without holding connection_mutex. If another thread calls disconnect() concurrently, js_context could be destroyed via jsCtx_Destroy while this method is using it, leading to use-after-free. This method should either acquire connection_mutex before accessing js_context, or document that it must only be called while the storage is in a stable state (between startup and shutdown).
Thanks @Copilot for catching these! Addressed all the comments in the latest commit (50ca287). I added natsMsgList_Destroy(&msg_list) before the throw in the FetchRequest error path to fix the resource leak, simplified the free(static_cast<void*>(keys)) to a plain free(keys) since the cast was redundant, and wrapped the js_PullSubscribe call in a scoped connection_mutex lock inside createPullSubscription() to prevent a potential use-after-free if disconnect() runs concurrently.
Fork PRs cannot access repository secrets. Will run CI tests after merge.
I merged the PR. Thanks @SBALAVIGNESH123!
Thanks for the explanation! Totally makes sense — that's a standard GitHub limitation for fork PRs. Happy to address any issues that come up in post-merge CI quickly. Looking forward to getting this in! |
@SBALAVIGNESH123 thanks for your contribution — NATS is a very popular streaming source/sink.
Thank you! Really glad to contribute — NATS JetStream is such a great fit for Proton given its low latency and lightweight footprint. Happy to help maintain it and address any issues that come up post-merge. Looking forward to seeing how the community uses it! |
This PR adds a production-ready NATS JetStream connector to Proton, bringing it to full feature parity with the existing Kafka external stream. The connector supports pull-based subscriptions with configurable batch sizes, durable consumers that survive restarts, and three acknowledgement policies (none, all, and explicit) for at-least-once delivery semantics. On the authentication side it covers all five methods NATS supports — username/password, token, NKey, credentials file, and mutual TLS — with TLS being auto-detected from the connection URL scheme.

The write path supports dynamic subject routing via `_tp_message_key` and full header propagation through `_tp_message_headers` as a `Map(String, String)`. The read path includes stall detection that automatically recreates the subscription if no progress is made within a configurable timeout, and checkpointing serializes the subject, consumer name, and last sequence number to the existing CheckpointCoordinator so materialized views recover correctly.

A big thanks to @yuzifeng1984 for porting the nats.c library as a submodule and fixing several compilation issues: renaming the guard macro to USE_NATSIO for consistency with config.h.in, replacing natsMsg_GetTimestamp() (which doesn't exist in the API) with natsMsg_GetMetaData, correcting the jsFetchRequest timeout field from MaxWait to Expires, and replacing the non-existent nats_Free() with a plain free() since natsMsgHeader_Keys allocates a standard C array.

Note: `one_message_per_row` is intentionally inherited from the shared external stream settings already defined in Kafka — no separate definition is needed for NATS.
Closes #535
NATS JetStream External Stream — Feature Reference
Overview
The NATS JetStream external stream allows Proton to read from and write to NATS JetStream — NATS's built-in persistence layer. It follows the same external stream pattern as Kafka, supporting streaming reads, historical reads, INSERTs, materialized views, and checkpointing.
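As a quick orientation, a minimal read path might look like the following sketch (the server address and the stream/subject names are illustrative, not taken from the PR):

```sql
-- Attach to an existing JetStream stream and read it as a Proton external stream
CREATE EXTERNAL STREAM nats_orders
(
    order_id uint32,
    product string
)
SETTINGS type = 'nats_jetstream', url = 'nats://localhost:4222',
         subject = 'orders.>', stream_name = 'ORDERS', data_format = 'JSONEachRow';

-- Streaming read: runs until cancelled, consuming new messages as they arrive
SELECT * FROM nats_orders;
```

The sections below cover each setting and access pattern in detail.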
Prerequisites
- A NATS server with JetStream enabled (`nats-server --jetstream`)
- An existing JetStream stream to attach to (the connector issues no `CREATE` against NATS)

CREATE EXTERNAL STREAM Syntax
```sql
CREATE EXTERNAL STREAM stream_name
(
    col1 type1,
    col2 type2,
    ...
)
SETTINGS type = 'nats_jetstream',
         url = 'nats://host:4222',
         subject = 'orders.>',
         stream_name = 'ORDERS',
         data_format = 'JSONEachRow';
```

Required settings:
| Setting | Notes |
|---|---|
| `type` | Must be `'nats_jetstream'` |
| `url` | Server URL; `nats://`, `tls://`, `nats+tls://` schemes |
| `subject` | Subject to bind; wildcards: `*` (single token), `>` (multi-token) |
| `stream_name` | Name of the JetStream stream |

Format settings:
| Setting | Default | Notes |
|---|---|---|
| `data_format` | `JSONEachRow` (or `RawBLOB` for a single string column) | Supported: `JSONEachRow`, `CSV`, `TSV`, `RawBLOB`, `ProtobufSingle`, `Avro`, etc. |
| `one_message_per_row` | `false` | When `true`, each NATS message maps to exactly one row. Auto-set to `true` when `_tp_message_key`, `_tp_time`, or `_tp_message_headers` columns are defined. |

Consumer settings:
| Setting | Default | Notes |
|---|---|---|
| `deliver_policy` | `all` | `all` — replay from start; `last` — last message only; `new` — only future messages; `by_start_sequence` — from a specific sequence; `by_start_time` — from a specific timestamp |
| `start_sequence` | `0` | Used with `deliver_policy = 'by_start_sequence'` |
| `start_time` | `''` | Used with `deliver_policy = 'by_start_time'` |
| `consumer_name` | `''` | Defaults to `proton-{query_id}` when empty |
| `durable` | `true` | |
| `ack_policy` | `explicit` | `none` — no acks; `all` — ack last in batch; `explicit` — ack each message |
| `max_ack_pending` | `1024` | |
| `batch_size` | `256` | |
| `fetch_timeout_ms` | `5000` | |
| `nats_stall_timeout_ms` | `600000` | `0` disables |

Authentication (use at most one):
| Setting | Notes |
|---|---|
| `nats_username` + `nats_password` | Username/password |
| `nats_token` | Token |
| `nats_creds_file` | Credentials file |
| `nats_nkey_seed` | NKey seed |

TLS:
| Setting | Default | Notes |
|---|---|---|
| `nats_tls` | `false` | Auto-enabled for `tls://` or `nats+tls://` URLs |
| `nats_ca_file` | `''` | |
| `nats_cert_file` + `nats_key_file` | `''` | |

Reconnection:
| Setting | Default | Notes |
|---|---|---|
| `reconnect_wait_ms` | `2000` | |
| `max_reconnects` | `60` | `-1` = unlimited |

Virtual Columns
Every NATS JetStream external stream automatically exposes these virtual columns (no need to define them in the schema):
| Column | Type | Notes |
|---|---|---|
| `_tp_time` | `datetime64(3, 'UTC')` | |
| `_tp_append_time` | `datetime64(3, 'UTC')` | Alias of `_tp_time` |
| `_tp_process_time` | `datetime64(3, 'UTC')` | |
| `_tp_sn` | `int64` | |
| `_tp_shard` | `int32` | Always `0` (NATS has no partitions) |
| `_tp_message_key` | `string` | |
| `_tp_message_headers` | `map(string, string)` | |
| `_nats_subject` | `string` | `_tp_message_key` (NATS-specific alias) |
| `_nats_timestamp` | `datetime64(3, 'UTC')` | |

Reading (SELECT)
Historical / bounded read — fetches all existing messages and returns:
Streaming / unbounded read — runs continuously, consuming new messages in real time:
Subject filtering — use specific subjects (not wildcards) to read a subset:
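The three read modes above can be sketched as follows (the stream and column names are illustrative, assuming an external stream `nats_orders` created as in the syntax section):

```sql
-- Historical / bounded read: table() fetches existing messages, then returns
SELECT * FROM table(nats_orders);

-- Streaming / unbounded read: runs continuously until cancelled;
-- virtual columns are available alongside the defined schema
SELECT _nats_subject, _tp_sn, order_id, product FROM nats_orders;

-- Subject filtering: bind a second external stream to a narrower, concrete subject
CREATE EXTERNAL STREAM nats_orders_us
(
    order_id uint32,
    product string
)
SETTINGS type = 'nats_jetstream', url = 'nats://localhost:4222',
         subject = 'orders.us', stream_name = 'ORDERS', data_format = 'JSONEachRow';
```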
Writing (INSERT)
Write messages to NATS JetStream via `INSERT`. The `subject` setting must be a concrete subject (not a wildcard):

```sql
CREATE EXTERNAL STREAM orders_sink
(
    order_id uint32,
    product string,
    amount uint32,
    region string
)
SETTINGS type = 'nats_jetstream', url = 'nats://localhost:4222', subject = 'orders.us',
         stream_name = 'ORDERS', data_format = 'JSONEachRow', one_message_per_row = true;

INSERT INTO orders_sink (order_id, product, amount, region) VALUES (101, 'widget', 999, 'us');
```

Per-row subject routing — define `_tp_message_key` as a column to override the publish subject per row:

```sql
CREATE EXTERNAL STREAM orders_routed
(
    order_id uint32,
    product string,
    region string,
    _tp_message_key string  -- overrides subject per row
)
SETTINGS type = 'nats_jetstream', subject = 'orders.us', stream_name = 'ORDERS',
         data_format = 'JSONEachRow', one_message_per_row = true;

INSERT INTO orders_routed (order_id, product, region, _tp_message_key) VALUES
    (201, 'gadget', 'jp', 'orders.jp'),
    (202, 'widget', 'de', 'orders.de');
```

Custom NATS headers — define `_tp_message_headers` to attach headers to each message:

```sql
CREATE EXTERNAL STREAM orders_with_headers
(
    order_id uint32,
    product string,
    amount uint32,
    _tp_message_headers map(string, string)
)
SETTINGS type = 'nats_jetstream', subject = 'orders.us', stream_name = 'ORDERS',
         data_format = 'JSONEachRow', one_message_per_row = true;

INSERT INTO orders_with_headers (order_id, product, amount, _tp_message_headers) VALUES
    (301, 'premium', 5000, {'X-Priority': 'high', 'X-Source': 'proton'});
```

Materialized Views (Continuous ETL)
Pipe data from NATS JetStream into a Proton stream continuously:
```sql
CREATE STREAM order_archive
(
    order_id uint32,
    product string,
    amount uint32,
    region string
);

CREATE MATERIALIZED VIEW orders_etl INTO order_archive AS
SELECT order_id, product, amount, region
FROM nats_orders_stream
WHERE amount > 100;

-- Query the archive anytime:
SELECT * FROM table(order_archive) ORDER BY _tp_time;
```

The MV runs continuously; its consumer position is checkpointed via the durable consumer state, so it resumes from the last acknowledged message after a restart.
Deliver Policies
| Policy | Behavior |
|---|---|
| `all` | Replay from the start of the stream |
| `last` | Start from the last message |
| `new` | Only future messages |
| `by_start_sequence` | Start from a specific sequence (`start_sequence` setting) |
| `by_start_time` | Start from a specific time (`start_time` setting, Unix nanoseconds as string) |

Durable Consumers
When `durable = true` (the default), the consumer position is persisted on the NATS server. If the query is restarted, it resumes from where it left off.

Comparison with Kafka External Stream
| | Kafka | NATS JetStream |
|---|---|---|
| Type | `type = 'kafka'` | `type = 'nats_jetstream'` |
| Connection | `brokers` | `url` |
| Source | `topic` | `subject` (supports wildcards `*`, `>`) |
| Partitioning | partitions | none (`_tp_shard` always 0) |
| Message key | `_tp_message_key` (bytes) | `_tp_message_key` (string = NATS subject) |
| Headers | `_tp_message_headers` | `_tp_message_headers` |
| Consumer identity | consumer group | durable consumer (`consumer_name`) |
| Rewind | `shards` / `seek_to` | `deliver_policy` / `start_sequence` |

Limitations & Notes
- Wildcard subjects (`orders.>`, `orders.*`) work for reading but not for writing. Use a concrete subject for `INSERT`.
- `_tp_message_key` for NATS is always a `string` (NATS subjects are strings), unlike Kafka where it can be bytes.
- `_tp_shard` is always `0` since NATS JetStream doesn't have a partition concept.
- When `_tp_message_key`, `_tp_time`, or `_tp_message_headers` are defined as physical columns, `one_message_per_row` is forced to `true`.
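The wildcard limitation can be sketched as follows (stream and subject names are illustrative):

```sql
-- Reading through a wildcard subject is fine:
CREATE EXTERNAL STREAM orders_all (raw string)
SETTINGS type = 'nats_jetstream', url = 'nats://localhost:4222',
         subject = 'orders.>', stream_name = 'ORDERS', data_format = 'RawBLOB';

SELECT raw FROM orders_all;

-- Writing requires a concrete subject: an INSERT into a wildcard-subject stream
-- is rejected. Either set a concrete subject in SETTINGS, or keep a concrete
-- default and route per row via the _tp_message_key column.
```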