Skip to content

Commit 71a3478

Browse files
committed
adding batch and transaction
1 parent 364f00a commit 71a3478

8 files changed

Lines changed: 276 additions & 41 deletions

File tree

src/main/java/com/google/gcloud/datastore/BatchWriteOption.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@ public abstract class BatchWriteOption implements Serializable {
66

77
private static final long serialVersionUID = -3932758377282659839L;
88

9-
BatchWriteOption() {
10-
// package protected
11-
}
12-
139
public static final class ForceWrites extends BatchWriteOption {
1410

1511
private static final long serialVersionUID = 2555054296046232799L;
@@ -20,11 +16,22 @@ public ForceWrites(boolean force) {
2016
this.force = force;
2117
}
2218

23-
public boolean isForce() {
19+
public boolean force() {
2420
return force;
2521
}
22+
23+
@Override
24+
void apply(BatchWriterImpl batchWriter) {
25+
batchWriter.apply(this);
26+
}
2627
}
2728

29+
BatchWriteOption() {
30+
// package protected
31+
}
32+
33+
abstract void apply(BatchWriterImpl batchWriter);
34+
2835
public static ForceWrites forceWrites() {
2936
return new ForceWrites(true);
3037
}

src/main/java/com/google/gcloud/datastore/BatchWriter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,20 @@
22

33
public interface BatchWriter extends DatastoreWriter {
44

5+
/**
6+
* {@inheritDoc}
7+
* @throws DatastoreServiceException if a given entity already added to this batch
8+
*/
9+
@Override
10+
void add(Entity... entity);
11+
12+
/**
13+
* {@inheritDoc}
14+
* @throws DatastoreServiceException if an entity is marked for deletion in this batch
15+
*/
16+
@Override
17+
void update(Entity... entity);
18+
519
/**
620
* Submit the batch to the Datastore.
721
*
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package com.google.gcloud.datastore;
2+
3+
import static com.google.gcloud.datastore.DatastoreServiceException.Code.FAILED_PRECONDITION;
4+
5+
import com.google.api.services.datastore.DatastoreV1;
6+
7+
import java.util.LinkedHashMap;
8+
import java.util.LinkedHashSet;
9+
10+
class BatchWriterImpl implements BatchWriter {
11+
12+
private final LinkedHashMap<Key, Entity> toAdd = new LinkedHashMap<>();
13+
private final LinkedHashMap<Key, Entity> toUpdate = new LinkedHashMap<>();
14+
private final LinkedHashMap<Key, Entity> toPut = new LinkedHashMap<>();
15+
private final LinkedHashSet<Key> toDelete = new LinkedHashSet<>();
16+
private final DatastoreServiceImpl datastore;
17+
18+
private boolean force;
19+
protected boolean isValid = true;
20+
21+
BatchWriterImpl(DatastoreServiceImpl datastore, BatchWriteOption... options) {
22+
this.datastore = datastore;
23+
force = datastore.getOptions().force();
24+
for (BatchWriteOption option : options) {
25+
option.apply(this);
26+
}
27+
}
28+
29+
// Apply all valid options
30+
31+
void apply(BatchWriteOption.ForceWrites forceOptions) {
32+
this.force = forceOptions.force();
33+
}
34+
35+
void apply(BatchWriteOption other) {
36+
// dont care
37+
}
38+
39+
////////////////////
40+
41+
DatastoreServiceException newBatchFailure(Entity entity, String msg) {
42+
isValid = false;
43+
return new DatastoreServiceException(FAILED_PRECONDITION,
44+
new RuntimeException("Entity with the key " + entity.key() + " " + msg));
45+
}
46+
47+
protected void checkValid() {
48+
if (!isValid) {
49+
throw new DatastoreServiceException(FAILED_PRECONDITION,
50+
new RuntimeException("BatchWriter is in an invalid state"));
51+
}
52+
}
53+
54+
@Override
55+
public void add(Entity... entities) {
56+
checkValid();
57+
for (Entity entity : entities) {
58+
Key key = entity.key();
59+
if (toAdd.containsKey(key) || toUpdate.containsKey(key) || toPut.containsKey(key)) {
60+
throw newBatchFailure(entity, "was already added or updated in this batch");
61+
}
62+
if (toDelete.remove(key)) {
63+
toPut.put(key, entity);
64+
} else {
65+
toAdd.put(key, entity);
66+
}
67+
}
68+
}
69+
70+
@Override
71+
public void update(Entity... entities) {
72+
checkValid();
73+
for (Entity entity : entities) {
74+
Key key = entity.key();
75+
if (toDelete.contains(key)) {
76+
throw newBatchFailure(entity, "was alredy deleted in this batch");
77+
}
78+
if (toAdd.remove(key) != null || toPut.containsKey(key)) {
79+
toPut.put(key, entity);
80+
} else {
81+
toUpdate.put(key, entity);
82+
}
83+
}
84+
}
85+
86+
@Override
87+
public void put(Entity... entities) {
88+
checkValid();
89+
for (Entity entity : entities) {
90+
Key key = entity.key();
91+
toAdd.remove(key);
92+
toUpdate.remove(key);
93+
toDelete.remove(key);
94+
toPut.put(key, entity);
95+
}
96+
}
97+
98+
@Override
99+
public void delete(Key... keys) {
100+
checkValid();
101+
for (Key key : keys) {
102+
toAdd.remove(key);
103+
toUpdate.remove(key);
104+
toPut.remove(key);
105+
toDelete.add(key);
106+
}
107+
}
108+
109+
@Override
110+
public void submit() {
111+
checkValid();
112+
DatastoreV1.Mutation.Builder mutationPb = DatastoreV1.Mutation.newBuilder();
113+
for (Entity entity : toAdd.values()) {
114+
mutationPb.addInsert(entity.toPb());
115+
}
116+
for (Entity entity : toUpdate.values()) {
117+
mutationPb.addUpdate(entity.toPb());
118+
}
119+
for (Entity entity : toPut.values()) {
120+
mutationPb.addUpsert(entity.toPb());
121+
}
122+
for (Key key : toDelete) {
123+
mutationPb.addDelete(key.toPb());
124+
}
125+
if (force) {
126+
mutationPb.setForce(force);
127+
}
128+
DatastoreV1.CommitRequest.Builder requestPb = newCommitRequest();
129+
requestPb.setMutation(mutationPb);
130+
datastore.comitMutation(requestPb);
131+
isValid = false;
132+
}
133+
134+
protected DatastoreV1.CommitRequest.Builder newCommitRequest() {
135+
DatastoreV1.CommitRequest.Builder requestPb = DatastoreV1.CommitRequest.newBuilder();
136+
requestPb.setMode(DatastoreV1.CommitRequest.Mode.NON_TRANSACTIONAL);
137+
return requestPb;
138+
}
139+
}

src/main/java/com/google/gcloud/datastore/DatastoreService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,28 +49,28 @@ public interface DatastoreService extends DatastoreReader, DatastoreWriter {
4949
Iterator<Key> allocateIds(PartialKey... key);
5050

5151
/**
52-
* @see DatastoreWriter#add(Entity...)
52+
* {@inheritDoc}
5353
* @throws DatastoreServiceExcepiton upon failure
5454
*/
5555
@Override
5656
void add(Entity... entity);
5757

5858
/**
59-
* @see DatastoreWriter#update(Entity...)
59+
* {@inheritDoc}
6060
* @throws DatastoreServiceExcepiton upon failure
6161
*/
6262
@Override
6363
void update(Entity... entity);
6464

6565
/**
66-
* @see DatastoreWriter#put(Entity...)
66+
* {@inheritDoc}
6767
* @throws DatastoreServiceExcepiton upon failure
6868
*/
6969
@Override
7070
void put(Entity... entity);
7171

7272
/**
73-
* @see DatastoreWriter#delete(Key...)
73+
* {@inheritDoc}
7474
* @throws DatastoreServiceExcepiton upon failure
7575
*/
7676
@Override

src/main/java/com/google/gcloud/datastore/DatastoreServiceImpl.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.google.gcloud.datastore;
22

33
import com.google.api.services.datastore.DatastoreV1;
4+
import com.google.api.services.datastore.DatastoreV1.BeginTransactionResponse;
45
import com.google.api.services.datastore.client.Datastore;
56
import com.google.api.services.datastore.client.DatastoreException;
67
import com.google.common.collect.AbstractIterator;
8+
import com.google.protobuf.ByteString;
79

810
import java.util.HashMap;
911
import java.util.Iterator;
@@ -27,14 +29,21 @@ public DatastoreServiceOptions getOptions() {
2729

2830
@Override
2931
public Transaction newTransaction(TransactionOption... transactionOption) {
30-
// TODO Auto-generated method stub
31-
return null;
32+
return new TransactionImpl(this, transactionOption);
33+
}
34+
35+
ByteString requestTransactionId(DatastoreV1.BeginTransactionRequest.Builder requestPb) {
36+
try {
37+
BeginTransactionResponse responsePb = datastore.beginTransaction(requestPb.build());
38+
return responsePb.getTransaction();
39+
} catch (DatastoreException e) {
40+
throw DatastoreServiceException.translateAndPropagate(e);
41+
}
3242
}
3343

3444
@Override
3545
public BatchWriter newBatchWriter(BatchWriteOption... batchWriteOption) {
36-
// TODO Auto-generated method stub
37-
return null;
46+
return new BatchWriterImpl(this, batchWriteOption);
3847
}
3948

4049
@Override
@@ -123,7 +132,10 @@ private void comitMutation(DatastoreV1.Mutation.Builder mutationPb) {
123132
}
124133
DatastoreV1.CommitRequest.Builder requestPb = DatastoreV1.CommitRequest.newBuilder();
125134
requestPb.setMode(DatastoreV1.CommitRequest.Mode.NON_TRANSACTIONAL);
126-
requestPb.setMutation(mutationPb.build());
135+
requestPb.setMutation(mutationPb);
136+
}
137+
138+
void comitMutation(DatastoreV1.CommitRequest.Builder requestPb) {
127139
try {
128140
datastore.commit(requestPb.build());
129141
} catch (DatastoreException e) {

src/main/java/com/google/gcloud/datastore/DatastoreWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ public interface DatastoreWriter {
77

88
/**
99
* A Datastore add operation.
10-
* The operation will fail if an entity already exists.
10+
* The operation will fail if an entity with the same key already exists.
1111
*/
1212
void add(Entity... entity);
1313

1414
/**
1515
* A Datastore update operation.
16-
* The operation will fail if an entity does not already exist.
16+
* The operation will fail if an entity with the same key does not already exist.
1717
*/
1818
void update(Entity... entity);
1919

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.google.gcloud.datastore;
2+
3+
import com.google.api.services.datastore.DatastoreV1;
4+
import com.google.gcloud.datastore.TransactionOption.IsolationLevel;
5+
import com.google.protobuf.ByteString;
6+
7+
import java.util.Iterator;
8+
9+
public final class TransactionImpl extends BatchWriterImpl implements Transaction {
10+
11+
private final ByteString transaction;
12+
private IsolationLevel isolationLevel;
13+
14+
TransactionImpl(DatastoreServiceImpl datastore, TransactionOption... options) {
15+
super(datastore, options);
16+
DatastoreV1.BeginTransactionRequest.Builder requestPb =
17+
DatastoreV1.BeginTransactionRequest.newBuilder();
18+
if (isolationLevel != null) {
19+
requestPb.setIsolationLevel(isolationLevel.level().toPb());
20+
}
21+
transaction = datastore.requestTransactionId(requestPb);
22+
}
23+
24+
void apply(IsolationLevel isolationLevel) {
25+
// TODO(ozarov): validate that this concept actually works!!!
26+
this.isolationLevel = isolationLevel;
27+
}
28+
29+
@Override
30+
public Entity get(Key key) {
31+
// TODO Auto-generated method stub
32+
return null;
33+
}
34+
35+
@Override
36+
public Iterator<Entity> get(Key... key) {
37+
// TODO Auto-generated method stub
38+
return null;
39+
}
40+
41+
@Override
42+
public QueryResult<PartialEntity> runQuery(Query query) {
43+
// TODO Auto-generated method stub
44+
return null;
45+
}
46+
47+
@Override
48+
public void commit() {
49+
submit();
50+
}
51+
52+
@Override
53+
public void rollback() {
54+
isValid = false;
55+
}
56+
57+
@Override
58+
protected DatastoreV1.CommitRequest.Builder newCommitRequest() {
59+
DatastoreV1.CommitRequest.Builder requestPb = DatastoreV1.CommitRequest.newBuilder();
60+
requestPb.setMode(DatastoreV1.CommitRequest.Mode.TRANSACTIONAL);
61+
requestPb.setTransaction(transaction);
62+
return requestPb;
63+
}
64+
}

0 commit comments

Comments
 (0)