Skip to content

Commit 5569788

Browse files
committed
2 parents 0a9af6c + 526ec5b commit 5569788

File tree

9 files changed

+152
-8
lines changed

9 files changed

+152
-8
lines changed

cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ object ParagraphParser {
7171

7272
val GENERIC_STATEMENT_PREFIX: Regex =
7373
"""(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|ALTER|
74-
|DROP|GRANT|REVOKE|TRUNCATE|LIST|USE)\s+""".r
74+
|DROP|GRANT|REVOKE|TRUNCATE|LIST|USE|[a-z]\w+)\s+""".r
7575

7676
val VALID_IDENTIFIER = "[a-z][a-z0-9_]*"
7777

docs/_includes/themes/zeppelin/_navigation.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
<li class="title"><span>Run Mode</span></li>
3232
<li><a href="{{BASE_PATH}}/quickstart/kubernetes.html">Kubernetes</a></li>
3333
<li><a href="{{BASE_PATH}}/quickstart/docker.html">Docker</a></li>
34+
<li><a href="{{BASE_PATH}}/quickstart/yarn.html">Yarn</a></li>
3435
<li role="separator" class="divider"></li>
3536
<li><a href="{{BASE_PATH}}/quickstart/spark_with_zeppelin.html">Spark with Zeppelin</a></li>
3637
<li><a href="{{BASE_PATH}}/quickstart/sql_with_zeppelin.html">SQL with Zeppelin</a></li>
@@ -85,6 +86,7 @@
8586
<ul class="dropdown-menu scrollable-menu">
8687
<li class="title"><span>Basics</span></li>
8788
<li><a href="{{BASE_PATH}}/setup/basics/how_to_build.html">How to Build Zeppelin</a></li>
89+
<li><a href="{{BASE_PATH}}/setup/basics/hadoop_integration.html">Hadoop Integration</a></li>
8890
<li><a href="{{BASE_PATH}}/setup/basics/multi_user_support.html">Multi-user Support</a></li>
8991
<li role="separator" class="divider"></li>
9092
<li class="title"><span>Deployment</span></li>

docs/quickstart/yarn.md

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
---
2+
layout: page
3+
title: "Zeppelin on Yarn"
4+
description: "Apache Zeppelin supports running interpreter processes in yarn containers"
5+
group: usage/interpreter
6+
---
7+
<!--
8+
Licensed under the Apache License, Version 2.0 (the "License");
9+
you may not use this file except in compliance with the License.
10+
You may obtain a copy of the License at
11+
12+
http://www.apache.org/licenses/LICENSE-2.0
13+
14+
Unless required by applicable law or agreed to in writing, software
15+
distributed under the License is distributed on an "AS IS" BASIS,
16+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
See the License for the specific language governing permissions and
18+
limitations under the License.
19+
-->
20+
{% include JB/setup %}
21+
22+
# Zeppelin on Yarn
23+
24+
<div id="toc"></div>
25+
26+
Zeppelin on yarn means running the interpreter process in a yarn container. The key benefit is scalability: you won't run out of memory
27+
on the zeppelin server host if you run a large number of interpreter processes.
28+
29+
## Prerequisites
30+
The following is required for yarn interpreter mode.
31+
32+
* Hadoop client (both 2.x and 3.x are supported) is installed.
33+
* `$HADOOP_HOME/bin` is put in `PATH`. Because internally zeppelin will run command `hadoop classpath` to get all the hadoop jars and put them in the classpath of Zeppelin.
34+
* Set `USE_HADOOP` as `true` in `zeppelin-env.sh`.
35+
36+
## Configuration
37+
38+
Yarn interpreter mode needs to be set for each interpreter. You can set `zeppelin.interpreter.launcher` to be `yarn` to run it in yarn mode.
39+
Besides that, you can also specify other properties as following table.
40+
41+
<table class="table-configuration">
42+
<tr>
43+
<th>Name</th>
44+
<th>Default Value</th>
45+
<th>Description</th>
46+
</tr>
47+
<tr>
48+
<td>zeppelin.interpreter.yarn.resource.memory</td>
49+
<td>1024</td>
50+
<td>memory for interpreter process, unit: mb</td>
51+
</tr>
52+
<tr>
53+
<td>zeppelin.interpreter.yarn.resource.memoryOverhead</td>
54+
<td>384</td>
<td>Amount of non-heap memory to be allocated per interpreter process in yarn interpreter mode, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.</td>
55+
</tr>
56+
<tr>
57+
<td>zeppelin.interpreter.yarn.resource.cores</td>
58+
<td>1</td>
59+
<td>cpu cores for interpreter process</td>
60+
</tr>
61+
<tr>
62+
<td>zeppelin.interpreter.yarn.queue</td>
63+
<td>default</td>
64+
<td>yarn queue name</td>
65+
</tr>
66+
</table>
67+
68+
## Differences with non-yarn interpreter mode (local mode)
69+
70+
There are several differences between yarn interpreter mode and non-yarn interpreter mode (local mode):
71+
72+
* New yarn app will be allocated for the interpreter process.
73+
* Any local path setting won't work in a yarn interpreter process. E.g. if you run the python interpreter in yarn interpreter mode, then you need to make sure the python executable of `zeppelin.python` exists on all the nodes of the yarn cluster.
74+
Because the python interpreter may launch in any node.
75+
* Don't use it for spark interpreter. Instead use spark's built-in yarn-client or yarn-cluster which is more suitable for spark interpreter.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
---
2+
layout: page
3+
title: "How to integrate with hadoop"
4+
description: "How to integrate with hadoop"
5+
group: setup/basics
6+
---
7+
<!--
8+
Licensed under the Apache License, Version 2.0 (the "License");
9+
you may not use this file except in compliance with the License.
10+
You may obtain a copy of the License at
11+
12+
http://www.apache.org/licenses/LICENSE-2.0
13+
14+
Unless required by applicable law or agreed to in writing, software
15+
distributed under the License is distributed on an "AS IS" BASIS,
16+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
See the License for the specific language governing permissions and
18+
limitations under the License.
19+
-->
20+
{% include JB/setup %}
21+
22+
# Integrate with hadoop
23+
24+
<div id="toc"></div>
25+
26+
Hadoop is an optional component of zeppelin unless you need the following features:
27+
28+
* Use hdfs to store notes.
29+
* Use hdfs to store interpreter configuration
30+
* Use hdfs to store recovery data
31+
* Launch interpreter in yarn mode
32+
33+
## Requirements
34+
35+
Zeppelin 0.9 doesn't ship with hadoop dependencies; you need to include the hadoop jars yourself via the following steps:
36+
37+
* Hadoop client (both 2.x and 3.x are supported) is installed.
38+
* `$HADOOP_HOME/bin` is put in `PATH`. Because internally zeppelin will run command `hadoop classpath` to get all the hadoop jars and put them in the classpath of Zeppelin.
39+
* Set `USE_HADOOP` as `true` in `zeppelin-env.sh`.

spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,8 @@ def show(self, obj, **kwargs):
7373
super(IPySparkZeppelinContext, self).show(obj, **kwargs)
7474

7575
z = __zeppelin__ = IPySparkZeppelinContext(intp.getZeppelinContext(), gateway)
76+
77+
# add jars to path
78+
import sys
79+
jarlist = map(lambda url: url.replace("file:/", "/"), (conf.get("spark.jars") or "").split(","))
80+
sys.path.extend(filter(lambda jar: jar not in sys.path, jarlist))

spark/interpreter/src/main/resources/python/zeppelin_pyspark.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,8 @@ def show(self, obj, **kwargs):
7171

7272
z = __zeppelin__ = PySparkZeppelinContext(intp.getZeppelinContext(), gateway)
7373
__zeppelin__._setup_matplotlib()
74+
75+
# add jars to path
76+
import sys
77+
jarlist = map(lambda url: url.replace("file:/", "/"), (conf.get("spark.jars") or "").split(","))
78+
sys.path.extend(filter(lambda jar: jar not in sys.path, jarlist))

zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,6 +1038,20 @@ public void testConfInterpreter() throws IOException {
10381038
p1.setText("%spark\nimport com.databricks.spark.csv._");
10391039
note.run(p1.getId(), true);
10401040
assertEquals(Status.FINISHED, p1.getStatus());
1041+
1042+
// test pyspark imports path
1043+
Paragraph p2 = note.addNewParagraph(anonymous);
1044+
p2.setText("%spark.pyspark\nimport sys\nsys.path");
1045+
note.run(p2.getId(), true);
1046+
assertEquals(Status.FINISHED, p2.getStatus());
1047+
assertTrue(p2.getReturn().toString().contains("databricks_spark"));
1048+
1049+
Paragraph p3 = note.addNewParagraph(anonymous);
1050+
p3.setText("%spark.ipyspark\nimport sys\nsys.path");
1051+
note.run(p3.getId(), true);
1052+
assertEquals(Status.FINISHED, p3.getStatus());
1053+
assertTrue(p3.getReturn().toString().contains("databricks_spark"));
1054+
10411055
} finally {
10421056
if (null != note) {
10431057
TestUtils.getInstance(Notebook.class).removeNote(note, anonymous);

zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ public List<NoteJobInfo> getNoteJobInfoByUnixTime(long lastUpdateServerUnixTime,
8181
if (!conf.isJobManagerEnabled()) {
8282
return new ArrayList<>();
8383
}
84-
List<NoteJobInfo> notesJobInfo = new ArrayList<>();
85-
notebook.getNoteStream()
84+
85+
List<NoteJobInfo> notesJobInfo = notebook.getNoteStream()
8686
.filter(note -> authorizationService.isOwner(context.getUserAndRoles(), note.getId()))
8787
.map(note -> new NoteJobInfo(note))
8888
.filter(noteJobInfo -> noteJobInfo.unixTimeLastRun > lastUpdateServerUnixTime)

zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -506,8 +506,10 @@ public void onFailure(Exception ex, ServiceContext context) throws IOException {
506506
});
507507
}
508508

509-
public void broadcastUpdateNoteJobInfo(long lastUpdateUnixTime) throws IOException {
510-
getJobManagerService().getNoteJobInfoByUnixTime(lastUpdateUnixTime, null,
509+
public void broadcastUpdateNoteJobInfo(Note note, long lastUpdateUnixTime) throws IOException {
510+
ServiceContext context = new ServiceContext(new AuthenticationInfo(),
511+
getNotebookAuthorizationService().getOwners(note.getId()));
512+
getJobManagerService().getNoteJobInfoByUnixTime(lastUpdateUnixTime, context,
511513
new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(null) {
512514
@Override
513515
public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
@@ -1799,7 +1801,9 @@ public void run() {
17991801
@Override
18001802
public void onParagraphRemove(Paragraph p) {
18011803
try {
1802-
getJobManagerService().getNoteJobInfoByUnixTime(System.currentTimeMillis() - 5000, null,
1804+
ServiceContext context = new ServiceContext(new AuthenticationInfo(),
1805+
getNotebookAuthorizationService().getOwners(p.getNote().getId()));
1806+
getJobManagerService().getNoteJobInfoByUnixTime(System.currentTimeMillis() - 5000, context,
18031807
new JobManagerServiceCallback());
18041808
} catch (IOException e) {
18051809
LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
@@ -1809,7 +1813,7 @@ public void onParagraphRemove(Paragraph p) {
18091813
@Override
18101814
public void onNoteRemove(Note note, AuthenticationInfo subject) {
18111815
try {
1812-
broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000);
1816+
broadcastUpdateNoteJobInfo(note, System.currentTimeMillis() - 5000);
18131817
} catch (IOException e) {
18141818
LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
18151819
}
@@ -1918,7 +1922,7 @@ public void onStatusChange(Paragraph p, Status before, Status after) {
19181922
p.setStatusToUserParagraph(p.getStatus());
19191923
broadcastParagraph(p.getNote(), p);
19201924
try {
1921-
broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000);
1925+
broadcastUpdateNoteJobInfo(p.getNote(), System.currentTimeMillis() - 5000);
19221926
} catch (IOException e) {
19231927
LOG.error("can not broadcast for job manager {}", e);
19241928
}

0 commit comments

Comments
 (0)