@@ -220,7 +220,7 @@ def get_processor_by_name(self, name: str) -> Processor | None:
220220 if name in self ._processor_names :
221221 position = self ._processor_names [name ]
222222 return self ._processors [position ]
223- logger .warning ("Processor '%s' not found in pipeline" , name )
223+ logger .debug ("Processor '%s' not found in pipeline" , name )
224224 return None
225225
226226 def _check_pipeline_locked (self ) -> None :
@@ -343,9 +343,8 @@ def _check_processor_compatibility(self,
343343 target_type )
344344
345345 async def _pre_start (self ) -> None :
346- # Lock the pipeline to prevent further modifications
347- self ._pipeline_locked = True
348346
347+ # Validate that the pipeline is compatible with the exporter
349348 if len (self ._processors ) > 0 :
350349 first_processor = self ._processors [0 ]
351350 last_processor = self ._processors [- 1 ]
@@ -382,14 +381,17 @@ async def _pre_start(self) -> None:
382381 self .output_type ,
383382 e )
384383
384+ # Lock the pipeline to prevent further modifications
385+ self ._pipeline_locked = True
386+
385387 async def _process_pipeline (self , item : PipelineInputT ) -> PipelineOutputT | None :
386388 """Process item through all registered processors.
387389
388390 Args:
389391 item (PipelineInputT): The item to process (starts as PipelineInputT, can transform to PipelineOutputT)
390392
391393 Returns:
392- PipelineOutputT: The processed item after running through all processors
394+ PipelineOutputT | None : The processed item after running through all processors
393395 """
394396 return await self ._process_through_processors (self ._processors , item ) # type: ignore
395397
@@ -529,7 +531,7 @@ async def export_processed(self, item: PipelineOutputT | list[PipelineOutputT])
529531 """
530532 pass
531533
532- def _create_export_task (self , coro : Coroutine ):
534+ def _create_export_task (self , coro : Coroutine ) -> None :
533535 """Create task with minimal overhead but proper tracking.
534536
535537 Args:
@@ -549,7 +551,7 @@ def _create_export_task(self, coro: Coroutine):
549551 raise
550552
551553 @override
552- async def _cleanup (self ):
554+ async def _cleanup (self ) -> None :
553555 """Enhanced cleanup that shuts down all shutdown-aware processors.
554556
555557 Each processor is responsible for its own cleanup, including routing
0 commit comments