Adding batch and suppress to mesa_signals #3261
Conversation
Performance benchmarks:
I think this approach is better as an upstream baseline if the goal is semantic batching and a first-class API.
```python
case ListSignals.INSERTED:
    old = current[:idx] + current[idx + 1 :]
case ListSignals.REMOVED:
    old = [*current[:idx], kwargs["old"], *current[idx:]]
```
`idx` should be normalized (and bounded) when negative before slicing; otherwise the reverse reconstruction is wrong for negative-index operations.
`batching.py` reconstructs the pre-batch list state using the raw index, but negative indices are not normalized before reverse reconstruction. This makes the emitted `ListSignals.SET` history wrong for operations like `pop()`, `pop(-1)`, or `insert(-1, x)`.
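For illustration, a minimal sketch of the kind of normalization being asked for here; the helper name is hypothetical and not part of the PR:

```python
def normalize_index(idx: int, length: int) -> int:
    """Turn a possibly-negative index into the bounded positive index
    that list.insert-style operations actually act on."""
    if idx < 0:
        idx += length
    return max(0, min(idx, length))

# pop(-1) on a 3-element list mutates position 2, not "position -1",
# so reverse reconstruction must slice around index 2:
assert normalize_index(-1, 3) == 2
assert normalize_index(-10, 3) == 0  # clamped, like list.insert
assert normalize_index(10, 3) == 3
```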
I fixed it inside `SignalingList` instead of in the aggregator. I added additional tests in both places, so everything should work fine now. Again, good catch.
aggregate uses module-level functools.singledispatch, so registrations are global to the Python process. That means one test/plugin/library can affect aggregation behavior in unrelated code paths.
Not a blocker, but worth considering whether aggregation should be instance-scoped or at least have a reset/isolation mechanism for tests and integrations.
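A tiny demonstration of the concern; the `aggregate` function and signal classes here are stand-ins, not the module under review. Because `functools.singledispatch` keeps one process-wide registry, a registration made by any importer changes dispatch for every caller:

```python
from functools import singledispatch

@singledispatch
def aggregate(signal_type, signals):
    return signals  # default: pass batched signals through unchanged

class PluginSignal:
    """Imagine this registration happening deep inside a third-party plugin."""

@aggregate.register
def _(signal_type: PluginSignal, signals):
    return signals[-1:]  # the plugin decides to keep only the last signal

# Every other caller in the process now sees the plugin's behavior
# for PluginSignal, with no instance scoping or opt-out:
assert aggregate(PluginSignal(), ["a", "b", "c"]) == ["c"]
assert aggregate(object(), ["a", "b", "c"]) == ["a", "b", "c"]
```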
falloficarus22 left a comment
- During `suppress()`, dependency invalidation signals are dropped, so computed values can stay stale indefinitely.
- The docstring warns about it, but it would be safer to have an explicit test proving the expected behavior and a note in the user-facing docs.
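The staleness hazard can be shown with a self-contained sketch; this mimics the described behavior and is not the mesa implementation:

```python
from contextlib import contextmanager

class Observable:
    """Minimal stand-in: notifies subscribers unless suppressed."""
    def __init__(self, value):
        self.value = value
        self._suppressed = 0
        self._subscribers = []

    @contextmanager
    def suppress(self):
        self._suppressed += 1
        try:
            yield
        finally:
            self._suppressed -= 1

    def set(self, value):
        self.value = value
        if self._suppressed == 0:
            for callback in self._subscribers:
                callback(value)

class Doubled:
    """A 'computed' value that only refreshes on an invalidation signal."""
    def __init__(self, source):
        self.source = source
        self.cache = source.value * 2
        source._subscribers.append(self._invalidate)

    def _invalidate(self, _):
        self.cache = self.source.value * 2

obs = Observable(1)
computed = Doubled(obs)
with obs.suppress():
    obs.set(10)  # the invalidation signal is dropped
assert computed.cache == 2  # stale: still derived from the old value
```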
What I think works well in #3251:
What I think is stronger in this PR:
Overall, I prefer this PR’s direction.
Thanks for the detailed review. This is very useful. Below are responses to some key points.
Good catch. I'll add tests and modify the code.
There is, unfortunately, no built-in way yet to clear singledispatch registrations. However, it does use weakrefs internally, so when a registered function goes out of scope it will be removed. Tests, for example, are therefore unlikely to cause problems. I highly doubt this will become an issue.
Ok, I'll add a test for this.
It is true that in #3251 you can handle the aggregation of signals differently across `HasObservable` instances. I am not sure whether I like that; in my view it can be highly confusing. In the design here, the aggregation policy is directly tied to the specific `SignalType` and is thus easier to understand and reason about. Conceptually, it makes more sense to me that the strategy for handling aggregation is determined by the `SignalType`.

For example, ObservableSignals can just take the old value from the first signal and the new value from the last signal: if they differ, you emit; otherwise you don't. For lists, I currently aggregate all changes into a single SET signal. I am not entirely sure about this, and at the least it requires careful documentation. ModelSignalTypes are currently not aggregated, but I am thinking of adding an aggregation where agents that are added and subsequently removed (or vice versa) are filtered out, so you keep the unique agents that are added and that are removed.

But yes, designing aggregations at the `SignalType` level foregoes some flexibility, which might or might not be a problem.
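The ObservableSignals strategy described above (first old, last new, emit only on a net change) can be sketched as follows; the dict-based signal shape is an assumption for illustration:

```python
def aggregate_value_changes(signals):
    """Collapse a batch of (old, new) change signals into at most one.

    Takes `old` from the first signal and `new` from the last; if the
    value ends up unchanged, nothing is emitted (returns None).
    """
    old = signals[0]["old"]
    new = signals[-1]["new"]
    return None if old == new else {"old": old, "new": new}

assert aggregate_value_changes(
    [{"old": 1, "new": 2}, {"old": 2, "new": 3}]
) == {"old": 1, "new": 3}
# A round trip back to the starting value emits nothing:
assert aggregate_value_changes(
    [{"old": 1, "new": 2}, {"old": 2, "new": 1}]
) is None
```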
Not sure I fully understand what you mean by this. In this design, nested batching will just aggregate signals in the outermost batch, so it will also be linear.
I still think the global-registration concern is real. My understanding is that registered handlers are kept in the dispatcher registry (strongly), so they are not automatically removed just because a local function goes out of scope. So cross-test/plugin/process-wide contamination is still possible once something registers on
In general, this SignalType-scoped policy is cleaner and easier to reason about for most users, and I agree that’s a strong advantage of this PR.
My earlier “linear buffer” point was specifically about expressiveness for cross-observable/global policies: in #3251 aggregators run over one ordered signal stream, while here buffering is grouped per observable before aggregation. I agree that nested batching remains outermost/linear in both designs; my point was about aggregation scope rather than nesting mechanics.
I dug into the code a bit more. You are right that the registry uses hard refs, not weakrefs. I found one hacky way to still do it, so if this becomes a relevant concern we can add support for it: the base
Ok, I now get your point.
Thanks, and agreed on the hard-ref point.
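The hard-ref behavior discussed here is easy to check directly: the `singledispatch` registry keeps a registered function alive even after the local name that defined it goes out of scope (the `aggregate` example below is a stand-in, not the PR's code):

```python
import gc
import weakref
from functools import singledispatch

@singledispatch
def aggregate(obj):
    return "default"

def register_locally():
    def handler(obj: int):
        return "int handler"
    aggregate.register(handler)
    return weakref.ref(handler)

ref = register_locally()
gc.collect()
# The registry holds a strong reference, so the handler is still alive
# and still affects dispatch, even though the defining scope is gone:
assert ref() is not None
assert aggregate(3) == "int handler"
assert aggregate("x") == "default"
```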
I did some more work on this. I explicitly handle slices now, including slices with negative numbers, and updated various tests. I will give it a once-over, hopefully later today, but we are getting there.
```python
self.data[index] = value
self.owner.notify(
    self.name,
    ListSignals.REPLACED,
    index=index,
    old=old_value,
    new=list(value),
)
```
Slice assignment emits the wrong `new` payload for one-shot iterables (generators/iterators).
- In the `SignalingList.__setitem__` slice path, the code does `self.data[index] = value` and then `new=list(value)`.
- If `value` is a generator, it is consumed by the assignment first, so the emitted signal has `new=[]` even though the list content changed correctly.
Repro:

```python
from unittest.mock import Mock

from mesa.experimental.mesa_signals import HasObservables, ListSignals, ObservableList

class O(HasObservables):
    items = ObservableList()

    def __init__(self):
        super().__init__()
        self.items = [1, 2, 3, 4]

o = O()
handler = Mock()
o.observe("items", ListSignals.REPLACED, handler)

g = (x for x in [8, 9])
o.items[1:3] = g

signal = handler.call_args.args[0]
print("actual list:", list(o.items))
print("signal old:", signal.additional_kwargs["old"])
print("signal new:", signal.additional_kwargs["new"])
```
There are so many edge cases around `SignalingList` that I am starting to wonder about changing the design a bit. A lot of the trouble stems from trying to reverse engineer `old` from the first signal. Should we actually just aggregate all signals into a single SET with `old` and `new`? Or can we capture `old` before the signal?
I think reverse-engineering from the first signal keeps creating edge-case debt, so I would recommend:

- For `ListSignals` in a batch, aggregate to a single `SET(old, new)`.
- Capture `old` before the first mutation (once per list per batch), then read `new` at flush.
- Keep granular per-op list signals for non-batch mode.

This is much simpler and avoids reconstructing old state from operation metadata entirely.
Unfortunately, it is very hard to capture `old`. The current implementation batches signals, and signals are emitted after the change. So, to capture `old` before the first change, a `SignalingList` has to detect that a context manager is active on its owning `HasObservables` instance and record `old` in some way. I'll take a look at how difficult this would be to implement, and whether the design can be encapsulated so it can also be used downstream if needed.

A `SignalingList` knows its owner, so it can check whether `self.owner._batchcontext is not None`. If this is true, we are in a batch context, and we then only need to add `old` to some data structure on `_BatchContext` and ensure that this is properly flushed to the outermost context manager on each exit.
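A condensed sketch of that mechanism, using assumed names (`_batchcontext`, a `_captured` dict) rather than the PR's actual attributes:

```python
class _BatchContext:
    def __init__(self):
        self._captured = {}  # observable name -> pre-batch value

    def capture_once(self, name, value):
        # Only the first capture per batch wins: that is the true `old`.
        self._captured.setdefault(name, value)

class Owner:
    def __init__(self):
        self._batchcontext = None  # set while a batch() context is active

class SignalingListSketch:
    def __init__(self, owner, name, data):
        self.owner, self.name, self.data = owner, name, list(data)

    def append(self, item):
        # Before mutating: if a batch is active, record `old` exactly once.
        ctx = self.owner._batchcontext
        if ctx is not None:
            ctx.capture_once(self.name, list(self.data))
        self.data.append(item)

owner = Owner()
lst = SignalingListSketch(owner, "items", [1, 2])
owner._batchcontext = _BatchContext()  # "enter" a batch
lst.append(3)
lst.append(4)
# Only the state before the first mutation was captured:
assert owner._batchcontext._captured == {"items": [1, 2]}
assert lst.data == [1, 2, 3, 4]
```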
falloficarus22 left a comment
`ObservableList` writes directly to `_BatchContext` internals (`ctx._captured_values`) in `observable_collections.py`. A small API like `ctx.capture_old_once(name, value)` would reduce coupling and make future refactors safer.
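The suggested API could look roughly like this; the method and attribute names follow the suggestion above, everything else is assumed:

```python
class _BatchContext:
    def __init__(self):
        self._captured_values = {}

    def capture_old_once(self, name, value):
        """Record `value` for `name` only on the first call per batch."""
        self._captured_values.setdefault(name, value)

ctx = _BatchContext()
ctx.capture_old_once("items", [1, 2, 3])
ctx.capture_old_once("items", [9, 9, 9])  # ignored: already captured
assert ctx._captured_values["items"] == [1, 2, 3]
```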
falloficarus22 left a comment
At this point, what's left here is mostly design polish, so I'll leave the rest to you maintainers to decide. Meanwhile, I'll be closing #3251.
I added a
@falloficarus22 Thanks a lot for your original PR and your contributions to improving this alternative take. I think it came together nicely.
@codebreaker32 could you give this a look?
EwoutH left a comment
Thanks!
If this is completely ready, please merge ASAP so I can release 3.5.
Summary
This PR adds `batch()` and `suppress()` context managers to `HasObservables`, as well as an easy-to-use way of controlling how batched signals are aggregated, even for custom user-defined signals. The resulting API is straightforward:

`batch()`

Buffers all signals emitted within the context. On exit, signals are aggregated per observable name:
Nesting is supported — inner batches merge into the outer batch, only the outermost dispatches.
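A self-contained sketch of the outermost-only dispatch semantics (illustrative only, not the actual `HasObservables` code):

```python
from contextlib import contextmanager

class HasObservablesSketch:
    """Illustrative stand-in for the batching bookkeeping."""
    def __init__(self):
        self._depth = 0       # nesting depth of active batch() contexts
        self._buffer = []     # signals buffered while batching
        self.dispatched = []  # signals actually delivered to observers

    @contextmanager
    def batch(self):
        self._depth += 1
        try:
            yield
        finally:
            self._depth -= 1
            if self._depth == 0:  # only the outermost exit dispatches
                self.dispatched.extend(self._buffer)
                self._buffer.clear()

    def notify(self, signal):
        if self._depth > 0:
            self._buffer.append(signal)
        else:
            self.dispatched.append(signal)

obj = HasObservablesSketch()
with obj.batch():
    obj.notify("a")
    with obj.batch():  # inner batch merges into the outer one
        obj.notify("b")
    assert obj.dispatched == []  # nothing dispatched yet
assert obj.dispatched == ["a", "b"]
```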
`suppress()`

Drops all signals. Nesting is supported; only the outermost exit restores normal dispatch.
Design choices
Comparison to #3251
The PR's resulting API is basically the same as in #3251, but the implementation details are quite different. The key differences are:

- `_BatchContext` and `_SuppressContext`.
- `functools.singledispatch` for registering custom aggregation handlers. The resulting API for custom signal aggregation is straightforward, as shown above. Each of the arguments is critical: `signal_type` is used by `functools.singledispatch` to pick the correct aggregate function; `signals` is the list of batched `Message` instances; `value`, if provided, is the value of the observable before the first signal. You can store a value via `_BatchContext.capture_original_value_once`, so this functionality is also available to users with custom signals.

TODO
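Under the three-argument signature described in this summary (argument names taken from the text; the bodies and class names below are illustrative only), registering a custom aggregation could look like:

```python
from functools import singledispatch

@singledispatch
def aggregate(signal_type, signals, value=None):
    """Default aggregation: emit the batched signals unchanged."""
    return signals

class MySignalType:
    """A hypothetical custom user-defined signal type."""

@aggregate.register
def _(signal_type: MySignalType, signals, value=None):
    # Keep only the most recent signal; `value` would hold the pre-batch
    # value captured via capture_original_value_once, if one was stored.
    return signals[-1:]

assert aggregate(MySignalType(), ["s1", "s2", "s3"]) == ["s3"]
assert aggregate(object(), ["s1", "s2"]) == ["s1", "s2"]
```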
@falloficarus22, I appreciate your original PR and would love to read your feedback on this alternative take.