Add nvidia-nat-data-flywheel subpackage with NeMo Data Flywheel integrations
#716
Conversation
… to include this extra Signed-off-by: Matthew Penn <[email protected]>
Walkthrough

Adds a new nvidia-nat-data-flywheel subpackage implementing schema-driven span→DFW conversion, Elasticsearch exporters, an adapter registry and adapters (OpenAI/NIM), processor utilities, telemetry exporter registration, context metadata plumbing, a function-tracking decorator, packaging, and extensive tests.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant App as Application
    participant Ctx as Context
    participant Tele as Telemetry
    participant Proc as Processors
    participant Reg as TraceAdapterRegistry
    participant ES as Elasticsearch
    App->>Ctx: push_active_function(name, input, metadata?)
    Note right of Ctx #E8F0FF: FUNCTION_START emitted (includes metadata)
    App->>Tele: emit Span (LLM_START)
    Tele->>Proc: process(span)
    Proc->>Proc: SpanToDFWRecordProcessor -> build TraceContainer
    Proc->>Reg: convert(trace_container, to=DFWESRecord)
    Reg-->>Proc: DFWESRecord
    Proc->>Proc: DFWToDictProcessor -> dict
    Proc->>Proc: DictBatchingProcessor -> batch[list[dict]]
    Proc->>Proc: DictBatchFilterProcessor -> filtered batch
    alt batch non-empty
        Proc->>ES: bulk/index(filtered batch)
        ES-->>Proc: ack
    else empty
        Proc-->>Tele: no-op
    end
```
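The batching and filtering stages in the diagram can be sketched as plain functions (an illustrative sketch only; the real stages are processor classes in the toolkit, and every name and record shape below is made up for the example):

```python
# Minimal sketch of the dict -> batch -> filter -> export tail of the pipeline.

def to_dict(record: dict) -> dict:
    # DFWToDictProcessor stage: a real processor would serialize a Pydantic model.
    return dict(record)

def batch(items: list[dict], batch_size: int) -> list[list[dict]]:
    # DictBatchingProcessor stage: group records into fixed-size batches.
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

def filter_batch(items: list[dict]) -> list[dict]:
    # DictBatchFilterProcessor stage: drop falsy (e.g. empty) records.
    return [item for item in items if item]

def export(items: list[dict], sink: list) -> None:
    # Elasticsearch stage: no-op on an empty batch, bulk-index otherwise.
    if items:
        sink.extend(items)

records = [{"workload_id": "w1"}, {}, {"workload_id": "w2"}]
sink: list[dict] = []
for chunk in batch([to_dict(r) for r in records], batch_size=2):
    export(filter_batch(chunk), sink)
```

Note how the empty record is dropped before export, matching the `alt batch non-empty / else no-op` branch in the diagram.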
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

Assessment against linked issues
Out-of-scope / unrelated changes
Renamed shcema registry.py file to schema_registry.py. Refactored `dfw_elasticsearch_exporter.py` to use `ContextState` instead of `AIQContextState`. Updated `TraceContainer` to ensure the union is built before validation. Signed-off-by: Matthew Penn <[email protected]>
Renaming common.py -> span_extractor.py to be more informative Signed-off-by: Matthew Penn <[email protected]>
…_converter.py and test_span_to_dfw_record.py Signed-off-by: Matthew Penn <[email protected]>
nvidia-nat-data-flywheel subpackage with NeMo Data Flywheel integrations
Actionable comments posted: 32
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
packages/nvidia_nat_data_flywheel/pyproject.toml (1)
45-45: Missing trailing newline at end of file.

According to the coding guidelines, every file must end with a single trailing newline.

Add a trailing newline at the end of the file:

```diff
 nat_data_flywheel_adapter = "nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.register"
 nat_data_flywheel_schema = "nat.plugins.data_flywheel.observability.schema.register"
+
```

packages/nvidia_nat_data_flywheel/tests/observability/mixin/test_elasticsearch_mixin.py (1)
509-509: Missing trailing newline at end of file.

According to the coding guidelines, every file must end with a single trailing newline.

Add a trailing newline at the end of the file:

```diff
 # Second bulk call (operation 5)
 assert bulk_calls[1][1]['operations'] == [{"index": {"_index": "sequential_test"}}, {"operation": 5}]
+
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/register.py (1)
22-22: Missing trailing newline at end of file.

According to the coding guidelines, every file must end with a single trailing newline.

Add a trailing newline at the end of the file:

```diff
 from nat.plugins.data_flywheel.observability.schema.sink.elasticsearch import dfw_es_record
+
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/nim_trace_source.py (1)
25-25: Missing trailing newline at end of file.

According to the coding guidelines, every file must end with a single trailing newline.

Add a trailing newline at the end of the file:

```diff
 class NIMTraceSource(OpenAITraceSourceBase):
     pass
+
```

src/nat/profiler/decorators/function_tracking.py (1)
78-88: Add return type annotations to public decorators (pyright).

Public APIs must have return annotations. For decorators with dual usage, a pragmatic step is annotating -> Any to satisfy type checking without full overloads.

Apply this diff:

```diff
-def track_function(func: Any = None, *, metadata: dict[str, Any] | None = None):
+def track_function(func: Any = None, *, metadata: dict[str, Any] | None = None) -> Any:
@@
-def track_unregistered_function(func: Any = None, *, name: str | None = None, metadata: dict[str, Any] | None = None):
+def track_unregistered_function(func: Any = None,
+                                *,
+                                name: str | None = None,
+                                metadata: dict[str, Any] | None = None) -> Any:
```

If you want stricter types, I can provide @overload variants using ParamSpec/TypeVar.
Also applies to: 257-258
🧹 Nitpick comments (77)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/__init__.py (1)
1-15: Add a minimal module docstring to satisfy repo guidelines.

Guidelines require a brief Google-style module docstring for public modules. Add a concise description of this package’s purpose.

Apply:

```diff
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""Schemas for Data Flywheel sinks.
+
+This package hosts typed schema modules for sink backends (e.g., Elasticsearch)
+used by the nvidia-nat-data-flywheel subpackage.
+"""
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/__init__.py (1)
1-15: Add a short module docstring for adapter namespace.

To align with the codebase requirement for module docstrings, add a concise description.

```diff
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""Trace-conversion adapters.
+
+Contains adapter implementations and registration hooks used to convert spans
+into Data Flywheel contract-compliant records.
+"""
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/__init__.py (1)
1-15: Add module docstring for top-level schema namespace.

Public modules should include a one-line summary. Add a brief docstring.

```diff
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""Top-level schema namespace for Data Flywheel integration.
+
+Exports core Pydantic models and helpers shared across providers and sinks.
+"""
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/__init__.py (1)
1-15: Add a brief module docstring to clarify package scope.

This package is importable; include a short docstring per repo guidelines.

```diff
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""Observability integration for Data Flywheel.
+
+Provides exporters, processors, and schema plumbing to emit traces compatible
+with the NeMo Data Flywheel blueprint.
+"""
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/elasticsearch/__init__.py (2)
1-14: License header present; consider year consistency across sibling packages.

This file uses “2025” while other new files use “2024-2025.” Both are acceptable, but consider aligning for consistency across the subpackage.
If you prefer the range:
```diff
-# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
```
1-15: Add a module docstring for the Elasticsearch adapter namespace.

Keep it concise to satisfy the docstring requirement and improve discoverability.

```diff
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""Elasticsearch-specific trace-conversion adapters.
+
+Houses adapter implementations that convert spans into Elasticsearch-backed
+Data Flywheel records.
+"""
```

docs/source/workflows/observe/observe-workflow-with-phoenix.md (1)
20-24: Use the official first mention: “NVIDIA NeMo Agent toolkit”.

Per repo style, the first reference should be “NVIDIA NeMo Agent toolkit”; subsequent references can use “NeMo Agent toolkit”. Update the first sentence accordingly.

```diff
-This guide provides a step-by-step process to enable observability in a NeMo Agent toolkit workflow using Phoenix for tracing and logging. By the end of this guide, you will have:
+This guide provides a step-by-step process to enable observability in an NVIDIA NeMo Agent toolkit workflow using Phoenix for tracing and logging. By the end of this guide, you will have:
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/utils/deserialize.py (1)
21-31: Docstring and signature describe different behaviors; align them.

Docstring promises “dictionary, list, or None” and documents `value` as `str`, but the function accepts dict/list and the return type excludes `None`. Tighten the docstring to match the actual API.

```diff
-    """Deserialize a string input value to a dictionary, list, or None.
+    """Deserialize an input into a dictionary or list.
@@
-    Args:
-        value (str): The input value to deserialize
+    Args:
+        value (dict | list | str): Input to normalize. If already a dict or list, it is returned unchanged.
@@
-    Returns:
-        dict | list: The deserialized input value
+    Returns:
+        dict | list: The deserialized input value (JSON object or array).
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/trace_source_base.py (2)
16-18: Add a brief module docstring for public module.

Public modules should include a concise Google-style module docstring per guidelines.

```diff
-from pydantic import BaseModel
+"""Base Pydantic models shared by Data Flywheel trace sources."""
+
+from pydantic import BaseModel
```
20-22: Constrain client_id to non-empty to prevent invalid records.

A non-empty `client_id` avoids producing sink records without a source identifier and pushes validation closer to the edge.

```diff
-class TraceSourceBase(BaseModel):
+class TraceSourceBase(BaseModel):
     """Base class for trace sources with generic framework and provider types."""
-    client_id: str = Field(..., description="The client ID of the trace source")
+    client_id: str = Field(..., min_length=1, description="The client ID of the trace source.")
```

Do you expect extra fields on provider models? If yes, consider allowing extras at the base with Pydantic v2 config, e.g.:

```python
from pydantic import ConfigDict

class TraceSourceBase(BaseModel):
    model_config = ConfigDict(extra="ignore")
    ...
```

src/nat/observability/processor/processor_factory.py (3)
16-19: Add a short module docstring.

```diff
-from typing import Any
+"""Helpers for creating concrete, parameterized Processor subclasses at runtime."""
+
+from typing import Any
```
39-54: Apply the same typing improvements for from-only factory.

```diff
-def processor_factory_from_type(processor_class: type, from_type: type[Any]) -> type[Processor]:
+def processor_factory_from_type(processor_class: type[Processor], from_type: type[Any]) -> type[Processor]:
@@
-    class ConcreteProcessor(processor_class[from_type]):  # type: ignore
+    class ConcreteProcessor(processor_class[from_type]):  # type: ignore[misc]
         pass
```
56-70: Apply the same typing improvements for to-only factory.

```diff
-def processor_factory_to_type(processor_class: type, to_type: type[Any]) -> type[Processor]:
+def processor_factory_to_type(processor_class: type[Processor], to_type: type[Any]) -> type[Processor]:
@@
-    class ConcreteProcessor(processor_class[to_type]):  # type: ignore
+    class ConcreteProcessor(processor_class[to_type]):  # type: ignore[misc]
         pass
```

If duplicate classes become a concern, consider caching per `(processor_class, from_type, to_type)` tuple to reduce class churn. I can draft an `lru_cache`-based version that safely handles hashing of type args if desired.
23-28: Docstring claims resource tagging not implemented; make it accurate.The mixin currently provides client creation and export methods, not resource tagging. Update the prose to match behavior.
- This mixin provides elasticsearch-specific functionality for SpanExporter exporters. - It handles elasticsearch-specific resource tagging and uses the AsyncElasticsearch client. + This mixin provides Elasticsearch client wiring and export helpers (single and bulk). + It uses the AsyncElasticsearch client.
45-51: Consider setting a default Content-Type for compatibility.

For ES 8-compatible APIs, adding the corresponding Content-Type default can reduce surprises (the client may set this automatically, but being explicit is harmless).

```diff
 if headers is None:
-    headers = {"Accept": "application/vnd.elasticsearch+json; compatible-with=8"}
+    headers = {
+        "Accept": "application/vnd.elasticsearch+json; compatible-with=8",
+        "Content-Type": "application/vnd.elasticsearch+json; compatible-with=8",
+    }
```
30-51: Provide an async close to release the ES client.

Expose a small lifecycle hook so exporters using the mixin can close the client on shutdown.

```diff
     self._elastic_client = AsyncElasticsearch(endpoint, basic_auth=elasticsearch_auth, headers=headers)
     self._index = index
     super().__init__(*args, **kwargs)

 async def export_processed(self, item: dict | list[dict]) -> None:
+    ...
+    # method body unchanged
+    ...
+
+async def aclose(self) -> None:
+    """Close the underlying Elasticsearch client."""
+    await self._elastic_client.close()
```

packages/nvidia_nat_data_flywheel/tests/observability/mixin/test_elasticsearch_mixin.py (2)
106-106: Consider removing the type ignore comment.

The `# type: ignore` comment on line 106 can likely be avoided by properly typing the parent_kwargs expansion.

Consider using TypedDict or explicit parameter unpacking to avoid the type checker confusion:

```diff
-    **parent_kwargs)  # type: ignore  # parent_kwargs expansion confuses type checker
+    **parent_kwargs)
```
249-265: Type ignore comments can be avoided with proper typing.

Multiple `# type: ignore` comments are used when testing missing required parameters. These can be avoided by using `pytest.raises` with a more specific match pattern.

The type ignore comments are acceptable for intentionally incorrect test calls, but you could also consider using string-based construction to avoid them:

```python
# Alternative approach without type ignore
with pytest.raises(TypeError, match="missing.*required.*endpoint"):
    exec("ConcreteElasticsearchMixin(index='test_index', elasticsearch_auth=('user', 'pass'))")
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/nim_trace_source.py (2)
16-21: Consider removing unused logger.

The logger is defined but not used in this module. Since this is just a pass-through class inheriting all behavior from the base class, the logger may not be needed.
Remove the unused logger if it's not needed:
```diff
-import logging
-
 from nat.plugins.data_flywheel.observability.schema.provider.openai_trace_source import OpenAITraceSourceBase
-
-logger = logging.getLogger(__name__)
-
 class NIMTraceSource(OpenAITraceSourceBase):
```
23-24: Consider adding class documentation.

While the class inherits all functionality from `OpenAITraceSourceBase`, it would be helpful to add a docstring explaining the purpose of this NIM-specific trace source.

Add a docstring to explain the class purpose:

```diff
 class NIMTraceSource(OpenAITraceSourceBase):
+    """Trace source for NVIDIA NIM (NeMo Inference Microservice) LLM providers.
+
+    This class extends OpenAITraceSourceBase to handle trace data from NIM endpoints,
+    which follow the OpenAI API contract for LLM interactions.
+    """
     pass
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/register.py (1)
20-24: Optional: Import via importlib to avoid “unused import” suppressions.

You can remove lint pragmas by importing for side effects using import_module; this keeps intent explicit and avoids namespace pollution.

```diff
-from nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch import \
-    nim_converter
-from nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch import \
-    openai_converter
+from importlib import import_module
+
+# Import for side effects: modules register their adapters at import time.
+import_module(
+    "nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch.nim_converter"
+)
+import_module(
+    "nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch.openai_converter"
+)
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1)
29-31: Document possible KeyError from SchemaRegistry for clarity.

The method can raise if the version isn’t registered; documenting this helps users handle errors predictably.

```diff
-    def get_contract_class(self) -> type[BaseModel]:
-        """Get the Pydantic model class for this contract version."""
+    def get_contract_class(self) -> type[BaseModel]:
+        """Get the Pydantic model class for this contract version.
+
+        Raises:
+            KeyError: If no schema is registered for this version.
+        """
         return SchemaRegistry.get_schema("elasticsearch", self.value)
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/__init__.py (1)
20-27: Ensure built-in adapters register on import

To make the trace-adapter feature self-contained and guarantee that all built-in converters are registered when this package is imported, add an explicit import of your `adapter.register` module (which in turn brings in all of the Elasticsearch adapters) to the top-level `__init__.py`.

• File: packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/__init__.py
• Update around lines 20–27:

```diff
 # Trace Source Registry
-from .trace_conversion.trace_adapter_registry import TraceAdapterRegistry
+from .trace_conversion.trace_adapter_registry import TraceAdapterRegistry
+# Ensure built-in adapters are registered when this package loads.
+# Importing the adapter.register module triggers all @register_adapter calls.
+from .trace_conversion.adapter import register  # noqa: F401

 __all__ = [
     "SpanToDFWRecordProcessor",  # DFW Record Processors
     "DFWToDictProcessor",
     "TraceAdapterRegistry",  # Trace Source Registry
 ]
```

This import pulls in `adapter/register.py`, which imports each of the Elasticsearch adapters (e.g. `nim_converter`, `openai_converter`) and invokes their `@register_adapter` decorators as side-effects.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/elasticsearch/nim_converter.py (2)
18-29: Reformat imports to avoid global lint suppressions and long lines.

Use parentheses to wrap long from-imports and drop the line-length and flake8 suppressions.
```diff
@@
-# pylint: disable=line-too-long
-# flake8: noqa
-from nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch.openai_converter import \
-    convert_langchain_openai
-from nat.plugins.data_flywheel.observability.processor.trace_conversion.trace_adapter_registry import \
-    register_adapter
-from nat.plugins.data_flywheel.observability.schema.provider.nim_trace_source import \
-    NIMTraceSource
-from nat.plugins.data_flywheel.observability.schema.sink.elasticsearch.dfw_es_record import \
-    DFWESRecord
-from nat.plugins.data_flywheel.observability.schema.trace_container import \
-    TraceContainer
+from nat.plugins.data_flywheel.observability.processor.trace_conversion.adapter.elasticsearch.openai_converter import (
+    convert_langchain_openai,
+)
+from nat.plugins.data_flywheel.observability.processor.trace_conversion.trace_adapter_registry import (
+    register_adapter,
+)
+from nat.plugins.data_flywheel.observability.schema.provider.nim_trace_source import (
+    NIMTraceSource,
+)
+from nat.plugins.data_flywheel.observability.schema.sink.elasticsearch.dfw_es_record import (
+    DFWESRecord,
+)
+from nat.plugins.data_flywheel.observability.schema.trace_container import (
+    TraceContainer,
+)
```
34-44: Minor: Capitalize NIM consistently and add a debug log for traceability.

Consistent naming and a debug line help when triaging conversions across providers.
```diff
@@
-@register_adapter(trace_source_model=NIMTraceSource)
-def convert_langchain_nim(trace_source: TraceContainer) -> DFWESRecord:
-    """Convert a LangChain Nim trace source to a DFWESRecord.
+@register_adapter(trace_source_model=NIMTraceSource)
+def convert_langchain_nim(trace_source: TraceContainer) -> DFWESRecord:
+    """Convert a LangChain NIM trace source to a DFWESRecord.
@@
-    Returns:
-        DFWESRecord: The converted DFW record
+    Returns:
+        DFWESRecord: The converted DFW record.
     """
-    return convert_langchain_openai(trace_source)
+    logger.debug("Converting NIM trace via OpenAI converter for span '%s'", trace_source.span.name)
+    return convert_langchain_openai(trace_source)
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/span_extractor.py (1)
63-79: Consider adding bounds validation for timestamp values.

While the error handling for type conversion is good, the function could benefit from validating that the timestamp is within reasonable bounds (e.g., not negative, not in the distant future) to catch potential data issues early.
```diff
 def extract_timestamp(span: Span) -> int:
     """Extract timestamp from a span.

     Args:
         span (Span): The span to extract timestamp from

     Returns:
         int: The timestamp
     """
     timestamp = span.attributes.get("nat.event_timestamp", 0)
     try:
         timestamp_int = int(float(str(timestamp)))
+        # Validate timestamp is reasonable (not negative, not too far in future)
+        if timestamp_int < 0:
+            logger.warning("Negative timestamp %d in span '%s', using 0", timestamp_int, span.name)
+            timestamp_int = 0
     except (ValueError, TypeError):
         logger.warning("Invalid timestamp in span '%s', using 0", span.name)
         timestamp_int = 0
     return timestamp_int
```

packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/test_trace_adapter_registry.py (1)
449-460: Clarify return of internal registry in `list_registered_types`

The current implementation of `TraceAdapterRegistry.list_registered_types()` returns the actual class-level `_registered_types` dict, which your test explicitly verifies by identity (`assert registered is internal_registry`). Exposing the internal registry this way means any external code can mutate it, potentially leading to hard-to-track bugs.

Please confirm whether this behavior is intentional:

• If you do want callers to be able to modify the registry directly, update the docstring to explicitly state that mutations to the returned dict affect the global registry.
• If instead you intended to protect internal state, change the method to return a shallow copy (and update the test accordingly), for example:

```diff
--- a/packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/trace_adapter_registry.py
+++ b/packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/trace_adapter_registry.py
@@ -XX,7 +XX,12 @@ class TraceAdapterRegistry:
     @classmethod
     def list_registered_types(cls) -> dict[type, dict[type, Callable]]:
         """List all registered conversions: source_type -> {target_type -> converter}.
-        """
-        return cls._registered_types
+        Mutating the returned dict does not affect the internal registry.
+        """
+        # Return a shallow copy of the outer dict and each inner mapping
+        return {
+            src_type: tgt_map.copy()
+            for src_type, tgt_map in cls._registered_types.items()
+        }
```

• If you protect the registry here, add a new test asserting that mutations to the result do not alter the internal `_registered_types`.
• Otherwise, at minimum annotate the docstring to note that callers receive a live reference.
187-198: Confirm PII/logging policy: error messages include raw input values.

The function raises ValueError containing the original input, and the tests assert for this. This is useful for debugging, but it may leak sensitive data into logs/telemetry.
Consider truncating or masking large or sensitive inputs in the error string (e.g., first N chars and length), and update tests accordingly.
Proposed pattern in deserialize.py:
```python
raw = value if isinstance(value, str) else str(value)
snippet = (raw[:256] + f"...({len(raw)} chars)") if len(raw) > 256 else raw
raise ValueError(f"Failed to parse input_value: {snippet}, error: {e}") from e
```
24-63: Reduce duplication with parametrization.

Multiple tests repeat the same arrange/act/assert pattern. Parametrizing inputs/expectations would simplify maintenance and improve readability without reducing coverage.
Example:
```python
import pytest

@pytest.mark.parametrize(
    "json_str, expected",
    [
        ('{"key": "value", "number": 42}', {"key": "value", "number": 42}),
        ('{}', {}),
        ('[1, 2, 3]', [1, 2, 3]),
        ('null', None),
        ('"hello"', "hello"),
    ],
)
def test_valid_json_values(json_str, expected):
    assert deserialize_span_attribute(json_str) == expected
```

Also applies to: 73-101, 142-160
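For context, the contract these tests pin down is small enough to summarize in a few lines (a sketch of the behavior described in the review, not the module's actual source):

```python
import json

def deserialize_span_attribute(value):
    # Dicts and lists pass through unchanged; strings are parsed as JSON.
    # Unparseable input raises ValueError carrying the original value.
    if isinstance(value, (dict, list)):
        return value
    try:
        return json.loads(value)
    except (json.JSONDecodeError, TypeError) as e:
        raise ValueError(f"Failed to parse input_value: {value}, error: {e}") from e
```

Note that `json.loads('null')` yields `None`, which is why the parametrized cases above include a `('null', None)` row even though the documented return type is `dict | list`.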
src/nat/observability/processor/falsy_batch_filter_processor.py (1)
43-55: Clarify “falsy” semantics and consider narrowing specializations.

Minor: Document that falsy is evaluated via bool(x). If the generic processor is exposed more broadly, consider documenting that None will also be removed if present in the batch. Current concrete specializations (dict/list/set) avoid surprises, so this is just doc clarity.
Suggested doc tweak:
-"""Processor that filters out falsy items from a batch.""" +"""Processor that filters out items where bool(item) is False (e.g., {}, [], set(), None)."""packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/test_span_extractor.py (1)
146-241: Good coverage for usage/timestamp, plus call-graph and logging checks. Minor opportunities to DRY.

The tests validate defaulting, type coercion, and logger warnings. Consider small parametrization to reduce duplication for the “invalid timestamp returns 0 and warns once” pattern.
Example:
@pytest.mark.parametrize("raw", ["invalid_timestamp", None, {"complex":"obj"}, ""]) def test_extract_timestamp_invalid_inputs_warns_once(raw, mocker): logger = mocker.patch("nat.plugins.data_flywheel.observability.processor.trace_conversion.span_extractor.logger") span = Span(name="test_span", context=SpanContext(), attributes={"nat.event_timestamp": raw}) assert extract_timestamp(span) == 0 logger.warning.assert_called_once_with("Invalid timestamp in span '%s', using 0", "test_span")Also applies to: 261-380
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (2)
31-41: Expose Elasticsearch parameters explicitly with type hints instead of opaque kwargs.

To satisfy “public APIs require type hints” and improve IDE/static-checker help, accept endpoint/index/elasticsearch_auth/headers explicitly and forward them, keeping **elasticsearch_kwargs for future-proofing. This also prevents silent typos in kwargs.
Apply this diff:
```diff
-    def __init__(self,
-                 context_state: ContextState | None = None,
-                 client_id: str = "default",
-                 contract_version: ElasticsearchContractVersion = ElasticsearchContractVersion.VERSION_1_1,
-                 batch_size: int = 100,
-                 flush_interval: float = 5.0,
-                 max_queue_size: int = 1000,
-                 drop_on_overflow: bool = False,
-                 shutdown_timeout: float = 10.0,
-                 **elasticsearch_kwargs):
+    def __init__(self,
+                 context_state: ContextState | None = None,
+                 client_id: str = "default",
+                 contract_version: ElasticsearchContractVersion = ElasticsearchContractVersion.VERSION_1_1,
+                 batch_size: int = 100,
+                 flush_interval: float = 5.0,
+                 max_queue_size: int = 1000,
+                 drop_on_overflow: bool = False,
+                 shutdown_timeout: float = 10.0,
+                 *,
+                 endpoint: str,
+                 index: str,
+                 elasticsearch_auth: tuple[str, str],
+                 headers: dict | None = None,
+                 **elasticsearch_kwargs):
@@
-        super().__init__(context_state=context_state,
-                         batch_size=batch_size,
-                         flush_interval=flush_interval,
-                         max_queue_size=max_queue_size,
-                         drop_on_overflow=drop_on_overflow,
-                         shutdown_timeout=shutdown_timeout,
-                         client_id=client_id,
-                         **elasticsearch_kwargs)
+        super().__init__(context_state=context_state,
+                         batch_size=batch_size,
+                         flush_interval=flush_interval,
+                         max_queue_size=max_queue_size,
+                         drop_on_overflow=drop_on_overflow,
+                         shutdown_timeout=shutdown_timeout,
+                         client_id=client_id,
+                         endpoint=endpoint,
+                         index=index,
+                         elasticsearch_auth=elasticsearch_auth,
+                         headers=headers,
+                         **elasticsearch_kwargs)
```
16-26: Docstring nit: clarify types in kwargs section for ES params.

Optional: In the init docstring, specify (str) for endpoint/index, (tuple[str, str]) for elasticsearch_auth, (dict | None) for headers to match the mixin signature.
src/nat/profiler/decorators/function_tracking.py (4)
359-362: Unreachable duplicate return.

There are two consecutive “return sync_wrapper” statements; the second is unreachable and flagged by linters.
Apply this diff:
```diff
-    return sync_wrapper
-
-    return sync_wrapper
+    return sync_wrapper
```
339-345: Potential memory blow-up for generators collecting all outputs.

Both generator wrappers accumulate all yielded items into final_outputs to set as the “output”. For long/streaming generators this can be unbounded. Consider either:
- Only setting the last yielded item, or
- Sampling/truncating, or
- Emitting outputs incrementally (as you do in track_function) and setting output=None on end.
I can draft a variant that records only the last item and a count.
Also applies to: 302-307, 214-215
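The "record only the last item and a count" variant offered above could look roughly like this (a hedged sketch; `emit_output` and `RECORDED` are placeholders for whatever telemetry call the real wrapper makes):

```python
import functools

RECORDED: dict = {}

def emit_output(name, output, count):
    # Placeholder for the real telemetry emission.
    RECORDED[name] = (output, count)

def track_generator(func):
    # Wrap a generator: stream items through unchanged, but keep only the
    # last yielded item and a count instead of accumulating every output.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        last, count = None, 0
        for item in func(*args, **kwargs):
            last, count = item, count + 1
            yield item
        emit_output(func.__name__, last, count)
    return wrapper

@track_generator
def numbers(n):
    yield from range(n)
```

Memory stays O(1) regardless of how many items the generator yields, at the cost of only reporting the final item.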
78-104: Minor: track_function is missing return type annotation as well (consistency).

Same rationale as for track_unregistered_function; add -> Any for consistency and to satisfy static checks. (See combined diff above.)
30-47: Serialization helper is reasonable; consider guarding against very large objects.

The `str(obj)` fallback might dump huge objects into telemetry. If that’s a concern, consider truncating arbitrarily long strings in _serialize_data.
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/trace_container.py (2)
36-58: Type-hint validators and tighten the union check.

Add explicit parameter and return annotations to satisfy our “public APIs require type hints” guideline, and prefer identity over equality for the Any check to avoid edge cases.

Apply this diff:

```diff
-    @field_validator('source', mode='before')
-    @classmethod
-    def validate_source_via_union(cls, v):
+    @field_validator('source', mode='before')
+    @classmethod
+    def validate_source_via_union(cls, v: Any) -> Any:
@@
-        current_union = TraceAdapterRegistry.get_current_union()
-        if current_union != Any:  # Only validate if union is available
+        current_union = TraceAdapterRegistry.get_current_union()
+        if current_union is not Any:  # Only validate if union is available
             adapter = TypeAdapter(current_union)
             return adapter.validate_python(v)
```
60-72: Annotate model-level validator for clarity and lint compliance. Add type hints to the `ensure_union_built` validator to comply with our typing and ruff/pyright guidance.

```diff
-    @model_validator(mode='before')
-    @classmethod
-    def ensure_union_built(cls, data):
+    @model_validator(mode='before')
+    @classmethod
+    def ensure_union_built(cls, data: Any) -> Any:
         """Ensure union is built before validation."""
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/span_to_dfw_record.py (2)
61-73: Defaulting framework to "langchain" might cause mis-detection. If upstream didn't set `nat.framework`, forcing "langchain" could accidentally match an unintended schema. Consider a neutral default like "unknown" (or omit the key and let the provider discriminators decide), depending on how your adapters discriminate.

```diff
-    framework = _get_string_value(span.attributes.get("nat.framework", "langchain"))
+    framework = _get_string_value(span.attributes.get("nat.framework")) or "unknown"
```

If discriminators require `framework`, make sure adapters treat "unknown" as non-matching so that the error path with the registered adapter listing triggers correctly.
83-97: Great diagnostics on schema detection failures. Enumerating registered adapters greatly helps operability. Consider logging the offending span attribute keys at DEBUG for faster triage, but the current error is already helpful.
packages/nvidia_nat_data_flywheel/tests/observability/schema/test_trace_container.py (2)
52-57: Make cleanup import explicit to avoid NameError masking. The teardown uses `TraceAdapterRegistry` without importing it in that scope and relies on catching `NameError`. Re-import explicitly to keep the intent clear and avoid accidental masking of unrelated NameErrors.

```diff
-        # Clean up after each test
-        try:
-            TraceAdapterRegistry.clear_registry()
-        except (ImportError, NameError):
-            pass  # Registry not available
+        # Clean up after each test
+        try:
+            from nat.plugins.data_flywheel.observability.processor.trace_conversion.trace_adapter_registry import (
+                TraceAdapterRegistry,
+            )
+            TraceAdapterRegistry.clear_registry()
+        except ImportError:
+            pass  # Registry not available
```
154-168: ImportError resilience test is a bit heavy-handed. Patching `builtins.__import__` will trip all imports and can hide other issues. Consider narrowly patching only the registry import with import hooks or module injection via `sys.modules`. Optional; the current test does validate the intended path.

packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py (4)
41-43: Brittle base-class check — prefer issubclass/isinstance over bases.
`__class__.__bases__` is an implementation detail and brittle with generics/MRO. You already assert `isinstance(processor, BatchingProcessor)`. If you still want a class-level check, use `issubclass(DictBatchingProcessor, BatchingProcessor)`. Apply this diff:

```diff
-    # Verify it's properly typed for dict
-    assert processor.__class__.__bases__ == (BatchingProcessor, )
+    # Verify inheritance without relying on fragile __bases__
+    assert issubclass(DictBatchingProcessor, BatchingProcessor)
```
247-264: Strengthen batching-parameter assertions. The test confirms that a DictBatchingProcessor was added but doesn't assert parameter propagation. If BatchingProcessor exposes attributes (e.g., `batch_size`, `flush_interval`, `max_queue_size`, `drop_on_overflow`, `shutdown_timeout`), add conditional assertions to validate their values. This catches wiring regressions.
If those attributes aren’t public, we can expose read-only properties on DictBatchingProcessor to support testing. I can draft that change if you want.
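If exposing read-only properties is the route taken, a sketch of the shape (an illustrative subset, not the actual `DictBatchingProcessor` implementation):

```python
class DictBatchingProcessor:  # illustrative subset for testability only
    """Batching processor that exposes its configuration as read-only properties."""

    def __init__(self, batch_size: int = 100, flush_interval: float = 5.0) -> None:
        self._batch_size = batch_size
        self._flush_interval = flush_interval

    @property
    def batch_size(self) -> int:
        """Maximum number of dicts collected before a flush."""
        return self._batch_size

    @property
    def flush_interval(self) -> float:
        """Seconds between time-based flushes."""
        return self._flush_interval
```

Tests could then assert `processor.batch_size == expected` directly instead of poking at private state.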
187-194: Unify async test style across the suite. Here you use `asyncio.run(...)`. In other test modules you use native `async def` with `pytest.mark.asyncio`. Consider standardizing on one style (prefer `pytest.mark.asyncio`) for consistency and cleaner stack traces. If you choose `pytest.mark.asyncio`, ensure pytest-asyncio is enabled in the test environment.
304-323: Questionable value of the "invalid parameter types" test. This test passes obviously invalid types and then accepts either outcome. It's unlikely to catch regressions and may create noise. Consider removing it or asserting a specific behavior (e.g., raising TypeError) to make it meaningful.
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/dfw_record_processor.py (4)
16-16: Remove unused import and prefer model_dump over JSON round-trip.
`json` is only used for a model->JSON->dict round-trip. In Pydantic v2, `model_dump(mode="json", by_alias=True)` returns JSON-serializable Python structures without the overhead of `json.loads(model_dump_json(...))`. Apply this diff:

```diff
-import json
```
56-56: Avoid the JSON round-trip; use `model_dump(mode="json", by_alias=True)`. Switching to `model_dump` reduces CPU overhead and avoids transient string allocation while preserving alias behavior. Apply this diff:

```diff
-        return json.loads(item.model_dump_json(by_alias=True))
+        return item.model_dump(mode="json", by_alias=True)
```
15-15: Add a module docstring per project guidelines. Public modules require Google-style module docstrings. Add a concise first-line description ending with a period.
Apply this diff:
```diff
@@
 # limitations under the License.
+"""Processors bridging spans and Data Flywheel (DFW) records.
+
+This module provides:
+- DFWToDictProcessor: serializes DFW Pydantic records to dicts using alias-aware JSON-compatible dumps.
+- SpanToDFWRecordProcessor: converts NAT spans into DFW record models via the trace adapter registry.
+"""
```
80-86: Consider accepting string event types for robustness. If upstream ever sends `"LLM_START"` as a string (instead of `IntermediateStepType.LLM_START`), the match won't trigger. A small guard to normalize to the enum improves resilience without altering behavior. I can provide a small patch that maps string values to the enum before matching if desired.
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py (3)
16-21: Remove unused TypeVar import and ProviderT declaration.
`ProviderT` is unused and will trip ruff. Drop the import and the declaration. Apply this diff:

```diff
-from typing import Any
-from typing import TypeVar
+from typing import Any
@@
-ProviderT = TypeVar("ProviderT")
```
30-30: Use or drop logger to satisfy ruff.
`logger` is currently unused. Either use it for debug messages in the validators or prefix it with an underscore to mark it as intentionally unused. For example, inside `validate_input_value` / `validate_metadata`, add a debug line after normalization:

```python
logger.debug("Normalized %s field for OpenAITraceSourceBase.", field_name)
```

(`field_name` here is whichever of "input_value" or "metadata" the validator handles.)
15-15: Add a module docstring per project guidelines. Public modules should have a concise Google-style docstring.
Apply this diff:
```diff
@@
 # limitations under the License.
+"""OpenAI trace source schema provider for Data Flywheel integration.
+
+Defines OpenAIMetadata and an OpenAITraceSource model that normalize input_value and
+metadata attributes for adapter-based conversion pipelines.
+"""
```

packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/test_span_to_dfw_record.py (2)
208-217: Make the logging test assert something meaningful. The current test doesn't assert anything, and its comments suggest it might not work. Assert that the debug logger was called at least once to validate the path, or remove the test.
Apply this diff:
```diff
-    # Note: This test may not work as expected since we're using real functionality
-    # and the logger calls depend on internal implementation details
-    # Consider removing this test or adapting it to test actual logging behavior
+    # Assert debug logging occurred at least once
+    assert mock_logger.debug.call_count >= 1
```
100-142: Fragility note: source type asserted as dict depends on registry state. These tests rely on an empty adapter registry so that `TraceContainer.source` remains a dict. If automatic adapter registration is added in the future, this may become a typed model and break the assertions. You already clear the registry; keep this in mind if auto-registration occurs at import time. If needed, explicitly assert the registry union is `Any` before calling `get_trace_container` to make the test intent robust.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (2)
16-21: Add a brief module docstring after the SPDX header. Public modules under packages/*/src should include a concise Google-style docstring per guidelines.
Apply this diff near the top of the file:
```diff
@@
 # limitations under the License.
+"""Data Flywheel base exporter and batching utilities.
+
+Provides a contract-driven span export pipeline that converts spans into
+DFW records, serializes them to dict, batches them, and forwards to sinks.
+"""
+
 import logging
 from abc import abstractmethod
```
83-95: Property contract is fine, but consider marking the contract as a class attribute for clarity. Keeping the property is OK, yet a class-level `EXPORT_CONTRACT: type[BaseModel]` improves readability and prevents late-binding pitfalls.

packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/adapter/elasticsearch/test_openai_converter.py (7)
85-90: The "case-sensitivity" test comment is misleading. You assert that "User" falls back to "user", which is correct, but the comment "Should fallback to default" can be clarified to "Falls back to default user role due to unmapped case."

```diff
-    assert convert_role("User") == "user"  # Should fallback to default
+    assert convert_role("User") == "user"  # Unmapped case -> fallback to default user
```
178-200: Prefer pytest.raises over try/except + asserts. Using pytest.raises yields clearer intent and failure messages.

```diff
-        try:
-            create_message_by_role("user", None)
-            assert False, "Expected AssertionError for None content in user message"
-        except AssertionError as e:
-            assert "User message content cannot be None" in str(e)
+        import pytest
+        with pytest.raises((AssertionError, ValueError)) as exc:
+            create_message_by_role("user", None)
+        assert "User message content cannot be None" in str(exc.value)
@@
-        try:
-            create_message_by_role("system", None)
-            assert False, "Expected AssertionError for None content in system message"
-        except AssertionError as e:
-            assert "System message content cannot be None" in str(e)
+        with pytest.raises((AssertionError, ValueError)) as exc:
+            create_message_by_role("system", None)
+        assert "System message content cannot be None" in str(exc.value)
@@
-        try:
-            create_message_by_role("tool", None)
-            assert False, "Expected AssertionError for None content in tool message"
-        except AssertionError as e:
-            assert "Tool message content cannot be None" in str(e)
+        with pytest.raises((AssertionError, ValueError)) as exc:
+            create_message_by_role("tool", None)
+        assert "Tool message content cannot be None" in str(exc.value)
```
690-700: Tighten expectation: a missing "message" should deterministically raise ValueError. The converter can avoid TypeError by guarding the finish_reason type. Update the test to accept only ValueError.

```diff
-        try:
-            convert_chat_response(chat_response, "test_span", 0)
-            assert False, "Expected ValueError for missing message"
-        except (ValueError, TypeError) as e:
-            # Either ValueError for missing message or TypeError for finish_reason handling
-            assert "Chat response missing message" in str(e) or "unhashable type" in str(e)
+        import pytest
+        with pytest.raises(ValueError) as exc:
+            convert_chat_response(chat_response, "test_span", 0)
+        assert "Chat response missing message" in str(exc.value)
```
701-710: Same here: ensure a None "message" yields ValueError.

```diff
-        try:
-            convert_chat_response(chat_response, "test_span", 0)
-            assert False, "Expected ValueError for None message"
-        except ValueError as e:
-            assert "Chat response missing message" in str(e)
+        import pytest
+        with pytest.raises(ValueError) as exc:
+            convert_chat_response(chat_response, "test_span", 0)
+        assert "Chat response missing message" in str(exc.value)
```
931-938: Prefer pytest.raises for the message conversion error. This reads cleaner and aligns with the changes that replace assert-based validation with ValueError in the converter.

```diff
-        try:
-            convert_langchain_openai(trace_container)
-            assert False, "Expected error for invalid message"
-        except (ValueError, AssertionError) as e:
-            # Either AssertionError for None content or ValueError from message conversion wrapper
-            assert "User message content cannot be None" in str(
-                e) or "Failed to convert message in trace source" in str(e)
+        import pytest
+        with pytest.raises(ValueError) as exc:
+            convert_langchain_openai(trace_container)
+        assert "Failed to convert message in trace source" in str(exc.value)
```
959-964: Use pytest.raises for the invalid chat response. If the converter guards finish_reason typing, expect ValueError only.

```diff
-        try:
-            convert_langchain_openai(trace_container)
-            assert False, "Expected error for invalid chat response"
-        except (ValueError, TypeError) as e:
-            # Either TypeError for unhashable dict used as key or ValueError from chat response conversion wrapper
-            assert "unhashable type" in str(e) or "Failed to convert chat response 0" in str(e)
+        import pytest
+        with pytest.raises(ValueError) as exc:
+            convert_langchain_openai(trace_container)
+        assert "Failed to convert chat response 0" in str(exc.value)
```
607-657: Add a test for non-string finish_reason to prevent TypeError regressions. Proactively cover finish_reason values that are non-hashable (e.g., dict) or of unexpected types (e.g., int).

```diff
@@ class TestConvertChatResponse:
     def test_convert_chat_response_with_unknown_finish_reason(self):
@@
         assert result.finish_reason is None  # Should be None for unmapped finish reasons
+
+    def test_convert_chat_response_with_non_string_finish_reason(self):
+        chat_response = {
+            "message": {
+                "content": "Response with non-string finish reason",
+                "response_metadata": {"finish_reason": {"not": "hashable"}},
+                "additional_kwargs": {},
+            }
+        }
+        result = convert_chat_response(chat_response, "test_span", 0)
+        assert result.finish_reason is None
```

Also applies to: 670-689
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/schema_registry.py (3)
16-21: Add a module-level docstring for the registry. Public modules should have a concise docstring; note that it must precede the imports to count as a module docstring.

```diff
+"""Runtime registry for versioned DFW schema contracts per destination (e.g., Elasticsearch)."""
+
 import logging
 from typing import TypeVar
 
 from pydantic import BaseModel
 
 logger = logging.getLogger(__name__)
```
31-56: Decorator API and typing look good. Consider guarding concurrent access. If schemas are registered from multiple threads (plugin discovery), wrap mutations with a lock.
```diff
@@
-class SchemaRegistry:
+from threading import RLock
+
+_SCHEMA_LOCK = RLock()
+
+class SchemaRegistry:
@@
-        if name not in cls._schemas:
-            cls._schemas[name] = {}
+        with _SCHEMA_LOCK:
+            if name not in cls._schemas:
+                cls._schemas[name] = {}
@@
-        if version in cls._schemas[name]:
-            logger.warning("Overriding existing schema for %s:%s", name, version)
-        cls._schemas[name][version] = schema_cls
+            if version in cls._schemas[name]:
+                logger.warning("Overriding existing schema for %s:%s", name, version)
+            cls._schemas[name][version] = schema_cls
```

And guard other mutating methods similarly (`clear`).
84-95: Minor: a list comprehension would be more concise. No behavior change; readability only.

```diff
-        schemas = []
-        for name, versions in cls._schemas.items():
-            for version in versions.keys():
-                schemas.append(f"{name}:{version}")
-        return schemas
+        return [f"{name}:{version}" for name, versions in cls._schemas.items() for version in versions.keys()]
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/trace_adapter_registry.py (2)
16-23: Add a module docstring. A short description helps users discover this dynamic adapter facility; place it before the imports so it counts as a module docstring.

```diff
+"""Dynamic registry that maps Pydantic trace source models to target conversion types.
+
+Used to select the right converter at runtime and to build a dynamic Union for validation.
+"""
+
 import logging
 from collections.abc import Callable
 from functools import reduce
 from typing import Any
 
 from nat.plugins.data_flywheel.observability.schema.trace_container import TraceContainer
```
36-75: Support forward-referenced return annotations (strings). If a converter uses postponed evaluation of annotations, `__annotations__['return']` can be a string. Use `typing.get_type_hints` for robustness.

```diff
@@
-    def decorator(func):
-        return_type = func.__annotations__.get('return')
+    def decorator(func):
+        from typing import get_type_hints
+        try:
+            hints = get_type_hints(func)
+            return_type = hints.get('return')
+        except Exception:
+            return_type = func.__annotations__.get('return')
@@
         if return_type is None:
             raise ValueError(f"Converter function '{func.__name__}' must have a return type annotation.\n"
                              f"Example: def {func.__name__}(trace: TraceContainer) -> DFWESRecord:")
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/elasticsearch/openai_converter.py (3)
16-24: Add a module docstring describing the adapter. Clarifies scope (OpenAI-to-Elasticsearch converter, role and finish_reason normalization); place it before the imports.

```diff
+"""OpenAI trace adapter for the Elasticsearch Data Flywheel schema.
+
+Normalizes roles, maps finish reasons, validates tool schemas, and converts
+LangChain/OpenAI traces to DFWESRecord instances.
+"""
+
 import json
 import logging
```
277-299: Wrap assertion-based failures from message conversion consistently as ValueError.
`convert_message_to_dfw` can raise ValueError (after the assert-to-ValueError change above). Catch both ValueError and AssertionError here for safety, and re-wrap as ValueError for a consistent API.

```diff
@@
-        for message in trace_source.source.input_value:
-            try:
-                msg_result = convert_message_to_dfw(message)
-                messages.append(msg_result)
-            except ValueError as e:
-                raise ValueError(f"Failed to convert message in trace source: {e}") from e
+        for message in trace_source.source.input_value:
+            try:
+                msg_result = convert_message_to_dfw(message)
+                messages.append(msg_result)
+            except (ValueError, AssertionError) as e:
+                raise ValueError(f"Failed to convert message in trace source: {e}") from e
```
229-275: Minor: validate "message" structure early for clearer errors. Avoids calling `.get()` on non-dicts and yields a predictable ValueError.

```diff
@@
-    message = chat_response.get("message", {})
-    if message is None:
-        raise ValueError(f"Chat response missing message for span: '{span_name}'")
+    if "message" not in chat_response or chat_response.get("message") is None:
+        raise ValueError(f"Chat response missing message for span: '{span_name}'")
+    message = chat_response["message"]
+    if not isinstance(message, dict):
+        raise ValueError(f"Chat response message must be a dict in span: '{span_name}'")
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/dfw_es_record.py (4)
40-46: Include `content_filter` in `FinishReason` to match provider outputs. Some providers (including OpenAI) may return `content_filter` for safety blocks. Without it, parsing will fail for such responses. Apply this diff if applicable to your target providers:

```diff
 class FinishReason(str, Enum):
     """Finish reason for chat completion responses."""
     STOP = "stop"
     LENGTH = "length"
     TOOL_CALLS = "tool_calls"
+    CONTENT_FILTER = "content_filter"
```

If other finish reasons are possible, consider a fallback strategy (e.g., allow arbitrary strings) to avoid hard failures.
103-106: Make `FunctionParameters.required` optional with a sensible default. Tool schemas frequently omit `required`. For robustness, default to an empty list. Apply this diff:

```diff
-    required: list[str] = Field(..., description="The required properties of the function parameters.")
+    required: list[str] = Field(default_factory=list,
+                                description="The required properties of the function parameters.")
```
130-151: Optional: prefer `Sequence[...]` for read-only collections. Guidelines prefer `collections.abc` abstractions. Using `Sequence` (e.g., for `messages`, `tool_calls`, `choices`) broadens compatibility without constraining callers. No change required, but consider this for future iterations if it doesn't conflict with Pydantic serialization expectations.
Also applies to: 156-168, 173-188
198-198: Consider lowering the log severity for expected no-tool-call scenarios. Requests with tools but no tool calls can be normal (e.g., the model decided to answer directly). `warning` may create noise; `info` could be more appropriate. Apply this diff if desired:

```diff
-        logger.warning("Request has tools but response has no tool calls")
+        logger.info("Request has tools but response has no tool calls")
```
Signed-off-by: Matthew Penn <[email protected]>
Actionable comments posted: 0
♻️ Duplicate comments (2)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (2)
30-44: Resolved: docstring type shapes now match the signature. Thanks for aligning the docstring with the signature for `elasticsearch_auth` and `headers`.
70-75: Do not ignore Elasticsearch responses; detect partial failures and surface them. As written, bulk and single-index errors will be silently dropped, causing data loss and hard-to-debug gaps. Inspect the responses and raise on failure; log useful context.
```diff
-            await self._elastic_client.bulk(operations=bulk_operations)
+            resp = await self._elastic_client.bulk(operations=bulk_operations)
+            if isinstance(resp, dict) and resp.get("errors"):
+                # Extract first error for quick triage; full response is large
+                first_err = None
+                for it in resp.get("items", []):
+                    meta = it.get("index") or it.get("create") or it.get("update") or it.get("delete")
+                    if meta and meta.get("error"):
+                        first_err = meta
+                        break
+                logger.error("Elasticsearch bulk indexing reported errors (index=%s): first=%s", self._index, first_err)
+                raise RuntimeError("Elasticsearch bulk indexing reported errors; see logs for details.")
         elif isinstance(item, dict):
             # Single document export
-            await self._elastic_client.index(index=self._index, document=item)
+            resp = await self._elastic_client.index(index=self._index, document=item)
+            if isinstance(resp, dict) and resp.get("result") not in {"created", "updated"}:
+                logger.warning("Unexpected Elasticsearch index result (index=%s): %s", self._index, resp)
```

Consider adding tests that simulate a partial bulk failure and assert that an exception is raised.
🧹 Nitpick comments (9)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (9)
30-37: Add the missing return annotation and types for the variadic params to satisfy "all public APIs typed." `__init__` should explicitly return None, and variadic params should be typed to avoid pyright/ruff warnings given the repo settings.
Apply:
```diff
-    def __init__(self,
-                 *args,
-                 endpoint: str,
-                 index: str,
-                 elasticsearch_auth: tuple[str, str],
-                 headers: dict[str, str] | None = None,
-                 **kwargs):
+    def __init__(self,
+                 *args: "Any",
+                 endpoint: str,
+                 index: str,
+                 elasticsearch_auth: tuple[str, str],
+                 headers: dict[str, str] | None = None,
+                 **kwargs: "Any") -> None:
```

Add imports (see next comment) to define Any without forward refs.
16-21: Import the typing symbols used in annotations. Required if you adopt the previous suggestion.

```diff
 import logging
+from typing import Any, Mapping, Sequence
```
45-47: Default both Accept and Content-Type for ES compatibility headers. When using compatibility headers with ES 8, set both Accept and Content-Type to avoid content-type negotiation surprises.

```diff
-        if headers is None:
-            headers = {"Accept": "application/vnd.elasticsearch+json; compatible-with=8"}
+        if headers is None:
+            headers = {
+                "Accept": "application/vnd.elasticsearch+json; compatible-with=8",
+                "Content-Type": "application/vnd.elasticsearch+json; compatible-with=8",
+            }
```
52-57: Broaden typing to Sequence[Mapping[str, Any]] and improve the docstring accordingly. This follows the repo guideline to prefer typing/collections.abc abstractions and conveys the payload shape.

```diff
-    async def export_processed(self, item: dict | list[dict]) -> None:
+    async def export_processed(self, item: Mapping[str, Any] | Sequence[Mapping[str, Any]]) -> None:
         """Export a batch of spans.
 
         Args:
-            item (dict | list[dict]): Dictionary or list of dictionaries to export to Elasticsearch.
+            item (Mapping[str, Any] | Sequence[Mapping[str, Any]]): Single document or a sequence of
+                documents to export to Elasticsearch.
         """
-        if isinstance(item, list):
+        if isinstance(item, Sequence) and not isinstance(item, (bytes, str)):
```

Note: The bytes/str guard avoids treating a JSON string or bytes payload as a sequence of chars.
48-49: Optionally enable sane client defaults (timeouts/retries) and TLS guardrails. To make the exporter resilient by default, consider enabling retry_on_timeout and reasonable timeouts; optionally warn on non-HTTPS endpoints in production.
```diff
-        self._elastic_client = AsyncElasticsearch(endpoint, basic_auth=elasticsearch_auth, headers=headers)
+        # Enable basic resiliency by default; allow callers to override via kwargs if needed
+        self._elastic_client = AsyncElasticsearch(
+            endpoint,
+            basic_auth=elasticsearch_auth,
+            headers=headers,
+            retry_on_timeout=True,
+            max_retries=3,
+            request_timeout=10.0,
+            **kwargs,
+        )
```
- emit a warning if `endpoint.startswith("http://")`.
- expose these knobs explicitly on the mixin’s init if you want config-surface parity with other exporters.
23-28: Nit: capitalize "Elasticsearch" in docstrings. Use the proper product name casing in user-facing docs and docstrings.

```diff
-    """Mixin for elasticsearch exporters.
+    """Mixin for Elasticsearch exporters.
@@
-    This mixin provides elasticsearch-specific functionality for SpanExporter exporters.
-    It handles elasticsearch-specific resource tagging and uses the AsyncElasticsearch client.
+    This mixin provides Elasticsearch-specific functionality for SpanExporter exporters.
+    It handles Elasticsearch-specific resource tagging and uses the AsyncElasticsearch client.
```
52-75: Add an async close to release the client cleanly. The mixin allocates an AsyncElasticsearch client but never closes it; add an explicit close to avoid connector/socket leaks in long-running agents.
Add this method (outside the shown range) and ensure callers invoke it on shutdown:
```python
async def aclose(self) -> None:
    """Close the underlying Elasticsearch client."""
    await self._elastic_client.close()
```

If your exporter base defines a lifecycle hook (e.g., close/stop), wire this through there instead.
58-69: Minor: chunk very large bulks to control payload size and backpressure. If batches can be large, consider chunking (e.g., 1k docs per bulk call) to reduce memory usage and failure blast radius.
I can provide a chunked variant if desired.
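One possible shape for that variant. It assumes an `AsyncElasticsearch`-like client exposing a `bulk(operations=...)` coroutine; the helper names and the default chunk size are illustrative, not existing code:

```python
import asyncio
from collections.abc import Iterable, Iterator, Sequence
from itertools import islice


def chunked(docs: Iterable[dict], size: int = 1000) -> Iterator[list[dict]]:
    """Yield successive lists of at most `size` documents."""
    it = iter(docs)
    while batch := list(islice(it, size)):
        yield batch


async def bulk_in_chunks(client, index: str, docs: Sequence[dict], size: int = 1000) -> None:
    """Send bulk index requests in bounded chunks to limit payload size and blast radius."""
    for batch in chunked(docs, size):
        operations: list[dict] = []
        for doc in batch:
            # Each document needs an action line followed by the source line
            operations.append({"index": {"_index": index}})
            operations.append(doc)
        await client.bulk(operations=operations)
```

The per-chunk response checking suggested earlier would slot naturally into the loop body.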
39-44: Clarify endpoint/auth expectations in the docstring. Document the expected endpoint form (e.g., https://host:port) and that elasticsearch_auth is Basic Auth (user, pass). If API keys are needed later, note the future support surface.
```diff
-            endpoint (str): The elasticsearch endpoint.
+            endpoint (str): The Elasticsearch endpoint (e.g., "https://host:9200").
@@
-            elasticsearch_auth (tuple[str, str]): The elasticsearch authentication credentials.
+            elasticsearch_auth (tuple[str, str]): Basic authentication credentials as (username, password).
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (4)
- `packages/nvidia_nat_data_flywheel/pyproject.toml` (1 hunks)
- `packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py` (1 hunks)
- `packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/utils/deserialize.py` (1 hunks)
- `src/nat/profiler/decorators/function_tracking.py` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- src/nat/profiler/decorators/function_tracking.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/utils/deserialize.py
- packages/nvidia_nat_data_flywheel/pyproject.toml
🧰 Additional context used
📓 Path-based instructions (9)
{src/**/*.py,packages/*/src/**/*.py,examples/*/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Use abbreviations: 'nat' for API namespace and CLI tool, 'nvidia-nat' for package name, 'NAT' only for env var prefixes and informal code comments (never in documentation)
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
packages/*/src/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
All importable Python code in packages must live under `packages/*/src/`
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.py: Format Python code with yapf (PEP8 base, column_limit=120) run second in the toolchain
Run ruff (ruff check --fix) using configuration in pyproject.toml; fix warnings unless explicitly ignored; ruff is a linter only (not for formatting)
Respect naming: snake_case for functions and variables, PascalCase for classes, UPPER_CASE for constants
Prefer typing/collections.abc abstractions (e.g., Sequence over list)
Use typing.Annotated for units or extra metadata when useful
Treat pyright warnings (configured in pyproject.toml) as errors during development
Prefer httpx with SSL verification enabled by default for HTTP requests; follow OWASP Top-10
Use async/await for I/O-bound work (HTTP, DB, file reads)
Cache expensive computations with functools.lru_cache or an external cache when appropriate
Leverage NumPy vectorized operations when beneficial
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
**/*.{py,md,sh,yml,yaml,toml,json,ini,cfg,txt,rst,pyi}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Indent with 4 spaces (no tabs) and ensure every file ends with a single trailing newline
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
{src/**/*.py,packages/*/src/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
{src/**/*.py,packages/*/src/**/*.py}: All public APIs require Python 3.11+ type hints on parameters and return values
Provide Google-style docstrings for every public module, class, function and CLI command
Docstring first line must be a concise description ending with a period
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}: Every file must start with the standard SPDX Apache-2.0 header
All source files must include the SPDX Apache-2.0 header template
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
**/*.{py,md}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Version numbers are derived by setuptools-scm; never hard-code versions in code or documentation
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
**/*
⚙️ CodeRabbit configuration file
**/*: # Code Review Instructions
- Ensure the code follows best practices and coding standards.
- For Python code, follow PEP 20 and PEP 8 for style guidelines.
- Check for security vulnerabilities and potential issues.
- Python methods should use type hints for all parameters and return values. Example: `def my_function(param1: int, param2: str) -> bool: pass`
Documentation Review Instructions
- Verify that documentation and comments are clear and comprehensive.
- Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum".
- Verify that the documentation doesn't contain any offensive or outdated terms.
- Verify that documentation and comments are free of spelling mistakes; ensure the documentation doesn't contain any words listed in the `ci/vale/styles/config/vocabularies/nat/reject.txt` file. Words that might appear to be spelling mistakes but are listed in the `ci/vale/styles/config/vocabularies/nat/accept.txt` file are OK.
Misc.
- All code should be licensed under the Apache License 2.0, and should contain an Apache License 2.0 header comment at the top of each file.
- Confirm that copyright years are up-to-date whenever a file is changed.
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
packages/**/*
⚙️ CodeRabbit configuration file
packages/**/*:
- This directory contains optional plugin packages for the toolkit; each should contain a `pyproject.toml` file.
- The `pyproject.toml` file should declare a dependency on `nvidia-nat` or another package with a name starting with `nvidia-nat-`. This dependency should be declared using `~=<version>`, and the version should be a two digit version (ex: `~=1.0`).
- Not all packages contain Python code; if they do, they should also contain their own set of tests, in a `tests/` directory at the same level as the `pyproject.toml` file.
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
🧬 Code graph analysis (1)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (2)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (1)
export_processed(79-88)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (1)
export_processed(97-98)
🔇 Additional comments (1)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (1)
1-15: License header looks correct and up to date.
SPDX header and copyright years (2024–2025) are present and correct.
willkill07
left a comment
Completed a first-pass
Signed-off-by: Matthew Penn <[email protected]>
Actionable comments posted: 7
♻️ Duplicate comments (3)
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md (1)
20-23: Fix project naming in heading and first mention; align with repo nomenclature.
Use “NeMo Agent Toolkit” (capital T) in headings, and use “NVIDIA NeMo Agent toolkit” on first mention in the body; subsequent mentions “NeMo Agent toolkit”. Also prefer “GitHub repo” casing.
Apply:
Apply:

```diff
-# NVIDIA Agent Toolkit Subpackage
-This is a subpackage for NVIDIA Data Flywheel Blueprint integration for continuous model improvement.
-
-For more information about the NVIDIA NeMo Agent toolkit, please visit the [NeMo Agent toolkit GitHub Repo](https://github.com/NVIDIA/NeMo-Agent-Toolkit).
+# NVIDIA NeMo Agent Toolkit Data Flywheel subpackage
+This subpackage integrates the NVIDIA Data Flywheel Blueprint with the NVIDIA NeMo Agent toolkit to enable continuous model improvement.
+
+For more information about the NVIDIA NeMo Agent toolkit, please visit the [NeMo Agent toolkit GitHub repo](https://github.com/NVIDIA/NeMo-Agent-Toolkit).
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py (1)
18-20: Import ConfigDict to make extras behavior explicit.
If you intend to preserve unknown fields arriving from upstream LangChain/OpenAI objects, configure `extra="allow"`; otherwise, explicitly set `extra="ignore"` to codify the current behavior and avoid regressions.

```diff
-from pydantic import BaseModel
-from pydantic import Field
+from pydantic import BaseModel, Field, ConfigDict
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py (1)
16-19: Replace inline comment with a proper module docstring.
Public modules require a top-level docstring. Fold the inline note into it and keep the public re-exports. (Echoing a prior review on this file.)

```diff
-# schemas are auto-discovered by the discovery system - no manual imports needed
+"""Elasticsearch sink schema public exports.
+
+Schemas are auto-discovered by the discovery system; no manual imports are needed.
+"""
 from .contract_version import ContractVersion
 from .dfw_es_record import DFWESRecord
```
🧹 Nitpick comments (8)
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md (1)
18-18: Tighten banner title casing (nit).
The image title currently says “NeMo Agent toolkit banner image”. For consistency with naming rules, prefer “NeMo Agent Toolkit banner image”.
Apply the casing change to the banner image title.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py (3)
18-29: Remove unused TypeVar.
`ProviderT` is defined but unused.

```diff
-from typing import TypeVar
@@
-ProviderT = TypeVar("ProviderT")
```
42-70: Broaden iterable handling for input_value (optional).
Accept generic sequences (e.g., tuples) in addition to lists. This improves robustness and aligns with the suggested `Sequence` typing.

```diff
@@
-        # Validate list of messages
-        if isinstance(v, list):
+        # Validate a sequence of messages
+        if isinstance(v, (list, tuple)):
             validated_messages = []
             for msg in v:
                 if isinstance(msg, dict):
                     validated_messages.append(OpenAIMessage(**msg))
                 elif isinstance(msg, OpenAIMessage):
                     validated_messages.append(msg)
                 else:
                     raise ValueError(f"Invalid message format: {msg}")
             return validated_messages
```
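As a quick stdlib-only illustration of the pattern this suggestion describes (the function name and the `role`/`content` key check are hypothetical, not the plugin's actual API), accepting any non-string sequence of message dicts can look like:

```python
from collections.abc import Sequence


def validate_messages(v: object) -> list[dict]:
    """Coerce a list/tuple of message-like dicts, rejecting strings and scalars."""
    # Strings are Sequences too, so exclude them explicitly.
    if isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
        validated: list[dict] = []
        for msg in v:
            if isinstance(msg, dict) and {"role", "content"} <= msg.keys():
                validated.append(msg)
            else:
                raise ValueError(f"Invalid message format: {msg}")
        return validated
    raise ValueError(f"Expected a sequence of messages, got: {type(v).__name__}")


# Lists and tuples both pass; a bare string is rejected.
messages = validate_messages(({"role": "user", "content": "hi"},))
```

Checking `isinstance(v, (list, tuple))` as in the diff is a narrower, equally valid variant; the `Sequence` check above trades a little strictness for broader coverage (e.g. deques).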
71-85: Remove commented-out validator to reduce noise.
Dead code increases maintenance overhead.

```diff
-    # @field_validator("metadata", mode="before")
-    # @classmethod
-    # def validate_metadata(cls, v: Any) -> dict[str, Any]:
-    #     """Validate the metadata for the OpenAITraceSource."""
-    #     if v is None:
-    #         return {}
-    #
-    #     if isinstance(v, str):
-    #         metadata = deserialize_span_attribute(v)
-    #         if not isinstance(metadata, dict):
-    #             raise ValueError(f"Invalid metadata format: {metadata}")
-    #         return metadata
-    #
-    #     raise ValueError(f"Invalid metadata format: {v}")
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py (1)
21-21: Nit: ensure a single trailing newline at EOF.
Repository style requires exactly one newline at the end of each file.
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1)
16-21: Add a concise module docstring.
All public modules need a top-level docstring per repo guidelines.

```diff
@@
-from enum import Enum
+"""Versioned contract selection for the Elasticsearch sink schema.
+
+Exposes ContractVersion and a resolver to obtain the Pydantic contract
+class via the SchemaRegistry for a given version.
+"""
+from enum import Enum
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (2)
15-20: Add a module docstring summarizing the exporter.
Provide a one-liner description to satisfy documentation requirements.

```diff
@@
 # limitations under the License.
+"""Data Flywheel exporter implementation targeting Elasticsearch."""
+
 import logging
```
58-68: Optional: validate required Elasticsearch kwargs early for clearer errors.
Because ElasticsearchMixin.__init__ requires endpoint/index/elasticsearch_auth, pre-validating produces a friendlier message than a TypeError from the MRO.

```diff
@@ def __init__(self,
-        # Initialize both mixins - ElasticsearchMixin expects elasticsearch_kwargs,
-        # DFWExporter expects the standard exporter parameters
-        self.contract_version = contract_version
+        # Initialize both mixins - ElasticsearchMixin expects elasticsearch_kwargs,
+        # DFWExporter expects the standard exporter parameters
+        self.contract_version = contract_version
+        required = ("endpoint", "index", "elasticsearch_auth")
+        missing = tuple(k for k in required if k not in elasticsearch_kwargs)
+        if missing:
+            raise ValueError(
+                f"Missing required Elasticsearch kwargs: {missing}. "
+                "Provide 'endpoint', 'index', and 'elasticsearch_auth'.")
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (12)
- packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/elasticsearch/openai_converter.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/span_to_dfw_record.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/register.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/dfw_es_record.py (1 hunks)
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_elasticsearch_exporter.py (1 hunks)
- packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/adapter/elasticsearch/test_openai_converter.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- packages/nvidia_nat_data_flywheel/tests/observability/processor/trace_conversion/adapter/elasticsearch/test_openai_converter.py
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_elasticsearch_exporter.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/register.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/span_to_dfw_record.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/trace_conversion/adapter/elasticsearch/openai_converter.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/dfw_es_record.py
🧰 Additional context used
📓 Path-based instructions (9)
{src/**/*.py,packages/*/src/**/*.py,examples/*/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Use abbreviations: 'nat' for API namespace and CLI tool, 'nvidia-nat' for package name, 'NAT' only for env var prefixes and informal code comments (never in documentation)
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
packages/*/src/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
All importable Python code in packages must live under packages//src/
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.py: Format Python code with yapf (PEP8 base, column_limit=120) run second in the toolchain
Run ruff (ruff check --fix) using configuration in pyproject.toml; fix warnings unless explicitly ignored; ruff is a linter only (not for formatting)
Respect naming: snake_case for functions and variables, PascalCase for classes, UPPER_CASE for constants
Prefer typing/collections.abc abstractions (e.g., Sequence over list)
Use typing.Annotated for units or extra metadata when useful
Treat pyright warnings (configured in pyproject.toml) as errors during development
Prefer httpx with SSL verification enabled by default for HTTP requests; follow OWASP Top-10
Use async/await for I/O-bound work (HTTP, DB, file reads)
Cache expensive computations with functools.lru_cache or an external cache when appropriate
Leverage NumPy vectorized operations when beneficial
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
**/*.{py,md,sh,yml,yaml,toml,json,ini,cfg,txt,rst,pyi}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Indent with 4 spaces (no tabs) and ensure every file ends with a single trailing newline
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.pypackages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
{src/**/*.py,packages/*/src/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
{src/**/*.py,packages/*/src/**/*.py}: All public APIs require Python 3.11+ type hints on parameters and return values
Provide Google-style docstrings for every public module, class, function and CLI command
Docstring first line must be a concise description ending with a period
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}: Every file must start with the standard SPDX Apache-2.0 header
All source files must include the SPDX Apache-2.0 header template
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.pypackages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
**/*.{py,md}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Version numbers are derived by setuptools-scm; never hard-code versions in code or documentation
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.pypackages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
**/*
⚙️ CodeRabbit configuration file
**/*: # Code Review Instructions
- Ensure the code follows best practices and coding standards. - For Python code, follow
PEP 20 and
PEP 8 for style guidelines.- Check for security vulnerabilities and potential issues. - Python methods should use type hints for all parameters and return values.
Example:def my_function(param1: int, param2: str) -> bool: passDocumentation Review Instructions - Verify that documentation and comments are clear and comprehensive. - Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum". - Verify that the documentation doesn't contain any offensive or outdated terms. - Verify that documentation and comments are free of spelling mistakes, ensure the documentation doesn't contain any
words listed in the
ci/vale/styles/config/vocabularies/nat/reject.txtfile, words that might appear to be
spelling mistakes but are listed in theci/vale/styles/config/vocabularies/nat/accept.txtfile are OK.Misc. - All code should be licensed under the Apache License 2.0, and should contain an Apache License 2.0 header
comment at the top of each file.
- Confirm that copyright years are up-to date whenever a file is changed.
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.pypackages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
packages/**/*
⚙️ CodeRabbit configuration file
packages/**/*: - This directory contains optional plugin packages for the toolkit, each should contain apyproject.tomlfile. - Thepyproject.tomlfile should declare a dependency onnvidia-nator another package with a name starting
withnvidia-nat-. This dependency should be declared using~=<version>, and the version should be a two
digit version (ex:~=1.0).
- Not all packages contain Python code, if they do they should also contain their own set of tests, in a
tests/directory at the same level as thepyproject.tomlfile.
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.pypackages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.pypackages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
🧠 Learnings (3)
📚 Learning: 2025-08-25T15:01:54.258Z
Learnt from: CR
PR: NVIDIA/NeMo-Agent-Toolkit#0
File: .cursor/rules/general.mdc:0-0
Timestamp: 2025-08-25T15:01:54.258Z
Learning: Applies to docs/source/**/*.md : Use the official naming: first use 'NVIDIA NeMo Agent toolkit'; subsequent references 'NeMo Agent toolkit'; in headings use 'NeMo Agent Toolkit' (capital T)
Applied to files:
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
📚 Learning: 2025-08-25T15:01:54.259Z
Learnt from: CR
PR: NVIDIA/NeMo-Agent-Toolkit#0
File: .cursor/rules/general.mdc:0-0
Timestamp: 2025-08-25T15:01:54.259Z
Learning: Applies to docs/source/**/*.md : Never use deprecated names (Agent Intelligence toolkit, aiqtoolkit, AgentIQ, AIQ/aiq) in documentation; update to the current naming unless explicitly referring to deprecated names
Applied to files:
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
📚 Learning: 2025-08-25T15:01:54.258Z
Learnt from: CR
PR: NVIDIA/NeMo-Agent-Toolkit#0
File: .cursor/rules/general.mdc:0-0
Timestamp: 2025-08-25T15:01:54.258Z
Learning: Applies to {src/**/*.py,packages/*/src/**/*.py,examples/*/**/*.py} : Use abbreviations: 'nat' for API namespace and CLI tool, 'nvidia-nat' for package name, 'NAT' only for env var prefixes and informal code comments (never in documentation)
Applied to files:
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
🧬 Code graph analysis (4)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py (3)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_message.py (1)
OpenAIMessage(23-31)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/trace_source_base.py (1)
TraceSourceBase(20-22)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/utils/deserialize.py (1)
deserialize_span_attribute(23-42)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py (2)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1)
ContractVersion(23-31)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/dfw_es_record.py (1)
DFWESRecord(192-222)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/schema_registry.py (2)
SchemaRegistry(26-135)
get_schema(58-81)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (4)
src/nat/builder/context.py (1)
ContextState(65-90)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (3)
DFWExporter(43-98)
export_contract(85-94)
export_processed(97-98)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (2)
ElasticsearchMixin(23-75)
export_processed(52-75)
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (2)
ContractVersion(23-31)
get_contract_class(29-31)
🪛 LanguageTool
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md
[grammar] ~20-~20: There might be a mistake here.
Context: ...age") # NVIDIA Agent Toolkit Subpackage This is a subpackage for NVIDIA Data Fly...
(QB_NEW_EN)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: CI Pipeline / Test (amd64, 3.12)
- GitHub Check: CI Pipeline / Test (arm64, 3.12)
- GitHub Check: CI Pipeline / Test (arm64, 3.11)
- GitHub Check: CI Pipeline / Build Wheels
- GitHub Check: CI Pipeline / Test (amd64, 3.11)
🔇 Additional comments (7)
packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md (2)
1-16: SPDX/Apache-2.0 header looks correct.
Header block is present, year is 2025, and identifiers match Apache-2.0.
24-24: EOF newline count correct – `packages/nvidia_nat_data_flywheel/src/nat/meta/pypi.md` ends with exactly one trailing newline and no extra blank lines, satisfying our style rules.
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/__init__.py (1)
17-20: Re-export surface looks good.
Import targets exist and `__all__` exposes the minimal public surface consumers need.
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/sink/elasticsearch/contract_version.py (1)
23-31: Enum and resolver design LGTM.
- String-valued Enum aids serialization.
- get_contract_class delegates to the registry cleanly with a precise return type.
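A minimal sketch of that design, with a plain dict standing in for the SchemaRegistry (the real registry class has a different API; `RecordV1` and the version strings here are hypothetical):

```python
from enum import Enum

# Hypothetical stand-in for the SchemaRegistry; the real class resolves
# registered Pydantic contract classes rather than plain classes.
_REGISTRY: dict[str, type] = {}


class ContractVersion(str, Enum):
    """String-valued versions serialize cleanly into config files and JSON."""

    V1 = "v1.0"

    def get_contract_class(self) -> type:
        # Delegate the lookup to the registry keyed by the string value.
        return _REGISTRY[self.value]


class RecordV1:
    """Placeholder contract class for this sketch."""


_REGISTRY[ContractVersion.V1.value] = RecordV1

# Round-trips through plain strings, e.g. values read from YAML config:
version = ContractVersion("v1.0")
contract = version.get_contract_class()
```

The `str` mixin is what makes `ContractVersion("v1.0")` work directly on config-sourced strings and keeps serialized payloads free of enum reprs.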
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (3)
70-78: Contract resolution property looks correct.
Property returns the Pydantic contract class via the selected version; this pairs correctly with DFWExporter’s processor wiring.
79-88: Export delegation LGTM.
Delegating to ElasticsearchMixin handles both single-doc and bulk cases; type hints align with the mixin signature.
1-15: Packaging sanity check passed
- Verified that `packages/nvidia_nat_data_flywheel/pyproject.toml` exists
- Confirmed it declares the required dependency `nvidia-nat~=1.3` per repo guidelines
Signed-off-by: Matthew Penn <[email protected]>
…c Field descriptions Signed-off-by: Matthew Penn <[email protected]>
Actionable comments posted: 1
🧹 Nitpick comments (8)
packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py (4)
31-40: Avoid brittle base-class checks; use issubclass instead of inspecting bases.
Directly asserting `__bases__` is fragile and will fail if a mixin or intermediate base is introduced. Prefer `issubclass` (and you already assert `isinstance`).

```diff
-    # Verify it's properly typed for dict
-    assert processor.__class__.__bases__ == (BatchingProcessor, )
+    # Verify class relationship without relying on fragile __bases__ tuple
+    assert issubclass(DictBatchingProcessor, BatchingProcessor)
```
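The fragility is easy to demonstrate: inserting a mixin changes `__bases__` but not the subclass relationship (toy class names below, not the toolkit's real ones):

```python
class BatchingProcessor:
    """Toy base class."""


class LoggingMixin:
    """Toy mixin inserted later, which breaks any __bases__ tuple comparison."""


class DictBatchingProcessor(LoggingMixin, BatchingProcessor):
    """Toy subclass that now has an extra direct base."""


# The tuple comparison fails as soon as a mixin is added...
assert DictBatchingProcessor.__bases__ != (BatchingProcessor, )
# ...while issubclass keeps passing regardless of intermediate bases.
assert issubclass(DictBatchingProcessor, BatchingProcessor)
```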
73-76: Redundant export_contract override in the concrete test exporter.
The base class already exposes `export_contract` from `_export_contract`. Overriding it here adds noise without additional value.

```diff
-    @property
-    def export_contract(self) -> type[BaseModel]:
-        return MockExportContract
```
109-117: Strengthen processor-chain assertions to verify type and order, not just count.
Counting calls doesn’t guarantee correct wiring. Validate the exact types and order of processors added.
Add imports near the top (see the second diff), then extend the assertions:

```diff
     # Verify processors were added (4 total: span, dict, batching, filter)
     assert mock_add_processor.call_count == 4
+    calls = [args[0] for args, _ in mock_add_processor.call_args_list]
+    assert isinstance(calls[0], SpanToDFWRecordProcessor)
+    assert isinstance(calls[1], DFWToDictProcessor)
+    assert isinstance(calls[2], DictBatchingProcessor)
+    assert isinstance(calls[3], DictBatchFilterProcessor)
+    # Optional: ensure client_id propagated to the span->DFW processor
+    assert getattr(calls[0], "_client_id", None) == client_id
```

And add the required imports:

```diff
@@
 from nat.plugins.data_flywheel.observability.exporter.dfw_exporter import DFWExporter
 from nat.plugins.data_flywheel.observability.exporter.dfw_exporter import DictBatchingProcessor
+from nat.plugins.data_flywheel.observability.processor.dfw_record_processor import (
+    DFWToDictProcessor,
+    SpanToDFWRecordProcessor,
+)
+from nat.observability.processor.falsy_batch_filter_processor import DictBatchFilterProcessor
```
153-170: Optional: Assert batching processor placement and type in the batching-params test.
You already count calls. Also assert that the 3rd processor is the batching one to catch order regressions.

```diff
     # Verify that processors were added
     assert mock_add_processor.call_count == 4
+    calls = [args[0] for args, _ in mock_add_processor.call_args_list]
+    assert isinstance(calls[2], DictBatchingProcessor)
```

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (4)
1-15: Add a concise module-level docstring (Google style).
Public modules in packages/*/src should have a top-level docstring per the guidelines.

```diff
@@
 # limitations under the License.
+"""Data Flywheel exporter base and dict-batching processor.
+
+Defines:
+- DictBatchingProcessor: a dict-specialized batching processor for bulk exports.
+- DFWExporter: an abstract base wiring the processing chain
+  Span -> DFW Pydantic record -> dict -> batched dicts -> filtered batch.
+"""
+
```
16-29: Remove unused logger import/variable or log something.

`logger` is declared but never used; ruff will flag this.

```diff
-import logging
@@
-logger = logging.getLogger(__name__)
```
44-64: Validate constructor arguments to prevent silent misconfiguration.

Add lightweight runtime checks for the `export_contract` type and the batching parameters (positive values, correct types). This avoids hard-to-debug failures further down the pipeline.

```diff
 def __init__(self,
              export_contract: type[BaseModel],
              context_state: ContextState | None = None,
              batch_size: int = 100,
              flush_interval: float = 5.0,
              max_queue_size: int = 1000,
              drop_on_overflow: bool = False,
              shutdown_timeout: float = 10.0,
              client_id: str = "default"):
@@
-    super().__init__(context_state)
+    super().__init__(context_state)
+
+    # Basic validation to catch misconfiguration early
+    if not isinstance(export_contract, type) or not issubclass(export_contract, BaseModel):
+        raise TypeError("export_contract must be a Pydantic BaseModel subclass")
+    for name, value, predicate in (
+        ("batch_size", batch_size, lambda v: isinstance(v, int) and v > 0),
+        ("max_queue_size", max_queue_size, lambda v: isinstance(v, int) and v > 0),
+        ("flush_interval", flush_interval, lambda v: isinstance(v, (int, float)) and v > 0),
+        ("shutdown_timeout", shutdown_timeout, lambda v: isinstance(v, (int, float)) and v >= 0),
+    ):
+        if not predicate(value):
+            raise ValueError(f"{name} must be a positive number (got {value!r})")
```
41-79: Optional: allow class-level contract declaration to simplify subclasses.

Some exporters may prefer a class attribute (e.g., `EXPORT_CONTRACT = MyContract`) over passing `export_contract` at instantiation. Supporting both simplifies subclass ergonomics without invoking virtual properties during construction.

Follow-up (in a future change): make `export_contract` optional and fall back to `getattr(type(self), "EXPORT_CONTRACT", None)`, with a clear error if neither is provided.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py (1 hunks)
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_elasticsearch_exporter.py (1 hunks)
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/schema/provider/openai_trace_source.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_elasticsearch_exporter.py
🧰 Additional context used
📓 Path-based instructions (9)
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.py: Format Python code with yapf (PEP8 base, column_limit=120) run second in the toolchain
Run ruff (ruff check --fix) using configuration in pyproject.toml; fix warnings unless explicitly ignored; ruff is a linter only (not for formatting)
Respect naming: snake_case for functions and variables, PascalCase for classes, UPPER_CASE for constants
Prefer typing/collections.abc abstractions (e.g., Sequence over list)
Use typing.Annotated for units or extra metadata when useful
Treat pyright warnings (configured in pyproject.toml) as errors during development
Prefer httpx with SSL verification enabled by default for HTTP requests; follow OWASP Top-10
Use async/await for I/O-bound work (HTTP, DB, file reads)
Cache expensive computations with functools.lru_cache or an external cache when appropriate
Leverage NumPy vectorized operations when beneficial
Files:
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
**/*.{py,md,sh,yml,yaml,toml,json,ini,cfg,txt,rst,pyi}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Indent with 4 spaces (no tabs) and ensure every file ends with a single trailing newline
Files:
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}: Every file must start with the standard SPDX Apache-2.0 header
All source files must include the SPDX Apache-2.0 header template
Files:
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
**/*.{py,md}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Version numbers are derived by setuptools-scm; never hard-code versions in code or documentation
Files:
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
**/*
⚙️ CodeRabbit configuration file
**/*: # Code Review Instructions

- Ensure the code follows best practices and coding standards.
- For Python code, follow PEP 20 and PEP 8 for style guidelines.
- Check for security vulnerabilities and potential issues.
- Python methods should use type hints for all parameters and return values. Example: `def my_function(param1: int, param2: str) -> bool: pass`

Documentation Review Instructions

- Verify that documentation and comments are clear and comprehensive.
- Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum".
- Verify that the documentation doesn't contain any offensive or outdated terms.
- Verify that documentation and comments are free of spelling mistakes; ensure the documentation doesn't contain any words listed in the `ci/vale/styles/config/vocabularies/nat/reject.txt` file. Words that might appear to be spelling mistakes but are listed in the `ci/vale/styles/config/vocabularies/nat/accept.txt` file are OK.

Misc.

- All code should be licensed under the Apache License 2.0, and should contain an Apache License 2.0 header comment at the top of each file.
- Confirm that copyright years are up-to-date whenever a file is changed.
Files:
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
packages/**/*
⚙️ CodeRabbit configuration file
packages/**/*:

- This directory contains optional plugin packages for the toolkit; each should contain a `pyproject.toml` file.
- The `pyproject.toml` file should declare a dependency on `nvidia-nat` or another package with a name starting with `nvidia-nat-`. This dependency should be declared using `~=<version>`, and the version should be a two-digit version (ex: `~=1.0`).
- Not all packages contain Python code; if they do, they should also contain their own set of tests, in a `tests/` directory at the same level as the `pyproject.toml` file.
Files:
- packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
{src/**/*.py,packages/*/src/**/*.py,examples/*/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Use abbreviations: 'nat' for API namespace and CLI tool, 'nvidia-nat' for package name, 'NAT' only for env var prefixes and informal code comments (never in documentation)
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
packages/*/src/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
All importable Python code in packages must live under packages//src/
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
{src/**/*.py,packages/*/src/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
{src/**/*.py,packages/*/src/**/*.py}: All public APIs require Python 3.11+ type hints on parameters and return values
Provide Google-style docstrings for every public module, class, function and CLI command
Docstring first line must be a concise description ending with a period
Files:
packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py
🧬 Code graph analysis (2)
packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py (2)

- src/nat/builder/context.py (1): `ContextState` (65-90)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (4): `DFWExporter` (41-95), `DictBatchingProcessor` (32-38), `export_contract` (82-91), `export_processed` (94-95)

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (5)

- src/nat/builder/context.py (1): `ContextState` (65-90)
- src/nat/observability/processor/falsy_batch_filter_processor.py (1): `DictBatchFilterProcessor` (43-45)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/processor/dfw_record_processor.py (2): `DFWToDictProcessor` (35-56), `SpanToDFWRecordProcessor` (59-86)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_elasticsearch_exporter.py (1): `export_processed` (65-74)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/mixin/elasticsearch_mixin.py (1): `export_processed` (52-77)
🔇 Additional comments (7)
packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py (4)
85-92: LGTM: default initialization path is exercised and contract wiring is checked.
200-209: LGTM: integration-style instantiation checks look good.

Covers multiple instances and contract consistency.
186-196: Mark `test_invalid_parameter_types` as xfail until strict validation is added.

The current `ConcreteDFWExporter` constructor doesn’t perform any runtime type checks, so asserting that it “should still work” isn’t preventing its misuse; but switching to expecting an exception today would break the test suite. If you plan to enforce strict parameter validation soon, it’s safer to mark this test as an expected failure until those checks land.

In packages/nvidia_nat_data_flywheel/tests/observability/exporter/test_dfw_exporter.py, change the decorator on `test_invalid_parameter_types`:

```diff
-    def test_invalid_parameter_types(self):
+    @pytest.mark.xfail(
+        reason="ConcreteDFWExporter will raise on invalid parameter types once validation is implemented",
+        strict=False,
+    )
+    def test_invalid_parameter_types(self):
```
132-138: The script will check for any existing pytest-asyncio usage or configuration in your repo (references, config files, and dependencies). I’ll await the results to determine whether marking async tests is already handled globally or whether adding `@pytest.mark.asyncio` is still needed.

packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/exporter/dfw_exporter.py (3)
70-79: Processor chain wiring looks correct and ordered.

Span -> DFW record -> dict -> batching -> falsy filter. This matches the intended pipeline and keeps the sink-facing type to dict.
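The chain this comment describes can be sketched generically. The stage functions below are stand-ins for the PR's processor classes, composed in the same order:

```python
from typing import Any
from collections.abc import Callable


def compose(*stages: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Chain single-argument stages left to right, like the exporter pipeline."""
    def run(value: Any) -> Any:
        for stage in stages:
            value = stage(value)
        return value
    return run


def span_to_record(span: str) -> dict:       # SpanToDFWRecordProcessor stand-in
    return {"record": span}

def record_to_dict(rec: dict) -> dict:       # DFWToDictProcessor stand-in
    return dict(rec)

def batch(d: dict) -> list[dict]:            # DictBatchingProcessor stand-in (batch of 1)
    return [d]

def drop_falsy(items: list[dict]) -> list[dict]:  # DictBatchFilterProcessor stand-in
    return [i for i in items if i]


pipeline = compose(span_to_record, record_to_dict, batch, drop_falsy)
print(pipeline("span-1"))  # [{'record': 'span-1'}]
```

Keeping each stage a small single-purpose transform is what makes the order easy to assert in tests.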
81-92: Property accessor is simple and correct.

Returning the class reference enables consistent identity checks in tests and downstream code.
93-95: No action needed; abstractness is already enforced via ABC.

`SpanExporter` inherits from `ProcessingExporter` → `BaseExporter` → `Exporter`, and `Exporter` is defined as a subclass of `ABC`. This means the `@abstractmethod` on `export_processed` is enforced by `ABCMeta`, preventing instantiation until all abstract methods are implemented.

- src/nat/observability/exporter/exporter.py: `class Exporter(ABC):` ensures `ABCMeta` in the MRO.
- src/nat/observability/exporter/base_exporter.py: `class BaseExporter(Exporter):` inherits that abstractness.
- src/nat/observability/exporter/processing_exporter.py: `class ProcessingExporter(..., BaseExporter, ...):` continues the chain.
- src/nat/observability/exporter/span_exporter.py: `class SpanExporter(ProcessingExporter[...], SerializeMixin):` finally includes `ABCMeta` via its ancestors.

No changes are required here.
Likely an incorrect or invalid review comment.
… safety Signed-off-by: Matthew Penn <[email protected]>
Ensure all arguments are used in `track_unregistered_function` when calling `push_active_function` Update elasticsearch version in pyproject.toml Signed-off-by: Matthew Penn <[email protected]>
Move test_function_tracking.py under decorators folder Signed-off-by: Matthew Penn <[email protected]>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/nat/profiler/decorators/function_tracking.py (1)
80-80: Add return type annotation to public decorator.

Public APIs must annotate return types. Minimal, non-invasive fix:

```diff
-def track_function(func: Any = None, *, metadata: dict[str, Any] | None = None):
+def track_function(func: Any = None, *, metadata: dict[str, Any] | None = None) -> Any:
```
♻️ Duplicate comments (1)
src/nat/profiler/decorators/function_tracking.py (1)
66-66: Use “nat” (lowercase) in docstrings, not “NAT”.

Style rule: documentation should use “nat”. Update the docstring.

```diff
-    """Push an intermediate step to the NAT Event Stream."""
+    """Push an intermediate step to the nat event stream."""
```
🧹 Nitpick comments (4)
tests/nat/profiler/decorators/test_function_tracking.py (2)
220-231: Fixture should follow repository fixture-naming rule.

Per guidelines, fixtures must use the `fixture_` prefix and set `name=` on the decorator. Adjust as below.

```diff
-    @pytest.fixture
-    def mock_context(self):
+    @pytest.fixture(name="mock_context")
+    def fixture_mock_context(self):
         """Mock Context and its push_active_function method."""
         with patch('nat.profiler.decorators.function_tracking.Context') as mock_context_class:
             mock_context_instance = Mock()
             mock_manager = Mock()
             mock_context_instance.push_active_function.return_value.__enter__ = Mock(return_value=mock_manager)
             mock_context_instance.push_active_function.return_value.__exit__ = Mock(return_value=None)
             mock_context_class.get.return_value = mock_context_instance
             yield mock_context_instance, mock_manager
```
32-44: Make purely synchronous tests non-async to avoid unnecessary event-loop overhead.

This test doesn’t await anything. Consider making it a regular test function.

```diff
-async def test_sync_function_no_metadata(reactive_stream: Subject):
+def test_sync_function_no_metadata(reactive_stream: Subject):
```

src/nat/profiler/decorators/function_tracking.py (2)
62-66: Tighten parameter types for args/kwargs.

These are always serialized structures; reflect that in the signature.

```diff
-                  args: Any = None,
-                  kwargs: Any = None,
+                  args: list[Any] | None = None,
+                  kwargs: dict[str, Any] | None = None,
                   output: Any = None,
                   metadata: dict[str, Any] | None = None) -> None:
```
333-349: Generator wrappers aggregate all outputs; consider a streaming-friendly option.

Both generator wrappers for `track_unregistered_function` build a full list before setting output. For long or streaming generators this inflates memory and delays visibility of results. If practical, make aggregation optional (e.g., a flag to skip collecting and `set_output(None)`, or a capped buffer).

Also applies to: 375-392
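One shape the capped-buffer option could take, sketched in plain Python. The `track_generator` helper and `cap` parameter are illustrative assumptions, not part of this PR:

```python
from collections import deque
from collections.abc import Iterator
from typing import Any


def track_generator(gen: Iterator[Any], cap: int = 100) -> Iterator[Any]:
    """Yield items from `gen` while recording at most `cap` of them.

    Items past the cap are still yielded to the caller but evicted from the
    recorded buffer, bounding memory for long or streaming generators.
    """
    buffer: deque[Any] = deque(maxlen=cap)
    for item in gen:
        buffer.append(item)
        yield item
    # The real wrapper would call manager.set_output(...) here; this sketch
    # just stashes the bounded buffer for inspection.
    track_generator.last_output = list(buffer)  # type: ignore[attr-defined]


items = list(track_generator(iter(range(5)), cap=3))
print(items)                        # [0, 1, 2, 3, 4]
print(track_generator.last_output)  # [2, 3, 4]
```

The caller still sees every item; only the recorded output is truncated to the newest `cap` entries.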
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (5)
- packages/nvidia_nat_data_flywheel/pyproject.toml (1 hunks)
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/register.py (1 hunks)
- src/nat/profiler/decorators/function_tracking.py (2 hunks)
- tests/nat/profiler/decorators/test_function_tracking.py (1 hunks)
- tests/nat/profiler/test_function_tracking.py (0 hunks)
💤 Files with no reviewable changes (1)
- tests/nat/profiler/test_function_tracking.py
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/nvidia_nat_data_flywheel/pyproject.toml
- packages/nvidia_nat_data_flywheel/src/nat/plugins/data_flywheel/observability/register.py
🧰 Additional context used
📓 Path-based instructions (11)
{tests/**/*.py,examples/*/tests/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
{tests/**/*.py,examples/*/tests/**/*.py}: Unit tests live in tests/ (or examples/*/tests) and use markers defined in pyproject.toml (e2e, integration)
Use pytest with pytest-asyncio for asynchronous tests
Mock external services with pytest_httpserver or unittest.mock; do not hit live endpoints in tests
Mark expensive tests with @pytest.mark.slow or @pytest.mark.integration to skip by default
Files:
tests/nat/profiler/decorators/test_function_tracking.py
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.py: Format Python code with yapf (PEP8 base, column_limit=120) run second in the toolchain
Run ruff (ruff check --fix) using configuration in pyproject.toml; fix warnings unless explicitly ignored; ruff is a linter only (not for formatting)
Respect naming: snake_case for functions and variables, PascalCase for classes, UPPER_CASE for constants
Prefer typing/collections.abc abstractions (e.g., Sequence over list)
Use typing.Annotated for units or extra metadata when useful
Treat pyright warnings (configured in pyproject.toml) as errors during development
Prefer httpx with SSL verification enabled by default for HTTP requests; follow OWASP Top-10
Use async/await for I/O-bound work (HTTP, DB, file reads)
Cache expensive computations with functools.lru_cache or an external cache when appropriate
Leverage NumPy vectorized operations when beneficial
Files:
- tests/nat/profiler/decorators/test_function_tracking.py
- src/nat/profiler/decorators/function_tracking.py
**/*.{py,md,sh,yml,yaml,toml,json,ini,cfg,txt,rst,pyi}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Indent with 4 spaces (no tabs) and ensure every file ends with a single trailing newline
Files:
- tests/nat/profiler/decorators/test_function_tracking.py
- src/nat/profiler/decorators/function_tracking.py
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
**/*.{py,sh,md,yml,yaml,toml,json,ini,txt,cfg,rst,pyi}: Every file must start with the standard SPDX Apache-2.0 header
All source files must include the SPDX Apache-2.0 header template
Files:
- tests/nat/profiler/decorators/test_function_tracking.py
- src/nat/profiler/decorators/function_tracking.py
**/*.{py,md}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Version numbers are derived by setuptools-scm; never hard-code versions in code or documentation
Files:
- tests/nat/profiler/decorators/test_function_tracking.py
- src/nat/profiler/decorators/function_tracking.py
**/*
⚙️ CodeRabbit configuration file
**/*: # Code Review Instructions

- Ensure the code follows best practices and coding standards.
- For Python code, follow PEP 20 and PEP 8 for style guidelines.
- Check for security vulnerabilities and potential issues.
- Python methods should use type hints for all parameters and return values. Example: `def my_function(param1: int, param2: str) -> bool: pass`

Documentation Review Instructions

- Verify that documentation and comments are clear and comprehensive.
- Verify that the documentation doesn't contain any TODOs, FIXMEs or placeholder text like "lorem ipsum".
- Verify that the documentation doesn't contain any offensive or outdated terms.
- Verify that documentation and comments are free of spelling mistakes; ensure the documentation doesn't contain any words listed in the `ci/vale/styles/config/vocabularies/nat/reject.txt` file. Words that might appear to be spelling mistakes but are listed in the `ci/vale/styles/config/vocabularies/nat/accept.txt` file are OK.

Misc.

- All code should be licensed under the Apache License 2.0, and should contain an Apache License 2.0 header comment at the top of each file.
- Confirm that copyright years are up-to-date whenever a file is changed.
Files:
- tests/nat/profiler/decorators/test_function_tracking.py
- src/nat/profiler/decorators/function_tracking.py
tests/**/*.py
⚙️ CodeRabbit configuration file
tests/**/*.py:

- Ensure that tests are comprehensive, cover edge cases, and validate the functionality of the code.
- Test functions should be named using the `test_` prefix, using snake_case.
- Any frequently repeated code should be extracted into pytest fixtures.
- Pytest fixtures should define the `name` argument when applying the `pytest.fixture` decorator. The fixture function being decorated should be named using the `fixture_` prefix, using snake_case. Example:

```python
@pytest.fixture(name="my_fixture")
def fixture_my_fixture():
    pass
```
Files:
tests/nat/profiler/decorators/test_function_tracking.py
{src/**/*.py,packages/*/src/**/*.py,examples/*/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
Use abbreviations: 'nat' for API namespace and CLI tool, 'nvidia-nat' for package name, 'NAT' only for env var prefixes and informal code comments (never in documentation)
Files:
src/nat/profiler/decorators/function_tracking.py
src/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
All importable Python code must live under src/ so namespace packages resolve correctly
Files:
src/nat/profiler/decorators/function_tracking.py
{src/**/*.py,packages/*/src/**/*.py}
📄 CodeRabbit inference engine (.cursor/rules/general.mdc)
{src/**/*.py,packages/*/src/**/*.py}: All public APIs require Python 3.11+ type hints on parameters and return values
Provide Google-style docstrings for every public module, class, function and CLI command
Docstring first line must be a concise description ending with a period
Files:
src/nat/profiler/decorators/function_tracking.py
src/nat/**/*
⚙️ CodeRabbit configuration file
This directory contains the core functionality of the toolkit. Changes should prioritize backward compatibility.
Files:
src/nat/profiler/decorators/function_tracking.py
🧬 Code graph analysis (2)
tests/nat/profiler/decorators/test_function_tracking.py (2)

- src/nat/profiler/decorators/function_tracking.py (2): `track_function` (80-256), `track_unregistered_function` (259-408)
- src/nat/builder/context.py (4): `metadata` (127-136), `output` (58-59), `push_active_function` (178-221), `set_output` (61-62)

src/nat/profiler/decorators/function_tracking.py (1)

- src/nat/builder/context.py (6): `metadata` (127-136), `Context` (93-277), `get` (89-90), `get` (267-277), `push_active_function` (178-221), `set_output` (61-62)
🔇 Additional comments (2)
tests/nat/profiler/decorators/test_function_tracking.py (1)
32-434: Strong test coverage for decorators and event semantics.

Good breadth: sync/async funcs, (a)sync generators, class methods, kwargs, Pydantic serialization, metadata, manual decoration, and error cases. Assertions on event ordering and payloads look solid.
src/nat/profiler/decorators/function_tracking.py (1)
258-409: Decorator behavior and metadata propagation look correct.

Wrapper selection (sync/async/(a)sync gen), context handling, input_data shape, and metadata propagation to `Context.push_active_function` are consistent with tests and guidelines.
…to streamline documentation. Signed-off-by: Matthew Penn <[email protected]>
/merge
Description
Closes #719
This PR introduces the `nvidia-nat-data-flywheel` subpackage, which extends the NeMo Agent Toolkit's observability system to support NVIDIA Data Flywheel Blueprint integration. This integration gives developers the ability to distill smaller LLMs with the NeMo Microservices Platform (NMP) for components of their agentic application. Such efforts could deliver more efficient deployments and/or lower latency during runtime. This initial integration focuses on NIM/OpenAI LLM providers used in Langchain/LangGraph based workflows/plugins. Key features of this PR are summarized below:

Trace Conversion Pipeline:

Elasticsearch Export:

Workload Identification:

- Sets `workload_id` via the parent registered function instance name or using the `track_unregistered_function` decorator. The `track_unregistered_function` decorator is useful for methods that are not themselves toolkit plugins (e.g. nodes in a LangGraph).
- `workload_id` is used as a key in the Data Flywheel Blueprint to trigger fine-tuning/distillation jobs for atomic parts of an agentic workflow.

Usage Example:
After installing this subpackage, this feature can be activated with the following workflow YAML configuration:
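The YAML block referenced above was not captured in this excerpt. As a purely illustrative sketch (section and key names are assumptions; consult the package documentation for the real schema), a telemetry section enabling the Elasticsearch exporter might look like:

```yaml
general:
  telemetry:
    tracing:
      data_flywheel:
        _type: elasticsearch          # hypothetical exporter type name
        endpoint: http://localhost:9200
        index: dfw-records
        client_id: my-agent-app       # tags exported records for this application
```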
To add higher fidelity scoping to functions that are not NAT registered functions:
Documentation will be included in a follow-on PR.
By Submitting this PR I confirm:
Summary by CodeRabbit
New Features
Documentation
Chores
Tests