feat(core): initial implementation of syncing #17814

Merged
jlongster merged 22 commits into dev from
jlongster/syncer
Mar 25, 2026
Conversation

Contributor

@jlongster jlongster commented Mar 16, 2026

  • This is a system inspired by event sourcing that tracks mutations of session-related data through events.
  • We don't need distributed clocks. We only support a single writer and many readers. Events can be totally ordered via a sequential integer, which is guaranteed to update atomically via SQLite transactions.
  • Ordering is only guaranteed among the events of a single session. Each new session starts its own sequence at 0.
  • There is a new /sync-events route that returns a stream of sync events. You can record this stream anywhere. After this PR I will add more routes for replaying these events, which will let you recreate sessions.
  • This PR changes all session mutations to go through the new system. I decided not to put every new code path behind a flag because that would be onerous. I did put a few things behind a flag, such as actually writing the events into the db, so that we can easily change the schema if we need to and so that we can make sure it isn't bloating the database too much yet.
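
The ordering rule described in the bullets above can be modeled in a few lines. This is a minimal in-memory sketch, not the code in this PR: `EventLog`, `append`, and the event shape are hypothetical names, and a plain `Map` stands in for the SQLite transaction that makes the increment atomic in the real system.

```typescript
// Hypothetical model of per-session sequencing with a single writer.
type SyncEvent = { aggregateID: string; seq: number; type: string; data: unknown }

class EventLog {
  // Highest sequence number handed out so far, per aggregate (session).
  private lastSeq = new Map<string, number>()
  readonly events: SyncEvent[] = []

  append(aggregateID: string, type: string, data: unknown): SyncEvent {
    const last = this.lastSeq.get(aggregateID)
    // A new aggregate starts at 0; otherwise take the next integer.
    const seq = last === undefined ? 0 : last + 1
    const event: SyncEvent = { aggregateID, seq, type, data }
    this.events.push(event)
    this.lastSeq.set(aggregateID, seq)
    return event
  }
}

const log = new EventLog()
const a0 = log.append("session-a", "session.created", {})
const a1 = log.append("session-a", "session.updated", { title: "hello" })
const b0 = log.append("session-b", "session.created", {})
```

Here `a0.seq` and `a1.seq` are ordered relative to each other, while `b0.seq` restarts at 0 and carries no ordering relationship to the events of `session-a`.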

This PR is risky, but I've done extensive testing and will put it through beta first. The new code paths are relatively simple, and the db mutations should be exactly the same for each of the write paths of session data. Beta should make any major problems clear, and I'll continue to test.

View this doc for more technical details on how I arrived at the solutions in this code: https://gist.github.com/jlongster/cdd24f2b934976f9d1d486e4bce35d6c

}

Database.immediateTransaction((tx) => {
  const id = Identifier.ascending("workspace")
Contributor

Suggested change
- const id = Identifier.ascending("workspace")
+ const id = Identifier.ascending("event")

Contributor

Once you have the other stuff from dev you can use the Newtype pattern for the ID :D

Comment on lines +7 to +8
`id` text PRIMARY KEY,
`aggregate_id` text NOT NULL,
Contributor

Do we want an index on event(aggregate_id, seq)?

Contributor Author

I almost did that! Well, not an explicit index, but a constraint saying that the combination of (aggregate_id, seq) is always unique, to ensure we don't insert duplicate events (or to make sure sequences aren't messed up).

An index can make writes slower and consumes more memory, so I decided to wait until we understand the tradeoffs here better. We will be querying the latest events for an aggregate quite a bit, so an index probably makes sense, but I want to measure it first.
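
To make the guarantee under discussion concrete, here is a small in-memory model of what a uniqueness rule on (aggregate_id, seq) would enforce. It is only an illustration: `UniqueSeqGuard` and `insert` are hypothetical names, and nothing here touches the actual schema. One relevant detail when weighing the tradeoff: SQLite generally implements a UNIQUE constraint by creating an index behind the scenes, so the constraint would likely carry a write cost similar to an explicit index.

```typescript
// Hypothetical model of a UNIQUE (aggregate_id, seq) rule.
class UniqueSeqGuard {
  // For each aggregate, the set of sequence numbers already stored.
  private seen = new Map<string, Set<number>>()

  // Returns true if the pair was new and is now recorded,
  // false if the same (aggregateID, seq) was inserted before.
  insert(aggregateID: string, seq: number): boolean {
    let seqs = this.seen.get(aggregateID)
    if (!seqs) {
      seqs = new Set<number>()
      this.seen.set(aggregateID, seqs)
    }
    if (seqs.has(seq)) return false
    seqs.add(seq)
    return true
  }
}

const guard = new UniqueSeqGuard()
const first = guard.insert("session-a", 0)
const duplicate = guard.insert("session-a", 0)
const otherAggregate = guard.insert("session-b", 0)
```

The same sequence number is accepted for a different aggregate, which matches the rule that sequences are scoped per session.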

Comment on lines +119 to +132
const row = Database.use((db) =>
  db
    .select({ seq: EventSequenceTable.seq })
    .from(EventSequenceTable)
    .where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
    .get(),
)

const expected = row ? row.seq + 1 : 0
if (event.seq !== expected) {
  throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
}

process(def, event)
Contributor

Should this use immediateTransaction as well?

Contributor Author

It's not as important here because we aren't generating sequence ids. The main role of this check is to make sure we've already seen all the events before this one. Once we've checked that, it doesn't matter what else happens in the system: the same event could even be recorded multiple times and we would just ignore any writes after the first one.

This function is the weakest part of the system, though, and I need to rewrite it. It needs to do a lot more, such as handling an array of sequential events, validating their seq ids, filtering out any that we've already seen, and writing down the new ones all at once. I'm not sure we need immediate; maybe we do. I need to think through all of that more.
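
A rough sketch of the batch behavior described above (validate seq ids, drop events already seen, return the rest for a single write) might look like the following. This is not the planned rewrite: `selectNewEvents` and the event shape are hypothetical, the batch is assumed to belong to one aggregate, and `lastSeq` is assumed to be the highest stored seq for that aggregate, or -1 when nothing is stored yet.

```typescript
// Hypothetical event shape; only the fields the check needs.
type SyncEvent = { aggregateID: string; seq: number; type: string }

// Returns the events that still need to be written, in order.
// Throws if the batch skips a sequence number.
function selectNewEvents(lastSeq: number, batch: SyncEvent[]): SyncEvent[] {
  const fresh: SyncEvent[] = []
  let expected = lastSeq + 1
  for (const event of batch) {
    // Anything below the expected seq was already recorded; ignore it.
    if (event.seq < expected) continue
    if (event.seq !== expected) {
      throw new Error(
        `Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
      )
    }
    fresh.push(event)
    expected++
  }
  return fresh
}

const incoming: SyncEvent[] = [0, 1, 2, 3].map((seq) => ({
  aggregateID: "session-a",
  seq,
  type: "session.updated",
}))

// seq 0 and 1 are already stored, so only 2 and 3 remain to be written.
const toWrite = selectNewEvents(1, incoming)
```

Whether the final write of `toWrite` needs an immediate transaction is left open here, as it is in the comment above.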

  Schema extends ZodObject<Record<Agg, z.ZodString>>,
>(input: { type: Type; version: Version; aggregate: Agg; schema: Schema }) {
  const def = {
    type: input.type,
Contributor

Should we also use the versioned name here?

AI ELABORATION

  1. SyncEvent.define(...) returns this object:
{
  type: "session.updated",
  version: "v1",
  ...
}

That object gets stored as Session.Event.Updated.

  2. A subscriber does:
Bus.subscribe(Session.Event.Updated, cb)

Bus.subscribe uses def.type directly, so it subscribes to "session.updated".

  3. Later, SyncEvent.run(Session.Event.Updated, data) runs. Inside run(), it does this:
const versionedDef = { ...def, type: versionedName(def.type, def.version) }
Bus.publish(versionedDef, ...)

So it does not publish the original object. It creates a new object with type: "session.updated.v1".

  4. Bus.publish(...) sends the event to listeners for that exact string: "session.updated.v1". But the subscriber was listening on "session.updated". So it never fires.
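
The walkthrough above can be reproduced with a toy bus. This is a self-contained sketch and not the project's `Bus` or `SyncEvent` code: `subscribe`, `publish`, and the `versionedName` format (`type.version`) are assumptions chosen to match the strings quoted in the steps.

```typescript
type Def = { type: string; version: string }
type Handler = (data: unknown) => void

// Toy bus: listeners are keyed by the exact def.type string.
const listeners = new Map<string, Handler[]>()

function subscribe(def: Def, cb: Handler): void {
  const list = listeners.get(def.type) || []
  list.push(cb)
  listeners.set(def.type, list)
}

// Returns how many listeners received the event.
function publish(def: Def, data: unknown): number {
  const list = listeners.get(def.type) || []
  for (const cb of list) cb(data)
  return list.length
}

// Assumed format, matching "session.updated.v1" from the walkthrough.
const versionedName = (type: string, version: string): string => `${type}.${version}`

const Updated: Def = { type: "session.updated", version: "v1" }
let calls = 0
subscribe(Updated, () => {
  calls++
})

// Publishing under the versioned name misses the subscriber registered above.
const versionedDef: Def = { ...Updated, type: versionedName(Updated.type, Updated.version) }
const delivered = publish(versionedDef, { title: "hello" })
```

In this model `delivered` and `calls` both stay at 0, because the subscriber is keyed on "session.updated" while the event is published on "session.updated.v1".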

Contributor Author

Yeah, this is a good catch; I think that will fail. I hadn't tested listening for individual events because that won't usually happen: you'll just stream in all of the events and record them. That said, this should work, and I should probably rethink this overall.

I've done a couple of iterations of this, and we don't use the version field much anyway, so maybe we should merge the version into the name at the start (with a format like session.updated/v1 or something) and just forget about storing it as a separate field.
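
One way the "merge the version into the name" idea could look, sketched against a toy bus rather than the real one. Everything here is hypothetical: `define`, `subscribe`, `publish`, and the `name/vN` format are illustrative, taken only from the format floated in the comment above.

```typescript
// The definition carries a single name with the version already folded in.
type Def = { type: string }
type Handler = (data: unknown) => void

function define(name: string, version: number): Def {
  return { type: `${name}/v${version}` }
}

// Toy bus keyed by the exact def.type string.
const listeners = new Map<string, Handler[]>()

function subscribe(def: Def, cb: Handler): void {
  const list = listeners.get(def.type) || []
  list.push(cb)
  listeners.set(def.type, list)
}

// Returns how many listeners received the event.
function publish(def: Def, data: unknown): number {
  const list = listeners.get(def.type) || []
  for (const cb of list) cb(data)
  return list.length
}

const Updated = define("session.updated", 1)
const received: unknown[] = []
subscribe(Updated, (data) => received.push(data))

// Subscriber and publisher share the same def, so the names cannot drift apart.
const delivered = publish(Updated, { title: "hello" })
```

Because the name is computed once in `define`, there is no second place where a versioned variant of the type can be constructed.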

@jlongster jlongster force-pushed the jlongster/syncer branch 3 times, most recently from 208e947 to 434e454 Compare March 23, 2026 03:31
@jlongster jlongster marked this pull request as ready for review March 24, 2026 01:01
@jlongster jlongster added beta and removed beta labels Mar 24, 2026
@jlongster jlongster merged commit b0017bf into dev Mar 25, 2026
8 checks passed
@jlongster jlongster deleted the jlongster/syncer branch March 25, 2026 14:47
Andres77872 pushed a commit to Andres77872/opencode that referenced this pull request Mar 26, 2026