Feat/issue 1093 python external stream #1095

Merged
yokofly merged 24 commits into develop from feat/issue-1093-python-external-stream
Jan 28, 2026

Collaborator

@yokofly yokofly commented Jan 27, 2026

Summary

Add type='python' external streams with:

  • Python read (batch return or streaming generator/iterator)
  • Python write on INSERT via a sink function
  • python_table() (alias python_pipe) to apply a Python external stream over an input source
  • python() (alias python_query) to query a metastore Python UDF returning TABLE(...)

Result shape rules (must-follow)

Let the external stream schema have N columns:

  • N > 1: a row must be a tuple/list of length N
    • Batch: return [(c1, c2, ...), ...]
    • Streaming: yield (c1, c2, ...) or yield [(c1, c2, ...), ...]
  • N == 1: scalars are allowed and normalized
    • Batch: return [v1, v2, ...] or return [(v1,), ...]
    • Streaming: yield v or yield [v1, v2, ...] (normalized)

Unsupported: coroutine / async generator results.
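The shape rules above can be sketched in plain Python. This is only an illustration of the stated rules, not the engine's actual implementation; the helper names `reject_async`, `normalize_batch`, and `normalize_yield` are hypothetical, and the rule used to tell a yielded batch from a single yielded row is an assumption.

```python
import inspect


def reject_async(result):
    """Raise if the read function returned a coroutine or async generator."""
    if inspect.iscoroutine(result) or inspect.isasyncgen(result):
        raise TypeError("coroutine / async generator results are unsupported")
    return result


def normalize_batch(batch, n_columns):
    """Turn a batch (a list of rows or scalars) into tuples of length n_columns."""
    rows = []
    for item in batch:
        if isinstance(item, (tuple, list)):
            row = tuple(item)
        elif n_columns == 1:
            row = (item,)  # scalars are only allowed for single-column schemas
        else:
            raise TypeError(f"expected a tuple/list row, got {type(item).__name__}")
        if len(row) != n_columns:
            raise ValueError(f"row has {len(row)} values, schema has {n_columns}")
        rows.append(row)
    return rows


def normalize_yield(item, n_columns):
    """Treat a yielded value as either a whole batch or a single row (assumed rule)."""
    is_batch = isinstance(item, list) and (
        n_columns == 1 or all(isinstance(r, (tuple, list)) for r in item)
    )
    return normalize_batch(item if is_batch else [item], n_columns)
```

For a single-column schema, `normalize_yield(5, 1)` and `normalize_yield([5], 1)` both produce one row, which mirrors the "scalars are allowed and normalized" rule.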

Examples

1) Basic read + write


CREATE EXTERNAL STREAM py_basic_stream(value int32)
AS $$
def py_basic_read():
    return [1, 2]

def py_basic_sink(value):
    # value is a list for the single column
    if value != [42]:
        raise Exception(f"Unexpected input: {value}")
$$
SETTINGS type='python', read_function_name='py_basic_read', write_function_name='py_basic_sink';

SELECT * FROM py_basic_stream;
INSERT INTO py_basic_stream VALUES (42);

2) Streaming generator (multi-column, yield tuple rows)

CREATE EXTERNAL STREAM py_gen_stream
(
  a int32,
  b string
)
AS $$
import time

def read_py_gen_stream():
    n = 0
    while True:
        # yield a batch of rows; each row is a tuple with 2 elements (matches the schema)
        yield [
            (n, f"v{n}"),
            (n + 1, f"v{n+1}"),
        ]
        n += 2
        time.sleep(0.1)
$$
SETTINGS type='python', mode='streaming', read_function_name='read_py_gen_stream';

SELECT * FROM py_gen_stream LIMIT 10;

3) python_table() transform over an input source

CREATE RANDOM STREAM input_stream(i int, j int) SETTINGS eps=10;

CREATE EXTERNAL STREAM py_sum(sum int32)
AS $$
def py_sum(col1, col2):
    return [c1 + c2 for c1, c2 in zip(col1, col2)]
$$
SETTINGS type='python';

SELECT *
  FROM python_table(py_sum, (SELECT i, j FROM input_stream), i, j)
  ORDER BY sum;
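
To try the transform outside the engine, the function can be called directly with column lists. This sketch assumes each argument arrives as a list of values for one input column, as the `zip` in `py_sum` suggests; `apply_columnar` is a hypothetical driver written for illustration, not part of this PR.

```python
def py_sum(col1, col2):
    # Same body as the external stream function above: element-wise sum.
    return [c1 + c2 for c1, c2 in zip(col1, col2)]


def apply_columnar(func, rows):
    """Hypothetical driver: transpose rows to columns, call func, wrap results as rows."""
    if not rows:
        return []
    columns = [list(col) for col in zip(*rows)]
    return [(value,) for value in func(*columns)]


rows = [(1, 10), (2, 20), (3, 30)]
result = apply_columnar(py_sum, rows)
print(result)
```

Each output row is a 1-tuple because the `py_sum` stream declares a single `sum` column.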

4) Real-time Hacker News posts (streaming generator)

CREATE EXTERNAL STREAM hn_latest
(
    id uint64,
    type string,
    by string,
    title string,
    url string,
    received_at datetime64(3),
    raw_json string
)
AS $$
import json
import time
import urllib.request
from datetime import datetime, timezone

MAX_RETRIES = 3
BACKOFF_SEC = 0.5
LOOKBACK = 50
BATCH_SIZE = 20
POLL_SEC = 2.0

def get_json(url, timeout=5):
    last_err = None
    for attempt in range(MAX_RETRIES):
        try:
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except Exception as e:
            last_err = e
            time.sleep(BACKOFF_SEC * (2 ** attempt))
    raise last_err

def read_hn_latest():
    base = "https://hacker-news.firebaseio.com/v0"
    next_id = 0
    while True:
        try:
            max_id = int(get_json(f"{base}/maxitem.json"))
            if next_id == 0:
                next_id = max(0, max_id - LOOKBACK)
            end_id = min(max_id, next_id + BATCH_SIZE)
            rows = []
            for item_id in range(next_id + 1, end_id + 1):
                item = get_json(f"{base}/item/{item_id}.json") or {}
                rows.append((
                    int(item.get("id") or item_id),
                    str(item.get("type") or ""),
                    str(item.get("by") or ""),
                    str(item.get("title") or ""),
                    str(item.get("url") or ""),
                    datetime.now(timezone.utc),
                    json.dumps(item, ensure_ascii=False),
                ))
            next_id = end_id
            if rows:
                yield rows
            else:
                time.sleep(POLL_SEC)
        except Exception as e:
            # Keep the stream alive; surface error as a synthetic row
            yield [(0, "error", "", str(e), "", datetime.now(timezone.utc), json.dumps({"error": str(e)}))]
            time.sleep(POLL_SEC)
$$
SETTINGS type='python', mode='streaming', read_function_name='read_hn_latest';

SELECT * FROM hn_latest LIMIT 10;


Compatibility / Notes

  • No checkpointing is supported for Python external streams (stateful checkpoint/restore is not available).

@yokofly yokofly self-assigned this Jan 27, 2026
Contributor

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f975c04950

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Collaborator Author

yokofly commented Jan 28, 2026

@codex review

Contributor

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3dd5602f0a

Collaborator Author

yokofly commented Jan 28, 2026

@codex review

Contributor

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e0376d4a10

- Preserve row count for zero-column projections in PythonTableTransform.

- Skip empty generator batches in PythonStreamingSource instead of terminating.

- Add regression tests for both cases.
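
The second fix in this commit can be illustrated with a small sketch. It assumes the earlier behavior treated an empty batch as end of stream; `stop_on_empty` and `skip_empty` are hypothetical names for illustration and do not correspond to functions in PythonStreamingSource.

```python
from itertools import takewhile


def stop_on_empty(batches):
    """Assumed old behavior: the first empty batch ends the stream."""
    return takewhile(bool, batches)


def skip_empty(batches):
    """Fixed behavior: empty batches are skipped and reading continues."""
    for batch in batches:
        if not batch:
            continue
        yield batch


def source():
    # A generator that occasionally has nothing to report.
    yield []
    yield [(1,)]
    yield []
    yield [(2,)]


old = list(stop_on_empty(source()))
new = list(skip_empty(source()))
```

With this input, the old variant emits nothing because the very first batch is empty, while the new variant emits both non-empty batches.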
Collaborator Author

yokofly commented Jan 28, 2026

@codex review

Contributor

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

if (TableFunctionFactory::instance().isSupportSubqueryTableFunctionName(func.name) && func.arguments->children.size() > 0)
{
    auto & ast = func.arguments->children[0];
    if (const auto * identifier = ast->as<ASTIdentifier>())
    {
        if (identifier->isShort())

P2: Rewrite CTEs for python_table source argument

The new WITH-subquery rewrite only substitutes func.arguments->children[0], but python_table expects the source in its second argument (args[1] in TableFunctionPythonTable::parseArguments). As a result, a CTE alias used as the source (e.g., WITH src AS (...) SELECT * FROM python_table(py_stream, src, col1)) is left as an identifier and later resolved as a real table, producing UNKNOWN_TABLE errors. Consider extending the rewrite to handle the correct argument position for python_table (or making the position configurable per table function).
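
The "position configurable per table function" idea can be sketched as follows. This is a simplified Python model, not the C++ rewrite itself: arguments are plain strings rather than AST nodes, and `SOURCE_ARG_POSITION` and `rewrite_cte_source` are hypothetical names.

```python
# Hypothetical lookup: which argument slot holds the source for each table function.
# python_pipe is listed because the PR summary names it as an alias of python_table.
SOURCE_ARG_POSITION = {"python_table": 1, "python_pipe": 1}
DEFAULT_POSITION = 0


def rewrite_cte_source(func_name, args, ctes):
    """Replace a CTE alias in the source-argument slot with its subquery text."""
    pos = SOURCE_ARG_POSITION.get(func_name, DEFAULT_POSITION)
    rewritten = list(args)
    if pos < len(rewritten) and rewritten[pos] in ctes:
        rewritten[pos] = f"({ctes[rewritten[pos]]})"
    return rewritten


ctes = {"src": "SELECT 1 AS col1"}
fixed = rewrite_cte_source("python_table", ["py_stream", "src", "col1"], ctes)
print(fixed)
```

Under this model, the alias `src` in the second slot is replaced by its subquery, so it is no longer resolved as a real table.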

@yokofly yokofly force-pushed the feat/issue-1093-python-external-stream branch from ee04b12 to 18a33c6 on January 28, 2026 06:46
Collaborator Author

yokofly commented Jan 28, 2026

@codex review

Contributor

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 18a33c65f7

@yokofly yokofly marked this pull request as draft January 28, 2026 08:49
Collaborator Author

yokofly commented Jan 28, 2026

@codex review

Contributor

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2bf00d9a4f

Collaborator Author

yokofly commented Jan 28, 2026

@codex review

@chatgpt-codex-connector
Contributor

Codex Review: Didn't find any major issues. Breezy!

@yokofly yokofly marked this pull request as ready for review January 28, 2026 09:58
Contributor

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 5254928172

Comment on lines +283 to +287
const Py_ssize_t rows = PyList_Size(normalized.get());
if (rows <= 0)
    continue;

return Chunk(Columns{}, static_cast<UInt64>(rows));
Contributor

P2: Validate row arity even when no columns are requested

When the output header has zero columns (e.g., SELECT count()), the streaming path returns a chunk based only on PyList_Size without checking that each row matches the declared tuple schema. If a Python generator yields rows with missing/extra elements, count() will still succeed and report a row count while SELECT * would throw, leading to inconsistent results and masking schema errors. Consider validating row shapes in this zero‑column path (or reusing the conversion routine) before emitting the count.

Collaborator Author

yokofly commented Jan 28, 2026

Addressed review r2735903551: even when the output header has 0 columns (e.g. a count-only query), PythonStreamingSource now validates that each row is a tuple/list and that its arity matches the declared tuple schema before emitting the row-count chunk. Added a unit test: CPythonTest.PythonStreamingSourceNoColumnsRejectsWrongArity.
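
The validation described in this reply can be modeled in a few lines of Python. This is a sketch of the described behavior only, assuming a batch is a list of rows; `count_rows_validated` is a hypothetical name, and the real check lives in the C++ PythonStreamingSource.

```python
def count_rows_validated(batch, n_columns):
    """Return the row count for a zero-column projection, checking row shapes first."""
    for index, row in enumerate(batch):
        if not isinstance(row, (tuple, list)):
            raise TypeError(f"row {index} is not a tuple/list: {row!r}")
        if len(row) != n_columns:
            raise ValueError(
                f"row {index} has {len(row)} values, schema has {n_columns}"
            )
    return len(batch)


good = count_rows_validated([(1, "a"), (2, "b")], 2)
print(good)
```

A batch such as `[(1, "a"), (2,)]` now fails for a count-only query just as it would for `SELECT *`, instead of silently reporting two rows.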

@yokofly yokofly merged commit 69a282d into develop Jan 28, 2026
8 of 11 checks passed
@yokofly yokofly deleted the feat/issue-1093-python-external-stream branch January 28, 2026 12:52
@yokofly yokofly linked an issue Jan 28, 2026 that may be closed by this pull request
@yokofly yokofly restored the feat/issue-1093-python-external-stream branch January 28, 2026 14:33
@yokofly yokofly deleted the feat/issue-1093-python-external-stream branch February 10, 2026 07:09

Development

Successfully merging this pull request may close these issues.

feat: python external stream as source
