
Adding batch and suppress to mesa_signals #3261

Merged
quaquel merged 28 commits into mesa:main from quaquel:pathfinding_batch
Feb 15, 2026

Conversation

@quaquel
Member

@quaquel quaquel commented Feb 8, 2026

Summary
This PR adds batch() and suppress() context managers to HasObservables, as well as an easy-to-use way of controlling how batched signals are aggregated, even for custom user-defined signals. The resulting API is straightforward:

with self.batch():
    # all ModelSignals.agent_added signals are aggregated
    MyAgent.create_agents(self, n=500, value=self.rng.random(500))

with self.suppress():
    # no ModelSignals.agent_added event will be emitted
    MyAgent.create_agents(self, n=500, value=self.rng.random(500))

@aggregate.register(MyCustomSignalType)
def _custom_aggregate(signal_type: SignalType, signals: list[Message], value: Any | None = None):
    # add custom aggregation logic for MyCustomSignalType
    ...

batch()
Buffers all signals emitted within the context. On exit, signals are aggregated per observable name:

  • Observable: collapses multiple CHANGED signals into one (old from first, new from last). No signal if no net change.
  • ObservableList: collapses all list operations (append, insert, remove, replace, set) into a single SET signal with correct old/new lists. No signal if no net change.

Nesting is supported — inner batches merge into the outer batch; only the outermost dispatches.
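To illustrate the collapsing rule for plain observables in isolation, here is a toy stand-in (not the actual mesa implementation), with signals reduced to (old, new) payload tuples:

```python
def collapse_changed(signals):
    """Collapse a batch of (old, new) CHANGED payloads into one.

    Takes old from the first signal and new from the last; returns
    None (no signal) if there is no net change.
    """
    if not signals:
        return None
    old = signals[0][0]
    new = signals[-1][1]
    return None if old == new else (old, new)

# a = 1 -> 2 -> 3 collapses to a single (1, 3) signal
print(collapse_changed([(1, 2), (2, 3)]))  # (1, 3)
# a = 1 -> 2 -> 1 is a net no-op, so nothing is emitted
print(collapse_changed([(1, 2), (2, 1)]))  # None
```

ObservableList aggregation follows the same first-old/last-new principle, just over whole list snapshots instead of scalar values.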

suppress()
Drops all signals. Nesting is supported — only the outermost exit restores normal dispatch.

Design choices

  • Intercepts at notify, not _mesa_notify, so list signals are captured regardless of current subscriber state (important for aggregating e.g., APPENDED into SET)
  • Suppress takes priority over batch — suppress() inside batch() drops signals; batch() inside suppress() also drops signals
  • Exceptions discard the buffer — no signals dispatched on error
  • Computed properties may return stale values during a batch (documented trade-off)

Comparison to #3251
This PR's resulting API is basically the same as in #3251, but the implementation details are quite different. The key differences are:

  1. most logic is contained in batching.py and the helper classes _BatchContext and _SuppressContext.
  2. aggregation is handled by a function, not a class. I rely on functools.singledispatch for registering custom aggregation handlers. The resulting API for custom signal aggregation is straightforward, as shown above. Each of the arguments is critical: signal_type is used by functools.singledispatch to pick the correct aggregate function; signals is the list of batched Message instances; value, if provided, is the value of the observable before the first signal. You can store a value via _BatchContext.capture_original_value_once, so this functionality is also available to users with custom signals.

TODO

  • add an aggregation function for ModelSignals.

@falloficarus22, I appreciate your original PR and would love to hear your feedback on this alternative take.

@github-actions

github-actions bot commented Feb 8, 2026

Performance benchmarks:

Model Size Init time [95% CI] Run time [95% CI]
BoltzmannWealth small 🔵 -1.0% [-1.5%, -0.5%] 🔵 +0.1% [-0.1%, +0.3%]
BoltzmannWealth large 🔵 -0.2% [-0.9%, +0.4%] 🔵 +1.3% [-1.6%, +4.2%]
Schelling small 🔵 +1.4% [+1.2%, +1.8%] 🔵 +2.2% [+2.0%, +2.3%]
Schelling large 🔵 -0.6% [-1.3%, +0.1%] 🔵 -1.3% [-3.0%, +0.6%]
WolfSheep small 🔵 -3.6% [-4.9%, -2.5%] 🔵 -1.0% [-1.6%, -0.3%]
WolfSheep large 🔵 -1.6% [-4.5%, +1.0%] 🔵 +0.1% [-4.3%, +4.2%]
BoidFlockers small 🔵 +0.5% [+0.2%, +0.8%] 🔵 +2.2% [+2.0%, +2.3%]
BoidFlockers large 🔵 +0.8% [+0.4%, +1.3%] 🔵 +2.1% [+1.7%, +2.4%]

@quaquel quaquel added the feature label Feb 8, 2026
@quaquel quaquel marked this pull request as ready for review February 8, 2026 21:05
@github-actions

github-actions bot commented Feb 8, 2026

Performance benchmarks:

Model Size Init time [95% CI] Run time [95% CI]
BoltzmannWealth small 🔵 -2.0% [-2.8%, -1.3%] 🔵 -1.0% [-1.2%, -0.8%]
BoltzmannWealth large 🔵 +0.1% [-1.2%, +1.4%] 🔵 -2.3% [-4.6%, +0.3%]
Schelling small 🔵 +0.5% [+0.2%, +0.8%] 🔵 +0.3% [+0.2%, +0.5%]
Schelling large 🔵 -0.3% [-0.9%, +0.5%] 🔵 -2.0% [-2.8%, -1.4%]
WolfSheep small 🔵 -0.9% [-1.2%, -0.7%] 🔵 +0.5% [+0.4%, +0.7%]
WolfSheep large 🔵 -1.5% [-2.2%, -0.5%] 🔵 -0.4% [-1.6%, +0.8%]
BoidFlockers small 🔵 -1.1% [-1.4%, -0.8%] 🔵 +0.1% [-0.1%, +0.3%]
BoidFlockers large 🔵 -1.6% [-1.9%, -1.3%] 🔵 -1.0% [-1.3%, -0.7%]

@falloficarus22
Contributor

I think this approach is better as an upstream baseline if the goal is semantic batching and a first-class API.

Comment on lines +128 to +131
case ListSignals.INSERTED:
    old = current[:idx] + current[idx + 1 :]
case ListSignals.REMOVED:
    old = [*current[:idx], kwargs["old"], *current[idx:]]
Contributor


idx should be normalized when negative (and bounds-checked) before slicing; otherwise reverse reconstruction is wrong for negative-index operations.

batching.py reconstructs pre-batch list state using the raw index, but negative indices are not normalized before reverse reconstruction. This makes the emitted ListSignals.SET history wrong for operations like pop() / pop(-1) / insert(-1, x).

Member Author


I fixed it inside SignalingList instead of in the aggregator and added additional tests in both places, so everything should work fine now. Again, good catch.

Contributor

@falloficarus22 falloficarus22 left a comment


aggregate uses module-level functools.singledispatch, so registrations are global to the Python process. That means one test/plugin/library can affect aggregation behavior in unrelated code paths.
Not a blocker, but worth considering whether aggregation should be instance-scoped or at least have a reset/isolation mechanism for tests and integrations.

Contributor

@falloficarus22 falloficarus22 left a comment


  • During suppress(), dependency invalidation signals are dropped, so computed values can stay stale indefinitely.
  • The docstring warns about it, but it would be safer to have an explicit test proving expected behavior and a note in user-facing docs.

@falloficarus22
Contributor

What I think works well in #3251 :

  • per-instance, pluggable aggregation pipeline (add_batch_aggregator) gives very flexible composition
  • linear signal-buffer model is simple to reason about for cross-signal ordering/policies

What I think is stronger in this PR:

  • clearer user-facing API (batch() / suppress())
  • cleaner default semantics for observables (first old, last new, no-net-change suppression)
  • signal-type-specific aggregation via singledispatch is a good long-term direction for maintainability and extension

Overall, I prefer this PR’s direction.

@quaquel
Member Author

quaquel commented Feb 9, 2026

Thanks for the detailed review. This is very useful. Below are responses to some key points.

idx should be normalized first when negative (and bounded) before slicing, otherwise reverse reconstruction is wrong for negative-index operations

Good catch. I'll add tests and modify the code.

aggregate uses module-level functools.singledispatch, so registrations are global to the Python process. That means one test/plugin/library can affect aggregation behavior in unrelated code paths.

There is, unfortunately, no built-in way yet to clear singledispatch registrations. However, it uses weakrefs internally, so when a registered function goes out of scope it will be removed. Tests, for example, are therefore unlikely to cause problems. I highly doubt this will become a problem.

The docstring warns about it, but it would be safer to have an explicit test proving expected behavior and a note in user-facing docs.

Ok, I'll add a test for this.

What I think works well in #3251 :
per-instance, pluggable aggregation pipeline (add_batch_aggregator) gives very flexible composition

It is true that in #3251 you can handle the aggregation of signals differently across HasObservables instances. I am not sure whether I like that; in my view it can be highly confusing. In the design here, the aggregation policy is directly tied to the specific SignalType and thus easier to understand and reason about. Conceptually, it makes more sense to me that the strategy for handling aggregation is determined by the SignalType. For example, ObservableSignals can just take the old value from the first signal and the new value from the last; if they differ, you emit, otherwise you don't. For lists, I currently aggregate all changes into a single SET signal. I am not entirely sure about this, and at the least it requires careful documentation. ModelSignalTypes are currently not aggregated, but I am thinking of adding an aggregation where agents that are added and subsequently removed (or vice versa) are filtered out, so you keep only the unique agents that are added and the unique agents that are removed. But yes, designing aggregations at the SignalType level forgoes some flexibility, which might or might not be a problem.
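That filtering idea could look roughly like this (a sketch only; nothing like this is implemented in the PR yet):

```python
def net_agent_changes(added, removed):
    """Filter batched agent_added/agent_removed payloads down to net changes.

    An agent that is added and then removed within the same batch (or
    vice versa) cancels out and produces no signal at flush time.
    """
    added_set, removed_set = set(added), set(removed)
    return added_set - removed_set, removed_set - added_set


# "b" was added and removed within the batch, so it cancels out;
# "d" was removed without being added, so it survives as a net removal
net_added, net_removed = net_agent_changes(["a", "b", "c"], ["b", "d"])
print(sorted(net_added), sorted(net_removed))  # ['a', 'c'] ['d']
```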

linear signal-buffer model is simple to reason about for cross-signal ordering/policies

I'm not sure I fully understand what you mean by this. In this design, nested batching just aggregates signals in the outermost batch, so it will also be linear.

@falloficarus22
Contributor

falloficarus22 commented Feb 9, 2026

There is, unfortunately, no built-in way yet to clear singledispatch registrations. However, it uses weakrefs internally, so when a registered function goes out of scope it will be removed. Tests, for example, are therefore unlikely to cause problems. I highly doubt this will become a problem.

I still think the global-registration concern is real. My understanding is that registered handlers are kept in the dispatcher registry (strongly), so they are not automatically removed just because a local function goes out of scope. So cross-test/plugin/process-wide contamination is still possible once something registers on aggregate. I’m fine treating this as non-blocking, but I’d suggest at least documenting this behavior clearly (and maybe adding a test/reset strategy for isolation).

It is true that in #3251 you can handle the aggregation of signals differently across HasObservables instances. [...] In the design here, the aggregation policy is directly tied to the specific SignalType and thus easier to understand and reason about. [...] But yes, designing aggregations at the SignalType level forgoes some flexibility, which might or might not be a problem.

In general, this SignalType-scoped policy is cleaner and easier to reason about for most users, and I agree that’s a strong advantage of this PR.

Not sure I fully understand what you mean with this. In this design, nested batching will just aggregate signals in the outermost batch so it will also be linear.

My earlier “linear buffer” point was specifically about expressiveness for cross-observable/global policies: in #3251 aggregators run over one ordered signal stream, while here buffering is grouped per observable before aggregation. I agree that nested batching remains outermost/linear in both designs; my point was about aggregation scope rather than nesting mechanics.

@quaquel
Member Author

quaquel commented Feb 9, 2026

I still think the global-registration concern is real.

I dug into the code a bit more. You are right that the registry uses hard refs, not weakrefs. I found one hacky way to still do it, so if this becomes a relevant concern we can add support for it: the base aggregate function exposes the registry as an attribute, so you can easily clear it: aggregate.registry.clear().

My earlier “linear buffer” point was specifically about expressiveness for cross-observable/global policies: in #3251 aggregators run over one ordered signal stream, while here buffering is grouped per observable before aggregation.

Ok, I now get your point.

@falloficarus22
Contributor

I dug into the code a bit more. You are right that the registry uses hard refs, not weakrefs. I found one hacky way to still do it, so if this becomes a relevant concern we can add support for it: the base aggregate function exposes the registry as an attribute, so you can easily clear it: aggregate.registry.clear().

Thanks, and agreed on the hard-ref point.

aggregate.registry.clear() won’t work because registry is a mappingproxy (read-only).
The StackOverflow workaround mutates private singledispatch closure state, which is possible but pretty fragile.
I’m okay treating this as non-blocking for now, but I still think we should document global-registration behavior clearly and consider an isolation strategy for tests/plugins.

@falloficarus22
Contributor


I verified one regression introduced by the negative-index normalization change in SignalingList:
ObservableList slice operations now raise TypeError because __setitem__ / __delitem__ compare index < 0 unconditionally, but for slice ops index is a slice.

Repro:

from mesa.experimental.mesa_signals import HasObservables, ObservableList

class O(HasObservables):
    items = ObservableList()

    def __init__(self):
        super().__init__()
        self.items = [1, 2, 3, 4]

o = O()
o.items[1:3] = [9, 9]  # TypeError on PR branch
del o.items[1:3]       # TypeError on PR branch

@quaquel
Member Author

quaquel commented Feb 9, 2026

I verified one regression introduced by the negative-index normalization change in SignalingList:
ObservableList slice operations now raise TypeError because __setitem__ / __delitem__ compare index < 0 unconditionally, but for slice ops index is a slice.

I did some more work on this. I now handle slices explicitly, including slices with negative indices, and updated various tests. I will give it a once-over, hopefully later today, but we are getting there.

Comment on lines +89 to +95
self.data[index] = value
self.owner.notify(
    self.name,
    ListSignals.REPLACED,
    index=index,
    old=old_value,
    new=list(value),
Contributor


Slice assignment emits the wrong new payload for one-shot iterables (generators/iterators).

  • In the SignalingList.__setitem__ slice path, the code does self.data[index] = value and then new=list(value).
  • If value is a generator, it is consumed by the assignment first, so the emitted signal has new=[] even though the list content changed correctly.

Repro:

from unittest.mock import Mock

from mesa.experimental.mesa_signals import HasObservables, ListSignals, ObservableList


class O(HasObservables):
    items = ObservableList()

    def __init__(self):
        super().__init__()
        self.items = [1, 2, 3, 4]


o = O()
handler = Mock()
o.observe("items", ListSignals.REPLACED, handler)

g = (x for x in [8, 9])
o.items[1:3] = g

signal = handler.call_args.args[0]
print("actual list:", list(o.items))
print("signal old:", signal.additional_kwargs["old"])
print("signal new:", signal.additional_kwargs["new"])

Member Author


There are so many edge cases around SignalingList that I am starting to wonder about changing the design a bit. A lot of the trouble stems from trying to reverse-engineer old from the first signal. Should we just aggregate all signals into a single SET with old and new? Or can we capture old before the signal?

Contributor


I think reverse-engineering from the first signal keeps creating edge-case debt, so I would recommend:

  • For ListSignals in batch, aggregate to a single SET(old,new).
  • Capture old before first mutation (once per list per batch), then read new at flush.
  • Keep granular per-op list signals for non-batch mode.

This is much simpler and avoids reconstructing old state from operation metadata entirely.
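A toy sketch of that capture-once/flush flow (all names here are illustrative, not the PR's actual classes):

```python
class ToyBatch:
    """Toy sketch: snapshot each list once before its first mutation,
    then emit a single SET(old, new) per list at flush time."""

    def __init__(self):
        self._originals = {}  # name -> snapshot taken before first mutation
        self._lists = {}      # name -> live list object

    def capture_old_once(self, name, lst):
        # only the first capture per batch records the pre-batch state
        if name not in self._originals:
            self._originals[name] = list(lst)
        self._lists[name] = lst

    def flush(self):
        # one SET(old, new) per list; lists with no net change emit nothing
        return {
            name: (old, list(self._lists[name]))
            for name, old in self._originals.items()
            if old != list(self._lists[name])
        }


batch = ToyBatch()
items = [1, 2, 3]
batch.capture_old_once("items", items)  # snapshot before mutating
items.append(4)
items[0] = 9
print(batch.flush())  # {'items': ([1, 2, 3], [9, 2, 3, 4])}
```

Because old is snapshotted up front, no per-operation reconstruction (and none of the negative-index or generator edge cases) is needed.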

Member Author

@quaquel quaquel Feb 10, 2026


Unfortunately, it is very hard to capture old. The current implementation batches signals, and signals are emitted after the change. So, to capture it before the first change, a SignalingList has to detect that a context manager is active on its owning HasObservables instance and set old in some way. I'll take a look at how difficult this would be to implement, and whether the design can be encapsulated so it can be used downstream if needed as well.

A SignalingList knows its owner, so it can check self.owner._batchcontext is not None. If this is true, we are in a batch context, and we then only need to add old to some data structure on _BatchContext and ensure that it is properly flushed to the outermost context manager on each exit.

Contributor

@falloficarus22 falloficarus22 left a comment


ObservableList writes directly to _BatchContext internals (ctx._captured_values) in observable_collections.py. A small API like ctx.capture_old_once(name, value) would reduce coupling and make future refactors safer.

Contributor

@falloficarus22 falloficarus22 left a comment


At this point, what's left here is mostly design polish, so I'll leave the rest to you maintainers to decide. Meanwhile, I'll close #3251.

@quaquel
Member Author

quaquel commented Feb 10, 2026

ObservableList writes directly to _BatchContext internals (ctx._captured_values) in observable_collections.py. A small API like ctx.capture_old_once(name, value) would reduce coupling and make future refactors safer.

I added a capture_original_value_once convenience method to _BatchContext.

@quaquel
Member Author

quaquel commented Feb 10, 2026

@falloficarus22 Thanks a lot for your original PR and your contributions to improving this alternative take. I think it came together nicely.

@quaquel quaquel added the experimental label Feb 10, 2026
@quaquel quaquel changed the title pathfinding for adding batch and suppress to mesa_signals Adding batch and suppress to mesa_signals Feb 10, 2026
@EwoutH
Member

EwoutH commented Feb 15, 2026

@codebreaker32 could you give this a look?

Collaborator

@codebreaker32 codebreaker32 left a comment


Looks Good!

Member

@EwoutH EwoutH left a comment


Thanks!

If this is completely ready, please merge ASAP so I can release 3.5.

@quaquel quaquel merged commit 5e1e7a5 into mesa:main Feb 15, 2026
14 checks passed
@quaquel quaquel deleted the pathfinding_batch branch February 16, 2026 15:11