-
Notifications
You must be signed in to change notification settings - Fork 481
Enhance ProcessingExporter system to support redaction of content in telemetry traces
#751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Introduced advanced processor addition with support for naming, positioning, and relative placement (before/after). - Implemented processor removal by name, position, or object, improving usability. - Added methods for pipeline locking, resetting, and processor retrieval by name. - Enhanced type compatibility checks during processor insertion to ensure seamless integration. - Introduced a new RedactionConfigMixin for telemetry exporters, allowing for configurable redaction of PII. - Added HeaderRedactionProcessor and SpanTaggingProcessor to facilitate span redaction and tagging based on attributes and headers. This update significantly improves the flexibility and usability of the processing pipeline, ensuring better observability and data privacy management. Signed-off-by: Matthew Penn <[email protected]>
WalkthroughAdds a pluggable redaction framework and span-tagging processor, integrates them into a new OTLP adapter exporter, enhances ProcessingExporter with named/positioned processor management, pipeline locking, and optional None-dropping, provides Pydantic config mixins, updates package exports/imports, expands tests, and fixes a docs URL. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Client
participant E as OTLPSpanHeaderRedactionAdapterExporter
participant P as ProcessingExporter
participant HR as HeaderRedactionProcessor
participant ST as SpanTaggingProcessor
participant B as Base OTLP Exporter
participant S as OTLP Backend
C->>E: export(span)
E->>P: _export_with_processing(span)
P->>HR: process(span, ctx)
alt force_redact or callback approves
HR-->>P: redacted span
else no redaction
HR-->>P: original span
end
P->>ST: process(span, ctx)
alt tag_key & tag_value present
ST-->>P: tagged span
else unchanged
ST-->>P: original span
end
Note right of P: If any processor returns None and drop_nones=true\nitem is dropped and export is skipped
alt span present
P->>B: export(processed span)
B-->>S: transmit OTLP
else dropped
P-->>E: skip export
end
E-->>C: return
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Assessment against linked issues
Out-of-scope changes
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
Status, Documentation and Community
|
Improve RedactionProcessor/HeaderRedactionProcessor and SpanTaggingProcessor implementations Add configuration mixins to simplify plugin development Add comprehensive tests for these features Signed-off-by: Matthew Penn <[email protected]>
Signed-off-by: Matthew Penn <[email protected]>
Signed-off-by: Matthew Penn <[email protected]>
Signed-off-by: Matthew Penn <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)
docs/source/workflows/observe/index.md (2)
20-20: Use the official first mention: “NVIDIA NeMo Agent toolkit”Per docs style, the first reference must include “NVIDIA”.
Apply:
-The NeMo Agent toolkit uses a flexible, plugin-based observability system ... +The NVIDIA NeMo Agent toolkit uses a flexible, plugin-based observability system ...
31-34: Avoid hard-coded version numbers (“As of v1.2”)Docs must not pin versions. Rephrase without the version while retaining the env var guidance.
-### Compatibility with Previous Versions -As of v1.2, the span exporter exports attributes names prefixed with `nat` by default. In prior releases the attribute names were prefixed with `aiq`, to retain compatibility the `NAT_SPAN_PREFIX` environment variable can be set to `aiq`: +### Compatibility with Previous Versions +By default, span attributes are prefixed with `nat`. To retain compatibility with older releases that used `aiq`, set the `NAT_SPAN_PREFIX` environment variable to `aiq`: export NAT_SPAN_PREFIX=aiqdocs/source/tutorials/create-a-new-workflow.md (4)
21-21: Use the official first mention: “NVIDIA NeMo Agent toolkit”First reference must include “NVIDIA”.
-... included with the NeMo Agent toolkit. This tutorial ... +... included with the NVIDIA NeMo Agent toolkit. This tutorial ...
41-41: Remove TODO/issue placeholders from docsDocs should not include TODOs or issue-tracking notes.
-<!-- This section needs to be updated once #559 is completed --> ... -<!-- Remove this once #559 is completed -->Also applies to: 85-86
240-247: Don’t hard-code version numbers in examplesAvoid specific numbers; instruct users to substitute their current MAJOR.MINOR.
- Use the first two digits of the version number. For example, if the version is `1.1.0`, then the dependency would be `nvidia-nat[langchain]~=1.1`. + Use your current MAJOR.MINOR from `nat --version`. For example: ```toml dependencies = [ - "nvidia-nat[langchain]~=1.1", + "nvidia-nat[langchain]~=MAJOR.MINOR", # Add any additional dependencies your workflow needs ]--- `251-256`: **Avoid suggesting static version pinning** The project uses setuptools-scm; remove the alternate static version block to prevent copy-paste of hard-coded versions. ```diff [build-system] requires = ["setuptools", "setuptools_scm"] build-backend = "setuptools.build_meta" ... [tool.setuptools_scm] root = "../../../.." - ``` - - Alternately if we did not want to do this we would instead: - ```toml - [build-system] - build-backend = "setuptools.build_meta" - requires = ["setuptools >= 64"] - - [project] - name = "text_file_ingest" - version = "0.1.0"Also applies to: 259-263, 264-273 </blockquote></details> <details> <summary>src/nat/observability/exporter/processing_exporter.py (1)</summary><blockquote> `382-392`: **Make type contracts explicit: pipeline can return None when drop_nones is True.** Signatures/docs currently promise PipelineOutputT but can return None; this will trip pyright/mypy and confuses callers. ```diff - async def _process_pipeline(self, item: PipelineInputT) -> PipelineOutputT: + async def _process_pipeline(self, item: PipelineInputT) -> PipelineOutputT | None: @@ - Returns: - PipelineOutputT: The processed item after running through all processors + Returns: + PipelineOutputT | None: The processed item, or None if dropped by the pipeline- Returns: - Any: The processed item after running through all processors, or None if - drop_nones is True and any processor returned None + Returns: + Any: The processed item or None if drop_nones is True and any processor returned None- final_item: PipelineOutputT = await self._process_pipeline(item) + final_item = await self._process_pipeline(item) # type: PipelineOutputT | NoneAlso applies to: 400-403, 483-487
🧹 Nitpick comments (36)
docs/source/workflows/observe/index.md (3)
113-114: Consistent “OTel” capitalizationUse “OTel” (OpenTelemetry) consistently in link text and titles.
-- **[OpenTelemetry Collector](https://opentelemetry.io/docs/collector/)** - See [Observing with OTel Collector](./observe-workflow-with-otel-collector.md) +- **[OpenTelemetry Collector](https://opentelemetry.io/docs/collector/)** - See [Observing with OTel Collector](./observe-workflow-with-otel-collector.md)-Observing with OTEL Collector <./observe-workflow-with-otel-collector.md> +Observing with OTel Collector <./observe-workflow-with-otel-collector.md>Also applies to: 161-161
150-150: Brand spelling: “LlamaIndex”Correct brand capitalization and spacing.
-... LLama Index, CrewAI, and Semantic Kernel frameworks. +... LlamaIndex, CrewAI, and Semantic Kernel frameworks.
138-147: Optional: Mention new redaction/tagging processorsGiven this PR adds header redaction and span tagging, add a brief note here with links to their docs to aid discoverability.
Happy to draft a short subsection and cross-links to the new processors.
docs/source/tutorials/create-a-new-workflow.md (2)
304-304: Grammar: “After completing the steps”Tighten the sentence.
-After completed, install the tool into the environment: +After completing the steps, install the tool into the environment:
312-312: Fix prompt grammar- --input "What does DOCA GPUNetIO to remove the CPU from the critical path?" + --input "What does DOCA GPUNetIO do to remove the CPU from the critical path?"src/nat/observability/mixin/redaction_config_mixin.py (2)
20-31: Forbid unknown fields and enable assignment validation on configs.To catch typos/misconfigurations early, set Pydantic config to
extra="forbid"andvalidate_assignment=True. Use one of the variants depending on Pydantic major version.Pydantic v2:
@@ -from pydantic import BaseModel -from pydantic import Field +from pydantic import BaseModel, Field +from pydantic import ConfigDict @@ class RedactionConfigMixin(BaseModel): @@ + # Pydantic v2 config + model_config = ConfigDict(extra="forbid", validate_assignment=True)Pydantic v1:
@@ class RedactionConfigMixin(BaseModel): @@ + class Config: + extra = "forbid" + validate_assignment = True
28-31: Revisit defaultredaction_attributes.Defaulting to ["input.value", "output.value", "metadata"] may surprise users if
redaction_enabled=Truewithout explicit attributes. Consider defaulting to[]and documenting recommended keys, or clearly documenting these defaults in user‑facing docs.src/nat/observability/mixin/tagging_config_mixin.py (2)
26-32: Optional: useStrEnumif you want direct string behavior.If you prefer avoiding
.valueat call sites,enum.StrEnum(py3.11+) lets members behave as strings. Current usage with.valueis fine; consider this only if it simplifies downstream code.Example:
from enum import StrEnum class PrivacyLevel(StrEnum): NONE = "none" LOW = "low" MEDIUM = "medium" HIGH = "high"
34-42: Forbid extras and validate assignment on Pydantic mixin.Mirror config hardening used elsewhere to catch config mistakes.
Pydantic v2:
@@ -from pydantic import BaseModel -from pydantic import Field +from pydantic import BaseModel, Field +from pydantic import ConfigDict @@ class TaggingConfigMixin(BaseModel, Generic[TagValueT]): @@ + # Pydantic v2 config + model_config = ConfigDict(extra="forbid", validate_assignment=True)Pydantic v1:
@@ class TaggingConfigMixin(BaseModel, Generic[TagValueT]): @@ + class Config: + extra = "forbid" + validate_assignment = Truesrc/nat/observability/processor/span_tagging_processor.py (2)
16-24: Remove unused logger imports.
loggingandloggerare unused and will trip ruff. Drop them or add debug logs.Apply:
-import logging @@ -from nat.utils.type_utils import override - -logger = logging.getLogger(__name__) +from nat.utils.type_utils import override
40-48: Normalizetag_keyinput to avoid whitespace‑only keys.Strip whitespace when provided to prevent setting attributes like
"nat. "unintentionally.Apply:
@@ - def __init__(self, tag_key: str | None = None, tag_value: str | None = None, span_prefix: str | None = None): - self.tag_key = tag_key + def __init__(self, tag_key: str | None = None, tag_value: str | None = None, span_prefix: str | None = None): + self.tag_key = tag_key.strip() if isinstance(tag_key, str) else tag_key self.tag_value = tag_value @@ - self._span_prefix = span_prefix + self._span_prefix = span_prefixsrc/nat/observability/processor/redaction_processor.py (1)
16-29: Remove unused logger import/variable.
logging/loggerare unused; keeps ruff happy.Apply:
-import logging @@ -logger = logging.getLogger(__name__)packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py (6)
213-217: Avoid brittlecall_count == 4assertion.The parent exporter may add processors in the future, making this fragile. Assert presence/ordering of your processors instead.
Apply:
- # Verify add_processor was called 4 times total: - # - 2 from parent OtelSpanExporter (SpanToOtelProcessor, OtelSpanBatchProcessor) - # - 2 from our class (HeaderRedactionProcessor, SpanTaggingProcessor) - assert mock_add_processor.call_count == 4 + # Verify our processors are present with expected positions. + assert any(kw.get("name") == "header_redaction" and kw.get("position") == 0 + for _, kw in mock_add_processor.call_args_list) + assert any(kw.get("name") == "span_privacy_tagging" and kw.get("position") == 1 + for _, kw in mock_add_processor.call_args_list)
218-236: Usecall.kwargsfor clarity when inspecting mock calls.Indexing into
callis less readable and easy to misinterpret. Prefercall.kwargs.Apply:
- redaction_calls = [ - call for call in mock_add_processor.call_args_list - if len(call) > 1 and call[1].get("name") == "header_redaction" - ] + redaction_calls = [c for c in mock_add_processor.call_args_list if c.kwargs.get("name") == "header_redaction"] @@ - tagging_calls = [ - call for call in mock_add_processor.call_args_list - if len(call) > 1 and call[1].get("name") == "span_privacy_tagging" - ] + tagging_calls = [c for c in mock_add_processor.call_args_list if c.kwargs.get("name") == "span_privacy_tagging"]
254-268: Same readability improvement for kwargs access.Apply:
- redaction_calls = [ - call for call in mock_add_processor.call_args_list - if len(call) > 1 and call[1].get("name") == "header_redaction" - ] + redaction_calls = [c for c in mock_add_processor.call_args_list if c.kwargs.get("name") == "header_redaction"]
279-287: Same readability improvement for kwargs access (tagging).Apply:
- tagging_calls = [ - call for call in mock_add_processor.call_args_list - if len(call) > 1 and call[1].get("name") == "span_privacy_tagging" - ] + tagging_calls = [c for c in mock_add_processor.call_args_list if c.kwargs.get("name") == "span_privacy_tagging"]Also applies to: 281-285
389-399: Tests rely on private attribute_processors.Accessing internals is brittle. Prefer verifying via
add_processorpatching (as above) or expose a minimal accessor on the exporter for tests (e.g.,get_processor(name|type)).Would you like a small helper added to the exporter for non‑private processor inspection to stabilize these tests?
Also applies to: 416-425, 460-468, 501-509
130-159: Avoid hard‑coded semantic version literals in examples.Guidelines discourage hard‑coding versions in code/docs. To avoid confusion with package versions, prefer placeholders like
"0.0.0"or"x.y.z", or constants.Apply:
- resource_attributes = {"service.name": "test-service", "service.version": "1.0"} + resource_attributes = {"service.name": "test-service", "service.version": "x.y.z"}- "service.version": "2.1.0", + "service.version": "x.y.z",Also applies to: 1119-1123
tests/nat/observability/processor/test_header_redaction_processor.py (5)
30-46: Align fixture naming with repo test guidelines.Guidelines require fixture_ prefix and name=.... Rename while preserving the public fixture name.
-@pytest.fixture -def sample_span(): +@pytest.fixture(name="sample_span") +def fixture_sample_span() -> Span:
48-57: Rename and declare fixture name.-@pytest.fixture -def mock_context_with_headers(): +@pytest.fixture(name="mock_context_with_headers") +def fixture_mock_context_with_headers() -> Context:
60-68: Rename and declare fixture name.-@pytest.fixture -def mock_context_no_headers(): +@pytest.fixture(name="mock_context_no_headers") +def fixture_mock_context_no_headers() -> Context:
467-541: Avoid testing private method; exercise cache via public API.Tests reach into _should_redact_impl; prefer repeated should_redact calls with a fixed header to validate caching to reduce coupling to internals.
352-371: Add a test for custom redaction_value.Covers non-default replacement token.
def test_redact_item_with_custom_redaction_value(sample_span): processor = HeaderRedactionProcessor(attributes=["user_id"], redaction_value="***") test_span = Span(name=sample_span.name, context=sample_span.context, parent=sample_span.parent, start_time=sample_span.start_time, end_time=sample_span.end_time, attributes=sample_span.attributes.copy(), events=sample_span.events) result = processor.redact_item(test_span) assert result.attributes["user_id"] == "***"packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py (1)
56-71: Match default redaction token in example.Default is "[REDACTED]"; align the example for consistency.
- redaction_value="REDACTED", + redaction_value="[REDACTED]",src/nat/observability/processor/header_redaction_processor.py (3)
16-28: Remove unused logger to satisfy ruff.-import logging ... -logger = logging.getLogger(__name__)
64-81: Silence unused argument warning for item.Ruff may flag ARG002; explicitly discard item.
def should_redact(self, item: Span, context: Context) -> bool: """Determine if this span should be redacted based on header auth. @@ - # If force_redact is enabled, always redact regardless of other conditions + # If force_redact is enabled, always redact regardless of other conditions + # (item is unused in this strategy) + del item
87-93: Minor: drop explicit None default in Headers.get.Starlette returns None by default.
- auth_key = headers.get(self.header, None) + auth_key = headers.get(self.header)tests/nat/observability/processor/test_redaction_processor.py (2)
112-116: Fixture naming per guidelines.-@pytest.fixture -def mock_context(): +@pytest.fixture(name="mock_context") +def fixture_mock_context() -> Context:
118-131: Fixture naming per guidelines.-@pytest.fixture -def sample_span(): +@pytest.fixture(name="sample_span") +def fixture_sample_span() -> Span:tests/nat/observability/exporter/test_processing_exporter.py (3)
165-174: Align fixtures with repository testing guidelines (naming + explicit fixture name).Per coding guidelines, pytest fixtures should be named with fixture_ prefix and set name=... in the decorator.
-@pytest.fixture -def mock_context_state(): +@pytest.fixture(name="mock_context_state") +def fixture_mock_context_state(): @@ -@pytest.fixture -def processing_exporter(mock_context_state): +@pytest.fixture(name="processing_exporter") +def fixture_processing_exporter(mock_context_state):Also applies to: 176-180
972-993: Use a real coroutine in task-creation tests to avoid masking issues.Passing a Mock as “coroutine” works only because create_task is patched; use a no-op coroutine for realism.
- try: - # Use a mock coroutine that doesn't need to be awaited - mock_coro = Mock() + try: + # Use a real no-op coroutine + async def noop(): + return None + mock_coro = noop() @@ - with patch('asyncio.create_task') as mock_create_task: + with patch('asyncio.create_task') as mock_create_task: mock_task = Mock() mock_create_task.return_value = mock_task @@ - processing_exporter._create_export_task(mock_coro) + processing_exporter._create_export_task(mock_coro)- try: - # Use a mock coroutine that doesn't need to be awaited - mock_coro = Mock() + try: + # Use a real no-op coroutine + async def noop(): + return None + mock_coro = noop() @@ - processing_exporter._create_export_task(mock_coro) + processing_exporter._create_export_task(mock_coro)Also applies to: 1006-1023
352-373: Add coverage for unnamed insertion shifting name positions.Current tests validate name-position updates only when the inserted processor is named. Add a case inserting an unnamed processor to ensure existing name mappings shift correctly. I can open a follow-up PR if helpful.
Example test to add:
class TestProcessorNaming: async def test_insertion_without_name_updates_existing_name_positions(self, processing_exporter): p1 = MockProcessor("p1") p2 = MockBatchProcessor("p2") processing_exporter.add_processor(p1, name="first") processing_exporter.add_processor(p2, name="second") # Insert unnamed processor in the middle class IntToInt(Processor[int, int]): async def process(self, item: int) -> int: return item processing_exporter.add_processor(IntToInt(), position=1) assert processing_exporter._processor_names["first"] == 0 assert processing_exporter._processor_names["second"] == 2 # shifted by +1src/nat/observability/exporter/processing_exporter.py (2)
529-547: Optional: type-parameterize Coroutine and validate awaitable inputs.Parameterize Coroutine and (optionally) guard against non-coroutines to fail fast with a clearer error; adjust tests accordingly if you enable the guard.
-from collections.abc import Coroutine +from collections.abc import Coroutine +from typing import Any @@ - def _create_export_task(self, coro: Coroutine): + def _create_export_task(self, coro: Coroutine[Any, Any, Any]): """Create task with minimal overhead but proper tracking. @@ - try: + try: task = asyncio.create_task(coro)If you choose to add a guard:
import inspect if not inspect.isawaitable(coro): raise TypeError(f"{self.name}: expected coroutine, got {type(coro).__name__}")
556-566: Optional: tolerate sync shutdowns gracefully.Some processors may expose a synchronous shutdown. Collect only awaitables to avoid TypeErrors inside gather.
- for processor in getattr(self, '_processors', []): + for processor in getattr(self, '_processors', []): shutdown_method = getattr(processor, 'shutdown', None) if shutdown_method: logger.debug("Shutting down processor: %s", processor.__class__.__name__) - shutdown_tasks.append(shutdown_method()) + maybe_awaitable = shutdown_method() + if asyncio.isfuture(maybe_awaitable) or asyncio.iscoroutine(maybe_awaitable): + shutdown_tasks.append(maybe_awaitable)tests/nat/observability/processor/test_span_tagging_processor.py (2)
29-40: Align fixture naming with guidelines (fixture_ prefix + name=...).Keep the public fixture name stable while using a fixture_*-prefixed function.
-@pytest.fixture -def sample_span(): +@pytest.fixture(name="sample_span") +def fixture_sample_span():
16-26: Ensure async tests execute across environments.If pytest-asyncio isn’t configured in “auto” mode, mark async tests with @pytest.mark.asyncio or set module-level pytestmark. Verify CI config.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (14)
docs/source/tutorials/create-a-new-workflow.md(1 hunks)docs/source/workflows/observe/index.md(1 hunks)packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py(1 hunks)packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py(1 hunks)src/nat/observability/exporter/processing_exporter.py(6 hunks)src/nat/observability/mixin/redaction_config_mixin.py(1 hunks)src/nat/observability/mixin/tagging_config_mixin.py(1 hunks)src/nat/observability/processor/header_redaction_processor.py(1 hunks)src/nat/observability/processor/redaction_processor.py(1 hunks)src/nat/observability/processor/span_tagging_processor.py(1 hunks)tests/nat/observability/exporter/test_processing_exporter.py(13 hunks)tests/nat/observability/processor/test_header_redaction_processor.py(1 hunks)tests/nat/observability/processor/test_redaction_processor.py(1 hunks)tests/nat/observability/processor/test_span_tagging_processor.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (13)
docs/source/**/*.md
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
docs/source/**/*.md: Use the official naming: first use “NVIDIA NeMo Agent toolkit”; subsequent uses “NeMo Agent toolkit”; never use deprecated names in documentation
Documentation sources must be Markdown under docs/source; keep docs in sync and fix Sphinx errors/broken links
Documentation must be clear, comprehensive, free of TODO/FIXME/placeholders/offensive/outdated terms; fix spelling; adhere to Vale vocab allow/reject lists
Files:
docs/source/workflows/observe/index.mddocs/source/tutorials/create-a-new-workflow.md
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks
Files:
docs/source/workflows/observe/index.mdsrc/nat/observability/mixin/tagging_config_mixin.pysrc/nat/observability/processor/span_tagging_processor.pysrc/nat/observability/processor/header_redaction_processor.pydocs/source/tutorials/create-a-new-workflow.mdpackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.pysrc/nat/observability/processor/redaction_processor.pytests/nat/observability/processor/test_span_tagging_processor.pysrc/nat/observability/mixin/redaction_config_mixin.pytests/nat/observability/processor/test_header_redaction_processor.pysrc/nat/observability/exporter/processing_exporter.pytests/nat/observability/processor/test_redaction_processor.pytests/nat/observability/exporter/test_processing_exporter.py
**/*.{py,md}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm
Files:
docs/source/workflows/observe/index.mdsrc/nat/observability/mixin/tagging_config_mixin.pysrc/nat/observability/processor/span_tagging_processor.pysrc/nat/observability/processor/header_redaction_processor.pydocs/source/tutorials/create-a-new-workflow.mdpackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.pysrc/nat/observability/processor/redaction_processor.pytests/nat/observability/processor/test_span_tagging_processor.pysrc/nat/observability/mixin/redaction_config_mixin.pytests/nat/observability/processor/test_header_redaction_processor.pysrc/nat/observability/exporter/processing_exporter.pytests/nat/observability/processor/test_redaction_processor.pytests/nat/observability/exporter/test_processing_exporter.py
**/*
⚙️ CodeRabbit configuration file
**/*: # Code Review Instructions
- Ensure the code follows best practices and coding standards. - For Python code, follow
PEP 20 and
PEP 8 for style guidelines.- Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
Example:def my_function(param1: int, param2: str) -> bool: pass- For Python exception handling, ensure proper stack trace preservation:
- When re-raising exceptions: use bare
raisestatements to maintain the original stack trace,
and uselogger.error()(notlogger.exception()) to avoid duplicate stack trace output.- When catching and logging exceptions without re-raising: always use
logger.exception()
to capture the full stack trace information.Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any
words listed in the
ci/vale/styles/config/vocabularies/nat/reject.txtfile, words that might appear to be
spelling mistakes but are listed in theci/vale/styles/config/vocabularies/nat/accept.txtfile are OK.Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,
and should contain an Apache License 2.0 header comment at the top of each file.
- Confirm that copyright years are up-to date whenever a file is changed.
Files:
docs/source/workflows/observe/index.mdsrc/nat/observability/mixin/tagging_config_mixin.pysrc/nat/observability/processor/span_tagging_processor.pysrc/nat/observability/processor/header_redaction_processor.pydocs/source/tutorials/create-a-new-workflow.mdpackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.pysrc/nat/observability/processor/redaction_processor.pytests/nat/observability/processor/test_span_tagging_processor.pysrc/nat/observability/mixin/redaction_config_mixin.pytests/nat/observability/processor/test_header_redaction_processor.pysrc/nat/observability/exporter/processing_exporter.pytests/nat/observability/processor/test_redaction_processor.pytests/nat/observability/exporter/test_processing_exporter.py
docs/source/**/*
⚙️ CodeRabbit configuration file
This directory contains the source code for the documentation. All documentation should be written in Markdown format. Any image files should be placed in the
docs/source/_staticdirectory.
Files:
docs/source/workflows/observe/index.mddocs/source/tutorials/create-a-new-workflow.md
src/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
src/**/*.py: All importable Python code must live under src/
All public APIs in src/ require Python 3.11+ type hints on parameters and return values; prefer typing/collections.abc abstractions; use typing.Annotated when useful
Provide Google-style docstrings for every public module, class, function, and CLI command; first line concise with a period; surround code entities with backticks
Files:
src/nat/observability/mixin/tagging_config_mixin.pysrc/nat/observability/processor/span_tagging_processor.pysrc/nat/observability/processor/header_redaction_processor.pysrc/nat/observability/processor/redaction_processor.pysrc/nat/observability/mixin/redaction_config_mixin.pysrc/nat/observability/exporter/processing_exporter.py
src/nat/**/*
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Core functionality under src/nat should prioritize backward compatibility when changed
Files:
src/nat/observability/mixin/tagging_config_mixin.pysrc/nat/observability/processor/span_tagging_processor.pysrc/nat/observability/processor/header_redaction_processor.pysrc/nat/observability/processor/redaction_processor.pysrc/nat/observability/mixin/redaction_config_mixin.pysrc/nat/observability/exporter/processing_exporter.py
⚙️ CodeRabbit configuration file
This directory contains the core functionality of the toolkit. Changes should prioritize backward compatibility.
Files:
src/nat/observability/mixin/tagging_config_mixin.pysrc/nat/observability/processor/span_tagging_processor.pysrc/nat/observability/processor/header_redaction_processor.pysrc/nat/observability/processor/redaction_processor.pysrc/nat/observability/mixin/redaction_config_mixin.pysrc/nat/observability/exporter/processing_exporter.py
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bareraiseand log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial
Files:
src/nat/observability/mixin/tagging_config_mixin.pysrc/nat/observability/processor/span_tagging_processor.pysrc/nat/observability/processor/header_redaction_processor.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.pysrc/nat/observability/processor/redaction_processor.pytests/nat/observability/processor/test_span_tagging_processor.pysrc/nat/observability/mixin/redaction_config_mixin.pytests/nat/observability/processor/test_header_redaction_processor.pysrc/nat/observability/exporter/processing_exporter.pytests/nat/observability/processor/test_redaction_processor.pytests/nat/observability/exporter/test_processing_exporter.py
packages/*/tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Packages containing Python code must have tests under packages//tests
Files:
packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py
**/tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/tests/**/*.py: Test functions must use the test_ prefix and snake_case
Extract repeated test code into pytest fixtures; fixtures should set name=... in @pytest.fixture and functions named with fixture_ prefix
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration
Use pytest with pytest-asyncio for async code; mock external services with pytest_httpserver or unittest.mock
Files:
packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pytests/nat/observability/processor/test_span_tagging_processor.pytests/nat/observability/processor/test_header_redaction_processor.pytests/nat/observability/processor/test_redaction_processor.pytests/nat/observability/exporter/test_processing_exporter.py
packages/**/*
⚙️ CodeRabbit configuration file
packages/**/*: - This directory contains optional plugin packages for the toolkit, each should contain apyproject.tomlfile. - Thepyproject.tomlfile should declare a dependency onnvidia-nator another package with a name starting
withnvidia-nat-. This dependency should be declared using~=<version>, and the version should be a two
digit version (ex:~=1.0).
- Not all packages contain Python code, if they do they should also contain their own set of tests, in a
tests/directory at the same level as thepyproject.tomlfile.
Files:
packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py
packages/*/src/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
packages/*/src/**/*.py: All importable Python code in packages must live under packages//src/
All public APIs in packaged code require Python 3.11+ type hints; prefer typing/collections.abc; use typing.Annotated when useful
Provide Google-style docstrings for public APIs in packages; first line concise with a period; use backticks for code entities
Files:
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py
tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Unit tests must live under tests/ and use configured markers (e2e, integration, etc.)
Files:
tests/nat/observability/processor/test_span_tagging_processor.pytests/nat/observability/processor/test_header_redaction_processor.pytests/nat/observability/processor/test_redaction_processor.pytests/nat/observability/exporter/test_processing_exporter.py
⚙️ CodeRabbit configuration file
tests/**/*.py: - Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code. - Test functions should be named using thetest_prefix, using snake_case. - Any frequently repeated code should be extracted into pytest fixtures. - Pytest fixtures should define the name argument when applying the pytest.fixture decorator. The fixture
function being decorated should be named using thefixture_prefix, using snake_case. Example:
@pytest.fixture(name="my_fixture")
def fixture_my_fixture():
pass
Files:
tests/nat/observability/processor/test_span_tagging_processor.pytests/nat/observability/processor/test_header_redaction_processor.pytests/nat/observability/processor/test_redaction_processor.pytests/nat/observability/exporter/test_processing_exporter.py
🧬 Code graph analysis (9)
src/nat/observability/processor/header_redaction_processor.py (2)
src/nat/builder/context.py (2)
Context(93-277)metadata(127-136)src/nat/observability/processor/redaction_processor.py (3)
SpanRedactionProcessor(75-77)should_redact(35-45)redact_item(48-57)
packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py (6)
src/nat/builder/context.py (2)
ContextState(65-90)metadata(127-136)src/nat/observability/mixin/tagging_config_mixin.py (1)
PrivacyLevel(26-31)src/nat/observability/processor/header_redaction_processor.py (2)
HeaderRedactionProcessor(35-123)should_redact(65-93)src/nat/observability/processor/span_tagging_processor.py (2)
SpanTaggingProcessor(26-61)process(50-61)packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py (1)
OTLPSpanHeaderRedactionAdapterExporter(28-144)src/nat/observability/processor/redaction_processor.py (2)
should_redact(35-45)process(60-72)
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py (5)
src/nat/builder/context.py (1)
ContextState(65-90)src/nat/observability/mixin/tagging_config_mixin.py (1)
PrivacyLevel(26-31)src/nat/observability/processor/header_redaction_processor.py (1)
HeaderRedactionProcessor(35-123)src/nat/observability/processor/span_tagging_processor.py (1)
SpanTaggingProcessor(26-61)src/nat/observability/exporter/processing_exporter.py (1)
add_processor(70-125)
src/nat/observability/processor/redaction_processor.py (3)
src/nat/builder/context.py (1)
Context(93-277)src/nat/observability/processor/header_redaction_processor.py (2)
should_redact(65-93)redact_item(110-123)tests/nat/observability/processor/test_redaction_processor.py (22)
should_redact(43-47)should_redact(63-67)should_redact(87-91)should_redact(157-158)should_redact(351-352)should_redact(374-375)should_redact(404-405)should_redact(510-512)should_redact(539-540)should_redact(574-576)should_redact(620-621)redact_item(49-53)redact_item(69-73)redact_item(93-109)redact_item(167-168)redact_item(354-355)redact_item(377-379)redact_item(407-410)redact_item(514-516)redact_item(542-553)redact_item(578-586)redact_item(623-625)
tests/nat/observability/processor/test_span_tagging_processor.py (1)
src/nat/observability/processor/span_tagging_processor.py (2)
SpanTaggingProcessor(26-61)process(50-61)
tests/nat/observability/processor/test_header_redaction_processor.py (3)
src/nat/builder/context.py (2)
Context(93-277)metadata(127-136)src/nat/observability/processor/header_redaction_processor.py (5)
HeaderRedactionProcessor(35-123)default_callback(30-32)should_redact(65-93)redact_item(110-123)_should_redact_impl(96-107)src/nat/observability/processor/redaction_processor.py (3)
should_redact(35-45)redact_item(48-57)process(60-72)
src/nat/observability/exporter/processing_exporter.py (6)
src/nat/builder/context.py (1)
ContextState(65-90)src/nat/observability/exporter/base_exporter.py (2)
name(234-245)_pre_start(312-314)tests/nat/observability/exporter/test_processing_exporter.py (14)
input_class(740-742)process(43-49)process(61-67)process(77-79)process(94-95)process(106-110)process(124-129)process(391-392)process(431-432)process(453-454)process(500-501)process(744-745)process(912-913)process(1279-1282)src/nat/observability/processor/redaction_processor.py (1)
process(60-72)src/nat/observability/processor/span_tagging_processor.py (1)
process(50-61)src/nat/front_ends/fastapi/step_adaptor.py (1)
process(292-319)
tests/nat/observability/processor/test_redaction_processor.py (2)
src/nat/builder/context.py (1)
Context(93-277)src/nat/observability/processor/redaction_processor.py (5)
RedactionProcessor(31-72)SpanRedactionProcessor(75-77)should_redact(35-45)redact_item(48-57)process(60-72)
tests/nat/observability/exporter/test_processing_exporter.py (4)
src/nat/observability/processor/redaction_processor.py (1)
process(60-72)src/nat/observability/processor/span_tagging_processor.py (1)
process(50-61)src/nat/builder/context.py (1)
ContextState(65-90)src/nat/observability/exporter/processing_exporter.py (11)
_pre_start(342-380)add_processor(70-125)remove_processor(127-179)clear_processors(181-185)reset_pipeline(187-204)get_processor_by_name(206-221)_process_pipeline(382-391)_export_with_processing(473-497)_continue_pipeline_after(438-471)_create_export_task(529-546)ProcessingExporter(39-571)
🔇 Additional comments (11)
docs/source/workflows/observe/index.md (1)
111-111: LGTM: Patronus link fixThe domain correction to https://www.patronus.ai/ looks good.
docs/source/tutorials/create-a-new-workflow.md (2)
201-201: LGTM: DOCA GPUNetIO link to archived docsThe archive target is explicit and stable; looks good.
135-140: Verify function name referenceConfirm the function is actually named
webquery_tool; if not, update to the correct identifier used in the example project.packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py (2)
21-24: Mark async tests for asyncio.Several tests are
async defbut the module lackspytest.mark.asyncio. Add a module‑level marker (or ensure project config enables auto mode).Apply:
@@ import pytest from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from starlette.datastructures import Headers +pytestmark = pytest.mark.asyncioIf your pytest config already sets asyncio/anyio auto mode, you can skip this.
990-1015: Align logging level with repo exception policy.Policy: when catching and not re‑raising, log with
logger.exception()to capture stack traces. The test assertslogger.error. Verify the implementation: if the exporter swallows exceptions, switch tologger.exception()and update the assertion; if it re‑raises elsewhere, keeperror.tests/nat/observability/processor/test_header_redaction_processor.py (2)
30-911: Solid coverage and edge cases.Broad, well-structured tests across init, decision logic, LRU, integration, and error paths. Nice use of Headers case-insensitivity and force overrides.
545-616: pytest-asyncio auto mode enabled
async tests will run without explicit@pytest.mark.asyncio(configured viaasyncio_mode = "auto"in pyproject.toml).tests/nat/observability/processor/test_redaction_processor.py (2)
181-255: Good abstraction tests for base processor.Clear checks for abstractness, state tracking, and multi-item behavior.
184-201: pytest-asyncio is already configured withasyncio_mode = "auto"in pyproject.toml, so explicit@pytest.mark.asynciomarkers aren’t needed.tests/nat/observability/exporter/test_processing_exporter.py (1)
283-346: LGTM on pipeline locking, advanced positioning, and drop_nones behavior.Good coverage of lock enforcement, before/after/position insertions, and None-dropping semantics (including logging assertions).
Also applies to: 380-407, 515-601
tests/nat/observability/processor/test_span_tagging_processor.py (1)
121-246: LGTM: Comprehensive SpanTaggingProcessor coverage.Solid validation of defaults, env var precedence/whitespace, conditional tagging, overwrites, Unicode/special chars, and type introspection. Tests read well and are deterministic.
Also applies to: 315-531
...idia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py
Show resolved
Hide resolved
...idia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py
Show resolved
Hide resolved
- Added OTLPSpanExporter and OTLPSpanAdapterExporter to the main OpenTelemetry module for easier access. - Updated privacy tag value in OTLPSpanHeaderRedactionAdapterExporter from SENSITIVE to HIGH for enhanced data privacy. - Fix type check order in ProcessingExporter - Adjusted import statements across various modules to reflect the new structure. - Added tests to ensure proper functionality of the updated exporters and redaction logic. Signed-off-by: Matthew Penn <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.py (1)
151-154: Fix invalid port in unreachable-endpoint testPort
99999is out of range and may fail at URL parsing rather than exercising export error handling. Use a closed but valid local port (e.g.,1) or a non-resolvable host likeexample.invalid.- exporter = OTLPSpanAdapterExporter( - endpoint="http://127.0.0.1:99999/v1/traces", # Unreachable port - batch_size=1, - flush_interval=0.1) + exporter = OTLPSpanAdapterExporter( + endpoint="http://127.0.0.1:1/v1/traces", # Likely closed port to trigger connection error + batch_size=1, + flush_interval=0.1)
♻️ Duplicate comments (2)
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/__init__.py (1)
16-24: Re-export surface is correct and addresses prior feedback.
OTLPSpanHeaderRedactionAdapterExporteris now publicly exported alongside the existing exporters;__all__is updated accordingly. LGTM.packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py (1)
60-71: Fix docstring example: wrong class name.Use the actual exported class name in the example.
- exporter = OTLPSpanRedactionAdapterExporter( + exporter = OTLPSpanHeaderRedactionAdapterExporter( endpoint="https://api.service.com/v1/traces", headers={"Authorization": "Bearer your-token"}, redaction_attributes=["user.email", "request.body"], redaction_header="x-user-id", redaction_callback=should_redact, redaction_value="REDACTED", privacy_tag_key="privacy.level", privacy_tag_value=PrivacyLevel.HIGH, batch_size=50, flush_interval=10.0 )
🧹 Nitpick comments (18)
tests/nat/observability/exporter/test_processing_exporter.py (6)
165-180: Align fixtures with repo guidelines (name=..., fixture_ prefix)Tests pass as-is, but guidelines require fixture functions to use the fixture_ prefix and set name=.... This keeps the external fixture names unchanged and improves consistency.
-@pytest.fixture -def mock_context_state(): +@pytest.fixture(name="mock_context_state") +def fixture_mock_context_state() -> ContextState: @@ -@pytest.fixture -def processing_exporter(mock_context_state): +@pytest.fixture(name="processing_exporter") +def fixture_processing_exporter(mock_context_state: ContextState) -> ConcreteProcessingExporter:
599-615: Reduce brittleness of log-text assertionsSeveral tests assert exact log strings. Consider asserting on substrings/patterns or using caplog.records with startswith/regex to tolerate benign wording changes.
Also applies to: 616-634, 1396-1414
1005-1026: Use a real coroutine instead of Mock for create_taskWhile patched, passing a real coroutine better mirrors runtime behavior and avoids type surprises if the patch changes.
- try: - # Use a mock coroutine that doesn't need to be awaited - mock_coro = Mock() + try: + async def _noop(): + return None + mock_coro = _noop()
1039-1056: Same here: prefer a real coroutine for error-path testKeeps tests resilient to changes in the create_task patching.
- try: - # Use a mock coroutine that doesn't need to be awaited - mock_coro = Mock() + try: + async def _noop(): + return None + mock_coro = _noop()
1124-1127: Create futures via running loop for shutdown-failure fakesUsing get_running_loop().create_future() ties futures to the active loop and avoids potential loop-association issues.
- def failing_shutdown(): - future = asyncio.Future() + def failing_shutdown(): + future = asyncio.get_running_loop().create_future() future.set_exception(RuntimeError("Shutdown failed")) return futureAlso applies to: 1179-1182
21-22: Optional: add module-level pytest.mark.asyncio
The project’s pyproject.toml already setsasyncio_mode = "auto", so all async tests run under pytest-asyncio by default; marking the module is optional.import pytest +pytestmark = pytest.mark.asyncio # optional under auto modepackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py (7)
213-237: Make call-arg assertions robust and avoid brittle total call count
- Replace tuple indexing on
callwithcall.args/call.kwargsfor clarity and forward-compat.- Avoid asserting exact total calls (parent may change its processor list). Assert presence and ordering of your processors instead.
- # Verify add_processor was called 4 times total: - # - 2 from parent OtelSpanExporter (SpanToOtelProcessor, OtelSpanBatchProcessor) - # - 2 from our class (HeaderRedactionProcessor, SpanTaggingProcessor) - assert mock_add_processor.call_count == 4 + # Verify our two processors were added with correct names and positions. # Find our redaction processor call (should have name="header_redaction") - redaction_calls = [ - call for call in mock_add_processor.call_args_list - if len(call) > 1 and call[1].get("name") == "header_redaction" - ] - assert len(redaction_calls) == 1 - redaction_call = redaction_calls[0] - assert redaction_call[1]["position"] == 0 - assert isinstance(redaction_call[0][0], HeaderRedactionProcessor) + redaction_calls = [c for c in mock_add_processor.call_args_list if c.kwargs.get("name") == "header_redaction"] + assert redaction_calls, "header_redaction processor not added" + redaction_call = redaction_calls[0] + assert redaction_call.kwargs.get("position") == 0 + assert isinstance(redaction_call.args[0], HeaderRedactionProcessor) # Find our tagging processor call (should have name="span_privacy_tagging") - tagging_calls = [ - call for call in mock_add_processor.call_args_list - if len(call) > 1 and call[1].get("name") == "span_privacy_tagging" - ] - assert len(tagging_calls) == 1 - tagging_call = tagging_calls[0] - assert tagging_call[1]["position"] == 1 - assert isinstance(tagging_call[0][0], SpanTaggingProcessor) + tagging_calls = [c for c in mock_add_processor.call_args_list if c.kwargs.get("name") == "span_privacy_tagging"] + assert tagging_calls, "span_privacy_tagging processor not added" + tagging_call = tagging_calls[0] + assert tagging_call.kwargs.get("position") == 1 + assert isinstance(tagging_call.args[0], SpanTaggingProcessor)
254-268: Use call.args/kwargs instead of tuple indexing for mocksImproves readability and avoids relying on private tuple structure.
- redaction_calls = [ - call for call in mock_add_processor.call_args_list - if len(call) > 1 and call[1].get("name") == "header_redaction" - ] - assert len(redaction_calls) == 1 - - header_processor = redaction_calls[0][0][0] + redaction_calls = [c for c in mock_add_processor.call_args_list if c.kwargs.get("name") == "header_redaction"] + assert len(redaction_calls) == 1 + + header_processor = redaction_calls[0].args[0]
279-290: Use call.args/kwargs for tagging-processor assertionsAligns with recommended
unittest.mockAPI.- tagging_calls = [ - call for call in mock_add_processor.call_args_list - if len(call) > 1 and call[1].get("name") == "span_privacy_tagging" - ] + tagging_calls = [c for c in mock_add_processor.call_args_list if c.kwargs.get("name") == "span_privacy_tagging"] assert len(tagging_calls) == 1 - tagging_processor = tagging_calls[0][0][0] + tagging_processor = tagging_calls[0].args[0] assert isinstance(tagging_processor, SpanTaggingProcessor) assert tagging_processor.tag_key == privacy_tag_key assert tagging_processor.tag_value == privacy_level.value # Should use .value
301-323: Avoid brittle total-call assertion and use call.args/kwargsKeep tests resilient to parent pipeline changes; continue to assert that your processors are present.
- # Should add 4 processors total (2 from parent + 2 from our class) - assert mock_add_processor.call_count == 4 + # Ensure both processors were added (exact total may vary depending on parent pipeline). # Find HeaderRedactionProcessor call - redaction_calls = [ - call for call in mock_add_processor.call_args_list - if len(call) > 1 and call[1].get("name") == "header_redaction" - ] + redaction_calls = [c for c in mock_add_processor.call_args_list if c.kwargs.get("name") == "header_redaction"] assert len(redaction_calls) == 1 - header_processor = redaction_calls[0][0][0] + header_processor = redaction_calls[0].args[0] assert isinstance(header_processor, HeaderRedactionProcessor) # Find SpanTaggingProcessor call - tagging_calls = [ - call for call in mock_add_processor.call_args_list - if len(call) > 1 and call[1].get("name") == "span_privacy_tagging" - ] + tagging_calls = [c for c in mock_add_processor.call_args_list if c.kwargs.get("name") == "span_privacy_tagging"] assert len(tagging_calls) == 1 - tagging_processor = tagging_calls[0][0][0] + tagging_processor = tagging_calls[0].args[0] assert isinstance(tagging_processor, SpanTaggingProcessor) assert tagging_processor.tag_key is None assert tagging_processor.tag_value is None
334-343: Remove unused fixturesample_spanDefined but never used; drop to reduce test noise.
- @pytest.fixture - def sample_span(self): - """Create a sample span for testing.""" - return Span(name="test_span", - attributes={ - "user.email": "[email protected]", - "user.phone": "123-456-7890", - "request.id": "req_123", - "system.info": "safe_data" - })
389-399: Reduce reliance on private_processorsAccessing
exporter._processorsis brittle. Prefer patchingadd_processorand capturing constructed processors (as done elsewhere) to assert configuration, or expose a small accessor in the public API.
516-579: Fixture naming convention per repo guidelinesRepo style requires fixtures to use
fixture_prefix or setname=...in@pytest.fixture. Consider renaming fixtures (e.g.,fixture_basic_exporter_config) or addingname="basic_exporter_config"etc. Applies to fixtures across this module.packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.py (1)
48-66: Fixture naming convention per repo guidelinesRename fixtures to use
fixture_prefix or setname=...in the decorator for consistency (e.g.,fixture_basic_exporter_config,fixture_mock_context_state).packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.py (2)
22-38: Add module-level markers for asyncio and integrationThese are real HTTP integration tests; mark appropriately. Ensure
integrationis registered inpytest.inito avoid unknown-marker warnings.import pytest import pytest_httpserver from werkzeug import Request from werkzeug import Response from nat.builder.framework_enum import LLMFrameworkEnum @@ from nat.data_models.invocation_node import InvocationNode from nat.plugins.opentelemetry import OTLPSpanAdapterExporter + +pytestmark = [pytest.mark.asyncio, pytest.mark.integration]
40-48: Deduplicate helper across modules
create_test_intermediate_stepis duplicated in multiple test files. Consider moving to a shared fixture inconftest.py.packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/register.py (1)
45-45: Import path normalization to package root looks good.Shifting to
from nat.plugins.opentelemetry import OTLPSpanAdapterExporteraligns with the new public API surface. As a minor consistency polish with repo guidelines, consider:
- Adding explicit return type hints for these async exporter factories.
- Adding a short docstring to
langfuse_telemetry_exporterto match the others.Example annotations and docstring (non-diff, illustrative for langfuse — replicate for others):
from typing import AsyncIterator, TYPE_CHECKING if TYPE_CHECKING: from nat.plugins.opentelemetry import OtelSpanExporter @register_telemetry_exporter(config_type=LangfuseTelemetryExporter) async def langfuse_telemetry_exporter( config: LangfuseTelemetryExporter, builder: Builder ) -> AsyncIterator["OtelSpanExporter"]: """Create a Langfuse telemetry exporter.""" ...Also applies to: 81-81, 111-111, 146-146, 178-178
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py (1)
16-26: Remove unused logger.
loggingandloggerare not used in this module.-import logging -from collections.abc import Callable - -from nat.builder.context import ContextState +from collections.abc import Callable + +from nat.builder.context import ContextState @@ -logger = logging.getLogger(__name__) -
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (8)
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/__init__.py(1 hunks)packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py(1 hunks)packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/register.py(5 hunks)packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.py(1 hunks)packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.py(1 hunks)packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py(1 hunks)src/nat/observability/exporter/processing_exporter.py(6 hunks)tests/nat/observability/exporter/test_processing_exporter.py(13 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/nat/observability/exporter/processing_exporter.py
🧰 Additional context used
📓 Path-based instructions (9)
packages/*/src/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
packages/*/src/**/*.py: All importable Python code in packages must live under packages//src/
All public APIs in packaged code require Python 3.11+ type hints; prefer typing/collections.abc; use typing.Annotated when useful
Provide Google-style docstrings for public APIs in packages; first line concise with a period; use backticks for code entities
Files:
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/register.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/__init__.py
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bareraiseand log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial
Files:
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/register.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/__init__.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pytests/nat/observability/exporter/test_processing_exporter.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks
Files:
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/register.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/__init__.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pytests/nat/observability/exporter/test_processing_exporter.py
**/*.{py,md}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm
Files:
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/register.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/__init__.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pytests/nat/observability/exporter/test_processing_exporter.py
**/*
⚙️ CodeRabbit configuration file
**/*: # Code Review Instructions
- Ensure the code follows best practices and coding standards. - For Python code, follow
PEP 20 and
PEP 8 for style guidelines.- Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
Example:def my_function(param1: int, param2: str) -> bool: pass- For Python exception handling, ensure proper stack trace preservation:
- When re-raising exceptions: use bare
raisestatements to maintain the original stack trace,
and uselogger.error()(notlogger.exception()) to avoid duplicate stack trace output.- When catching and logging exceptions without re-raising: always use
logger.exception()
to capture the full stack trace information.Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any
words listed in the
ci/vale/styles/config/vocabularies/nat/reject.txtfile, words that might appear to be
spelling mistakes but are listed in theci/vale/styles/config/vocabularies/nat/accept.txtfile are OK.Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,
and should contain an Apache License 2.0 header comment at the top of each file.
- Confirm that copyright years are up-to date whenever a file is changed.
Files:
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/register.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/__init__.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pytests/nat/observability/exporter/test_processing_exporter.py
packages/**/*
⚙️ CodeRabbit configuration file
packages/**/*: - This directory contains optional plugin packages for the toolkit, each should contain apyproject.tomlfile. - Thepyproject.tomlfile should declare a dependency onnvidia-nator another package with a name starting
withnvidia-nat-. This dependency should be declared using~=<version>, and the version should be a two
digit version (ex:~=1.0).
- Not all packages contain Python code, if they do they should also contain their own set of tests, in a
tests/directory at the same level as thepyproject.tomlfile.
Files:
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/register.pypackages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/__init__.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py
packages/*/tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Packages containing Python code must have tests under packages//tests
Files:
packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py
**/tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/tests/**/*.py: Test functions must use the test_ prefix and snake_case
Extract repeated test code into pytest fixtures; fixtures should set name=... in @pytest.fixture and functions named with fixture_ prefix
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration
Use pytest with pytest-asyncio for async code; mock external services with pytest_httpserver or unittest.mock
Files:
packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.pypackages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.pytests/nat/observability/exporter/test_processing_exporter.py
tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Unit tests must live under tests/ and use configured markers (e2e, integration, etc.)
Files:
tests/nat/observability/exporter/test_processing_exporter.py
⚙️ CodeRabbit configuration file
tests/**/*.py: - Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code. - Test functions should be named using thetest_prefix, using snake_case. - Any frequently repeated code should be extracted into pytest fixtures. - Pytest fixtures should define the name argument when applying the pytest.fixture decorator. The fixture
function being decorated should be named using thefixture_prefix, using snake_case. Example:
@pytest.fixture(name="my_fixture")
def fixture_my_fixture():
pass
Files:
tests/nat/observability/exporter/test_processing_exporter.py
🧬 Code graph analysis (4)
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py (5)
src/nat/builder/context.py (1)
ContextState(65-90)src/nat/observability/mixin/tagging_config_mixin.py (1)
PrivacyLevel(26-31)src/nat/observability/processor/header_redaction_processor.py (1)
HeaderRedactionProcessor(35-123)src/nat/observability/processor/span_tagging_processor.py (1)
SpanTaggingProcessor(26-61)src/nat/observability/exporter/processing_exporter.py (1)
add_processor(70-128)
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/__init__.py (1)
packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py (1)
OTLPSpanHeaderRedactionAdapterExporter(28-144)
packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py (6)
src/nat/builder/context.py (2)
ContextState(65-90)metadata(127-136)src/nat/observability/mixin/tagging_config_mixin.py (1)
PrivacyLevel(26-31)src/nat/observability/processor/header_redaction_processor.py (2)
HeaderRedactionProcessor(35-123)should_redact(65-93)src/nat/observability/processor/span_tagging_processor.py (2)
SpanTaggingProcessor(26-61)process(50-61)packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py (1)
OTLPSpanHeaderRedactionAdapterExporter(28-144)src/nat/observability/exporter/processing_exporter.py (2)
export(503-518)export_processed(521-530)
tests/nat/observability/exporter/test_processing_exporter.py (1)
src/nat/observability/exporter/processing_exporter.py (11)
_pre_start(345-383)add_processor(70-128)remove_processor(130-182)clear_processors(184-188)reset_pipeline(190-207)get_processor_by_name(209-224)_process_pipeline(385-394)_export_with_processing(476-500)_continue_pipeline_after(441-474)_create_export_task(532-549)ProcessingExporter(39-574)
🔇 Additional comments (15)
tests/nat/observability/exporter/test_processing_exporter.py (12)
98-111: LGTM: Purpose-built None-returning processorClear, minimal helper for exercising drop_nones behavior; type: ignore is justified in tests.
146-147: LGTM: drop_nones wiring on test exporterConstructor correctly threads drop_nones to the base.
191-204: LGTM: Initialization assertions for drop_nones and lock stateGood coverage of default and explicit values.
223-281: LGTM: Pipeline locking semanticsSolid coverage for locked modifications and reset behavior, including running-state guard.
283-378: LGTM: Processor naming and retrieval pathsGood validation of name uniqueness, typing, and position updates on removal.
380-546: LGTM: Advanced insertion logicThorough tests for position/before/after and name-position updates.
548-634: LGTM: drop_nones pipeline/export pathsCovers single/multi-processor flows and continuation after callback.
684-697: LGTM: Object-based removalCorrectly verifies removal by instance.
697-709: LGTM: No-op removal for non-present objectAppropriate silent behavior per spec.
720-722: LGTM: Named index map cleared on pipeline clearEnsures internal maps are reset.
773-776: LGTM: Generic type validation warning pathProperty raising TypeError is a clean trigger for fallback logic.
1243-1246: LGTM: Abstract-method enforcementGood negative test for ABC compliance.
packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.py (1)
31-31: Import path update looks goodSwitching to
from nat.plugins.opentelemetry import OTLPSpanAdapterExporteraligns with public exports and keeps imports consistent across tests.packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_integration.py (1)
37-37: Import path update looks goodUsing
from nat.plugins.opentelemetry import OTLPSpanAdapterExportermatches the package-level public export.packages/nvidia_nat_opentelemetry/src/nat/plugins/opentelemetry/otlp_span_redaction_adapter_exporter.py (1)
131-144: Verify processor name uniqueness and ordering.
add_processor(..., name="header_redaction"/"span_privacy_tagging", position=0/1)assumes no prior processors with the same names and that position 0 inserts at the true pipeline head post-super().__init__. Please confirm no collisions and expected order on a warm pipeline.
packages/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_adapter_exporter.py
Show resolved
Hide resolved
...es/nvidia_nat_opentelemetry/tests/observability/test_otlp_span_redaction_adapter_exporter.py
Show resolved
Hide resolved
willkill07
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just minor nits. Always a joy to review your PRs.
- Updated type check in ProcessingExporter to use isinstance for better clarity and type safety. - Changed PrivacyLevel class to inherit from str and Enum for improved functionality and compatibility. Signed-off-by: Matthew Penn <[email protected]>
…xports Signed-off-by: Matthew Penn <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (3)
src/nat/observability/exporter/processing_exporter.py (3)
106-121: Name-position mapping fix matches earlier feedback; minor iteration nit.You correctly validate duplicates first, shift existing positions regardless of naming, then register the new name. This addresses the stale-index issue previously raised.
Optional micro-nit: iterate over a snapshot to future-proof against accidental mutations.
- for proc_name, pos in self._processor_names.items(): + for proc_name, pos in list(self._processor_names.items()): if pos >= insert_position: self._processor_names[proc_name] = pos + 1
312-344: Corrected issubclass direction for processor compatibility.The check now validates that produced type is a subclass of expected input. Matches prior guidance.
353-359: First-processor vs exporter input type check uses correct subclass direction.
issubclass(self.input_class, first_processor.input_class)fixes the inversion noted earlier.
🧹 Nitpick comments (3)
src/nat/observability/exporter/processing_exporter.py (3)
130-183: Enhanced removal API is useful; consider surfacing “object not found” as debug.Silent return when removing by object preserves prior behavior, but can hide misconfigurations.
Add a debug log before returning:
elif isinstance(processor, Processor): # Remove by object (existing behavior) if processor not in self._processors: - return # Silently ignore if not found (existing behavior) + logger.debug("Attempted to remove processor object %s that is not in the pipeline", + processor.__class__.__name__) + return # Preserve behavior
209-225: Lower severity for “not found” get-by-name.Lookup failures are common in conditional flows; warning may be noisy in production. Suggest downgrading to debug to reduce log volume.
- logger.warning("Processor '%s' not found in pipeline", name) + logger.debug("Processor '%s' not found in pipeline", name)
533-537: Add explicit return type to_create_export_taskper guidelines.Public/protected methods should have return annotations. Add
-> None.- def _create_export_task(self, coro: Coroutine): + def _create_export_task(self, coro: Coroutine) -> None:
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
src/nat/observability/exporter/processing_exporter.py(6 hunks)src/nat/observability/mixin/tagging_config_mixin.py(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/nat/observability/mixin/tagging_config_mixin.py
🧰 Additional context used
📓 Path-based instructions (6)
src/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
src/**/*.py: All importable Python code must live under src/
All public APIs in src/ require Python 3.11+ type hints on parameters and return values; prefer typing/collections.abc abstractions; use typing.Annotated when useful
Provide Google-style docstrings for every public module, class, function, and CLI command; first line concise with a period; surround code entities with backticks
Files:
src/nat/observability/exporter/processing_exporter.py
src/nat/**/*
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Core functionality under src/nat should prioritize backward compatibility when changed
Files:
src/nat/observability/exporter/processing_exporter.py
⚙️ CodeRabbit configuration file
This directory contains the core functionality of the toolkit. Changes should prioritize backward compatibility.
Files:
src/nat/observability/exporter/processing_exporter.py
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bareraiseand log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial
Files:
src/nat/observability/exporter/processing_exporter.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks
Files:
src/nat/observability/exporter/processing_exporter.py
**/*.{py,md}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm
Files:
src/nat/observability/exporter/processing_exporter.py
**/*
⚙️ CodeRabbit configuration file
**/*: # Code Review Instructions
- Ensure the code follows best practices and coding standards. - For Python code, follow
PEP 20 and
PEP 8 for style guidelines.- Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
Example:def my_function(param1: int, param2: str) -> bool: pass- For Python exception handling, ensure proper stack trace preservation:
- When re-raising exceptions: use bare
raisestatements to maintain the original stack trace,
and uselogger.error()(notlogger.exception()) to avoid duplicate stack trace output.- When catching and logging exceptions without re-raising: always use
logger.exception()
to capture the full stack trace information.Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any
words listed in the
ci/vale/styles/config/vocabularies/nat/reject.txtfile, words that might appear to be
spelling mistakes but are listed in theci/vale/styles/config/vocabularies/nat/accept.txtfile are OK.Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,
and should contain an Apache License 2.0 header comment at the top of each file.
- Confirm that copyright years are up-to date whenever a file is changed.
Files:
src/nat/observability/exporter/processing_exporter.py
🧬 Code graph analysis (1)
src/nat/observability/exporter/processing_exporter.py (3)
src/nat/builder/context.py (1)
ContextState(65-90)src/nat/observability/exporter/base_exporter.py (2)
name(234-245)_pre_start(312-314)tests/nat/observability/exporter/test_processing_exporter.py (15)
input_class(773-775)process(43-49)process(61-67)process(77-79)process(94-95)process(106-110)process(124-129)process(391-392)process(431-432)process(453-454)process(500-501)process(522-523)process(777-778)process(945-946)process(1312-1315)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: CI Pipeline / Check
🔇 Additional comments (10)
src/nat/observability/exporter/processing_exporter.py (10)
57-69: Solid addition: configurable None-dropping with clear wiring.Constructor and field wiring for
drop_nonesare consistent with downstream handling. No issues.
94-105: Insertion logic and type-compat checks look robust.
- Position calculation handles conflicts,
-1as append, and named anchors.- Neighbor compatibility uses correct source→target direction and sensible fallbacks.
Also applies to: 232-280, 281-312
186-189: Guarding clear with pipeline lock is appropriate.Ensures runtime immutability after start; consistent with the new lifecycle.
190-208: Reset semantics are clear and safe.Unlock + clear only when not running; good for reconfiguration flows.
226-231: Lock guard is fine and messages are actionable.Good, specific runtime error text for post-start mutations.
404-406: Docstring explicitly mentions None-path; good.Matches the implemented behavior and constructor flag.
411-415: None-drop path is correctly short-circuited.Clear debug, immediate return prevents downstream type errors.
463-467: Skip-export behavior after downstream drop is correct.Prevents exporting
Nonewhen continuation pipelines filter items out.
528-529: Docstring type forexport_processedis precise.Matches runtime behavior (single item or batch).
346-348: Discard this refactor — no subclasses override_pre_start.A repo-wide search found no subclasses of
ProcessingExporterwith their own_pre_start(outside tests), so moving the lock won’t improve subclass compatibility.Likely an incorrect or invalid review comment.
Signed-off-by: Matthew Penn <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/nat/observability/exporter/processing_exporter.py (1)
533-549: Call the methodname(); otherwise logs show a bound method object.
BaseExporter.nameis a method, not a property. Also, tighten the coroutine type and annotate return.- def _create_export_task(self, coro: Coroutine): + def _create_export_task(self, coro: Coroutine[Any, Any, None]) -> None: @@ - if not self._running: - logger.warning("%s: Attempted to create export task while not running", self.name) + if not self._running: + logger.warning("%s: Attempted to create export task while not running", self.name()) return @@ - task = asyncio.create_task(coro) + task = asyncio.create_task(coro) self._tasks.add(task) task.add_done_callback(self._tasks.discard) @@ - logger.error("%s: Failed to create task: %s", self.name, e) + logger.error("%s: Failed to create task: %s", self.name(), e)
♻️ Duplicate comments (3)
src/nat/observability/exporter/processing_exporter.py (3)
312-344: issubclass direction is now correct.You compare source (produced) against target (expected). This resolves the earlier inversion.
355-359: First-processor input compatibility check uses correct issubclass direction.
484-489: Type widening forfinal_itemresolves the pyright mismatch flagged earlier.
🧹 Nitpick comments (8)
src/nat/observability/exporter/processing_exporter.py (8)
57-69: Add missing return annotation to init and confirm default behavior of drop_nones.
- Type hint: annotate init with -> None per repo standards.
- Back-compat: setting drop_nones=True by default could change existing behavior in src/nat; please confirm this is non-breaking for current processors, or default to False and gate via config.
- def __init__(self, context_state: ContextState | None = None, drop_nones: bool = True): + def __init__(self, context_state: ContextState | None = None, drop_nones: bool = True) -> None:
113-117: Avoid mutating a dict while iterating its live view.Use a list(...) copy to prevent accidental RuntimeError if future changes modify keys during iteration.
- for proc_name, pos in self._processor_names.items(): + for proc_name, pos in list(self._processor_names.items()): if pos >= insert_position: self._processor_names[proc_name] = pos + 1
130-183: Align “not found” semantics across removal modes.
Processorobject removal silently no-ops if not found, while name/position removals raise. Consider a consistent policy or anignore_missing: bool = Falseparameter.
190-208: Reset pipeline API reads clean and safe.Locks only when stopped; clears state. Consider INFO log when pipelines are reset to aid ops visibility.
209-225: Reduce log level on missing processor lookup.Returning None is often a control-flow path; log at DEBUG to avoid warning noise.
- logger.warning("Processor '%s' not found in pipeline", name) + logger.debug("Processor '%s' not found in pipeline", name)
346-348: Lock pipeline after successful validation, not before.Locking first can leave the pipeline stuck locked if validation later raises. Set the lock at the end when no exception occurred.
- # Lock the pipeline to prevent further modifications - self._pipeline_locked = True + # Defer locking until validations complete successfully (set at end of method)And add near the end of _pre_start (before returning):
+ # Lock the pipeline only after successful validations + self._pipeline_locked = TrueAlso applies to: 353-375
385-395: Docstring return contract should mention Optional.The signature allows None; update the docstring accordingly.
- Returns: - PipelineOutputT: The processed item after running through all processors + Returns: + PipelineOutputT | None: The processed item, or None if dropped by a processor when drop_nones=True.
551-575: Annotate_cleanupreturn type.Conform to “all public APIs have return annotations”.
- async def _cleanup(self): + async def _cleanup(self) -> None:
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
src/nat/observability/exporter/processing_exporter.py(7 hunks)
🧰 Additional context used
📓 Path-based instructions (6)
src/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
src/**/*.py: All importable Python code must live under src/
All public APIs in src/ require Python 3.11+ type hints on parameters and return values; prefer typing/collections.abc abstractions; use typing.Annotated when useful
Provide Google-style docstrings for every public module, class, function, and CLI command; first line concise with a period; surround code entities with backticks
Files:
src/nat/observability/exporter/processing_exporter.py
src/nat/**/*
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Core functionality under src/nat should prioritize backward compatibility when changed
Files:
src/nat/observability/exporter/processing_exporter.py
⚙️ CodeRabbit configuration file
This directory contains the core functionality of the toolkit. Changes should prioritize backward compatibility.
Files:
src/nat/observability/exporter/processing_exporter.py
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bareraiseand log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial
Files:
src/nat/observability/exporter/processing_exporter.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks
Files:
src/nat/observability/exporter/processing_exporter.py
**/*.{py,md}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm
Files:
src/nat/observability/exporter/processing_exporter.py
**/*
⚙️ CodeRabbit configuration file
**/*: # Code Review Instructions
- Ensure the code follows best practices and coding standards. - For Python code, follow
PEP 20 and
PEP 8 for style guidelines.- Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
Example:def my_function(param1: int, param2: str) -> bool: pass- For Python exception handling, ensure proper stack trace preservation:
- When re-raising exceptions: use bare
raisestatements to maintain the original stack trace,
and uselogger.error()(notlogger.exception()) to avoid duplicate stack trace output.- When catching and logging exceptions without re-raising: always use
logger.exception()
to capture the full stack trace information.Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any
words listed in the
ci/vale/styles/config/vocabularies/nat/reject.txtfile, words that might appear to be
spelling mistakes but are listed in theci/vale/styles/config/vocabularies/nat/accept.txtfile are OK.Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,
and should contain an Apache License 2.0 header comment at the top of each file.
- Confirm that copyright years are up-to date whenever a file is changed.
Files:
src/nat/observability/exporter/processing_exporter.py
🧬 Code graph analysis (1)
src/nat/observability/exporter/processing_exporter.py (3)
src/nat/builder/context.py (1)
ContextState(65-90)src/nat/observability/exporter/base_exporter.py (2)
name(234-245)_pre_start(312-314)tests/nat/observability/exporter/test_processing_exporter.py (14)
process(43-49)process(61-67)process(77-79)process(94-95)process(106-110)process(124-129)process(391-392)process(431-432)process(453-454)process(500-501)process(522-523)process(777-778)process(945-946)process(1312-1315)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: CI Pipeline / Check
🔇 Additional comments (6)
src/nat/observability/exporter/processing_exporter.py (6)
186-189: LGTM: clear resets both list and name map, guarded by lock check.
226-231: LGTM: pipeline lock guard centralizes mutation checks.
232-280: Insertion position resolver looks solid and defensive.
- Single selector enforced.
- -1 handled for append.
- Typed errors for misuse.
281-311: Neighbor compatibility checks are correct and clearly separated.
404-415: LGTM: drop-on-None behavior is explicit and logged.
463-467: LGTM: continuation correctly skips export on drop.
Signed-off-by: Matthew Penn <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)
src/nat/observability/exporter/processing_exporter.py (2)
70-121: Atomic insertion and name-position bookkeeping — resolved per prior feedbackName pre‑validation happens before mutation; named-index shifts occur regardless of new name presence. This fixes the stale-index/atomicity issues previously flagged.
281-311: Type-compat checks: direction corrected and generics-safe warnings — LGTMissubclass(source, target) direction is now correct for predecessor/successor and first-processor vs exporter input. Clean fallback to warnings on TypeError.
Also applies to: 312-344, 353-356
tests/nat/observability/exporter/test_processing_exporter.py (1)
568-589: Name-position updates on mid-pipeline insertion — LGTMConfirms unnamed insertions still shift existing named indices. Matches prior review ask.
Also applies to: 590-622
🧹 Nitpick comments (3)
src/nat/observability/exporter/processing_exporter.py (2)
291-311: Guard property access when fetching input/output classes to avoid hard failures on exotic typesIf a processor’s input_class/output_class accessor raises (e.g., due to unresolved generics), add a local try/except here (like in _pre_start) to log and skip this specific edge-case check instead of failing insertion.
Apply along these lines:
@@ - if position > 0: - predecessor = self._processors[position - 1] - self._check_processor_compatibility(predecessor, - processor, - "predecessor", - predecessor.output_class, - processor.input_class, - str(predecessor.output_type), - str(processor.input_type)) + if position > 0: + predecessor = self._processors[position - 1] + try: + pred_out_class = predecessor.output_class + proc_in_class = processor.input_class + pred_out_type = str(predecessor.output_type) + proc_in_type = str(processor.input_type) + except TypeError as e: + logger.warning( + "Cannot validate type compatibility between %s (%s) and %s (%s): %s. Skipping validation.", + predecessor.__class__.__name__, predecessor.output_type, + processor.__class__.__name__, processor.input_type, e) + else: + self._check_processor_compatibility(predecessor, + processor, + "predecessor", + pred_out_class, + proc_in_class, + pred_out_type, + proc_in_type) @@ - if position < len(self._processors): - successor = self._processors[position] - self._check_processor_compatibility(processor, - successor, - "successor", - processor.output_class, - successor.input_class, - str(processor.output_type), - str(successor.input_type)) + if position < len(self._processors): + successor = self._processors[position] + try: + proc_out_class = processor.output_class + succ_in_class = successor.input_class + proc_out_type = str(processor.output_type) + succ_in_type = str(successor.input_type) + except TypeError as e: + logger.warning( + "Cannot validate type compatibility between %s (%s) and %s (%s): %s. Skipping validation.", + processor.__class__.__name__, processor.output_type, + successor.__class__.__name__, successor.input_type, e) + else: + self._check_processor_compatibility(processor, + successor, + "successor", + proc_out_class, + succ_in_class, + proc_out_type, + succ_in_type)Also applies to: 302-311
385-393: Docstring return type is out-of-date with implementation_update process_pipeline docstring to reflect optional return (None when dropped).
async def _process_pipeline(self, item: PipelineInputT) -> PipelineOutputT | None: @@ - Returns: - PipelineOutputT: The processed item after running through all processors + Returns: + PipelineOutputT | None: The processed item after all processors, or None if + a processor returned None and drop_nones is enabled.tests/nat/observability/exporter/test_processing_exporter.py (1)
1081-1102: Use a real coroutine in task-creation tests to avoid false positivesPassing a Mock to asyncio.create_task works only because create_task is patched. Use a tiny coroutine to better mirror runtime behavior.
- # Use a mock coroutine that doesn't need to be awaited - mock_coro = Mock() + # Use a real coroutine to match asyncio.create_task expectations + async def dummy(): + return None + mock_coro = dummy() @@ - # Use a mock coroutine that doesn't need to be awaited - mock_coro = Mock() + async def dummy(): + return None + mock_coro = dummy()Also applies to: 1115-1132
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
src/nat/observability/exporter/processing_exporter.py(7 hunks)tests/nat/observability/exporter/test_processing_exporter.py(13 hunks)
🧰 Additional context used
📓 Path-based instructions (8)
tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Unit tests must live under tests/ and use configured markers (e2e, integration, etc.)
Files:
tests/nat/observability/exporter/test_processing_exporter.py
⚙️ CodeRabbit configuration file
tests/**/*.py: - Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code. - Test functions should be named using thetest_prefix, using snake_case. - Any frequently repeated code should be extracted into pytest fixtures. - Pytest fixtures should define the name argument when applying the pytest.fixture decorator. The fixture
function being decorated should be named using thefixture_prefix, using snake_case. Example:
@pytest.fixture(name="my_fixture")
def fixture_my_fixture():
pass
Files:
tests/nat/observability/exporter/test_processing_exporter.py
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bareraiseand log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial
Files:
tests/nat/observability/exporter/test_processing_exporter.pysrc/nat/observability/exporter/processing_exporter.py
**/tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/tests/**/*.py: Test functions must use the test_ prefix and snake_case
Extract repeated test code into pytest fixtures; fixtures should set name=... in @pytest.fixture and functions named with fixture_ prefix
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration
Use pytest with pytest-asyncio for async code; mock external services with pytest_httpserver or unittest.mock
Files:
tests/nat/observability/exporter/test_processing_exporter.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks
Files:
tests/nat/observability/exporter/test_processing_exporter.pysrc/nat/observability/exporter/processing_exporter.py
**/*.{py,md}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm
Files:
tests/nat/observability/exporter/test_processing_exporter.pysrc/nat/observability/exporter/processing_exporter.py
**/*
⚙️ CodeRabbit configuration file
**/*: # Code Review Instructions
- Ensure the code follows best practices and coding standards. - For Python code, follow
PEP 20 and
PEP 8 for style guidelines.- Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
Example:def my_function(param1: int, param2: str) -> bool: pass- For Python exception handling, ensure proper stack trace preservation:
- When re-raising exceptions: use bare
raisestatements to maintain the original stack trace,
and uselogger.error()(notlogger.exception()) to avoid duplicate stack trace output.- When catching and logging exceptions without re-raising: always use
logger.exception()
to capture the full stack trace information.Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any
words listed in the
ci/vale/styles/config/vocabularies/nat/reject.txtfile, words that might appear to be
spelling mistakes but are listed in theci/vale/styles/config/vocabularies/nat/accept.txtfile are OK.Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,
and should contain an Apache License 2.0 header comment at the top of each file.
- Confirm that copyright years are up-to date whenever a file is changed.
Files:
tests/nat/observability/exporter/test_processing_exporter.pysrc/nat/observability/exporter/processing_exporter.py
src/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
src/**/*.py: All importable Python code must live under src/
All public APIs in src/ require Python 3.11+ type hints on parameters and return values; prefer typing/collections.abc abstractions; use typing.Annotated when useful
Provide Google-style docstrings for every public module, class, function, and CLI command; first line concise with a period; surround code entities with backticks
Files:
src/nat/observability/exporter/processing_exporter.py
src/nat/**/*
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Core functionality under src/nat should prioritize backward compatibility when changed
Files:
src/nat/observability/exporter/processing_exporter.py
⚙️ CodeRabbit configuration file
This directory contains the core functionality of the toolkit. Changes should prioritize backward compatibility.
Files:
src/nat/observability/exporter/processing_exporter.py
🧬 Code graph analysis (2)
tests/nat/observability/exporter/test_processing_exporter.py (1)
src/nat/observability/exporter/processing_exporter.py (11)
_pre_start(345-383)add_processor(70-128)remove_processor(130-182)clear_processors(184-188)reset_pipeline(190-207)get_processor_by_name(209-224)_process_pipeline(385-394)_export_with_processing(476-500)_continue_pipeline_after(441-474)_create_export_task(532-549)ProcessingExporter(39-574)
src/nat/observability/exporter/processing_exporter.py (3)
src/nat/builder/context.py (1)
ContextState(65-90)src/nat/observability/exporter/base_exporter.py (2)
name(234-245)_pre_start(312-314)tests/nat/observability/exporter/test_processing_exporter.py (17)
input_class(849-851)process(43-49)process(61-67)process(77-79)process(94-95)process(106-110)process(124-129)process(322-323)process(367-368)process(467-468)process(507-508)process(529-530)process(576-577)process(598-599)process(853-854)process(1021-1022)process(1388-1391)
🔇 Additional comments (17)
src/nat/observability/exporter/processing_exporter.py (8)
57-69: Constructor: drop_nones + pipeline state plumbing — LGTMClear API, sane defaults, and fields align with tests.
232-280: Insertion position semantics — LGTMSingle-source of truth for position/before/after; good validation and error messages.
130-183: Removal by name/position/object with mapping updates — LGTMCovers all forms; mapping is kept consistent post-removal.
186-189: Lock guard + reset_pipeline — LGTMMutation methods respect the pipeline lock; reset rejects while running and clears names/processors atomically.
Also applies to: 190-208, 226-231
209-225: get_processor_by_name API — LGTMType checks + precise logging when missing.
404-416: Drop-nones propagation and export-skip logic — LGTMConsistent behavior mid-pipeline, after-callback continuation, and at final export gate.
Also applies to: 463-467, 486-489
421-439: Clarify list-as-batch vs list-as-item semanticsAny list is treated as a batch, which can be ambiguous when PipelineOutputT itself is a list[T]. Tests cover both, but please confirm this is the intended contract and document it in the class docstring to avoid misuse.
532-549: Task creation + graceful processor shutdown — LGTMMinimal overhead task creation with lifecycle tracking; cleanup aggregates shutdowns and preserves parent cleanup.
Also applies to: 551-575
tests/nat/observability/exporter/test_processing_exporter.py (9)
98-111: NoneReturningProcessor test helper — LGTMAccurately simulates None-return path for drop_nones coverage.
146-148: Exporter init wired with drop_nones — LGTMMatches production signature and behavior.
191-204: Initialization and pipeline locking tests — LGTMCovers default flags, locked mutations, and reset gating.
Also applies to: 223-281
283-422: Processor naming + atomicity tests — LGTMValidates duplicate-name failures leave both list and mapping untouched.
624-711: drop_nones behavior across pipeline/continuation/export — LGTMGood coverage for enabled/disabled, multi-processor, and continuation paths.
1200-1204: Shutdown error-path tests — LGTMGood use of failing futures to exercise gather(return_exceptions=True).
Also applies to: 1255-1259
1319-1322: Abstract method enforcement — LGTMEnsures ProcessingExporter remains abstract until export_processed is implemented.
1-14: SPDX/license header — LGTMHeader present and year current.
1-1607: No action needed: pytest-asyncio is declared in pyproject.toml andasyncio_mode = "auto"is set under[tool.pytest.ini_options], so allasync deftests run without explicit markers.
Signed-off-by: Matthew Penn <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
tests/nat/observability/exporter/test_processing_exporter.py (2)
21-22: Mark async tests for pytest-asyncioEnsure async tests run in environments without auto mode by marking the module.
import pytest + +# Run async tests with pytest-asyncio in all environments +pytestmark = pytest.mark.asyncio
165-174: Fixture naming and typing per repo guidelinesFixtures should use fixture_ prefix and set name=..., with type hints on params/returns.
-@pytest.fixture -def mock_context_state(): +@pytest.fixture(name="mock_context_state") +def fixture_mock_context_state() -> Mock: """Create a mock context state.""" mock_state = Mock(spec=ContextState) mock_subject = Mock(spec=Subject) mock_event_stream = Mock() mock_event_stream.get.return_value = mock_subject mock_state.event_stream = mock_event_stream return mock_state -@pytest.fixture -def processing_exporter(mock_context_state): +@pytest.fixture(name="processing_exporter") +def fixture_processing_exporter(mock_context_state: Mock) -> ProcessingExporter[str, int]: """Create a concrete processing exporter for testing.""" return ConcreteProcessingExporter(mock_context_state)Also applies to: 176-180
🧹 Nitpick comments (4)
tests/nat/observability/exporter/test_processing_exporter.py (4)
131-140: Type hints for callback wiringAdd precise types for the callback to satisfy typing rules.
- def set_done_callback(self, callback): + def set_done_callback(self, callback: "Callable[[str], 'Awaitable[None]']") -> None: """Set callback for pipeline continuation.""" self.callback_set = True self.done_callback = callback - async def trigger_callback_manually(self, item: str): + async def trigger_callback_manually(self, item: str) -> None: """Manually trigger the callback for testing purposes.""" if self.done_callback: await self.done_callback(item)Add imports near the top if not present:
import pytest +from typing import Awaitable, Callable
1200-1204: Annotate failing_shutdown helpersMake return types explicit to quiet type checkers.
- def failing_shutdown(): + def failing_shutdown() -> asyncio.Future[None]: future = asyncio.Future() future.set_exception(RuntimeError("Shutdown failed")) return futureAlso applies to: 1255-1259
1319-1326: Relax abstract-class error assertion to avoid brittleness across Python versionsMessage text varies; match the essential bits.
- with pytest.raises(TypeError, match="Can't instantiate abstract class"): + with pytest.raises(TypeError, match=r"abstract class.*export_processed"): IncompleteExporter()
1081-1102: Use a real coroutine in task-creation test to satisfy static type checkersAvoid passing a Mock where a Coroutine is expected; close it to prevent warnings.
try: - # Use a mock coroutine that doesn't need to be awaited - mock_coro = Mock() + # Use a real coroutine and close it after scheduling + async def _noop() -> None: + return None + mock_coro = _noop() with patch('asyncio.create_task') as mock_create_task: mock_task = Mock() mock_create_task.return_value = mock_task processing_exporter._create_export_task(mock_coro) mock_create_task.assert_called_once_with(mock_coro) assert mock_task in processing_exporter._tasks mock_task.add_done_callback.assert_called_once() + mock_coro.close()
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
src/nat/observability/exporter/processing_exporter.py(8 hunks)tests/nat/observability/exporter/test_processing_exporter.py(13 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/nat/observability/exporter/processing_exporter.py
🧰 Additional context used
📓 Path-based instructions (6)
tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Unit tests must live under tests/ and use configured markers (e2e, integration, etc.)
Files:
tests/nat/observability/exporter/test_processing_exporter.py
⚙️ CodeRabbit configuration file
tests/**/*.py: - Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code. - Test functions should be named using thetest_prefix, using snake_case. - Any frequently repeated code should be extracted into pytest fixtures. - Pytest fixtures should define the name argument when applying the pytest.fixture decorator. The fixture
function being decorated should be named using thefixture_prefix, using snake_case. Example:
@pytest.fixture(name="my_fixture")
def fixture_my_fixture():
pass
Files:
tests/nat/observability/exporter/test_processing_exporter.py
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.py: Follow PEP 8/20 style; format with yapf (column_limit=120) and use 4-space indentation; end files with a single newline
Run ruff (ruff check --fix) per pyproject.toml; fix warnings unless explicitly ignored; ruff is linter-only
Use snake_case for functions/variables, PascalCase for classes, and UPPER_CASE for constants
Treat pyright warnings as errors during development
Exception handling: preserve stack traces and avoid duplicate logging
When re-raising exceptions, use bareraiseand log with logger.error(), not logger.exception()
When catching and not re-raising, log with logger.exception() to capture stack trace
Validate and sanitize all user input; prefer httpx with SSL verification and follow OWASP Top‑10
Use async/await for I/O-bound work; profile CPU-heavy paths with cProfile/mprof; cache with functools.lru_cache or external cache; leverage NumPy vectorization when beneficial
Files:
tests/nat/observability/exporter/test_processing_exporter.py
**/tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/tests/**/*.py: Test functions must use the test_ prefix and snake_case
Extract repeated test code into pytest fixtures; fixtures should set name=... in @pytest.fixture and functions named with fixture_ prefix
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration
Use pytest with pytest-asyncio for async code; mock external services with pytest_httpserver or unittest.mock
Files:
tests/nat/observability/exporter/test_processing_exporter.py
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.{py,sh,md,yml,yaml,toml,ini,json,ipynb,txt,rst}: Every file must start with the standard SPDX Apache-2.0 header; keep copyright years up‑to‑date
All source files must include the SPDX Apache‑2.0 header; do not bypass CI header checks
Files:
tests/nat/observability/exporter/test_processing_exporter.py
**/*.{py,md}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Never hard‑code version numbers in code or docs; versions are derived by setuptools‑scm
Files:
tests/nat/observability/exporter/test_processing_exporter.py
**/*
⚙️ CodeRabbit configuration file
**/*: # Code Review Instructions
- Ensure the code follows best practices and coding standards. - For Python code, follow
PEP 20 and
PEP 8 for style guidelines.- Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
Example:def my_function(param1: int, param2: str) -> bool: pass- For Python exception handling, ensure proper stack trace preservation:
- When re-raising exceptions: use bare
raisestatements to maintain the original stack trace,
and uselogger.error()(notlogger.exception()) to avoid duplicate stack trace output.- When catching and logging exceptions without re-raising: always use
logger.exception()
to capture the full stack trace information.Documentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any
words listed in the
ci/vale/styles/config/vocabularies/nat/reject.txtfile, words that might appear to be
spelling mistakes but are listed in theci/vale/styles/config/vocabularies/nat/accept.txtfile are OK.Misc. - All code (except .mdc files that contain Cursor rules) should be licensed under the Apache License 2.0,
and should contain an Apache License 2.0 header comment at the top of each file.
- Confirm that copyright years are up-to date whenever a file is changed.
Files:
tests/nat/observability/exporter/test_processing_exporter.py
🧬 Code graph analysis (1)
tests/nat/observability/exporter/test_processing_exporter.py (2)
src/nat/builder/context.py (1)
ContextState(65-90)src/nat/observability/exporter/processing_exporter.py (11)
_pre_start(345-385)add_processor(70-128)remove_processor(130-182)clear_processors(184-188)reset_pipeline(190-207)get_processor_by_name(209-224)_process_pipeline(387-396)_export_with_processing(478-502)_continue_pipeline_after(443-476)_create_export_task(534-551)ProcessingExporter(39-576)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: CI Pipeline / Check
🔇 Additional comments (2)
tests/nat/observability/exporter/test_processing_exporter.py (2)
624-711: drop_nones test coverage is solidGood end-to-end coverage of None handling in processing, continuation, and export paths.
1331-1462: Callback continuation tests look correctNice verification of callback wiring, continuation through remaining processors, and export side-effects.
|
/merge |
Description
Closes #756
Agentic applications will somtimes interact with sensitive data sources, increasing the sensitivity of observability traces transmitted to external systems. This PR extends the
ProcessingExportersystem to address this concern. It features aRedactionProcessorandSpanTaggingProcessorallowing the application to conditionally redact content and tag Spans with privacy classifications to enable downstream trace access control. Additionally, it increases the flexibility of processing pipeline configuration, allowing processors to be named, replaced by name/position/instance, and inserted anywhere in an existing pipeline.An example configuration
By Submitting this PR I confirm:
Summary by CodeRabbit
New Features
Improvements
Tests
Documentation