Skip to content

Commit 8937e9a

Browse files
authored
Merge branch 'feat/federation' into feat/edit-message
2 parents 7c3d09d + 1ff6cd1 commit 8937e9a

File tree

19 files changed

+412
-56
lines changed

19 files changed

+412
-56
lines changed

apps/meteor/app/lib/server/functions/createRoom.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { Rooms, Subscriptions, Users } from '@rocket.chat/models';
77
import { Meteor } from 'meteor/meteor';
88

99
import { createDirectRoom } from './createDirectRoom';
10+
import { setupTypingEventListenerForRoom } from '../../../../ee/server/hooks/federation';
1011
import { callbacks } from '../../../../lib/callbacks';
1112
import { beforeCreateRoomCallback } from '../../../../lib/callbacks/beforeCreateRoomCallback';
1213
import { calculateRoomRolePriorityFromRoles } from '../../../../lib/roles/calculateRoomRolePriorityFromRoles';
@@ -276,7 +277,9 @@ export const createRoom = async <T extends RoomType>(
276277
}
277278

278279
if (shouldBeHandledByFederation && federationVersion === 'native') {
280+
// TODO: move this to the hooks folder
279281
await FederationMatrix.createRoom(room, owner, members);
282+
setupTypingEventListenerForRoom(room._id);
280283
}
281284

282285
void Apps.self?.triggerEvent(AppEvents.IPostRoomCreate, room);

apps/meteor/app/reactions/server/setReaction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Apps, AppEvents } from '@rocket.chat/apps';
2-
import { api, Message } from '@rocket.chat/core-services';
2+
import { api } from '@rocket.chat/core-services';
33
import type { IMessage, IRoom, IUser } from '@rocket.chat/core-typings';
44
import type { ServerMethods } from '@rocket.chat/ddp-client';
55
import { Messages, EmojiCustom, Rooms, Users } from '@rocket.chat/models';

apps/meteor/ee/server/hooks/federation/index.ts

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { FederationMatrix } from '@rocket.chat/core-services';
21
import {
32
isEditedMessage,
43
isMessageFromMatrixFederation,
@@ -7,11 +6,32 @@ import {
76
type IRoom,
87
type IUser,
98
} from '@rocket.chat/core-typings';
9+
import { api, FederationMatrix } from '@rocket.chat/core-services';
10+
import { Rooms } from '@rocket.chat/models';
1011

12+
import notifications from '../../../../app/notifications/server/lib/Notifications';
1113
import { callbacks } from '../../../../lib/callbacks';
1214
import { afterLeaveRoomCallback } from '../../../../lib/callbacks/afterLeaveRoomCallback';
1315
import { afterRemoveFromRoomCallback } from '../../../../lib/callbacks/afterRemoveFromRoomCallback';
1416

17+
callbacks.add(
18+
'afterDeleteMessage',
19+
async (message: IMessage) => {
20+
if (!message.federation?.eventId) {
21+
return;
22+
}
23+
24+
const isFromExternalUser = message.u?.username?.includes(':');
25+
if (isFromExternalUser) {
26+
return;
27+
}
28+
29+
await FederationMatrix.deleteMessage(message);
30+
},
31+
callbacks.priority.MEDIUM,
32+
'native-federation-after-delete-message',
33+
);
34+
1535
callbacks.add(
1636
'native-federation.onAddUsersToRoom',
1737
async ({ invitees, inviter }, room) => FederationMatrix.inviteUsersToRoom(room, invitees, inviter),
@@ -93,3 +113,22 @@ callbacks.add(
93113
callbacks.priority.HIGH,
94114
'federation-matrix-after-room-message-updated',
95115
);
116+
117+
export const setupTypingEventListenerForRoom = (roomId: string): void => {
118+
notifications.streamRoom.on(`${roomId}/user-activity`, (username, activity) => {
119+
if (Array.isArray(activity) && (!activity.length || activity.includes('user-typing'))) {
120+
void api.broadcast('user.typing', {
121+
user: { username },
122+
isTyping: activity.includes('user-typing'),
123+
roomId,
124+
});
125+
}
126+
});
127+
};
128+
129+
export const setupInternalEDUEventListeners = async () => {
130+
const federatedRooms = await Rooms.findFederatedRooms({ projection: { _id: 1 } }).toArray();
131+
for (const room of federatedRooms) {
132+
setupTypingEventListenerForRoom(room._id);
133+
}
134+
};

apps/meteor/ee/server/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ export * from './apps/startup';
2020
export { registerEEBroker } from './startup';
2121

2222
await License.onLicense('federation', async () => {
23-
await import('./hooks/federation');
23+
const { setupInternalEDUEventListeners } = await import('./hooks/federation');
24+
await setupInternalEDUEventListeners();
2425
});

apps/meteor/server/modules/listeners/listeners.module.ts

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -151,35 +151,20 @@ export class ListenersModule {
151151
notifications.notifyRoom(rid, 'videoconf', callId);
152152
});
153153

154-
service.onEvent('presence.status', ({ user }) => {
155-
const { _id, username, name, status, statusText, roles } = user;
156-
if (!status || !username) {
157-
return;
158-
}
159-
160-
notifications.notifyUserInThisInstance(_id, 'userData', {
161-
type: 'updated',
162-
id: _id,
163-
diff: {
164-
status,
165-
...(statusText && { statusText }),
166-
},
167-
unset: {},
168-
});
169-
170-
notifications.notifyLoggedInThisInstance('user-status', [_id, username, STATUS_MAP[status], statusText, name, roles]);
171-
172-
if (_id) {
173-
notifications.sendPresence(_id, username, STATUS_MAP[status], statusText);
174-
}
175-
});
154+
service.onEvent('presence.status', ({ user }) => this.handlePresence({ user }, notifications));
176155

177156
service.onEvent('user.updateCustomStatus', (userStatus) => {
178157
notifications.notifyLoggedInThisInstance('updateCustomUserStatus', {
179158
userStatusData: userStatus,
180159
});
181160
});
182161

162+
service.onEvent('federation-matrix.user.typing', ({ isTyping, roomId, username }) => {
163+
notifications.notifyRoom(roomId, 'user-activity', username, isTyping ? ['user-typing'] : []);
164+
});
165+
166+
service.onEvent('federation-matrix.user.presence.status', ({ user }) => this.handlePresence({ user }, notifications));
167+
183168
service.onEvent('watch.messages', async ({ message }) => {
184169
if (!message.rid) {
185170
return;
@@ -506,4 +491,30 @@ export class ListenersModule {
506491
notifications.streamRoomMessage.emit(roomId, acknowledgeMessage);
507492
});
508493
}
494+
495+
private handlePresence(
496+
{ user }: { user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'name' | 'roles'> },
497+
notifications: NotificationsModule,
498+
): void {
499+
const { _id, username, name, status, statusText, roles } = user;
500+
if (!status || !username) {
501+
return;
502+
}
503+
504+
notifications.notifyUserInThisInstance(_id, 'userData', {
505+
type: 'updated',
506+
id: _id,
507+
diff: {
508+
status,
509+
...(statusText && { statusText }),
510+
},
511+
unset: {},
512+
});
513+
514+
notifications.notifyLoggedInThisInstance('user-status', [_id, username, STATUS_MAP[status], statusText, name, roles]);
515+
516+
if (_id) {
517+
notifications.sendPresence(_id, username, STATUS_MAP[status], statusText);
518+
}
519+
}
509520
}

apps/meteor/server/services/messages/hooks/BeforeFederationActions.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { isMessageFromMatrixFederation, isRoomFederated } from '@rocket.chat/core-typings';
22
import type { AtLeast, IMessage, IRoom } from '@rocket.chat/core-typings';
33

4-
import { isFederationEnabled, isFederationReady } from '../../federation/utils';
4+
import { getFederationVersion, isFederationEnabled, isFederationReady } from '../../federation/utils';
55

66
export class FederationActions {
77
public static shouldPerformAction(message: IMessage, room: AtLeast<IRoom, 'federated'>): boolean {
88
if (isMessageFromMatrixFederation(message) || isRoomFederated(room)) {
9-
return isFederationEnabled() && isFederationReady();
9+
return getFederationVersion() === 'native' || (isFederationEnabled() && isFederationReady());
1010
}
1111

1212
return true;

apps/meteor/server/services/messages/service.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,21 @@ export class MessageService extends ServiceClassInternal implements IMessageServ
9090
rid,
9191
msg,
9292
federation_event_id,
93+
tmid,
9394
}: {
9495
fromId: string;
9596
rid: string;
9697
msg: string;
9798
federation_event_id: string;
99+
tmid?: string;
98100
}): Promise<IMessage> {
99-
return executeSendMessage(fromId, { rid, msg, federation: { eventId: federation_event_id } });
101+
const threadParams = tmid ? { tmid, tshow: true } : {};
102+
return executeSendMessage(fromId, {
103+
rid,
104+
msg,
105+
...threadParams,
106+
federation: { eventId: federation_event_id },
107+
});
100108
}
101109

102110
async sendMessageWithValidation(user: IUser, message: Partial<IMessage>, room: Partial<IRoom>, upsert = false): Promise<IMessage> {

ee/packages/federation-matrix/src/FederationMatrix.ts

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import 'reflect-metadata';
22

3+
import type { PresenceState } from '@hs/core';
34
import { ConfigService, createFederationContainer, getAllServices } from '@hs/federation-sdk';
45
import type { HomeserverEventSignatures, HomeserverServices, FederationContainerOptions } from '@hs/federation-sdk';
56
import { type IFederationMatrixService, Room, ServiceClass, Settings } from '@rocket.chat/core-services';
6-
import type { IMessage, IRoom, IUser } from '@rocket.chat/core-typings';
7+
import { isDeletedMessage, isMessageFromMatrixFederation, UserStatus, type IMessage, type IRoom, type IUser } from '@rocket.chat/core-typings';
78
import { Emitter } from '@rocket.chat/emitter';
89
import { Router } from '@rocket.chat/http-router';
910
import { Logger } from '@rocket.chat/logger';
@@ -63,6 +64,58 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS
6364
createFederationContainer(containerOptions, config);
6465
instance.homeserverServices = getAllServices();
6566
instance.buildMatrixHTTPRoutes();
67+
instance.onEvent('user.typing', async ({ isTyping, roomId, user: { username } }): Promise<void> => {
68+
if (!roomId || !username) {
69+
return;
70+
}
71+
const externalRoomId = await MatrixBridgedRoom.getExternalRoomId(roomId);
72+
if (!externalRoomId) {
73+
return;
74+
}
75+
const localUser = await Users.findOneByUsername(username, { projection: { _id: 1 } });
76+
if (!localUser) {
77+
return;
78+
}
79+
const externalUserId = await MatrixBridgedUser.getExternalUserIdByLocalUserId(localUser._id);
80+
if (!externalUserId) {
81+
return;
82+
}
83+
void instance.homeserverServices.edu.sendTypingNotification(externalRoomId, externalUserId, isTyping);
84+
});
85+
instance.onEvent(
86+
'presence.status',
87+
async ({ user }: { user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'name' | 'roles'> }): Promise<void> => {
88+
if (!user.username || !user.status) {
89+
return;
90+
}
91+
const localUser = await Users.findOneByUsername(user.username, { projection: { _id: 1 } });
92+
if (!localUser) {
93+
return;
94+
}
95+
const externalUserId = await MatrixBridgedUser.getExternalUserIdByLocalUserId(localUser._id);
96+
if (!externalUserId) {
97+
return;
98+
}
99+
100+
const roomsUserIsMemberOf = await Subscriptions.findUserFederatedRoomIds(localUser._id).toArray();
101+
const statusMap: Record<UserStatus, PresenceState> = {
102+
[UserStatus.ONLINE]: 'online',
103+
[UserStatus.OFFLINE]: 'offline',
104+
[UserStatus.AWAY]: 'unavailable',
105+
[UserStatus.BUSY]: 'unavailable',
106+
[UserStatus.DISABLED]: 'offline',
107+
};
108+
void instance.homeserverServices.edu.sendPresenceUpdateToRooms(
109+
[
110+
{
111+
user_id: externalUserId,
112+
presence: statusMap[user.status] || 'offline',
113+
},
114+
],
115+
roomsUserIsMemberOf.map(({ externalRoomId }) => externalRoomId),
116+
);
117+
},
118+
);
66119

67120
return instance;
68121
}
@@ -184,7 +237,41 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS
184237

185238
const actualMatrixUserId = existingMatrixUserId || matrixUserId;
186239

187-
const result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId);
240+
let result;
241+
242+
if (!message.tmid) {
243+
result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId);
244+
} else {
245+
const threadRootMessage = await Messages.findOneById(message.tmid);
246+
const threadRootEventId = threadRootMessage?.federation?.eventId;
247+
248+
if (threadRootEventId) {
249+
const latestThreadMessage = await Messages.findOne(
250+
{
251+
'tmid': message.tmid,
252+
'federation.eventId': { $exists: true },
253+
'_id': { $ne: message._id }, // Exclude the current message
254+
},
255+
{ sort: { ts: -1 } },
256+
);
257+
const latestThreadEventId = latestThreadMessage?.federation?.eventId;
258+
259+
result = await this.homeserverServices.message.sendThreadMessage(
260+
matrixRoomId,
261+
message.msg,
262+
actualMatrixUserId,
263+
threadRootEventId,
264+
latestThreadEventId,
265+
);
266+
} else {
267+
this.logger.warn('Thread root event ID not found, sending as regular message');
268+
result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId);
269+
}
270+
}
271+
272+
if (!result) {
273+
throw new Error('Failed to send message to Matrix - no result returned');
274+
}
188275

189276
await Messages.setFederationEventIdById(message._id, result.eventId);
190277

@@ -195,6 +282,39 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS
195282
}
196283
}
197284

285+
async deleteMessage(message: IMessage): Promise<void> {
286+
try {
287+
if (!isMessageFromMatrixFederation(message) || isDeletedMessage(message)) {
288+
return;
289+
}
290+
const matrixRoomId = await MatrixBridgedRoom.getExternalRoomId(message.rid);
291+
if (!matrixRoomId) {
292+
throw new Error(`No Matrix room mapping found for room ${message.rid}`);
293+
}
294+
const matrixDomain = await this.getMatrixDomain();
295+
const matrixUserId = `@${message.u.username}:${matrixDomain}`;
296+
const existingMatrixUserId = await MatrixBridgedUser.getExternalUserIdByLocalUserId(message.u._id);
297+
if (!existingMatrixUserId) {
298+
await MatrixBridgedUser.createOrUpdateByLocalId(message.u._id, matrixUserId, true, matrixDomain);
299+
}
300+
301+
if (!this.homeserverServices) {
302+
this.logger.warn('Homeserver services not available, skipping message redaction');
303+
return;
304+
}
305+
const matrixEventId = message.federation?.eventId;
306+
if (!matrixEventId) {
307+
throw new Error(`No Matrix event ID mapping found for message ${message._id}`);
308+
}
309+
const eventId = await this.homeserverServices.message.redactMessage(matrixRoomId, matrixEventId, matrixUserId);
310+
311+
this.logger.debug('Message Redaction sent to Matrix successfully:', eventId);
312+
} catch (error) {
313+
this.logger.error('Failed to send redaction to Matrix:', error);
314+
throw error;
315+
}
316+
}
317+
198318
async inviteUsersToRoom(room: IRoom, usersUserName: string[], inviter: IUser): Promise<void> {
199319
try {
200320
const matrixRoomId = await MatrixBridgedRoom.getExternalRoomId(room._id);

ee/packages/federation-matrix/src/api/_matrix/transactions.ts

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -172,19 +172,15 @@ export const getMatrixTransactionsRoutes = (services: HomeserverServices) => {
172172
async (c) => {
173173
const body = await c.req.json();
174174

175-
const { pdus = [] } = body;
176-
177-
if (pdus.length === 0) {
178-
return {
179-
body: {
180-
pdus: {},
181-
edus: {},
182-
},
183-
statusCode: 200,
184-
};
175+
const { pdus = [], edus = [] } = body;
176+
177+
if (pdus.length > 0) {
178+
await event.processIncomingPDUs(pdus);
185179
}
186180

187-
await event.processIncomingPDUs(pdus);
181+
if (edus.length > 0) {
182+
await event.processIncomingEDUs(edus);
183+
}
188184

189185
return {
190186
body: {

0 commit comments

Comments
 (0)