5959import java .net .URISyntaxException ;
6060import java .net .UnknownHostException ;
6161import java .util .*;
62+ import java .util .concurrent .ConcurrentHashMap ;
6263import java .util .concurrent .ConcurrentLinkedQueue ;
6364
6465/**
@@ -85,6 +86,8 @@ String getKey() {
8586 Gson gson = new GsonBuilder ().setDateFormat ("yyyy-MM-dd'T'HH:mm:ssZ" ).create ();
8687 final Map <String , List <NotebookSocket >> noteSocketMap = new HashMap <>();
8788 final Queue <NotebookSocket > connectedSockets = new ConcurrentLinkedQueue <>();
89+ final Map <String , Queue <NotebookSocket >> userConnectedSockets =
90+ new ConcurrentHashMap <String , Queue <NotebookSocket >>();
8891
8992 private Notebook notebook () {
9093 return ZeppelinServer .notebook ;
@@ -160,6 +163,9 @@ public void onMessage(NotebookSocket conn, String msg) {
160163 userAndRoles .addAll (roles );
161164 }
162165 }
166+ if (StringUtils .isEmpty (conn .getUser ())) {
167+ addUserConnection (messagereceived .principal , conn );
168+ }
163169 AuthenticationInfo subject = new AuthenticationInfo (messagereceived .principal );
164170
165171 /** Lets be elegant here */
@@ -168,8 +174,7 @@ public void onMessage(NotebookSocket conn, String msg) {
168174 unicastNoteList (conn , subject );
169175 break ;
170176 case RELOAD_NOTES_FROM_REPO :
171- //broadcastReloadedNoteList(subject);
172- unicastNoteList (conn , subject );
177+ broadcastReloadedNoteList (subject );
173178 break ;
174179 case GET_HOME_NOTE :
175180 sendHomeNote (conn , userAndRoles , notebook , messagereceived );
@@ -265,6 +270,26 @@ public void onClose(NotebookSocket conn, int code, String reason) {
265270 .getRemoteAddr (), conn .getRequest ().getRemotePort (), code , reason );
266271 removeConnectionFromAllNote (conn );
267272 connectedSockets .remove (conn );
273+ removeUserConnection (conn .getUser (), conn );
274+ }
275+
276+ private void removeUserConnection (String user , NotebookSocket conn ) {
277+ if (userConnectedSockets .containsKey (user )) {
278+ userConnectedSockets .get (user ).remove (conn );
279+ } else {
280+ LOG .warn ("Closing connection that is absent in user connections" );
281+ }
282+ }
283+
284+ private void addUserConnection (String user , NotebookSocket conn ) {
285+ conn .setUser (user );
286+ if (userConnectedSockets .containsKey (user )) {
287+ userConnectedSockets .get (user ).add (conn );
288+ } else {
289+ Queue <NotebookSocket > socketQueue = new ConcurrentLinkedQueue <>();
290+ socketQueue .add (conn );
291+ userConnectedSockets .put (user , socketQueue );
292+ }
268293 }
269294
270295 protected Message deserializeMessage (String msg ) {
@@ -380,8 +405,12 @@ private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
380405 }
381406 }
382407
383- private void broadcastAll (Message m ) {
384- for (NotebookSocket conn : connectedSockets ) {
408+ private void multicastToUser (String user , Message m ) {
409+ if (!userConnectedSockets .containsKey (user )) {
410+ LOG .warn ("Broadcasting to user that is not in connections map" );
411+ return ;
412+ }
413+ for (NotebookSocket conn : userConnectedSockets .get (user )) {
385414 try {
386415 conn .send (serializeMessage (m ));
387416 } catch (IOException e ) {
@@ -502,8 +531,17 @@ public void broadcastInterpreterBindings(String noteId,
502531 }
503532
504533 public void broadcastNoteList (AuthenticationInfo subject ) {
534+ //send first to requesting user
505535 List <Map <String , String >> notesInfo = generateNotebooksInfo (false , subject );
506- broadcastAll (new Message (OP .NOTES_INFO ).put ("notes" , notesInfo ));
536+ multicastToUser (subject .getUser (), new Message (OP .NOTES_INFO ).put ("notes" , notesInfo ));
537+ //to others afterwards
538+ for (String user : userConnectedSockets .keySet ()) {
539+ if (subject .getUser () == user ) {
540+ continue ;
541+ }
542+ notesInfo = generateNotebooksInfo (false , new AuthenticationInfo (user ));
543+ multicastToUser (user , new Message (OP .NOTES_INFO ).put ("notes" , notesInfo ));
544+ }
507545 }
508546
509547 public void unicastNoteList (NotebookSocket conn , AuthenticationInfo subject ) {
@@ -512,8 +550,17 @@ public void unicastNoteList(NotebookSocket conn, AuthenticationInfo subject) {
512550 }
513551
514552 public void broadcastReloadedNoteList (AuthenticationInfo subject ) {
553+ //send first to requesting user
515554 List <Map <String , String >> notesInfo = generateNotebooksInfo (true , subject );
516- broadcastAll (new Message (OP .NOTES_INFO ).put ("notes" , notesInfo ));
555+ multicastToUser (subject .getUser (), new Message (OP .NOTES_INFO ).put ("notes" , notesInfo ));
556+ //to others afterwards
557+ for (String user : userConnectedSockets .keySet ()) {
558+ if (subject .getUser () == user ) {
559+ continue ;
560+ }
561+ notesInfo = generateNotebooksInfo (true , new AuthenticationInfo (user ));
562+ multicastToUser (user , new Message (OP .NOTES_INFO ).put ("notes" , notesInfo ));
563+ }
517564 }
518565
519566 void permissionError (NotebookSocket conn , String op ,
0 commit comments