Skip to content

Commit 1a7b010

Browse files
committed
Fix potential issues with atomic pipeline updates + tests
Signed-off-by: Matthew Penn <[email protected]>
1 parent f641a4c commit 1a7b010

File tree

2 files changed

+86
-10
lines changed

2 files changed

+86
-10
lines changed

src/nat/observability/exporter/processing_exporter.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,25 +97,25 @@ def add_processor(self,
9797
# Validate type compatibility at insertion point
9898
self._validate_insertion_compatibility(processor, insert_position)
9999

100-
# Insert the processor at the calculated position
101-
if insert_position == len(self._processors):
102-
self._processors.append(processor)
103-
else:
104-
self._processors.insert(insert_position, processor)
105-
106-
# Validate and register processor name if provided (before position updates)
100+
# Pre-validate name (no side effects yet)
107101
if name is not None:
108102
if not isinstance(name, str):
109103
raise TypeError(f"Processor name must be a string, got {type(name).__name__}")
110104
if name in self._processor_names:
111105
raise ValueError(f"Processor name '{name}' already exists")
112106

113-
# Always update positions for processors that shifted
114-
for proc_name, pos in self._processor_names.items():
107+
# Shift existing name positions (do this before list mutation)
108+
for proc_name, pos in list(self._processor_names.items()):
115109
if pos >= insert_position:
116110
self._processor_names[proc_name] = pos + 1
117111

118-
# Register the processor name after position updates
112+
# Insert the processor
113+
if insert_position == len(self._processors):
114+
self._processors.append(processor)
115+
else:
116+
self._processors.insert(insert_position, processor)
117+
118+
# Record the new processor name, if provided
119119
if name is not None:
120120
self._processor_names[name] = insert_position
121121

tests/nat/observability/exporter/test_processing_exporter.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,13 +301,89 @@ def test_add_processor_duplicate_name_raises_error(self, processing_exporter):
301301
with pytest.raises(ValueError, match="Processor name 'test_name' already exists"):
302302
processing_exporter.add_processor(processor2, name="test_name")
303303

304+
def test_add_processor_atomicity_on_name_validation_failure(self, processing_exporter):
305+
"""Test that failed name validation leaves processor pipeline unchanged (atomicity)."""
306+
# Set up initial state with multiple processors
307+
processor1 = MockProcessor("proc1") # str -> int
308+
processor2 = MockBatchProcessor("proc2") # int -> list[int]
309+
310+
processing_exporter.add_processor(processor1, name="first")
311+
processing_exporter.add_processor(processor2, name="second")
312+
313+
# Capture initial state
314+
initial_processor_count = len(processing_exporter._processors)
315+
initial_processor_objects = processing_exporter._processors.copy()
316+
initial_name_mapping = processing_exporter._processor_names.copy()
317+
318+
# Attempt to add processor with duplicate name (should fail)
319+
# Make processor3 compatible with processor2's output (list[int] -> ?)
320+
class ListToIntProcessor(Processor[list[int], int]):
321+
322+
async def process(self, item: list[int]) -> int:
323+
return sum(item)
324+
325+
processor3 = ListToIntProcessor() # list[int] -> int (compatible)
326+
327+
with pytest.raises(ValueError, match="Processor name 'first' already exists"):
328+
processing_exporter.add_processor(processor3, name="first") # Duplicate name
329+
330+
# Verify complete atomicity - no partial state changes
331+
assert len(processing_exporter._processors) == initial_processor_count, \
332+
"Processor count changed after failed operation"
333+
assert processing_exporter._processors == initial_processor_objects, \
334+
"Processor list modified after failed operation"
335+
assert processing_exporter._processor_names == initial_name_mapping, \
336+
"Name mapping modified after failed operation"
337+
338+
# Verify the failed processor was not added anywhere
339+
assert processor3 not in processing_exporter._processors, \
340+
"Failed processor found in processor list"
341+
304342
def test_add_processor_non_string_name_raises_error(self, processing_exporter):
305343
"""Test that non-string processor names raise TypeError."""
306344
processor = MockProcessor()
307345

308346
with pytest.raises(TypeError, match="Processor name must be a string"):
309347
processing_exporter.add_processor(processor, name=123) # Invalid type
310348

349+
def test_add_processor_atomicity_on_type_validation_failure(self, processing_exporter):
350+
"""Test that failed type validation leaves processor pipeline unchanged (atomicity)."""
351+
# Set up initial state with multiple processors
352+
processor1 = MockProcessor("proc1") # str -> int
353+
processor2 = MockBatchProcessor("proc2") # int -> list[int]
354+
355+
processing_exporter.add_processor(processor1, name="first")
356+
processing_exporter.add_processor(processor2, name="second")
357+
358+
# Capture initial state
359+
initial_processor_count = len(processing_exporter._processors)
360+
initial_processor_objects = processing_exporter._processors.copy()
361+
initial_name_mapping = processing_exporter._processor_names.copy()
362+
363+
# Attempt to add processor with invalid name type (should fail)
364+
# Make processor3 compatible with processor2's output (list[int] -> ?)
365+
class ListToStringProcessor(Processor[list[int], str]):
366+
367+
async def process(self, item: list[int]) -> str:
368+
return str(sum(item))
369+
370+
processor3 = ListToStringProcessor() # list[int] -> str (compatible)
371+
372+
with pytest.raises(TypeError, match="Processor name must be a string"):
373+
processing_exporter.add_processor(processor3, name=123) # Invalid type
374+
375+
# Verify complete atomicity - no partial state changes
376+
assert len(processing_exporter._processors) == initial_processor_count, \
377+
"Processor count changed after failed operation"
378+
assert processing_exporter._processors == initial_processor_objects, \
379+
"Processor list modified after failed operation"
380+
assert processing_exporter._processor_names == initial_name_mapping, \
381+
"Name mapping modified after failed operation"
382+
383+
# Verify the failed processor was not added anywhere
384+
assert processor3 not in processing_exporter._processors, \
385+
"Failed processor found in processor list"
386+
311387
def test_get_processor_by_name_exists(self, processing_exporter):
312388
"""Test getting processor by name when it exists."""
313389
processor = MockProcessor()

0 commit comments

Comments
 (0)