Skip to content

Commit 9427e62

Browse files
multicast fine grained note lists to users instead of broadcast
1 parent 6614e2b commit 9427e62

File tree

2 files changed

+63
-6
lines changed

2 files changed

+63
-6
lines changed

zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.net.URISyntaxException;
6060
import java.net.UnknownHostException;
6161
import java.util.*;
62+
import java.util.concurrent.ConcurrentHashMap;
6263
import java.util.concurrent.ConcurrentLinkedQueue;
6364

6465
/**
@@ -85,6 +86,8 @@ String getKey() {
8586
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create();
8687
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
8788
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
89+
final Map<String, Queue<NotebookSocket>> userConnectedSockets =
90+
new ConcurrentHashMap<String, Queue<NotebookSocket>>();
8891

8992
private Notebook notebook() {
9093
return ZeppelinServer.notebook;
@@ -160,6 +163,9 @@ public void onMessage(NotebookSocket conn, String msg) {
160163
userAndRoles.addAll(roles);
161164
}
162165
}
166+
if (StringUtils.isEmpty(conn.getUser())) {
167+
addUserConnection(messagereceived.principal, conn);
168+
}
163169
AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal);
164170

165171
/** Lets be elegant here */
@@ -168,8 +174,7 @@ public void onMessage(NotebookSocket conn, String msg) {
168174
unicastNoteList(conn, subject);
169175
break;
170176
case RELOAD_NOTES_FROM_REPO:
171-
//broadcastReloadedNoteList(subject);
172-
unicastNoteList(conn, subject);
177+
broadcastReloadedNoteList(subject);
173178
break;
174179
case GET_HOME_NOTE:
175180
sendHomeNote(conn, userAndRoles, notebook, messagereceived);
@@ -265,6 +270,26 @@ public void onClose(NotebookSocket conn, int code, String reason) {
265270
.getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
266271
removeConnectionFromAllNote(conn);
267272
connectedSockets.remove(conn);
273+
removeUserConnection(conn.getUser(), conn);
274+
}
275+
276+
private void removeUserConnection(String user, NotebookSocket conn) {
277+
if (userConnectedSockets.containsKey(user)) {
278+
userConnectedSockets.get(user).remove(conn);
279+
} else {
280+
LOG.warn("Closing connection that is absent in user connections");
281+
}
282+
}
283+
284+
private void addUserConnection(String user, NotebookSocket conn) {
285+
conn.setUser(user);
286+
if (userConnectedSockets.containsKey(user)) {
287+
userConnectedSockets.get(user).add(conn);
288+
} else {
289+
Queue<NotebookSocket> socketQueue = new ConcurrentLinkedQueue<>();
290+
socketQueue.add(conn);
291+
userConnectedSockets.put(user, socketQueue);
292+
}
268293
}
269294

270295
protected Message deserializeMessage(String msg) {
@@ -380,8 +405,12 @@ private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
380405
}
381406
}
382407

383-
private void broadcastAll(Message m) {
384-
for (NotebookSocket conn : connectedSockets) {
408+
private void multicastToUser(String user, Message m) {
409+
if (!userConnectedSockets.containsKey(user)) {
410+
LOG.warn("Broadcasting to user that is not in connections map");
411+
return;
412+
}
413+
for (NotebookSocket conn: userConnectedSockets.get(user)) {
385414
try {
386415
conn.send(serializeMessage(m));
387416
} catch (IOException e) {
@@ -502,8 +531,17 @@ public void broadcastInterpreterBindings(String noteId,
502531
}
503532

504533
public void broadcastNoteList(AuthenticationInfo subject) {
534+
//send first to requesting user
505535
List<Map<String, String>> notesInfo = generateNotebooksInfo(false, subject);
506-
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
536+
multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
537+
//to others afterwards
538+
for (String user: userConnectedSockets.keySet()) {
539+
if (subject.getUser() == user) {
540+
continue;
541+
}
542+
notesInfo = generateNotebooksInfo(false, new AuthenticationInfo(user));
543+
multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
544+
}
507545
}
508546

509547
public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject) {
@@ -512,8 +550,17 @@ public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject) {
512550
}
513551

514552
public void broadcastReloadedNoteList(AuthenticationInfo subject) {
553+
//send first to requesting user
515554
List<Map<String, String>> notesInfo = generateNotebooksInfo(true, subject);
516-
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
555+
multicastToUser(subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo));
556+
//to others afterwards
557+
for (String user: userConnectedSockets.keySet()) {
558+
if (subject.getUser() == user) {
559+
continue;
560+
}
561+
notesInfo = generateNotebooksInfo(true, new AuthenticationInfo(user));
562+
multicastToUser(user, new Message(OP.NOTES_INFO).put("notes", notesInfo));
563+
}
517564
}
518565

519566
void permissionError(NotebookSocket conn, String op,

zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import javax.servlet.http.HttpServletRequest;
2222

23+
import org.apache.commons.lang.StringUtils;
2324
import org.eclipse.jetty.websocket.api.Session;
2425
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
2526

@@ -32,12 +33,14 @@ public class NotebookSocket extends WebSocketAdapter {
3233
private NotebookSocketListener listener;
3334
private HttpServletRequest request;
3435
private String protocol;
36+
private String user;
3537

3638
public NotebookSocket(HttpServletRequest req, String protocol,
3739
NotebookSocketListener listener) {
3840
this.listener = listener;
3941
this.request = req;
4042
this.protocol = protocol;
43+
this.user = StringUtils.EMPTY;
4144
}
4245

4346
@Override
@@ -69,4 +72,11 @@ public void send(String serializeMessage) throws IOException {
6972
connection.getRemote().sendString(serializeMessage);
7073
}
7174

75+
public String getUser() {
76+
return user;
77+
}
78+
79+
public void setUser(String user) {
80+
this.user = user;
81+
}
7282
}

0 commit comments

Comments
 (0)