feat(core): initial implementation of syncing#17814
Conversation
packages/opencode/src/sync/index.ts
Outdated
| } | ||
|
|
||
| Database.immediateTransaction((tx) => { | ||
| const id = Identifier.ascending("workspace") |
There was a problem hiding this comment.
| const id = Identifier.ascending("workspace") | |
| const id = Identifier.ascending("event") |
There was a problem hiding this comment.
Once you have the other stuff from dev you can use the Newtype pattern for the ID :D
| `id` text PRIMARY KEY, | ||
| `aggregate_id` text NOT NULL, |
There was a problem hiding this comment.
Do we want an index on event(aggregate_id, seq)?
There was a problem hiding this comment.
I almost did that! well, not an explicit index, but a constraint saying that the combo of (aggregate_id, seq) was always unique to ensure we don't insert duplicate events (or make sure sequences aren't messed up).
An index can make writes slower, and consumes more memory, so I decided to wait until we understand the tradeoffs here better. We will be querying the latest events for an aggregate quite a bit so an index probably makes sense, but want to measure it first
packages/opencode/src/sync/index.ts
Outdated
| const row = Database.use((db) => | ||
| db | ||
| .select({ seq: EventSequenceTable.seq }) | ||
| .from(EventSequenceTable) | ||
| .where(eq(EventSequenceTable.aggregate_id, event.aggregateID)) | ||
| .get(), | ||
| ) | ||
|
|
||
| const expected = row ? row.seq + 1 : 0 | ||
| if (event.seq !== expected) { | ||
| throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`) | ||
| } | ||
|
|
||
| process(def, event) |
There was a problem hiding this comment.
Should this use immediateTransaction as well?
There was a problem hiding this comment.
It's not as important here because we aren't generating sequence ids. The main role of this is to check to make sure we've already seen all the events before it; once we've checked that, it doesn't matter what else happens in the system, the same event could even be recorded multiple times and we would just ignore any writes after the first one
This function is the weakest part of the system though and I need to rewrite it. It needs to do a lot more, such as handle an array of sequential events, validating their seq ids, filtering out any that we've already seen, and writing own the new ones all at once. I'm not sure we need immediate, maybe we do, need to think through all of that more
| Schema extends ZodObject<Record<Agg, z.ZodString>>, | ||
| >(input: { type: Type; version: Version; aggregate: Agg; schema: Schema }) { | ||
| const def = { | ||
| type: input.type, |
There was a problem hiding this comment.
Should we also use the versioned name here?
AI ELABORATION
SyncEvent.define(...)returns this object:
{
type: "session.updated",
version: "v1",
...
}
That object gets stored as Session.Event.Updated.
- A subscriber does:
Bus.subscribe(Session.Event.Updated, cb)
Bus.subscribe uses def.type directly, so it subscribes to "session.updated".
- Later,
SyncEvent.run(Session.Event.Updated, data)runs. Insiderun(), it does this:
const versionedDef = { ...def, type: versionedName(def.type, def.version) }
Bus.publish(versionedDef, ...)
So it does not publish the original object. It creates a new object with type: "session.updated.v1".
Bus.publish(...)sends the event to listeners for that exact string:"session.updated.v1". But the subscriber was listening on"session.updated". So it never fires.
There was a problem hiding this comment.
Yeah, this is a good catch, I think that will fail. I hadn't tested listening for individual events as that won't usually happen, you'll just stream in all of the events and record them. That said, this should work, and I should probably rethink this overall.
I've done a couple iterations of this, and we don't use the version field much anyway, so maybe we should merge the version into the name at the start (with a format like session.updated/v1 or something) and just forget about storing it as a separate field
208e947 to
434e454
Compare
00edb05 to
55c6eb4
Compare
7a05eaa to
4d43226
Compare
/sync-eventsroute to get a stream of sync events. You can record this anywhere. After this PR I will add more routes for replaying these events which will let you recreate sessionsThis PR is risky, but I've done extensive testing and will put it through beta first. The new code paths are relatively simple, and the db mutations should be exactly the same for each of the write paths of session data. Beta should make any major problems clear, and I'll continue to test.
VIew this doc for more technical details of how I came to the solutions in this code: https://gist.github.com/jlongster/cdd24f2b934976f9d1d486e4bce35d6c