Skip to content

Commit 22aecb3

Browse files
committed
2 parents ac03666 + dfbea2e commit 22aecb3

File tree

7 files changed

+171
-37
lines changed

7 files changed

+171
-37
lines changed

geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.commons.lang.StringUtils;
2222
import org.apache.zeppelin.interpreter.Interpreter;
2323
import org.apache.zeppelin.interpreter.InterpreterContext;
24-
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
2524
import org.apache.zeppelin.interpreter.InterpreterResult;
2625
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
2726
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -38,7 +37,7 @@
3837
import com.gemstone.gemfire.pdx.PdxInstance;
3938

4039
/**
41-
* Apache Geode OQL Interpreter (http://geode.incubator.apache.org)
40+
* Apache Geode OQL Interpreter (http://geode.apache.org)
4241
*
4342
* <ul>
4443
* <li>{@code geode.locator.host} - The Geode Locator {@code <HOST>} to connect to.</li>
@@ -87,30 +86,12 @@ public class GeodeOqlInterpreter extends Interpreter {
8786

8887
private Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class);
8988

90-
public static final String DEFAULT_PORT = "10334";
91-
public static final String DEFAULT_HOST = "localhost";
92-
public static final String DEFAULT_MAX_RESULT = "1000";
93-
9489
private static final char NEWLINE = '\n';
9590
private static final char TAB = '\t';
9691
private static final char WHITESPACE = ' ';
9792

9893
private static final String TABLE_MAGIC_TAG = "%table ";
9994

100-
public static final String LOCATOR_HOST = "geode.locator.host";
101-
public static final String LOCATOR_PORT = "geode.locator.port";
102-
public static final String MAX_RESULT = "geode.max.result";
103-
104-
static {
105-
Interpreter.register(
106-
"oql",
107-
"geode",
108-
GeodeOqlInterpreter.class.getName(),
109-
new InterpreterPropertyBuilder().add(LOCATOR_HOST, DEFAULT_HOST, "The Geode Locator Host.")
110-
.add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port")
111-
.add(MAX_RESULT, DEFAULT_MAX_RESULT, "Max number of OQL result to display.").build());
112-
}
113-
11495
private ClientCache clientCache = null;
11596
private QueryService queryService = null;
11697
private Exception exceptionOnConnect;
@@ -122,8 +103,8 @@ public GeodeOqlInterpreter(Properties property) {
122103

123104
protected ClientCache getClientCache() {
124105

125-
String locatorHost = getProperty(LOCATOR_HOST);
126-
int locatorPort = Integer.valueOf(getProperty(LOCATOR_PORT));
106+
String locatorHost = getProperty("geode.locator.host");
107+
int locatorPort = Integer.valueOf(getProperty("geode.locator.port"));
127108

128109
ClientCache clientCache =
129110
new ClientCacheFactory().addPoolLocator(locatorHost, locatorPort).create();
@@ -139,7 +120,7 @@ public void open() {
139120
close();
140121

141122
try {
142-
maxResult = Integer.valueOf(getProperty(MAX_RESULT));
123+
maxResult = Integer.valueOf(getProperty("geode.max.result"));
143124

144125
clientCache = getClientCache();
145126
queryService = clientCache.getQueryService();
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
[
2+
{
3+
"group": "geode",
4+
"name": "oql",
5+
"className": "org.apache.zeppelin.geode.GeodeOqlInterpreter",
6+
"properties": {
7+
"geode.locator.host": {
8+
"envName": null,
9+
"propertyName": "geode.locator.host",
10+
"defaultValue": "localhost",
11+
"description": "The Geode Locator Host."
12+
},
13+
"geode.locator.port": {
14+
"envName": null,
15+
"propertyName": "geode.locator.port",
16+
"defaultValue": "10334",
17+
"description": "The Geode Locator Port."
18+
},
19+
"geode.max.result": {
20+
"envName": null,
21+
"propertyName": "geode.max.result",
22+
"defaultValue": "1000",
23+
"description": "Max number of OQL result to display."
24+
}
25+
},
26+
"editor": {
27+
"language": "sql"
28+
}
29+
}
30+
]

geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ private static Iterator<Object> asIterator(Object... items) {
5858
public void testOpenCommandIndempotency() {
5959

6060
Properties properties = new Properties();
61-
properties.put(LOCATOR_HOST, DEFAULT_HOST);
62-
properties.put(LOCATOR_PORT, DEFAULT_PORT);
63-
properties.put(MAX_RESULT, DEFAULT_MAX_RESULT);
61+
properties.put("geode.locator.host", "localhost");
62+
properties.put("geode.locator.port", "10334");
63+
properties.put("geode.max.result", "1000");
6464

6565
GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(properties));
6666

zeppelin-interpreter/src/main/java/org/apache/zeppelin/user/AuthenticationInfo.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,20 @@
1818

1919
package org.apache.zeppelin.user;
2020

21+
import org.apache.commons.lang.StringUtils;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
2125
/***
2226
*
2327
*/
2428
public class AuthenticationInfo {
29+
private static final Logger LOG = LoggerFactory.getLogger(AuthenticationInfo.class);
2530
String user;
2631
String ticket;
2732
UserCredentials userCredentials;
33+
public static final AuthenticationInfo ANONYMOUS = new AuthenticationInfo("anonymous",
34+
"anonymous");
2835

2936
public AuthenticationInfo() {}
3037

@@ -66,4 +73,17 @@ public void setUserCredentials(UserCredentials userCredentials) {
6673
this.userCredentials = userCredentials;
6774
}
6875

76+
public static boolean isAnonymous(AuthenticationInfo subject) {
77+
if (subject == null) {
78+
LOG.warn("Subject is null, assuming anonymous. "
79+
+ "Not recommended to use subject as null except in tests");
80+
return true;
81+
}
82+
return subject.isAnonymous();
83+
}
84+
85+
public boolean isAnonymous() {
86+
return ANONYMOUS.equals(this) || "anonymous".equalsIgnoreCase(this.getUser())
87+
|| StringUtils.isEmpty(this.getUser());
88+
}
6989
}

zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.net.MalformedURLException;
2121
import java.net.URI;
2222
import java.net.URISyntaxException;
23+
import java.util.HashSet;
2324
import java.util.concurrent.atomic.AtomicInteger;
2425

2526
import org.apache.commons.httpclient.HttpClient;
@@ -36,6 +37,7 @@
3637
import org.apache.shiro.authz.AuthorizationInfo;
3738
import org.apache.shiro.realm.AuthorizingRealm;
3839
import org.apache.shiro.subject.PrincipalCollection;
40+
import org.apache.zeppelin.server.ZeppelinServer;
3941
import org.slf4j.Logger;
4042
import org.slf4j.LoggerFactory;
4143

@@ -135,6 +137,7 @@ protected User authenticateUser(String requestBody) {
135137
}
136138
responseBody = put.getResponseBodyAsString();
137139
put.releaseConnection();
140+
138141
} catch (IOException e) {
139142
LOG.error("Cannot login user", e);
140143
throw new AuthenticationException(e.getMessage());
@@ -147,6 +150,13 @@ protected User authenticateUser(String requestBody) {
147150
LOG.error("Cannot deserialize ZeppelinHub response to User instance", e);
148151
throw new AuthenticationException("Cannot login to ZeppelinHub");
149152
}
153+
154+
/* TODO(khalid): add proper roles and add listener */
155+
HashSet<String> userAndRoles = new HashSet<String>();
156+
userAndRoles.add(account.login);
157+
ZeppelinServer.notebookWsServer.broadcastReloadedNoteList(
158+
new org.apache.zeppelin.user.AuthenticationInfo(account.login), userAndRoles);
159+
150160
return account;
151161
}
152162

zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import java.util.Collections;
2525
import java.util.Date;
2626
import java.util.HashMap;
27+
import java.util.HashSet;
2728
import java.util.List;
2829
import java.util.Map;
30+
import java.util.Set;
2931

3032
import org.apache.zeppelin.conf.ZeppelinConfiguration;
3133
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
@@ -90,14 +92,6 @@ public NotebookRepoSync(ZeppelinConfiguration conf) {
9092
LOG.info("No storage could be initialized, using default {} storage", defaultStorage);
9193
initializeDefaultStorage(conf);
9294
}
93-
if (getRepoCount() > 1) {
94-
try {
95-
AuthenticationInfo subject = new AuthenticationInfo("anonymous");
96-
sync(0, 1, subject);
97-
} catch (IOException e) {
98-
LOG.warn("Failed to sync with secondary storage on start {}", e);
99-
}
100-
}
10195
}
10296

10397
@SuppressWarnings("static-access")
@@ -170,6 +164,10 @@ public void remove(String noteId, AuthenticationInfo subject) throws IOException
170164
/* TODO(khalid): handle case when removing from secondary storage fails */
171165
}
172166

167+
void remove(int repoIndex, String noteId, AuthenticationInfo subject) throws IOException {
168+
getRepo(repoIndex).remove(noteId, subject);
169+
}
170+
173171
/**
174172
* Copies new/updated notes from source to destination storage
175173
*
@@ -195,7 +193,7 @@ void sync(int sourceRepoIndex, int destRepoIndex, AuthenticationInfo subject) th
195193
for (String id : pushNoteIds) {
196194
LOG.info("ID : " + id);
197195
}
198-
pushNotes(subject, pushNoteIds, srcRepo, dstRepo);
196+
pushNotes(subject, pushNoteIDs, srcRepo, dstRepo, false);
199197
} else {
200198
LOG.info("Nothing to push");
201199
}
@@ -205,7 +203,7 @@ void sync(int sourceRepoIndex, int destRepoIndex, AuthenticationInfo subject) th
205203
for (String id : pullNoteIds) {
206204
LOG.info("ID : " + id);
207205
}
208-
pushNotes(subject, pullNoteIds, dstRepo, srcRepo);
206+
pushNotes(subject, pullNoteIDs, dstRepo, srcRepo, true);
209207
} else {
210208
LOG.info("Nothing to pull");
211209
}
@@ -228,16 +226,43 @@ public void sync(AuthenticationInfo subject) throws IOException {
228226
}
229227

230228
private void pushNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo localRepo,
231-
NotebookRepo remoteRepo) {
229+
NotebookRepo remoteRepo, boolean setPermissions) {
232230
for (String id : ids) {
233231
try {
234232
remoteRepo.save(localRepo.get(id, subject), subject);
233+
if (setPermissions && emptyNoteAcl(id)) {
234+
makePrivate(id, subject);
235+
}
235236
} catch (IOException e) {
236237
LOG.error("Failed to push note to storage, moving onto next one", e);
237238
}
238239
}
239240
}
240241

242+
private boolean emptyNoteAcl(String noteId) {
243+
NotebookAuthorization notebookAuthorization = NotebookAuthorization.getInstance();
244+
return notebookAuthorization.getOwners(noteId).isEmpty()
245+
&& notebookAuthorization.getReaders(noteId).isEmpty()
246+
&& notebookAuthorization.getWriters(noteId).isEmpty();
247+
}
248+
249+
private void makePrivate(String noteId, AuthenticationInfo subject) {
250+
if (AuthenticationInfo.isAnonymous(subject)) {
251+
LOG.info("User is anonymous, permissions are not set for pulled notes");
252+
return;
253+
}
254+
NotebookAuthorization notebookAuthorization = NotebookAuthorization.getInstance();
255+
Set<String> users = notebookAuthorization.getOwners(noteId);
256+
users.add(subject.getUser());
257+
notebookAuthorization.setOwners(noteId, users);
258+
users = notebookAuthorization.getReaders(noteId);
259+
users.add(subject.getUser());
260+
notebookAuthorization.setReaders(noteId, users);
261+
users = notebookAuthorization.getWriters(noteId);
262+
users.add(subject.getUser());
263+
notebookAuthorization.setWriters(noteId, users);
264+
}
265+
241266
private void deleteNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo repo)
242267
throws IOException {
243268
for (String id : ids) {

zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424

2525
import java.io.File;
2626
import java.io.IOException;
27+
import java.util.HashSet;
2728
import java.util.Map;
29+
import java.util.Set;
2830

2931
import org.apache.commons.io.FileUtils;
3032
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -314,6 +316,72 @@ public void testCheckpointOneStorage() throws IOException, SchedulerException {
314316
notebookRepoSync.remove(note.getId(), anonymous);
315317
}
316318

319+
@Test
320+
public void testSyncWithAcl() throws IOException {
321+
/* scenario 1 - note exists with acl on main storage */
322+
AuthenticationInfo user1 = new AuthenticationInfo("user1");
323+
Note note = notebookSync.createNote(user1);
324+
assertEquals(0, note.getParagraphs().size());
325+
326+
// saved on both storages
327+
assertEquals(1, notebookRepoSync.list(0, null).size());
328+
assertEquals(1, notebookRepoSync.list(1, null).size());
329+
330+
/* check that user1 is the only owner */
331+
NotebookAuthorization authInfo = NotebookAuthorization.getInstance();
332+
Set<String> entity = new HashSet<String>();
333+
entity.add(user1.getUser());
334+
assertEquals(true, authInfo.isOwner(note.getId(), entity));
335+
assertEquals(1, authInfo.getOwners(note.getId()).size());
336+
assertEquals(0, authInfo.getReaders(note.getId()).size());
337+
assertEquals(0, authInfo.getWriters(note.getId()).size());
338+
339+
/* update note and save on secondary storage */
340+
Paragraph p1 = note.addParagraph();
341+
p1.setText("hello world");
342+
assertEquals(1, note.getParagraphs().size());
343+
notebookRepoSync.save(1, note, null);
344+
345+
/* check paragraph isn't saved into first storage */
346+
assertEquals(0, notebookRepoSync.get(0,
347+
notebookRepoSync.list(0, null).get(0).getId(), null).getParagraphs().size());
348+
/* check paragraph is saved into second storage */
349+
assertEquals(1, notebookRepoSync.get(1,
350+
notebookRepoSync.list(1, null).get(0).getId(), null).getParagraphs().size());
351+
352+
/* now sync by user1 */
353+
notebookRepoSync.sync(user1);
354+
355+
/* check that note updated and acl are same on main storage*/
356+
assertEquals(1, notebookRepoSync.get(0,
357+
notebookRepoSync.list(0, null).get(0).getId(), null).getParagraphs().size());
358+
assertEquals(true, authInfo.isOwner(note.getId(), entity));
359+
assertEquals(1, authInfo.getOwners(note.getId()).size());
360+
assertEquals(0, authInfo.getReaders(note.getId()).size());
361+
assertEquals(0, authInfo.getWriters(note.getId()).size());
362+
363+
/* scenario 2 - note doesn't exist on main storage */
364+
/* remove from main storage */
365+
notebookRepoSync.remove(0, note.getId(), user1);
366+
assertEquals(0, notebookRepoSync.list(0, null).size());
367+
assertEquals(1, notebookRepoSync.list(1, null).size());
368+
authInfo.removeNote(note.getId());
369+
assertEquals(0, authInfo.getOwners(note.getId()).size());
370+
assertEquals(0, authInfo.getReaders(note.getId()).size());
371+
assertEquals(0, authInfo.getWriters(note.getId()).size());
372+
373+
/* now sync - should bring note from secondary storage with added acl */
374+
notebookRepoSync.sync(user1);
375+
assertEquals(1, notebookRepoSync.list(0, null).size());
376+
assertEquals(1, notebookRepoSync.list(1, null).size());
377+
assertEquals(1, authInfo.getOwners(note.getId()).size());
378+
assertEquals(1, authInfo.getReaders(note.getId()).size());
379+
assertEquals(1, authInfo.getWriters(note.getId()).size());
380+
assertEquals(true, authInfo.isOwner(note.getId(), entity));
381+
assertEquals(true, authInfo.isReader(note.getId(), entity));
382+
assertEquals(true, authInfo.isWriter(note.getId(), entity));
383+
}
384+
317385
static void delete(File file){
318386
if(file.isFile()) file.delete();
319387
else if(file.isDirectory()){

0 commit comments

Comments
 (0)