Skip to content

Commit 256f8de

Browse files
committed
Misc cleanup of ProcessingExporter
Signed-off-by: Matthew Penn <[email protected]>
1 parent 1a7b010 commit 256f8de

File tree

2 files changed

+9
-7
lines changed

2 files changed

+9
-7
lines changed

src/nat/observability/exporter/processing_exporter.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ def get_processor_by_name(self, name: str) -> Processor | None:
220220
if name in self._processor_names:
221221
position = self._processor_names[name]
222222
return self._processors[position]
223-
logger.warning("Processor '%s' not found in pipeline", name)
223+
logger.debug("Processor '%s' not found in pipeline", name)
224224
return None
225225

226226
def _check_pipeline_locked(self) -> None:
@@ -343,9 +343,8 @@ def _check_processor_compatibility(self,
343343
target_type)
344344

345345
async def _pre_start(self) -> None:
346-
# Lock the pipeline to prevent further modifications
347-
self._pipeline_locked = True
348346

347+
# Validate that the pipeline is compatible with the exporter
349348
if len(self._processors) > 0:
350349
first_processor = self._processors[0]
351350
last_processor = self._processors[-1]
@@ -382,14 +381,17 @@ async def _pre_start(self) -> None:
382381
self.output_type,
383382
e)
384383

384+
# Lock the pipeline to prevent further modifications
385+
self._pipeline_locked = True
386+
385387
async def _process_pipeline(self, item: PipelineInputT) -> PipelineOutputT | None:
386388
"""Process item through all registered processors.
387389
388390
Args:
389391
item (PipelineInputT): The item to process (starts as PipelineInputT, can transform to PipelineOutputT)
390392
391393
Returns:
392-
PipelineOutputT: The processed item after running through all processors
394+
PipelineOutputT | None: The processed item after running through all processors
393395
"""
394396
return await self._process_through_processors(self._processors, item) # type: ignore
395397

@@ -529,7 +531,7 @@ async def export_processed(self, item: PipelineOutputT | list[PipelineOutputT])
529531
"""
530532
pass
531533

532-
def _create_export_task(self, coro: Coroutine):
534+
def _create_export_task(self, coro: Coroutine) -> None:
533535
"""Create task with minimal overhead but proper tracking.
534536
535537
Args:
@@ -549,7 +551,7 @@ def _create_export_task(self, coro: Coroutine):
549551
raise
550552

551553
@override
552-
async def _cleanup(self):
554+
async def _cleanup(self) -> None:
553555
"""Enhanced cleanup that shuts down all shutdown-aware processors.
554556
555557
Each processor is responsible for its own cleanup, including routing

tests/nat/observability/exporter/test_processing_exporter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ def test_get_processor_by_name_exists(self, processing_exporter):
394394

395395
def test_get_processor_by_name_not_exists(self, processing_exporter, caplog):
396396
"""Test getting processor by name when it doesn't exist."""
397-
with caplog.at_level(logging.WARNING):
397+
with caplog.at_level(logging.DEBUG):
398398
retrieved = processing_exporter.get_processor_by_name("nonexistent")
399399

400400
assert retrieved is None

0 commit comments

Comments
 (0)