@mpenn mpenn commented Aug 25, 2025

Description

Closes #719

This PR introduces the nvidia-nat-data-flywheel subpackage, which extends the NeMo Agent Toolkit's observability system to support NVIDIA Data Flywheel Blueprint integration. The integration lets developers distill smaller LLMs with the NeMo Microservices Platform (NMP) for components of their agentic applications, which can yield more efficient deployments and lower runtime latency. This initial integration focuses on NIM/OpenAI LLM providers used in LangChain/LangGraph-based workflows/plugins. Key features of this PR are summarized below:

Trace Conversion Pipeline:

  • Converts Spans to Data Flywheel contract-compliant records
  • Supports LLM_START events (completions/tool calls), but can be extended to support others as needed
  • Implements an adapter pattern with a registry to support extensibility to other frameworks/providers over time (see the sketch below)
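
A hedged sketch of registering a custom provider adapter, modeled on the built-in NIM converter added in this PR (the MyProviderTraceSource class and converter name are hypothetical; import paths follow this PR's layout):

from nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch.openai_converter import convert_langchain_openai
from nat.plugins.data_flywheel.observability.processor.trace_conversion.trace_adapter_registry import register_adapter
from nat.plugins.data_flywheel.observability.schema.provider.openai_trace_source import OpenAITraceSourceBase
from nat.plugins.data_flywheel.observability.schema.sink.elasticsearch.dfw_es_record import DFWESRecord
from nat.plugins.data_flywheel.observability.schema.trace_container import TraceContainer

class MyProviderTraceSource(OpenAITraceSourceBase):  # hypothetical OpenAI-compatible provider schema
    pass

@register_adapter(trace_source_model=MyProviderTraceSource)
def convert_my_provider(trace_source: TraceContainer) -> DFWESRecord:
    # Delegate to the built-in OpenAI converter, as the NIM adapter does.
    return convert_langchain_openai(trace_source)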

Elasticsearch Export:

  • An initial Elasticsearch sink is built in to support the Data Flywheel Blueprint
  • Extensible to support additional sinks (see the sketch below)
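
As a rough sketch of what an additional sink could look like (MyCustomSinkExporter is hypothetical; DFWExporter and the async export_processed hook follow the exporter/mixin structure in this PR):

from nat.plugins.data_flywheel.observability.exporter.dfw_exporter import DFWExporter

class MyCustomSinkExporter(DFWExporter):
    async def export_processed(self, item: dict | list[dict]) -> None:
        # Receives batched, contract-compliant records; forward them to a custom backend here.
        ...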

Workload Identification:

  • Each trace can be mapped to a workload_id via the parent registered function's instance name or by using the track_unregistered_function decorator. The track_unregistered_function decorator is useful for methods that are not themselves toolkit plugins (e.g., nodes in a LangGraph)
  • The workload_id is used as a key in the Data Flywheel Blueprint to trigger fine-tuning/distillation jobs for atomic parts of an agentic workflow

Usage Example:

After installing this subpackage, the feature can be activated with the following workflow YAML configuration:

general:
  tracing:
    data_flywheel:
      _type: data_flywheel_elasticsearch
      client_id: my_nat_app
      index: flywheel
      endpoint: ${ELASTICSEARCH_ENDPOINT}
      batch_size: 10
      elasticsearch_auth:
        - elastic
        - elastic

To add higher-fidelity scoping to functions that are not NAT registered functions:

# The `name` argument gives this function its own `workload_id`, and
# `metadata` lets you supply additional user-defined attributes in the traces
@track_unregistered_function(name="my_scoped_name", metadata={"key": "value"})
def my_function(input_data: str) -> str:
    return "notional output"

Documentation will be included in a follow-on PR.

By submitting this PR I confirm:

  • I am familiar with the Contributing Guidelines.
  • We require that all contributors "sign-off" on their commits. This certifies that the contribution is your original work, or you have rights to submit it under the same license, or a compatible license.
    • Any contribution which contains commits that are not Signed-Off will not be accepted.
  • When the PR is ready for review, new or existing tests cover these changes.
  • When the PR is ready for review, the documentation is up to date with these changes.

Summary by CodeRabbit

  • New Features

    • New Data Flywheel subpackage: configurable Elasticsearch telemetry exporter with versioned schemas, OpenAI & NIM trace adapters, dynamic trace-adapter registry, and registered plugin entry points.
    • Function-invocation telemetry accepts optional metadata; new decorator to track function execution and outputs.
    • New processors and factory utilities for batching and filtering.
  • Documentation

    • Subpackage README/metadata added.
  • Chores

    • Packaging config and workspace registration for the new subpackage.
  • Tests

    • Extensive unit and integration tests across exporters, mixins, processors, adapters, schemas, and utilities.

coderabbitai bot commented Aug 25, 2025

Walkthrough

Adds a new nvidia-nat-data-flywheel subpackage implementing schema-driven span→DFW conversion, Elasticsearch exporters, adapter registry and adapters (OpenAI/NIM), processor utilities, telemetry exporter registration, context metadata plumbing, a function-tracking decorator, packaging, and extensive tests.

Changes

Cohort / File(s) Summary
Packaging & Workspace
pyproject.toml, packages/nvidia_nat_data_flywheel/pyproject.toml
New subpackage packaging (setuptools + setuptools_scm), metadata, optional extra data-flywheel, workspace source, and three nat.components entry points.
Docs
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
PyPI README/license and project description added.
Exporter Core (DFW + ES)
packages/.../observability/exporter/dfw_exporter.py, .../mixin/elasticsearch_mixin.py, .../exporter/dfw_elasticsearch_exporter.py
Adds DictBatchingProcessor and DFWExporter base, ElasticsearchMixin (async index/bulk), and DFWElasticsearchExporter with contract-version wiring.
Processors & Factories
packages/.../processor/dfw_record_processor.py, packages/.../processor/__init__.py, src/nat/observability/processor/processor_factory.py, src/nat/observability/processor/falsy_batch_filter_processor.py
Span→DFW and DFW→dict processors, public re-exports, processor factory helpers, and falsy-batch filter processors (dict/list/set).
Trace Conversion & Utilities
packages/.../trace_conversion/trace_adapter_registry.py, .../span_to_dfw_record.py, .../span_extractor.py, .../trace_conversion/__init__.py, .../adapter/register.py
Dynamic adapter registry, TraceContainer integration, span extractors (timestamp/usage), span→DFW conversion entrypoints, public API surface, and adapter registration side-effects.
Adapters (Elasticsearch)
.../adapter/elasticsearch/openai_converter.py, .../adapter/elasticsearch/nim_converter.py, .../adapter/elasticsearch/__init__.py
OpenAI converter with message/tool/response mapping and NIM adapter delegating to OpenAI converter; adapters registered via registry.
Schema Core & Sink (Elasticsearch)
.../schema/schema_registry.py, .../trace_container.py, .../trace_source_base.py, .../schema/sink/elasticsearch/dfw_es_record.py, .../contract_version.py, .../schema/sink/elasticsearch/__init__.py, .../schema/register.py, .../schema/provider/*
SchemaRegistry, TraceContainer model (dynamic union), trace source base/provider models, versioned Elasticsearch DFW schemas (DFWESRecord), ContractVersion enum, and registration plumbing.
Component Registration
.../observability/register.py, package init files
Registers DFWElasticsearchTelemetryExporter config and async telemetry exporter factory; package initializers added.
Core Telemetry Hooks
src/nat/builder/context.py, src/nat/profiler/decorators/function_tracking.py
Context.push_active_function accepts optional metadata; new track_unregistered_function decorator added (supports sync/async/generator and optional metadata).
Utils
packages/.../utils/deserialize.py
Adds deserialize_span_attribute for JSON/coercion of span attributes.
Tests
packages/nvidia_nat_data_flywheel/tests/**, tests/nat/profiler/decorators/test_function_tracking.py, ...
Extensive unit and integration-style tests for exporters, mixin, processors, adapters, registry, schema registry, trace container, span extractors, deserialize utility, and function-tracking behavior.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant App as Application
  participant Ctx as Context
  participant Tele as Telemetry
  participant Proc as Processors
  participant Reg as TraceAdapterRegistry
  participant ES as Elasticsearch

  App->>Ctx: push_active_function(name, input, metadata?)
  Note right of Ctx #E8F0FF: FUNCTION_START emitted (includes metadata)
  App->>Tele: emit Span (LLM_START)
  Tele->>Proc: process(span)
  Proc->>Proc: SpanToDFWRecordProcessor -> build TraceContainer
  Proc->>Reg: convert(trace_container, to=DFWESRecord)
  Reg-->>Proc: DFWESRecord
  Proc->>Proc: DFWToDictProcessor -> dict
  Proc->>Proc: DictBatchingProcessor -> batch[list[dict]]
  Proc->>Proc: DictBatchFilterProcessor -> filtered batch
  alt batch non-empty
    Proc->>ES: bulk/index(filtered batch)
    ES-->>Proc: ack
  else empty
    Proc-->>Tele: no-op
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Assessment against linked issues

  • Enable exporting intermediate steps to NeMo Data Flywheel via simple config (#719)
  • Provide a telemetry exporter selectable via YAML tracing section (component registration/entry points) (#719)
  • Support fine-grained workload scoping without requiring additional registered functions (#719): workload_id is included in record schemas, but no config/UI/runtime filter or selector for workload-scoped sampling/selection is implemented in this diff.
  • Minimal burden on developers to enable the feature (plug-and-play) (#719)

Out-of-scope / unrelated changes

  • Duplicate track_unregistered_function definition (src/nat/profiler/decorators/function_tracking.py): the module contains two identical top-level definitions of the decorator (same signature). This duplication is unrelated to the Flywheel exporter objectives and should be deduplicated.

📜 Recent review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2a9e2c2 and bdb0bce.

📒 Files selected for processing (1)
  • src/nat/profiler/decorators/function_tracking.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/nat/profiler/decorators/function_tracking.py

@mpenn mpenn self-assigned this Aug 25, 2025
@mpenn mpenn added feature request New feature or request non-breaking Non-breaking change labels Aug 25, 2025
mpenn added 5 commits August 25, 2025 23:37
Renamed shcema registry.py file to schema_registry.py.

Refactored `dfw_elasticsearch_exporter.py` to use `ContextState` instead of `AIQContextState`.

Updated `TraceContainer` to ensure union is built before validation.

Signed-off-by: Matthew Penn <[email protected]>
Signed-off-by: Matthew Penn <[email protected]>
Renaming common.py -> span_extractor.py to be more informative

Signed-off-by: Matthew Penn <[email protected]>
…_converter.py and test_span_to_dfw_record.py

Signed-off-by: Matthew Penn <[email protected]>
@mpenn mpenn changed the title Add aiqtoolkit-data-flywheel subpackage with NeMo Data Flywheel integrations Add nvidia-nat-data-flywheel subpackage with NeMo Data Flywheel integrations Aug 26, 2025
@mpenn mpenn marked this pull request as ready for review August 26, 2025 14:27

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 32

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
packages/nvidia_nat_data_flywheel/pyproject.toml (1)

45-45: Missing trailing newline at end of file.

According to the coding guidelines, every file must end with a single trailing newline.

Add a trailing newline at the end of the file:

 nat_data_flywheel_adapter = "nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.register"
 nat_data_flywheel_schema = "nat.plugins.data_flywheel.observability.schema.register"
+
packages/nvidia_nat_data_flywheel/tests/observability/mixin/test_elasticsearch_mixin.py (1)

509-509: Missing trailing newline at end of file.

According to the coding guidelines, every file must end with a single trailing newline.

Add a trailing newline at the end of the file:

         # Second bulk call (operation 5)
         assert bulk_calls[1][1]['operations'] == [{"index": {"_index": "sequential_test"}}, {"operation": 5}]
+
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/register.py (1)

22-22: Missing trailing newline at end of file.

According to the coding guidelines, every file must end with a single trailing newline.

Add a trailing newline at the end of the file:

 from nat.plugins.data_flywheel.observability.schema.sink.elasticsearch import dfw_es_record
+
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/nim_trace_source.py (1)

25-25: Missing trailing newline at end of file.

According to the coding guidelines, every file must end with a single trailing newline.

Add a trailing newline at the end of the file:

 class NIMTraceSource(OpenAITraceSourceBase):
     pass
+
src/nat/profiler/decorators/function_tracking.py (1)

78-88: Add return type annotations to public decorators (pyright).

Public APIs must have return annotations. For decorators with dual usage, a pragmatic step is annotating -> Any to satisfy type checking without full overloads.

Apply this diff:

-def track_function(func: Any = None, *, metadata: dict[str, Any] | None = None):
+def track_function(func: Any = None, *, metadata: dict[str, Any] | None = None) -> Any:
@@
-def track_unregistered_function(func: Any = None, *, name: str | None = None, metadata: dict[str, Any] | None = None):
+def track_unregistered_function(func: Any = None,
+                                *,
+                                name: str | None = None,
+                                metadata: dict[str, Any] | None = None) -> Any:

If you want stricter types, I can provide @overload variants using ParamSpec/TypeVar.

Also applies to: 257-258
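
For illustration, a rough sketch of what such overloads could look like (hedged; the actual wrapper may further refine generator return types):

from collections.abc import Callable
from typing import Any, ParamSpec, TypeVar, overload

P = ParamSpec("P")
R = TypeVar("R")

@overload
def track_function(func: Callable[P, R]) -> Callable[P, R]: ...

@overload
def track_function(*, metadata: dict[str, Any] | None = ...) -> Callable[[Callable[P, R]], Callable[P, R]]: ...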

🧹 Nitpick comments (77)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/__init__.py (1)

1-15: Add a minimal module docstring to satisfy repo guidelines.

Guidelines require a brief Google-style module docstring for public modules. Add a concise description of this package’s purpose.

Apply:

 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""Schemas for Data Flywheel sinks.
+
+This package hosts typed schema modules for sink backends (e.g., Elasticsearch)
+used by the nvidia-nat-data-flywheel subpackage.
+"""
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/__init__.py (1)

1-15: Add a short module docstring for adapter namespace.

To align with the codebase requirement for module docstrings, add a concise description.

 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""Trace-conversion adapters.
+
+Contains adapter implementations and registration hooks used to convert spans
+into Data Flywheel contract-compliant records.
+"""
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/__init__.py (1)

1-15: Add module docstring for top-level schema namespace.

Public modules should include a one-line summary. Add a brief docstring.

 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""Top-level schema namespace for Data Flywheel integration.
+
+Exports core Pydantic models and helpers shared across providers and sinks.
+"""
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/__init__.py (1)

1-15: Add a brief module docstring to clarify package scope.

This package is importable; include a short docstring per repo guidelines.

 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""Observability integration for Data Flywheel.
+
+Provides exporters, processors, and schema plumbing to emit traces compatible
+with the NeMo Data Flywheel blueprint.
+"""
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/elasticsearch/__init__.py (2)

1-14: License header present; consider year consistency across sibling packages.

This file uses “2025” while other new files use “2024-2025.” Both are acceptable, but consider aligning for consistency across the subpackage.

If you prefer the range:

-# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.

1-15: Add a module docstring for the Elasticsearch adapter namespace.

Keep it concise to satisfy the docstring requirement and improve discoverability.

 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""Elasticsearch-specific trace-conversion adapters.
+
+Houses adapter implementations that convert spans into Elasticsearch-backed
+Data Flywheel records.
+"""
docs/source/workflows/observe/observe-workflow-with-phoenix.md (1)

20-24: Use the official first mention: “NVIDIA NeMo Agent toolkit”.

Per repo style, the first reference should be “NVIDIA NeMo Agent toolkit”; subsequent references can use “NeMo Agent toolkit”. Update the first sentence accordingly.

-This guide provides a step-by-step process to enable observability in a NeMo Agent toolkit workflow using Phoenix for tracing and logging. By the end of this guide, you will have:
+This guide provides a step-by-step process to enable observability in an NVIDIA NeMo Agent toolkit workflow using Phoenix for tracing and logging. By the end of this guide, you will have:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/utils/deserialize.py (1)

21-31: Docstring and signature describe different behaviors; align them.

Docstring promises “dictionary, list, or None” and documents value as str, but the function accepts dict/list and the return type excludes None. Tighten the docstring to match the actual API.

-    """Deserialize a string input value to a dictionary, list, or None.
+    """Deserialize an input into a dictionary or list.
@@
-    Args:
-        value (str): The input value to deserialize
+    Args:
+        value (dict | list | str): Input to normalize. If already a dict or list, it is returned unchanged.
@@
-    Returns:
-        dict | list: The deserialized input value
+    Returns:
+        dict | list: The deserialized input value (JSON object or array).
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/trace_source_base.py (2)

16-18: Add a brief module docstring for public module.

Public modules should include a concise Google-style module docstring per guidelines.

-from pydantic import BaseModel
+"""Base Pydantic models shared by Data Flywheel trace sources."""
+
+from pydantic import BaseModel

20-22: Constrain client_id to non-empty to prevent invalid records.

A non-empty client_id avoids producing sink records without a source identifier and pushes validation closer to the edge.

-class TraceSourceBase(BaseModel):
+class TraceSourceBase(BaseModel):
     """Base class for trace sources with generic framework and provider types."""
-    client_id: str = Field(..., description="The client ID of the trace source")
+    client_id: str = Field(..., min_length=1, description="The client ID of the trace source.")

Do you expect extra fields on provider models? If yes, consider allowing extras at the base with Pydantic v2 config, e.g.:

from pydantic import ConfigDict

class TraceSourceBase(BaseModel):
    model_config = ConfigDict(extra="ignore")
    ...
src/nat/observability/processor/processor_factory.py (3)

16-19: Add a short module docstring.

-from typing import Any
+"""Helpers for creating concrete, parameterized Processor subclasses at runtime."""
+
+from typing import Any

39-54: Apply the same typing improvements for from-only factory.

-def processor_factory_from_type(processor_class: type, from_type: type[Any]) -> type[Processor]:
+def processor_factory_from_type(processor_class: type[Processor], from_type: type[Any]) -> type[Processor]:
@@
-    class ConcreteProcessor(processor_class[from_type]):  # type: ignore
+    class ConcreteProcessor(processor_class[from_type]):  # type: ignore[misc]
         pass

56-70: Apply the same typing improvements for to-only factory.

-def processor_factory_to_type(processor_class: type, to_type: type[Any]) -> type[Processor]:
+def processor_factory_to_type(processor_class: type[Processor], to_type: type[Any]) -> type[Processor]:
@@
-    class ConcreteProcessor(processor_class[to_type]):  # type: ignore
+    class ConcreteProcessor(processor_class[to_type]):  # type: ignore[misc]
         pass

If duplicate classes become a concern, consider caching per (processor_class, from_type, to_type) tuple to reduce class churn. I can draft an lru_cache-based version that safely handles hashing of type args if desired.
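
A minimal lru_cache-based sketch (the helper name is hypothetical and a two-type factory variant is assumed; classes are hashable, so the argument triple is a valid cache key):

from functools import lru_cache

@lru_cache(maxsize=None)
def _cached_processor_factory(processor_class: type, from_type: type, to_type: type) -> type:
    # Repeated calls with the same (processor_class, from_type, to_type)
    # return the same generated subclass instead of minting a new one each time.
    class ConcreteProcessor(processor_class[from_type, to_type]):  # type: ignore[misc]
        pass
    return ConcreteProcessor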

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (3)

23-28: Docstring claims resource tagging not implemented; make it accurate.

The mixin currently provides client creation and export methods, not resource tagging. Update the prose to match behavior.

-    This mixin provides elasticsearch-specific functionality for SpanExporter exporters.
-    It handles elasticsearch-specific resource tagging and uses the AsyncElasticsearch client.
+    This mixin provides Elasticsearch client wiring and export helpers (single and bulk).
+    It uses the AsyncElasticsearch client.

45-51: Consider setting a default Content-Type for compatibility.

For ES 8-compatible APIs, adding the corresponding Content-Type default can reduce surprises (the client may set this automatically, but being explicit is harmless).

         if headers is None:
-            headers = {"Accept": "application/vnd.elasticsearch+json; compatible-with=8"}
+            headers = {
+                "Accept": "application/vnd.elasticsearch+json; compatible-with=8",
+                "Content-Type": "application/vnd.elasticsearch+json; compatible-with=8",
+            }

30-51: Provide an async close to release the ES client.

Expose a small lifecycle hook so exporters using the mixin can close the client on shutdown.

         self._elastic_client = AsyncElasticsearch(endpoint, basic_auth=elasticsearch_auth, headers=headers)
         self._index = index
         super().__init__(*args, **kwargs)
 
     async def export_processed(self, item: dict | list[dict]) -> None:
+        ...
+        # method body unchanged
+        ...
+
+    async def aclose(self) -> None:
+        """Close the underlying Elasticsearch client."""
+        await self._elastic_client.close()
packages/nvidia_nat_data_flywheel/tests/observability/mixin/test_elasticsearch_mixin.py (2)

106-106: Consider removing the type ignore comment.

The # type: ignore comment on line 106 can likely be avoided by properly typing the parent_kwargs expansion.

Consider using TypedDict or explicit parameter unpacking to avoid the type checker confusion:

-            **parent_kwargs)  # type: ignore  # parent_kwargs expansion confuses type checker
+            **parent_kwargs)

249-265: Type ignore comments can be avoided with proper typing.

Multiple # type: ignore comments are used when testing missing required parameters. These can be avoided by using pytest.raises with a more specific match pattern.

The type ignore comments are acceptable for intentionally incorrect test calls, but you could also consider using string-based construction to avoid them:

# Alternative approach without type ignore
with pytest.raises(TypeError, match="missing.*required.*endpoint"):
    exec("ConcreteElasticsearchMixin(index='test_index', elasticsearch_auth=('user', 'pass'))")
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/nim_trace_source.py (2)

16-21: Consider removing unused logger.

The logger is defined but not used in this module. Since this is just a pass-through class inheriting all behavior from the base class, the logger may not be needed.

Remove the unused logger if it's not needed:

-import logging
-
 from nat.plugins.data_flywheel.observability.schema.provider.openai_trace_source import OpenAITraceSourceBase
-
-logger = logging.getLogger(__name__)
-
 
 class NIMTraceSource(OpenAITraceSourceBase):

23-24: Consider adding class documentation.

While the class inherits all functionality from OpenAITraceSourceBase, it would be helpful to add a docstring explaining the purpose of this NIM-specific trace source.

Add a docstring to explain the class purpose:

 class NIMTraceSource(OpenAITraceSourceBase):
+    """Trace source for NVIDIA NIM (NeMo Inference Microservice) LLM providers.
+    
+    This class extends OpenAITraceSourceBase to handle trace data from NIM endpoints,
+    which follow the OpenAI API contract for LLM interactions.
+    """
     pass
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/register.py (1)

20-24: Optional: Import via importlib to avoid “unused import” suppressions.

You can remove lint pragmas by importing for side effects using import_module; this keeps intent explicit and avoids namespace pollution.

@@
-from nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch import \
-    nim_converter
-from nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch import \
-    openai_converter
+from importlib import import_module
+
+# Import for side effects: modules register their adapters at import time.
+import_module(
+    "nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch.nim_converter"
+)
+import_module(
+    "nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch.openai_converter"
+)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1)

29-31: Document possible KeyError from SchemaRegistry for clarity.

The method can raise if the version isn’t registered; documenting this helps users handle errors predictably.

@@
-    def get_contract_class(self) -> type[BaseModel]:
-        """Get the Pydantic model class for this contract version."""
+    def get_contract_class(self) -> type[BaseModel]:
+        """Get the Pydantic model class for this contract version.
+
+        Raises:
+            KeyError: If no schema is registered for this version.
+        """
         return SchemaRegistry.get_schema("elasticsearch", self.value)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/__init__.py (1)

20-27: Ensure built-in adapters register on import

To make the trace‐adapter feature self-contained and guarantee that all built-in converters are registered when this package is imported, add an explicit import of your adapter.register module (which in turn brings in all of the Elasticsearch adapters) to the top‐level __init__.py.

• File: packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/__init__.py
Update around lines 20–27:

 # Trace Source Registry
-from .trace_conversion.trace_adapter_registry import TraceAdapterRegistry
+from .trace_conversion.trace_adapter_registry import TraceAdapterRegistry
+# Ensure built-in adapters are registered when this package loads.
+# Importing the adapter.register module triggers all @register_adapter calls.
+from .trace_conversion.adapter import register  # noqa: F401

 __all__ = [
     "SpanToDFWRecordProcessor",  # DFW Record Processors
     "DFWToDictProcessor",
     "TraceAdapterRegistry",  # Trace Source Registry
 ]

This import pulls in adapter/register.py, which imports each of the Elasticsearch adapters (e.g. nim_converter, openai_converter) and invokes their @register_adapter decorators as side-effects.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/elasticsearch/nim_converter.py (2)

18-29: Reformat imports to avoid global lint suppressions and long lines.

Use parentheses to wrap long from-imports and drop the line-length and flake8 suppressions.

@@
-# pylint: disable=line-too-long
-# flake8: noqa
-from nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch.openai_converter import \
-    convert_langchain_openai
-from nat.plugins.data_flywheel.observability.processor.trace_conversion.trace_adapter_registry import \
-    register_adapter
-from nat.plugins.data_flywheel.observability.schema.provider.nim_trace_source import \
-    NIMTraceSource
-from nat.plugins.data_flywheel.observability.schema.sink.elasticsearch.dfw_es_record import \
-    DFWESRecord
-from nat.plugins.data_flywheel.observability.schema.trace_container import \
-    TraceContainer
+from nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch.openai_converter import (
+    convert_langchain_openai,
+)
+from nat.plugins.data_flywheel.observability.processor.trace_conversion.trace_adapter_registry import (
+    register_adapter,
+)
+from nat.plugins.data_flywheel.observability.schema.provider.nim_trace_source import (
+    NIMTraceSource,
+)
+from nat.plugins.data_flywheel.observability.schema.sink.elasticsearch.dfw_es_record import (
+    DFWESRecord,
+)
+from nat.plugins.data_flywheel.observability.schema.trace_container import (
+    TraceContainer,
+)

34-44: Minor: Capitalize NIM consistently and add a debug log for traceability.

Consistent naming and a debug line help when triaging conversions across providers.

@@
-@register_adapter(trace_source_model=NIMTraceSource)
-def convert_langchain_nim(trace_source: TraceContainer) -> DFWESRecord:
-    """Convert a LangChain Nim trace source to a DFWESRecord.
+@register_adapter(trace_source_model=NIMTraceSource)
+def convert_langchain_nim(trace_source: TraceContainer) -> DFWESRecord:
+    """Convert a LangChain NIM trace source to a DFWESRecord.
@@
-    Returns:
-        DFWESRecord: The converted DFW record
+    Returns:
+        DFWESRecord: The converted DFW record.
     """
-    return convert_langchain_openai(trace_source)
+    logger.debug("Converting NIM trace via OpenAI converter for span '%s'", trace_source.span.name)
+    return convert_langchain_openai(trace_source)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/span_extractor.py (1)

63-79: Consider adding bounds validation for timestamp values.

While the error handling for type conversion is good, the function could benefit from validating that the timestamp is within reasonable bounds (e.g., not negative, not in the distant future) to catch potential data issues early.

def extract_timestamp(span: Span) -> int:
    """Extract timestamp from a span.

    Args:
        span (Span): The span to extract timestamp from

    Returns:
        int: The timestamp
    """
    timestamp = span.attributes.get("nat.event_timestamp", 0)
    try:
        timestamp_int = int(float(str(timestamp)))
+       # Validate timestamp is reasonable (not negative, not too far in future)
+       if timestamp_int < 0:
+           logger.warning("Negative timestamp %d in span '%s', using 0", timestamp_int, span.name)
+           timestamp_int = 0
    except (ValueError, TypeError):
        logger.warning("Invalid timestamp in span '%s', using 0", span.name)
        timestamp_int = 0

    return timestamp_int
packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/test_trace_adapter_registry.py (1)

449-460: Clarify Return of Internal Registry in list_registered_types

The current implementation of TraceAdapterRegistry.list_registered_types() returns the actual class‐level _registered_types dict, which your test explicitly verifies by identity (assert registered is internal_registry). Exposing the internal registry this way means any external code can mutate it, potentially leading to hard-to-track bugs.

Please confirm whether this behavior is intentional:

• If you do want callers to be able to modify the registry directly, update the docstring to explicitly state that mutations to the returned dict affect the global registry.
• If instead you intended to protect internal state, change the method to return a shallow copy (and update the test accordingly), for example:

--- a/packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/trace_adapter_registry.py
+++ b/packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/trace_adapter_registry.py
@@ -XX,7 +XX,12 @@ class TraceAdapterRegistry:
     @classmethod
     def list_registered_types(cls) -> dict[type, dict[type, Callable]]:
         """List all registered conversions: source_type -> {target_type -> converter}.
-        """
-        return cls._registered_types
+        Mutating the returned dict does not affect the internal registry.
+        """
+        # Return a shallow copy of the outer dict and each inner mapping
+        return {
+            src_type: tgt_map.copy()
+            for src_type, tgt_map in cls._registered_types.items()
+        }

• If you protect the registry here, add a new test asserting that mutations to the result do not alter the internal _registered_types.
• Otherwise, at minimum annotate the docstring to note that callers receive a live reference.

packages/nvidia_nat_data_flywheel/tests/observability/utils/test_deserialize.py (2)

187-198: Confirm PII/logging policy: error messages include raw input values.

The function raises ValueError containing the original input, and the tests assert for this. This is useful for debugging, but it may leak sensitive data into logs/telemetry.

Consider truncating or masking large or sensitive inputs in the error string (e.g., first N chars and length), and update tests accordingly.

Proposed pattern in deserialize.py:

raw = value if isinstance(value, str) else str(value)
snippet = (raw[:256] + f"...({len(raw)} chars)") if len(raw) > 256 else raw
raise ValueError(f"Failed to parse input_value: {snippet}, error: {e}") from e

24-63: Reduce duplication with parametrization.

Multiple tests repeat the same arrange/act/assert pattern. Parametrizing inputs/expectations would simplify maintenance and improve readability without reducing coverage.

Example:

import pytest

@pytest.mark.parametrize(
    "json_str, expected",
    [
        ('{"key": "value", "number": 42}', {"key": "value", "number": 42}),
        ('{}', {}),
        ('[1, 2, 3]', [1, 2, 3]),
        ('null', None),
        ('"hello"', "hello"),
    ],
)
def test_valid_json_values(json_str, expected):
    assert deserialize_span_attribute(json_str) == expected

Also applies to: 73-101, 142-160

src/nat/observability/processor/falsy_batch_filter_processor.py (1)

43-55: Clarify “falsy” semantics and consider narrowing specializations.

Minor: Document that falsy is evaluated via bool(x). If the generic processor is exposed more broadly, consider documenting that None will also be removed if present in the batch. Current concrete specializations (dict/list/set) avoid surprises, so this is just doc clarity.

Suggested doc tweak:

-"""Processor that filters out falsy items from a batch."""
+"""Processor that filters out items where bool(item) is False (e.g., {}, [], set(), None)."""
packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/test_span_extractor.py (1)

146-241: Good coverage for usage/timestamp, plus call-graph and logging checks. Minor opportunities to DRY.

The tests validate defaulting, type coercion, and logger warnings. Consider small parametrization to reduce duplication for the “invalid timestamp returns 0 and warns once” pattern.

Example:

@pytest.mark.parametrize("raw", ["invalid_timestamp", None, {"complex":"obj"}, ""])
def test_extract_timestamp_invalid_inputs_warns_once(raw, mocker):
    logger = mocker.patch("nat.plugins.data_flywheel.observability.processor.trace_conversion.span_extractor.logger")
    span = Span(name="test_span", context=SpanContext(), attributes={"nat.event_timestamp": raw})
    assert extract_timestamp(span) == 0
    logger.warning.assert_called_once_with("Invalid timestamp in span '%s', using 0", "test_span")

Also applies to: 261-380

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (2)

31-41: Expose Elasticsearch parameters explicitly with type hints instead of opaque kwargs.

To satisfy “public APIs require type hints” and improve IDE/static-checker help, accept endpoint/index/elasticsearch_auth/headers explicitly and forward them, keeping **elasticsearch_kwargs for future-proofing. This also prevents silent typos in kwargs.

Apply this diff:

-    def __init__(self,
-                 context_state: ContextState | None = None,
-                 client_id: str = "default",
-                 contract_version: ElasticsearchContractVersion = ElasticsearchContractVersion.VERSION_1_1,
-                 batch_size: int = 100,
-                 flush_interval: float = 5.0,
-                 max_queue_size: int = 1000,
-                 drop_on_overflow: bool = False,
-                 shutdown_timeout: float = 10.0,
-                 **elasticsearch_kwargs):
+    def __init__(self,
+                 context_state: ContextState | None = None,
+                 client_id: str = "default",
+                 contract_version: ElasticsearchContractVersion = ElasticsearchContractVersion.VERSION_1_1,
+                 batch_size: int = 100,
+                 flush_interval: float = 5.0,
+                 max_queue_size: int = 1000,
+                 drop_on_overflow: bool = False,
+                 shutdown_timeout: float = 10.0,
+                 *,
+                 endpoint: str,
+                 index: str,
+                 elasticsearch_auth: tuple[str, str],
+                 headers: dict | None = None,
+                 **elasticsearch_kwargs):
@@
-        super().__init__(context_state=context_state,
-                         batch_size=batch_size,
-                         flush_interval=flush_interval,
-                         max_queue_size=max_queue_size,
-                         drop_on_overflow=drop_on_overflow,
-                         shutdown_timeout=shutdown_timeout,
-                         client_id=client_id,
-                         **elasticsearch_kwargs)
+        super().__init__(context_state=context_state,
+                         batch_size=batch_size,
+                         flush_interval=flush_interval,
+                         max_queue_size=max_queue_size,
+                         drop_on_overflow=drop_on_overflow,
+                         shutdown_timeout=shutdown_timeout,
+                         client_id=client_id,
+                         endpoint=endpoint,
+                         index=index,
+                         elasticsearch_auth=elasticsearch_auth,
+                         headers=headers,
+                         **elasticsearch_kwargs)

16-26: Docstring nit: clarify types in kwargs section for ES params.

Optional: In the init docstring, specify (str) for endpoint/index, (tuple[str, str]) for elasticsearch_auth, (dict|None) for headers to match the mixin signature.

src/nat/profiler/decorators/function_tracking.py (4)

359-362: Unreachable duplicate return.

There are two consecutive “return sync_wrapper” statements; the second is unreachable and flagged by linters.

Apply this diff:

-    return sync_wrapper
-
-    return sync_wrapper
+    return sync_wrapper

339-345: Potential memory blow-up for generators collecting all outputs.

Both generator wrappers accumulate all yielded items into final_outputs to set as the “output”. For long/streaming generators this can be unbounded. Consider either:

  • Only setting the last yielded item, or
  • Sampling/truncating, or
  • Emitting outputs incrementally (as you do in track_function) and setting output=None on end.

I can draft a variant that records only the last item and a count.

Also applies to: 302-307, 214-215
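
A rough sketch of the "record only the last item and a count" variant (hypothetical helper; record_output stands in for however the wrapper sets the step's output):

from collections.abc import Generator
from typing import Any

def _track_last_yield(gen: Generator[Any, None, None], record_output) -> Generator[Any, None, None]:
    # Pass items through unchanged while tracking only the final item and a count,
    # avoiding unbounded accumulation for long or streaming generators.
    count, last = 0, None
    for item in gen:
        count, last = count + 1, item
        yield item
    record_output({"last_item": last, "yield_count": count})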


78-104: Minor: track_function is missing return type annotation as well (consistency).

Same rationale as for track_unregistered_function; add -> Any for consistency and to satisfy static checks.

(See combined diff above.)


30-47: Serialization helper is reasonable; consider guarding against very large objects.

_str(obj) fallback might dump huge objects into telemetry. If that’s a concern, consider truncating arbitrarily long strings in _serialize_data.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/trace_container.py (2)

36-58: Type-hint validators and tighten the union check.

Add explicit parameter and return annotations to satisfy our “public APIs require type hints” guideline, and prefer identity over equality for the Any check to avoid edge cases.

Apply this diff:

-    @field_validator('source', mode='before')
-    @classmethod
-    def validate_source_via_union(cls, v):
+    @field_validator('source', mode='before')
+    @classmethod
+    def validate_source_via_union(cls, v: Any) -> Any:
@@
-                current_union = TraceAdapterRegistry.get_current_union()
-                if current_union != Any:  # Only validate if union is available
+                current_union = TraceAdapterRegistry.get_current_union()
+                if current_union is not Any:  # Only validate if union is available
                     adapter = TypeAdapter(current_union)
                     return adapter.validate_python(v)

60-72: Annotate model-level validator for clarity and lint compliance.

Add type hints to the ensure_union_built validator to comply with our typing and ruff/pyright guidance.

-    @model_validator(mode='before')
-    @classmethod
-    def ensure_union_built(cls, data):
+    @model_validator(mode='before')
+    @classmethod
+    def ensure_union_built(cls, data: Any) -> Any:
         """Ensure union is built before validation."""
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/span_to_dfw_record.py (2)

61-73: Defaulting framework to "langchain" might cause mis-detection.

If upstream didn’t set nat.framework, forcing “langchain” could accidentally match an unintended schema. Consider a neutral default like "unknown" (or omit the key and let the provider discriminators decide), depending on how your adapters discriminate.

-    framework = _get_string_value(span.attributes.get("nat.framework", "langchain"))
+    framework = _get_string_value(span.attributes.get("nat.framework")) or "unknown"

If discriminators require framework, make sure adapters treat "unknown" as non-matching so that the error path with registered adapter listing triggers correctly.


83-97: Great diagnostics on schema detection failures.

Enumerating registered adapters greatly helps operability. Consider logging the offending span attribute keys at DEBUG for faster triage, but the current error is already helpful.

packages/nvidia_nat_data_flywheel/tests/observability/schema/test_trace_container.py (2)

52-57: Make cleanup import explicit to avoid NameError masking.

The teardown uses TraceAdapterRegistry without importing in that scope and relies on catching NameError. Re-import explicitly to keep the intent clear and avoid accidental masking of unrelated NameErrors.

-        # Clean up after each test
-        try:
-            TraceAdapterRegistry.clear_registry()
-        except (ImportError, NameError):
-            pass  # Registry not available
+        # Clean up after each test
+        try:
+            from nat.plugins.data_flywheel.observability.processor.trace_conversion.trace_adapter_registry import (
+                TraceAdapterRegistry,
+            )
+            TraceAdapterRegistry.clear_registry()
+        except ImportError:
+            pass  # Registry not available

154-168: ImportError resilience test is a bit heavy-handed.

Patching builtins.__import__ will trip all imports and can hide other issues. Consider narrowly patching only the registry import with import hooks or module injection via sys.modules. Optional; current test does validate the intended path.

packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py (4)

41-43: Brittle base-class check — prefer issubclass/isinstance over bases.

__class__.__bases__ is an implementation detail and brittle with generics/MRO. You already assert isinstance(processor, BatchingProcessor). If you still want a class-level check, use issubclass(DictBatchingProcessor, BatchingProcessor).

Apply this diff:

-        # Verify it's properly typed for dict
-        assert processor.__class__.__bases__ == (BatchingProcessor, )
+        # Verify inheritance without relying on fragile __bases__
+        assert issubclass(DictBatchingProcessor, BatchingProcessor)

247-264: Strengthen batching-parameter assertions.

The test confirms that a DictBatchingProcessor was added but doesn’t assert parameter propagation. If BatchingProcessor exposes attributes (e.g., batch_size, flush_interval, max_queue_size, drop_on_overflow, shutdown_timeout), add conditional assertions to validate values. This catches wiring regressions.

If those attributes aren’t public, we can expose read-only properties on DictBatchingProcessor to support testing. I can draft that change if you want.


187-194: Unify async test style across the suite.

Here you use asyncio.run(...). In other test modules you use native async def with pytest.mark.asyncio. Consider standardizing on one style (prefer pytest.mark.asyncio) for consistency and cleaner stack traces.

If you choose pytest.mark.asyncio, ensure pytest-asyncio is enabled in the test environment.


304-323: Questionable value of “invalid parameter types” test.

This test passes obviously invalid types and then accepts either outcome. It’s unlikely to catch regressions and may create noise. Consider removing it or asserting a specific behavior (e.g., raising TypeError) to make it meaningful.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/dfw_record_processor.py (4)

16-16: Remove unused import and prefer model_dump over JSON round-trip.

json is only used for a model->JSON->dict round-trip. In Pydantic v2, model_dump(mode="json", by_alias=True) returns JSON-serializable Python structures without the overhead of json.loads(model_dump_json(...)).

Apply this diff:

-import json

56-56: Avoid JSON round-trip; use model_dump(mode="json", by_alias=True).

Switching to model_dump reduces CPU overhead and avoids transient string allocation while preserving alias behavior.

Apply this diff:

-        return json.loads(item.model_dump_json(by_alias=True))
+        return item.model_dump(mode="json", by_alias=True)

15-15: Add a module docstring per project guidelines.

Public modules require Google-style module docstrings. Add a concise first-line description ending with a period.

Apply this diff:

@@
 # limitations under the License.
 
+"""Processors bridging spans and Data Flywheel (DFW) records.
+
+This module provides:
+- DFWToDictProcessor: serializes DFW Pydantic records to dicts using alias-aware JSON-compatible dumps.
+- SpanToDFWRecordProcessor: converts NAT spans into DFW record models via the trace adapter registry.
+"""

80-86: Consider accepting string event types for robustness.

If upstream ever sends "LLM_START" as a string (instead of IntermediateStepType.LLM_START), the match won’t trigger. A small guard to normalize to enum improves resilience without altering behavior.

I can provide a small patch that maps string values to the enum before matching if desired.
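
A hedged sketch of that guard, assuming IntermediateStepType is a string-backed enum whose member values match names like "LLM_START":

from nat.data_models.intermediate_step import IntermediateStepType  # assumed import path

def _normalize_event_type(event_type):
    # Coerce raw strings such as "LLM_START" to the enum before matching;
    # unknown strings fall through unchanged.
    if isinstance(event_type, str):
        try:
            return IntermediateStepType(event_type)
        except ValueError:
            pass
    return event_type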

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py (3)

16-21: Remove unused TypeVar import and ProviderT declaration.

ProviderT is unused and will trip ruff. Drop the import and declaration.

Apply this diff:

-from typing import Any
-from typing import TypeVar
+from typing import Any
@@
-ProviderT = TypeVar("ProviderT")

30-30: Use or drop logger to satisfy ruff.

logger is currently unused. Either use it for debug messages in validators or prefix with underscore to indicate intentional unused.

Example (use in validators after normalization):

-logger = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)

And inside validate_input_value/validate_metadata, add:

logger.debug("Normalized %s field for OpenAITraceSourceBase.", "input_value" or "metadata")

15-15: Add module docstring per project guidelines.

Public modules should have a concise Google-style docstring.

Apply this diff:

@@
 # limitations under the License.
 
+"""OpenAI trace source schema provider for Data Flywheel integration.
+
+Defines OpenAIMetadata and an OpenAITraceSource model that normalize input_value and
+metadata attributes for adapter-based conversion pipelines.
+"""
packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/test_span_to_dfw_record.py (2)

208-217: Make the logging test assert something meaningful.

The current test doesn’t assert anything and comments suggest it might not work. Assert that the debug logger was called at least once to validate the path, or remove the test.

Apply this diff:

-        # Note: This test may not work as expected since we're using real functionality
-        # and the logger calls depend on internal implementation details
-        # Consider removing this test or adapting it to test actual logging behavior
+        # Assert debug logging occurred at least once
+        assert mock_logger.debug.call_count >= 1

100-142: Fragility note: source type asserted as dict depends on registry state.

These tests rely on an empty adapter registry so that TraceContainer.source remains a dict. If automatic adapter registration is added in the future, this may become a typed model and break the assertions. You already clear the registry; keep this in mind if auto-registration occurs at import time.

If needed, explicitly assert the registry union is Any before calling get_trace_container to make the test intent robust.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (2)

16-21: Add a brief module docstring after the SPDX header.

Public modules under packages/*/src should include a concise Google-style docstring per guidelines.

Apply this diff near the top of the file:

@@
 # limitations under the License.
 
+"""Data Flywheel base exporter and batching utilities.
+
+Provides a contract-driven span export pipeline that converts spans into
+DFW records, serializes them to dict, batches them, and forwards to sinks.
+"""
+
 import logging
 from abc import abstractmethod

83-95: Property contract is fine, but consider marking contract as a class attribute for clarity.

Keeping the property is OK, yet a class-level EXPORT_CONTRACT: type[BaseModel] improves readability and prevents late-binding pitfalls.

packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/adapter/elasticsearch/test_openai_converter.py (7)

85-90: The “case-sensitivity” test comment is misleading.

You assert that "User" falls back to "user" which is correct, but the comment “Should fallback to default” can be clarified to “Falls back to default user role due to unmapped case.”

-        assert convert_role("User") == "user"  # Should fallback to default
+        assert convert_role("User") == "user"  # Unmapped case -> fallback to default user

178-200: Prefer pytest.raises over try/except + asserts.

Using pytest.raises yields clearer intent and failure messages.

-        try:
-            create_message_by_role("user", None)
-            assert False, "Expected AssertionError for None content in user message"
-        except AssertionError as e:
-            assert "User message content cannot be None" in str(e)
+        import pytest
+        with pytest.raises((AssertionError, ValueError)) as exc:
+            create_message_by_role("user", None)
+        assert "User message content cannot be None" in str(exc.value)
@@
-        try:
-            create_message_by_role("system", None)
-            assert False, "Expected AssertionError for None content in system message"
-        except AssertionError as e:
-            assert "System message content cannot be None" in str(e)
+        with pytest.raises((AssertionError, ValueError)) as exc:
+            create_message_by_role("system", None)
+        assert "System message content cannot be None" in str(exc.value)
@@
-        try:
-            create_message_by_role("tool", None)
-            assert False, "Expected AssertionError for None content in tool message"
-        except AssertionError as e:
-            assert "Tool message content cannot be None" in str(e)
+        with pytest.raises((AssertionError, ValueError)) as exc:
+            create_message_by_role("tool", None)
+        assert "Tool message content cannot be None" in str(exc.value)

690-700: Tighten expectation: missing “message” should deterministically raise ValueError.

The converter can avoid TypeError by guarding finish_reason type. Update the test to only accept ValueError.

-        try:
-            convert_chat_response(chat_response, "test_span", 0)
-            assert False, "Expected ValueError for missing message"
-        except (ValueError, TypeError) as e:
-            # Either ValueError for missing message or TypeError for finish_reason handling
-            assert "Chat response missing message" in str(e) or "unhashable type" in str(e)
+        import pytest
+        with pytest.raises(ValueError) as exc:
+            convert_chat_response(chat_response, "test_span", 0)
+        assert "Chat response missing message" in str(exc.value)

701-710: Same here: ensure None “message” yields ValueError.

-        try:
-            convert_chat_response(chat_response, "test_span", 0)
-            assert False, "Expected ValueError for None message"
-        except ValueError as e:
-            assert "Chat response missing message" in str(e)
+        import pytest
+        with pytest.raises(ValueError) as exc:
+            convert_chat_response(chat_response, "test_span", 0)
+        assert "Chat response missing message" in str(exc.value)

931-938: Prefer pytest.raises for message conversion error.

This reads cleaner and aligns with the changes that replace assert-based validation with ValueError in the converter.

-        try:
-            convert_langchain_openai(trace_container)
-            assert False, "Expected error for invalid message"
-        except (ValueError, AssertionError) as e:
-            # Either AssertionError for None content or ValueError from message conversion wrapper
-            assert "User message content cannot be None" in str(
-                e) or "Failed to convert message in trace source" in str(e)
+        import pytest
+        with pytest.raises(ValueError) as exc:
+            convert_langchain_openai(trace_container)
+        assert "Failed to convert message in trace source" in str(exc.value)

959-964: Use pytest.raises for invalid chat response.

If converter guards finish_reason typing, expect ValueError only.

-        try:
-            convert_langchain_openai(trace_container)
-            assert False, "Expected error for invalid chat response"
-        except (ValueError, TypeError) as e:
-            # Either TypeError for unhashable dict used as key or ValueError from chat response conversion wrapper
-            assert "unhashable type" in str(e) or "Failed to convert chat response 0" in str(e)
+        import pytest
+        with pytest.raises(ValueError) as exc:
+            convert_langchain_openai(trace_container)
+        assert "Failed to convert chat response 0" in str(exc.value)

607-657: Add a test for non-string finish_reason to prevent TypeError regressions.

Proactively cover finish_reason values that are non-hashable (e.g., dict) or unexpected types (e.g., int).

@@ class TestConvertChatResponse:
     def test_convert_chat_response_with_unknown_finish_reason(self):
@@
         assert result.finish_reason is None  # Should be None for unmapped finish reasons
+
+    def test_convert_chat_response_with_non_string_finish_reason(self):
+        chat_response = {
+            "message": {
+                "content": "Response with non-string finish reason",
+                "response_metadata": {"finish_reason": {"not": "hashable"}},
+                "additional_kwargs": {},
+            }
+        }
+        result = convert_chat_response(chat_response, "test_span", 0)
+        assert result.finish_reason is None

Also applies to: 670-689

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/schema_registry.py (3)

16-21: Add a module-level docstring for the registry.

Public modules should have a concise docstring. Note that the string only counts as the module docstring if it is the first statement in the file (after the SPDX header), so place it ahead of the imports.

@@
+"""Runtime registry for versioned DFW schema contracts per destination (e.g., Elasticsearch)."""
+
 import logging
 from typing import TypeVar
 
 from pydantic import BaseModel
 
 logger = logging.getLogger(__name__)

31-56: Decorator API and typing look good. Consider guarding concurrent access.

If schemas are registered from multiple threads (plugin discovery), wrap mutations with a lock.

@@
-class SchemaRegistry:
+from threading import RLock
+_SCHEMA_LOCK = RLock()
+
+class SchemaRegistry:
@@
-            if name not in cls._schemas:
-                cls._schemas[name] = {}
+            with _SCHEMA_LOCK:
+                if name not in cls._schemas:
+                    cls._schemas[name] = {}
@@
-            if version in cls._schemas[name]:
-                logger.warning("Overriding existing schema for %s:%s", name, version)
+                if version in cls._schemas[name]:
+                    logger.warning("Overriding existing schema for %s:%s", name, version)
 
-            cls._schemas[name][version] = schema_cls
+                cls._schemas[name][version] = schema_cls

And guard other mutating methods similarly (clear).
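For example, a sketch of a guarded clear, assuming it simply resets the registry dict:

    @classmethod
    def clear(cls) -> None:
        """Remove all registered schemas (e.g., between tests)."""
        with _SCHEMA_LOCK:
            cls._schemas.clear()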


84-95: Minor: list comprehension would be more concise.

No behavior change; readability only.

-        schemas = []
-        for name, versions in cls._schemas.items():
-            for version in versions.keys():
-                schemas.append(f"{name}:{version}")
-        return schemas
+        return [f"{name}:{version}" for name, versions in cls._schemas.items() for version in versions.keys()]
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/trace_adapter_registry.py (2)

16-23: Add a module docstring.

A short description helps users discover this dynamic adapter facility; it must precede the imports to be treated as the module docstring.

@@
+"""Dynamic registry that maps Pydantic trace source models to target conversion types.
+
+Used to select the right converter at runtime and to build a dynamic Union for validation.
+"""
+
 import logging
 from collections.abc import Callable
 from functools import reduce
 from typing import Any
 
 from nat.plugins.data_flywheel.observability.schema.trace_container import TraceContainer

36-75: Support forward-referenced return annotations (strings).

If a converter uses postponed evaluation of annotations, __annotations__['return'] can be a string. Use typing.get_type_hints for robustness.

@@
-        def decorator(func):
-            return_type = func.__annotations__.get('return')
+        def decorator(func):
+            from typing import get_type_hints
+            try:
+                hints = get_type_hints(func)
+                return_type = hints.get('return')
+            except Exception:
+                return_type = func.__annotations__.get('return')
@@
-            if return_type is None:
+            if return_type is None:
                 raise ValueError(f"Converter function '{func.__name__}' must have a return type annotation.\n"
                                  f"Example: def {func.__name__}(trace: TraceContainer) -> DFWESRecord:")
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/elasticsearch/openai_converter.py (3)

16-24: Add a module docstring describing the adapter.

Clarifies scope (OpenAI→Elasticsearch converter, role and finish_reason normalization).

@@
+"""OpenAI trace adapter for the Elasticsearch Data Flywheel schema.
+
+Normalizes roles, maps finish reasons, validates tool schemas, and converts
+LangChain/OpenAI traces to DFWESRecord instances.
+"""
+
 import json
 import logging

277-299: Wrap assertion-based failures from message conversion consistently as ValueError.

convert_message_to_dfw can raise ValueError (after the assert→ValueError change above). Catch both ValueError and AssertionError here for safety, and re-wrap as ValueError for consistent API.

@@
-    for message in trace_source.source.input_value:
-        try:
-            msg_result = convert_message_to_dfw(message)
-            messages.append(msg_result)
-        except ValueError as e:
-            raise ValueError(f"Failed to convert message in trace source: {e}") from e
+    for message in trace_source.source.input_value:
+        try:
+            msg_result = convert_message_to_dfw(message)
+            messages.append(msg_result)
+        except (ValueError, AssertionError) as e:
+            raise ValueError(f"Failed to convert message in trace source: {e}") from e

229-275: Minor: validate “message” structure early for clearer errors.

Avoids doing .get() on non-dicts and yields a predictable ValueError.

@@
-    message = chat_response.get("message", {})
-    if message is None:
-        raise ValueError(f"Chat response missing message for span: '{span_name}'")
+    if "message" not in chat_response or chat_response.get("message") is None:
+        raise ValueError(f"Chat response missing message for span: '{span_name}'")
+    message = chat_response["message"]
+    if not isinstance(message, dict):
+        raise ValueError(f"Chat response message must be a dict in span: '{span_name}'")
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/dfw_es_record.py (4)

40-46: Include content_filter in FinishReason to match provider outputs.

Some providers (incl. OpenAI) may return content_filter for safety blocks. Without it, parsing will fail for such responses.

Apply this diff if applicable to your target providers:

 class FinishReason(str, Enum):
     """Finish reason for chat completion responses."""
 
     STOP = "stop"
     LENGTH = "length"
     TOOL_CALLS = "tool_calls"
+    CONTENT_FILTER = "content_filter"

If other finish reasons are possible, consider a fallback strategy (e.g., allow arbitrary strings) to avoid hard failures.
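One possible shape for that fallback, sketched as a Pydantic before-validator; the Choice model name is illustrative, and FinishReason is the enum above:

from pydantic import BaseModel
from pydantic import field_validator

class Choice(BaseModel):
    finish_reason: FinishReason | None = None

    @field_validator("finish_reason", mode="before")
    @classmethod
    def _coerce_finish_reason(cls, v: object) -> object:
        # Pass known values through; coerce anything unexpected (including
        # non-hashable values such as dicts) to None instead of failing hard.
        if v is None or isinstance(v, FinishReason):
            return v
        try:
            return FinishReason(v)
        except (ValueError, TypeError):
            return None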


103-106: Make FunctionParameters.required optional with sensible default.

Tool schemas frequently omit required. For robustness, default to an empty list.

Apply this diff:

-    required: list[str] = Field(..., description="The required properties of the function parameters.")
+    required: list[str] = Field(default_factory=list,
+                                 description="The required properties of the function parameters.")

130-151: Optional: prefer Sequence[...] for read-only collections.

Guidelines prefer collections.abc abstractions. Using Sequence (e.g., for messages, tool_calls, choices) broadens compatibility without constraining callers.

No change required, but consider this for future iterations if it doesn’t conflict with Pydantic serialization expectations.

Also applies to: 156-168, 173-188


198-198: Consider lowering log severity for expected no-tool-call scenarios.

Requests with tools but no tool calls can be normal (e.g., model decided to answer directly). warning may create noise; info could be more appropriate.

Apply this diff if desired:

-                logger.warning("Request has tools but response has no tool calls")
+                logger.info("Request has tools but response has no tool calls")

Signed-off-by: Matthew Penn <[email protected]>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (2)

30-44: Resolved: docstring type shapes now match the signature.

Thanks for aligning the docstring with the signature for elasticsearch_auth and headers.


70-75: Do not ignore Elasticsearch responses; detect partial failures and surface them.

As written, bulk and single-index errors will be silently dropped, causing data loss and hard-to-debug gaps. Inspect the responses and raise on failure; log useful context.

-            await self._elastic_client.bulk(operations=bulk_operations)
+            resp = await self._elastic_client.bulk(operations=bulk_operations)
+            if isinstance(resp, dict) and resp.get("errors"):
+                # Extract first error for quick triage; full response is large
+                first_err = None
+                for it in resp.get("items", []):
+                    meta = it.get("index") or it.get("create") or it.get("update") or it.get("delete")
+                    if meta and meta.get("error"):
+                        first_err = meta
+                        break
+                logger.error("Elasticsearch bulk indexing reported errors (index=%s): first=%s", self._index, first_err)
+                raise RuntimeError("Elasticsearch bulk indexing reported errors; see logs for details.")
         elif isinstance(item, dict):
             # Single document export
-            await self._elastic_client.index(index=self._index, document=item)
+            resp = await self._elastic_client.index(index=self._index, document=item)
+            if isinstance(resp, dict) and resp.get("result") not in {"created", "updated"}:
+                logger.warning("Unexpected Elasticsearch index result (index=%s): %s", self._index, resp)

Consider adding tests that simulate a partial bulk failure and assert that an exception is raised.
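A sketch of such a test, assuming pytest with pytest-asyncio and an exporter fixture that builds the mixin under test (the fixture name and error payload are illustrative, though the payload follows the Elasticsearch bulk API shape):

from unittest.mock import AsyncMock

import pytest

@pytest.mark.asyncio
async def test_bulk_partial_failure_raises(exporter):
    # Simulate a bulk response in which one item failed to index.
    exporter._elastic_client.bulk = AsyncMock(return_value={
        "errors": True,
        "items": [{"index": {"status": 400, "error": {"type": "mapper_parsing_exception"}}}],
    })
    with pytest.raises(RuntimeError, match="bulk indexing reported errors"):
        await exporter.export_processed([{"doc": 1}])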

🧹 Nitpick comments (9)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (9)

30-37: Add missing return annotation and type for variadic params to satisfy “all public APIs typed.”

__init__ should explicitly return None, and variadic params should be typed to avoid pyright/ruff warnings given the repo settings.

Apply:

-    def __init__(self,
-                 *args,
-                 endpoint: str,
-                 index: str,
-                 elasticsearch_auth: tuple[str, str],
-                 headers: dict[str, str] | None = None,
-                 **kwargs):
+    def __init__(self,
+                 *args: "Any",
+                 endpoint: str,
+                 index: str,
+                 elasticsearch_auth: tuple[str, str],
+                 headers: dict[str, str] | None = None,
+                 **kwargs: "Any") -> None:

Add imports (see next comment) to define Any without forward refs.


16-21: Import typing symbols used in annotations.

Required if you adopt the previous suggestion. Note that Mapping and Sequence should come from collections.abc; the typing aliases are deprecated since Python 3.9.

 import logging
+from collections.abc import Mapping
+from collections.abc import Sequence
+from typing import Any

45-47: Default both Accept and Content-Type for ES compatibility headers.

When using compatibility headers with ES 8, set both Accept and Content-Type to avoid content-type negotiation surprises.

-        if headers is None:
-            headers = {"Accept": "application/vnd.elasticsearch+json; compatible-with=8"}
+        if headers is None:
+            headers = {
+                "Accept": "application/vnd.elasticsearch+json; compatible-with=8",
+                "Content-Type": "application/vnd.elasticsearch+json; compatible-with=8",
+            }

52-57: Broaden typing to Sequence[Mapping[str, Any]] and improve docstring accordingly.

This follows the repo guideline to prefer typing/collections.abc abstractions and conveys the payload shape.

-    async def export_processed(self, item: dict | list[dict]) -> None:
+    async def export_processed(self, item: Mapping[str, Any] | Sequence[Mapping[str, Any]]) -> None:
         """Export a batch of spans.
 
         Args:
-            item (dict | list[dict]): Dictionary or list of dictionaries to export to Elasticsearch.
+            item (Mapping[str, Any] | Sequence[Mapping[str, Any]]): Single document or a sequence of documents
+                to export to Elasticsearch.
         """
-        if isinstance(item, list):
+        if isinstance(item, Sequence) and not isinstance(item, (bytes, str)):

Note: The bytes/str guard avoids treating a JSON string or bytes payload as a sequence of chars.


48-49: Optionally enable sane client defaults (timeouts/retries) and TLS guardrails.

To make the exporter resilient by default, consider enabling retry_on_timeout and reasonable timeouts; optionally warn on non-HTTPS endpoints in production.

-        self._elastic_client = AsyncElasticsearch(endpoint, basic_auth=elasticsearch_auth, headers=headers)
+        # Enable basic resiliency by default; allow callers to override via kwargs if needed
+        self._elastic_client = AsyncElasticsearch(
+            endpoint,
+            basic_auth=elasticsearch_auth,
+            headers=headers,
+            retry_on_timeout=True,
+            max_retries=3,
+            request_timeout=10.0,
+            **kwargs,
+        )

Optionally:

  • emit a warning if endpoint.startswith("http://") (see the sketch below).
  • expose these knobs explicitly on the mixin’s __init__ if you want config-surface parity with other exporters.
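A minimal sketch of that guardrail inside __init__, assuming logger is the module logger:

        if endpoint.startswith("http://"):
            # Credentials travel in cleartext over plain HTTP; flag it loudly.
            logger.warning("Elasticsearch endpoint %s does not use TLS; "
                           "basic-auth credentials will be sent unencrypted.", endpoint)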

23-28: Nit: capitalize “Elasticsearch” in docstrings.

Use the proper product name casing in user-facing docs and docstrings.

-    """Mixin for elasticsearch exporters.
+    """Mixin for Elasticsearch exporters.
@@
-    This mixin provides elasticsearch-specific functionality for SpanExporter exporters.
-    It handles elasticsearch-specific resource tagging and uses the AsyncElasticsearch client.
+    This mixin provides Elasticsearch-specific functionality for SpanExporter exporters.
+    It handles Elasticsearch-specific resource tagging and uses the AsyncElasticsearch client.

52-75: Add an async close to release the client cleanly.

The mixin allocates an AsyncElasticsearch client but never closes it; add an explicit close to avoid connector/socket leaks in long-running agents.

Add this method (outside the shown range) and ensure callers invoke it on shutdown:

async def aclose(self) -> None:
    """Close the underlying Elasticsearch client."""
    await self._elastic_client.close()

If your exporter base defines a lifecycle hook (e.g., close/stop), wire this through there instead.


58-69: Minor: chunk very large bulks to control payload size and backpressure.

If batches can be large, consider chunking (e.g., 1k docs per bulk call) to reduce memory usage and failure blast radius.

I can provide a chunked variant if desired.
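A minimal sketch of that variant, assuming item is the flat list of documents handled in export_processed (the chunk size is illustrative):

from collections.abc import Iterable, Iterator, Mapping
from itertools import islice
from typing import Any

def _chunks(docs: Iterable[Mapping[str, Any]], size: int = 1000) -> Iterator[list[Mapping[str, Any]]]:
    """Yield successive fixed-size chunks from an iterable of documents."""
    it = iter(docs)
    while chunk := list(islice(it, size)):
        yield chunk

# Inside export_processed, assuming `item` is the document list:
# for chunk in _chunks(item):
#     operations = [op for doc in chunk for op in ({"index": {"_index": self._index}}, doc)]
#     await self._elastic_client.bulk(operations=operations)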


39-44: Clarify endpoint/auth expectations in docstring.

Document the expected endpoint form (e.g., https://host:port) and that elasticsearch_auth is Basic Auth (user, pass). If API keys are needed later, note the future support surface.

-            endpoint (str): The elasticsearch endpoint.
+            endpoint (str): The Elasticsearch endpoint (e.g., "https://host:9200").
@@
-            elasticsearch_auth (tuple[str, str]): The elasticsearch authentication credentials.
+            elasticsearch_auth (tuple[str, str]): Basic authentication credentials as (username, password).
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 1df87a7 and e342138.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • packages/nvidia_nat_data_flywheel/pyproject.toml (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/utils/deserialize.py (1 hunks)
  • src/nat/profiler/decorators/function_tracking.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/nat/profiler/decorators/function_tracking.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/utils/deserialize.py
  • packages/nvidia_nat_data_flywheel/pyproject.toml
🧰 Additional context used
📓 Path-based instructions (9)
{src/**/*.py,packages/*/src/**/*.py,examples/*/**/*.py}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Use abbreviations: 'nat' for API namespace and CLI tool, 'nvidia-nat' for package name, 'NAT' only for env var prefixes and informal code comments (never in documentation)

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
packages/*/src/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

All importable Python code in packages must live under packages/*/src/

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.py: Format Python code with yapf (PEP8 base, column_limit=120) run second in the toolchain
Run ruff (ruff check --fix) using configuration in pyproject.toml; fix warnings unless explicitly ignored; ruff is a linter only (not for formatting)
Respect naming: snake_case for functions and variables, PascalCase for classes, UPPER_CASE for constants
Prefer typing/collections.abc abstractions (e.g., Sequence over list)
Use typing.Annotated for units or extra metadata when useful
Treat pyright warnings (configured in pyproject.toml) as errors during development
Prefer httpx with SSL verification enabled by default for HTTP requests; follow OWASP Top-10
Use async/await for I/O-bound work (HTTP, DB, file reads)
Cache expensive computations with functools.lru_cache or an external cache when appropriate
Leverage NumPy vectorized operations when beneficial

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
**/*.{py,md,sh,yml,yaml,toml,json,ini,cfg,txt,rst,pyi}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Indent with 4 spaces (no tabs) and ensure every file ends with a single trailing newline

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
{src/**/*.py,packages/*/src/**/*.py}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

{src/**/*.py,packages/*/src/**/*.py}: All public APIs require Python 3.11+ type hints on parameters and return values
Provide Google-style docstrings for every public module, class, function and CLI command
Docstring first line must be a concise description ending with a period

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}: Every file must start with the standard SPDX Apache-2.0 header
All source files must include the SPDX Apache-2.0 header template

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
**/*.{py,md}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Version numbers are derived by setuptools-scm; never hard-code versions in code or documentation

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
**/*

⚙️ CodeRabbit configuration file

**/*: # Code Review Instructions

  • Ensure the code follows best practices and coding standards.
  • For Python code, follow PEP 20 and PEP 8 for style guidelines.
  • Check for security vulnerabilities and potential issues.
  • Python methods should use type hints for all parameters and return values. Example:

    def my_function(param1: int, param2: str) -> bool:
        pass

Documentation Review Instructions

  • Verify that documentation and comments are clear and comprehensive.
  • Verify that the documentation doesn't contain any TODOs, FIXMEs, or placeholder text like "lorem ipsum".
  • Verify that the documentation doesn't contain any offensive or outdated terms.
  • Verify that documentation and comments are free of spelling mistakes; ensure the documentation doesn't contain any words listed in the ci/vale/styles/config/vocabularies/nat/reject.txt file. Words that might appear to be spelling mistakes but are listed in the ci/vale/styles/config/vocabularies/nat/accept.txt file are OK.

Misc.

  • All code should be licensed under the Apache License 2.0, and should contain an Apache License 2.0 header comment at the top of each file.
  • Confirm that copyright years are up-to-date whenever a file is changed.

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
packages/**/*

⚙️ CodeRabbit configuration file

packages/**/*:

  • This directory contains optional plugin packages for the toolkit; each should contain a pyproject.toml file.
  • The pyproject.toml file should declare a dependency on nvidia-nat or another package with a name starting with nvidia-nat-. This dependency should be declared using ~=<version>, and the version should be a two digit version (ex: ~=1.0).
  • Not all packages contain Python code; if they do, they should also contain their own set of tests, in a tests/ directory at the same level as the pyproject.toml file.

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
🧬 Code graph analysis (1)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (2)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (1)
  • export_processed (79-88)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (1)
  • export_processed (97-98)
🔇 Additional comments (1)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (1)

1-15: License header looks correct and up to date.

SPDX header and copyright years (2024–2025) are present and correct.

@willkill07 willkill07 (Member) left a comment

Completed a first-pass

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 7

♻️ Duplicate comments (3)
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md (1)

20-23: Fix project naming in heading and first mention; align with repo nomenclature.

Use “NeMo Agent Toolkit” (capital T) in headings, and use “NVIDIA NeMo Agent toolkit” on first mention in body; subsequent mentions “NeMo Agent toolkit”. Also prefer “GitHub repo” casing.

Apply:

-# NVIDIA Agent Toolkit Subpackage
-This is a subpackage for NVIDIA Data Flywheel Blueprint integration for continuous model improvement.
-
-For more information about the NVIDIA NeMo Agent toolkit, please visit the [NeMo Agent toolkit GitHub Repo](https://github.com/NVIDIA/NeMo-Agent-Toolkit).
+# NVIDIA NeMo Agent Toolkit Data Flywheel subpackage
+This subpackage integrates the NVIDIA Data Flywheel Blueprint with the NVIDIA NeMo Agent toolkit to enable continuous model improvement.
+
+For more information about the NVIDIA NeMo Agent toolkit, please visit the [NeMo Agent toolkit GitHub repo](https://github.com/NVIDIA/NeMo-Agent-Toolkit).
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py (1)

18-20: Import ConfigDict to make extras behavior explicit.

If you intend to preserve unknown fields arriving from upstream LangChain/OpenAI objects, configure extra="allow"; otherwise, explicitly set extra="ignore" to codify the current behavior and avoid regressions.

-from pydantic import BaseModel
-from pydantic import Field
+from pydantic import BaseModel, Field, ConfigDict
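For example, a sketch codifying the current ignore behavior (the field names shown are illustrative):

class OpenAIMessage(BaseModel):
    """Message payload mirrored from upstream LangChain/OpenAI objects."""

    # Make the treatment of unknown upstream fields explicit;
    # switch to extra="allow" if they should be preserved instead.
    model_config = ConfigDict(extra="ignore")

    role: str = Field(description="The role of the message author.")
    content: str | None = Field(default=None, description="The message content.")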
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py (1)

16-19: Replace inline comment with a proper module docstring.

Public modules require a top-level docstring. Fold the inline note into it and keep the public re-exports. (Echoing a prior review on this file.)

-#  schemas are auto-discovered by the discovery system - no manual imports needed
+"""Elasticsearch sink schema public exports.
+
+Schemas are auto-discovered by the discovery system; no manual imports are needed.
+"""
 from .contract_version import ContractVersion
 from .dfw_es_record import DFWESRecord
🧹 Nitpick comments (8)
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md (1)

18-18: Tighten banner title casing (nit).

The image title currently says “NeMo Agent toolkit banner image”. For consistency with naming rules, prefer “NeMo Agent Toolkit banner image”.

Apply:

-![NVIDIA NeMo Agent Toolkit](https://media.githubusercontent.com/media/NVIDIA/NeMo-Agent-Toolkit/refs/heads/main/docs/source/_static/banner.png "NeMo Agent toolkit banner image")
+![NVIDIA NeMo Agent Toolkit](https://media.githubusercontent.com/media/NVIDIA/NeMo-Agent-Toolkit/refs/heads/main/docs/source/_static/banner.png "NeMo Agent Toolkit banner image")
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py (3)

18-29: Remove unused TypeVar.

ProviderT is defined but unused.

-from typing import TypeVar
@@
-ProviderT = TypeVar("ProviderT")

42-70: Broaden iterable handling for input_value (optional).

Accept generic sequences (e.g., tuples) in addition to lists. This improves robustness and aligns with the suggested Sequence typing.

@@
-        # Validate list of messages
-        if isinstance(v, list):
+        # Validate a sequence of messages
+        if isinstance(v, (list, tuple)):
             validated_messages = []
             for msg in v:
                 if isinstance(msg, dict):
                     validated_messages.append(OpenAIMessage(**msg))
                 elif isinstance(msg, OpenAIMessage):
                     validated_messages.append(msg)
                 else:
                     raise ValueError(f"Invalid message format: {msg}")
             return validated_messages

71-85: Remove commented-out validator to reduce noise.

Dead code increases maintenance overhead.

-    # @field_validator("metadata", mode="before")
-    # @classmethod
-    # def validate_metadata(cls, v: Any) -> dict[str, Any]:
-    #     """Validate the metadata for the OpenAITraceSource."""
-    #     if v is None:
-    #         return {}
-    #
-    #     if isinstance(v, str):
-    #         metadata = deserialize_span_attribute(v)
-    #         if not isinstance(metadata, dict):
-    #             raise ValueError(f"Invalid metadata format: {metadata}")
-    #         return metadata
-    #
-    #     raise ValueError(f"Invalid metadata format: {v}")
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py (1)

21-21: Nit: ensure a single trailing newline at EOF.

Repository style requires exactly one newline at the end of each file.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1)

16-21: Add a concise module docstring.

All public modules need a top-level docstring per repo guidelines.

@@
-from enum import Enum
+"""Versioned contract selection for the Elasticsearch sink schema.
+
+Exposes ContractVersion and a resolver to obtain the Pydantic contract
+class via the SchemaRegistry for a given version.
+"""
+from enum import Enum
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (2)

15-20: Add a module docstring summarizing the exporter.

Provide a one-liner description to satisfy documentation requirements.

@@
 # limitations under the License.
 
+"""Data Flywheel exporter implementation targeting Elasticsearch."""
+
 import logging

58-68: Optional: validate required Elasticsearch kwargs early for clearer errors.

Because ElasticsearchMixin.__init__ requires endpoint/index/elasticsearch_auth, pre-validating produces a friendlier message than a TypeError from the MRO.

@@     def __init__(self,
-        # Initialize both mixins - ElasticsearchMixin expects elasticsearch_kwargs,
-        # DFWExporter expects the standard exporter parameters
-        self.contract_version = contract_version
+        # Initialize both mixins - ElasticsearchMixin expects elasticsearch_kwargs,
+        # DFWExporter expects the standard exporter parameters
+        self.contract_version = contract_version
+        required = ("endpoint", "index", "elasticsearch_auth")
+        missing = tuple(k for k in required if k not in elasticsearch_kwargs)
+        if missing:
+            raise ValueError(
+                f"Missing required Elasticsearch kwargs: {missing}. "
+                "Provide 'endpoint', 'index', and 'elasticsearch_auth'.")
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between e342138 and 863835c.

📒 Files selected for processing (12)
  • packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/elasticsearch/openai_converter.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/span_to_dfw_record.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/register.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/dfw_es_record.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_elasticsearch_exporter.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/adapter/elasticsearch/test_openai_converter.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/adapter/elasticsearch/test_openai_converter.py
  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_elasticsearch_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/register.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/span_to_dfw_record.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/elasticsearch/openai_converter.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/dfw_es_record.py
🧰 Additional context used
📓 Path-based instructions (9)
{src/**/*.py,packages/*/src/**/*.py,examples/*/**/*.py}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Use abbreviations: 'nat' for API namespace and CLI tool, 'nvidia-nat' for package name, 'NAT' only for env var prefixes and informal code comments (never in documentation)

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
packages/*/src/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

All importable Python code in packages must live under packages/*/src/

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.py: Format Python code with yapf (PEP8 base, column_limit=120) run second in the toolchain
Run ruff (ruff check --fix) using configuration in pyproject.toml; fix warnings unless explicitly ignored; ruff is a linter only (not for formatting)
Respect naming: snake_case for functions and variables, PascalCase for classes, UPPER_CASE for constants
Prefer typing/collections.abc abstractions (e.g., Sequence over list)
Use typing.Annotated for units or extra metadata when useful
Treat pyright warnings (configured in pyproject.toml) as errors during development
Prefer httpx with SSL verification enabled by default for HTTP requests; follow OWASP Top-10
Use async/await for I/O-bound work (HTTP, DB, file reads)
Cache expensive computations with functools.lru_cache or an external cache when appropriate
Leverage NumPy vectorized operations when beneficial

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
**/*.{py,md,sh,yml,yaml,toml,json,ini,cfg,txt,rst,pyi}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Indent with 4 spaces (no tabs) and ensure every file ends with a single trailing newline

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
{src/**/*.py,packages/*/src/**/*.py}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

{src/**/*.py,packages/*/src/**/*.py}: All public APIs require Python 3.11+ type hints on parameters and return values
Provide Google-style docstrings for every public module, class, function and CLI command
Docstring first line must be a concise description ending with a period

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}: Every file must start with the standard SPDX Apache-2.0 header
All source files must include the SPDX Apache-2.0 header template

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
**/*.{py,md}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Version numbers are derived by setuptools-scm; never hard-code versions in code or documentation

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
**/*

⚙️ CodeRabbit configuration file

**/*: # Code Review Instructions

  • Ensure the code follows best practices and coding standards.
  • For Python code, follow PEP 20 and PEP 8 for style guidelines.
  • Check for security vulnerabilities and potential issues.
  • Python methods should use type hints for all parameters and return values. Example:

    def my_function(param1: int, param2: str) -> bool:
        pass

Documentation Review Instructions

  • Verify that documentation and comments are clear and comprehensive.
  • Verify that the documentation doesn't contain any TODOs, FIXMEs, or placeholder text like "lorem ipsum".
  • Verify that the documentation doesn't contain any offensive or outdated terms.
  • Verify that documentation and comments are free of spelling mistakes; ensure the documentation doesn't contain any words listed in the ci/vale/styles/config/vocabularies/nat/reject.txt file. Words that might appear to be spelling mistakes but are listed in the ci/vale/styles/config/vocabularies/nat/accept.txt file are OK.

Misc.

  • All code should be licensed under the Apache License 2.0, and should contain an Apache License 2.0 header comment at the top of each file.
  • Confirm that copyright years are up-to-date whenever a file is changed.

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
packages/**/*

⚙️ CodeRabbit configuration file

packages/**/*:

  • This directory contains optional plugin packages for the toolkit; each should contain a pyproject.toml file.
  • The pyproject.toml file should declare a dependency on nvidia-nat or another package with a name starting with nvidia-nat-. This dependency should be declared using ~=<version>, and the version should be a two digit version (ex: ~=1.0).
  • Not all packages contain Python code; if they do, they should also contain their own set of tests, in a tests/ directory at the same level as the pyproject.toml file.

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
🧠 Learnings (3)
📚 Learning: 2025-08-25T15:01:54.258Z
Learnt from: CR
PR: NVIDIA/NeMo-Agent-Toolkit#0
File: .cursor/rules/general.mdc:0-0
Timestamp: 2025-08-25T15:01:54.258Z
Learning: Applies to docs/source/**/*.md : Use the official naming: first use 'NVIDIA NeMo Agent toolkit'; subsequent references 'NeMo Agent toolkit'; in headings use 'NeMo Agent Toolkit' (capital T)

Applied to files:

  • packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
📚 Learning: 2025-08-25T15:01:54.259Z
Learnt from: CR
PR: NVIDIA/NeMo-Agent-Toolkit#0
File: .cursor/rules/general.mdc:0-0
Timestamp: 2025-08-25T15:01:54.259Z
Learning: Applies to docs/source/**/*.md : Never use deprecated names (Agent Intelligence toolkit, aiqtoolkit, AgentIQ, AIQ/aiq) in documentation; update to the current naming unless explicitly referring to deprecated names

Applied to files:

  • packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
📚 Learning: 2025-08-25T15:01:54.258Z
Learnt from: CR
PR: NVIDIA/NeMo-Agent-Toolkit#0
File: .cursor/rules/general.mdc:0-0
Timestamp: 2025-08-25T15:01:54.258Z
Learning: Applies to {src/**/*.py,packages/*/src/**/*.py,examples/*/**/*.py} : Use abbreviations: 'nat' for API namespace and CLI tool, 'nvidia-nat' for package name, 'NAT' only for env var prefixes and informal code comments (never in documentation)

Applied to files:

  • packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
🧬 Code graph analysis (4)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py (3)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py (1)
  • OpenAIMessage (23-31)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/trace_source_base.py (1)
  • TraceSourceBase (20-22)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/utils/deserialize.py (1)
  • deserialize_span_attribute (23-42)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py (2)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1)
  • ContractVersion (23-31)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/dfw_es_record.py (1)
  • DFWESRecord (192-222)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/schema_registry.py (2)
  • SchemaRegistry (26-135)
  • get_schema (58-81)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (4)
src/nat/builder/context.py (1)
  • ContextState (65-90)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (3)
  • DFWExporter (43-98)
  • export_contract (85-94)
  • export_processed (97-98)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (2)
  • ElasticsearchMixin (23-75)
  • export_processed (52-75)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (2)
  • ContractVersion (23-31)
  • get_contract_class (29-31)
🪛 LanguageTool
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md

[grammar] ~20-~20: There might be a mistake here.
Context: ...age") # NVIDIA Agent Toolkit Subpackage This is a subpackage for NVIDIA Data Fly...

(QB_NEW_EN)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: CI Pipeline / Test (amd64, 3.12)
  • GitHub Check: CI Pipeline / Test (arm64, 3.12)
  • GitHub Check: CI Pipeline / Test (arm64, 3.11)
  • GitHub Check: CI Pipeline / Build Wheels
  • GitHub Check: CI Pipeline / Test (amd64, 3.11)
🔇 Additional comments (7)
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md (2)

1-16: SPDX/Apache-2.0 header looks correct.

Header block is present, year is 2025, and identifiers match Apache-2.0.


24-24: EOF newline count correct.

packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md ends with exactly one trailing newline and no extra blank lines, satisfying our style rules.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py (1)

17-20: Re-export surface looks good.

Import targets exist and __all__ exposes the minimal public surface consumers need.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1)

23-31: Enum and resolver design LGTM.

  • String-valued Enum aids serialization.
  • get_contract_class delegates to the registry cleanly with a precise return type.
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (3)

70-78: Contract resolution property looks correct.

Property returns the Pydantic contract class via the selected version; this pairs correctly with DFWExporter’s processor wiring.


79-88: Export delegation LGTM.

Delegating to ElasticsearchMixin handles both single-doc and bulk cases; type hints align with the mixin signature.


1-15: Packaging sanity check passed

  • Verified that packages/nvidia_nat_data_flywheel/pyproject.toml exists
  • Confirmed it declares the required dependency nvidia-nat~=1.3 per repo guidelines

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (8)
packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py (4)

31-40: Avoid brittle base-class checks; use issubclass instead of inspecting bases.

Directly asserting __bases__ is fragile and will fail if a mixin or intermediate base is introduced. Prefer issubclass (and you already assert isinstance).

-        # Verify it's properly typed for dict
-        assert processor.__class__.__bases__ == (BatchingProcessor, )
+        # Verify class relationship without relying on fragile __bases__ tuple
+        assert issubclass(DictBatchingProcessor, BatchingProcessor)

73-76: Redundant export_contract override in the concrete test exporter.

The base class already exposes export_contract from _export_contract. Overriding it here adds noise without additional value.

-    @property
-    def export_contract(self) -> type[BaseModel]:
-        return MockExportContract

109-117: Strengthen processor-chain assertions to verify type and order, not just count.

Counting calls doesn’t guarantee correct wiring. Validate the exact types and order of processors added.

Add imports near the top (see separate diff below), then extend the assertions:

         # Verify processors were added (4 total: span, dict, batching, filter)
         assert mock_add_processor.call_count == 4
+        calls = [args[0] for args, _ in mock_add_processor.call_args_list]
+        assert isinstance(calls[0], SpanToDFWRecordProcessor)
+        assert isinstance(calls[1], DFWToDictProcessor)
+        assert isinstance(calls[2], DictBatchingProcessor)
+        assert isinstance(calls[3], DictBatchFilterProcessor)
+        # Optional: ensure client_id propagated to the span->DFW processor
+        assert getattr(calls[0], "_client_id", None) == client_id

And add the required imports:

@@
 from nat.plugins.data_flywheel.observability.exporter.dfw_exporter import DFWExporter
 from nat.plugins.data_flywheel.observability.exporter.dfw_exporter import DictBatchingProcessor
+from nat.plugins.data_flywheel.observability.processor.dfw_record_processor import (
+    DFWToDictProcessor,
+    SpanToDFWRecordProcessor,
+)
+from nat.observability.processor.falsy_batch_filter_processor import DictBatchFilterProcessor

153-170: Optional: Assert batching processor placement and type in the batching-params test.

You already count calls. Also assert that the 3rd processor is the batching one to catch order regressions.

             # Verify that processors were added
             assert mock_add_processor.call_count == 4
+            calls = [args[0] for args, _ in mock_add_processor.call_args_list]
+            assert isinstance(calls[2], DictBatchingProcessor)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (4)

1-15: Add a concise module-level docstring (Google style).

Public modules in packages/*/src should have a top-level docstring per the guidelines.

@@
 # limitations under the License.
 
+"""Data Flywheel exporter base and dict-batching processor.
+
+Defines:
+- DictBatchingProcessor: a dict-specialized batching processor for bulk exports.
+- DFWExporter: an abstract base wiring the processing chain
+  Span -> DFW Pydantic record -> dict -> batched dicts -> filtered batch.
+"""
+

16-29: Remove unused logger import/variable or log something.

logger is declared but never used; ruff will flag this.

-import logging
@@
-logger = logging.getLogger(__name__)

44-64: Validate constructor arguments to prevent silent misconfiguration.

Add lightweight runtime checks for export_contract type and batching parameters (positives and correct types). This avoids hard-to-debug failures further down the pipeline.

     def __init__(self,
                  export_contract: type[BaseModel],
                  context_state: ContextState | None = None,
                  batch_size: int = 100,
                  flush_interval: float = 5.0,
                  max_queue_size: int = 1000,
                  drop_on_overflow: bool = False,
                  shutdown_timeout: float = 10.0,
                  client_id: str = "default"):
@@
-        super().__init__(context_state)
+        super().__init__(context_state)
+
+        # Basic validation to catch misconfiguration early
+        if not isinstance(export_contract, type) or not issubclass(export_contract, BaseModel):
+            raise TypeError("export_contract must be a Pydantic BaseModel subclass")
+        for name, value, predicate in (
+            ("batch_size", batch_size, lambda v: isinstance(v, int) and v > 0),
+            ("max_queue_size", max_queue_size, lambda v: isinstance(v, int) and v > 0),
+            ("flush_interval", flush_interval, lambda v: isinstance(v, (int, float)) and v > 0),
+            ("shutdown_timeout", shutdown_timeout, lambda v: isinstance(v, (int, float)) and v >= 0),
+        ):
+            if not predicate(value):
+                raise ValueError(f"{name} must be a positive number (got {value!r})")

41-79: Optional: allow class-level contract declaration to simplify subclasses.

Some exporters may prefer a class attribute (e.g., EXPORT_CONTRACT = MyContract) over passing export_contract at instantiation. Supporting both simplifies subclass ergonomics without invoking virtual properties during construction.

 class DFWExporter(SpanExporter[Span, dict]):
+    # Optional: subclasses may declare their contract once at class level.
+    EXPORT_CONTRACT: type[BaseModel] | None = None
+
     def __init__(self,
                  export_contract: type[BaseModel],
                  context_state: ContextState | None = None,
@@
         # Store the contract for property access
         self._export_contract = export_contract

Follow-up (in a future change): make export_contract optional and fall back to getattr(type(self), "EXPORT_CONTRACT", None) with a clear error if neither is provided.
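
If that follow-up lands, the resolution could look like this minimal sketch (MyContract, MyExporter, and the error wording are illustrative stand-ins, not toolkit code):

from pydantic import BaseModel

class MyContract(BaseModel):
    workload_id: str

class MyExporter:
    """Illustrative stand-in for a DFWExporter subclass."""

    EXPORT_CONTRACT: type[BaseModel] | None = MyContract

    def __init__(self, export_contract: type[BaseModel] | None = None):
        # Prefer the explicit argument; fall back to the class-level declaration.
        contract = export_contract or getattr(type(self), "EXPORT_CONTRACT", None)
        if contract is None:
            raise TypeError("Provide export_contract or set EXPORT_CONTRACT on the subclass")
        self._export_contract = contract

print(MyExporter()._export_contract.__name__)  # -> MyContract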

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 863835c and 905628f.

📒 Files selected for processing (6)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_elasticsearch_exporter.py (1 hunks)
  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_elasticsearch_exporter.py
🧰 Additional context used
📓 Path-based instructions (9)
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.py: Format Python code with yapf (PEP8 base, column_limit=120) run second in the toolchain
Run ruff (ruff check --fix) using configuration in pyproject.toml; fix warnings unless explicitly ignored; ruff is a linter only (not for formatting)
Respect naming: snake_case for functions and variables, PascalCase for classes, UPPER_CASE for constants
Prefer typing/collections.abc abstractions (e.g., Sequence over list)
Use typing.Annotated for units or extra metadata when useful
Treat pyright warnings (configured in pyproject.toml) as errors during development
Prefer httpx with SSL verification enabled by default for HTTP requests; follow OWASP Top-10
Use async/await for I/O-bound work (HTTP, DB, file reads)
Cache expensive computations with functools.lru_cache or an external cache when appropriate
Leverage NumPy vectorized operations when beneficial

Files:

  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
**/*.{py,md,sh,yml,yaml,toml,json,ini,cfg,txt,rst,pyi}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Indent with 4 spaces (no tabs) and ensure every file ends with a single trailing newline

Files:

  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}: Every file must start with the standard SPDX Apache-2.0 header
All source files must include the SPDX Apache-2.0 header template

Files:

  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
**/*.{py,md}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Version numbers are derived by setuptools-scm; never hard-code versions in code or documentation

Files:

  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
**/*

⚙️ CodeRabbit configuration file

**/*: # Code Review Instructions

  • Ensure the code follows best practices and coding standards. For Python code, follow PEP 20 and PEP 8 for style guidelines.
  • Check for security vulnerabilities and potential issues.
  • Python methods should use type hints for all parameters and return values. Example:

    def my_function(param1: int, param2: str) -> bool:
        pass

Documentation Review Instructions

  • Verify that documentation and comments are clear and comprehensive.
  • Verify that the documentation doesn't contain any TODOs, FIXMEs, or placeholder text like "lorem ipsum".
  • Verify that the documentation doesn't contain any offensive or outdated terms.
  • Verify that documentation and comments are free of spelling mistakes. Ensure the documentation doesn't contain any words listed in the ci/vale/styles/config/vocabularies/nat/reject.txt file; words that might appear to be spelling mistakes but are listed in the ci/vale/styles/config/vocabularies/nat/accept.txt file are OK.

Misc.

  • All code should be licensed under the Apache License 2.0 and should contain an Apache License 2.0 header comment at the top of each file.
  • Confirm that copyright years are up to date whenever a file is changed.

Files:

  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
packages/**/*

⚙️ CodeRabbit configuration file

packages/**/*:

  • This directory contains optional plugin packages for the toolkit; each should contain a pyproject.toml file.
  • The pyproject.toml file should declare a dependency on nvidia-nat or another package with a name starting with nvidia-nat-. This dependency should be declared using ~=<version>, and the version should be a two-digit version (ex: ~=1.0).
  • Not all packages contain Python code; if they do, they should also contain their own set of tests, in a tests/ directory at the same level as the pyproject.toml file.

Files:

  • packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
{src/**/*.py,packages/*/src/**/*.py,examples/*/**/*.py}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

Use abbreviations: 'nat' for API namespace and CLI tool, 'nvidia-nat' for package name, 'NAT' only for env var prefixes and informal code comments (never in documentation)

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
packages/*/src/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

All importable Python code in packages must live under packages//src/

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
{src/**/*.py,packages/*/src/**/*.py}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

{src/**/*.py,packages/*/src/**/*.py}: All public APIs require Python 3.11+ type hints on parameters and return values
Provide Google-style docstrings for every public module, class, function and CLI command
Docstring first line must be a concise description ending with a period

Files:

  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
🧬 Code graph analysis (2)
packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py (2)
src/nat/builder/context.py (1)
  • ContextState (65-90)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (4)
  • DFWExporter (41-95)
  • DictBatchingProcessor (32-38)
  • export_contract (82-91)
  • export_processed (94-95)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (5)
src/nat/builder/context.py (1)
  • ContextState (65-90)
src/nat/observability/processor/falsy_batch_filter_processor.py (1)
  • DictBatchFilterProcessor (43-45)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/dfw_record_processor.py (2)
  • DFWToDictProcessor (35-56)
  • SpanToDFWRecordProcessor (59-86)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (1)
  • export_processed (65-74)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (1)
  • export_processed (52-77)
🔇 Additional comments (7)
packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py (4)

85-92: LGTM: default initialization path is exercised and contract wiring is checked.


200-209: LGTM: integration-style instantiation checks look good.

Covers multiple instances and contract consistency.


186-196: Mark test_invalid_parameter_types as xfail until strict validation is added

It looks like the current ConcreteDFWExporter constructor doesn't perform any runtime type checks, so asserting that it "should still work" does nothing to prevent misuse. Switching the test to expect an exception today, however, would break the suite. If you plan to enforce strict parameter validation soon, it's safer to mark this test as an expected failure until those checks land.

• In packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py, change the decorator on test_invalid_parameter_types:

-    def test_invalid_parameter_types(self):
+    @pytest.mark.xfail(
+        reason="ConcreteDFWExporter will raise on invalid parameter types once validation is implemented",
+        strict=False,
+    )
+    def test_invalid_parameter_types(self):

132-138: The script will check for any existing pytest-asyncio usage or configuration in your repo (references, config files, and dependencies). I’ll await the results to determine if marking async tests is already handled globally or if adding @pytest.mark.asyncio is still needed.
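
For reference, if asyncio_mode = "auto" is not already enabled globally, each coroutine test would need the explicit marker. A minimal sketch (the test name and body are placeholders):

import asyncio

import pytest


@pytest.mark.asyncio
async def test_exporter_async_path():
    # Placeholder standing in for awaiting real exporter work.
    await asyncio.sleep(0)
    assert True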

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (3)

70-79: Processor chain wiring looks correct and ordered.

Span -> DFW record -> dict -> batching -> falsy filter. This matches the intended pipeline and keeps the sink-facing type to dict.
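
For intuition, here is a standalone toy version of that staged flow; the lambdas are stand-ins, not the toolkit's actual processors:

from collections.abc import Callable
from typing import Any

def run_pipeline(item: Any, stages: list[Callable[[Any], Any]]) -> Any:
    """Apply each stage to the previous stage's output, in order."""
    for stage in stages:
        item = stage(item)
    return item

span = {"name": "llm_call", "attributes": {"model": "meta/llama"}}
stages = [
    lambda s: {"record": s["name"], "model": s["attributes"]["model"]},  # Span -> DFW record
    lambda r: dict(r),                                                   # record -> dict
    lambda d: [d],                                                       # dict -> batched dicts
    lambda batch: [d for d in batch if d],                               # drop falsy entries
]
print(run_pipeline(span, stages))  # [{'record': 'llm_call', 'model': 'meta/llama'}]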


81-92: Property accessor is simple and correct.

Returning the class reference enables consistent identity checks in tests and downstream code.


No action needed: abstractness is already enforced via ABC

SpanExporter inherits from ProcessingExporter -> BaseExporter -> Exporter, and Exporter is defined as a subclass of ABC. This means the @abstractmethod on export_processed is enforced by ABCMeta, preventing instantiation until all abstract methods are implemented.

  • src/nat/observability/exporter/exporter.py: class Exporter(ABC) ensures ABCMeta is in the MRO.
  • src/nat/observability/exporter/base_exporter.py: class BaseExporter(Exporter) inherits that abstractness.
  • src/nat/observability/exporter/processing_exporter.py: class ProcessingExporter(..., BaseExporter, ...) continues the chain.
  • src/nat/observability/exporter/span_exporter.py: class SpanExporter(ProcessingExporter[...], SerializeMixin) includes ABCMeta via its ancestors.

No changes are required here.

Likely an incorrect or invalid review comment.
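
As a standalone illustration of that enforcement (the class names below are stand-ins for the real exporter hierarchy):

from abc import ABC, abstractmethod

class Exporter(ABC):
    @abstractmethod
    def export_processed(self, item: dict) -> None:
        ...

class MidLayer(Exporter):
    pass  # still abstract: export_processed is not implemented

try:
    MidLayer()  # type: ignore[abstract]
except TypeError as err:
    print(err)  # Can't instantiate abstract class MidLayer ...

class Concrete(MidLayer):
    def export_processed(self, item: dict) -> None:
        print(f"exported {item}")

Concrete().export_processed({"ok": True})  # instantiation now succeeds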

mpenn added 3 commits August 27, 2025 20:35
Ensure all arguments are used in `track_unregistered_function` when
calling `push_active_function`

Update elasticsearch version in pyproject.toml

Signed-off-by: Matthew Penn <[email protected]>
Move test_function_tracking.py under decorators folder

Signed-off-by: Matthew Penn <[email protected]>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/nat/profiler/decorators/function_tracking.py (1)

80-80: Add return type annotation to public decorator.

Public APIs must annotate return types. Minimal, non-invasive fix:

-def track_function(func: Any = None, *, metadata: dict[str, Any] | None = None):
+def track_function(func: Any = None, *, metadata: dict[str, Any] | None = None) -> Any:
♻️ Duplicate comments (1)
src/nat/profiler/decorators/function_tracking.py (1)

66-66: Use “nat” (lowercase) in docstrings, not “NAT”.

Style rule: documentation should use “nat”. Update the docstring.

-    """Push an intermediate step to the NAT Event Stream."""
+    """Push an intermediate step to the nat event stream."""
🧹 Nitpick comments (4)
tests/nat/profiler/decorators/test_function_tracking.py (2)

220-231: Fixture should follow repository fixture-naming rule.

Per guidelines, fixtures must use the fixture_ prefix and set name= on the decorator. Adjust as below.

-    @pytest.fixture
-    def mock_context(self):
+    @pytest.fixture(name="mock_context")
+    def fixture_mock_context(self):
         """Mock Context and its push_active_function method."""
         with patch('nat.profiler.decorators.function_tracking.Context') as mock_context_class:
             mock_context_instance = Mock()
             mock_manager = Mock()
             mock_context_instance.push_active_function.return_value.__enter__ = Mock(return_value=mock_manager)
             mock_context_instance.push_active_function.return_value.__exit__ = Mock(return_value=None)
             mock_context_class.get.return_value = mock_context_instance

             yield mock_context_instance, mock_manager

32-44: Make purely synchronous tests non-async to avoid unnecessary event-loop overhead.

This test doesn’t await anything. Consider making it a regular test function.

-async def test_sync_function_no_metadata(reactive_stream: Subject):
+def test_sync_function_no_metadata(reactive_stream: Subject):
src/nat/profiler/decorators/function_tracking.py (2)

62-66: Tighten parameter types for args/kwargs.

These are always serialized structures; reflect that in the signature.

-                           args: Any = None,
-                           kwargs: Any = None,
+                           args: list[Any] | None = None,
+                           kwargs: dict[str, Any] | None = None,
                            output: Any = None,
                            metadata: dict[str, Any] | None = None) -> None:

333-349: Generator wrappers aggregate all outputs; consider a streaming-friendly option.

Both generator wrappers for track_unregistered_function build a full list before setting output. For long/streaming generators this inflates memory and delays visibility of results. If practical, make aggregation optional (e.g., flag to skip collecting and set_output(None) or a capped buffer).

Also applies to: 375-392
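
One possible shape for that option, as a self-contained sketch (the decorator name, the max_items parameter, and the print standing in for manager.set_output are all hypothetical):

from collections import deque
from collections.abc import Callable, Iterator
from functools import wraps
from typing import Any

def track_generator(max_items: int | None = 100) -> Callable:
    """Hypothetical variant: keep at most `max_items` yielded values for telemetry."""
    def decorator(func: Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Iterator[Any]:
            # deque(maxlen=...) bounds memory; None disables capture entirely.
            buffer: deque[Any] | None = deque(maxlen=max_items) if max_items else None
            for item in func(*args, **kwargs):
                if buffer is not None:
                    buffer.append(item)
                yield item
            # The real decorator would call manager.set_output(...) here.
            print(f"captured {len(buffer) if buffer is not None else 0} item(s)")
        return wrapper
    return decorator

@track_generator(max_items=2)
def stream() -> Iterator[int]:
    yield from range(5)

print(list(stream()))  # prints "captured 2 item(s)", then [0, 1, 2, 3, 4]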

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 59a4a85 and 2a9e2c2.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • packages/nvidia_nat_data_flywheel/pyproject.toml (1 hunks)
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/register.py (1 hunks)
  • src/nat/profiler/decorators/function_tracking.py (2 hunks)
  • tests/nat/profiler/decorators/test_function_tracking.py (1 hunks)
  • tests/nat/profiler/test_function_tracking.py (0 hunks)
💤 Files with no reviewable changes (1)
  • tests/nat/profiler/test_function_tracking.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/nvidia_nat_data_flywheel/pyproject.toml
  • packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/register.py
🧰 Additional context used
📓 Path-based instructions (4)
{tests/**/*.py,examples/*/tests/**/*.py}

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

{tests/**/*.py,examples/*/tests/**/*.py}: Unit tests live in tests/ (or examples/*/tests) and use markers defined in pyproject.toml (e2e, integration)
Use pytest with pytest-asyncio for asynchronous tests
Mock external services with pytest_httpserver or unittest.mock; do not hit live endpoints in tests
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration to skip by default

Files:

  • tests/nat/profiler/decorators/test_function_tracking.py
tests/**/*.py

⚙️ CodeRabbit configuration file

tests/**/*.py:

  • Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code.
  • Test functions should be named using the test_ prefix, using snake_case.
  • Any frequently repeated code should be extracted into pytest fixtures.
  • Pytest fixtures should define the name argument when applying the pytest.fixture decorator. The fixture function being decorated should be named using the fixture_ prefix, using snake_case. Example:

        @pytest.fixture(name="my_fixture")
        def fixture_my_fixture():
            pass

Files:

  • tests/nat/profiler/decorators/test_function_tracking.py
src/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/general.mdc)

All importable Python code must live under src/ so namespace packages resolve correctly

Files:

  • src/nat/profiler/decorators/function_tracking.py
src/nat/**/*

⚙️ CodeRabbit configuration file

This directory contains the core functionality of the toolkit. Changes should prioritize backward compatibility.

Files:

  • src/nat/profiler/decorators/function_tracking.py
🧬 Code graph analysis (2)
tests/nat/profiler/decorators/test_function_tracking.py (2)
src/nat/profiler/decorators/function_tracking.py (2)
  • track_function (80-256)
  • track_unregistered_function (259-408)
src/nat/builder/context.py (4)
  • metadata (127-136)
  • output (58-59)
  • push_active_function (178-221)
  • set_output (61-62)
src/nat/profiler/decorators/function_tracking.py (1)
src/nat/builder/context.py (6)
  • metadata (127-136)
  • Context (93-277)
  • get (89-90)
  • get (267-277)
  • push_active_function (178-221)
  • set_output (61-62)
🔇 Additional comments (2)
tests/nat/profiler/decorators/test_function_tracking.py (1)

32-434: Strong test coverage for decorators and event semantics.

Good breadth: sync/async funcs, (a)sync generators, class methods, kwargs, Pydantic serialization, metadata, manual decoration, and error cases. Assertions on event ordering and payloads look solid.

src/nat/profiler/decorators/function_tracking.py (1)

258-409: Decorator behavior and metadata propagation look correct.

Wrapper selection (sync/async/(a)sync gen), context handling, input_data shape, and metadata propagation to Context.push_active_function are consistent with tests and guidelines.
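
A standalone sketch of that dispatch (the helper name is illustrative; the real selection lives inside the decorators in function_tracking.py):

import inspect
from collections.abc import Callable

def pick_wrapper_kind(func: Callable) -> str:
    """Classify a callable the way a tracking decorator selects its wrapper."""
    if inspect.isasyncgenfunction(func):
        return "async generator wrapper"
    if inspect.iscoroutinefunction(func):
        return "async wrapper"
    if inspect.isgeneratorfunction(func):
        return "sync generator wrapper"
    return "sync wrapper"

async def fetch() -> str:
    return "data"

def numbers():
    yield 1

print(pick_wrapper_kind(fetch))    # async wrapper
print(pick_wrapper_kind(numbers))  # sync generator wrapper
print(pick_wrapper_kind(print))    # sync wrapper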

@mpenn
Copy link
Contributor Author

mpenn commented Aug 28, 2025

/merge

@rapids-bot rapids-bot bot merged commit 11ce344 into NVIDIA:develop Aug 28, 2025
15 checks passed

Labels

  • feature request: New feature or request
  • non-breaking: Non-breaking change


Development

Successfully merging this pull request may close these issues.

Export intermediate steps data to NeMo Data Flywheel
