
feat(nats): Production-grade NATS JetStream Connector (Kafka Parity)#1111

Merged
yuzifeng1984 merged 8 commits into timeplus-io:develop from SBALAVIGNESH123:feat/nats-jetstream-connector on Feb 27, 2026

Conversation

Contributor

SBALAVIGNESH123 commented Feb 10, 2026

This PR adds a production-ready NATS JetStream connector to Proton, bringing it to full feature parity with the existing Kafka external stream. The connector supports pull-based subscriptions with configurable batch sizes, durable consumers that survive restarts, and three acknowledgement policies (none, all, and explicit) for at-least-once delivery semantics. On the authentication side it covers all five methods NATS supports — username/password, token, NKey, credentials file, and mutual TLS — with TLS being auto-detected from the connection URL scheme.

The write path supports dynamic subject routing via _tp_message_key and full header propagation through _tp_message_headers as a Map(String, String). The read path includes stall detection that automatically recreates the subscription if no progress is made within a configurable timeout, and checkpointing serializes the subject, consumer name, and last sequence number to the existing CheckpointCoordinator so materialized views recover correctly.

A big thanks to @yuzifeng1984 for porting the nats.c library as a submodule and fixing several compilation issues: renaming the guard macro to USE_NATSIO for consistency with config.h.in, replacing natsMsg_GetTimestamp() (which doesn't exist in the API) with natsMsg_GetMetaData, correcting the jsFetchRequest timeout field from MaxWait to Expires, and replacing the non-existent nats_Free() with a plain free() since natsMsgHeader_Keys allocates a standard C array.

Note: one_message_per_row is intentionally inherited from the shared external stream settings already defined for Kafka — no separate definition is needed for NATS.

Closes #535
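As a rough illustration of the checkpoint payload described above (subject, consumer name, last sequence number), here is a minimal serialization round-trip sketch. The function names and JSON encoding are assumptions for illustration only; Proton's actual CheckpointCoordinator format is different.

```python
import json


def serialize_checkpoint(subject: str, consumer: str, last_seq: int) -> bytes:
    # Hypothetical encoding; Proton's real checkpoint format differs.
    return json.dumps(
        {"subject": subject, "consumer": consumer, "last_seq": last_seq}
    ).encode("utf-8")


def deserialize_checkpoint(payload: bytes):
    d = json.loads(payload.decode("utf-8"))
    return d["subject"], d["consumer"], d["last_seq"]


# On recovery, the restored (subject, consumer, last_seq) triple lets the
# source resume the durable consumer from the right position.
subject, consumer, last_seq = deserialize_checkpoint(
    serialize_checkpoint("orders.>", "proton-query-1", 4242)
)
```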


NATS JetStream External Stream — Feature Reference

Overview

The NATS JetStream external stream allows Proton to read from and write to NATS JetStream — NATS's built-in persistence layer. It follows the same external stream pattern as Kafka, supporting streaming reads, historical reads, INSERTs, materialized views, and checkpointing.

Prerequisites

  • A NATS server with JetStream enabled (nats-server --jetstream)
  • The JetStream stream must already exist on the NATS server before creating the external stream in Proton (Proton validates this on CREATE)

CREATE EXTERNAL STREAM Syntax

CREATE EXTERNAL STREAM stream_name (
    col1 type1,
    col2 type2,
    ...
)
SETTINGS
    type            = 'nats_jetstream',
    url             = 'nats://host:4222',
    subject         = 'orders.>',
    stream_name     = 'ORDERS',
    data_format     = 'JSONEachRow';

Required settings:

| Setting | Description |
|---|---|
| `type = 'nats_jetstream'` | Selects the NATS JetStream connector |
| `url` | NATS server URL. Supports `nats://`, `tls://`, `nats+tls://` |
| `subject` | NATS subject filter. Supports wildcards: `*` (single token), `>` (multi-token) |
| `stream_name` | Name of the JetStream stream (must already exist on the server) |
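Since `subject` accepts wildcards, the matching rules are worth spelling out: `*` matches exactly one dot-separated token, while `>` matches one or more trailing tokens. A small illustrative matcher (not the NATS server's implementation) makes the semantics concrete:

```python
def subject_matches(filter_subject: str, subject: str) -> bool:
    """Illustrative NATS wildcard matching: '*' = one token, '>' = 1+ trailing tokens."""
    f_tokens = filter_subject.split(".")
    s_tokens = subject.split(".")
    for i, f in enumerate(f_tokens):
        if f == ">":
            # '>' must cover at least one remaining token
            return len(s_tokens) >= i + 1
        if i >= len(s_tokens):
            return False
        if f != "*" and f != s_tokens[i]:
            return False
    # No wildcard tail: token counts must match exactly
    return len(f_tokens) == len(s_tokens)
```

For example, `orders.*` matches `orders.us` but not `orders.us.west`, while `orders.>` matches both.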

Format settings:

| Setting | Default | Description |
|---|---|---|
| `data_format` | JSONEachRow (or RawBLOB for a single string column) | Any format supported by Proton: JSONEachRow, CSV, TSV, RawBLOB, ProtobufSingle, Avro, etc. |
| `one_message_per_row` | false | When true, each NATS message maps to exactly one row. Auto-set to true when `_tp_message_key`, `_tp_time`, or `_tp_message_headers` columns are defined. |

Consumer settings:

| Setting | Default | Description |
|---|---|---|
| `deliver_policy` | all | `all` — replay from start; `last` — last message only; `new` — only future messages; `by_start_sequence` — from a specific sequence; `by_start_time` — from a specific timestamp |
| `start_sequence` | 0 | Starting sequence number (for `deliver_policy = 'by_start_sequence'`) |
| `start_time` | '' | Unix timestamp in nanoseconds as a string (for `deliver_policy = 'by_start_time'`) |
| `consumer_name` | '' | Durable consumer name. If empty, auto-generated as `proton-{query_id}` |
| `durable` | true | Create a durable consumer (position survives restarts) |
| `ack_policy` | explicit | `none` — no acks; `all` — ack last in batch; `explicit` — ack each message |
| `max_ack_pending` | 1024 | Max unacknowledged messages before the server pauses delivery |
| `batch_size` | 256 | Messages fetched per pull request |
| `fetch_timeout_ms` | 5000 | Timeout per pull request (ms) |
| `nats_stall_timeout_ms` | 60000 | If no progress for this long, the subscription is recreated. 0 disables. |
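The `nats_stall_timeout_ms` behavior can be pictured as a simple watchdog: record the time of the last successful fetch, and signal subscription recreation once the deadline passes. This is an illustrative Python sketch of the idea, not Proton's C++ implementation; the class and method names are invented:

```python
import time


class StallDetector:
    """Illustrative watchdog for the stall-recovery idea."""

    def __init__(self, stall_timeout_ms: int, clock=time.monotonic):
        self.timeout_s = stall_timeout_ms / 1000.0
        self.clock = clock
        self.last_progress = clock()

    def record_progress(self):
        # Call whenever a fetch actually returns messages.
        self.last_progress = self.clock()

    def is_stalled(self) -> bool:
        # A timeout of 0 disables stall detection entirely.
        if self.timeout_s <= 0:
            return False
        return self.clock() - self.last_progress > self.timeout_s
```

When `is_stalled()` returns true, the source would tear down and recreate the pull subscription (the durable consumer's server-side position is unaffected).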

Authentication (use at most one):

| Setting | Description |
|---|---|
| `nats_username` + `nats_password` | Basic username/password auth |
| `nats_token` | Token-based auth |
| `nats_creds_file` | Path to a NATS credentials file (JWT-based) |
| `nats_nkey_seed` | NKey seed for NKey auth |
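The "use at most one" rule amounts to a validation pass over the auth-related settings. An illustrative sketch — the setting names mirror the table above, but the function and error text are assumptions, not Proton's actual validation code:

```python
def validate_auth(settings: dict):
    """Return the single configured auth method, or None; reject combinations."""
    methods = {
        "user_pass": bool(settings.get("nats_username") or settings.get("nats_password")),
        "token": bool(settings.get("nats_token")),
        "creds_file": bool(settings.get("nats_creds_file")),
        "nkey": bool(settings.get("nats_nkey_seed")),
    }
    chosen = [name for name, enabled in methods.items() if enabled]
    if len(chosen) > 1:
        raise ValueError(f"Use at most one NATS auth method, got: {chosen}")
    return chosen[0] if chosen else None
```

TLS is orthogonal: it is configured separately (or auto-enabled by the URL scheme) and can be combined with any of the four methods above, with mTLS acting as the fifth method when client certificates are supplied.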

TLS:

| Setting | Default | Description |
|---|---|---|
| `nats_tls` | false | Enable TLS (auto-enabled if URL starts with `tls://` or `nats+tls://`) |
| `nats_ca_file` | '' | CA certificate for server verification |
| `nats_cert_file` + `nats_key_file` | '' | Client certificate and key for mTLS (must be used together) |

Reconnection:

| Setting | Default | Description |
|---|---|---|
| `reconnect_wait_ms` | 2000 | Wait between reconnect attempts (ms) |
| `max_reconnects` | 60 | Max reconnect attempts. -1 = unlimited |

Virtual Columns

Every NATS JetStream external stream automatically exposes these virtual columns (no need to define them in the schema):

| Column | Type | Description |
|---|---|---|
| `_tp_time` | datetime64(3, 'UTC') | JetStream message timestamp |
| `_tp_append_time` | datetime64(3, 'UTC') | Same as `_tp_time` |
| `_tp_process_time` | datetime64(3, 'UTC') | When Proton processed the message |
| `_tp_sn` | int64 | JetStream stream sequence number |
| `_tp_shard` | int32 | Always 0 (NATS has no partitions) |
| `_tp_message_key` | string | The NATS subject the message was published on |
| `_tp_message_headers` | map(string, string) | NATS message headers |
| `_nats_subject` | string | Same as `_tp_message_key` (NATS-specific alias) |
| `_nats_timestamp` | datetime64(3, 'UTC') | JetStream timestamp |

Example:

SELECT _tp_sn, _nats_subject, _tp_message_headers, order_id, amount
FROM table(orders_stream);

Reading (SELECT)

Historical / bounded read — fetches all existing messages and returns:

SELECT * FROM table(nats_stream);

Streaming / unbounded read — runs continuously, consuming new messages in real time:

SELECT * FROM nats_stream;

Subject filtering — use specific subjects (not wildcards) to read a subset:

-- Only US orders (stream ORDERS captures 'orders.>')
CREATE EXTERNAL STREAM us_orders (...)
SETTINGS type='nats_jetstream', subject='orders.us', stream_name='ORDERS', ...;

SELECT * FROM us_orders;

Writing (INSERT)

Write messages to NATS JetStream via INSERT. The subject setting must be a concrete subject (not a wildcard):

CREATE EXTERNAL STREAM orders_sink (
    order_id uint32, product string, amount uint32, region string
)
SETTINGS
    type = 'nats_jetstream', url = 'nats://localhost:4222',
    subject = 'orders.us', stream_name = 'ORDERS',
    data_format = 'JSONEachRow', one_message_per_row = true;

INSERT INTO orders_sink (order_id, product, amount, region)
VALUES (101, 'widget', 999, 'us');

Per-row subject routing — define _tp_message_key as a column to override the publish subject per row:

CREATE EXTERNAL STREAM orders_routed (
    order_id uint32, product string, region string,
    _tp_message_key string    -- overrides subject per row
)
SETTINGS type='nats_jetstream', subject='orders.us', stream_name='ORDERS',
         data_format='JSONEachRow', one_message_per_row=true;

INSERT INTO orders_routed (order_id, product, region, _tp_message_key)
VALUES (201, 'gadget', 'jp', 'orders.jp'),
       (202, 'widget', 'de', 'orders.de');

Custom NATS headers — define _tp_message_headers to attach headers to each message:

CREATE EXTERNAL STREAM orders_with_headers (
    order_id uint32, product string, amount uint32,
    _tp_message_headers map(string, string)
)
SETTINGS type='nats_jetstream', subject='orders.us', stream_name='ORDERS',
         data_format='JSONEachRow', one_message_per_row=true;

INSERT INTO orders_with_headers (order_id, product, amount, _tp_message_headers)
VALUES (301, 'premium', 5000, {'X-Priority':'high', 'X-Source':'proton'});

Materialized Views (Continuous ETL)

Pipe data from NATS JetStream into a Proton stream continuously:

CREATE STREAM order_archive (
    order_id uint32, product string, amount uint32, region string
);

CREATE MATERIALIZED VIEW orders_etl INTO order_archive AS
    SELECT order_id, product, amount, region
    FROM nats_orders_stream
    WHERE amount > 100;

-- Query the archive anytime:
SELECT * FROM table(order_archive) ORDER BY _tp_time;

The MV:

  • Creates a durable consumer on the JetStream stream
  • Supports checkpointing (survives Proton restarts)
  • Automatically resumes from the last checkpoint position

Deliver Policies

| Policy | Behavior |
|---|---|
| `all` | Replay all messages from the beginning of the stream |
| `last` | Start from the last message only |
| `new` | Only consume messages published after the subscription is created |
| `by_start_sequence` | Start from a specific stream sequence number (`start_sequence` setting) |
| `by_start_time` | Start from a specific timestamp (`start_time` setting, Unix nanoseconds as a string) |

-- Start from sequence 100
CREATE EXTERNAL STREAM orders_from_100 (...)
SETTINGS type='nats_jetstream', ..., deliver_policy='by_start_sequence', start_sequence=100;

-- Start from a specific time (2026-01-01T00:00:00Z in nanoseconds)
CREATE EXTERNAL STREAM orders_from_2026 (...)
SETTINGS type='nats_jetstream', ..., deliver_policy='by_start_time', start_time='1767225600000000000';
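Since `start_time` takes Unix nanoseconds as a string, a small helper can derive the value from a calendar date. Illustrative Python; any equivalent conversion works:

```python
from datetime import datetime, timezone


def to_start_time(dt: datetime) -> str:
    """Convert an aware UTC datetime to the nanosecond-string form start_time expects."""
    return str(int(dt.timestamp()) * 1_000_000_000)


# 2026-01-01T00:00:00Z -> '1767225600000000000' (the value used above)
start_time = to_start_time(datetime(2026, 1, 1, tzinfo=timezone.utc))
```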

Durable Consumers

When durable = true (default), the consumer position is persisted on the NATS server. If the query is restarted, it resumes from where it left off:

CREATE EXTERNAL STREAM orders_durable (...)
SETTINGS type='nats_jetstream', ...,
    durable = true,
    consumer_name = 'my-app-consumer',
    ack_policy = 'explicit';

-- First run: consumes all existing messages
SELECT * FROM orders_durable;

-- Second run: only new messages (position is remembered by NATS)
SELECT * FROM orders_durable;

Comparison with Kafka External Stream

| Feature | Kafka | NATS JetStream |
|---|---|---|
| Type setting | `type = 'kafka'` | `type = 'nats_jetstream'` |
| Server setting | `brokers` | `url` |
| Topic / Subject | `topic` | `subject` (supports wildcards `*`, `>`) |
| Partitions | Yes (shards) | No (`_tp_shard` always 0) |
| Message key | `_tp_message_key` (bytes) | `_tp_message_key` (string = NATS subject) |
| Headers | `_tp_message_headers` | `_tp_message_headers` |
| Consumer groups | Kafka consumer groups | NATS durable consumers (`consumer_name`) |
| Offset control | shards / `seek_to` | `deliver_policy` / `start_sequence` |
| Auth | SASL, SSL | Username/password, token, NKey, creds file, TLS/mTLS |
| Publish model | Async | Sync (with JetStream ack) |

Limitations & Notes

  • The JetStream stream must already exist on the NATS server. Proton does not create it.
  • Wildcard subjects (orders.>, orders.*) work for reading but not for writing. Use a concrete subject for INSERT.
  • _tp_message_key for NATS is always a string (NATS subjects are strings), unlike Kafka where it can be bytes.
  • _tp_shard is always 0 since NATS JetStream doesn't have a partition concept.
  • When defining _tp_message_key, _tp_time, or _tp_message_headers as physical columns, one_message_per_row is forced to true.

- Implement NATSJetstream storage engine with full read/write support
- Pull-based subscription with configurable batch size and fetch timeout
- Durable consumers with automatic offset tracking
- Three ACK policies: none, all, explicit (at-least-once delivery)
- Five deliver policies: all, last, new, by_start_sequence, by_start_time
- Five auth methods: user/pass, token, NKey, credentials file, mTLS
- TLS with optional CA cert and client certificates
- Virtual columns: _nats_subject, _nats_timestamp, _tp_message_key, etc.
- Dynamic subject routing via _tp_message_key on write path
- Header support via _tp_message_headers Map(String,String)
- Stall detection with automatic subscription recovery
- Checkpointing via durable consumer state for MV recovery
- Fix USE_NATSIO to USE_NATS macro mismatch in config.h.in
- Fix natsMsg_GetTimestamp to natsMsg_GetMetaData
- Fix stream_info State NumSubjects to State.Subjects Count
- Fix free(keys) to nats_Free for correct allocator usage

CLAassistant commented Feb 10, 2026

CLA assistant check
All committers have signed the CLA.

SBALAVIGNESH123 force-pushed the feat/nats-jetstream-connector branch from 93e1489 to 6e0cf0c on February 11, 2026 04:30
SBALAVIGNESH123 force-pushed the feat/nats-jetstream-connector branch from 3a55ca9 to f404d07 on February 11, 2026 04:37
@chenziliang
Collaborator

@yuzifeng1984 may you help review ?

timeplus-io deleted a comment from chatgpt-codex-connector bot Feb 25, 2026
Collaborator

yuzifeng1984 commented Feb 25, 2026

Looks like the nats lib is not included in proton. I will port that first. @chenziliang

…rror codes and offset bugs

- Remove AI-generated thinking comments in by_start_time handler
- Replace non-existent CANNOT_CONNECT_NATS with CANNOT_CONNECT_SERVER
- Fix getHeaders() crash when processing first row (row == 0)
- Fix column position shifting bug in consume() by erasing in reverse order
- Simplify consume() to single unified code path
@SBALAVIGNESH123
Contributor Author

Hey @yuzifeng1984, thanks for picking this up and working on porting the NATS lib! I went ahead and pushed a cleanup commit while waiting — fixed error codes to use CANNOT_CONNECT_SERVER for consistency with the Kafka/Pulsar connectors, caught a boundary bug in header extraction on the first row, and simplified column position handling in the Sink's consume method.

On the build side, everything is gated behind #if USE_NATS (updated from USE_NATSIO in config.h.in), and CMake expects ch_contrib::nats — same pattern as rdkafka. Ready whenever you are!

Collaborator

yuzifeng1984 commented Feb 26, 2026

Hi @SBALAVIGNESH123, I updated the proton submodule to include the nats lib and submitted some changes to fix compilation.

Change summary:

  1. Changed the macro name to USE_NATSIO for consistency and easier future maintenance.
  2. All external streams use the same settings, so one_message_per_row is removed; it is already defined in the Kafka settings (the PR description may need updating).
  3. I did not find a nats_Free function; it looks like natsMsgHeader_Keys just allocates the array, so I changed it to call free().
  4. jsFetchRequest has no MaxWait member; I changed it to Expires.

I am still looking at the PR code changes.

@SBALAVIGNESH123
Contributor Author

Hey @yuzifeng1984, thanks a lot for taking the time to port the nats lib and fix the compile issues — really appreciate it!
All your changes make sense to me. The USE_NATSIO rename is cleaner, free() instead of nats_Free() is the right call since it's just a plain array allocation, and Expires over MaxWait — my bad on that one. I'll update the PR description for the one_message_per_row bit too.

Let me know if anything comes up during the rest of your review, happy to make changes!

Copilot AI left a comment

Pull request overview

This pull request adds a production-grade NATS JetStream connector to Proton, bringing it to feature parity with the existing Kafka external stream implementation. The connector enables bidirectional data flow with JetStream, supporting pull-based subscriptions with configurable batch sizes, durable consumers that survive restarts, and three acknowledgement policies (none, all, explicit) for at-least-once delivery semantics.

Changes:

  • Adds full NATS JetStream support with five authentication methods (username/password, token, NKey, credentials file, mTLS) and auto-detected TLS
  • Implements dynamic subject routing via _tp_message_key and header propagation through _tp_message_headers
  • Provides stall detection with automatic subscription recreation and checkpointing that serializes subject, consumer name, and sequence number for materialized view recovery

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.

| File | Description |
|---|---|
| src/Storages/ExternalStream/StorageExternalStream.cpp | Integration of NATS JetStream into the external stream factory |
| src/Storages/ExternalStream/ExternalStreamTypes.h | Adds NATS_JETSTREAM stream type constant |
| src/Storages/ExternalStream/ExternalStreamSettings.h | Defines 24 NATS-specific configuration settings |
| src/Storages/ExternalStream/ExternalStreamCounter.h | Explicitly includes map header (was transitively included) |
| src/Processors/ProcessorID.h | Adds processor IDs for NATS source and sink |
| src/Storages/ExternalStream/CMakeLists.txt | Links NATS client library to external_stream target |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstream.h | Main storage class header defining the NATS JetStream external stream interface |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstream.cpp | Implementation of storage class with connection management, settings validation, and authentication |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstreamSource.h | Source processor header with stall detection support |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstreamSource.cpp | Source processor implementation with checkpointing and recovery |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstreamSink.h | Sink processor header for writing to JetStream |
| src/Storages/ExternalStream/NATSJetstream/NATSJetstreamSink.cpp | Sink processor implementation with synchronous publishing |


Comment on lines +435 to +441
if (status != NATS_OK)
throw Exception(
ErrorCodes::CANNOT_RECEIVE_MESSAGE,
"Failed to fetch messages from NATS JetStream: subject='{}' stream='{}': {}",
current_subject,
storage->getStreamName(),
natsStatus_GetText(status));
Copilot AI Feb 27, 2026

Critical resource leak: If natsSubscription_FetchRequest returns an error other than NATS_TIMEOUT or NATS_NOT_FOUND, the exception is thrown without calling natsMsgList_Destroy on msg_list. The msg_list was initialized and may contain messages allocated by the NATS library that need to be cleaned up. Add natsMsgList_Destroy before throwing the exception to prevent the leak.

Suggested change
if (status != NATS_OK)
throw Exception(
ErrorCodes::CANNOT_RECEIVE_MESSAGE,
"Failed to fetch messages from NATS JetStream: subject='{}' stream='{}': {}",
current_subject,
storage->getStreamName(),
natsStatus_GetText(status));
if (status != NATS_OK)
{
natsMsgList_Destroy(&msg_list);
throw Exception(
ErrorCodes::CANNOT_RECEIVE_MESSAGE,
"Failed to fetch messages from NATS JetStream: subject='{}' stream='{}': {}",
current_subject,
storage->getStreamName(),
natsStatus_GetText(status));
}

}

/// The keys array must be freed
free(static_cast<void *>(keys));
Copilot AI Feb 27, 2026

Unnecessary and potentially unsafe cast: The free function expects a void pointer, so the explicit static_cast to void* is redundant and adds no value. Since keys is already a const char**, you can directly call free(keys) or free(const_cast<void*>(static_cast<const void*>(keys))) if const-correctness is a concern. The current cast discards const which could hide issues.

Suggested change
free(static_cast<void *>(keys));
free(keys);

Comment on lines +181 to +203
virtual_col_value_functions[pos] = [](natsMsg * msg) -> Field {
Map result;

const char ** keys = nullptr;
int num_keys = 0;
natsStatus status = natsMsgHeader_Keys(msg, &keys, &num_keys);

if (status != NATS_OK || !keys || num_keys == 0)
return result;

result.reserve(num_keys);
for (int i = 0; i < num_keys; ++i)
{
const char * value = nullptr;
if (natsMsgHeader_Get(msg, keys[i], &value) == NATS_OK && value)
result.push_back(Tuple{String(keys[i]), String(value)});
else
result.push_back(Tuple{String(keys[i]), String("null")});
}

/// The keys array must be freed
free(static_cast<void *>(keys));
return result;
Copilot AI Feb 27, 2026

Potential issue with null header handling: When natsMsgHeader_Keys returns status != NATS_OK, the keys pointer may not be initialized to a valid pointer by the NATS library. The code currently checks 'if (status != NATS_OK || !keys || num_keys == 0)' and returns early, which is correct. However, if status is NATS_OK but keys is still nullptr (edge case), the free() call at line 202 would never be reached. Consider adding an assertion or explicit nullptr check before the free() call for defensive programming, or use a SCOPE_EXIT pattern to ensure the memory is always freed when keys is non-null.

Comment on lines +369 to +373
jsCtx * NATSJetstream::getJetStreamContext()
{
std::lock_guard lock(connection_mutex);
return js_context;
}
Copilot AI Feb 27, 2026

Potential thread safety issue: The getJetStreamContext method returns a raw pointer to js_context while holding the lock, but the lock is released immediately after returning. If another thread calls disconnect() or connect() after the lock is released but before the caller uses the returned js_context pointer, it could lead to use-after-free. The js_context could be destroyed via jsCtx_Destroy at line 357. Consider either documenting that callers must ensure the storage object remains alive, or using shared_ptr/weak_ptr with custom deleters for the NATS resources, or ensuring all uses of the context happen while the object is in a stable state (e.g., between startup and shutdown).

Comment on lines +381 to +466
natsSubscription * NATSJetstream::createPullSubscription(
const String & consumer_name_override,
const ContextPtr & /*context*/)
{
jsSubOptions sub_opts;
jsSubOptions_Init(&sub_opts);
sub_opts.Stream = getStreamName().c_str();

/// Configure consumer
jsConsumerConfig consumer_cfg;
jsConsumerConfig_Init(&consumer_cfg);

consumer_cfg.Name = consumer_name_override.c_str();
if (isDurable())
consumer_cfg.Durable = consumer_name_override.c_str();

/// Ack policy
const auto & ack = settings->ack_policy.value;
if (ack == "none")
consumer_cfg.AckPolicy = js_AckNone;
else if (ack == "all")
consumer_cfg.AckPolicy = js_AckAll;
else
consumer_cfg.AckPolicy = js_AckExplicit;

/// Deliver policy
const auto & deliver = settings->deliver_policy.value;
if (deliver == "all")
consumer_cfg.DeliverPolicy = js_DeliverAll;
else if (deliver == "last")
consumer_cfg.DeliverPolicy = js_DeliverLast;
else if (deliver == "new")
consumer_cfg.DeliverPolicy = js_DeliverNew;
else if (deliver == "by_start_sequence")
{
consumer_cfg.DeliverPolicy = js_DeliverByStartSequence;
consumer_cfg.OptStartSeq = settings->start_sequence.value;
}
else if (deliver == "by_start_time")
{
consumer_cfg.DeliverPolicy = js_DeliverByStartTime;

/// start_time is expected as a Unix timestamp in nanoseconds (int64 in string form).
/// Example: "1700000000000000000" for 2023-11-14T22:13:20Z
const auto & time_str = settings->start_time.value;
if (!time_str.empty())
{
try
{
consumer_cfg.OptStartTime = std::stoll(time_str);
}
catch (const std::exception &)
{
throw Exception(
ErrorCodes::INVALID_SETTING_VALUE,
"NATS JetStream: 'start_time' must be a Unix timestamp in nanoseconds, got '{}'",
time_str);
}
}
}

consumer_cfg.MaxAckPending = static_cast<int64_t>(settings->max_ack_pending.value);

sub_opts.Config = consumer_cfg;

natsSubscription * subscription = nullptr;
natsStatus status = js_PullSubscribe(
&subscription,
js_context,
getSubject().c_str(),
consumer_name_override.c_str(),
nullptr,
&sub_opts,
nullptr);

if (status != NATS_OK)
throw Exception(
ErrorCodes::CANNOT_CONNECT_SERVER,
"Failed to create JetStream pull subscription: subject='{}' stream='{}' consumer='{}': {}",
getSubject(),
getStreamName(),
consumer_name_override,
natsStatus_GetText(status));

return subscription;
}
Copilot AI Feb 27, 2026

Thread safety issue: createPullSubscription accesses js_context at line 449 without holding connection_mutex. If another thread calls disconnect() concurrently, js_context could be destroyed via jsCtx_Destroy while this method is using it, leading to use-after-free. This method should either acquire connection_mutex before accessing js_context, or document that it must only be called while the storage is in a stable state (between startup and shutdown).

@SBALAVIGNESH123
Contributor Author

Thanks @Copilot for catching these! Addressed all the comments in the latest commit (50ca287): added natsMsgList_Destroy(&msg_list) before the throw in the FetchRequest error path to fix the resource leak, simplified free(static_cast<void*>(keys)) to a plain free(keys) since the cast was redundant, wrapped the js_PullSubscribe call in a scoped connection_mutex lock inside createPullSubscription() to prevent a potential use-after-free if disconnect() is called concurrently, and added a doc comment to getJetStreamContext() documenting the raw-pointer lifetime expectation for callers.

@yuzifeng1984
Collaborator

Fork PRs cannot access repository secrets. Will run CI tests after merge.

@yuzifeng1984
Collaborator

I merged the PR.

Thanks @SBALAVIGNESH123 !

yuzifeng1984 merged commit 6bd8085 into timeplus-io:develop Feb 27, 2026
17 of 21 checks passed
@SBALAVIGNESH123
Contributor Author

Thanks for the explanation! Totally makes sense — that's a standard GitHub limitation for fork PRs. Happy to address any issues that come up in post-merge CI quickly. Looking forward to getting this in!

@gangtao
Contributor

gangtao commented Feb 27, 2026

@SBALAVIGNESH123 thanks for your contribution, NATS is a very popular streaming source/sink.

@SBALAVIGNESH123
Contributor Author

Thank you! Really glad to contribute — NATS JetStream is such a great fit for Proton given its low latency and lightweight footprint. Happy to help maintain it and address any issues that come up post-merge. Looking forward to seeing how the community uses it!


Development

Successfully merging this pull request may close these issues.

Add support for NATS Jetstream

7 participants