Skip to content

Commit 4b29e19

Browse files
committed
fix(matrix): per-room send queue and immediate read receipts
Two fixes for the Matrix channel plugin: 1. Per-room send queue (fixes #11614): Introduce send-queue.ts that serializes all sends to the same Matrix room through enqueueSend(), with a 150ms inter-send gap to ensure distinct origin_server_ts values. This prevents message ordering races when auto-reply and message tool sends fire concurrently. 2. Immediate read receipts (fixes #25840): Move sendReadReceiptMatrix from handler.ts (deep in the processing pipeline) to events.ts (on message arrival from SDK). Read receipts now fire before any processing, so Element shows 'read' immediately even when the agent is busy. Self-messages are filtered via lazy getUserId resolution. Both changes are contained within the Matrix extension and have no effect on other channels. AI-assisted: This PR was developed with AI assistance (Claude/OpenClaw). Tested against a live Matrix homeserver (Tuwunel/Conduit fork) with both fixes running in production for ~2 weeks via a local plugin fork.
1 parent 9ef0fc2 commit 4b29e19

File tree

4 files changed

+101
-9
lines changed

4 files changed

+101
-9
lines changed

extensions/matrix/src/matrix/monitor/events.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
22
import type { PluginRuntime, RuntimeLogger } from "openclaw/plugin-sdk";
33
import type { MatrixAuth } from "../client.js";
4+
import { sendReadReceiptMatrix } from "../send.js";
45
import type { MatrixRawEvent } from "./types.js";
56
import { EventType } from "./types.js";
67

@@ -25,7 +26,35 @@ export function registerMatrixMonitorEvents(params: {
2526
onRoomMessage,
2627
} = params;
2728

28-
client.on("room.message", onRoomMessage);
29+
// Track our own user ID for filtering (resolved lazily on first message)
30+
let selfUserId: string | undefined;
31+
32+
client.on("room.message", (roomId: string, event: MatrixRawEvent) => {
33+
// Fire read receipt immediately — before the handler queues or processes.
34+
// This ensures Element shows "read" even when the agent is busy.
35+
const eventId = event?.event_id;
36+
const senderId = event?.sender;
37+
if (eventId && senderId) {
38+
const doReceipt = async () => {
39+
if (!selfUserId) {
40+
try {
41+
selfUserId = await client.getUserId();
42+
} catch {
43+
return;
44+
}
45+
}
46+
if (senderId === selfUserId) return;
47+
sendReadReceiptMatrix(roomId, eventId, client).catch((err) => {
48+
logVerboseMessage(
49+
`matrix: early read receipt failed room=${roomId} id=${eventId}: ${String(err)}`,
50+
);
51+
});
52+
};
53+
doReceipt();
54+
}
55+
// Delegate to the actual message handler
56+
onRoomMessage(roomId, event);
57+
});
2958

3059
client.on("room.encrypted_event", (roomId: string, event: MatrixRawEvent) => {
3160
const eventId = event?.event_id ?? "unknown";

extensions/matrix/src/matrix/monitor/handler.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import {
2121
import {
2222
reactMatrixMessage,
2323
sendMessageMatrix,
24-
sendReadReceiptMatrix,
2524
sendTypingMatrix,
2625
} from "../send.js";
2726
import {
@@ -602,13 +601,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
602601
return;
603602
}
604603

605-
if (messageId) {
606-
sendReadReceiptMatrix(roomId, messageId, client).catch((err) => {
607-
logVerboseMessage(
608-
`matrix: read receipt failed room=${roomId} id=${messageId}: ${String(err)}`,
609-
);
610-
});
611-
}
604+
// Read receipt is now sent immediately in events.ts when the message
605+
// arrives from the SDK, before handler processing. This ensures "read"
606+
// status shows even when the agent is busy with queued messages.
612607

613608
let didSendReply = false;
614609
const tableMode = core.channel.text.resolveMarkdownTableMode({
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Per-room send queue to ensure message ordering.
3+
*
4+
* Matrix orders messages by origin_server_ts. When multiple code paths
5+
* (auto-reply, message tool, media sends) fire concurrently, they race
6+
* and messages appear out of order. This queue serializes all sends
7+
* per room so they arrive in the order they were enqueued.
8+
*
9+
* See: https://github.com/openclaw/openclaw/issues/11614
10+
*/
11+
12+
const queues = new Map<string, Promise<unknown>>();
13+
14+
/**
15+
* Enqueue a send operation for a specific room.
16+
* Each send waits for the previous one to complete before starting.
17+
* The 150ms gap ensures Matrix server assigns distinct timestamps.
18+
*/
19+
export async function enqueueSend<T>(roomId: string, fn: () => Promise<T>): Promise<T> {
20+
const prev = queues.get(roomId) ?? Promise.resolve();
21+
const SEND_GAP_MS = 150;
22+
23+
const next = prev
24+
.catch(() => {}) // don't let previous failures block the queue
25+
.then(() => delay(SEND_GAP_MS))
26+
.then(() => fn());
27+
28+
// Store the chain (void version) so next enqueue waits for this one
29+
queues.set(
30+
roomId,
31+
next.then(
32+
() => {},
33+
() => {},
34+
),
35+
);
36+
37+
// Clean up empty queues to prevent memory leak
38+
next.finally(() => {
39+
setTimeout(() => {
40+
const current = queues.get(roomId);
41+
if (current && isSettled(current)) {
42+
queues.delete(roomId);
43+
}
44+
}, 1000);
45+
});
46+
47+
return next;
48+
}
49+
50+
function delay(ms: number): Promise<void> {
51+
return new Promise((r) => setTimeout(r, ms));
52+
}
53+
54+
function isSettled(p: Promise<unknown>): boolean {
55+
let settled = false;
56+
p.then(
57+
() => {
58+
settled = true;
59+
},
60+
() => {
61+
settled = true;
62+
},
63+
);
64+
return settled;
65+
}

extensions/matrix/src/matrix/send.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type { PollInput } from "openclaw/plugin-sdk";
33
import { getMatrixRuntime } from "../runtime.js";
44
import { buildPollStartContent, M_POLL_START } from "./poll-types.js";
55
import { resolveMatrixClient, resolveMediaMaxBytes } from "./send/client.js";
6+
import { enqueueSend } from "./send-queue.js";
67
import {
78
buildReplyRelation,
89
buildTextContent,
@@ -49,6 +50,7 @@ export async function sendMessageMatrix(
4950
});
5051
try {
5152
const roomId = await resolveMatrixRoomId(client, to);
53+
return await enqueueSend(roomId, async () => {
5254
const cfg = getCore().config.loadConfig();
5355
const tableMode = getCore().channel.text.resolveMarkdownTableMode({
5456
cfg,
@@ -146,6 +148,7 @@ export async function sendMessageMatrix(
146148
messageId: lastMessageId || "unknown",
147149
roomId,
148150
};
151+
}); // enqueueSend
149152
} finally {
150153
if (stopOnDone) {
151154
client.stop();

0 commit comments

Comments
 (0)