Skip to content

Commit 6a3906f

Browse files
committed
Merge branch 'master' into notebook-search
Conflicts: zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
2 parents 2b2f8dc + 4c269e6 commit 6a3906f

File tree

5 files changed

+62
-75
lines changed

5 files changed

+62
-75
lines changed

zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java

Lines changed: 39 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -60,55 +60,65 @@
6060
* Main class of Zeppelin.
6161
*
6262
*/
63-
6463
public class ZeppelinServer extends Application {
6564
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class);
6665

6766
public static Notebook notebook;
68-
public static NotebookServer notebookServer;
69-
public static Server jettyServer;
67+
public static Server jettyWebServer;
68+
public static NotebookServer notebookWsServer;
7069

7170
private SchedulerFactory schedulerFactory;
7271
private InterpreterFactory replFactory;
7372
private NotebookRepo notebookRepo;
7473
private SearchService notebookIndex;
7574

76-
public static void main(String[] args) throws Exception {
75+
public ZeppelinServer() throws Exception {
7776
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
78-
conf.setProperty("args", args);
7977

80-
jettyServer = setupJettyServer(conf);
78+
this.schedulerFactory = new SchedulerFactory();
79+
this.replFactory = new InterpreterFactory(conf, notebookWsServer);
80+
this.notebookRepo = new NotebookRepoSync(conf);
81+
82+
notebook = new Notebook(conf,
83+
notebookRepo, schedulerFactory, replFactory, notebookWsServer, notebookIndex);
84+
}
85+
86+
public static void main(String[] args) throws InterruptedException {
87+
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
88+
conf.setProperty("args", args);
8189

8290
// REST api
83-
final ServletContextHandler restApi = setupRestApiContextHandler(conf);
91+
final ServletContextHandler restApiContext = setupRestApiContextHandler(conf);
8492

8593
// Notebook server
86-
final ServletContextHandler notebook = setupNotebookServer(conf);
94+
final ServletContextHandler notebookContext = setupNotebookServer(conf);
8795

8896
// Web UI
8997
final WebAppContext webApp = setupWebAppContext(conf);
9098

9199
// add all handlers
92100
ContextHandlerCollection contexts = new ContextHandlerCollection();
93-
contexts.setHandlers(new Handler[]{restApi, notebook, webApp});
94-
jettyServer.setHandler(contexts);
101+
contexts.setHandlers(new Handler[]{restApiContext, notebookContext, webApp});
102+
103+
jettyWebServer = setupJettyServer(conf);
104+
jettyWebServer.setHandler(contexts);
95105

96-
LOG.info("Start zeppelin server");
106+
LOG.info("Starting zeppelin server");
97107
try {
98-
jettyServer.start();
108+
jettyWebServer.start(); //Instantiates ZeppelinServer
99109
} catch (Exception e) {
100110
LOG.error("Error while running jettyServer", e);
101111
System.exit(-1);
102112
}
103-
LOG.info("Started zeppelin server");
113+
LOG.info("Done, zeppelin server started");
104114

105115
Runtime.getRuntime().addShutdownHook(new Thread(){
106116
@Override public void run() {
107117
LOG.info("Shutting down Zeppelin Server ... ");
108118
try {
109-
jettyServer.stop();
110-
ZeppelinServer.notebook.getInterpreterFactory().close();
111-
ZeppelinServer.notebook.close();
119+
jettyWebServer.stop();
120+
notebook.getInterpreterFactory().close();
121+
notebook.close();
112122
} catch (Exception e) {
113123
LOG.error("Error while stopping servlet container", e);
114124
}
@@ -127,18 +137,15 @@ public static void main(String[] args) throws Exception {
127137
System.exit(0);
128138
}
129139

130-
jettyServer.join();
140+
jettyWebServer.join();
131141
ZeppelinServer.notebook.getInterpreterFactory().close();
132142
}
133143

134-
private static Server setupJettyServer(ZeppelinConfiguration conf)
135-
throws Exception {
136-
144+
private static Server setupJettyServer(ZeppelinConfiguration conf) {
137145
AbstractConnector connector;
138146
if (conf.useSsl()) {
139147
connector = new SslSelectChannelConnector(getSslContextFactory(conf));
140-
}
141-
else {
148+
} else {
142149
connector = new SelectChannelConnector();
143150
}
144151

@@ -155,11 +162,9 @@ private static Server setupJettyServer(ZeppelinConfiguration conf)
155162
return server;
156163
}
157164

158-
private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf)
159-
throws Exception {
160-
161-
notebookServer = new NotebookServer();
162-
final ServletHolder servletHolder = new ServletHolder(notebookServer);
165+
private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) {
166+
notebookWsServer = new NotebookServer();
167+
final ServletHolder servletHolder = new ServletHolder(notebookWsServer);
163168
servletHolder.setInitParameter("maxTextMessageSize", "1024000");
164169

165170
final ServletContextHandler cxfContext = new ServletContextHandler(
@@ -173,9 +178,8 @@ private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration c
173178
return cxfContext;
174179
}
175180

176-
private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf)
177-
throws Exception {
178-
181+
@SuppressWarnings("deprecation")
182+
private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) {
179183
// Note that the API for the SslContextFactory is different for
180184
// Jetty version 9
181185
SslContextFactory sslContextFactory = new SslContextFactory();
@@ -196,6 +200,7 @@ private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf
196200
return sslContextFactory;
197201
}
198202

203+
@SuppressWarnings("unused") //TODO(bzz) why unused?
199204
private static SSLContext getSslContext(ZeppelinConfiguration conf)
200205
throws Exception {
201206

@@ -242,40 +247,25 @@ private static WebAppContext setupWebAppContext(
242247
webApp.setTempDirectory(warTempDirectory);
243248
}
244249
// Explicit bind to root
245-
webApp.addServlet(
246-
new ServletHolder(new DefaultServlet()),
247-
"/*"
248-
);
250+
webApp.addServlet(new ServletHolder(new DefaultServlet()), "/*");
249251
return webApp;
250252
}
251253

252-
public ZeppelinServer() throws Exception {
253-
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
254-
255-
this.schedulerFactory = new SchedulerFactory();
256-
this.replFactory = new InterpreterFactory(conf, notebookServer);
257-
this.notebookIndex = new SearchService();
258-
this.notebookRepo = new NotebookRepoSync(conf);
259-
260-
notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory,
261-
notebookServer, notebookIndex);
262-
}
263-
264254
@Override
265255
public Set<Class<?>> getClasses() {
266256
Set<Class<?>> classes = new HashSet<Class<?>>();
267257
return classes;
268258
}
269259

270260
@Override
271-
public java.util.Set<java.lang.Object> getSingletons() {
272-
Set<Object> singletons = new HashSet<Object>();
261+
public Set<Object> getSingletons() {
262+
Set<Object> singletons = new HashSet<>();
273263

274264
/** Rest-api root endpoint */
275265
ZeppelinRestApi root = new ZeppelinRestApi();
276266
singletons.add(root);
277267

278-
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookServer, notebookIndex);
268+
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookWsServer, notebookIndex);
279269
singletons.add(notebookApi);
280270

281271
InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);

zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.net.URISyntaxException;
2121
import java.net.UnknownHostException;
2222
import java.util.*;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
24+
2325
import javax.servlet.http.HttpServletRequest;
2426

2527
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -45,6 +47,7 @@
4547
import org.quartz.SchedulerException;
4648
import org.slf4j.Logger;
4749
import org.slf4j.LoggerFactory;
50+
4851
import com.google.common.base.Strings;
4952
import com.google.gson.Gson;
5053

@@ -57,7 +60,7 @@ public class NotebookServer extends WebSocketServlet implements
5760
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
5861
Gson gson = new Gson();
5962
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
60-
final List<NotebookSocket> connectedSockets = new LinkedList<>();
63+
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
6164

6265
private Notebook notebook() {
6366
return ZeppelinServer.notebook;
@@ -84,9 +87,7 @@ public WebSocket doWebSocketConnect(HttpServletRequest req, String protocol) {
8487
public void onOpen(NotebookSocket conn) {
8588
LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(),
8689
conn.getRequest().getRemotePort());
87-
synchronized (connectedSockets) {
88-
connectedSockets.add(conn);
89-
}
90+
connectedSockets.add(conn);
9091
}
9192

9293
@Override
@@ -146,8 +147,7 @@ public void onMessage(NotebookSocket conn, String msg) {
146147
completion(conn, notebook, messagereceived);
147148
break;
148149
case PING:
149-
pong();
150-
break;
150+
break; //do nothing
151151
case ANGULAR_OBJECT_UPDATED:
152152
angularObjectUpdated(conn, notebook, messagereceived);
153153
break;
@@ -165,9 +165,7 @@ public void onClose(NotebookSocket conn, int code, String reason) {
165165
LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest()
166166
.getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
167167
removeConnectionFromAllNote(conn);
168-
synchronized (connectedSockets) {
169-
connectedSockets.remove(conn);
170-
}
168+
connectedSockets.remove(conn);
171169
}
172170

173171
protected Message deserializeMessage(String msg) {
@@ -284,13 +282,11 @@ private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
284282
}
285283

286284
private void broadcastAll(Message m) {
287-
synchronized (connectedSockets) {
288-
for (NotebookSocket conn : connectedSockets) {
289-
try {
290-
conn.send(serializeMessage(m));
291-
} catch (IOException e) {
292-
LOG.error("socket error", e);
293-
}
285+
for (NotebookSocket conn : connectedSockets) {
286+
try {
287+
conn.send(serializeMessage(m));
288+
} catch (IOException e) {
289+
LOG.error("socket error", e);
294290
}
295291
}
296292
}
@@ -729,6 +725,7 @@ private void runParagraph(NotebookSocket conn, Notebook notebook,
729725
public static class ParagraphJobListener implements JobListener {
730726
private NotebookServer notebookServer;
731727
private Note note;
728+
732729
public ParagraphJobListener(NotebookServer notebookServer, Note note) {
733730
this.notebookServer = notebookServer;
734731
this.note = note;
@@ -770,8 +767,6 @@ public void afterStatusChange(Job job, Status before, Status after) {
770767
public JobListener getParagraphJobListener(Note note) {
771768
return new ParagraphJobListener(this, note);
772769
}
773-
private void pong() {
774-
}
775770

776771
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
777772
List<InterpreterSetting> settings = note.getNoteReplLoader()

zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@
2929

3030
import org.apache.commons.httpclient.HttpClient;
3131
import org.apache.commons.httpclient.HttpMethodBase;
32-
import org.apache.commons.httpclient.methods.*;
33-
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
32+
import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
33+
import org.apache.commons.httpclient.methods.DeleteMethod;
34+
import org.apache.commons.httpclient.methods.GetMethod;
35+
import org.apache.commons.httpclient.methods.PostMethod;
36+
import org.apache.commons.httpclient.methods.PutMethod;
37+
import org.apache.commons.httpclient.methods.RequestEntity;
3438
import org.apache.zeppelin.interpreter.InterpreterGroup;
3539
import org.apache.zeppelin.interpreter.InterpreterOption;
3640
import org.apache.zeppelin.interpreter.InterpreterSetting;
@@ -207,7 +211,7 @@ protected static void shutDown() throws Exception {
207211
}
208212

209213
LOG.info("Terminating test Zeppelin...");
210-
ZeppelinServer.jettyServer.stop();
214+
ZeppelinServer.jettyWebServer.stop();
211215
executor.shutdown();
212216

213217
long s = System.currentTimeMillis();

zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public static void init() throws Exception {
6060
AbstractTestRestApi.startUp();
6161
gson = new Gson();
6262
notebook = ZeppelinServer.notebook;
63-
notebookServer = ZeppelinServer.notebookServer;
63+
notebookServer = ZeppelinServer.notebookWsServer;
6464
}
6565

6666
@AfterClass

zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,19 @@
1818
package org.apache.zeppelin.conf;
1919

2020
import java.net.URL;
21-
import java.util.*;
21+
import java.util.Arrays;
22+
import java.util.List;
2223

2324
import org.apache.commons.configuration.ConfigurationException;
2425
import org.apache.commons.configuration.XMLConfiguration;
2526
import org.apache.commons.configuration.tree.ConfigurationNode;
26-
import org.apache.zeppelin.notebook.repo.S3NotebookRepo;
2727
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

3131
/**
3232
* Zeppelin configuration.
3333
*
34-
* @author Leemoonsoo
35-
*
3634
*/
3735
public class ZeppelinConfiguration extends XMLConfiguration {
3836
private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";

0 commit comments

Comments
 (0)