Skip to content

Commit 706ffe5

Browse files
authored
fix ScyllaDB: lost page results due to not fetching the entire page (#1407)
fix #1340 * return the page offset if the locally fetched data has not all been returned to users Change-Id: I8abe05cbdc95c28bc384e73d554a30300042f3bb
1 parent 8a2dfac commit 706ffe5

File tree

8 files changed

+288
-58
lines changed

8 files changed

+288
-58
lines changed

hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraEntryIterator.java

Lines changed: 89 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
package com.baidu.hugegraph.backend.store.cassandra;
2121

2222
import java.util.Iterator;
23+
import java.util.List;
2324
import java.util.function.BiFunction;
2425

2526
import com.baidu.hugegraph.backend.page.PageState;
2627
import com.baidu.hugegraph.backend.query.Query;
2728
import com.baidu.hugegraph.backend.store.BackendEntry;
2829
import com.baidu.hugegraph.backend.store.BackendEntryIterator;
2930
import com.baidu.hugegraph.util.E;
31+
import com.datastax.driver.core.ExecutionInfo;
3032
import com.datastax.driver.core.PagingState;
3133
import com.datastax.driver.core.ResultSet;
3234
import com.datastax.driver.core.Row;
@@ -37,24 +39,50 @@ public class CassandraEntryIterator extends BackendEntryIterator {
3739
private final Iterator<Row> rows;
3840
private final BiFunction<BackendEntry, Row, BackendEntry> merger;
3941

40-
private long remaining;
42+
private int fetchdPageSize;
43+
private long expected;
4144
private BackendEntry next;
4245

4346
public CassandraEntryIterator(ResultSet results, Query query,
4447
BiFunction<BackendEntry, Row, BackendEntry> merger) {
4548
super(query);
4649
this.results = results;
4750
this.rows = results.iterator();
48-
this.remaining = results.getAvailableWithoutFetching();
4951
this.merger = merger;
50-
this.next = null;
5152

52-
this.skipOffset();
53+
this.fetchdPageSize = results.getAvailableWithoutFetching();
54+
this.next = null;
5355

5456
if (query.paging()) {
55-
E.checkState(this.remaining == query.limit() ||
56-
results.isFullyFetched(),
57-
"Unexpected fetched page size: %s", this.remaining);
57+
assert query.offset() == 0L;
58+
assert query.limit() >= 0L || query.noLimit() : query.limit();
59+
// Skip page offset
60+
this.expected = PageState.fromString(query.page()).offset();
61+
this.skipPageOffset(query.page());
62+
// Check the number of available rows
63+
E.checkState(this.fetchdPageSize <= query.limit(),
64+
"Unexpected fetched page size: %s",
65+
this.fetchdPageSize);
66+
if (results.isFullyFetched()) {
67+
/*
68+
* All results fetched
69+
* NOTE: it may be enough or not enough for the entire page
70+
*/
71+
this.expected = this.fetchdPageSize;
72+
} else {
73+
/*
74+
* Not fully fetched, that's fetchdPageSize == query.limit(),
75+
*
76+
* NOTE: but there may be fetchdPageSize < query.limit(), means
77+
* not fetched the entire page (ScyllaDB may go here #1340),
78+
* try to fetch next page later until got the expected count.
79+
* Can simulate by: `select.setFetchSize(total - 1)`
80+
*/
81+
this.expected = query.total();
82+
}
83+
} else {
84+
this.expected = query.total();
85+
this.skipOffset();
5886
}
5987
}
6088

@@ -71,11 +99,18 @@ protected final boolean fetch() {
7199
this.next = null;
72100
}
73101

74-
while (this.remaining > 0 && this.rows.hasNext()) {
102+
while (this.expected > 0L && this.rows.hasNext()) {
103+
// Limit expected count, due to rows.hasNext() will fetch next page
104+
this.expected--;
105+
Row row = this.rows.next();
75106
if (this.query.paging()) {
76-
this.remaining--;
107+
// Update fetchdPageSize if auto fetch the next page
108+
if (this.expected > 0L && this.availableLocal() == 0) {
109+
if (this.rows.hasNext()) {
110+
this.fetchdPageSize = this.availableLocal();
111+
}
112+
}
77113
}
78-
Row row = this.rows.next();
79114
BackendEntry merged = this.merger.apply(this.current, row);
80115
if (this.current == null) {
81116
// The first time to read
@@ -112,11 +147,50 @@ protected final long skip(BackendEntry entry, long skip) {
112147

113148
@Override
114149
protected PageState pageState() {
115-
PagingState page = this.results.getExecutionInfo().getPagingState();
116-
if (page == null || this.results.isExhausted()) {
117-
return new PageState(PageState.EMPTY_BYTES, 0, (int) this.count());
150+
byte[] position;
151+
int offset = 0;
152+
int count = (int) this.count();
153+
assert this.fetched() == count;
154+
int extra = this.availableLocal();
155+
List<ExecutionInfo> infos = this.results.getAllExecutionInfo();
156+
if (extra > 0 && infos.size() >= 2) {
157+
/*
158+
* Go back to the previous page if there are still available
159+
* results fetched to local memory but not consumed, and set page
160+
* offset with consumed amount of results.
161+
*
162+
* Safely, we should get the remaining size of the current page by:
163+
* `Whitebox.getInternalState(results, "currentPage").size()`
164+
* instead of
165+
* `results.getAvailableWithoutFetching()`
166+
*/
167+
ExecutionInfo previous = infos.get(infos.size() - 2);
168+
PagingState page = previous.getPagingState();
169+
position = page.toBytes();
170+
offset = this.fetchdPageSize - extra;
171+
} else {
172+
PagingState page = this.results.getExecutionInfo().getPagingState();
173+
if (page == null || this.expected > 0L) {
174+
// Call isExhausted() will lead to try to fetch the next page
175+
E.checkState(this.results.isExhausted(),
176+
"Unexpected paging state with expected=%s, " +
177+
"ensure consume all the fetched results before " +
178+
"calling pageState()", this.expected);
179+
position = PageState.EMPTY_BYTES;
180+
} else {
181+
/*
182+
* Exist page position which used to fetch the next page.
183+
* Maybe it happens to the last page (that's the position is
184+
* at the end of results and next page is empty)
185+
*/
186+
position = page.toBytes();
187+
}
118188
}
119-
byte[] position = page.toBytes();
120-
return new PageState(position, 0, (int) this.count());
189+
190+
return new PageState(position, offset, count);
191+
}
192+
193+
private int availableLocal() {
194+
return this.results.getAvailableWithoutFetching();
121195
}
122196
}

hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,18 +204,39 @@ protected List<Select> query2Select(String table, Query query) {
204204
}
205205

206206
protected void setPageState(Query query, List<Select> selects) {
207-
if (query.noLimit()) {
207+
if (query.noLimit() && !query.paging()) {
208208
return;
209209
}
210210
for (Select select : selects) {
211-
long total = query.total();
211+
int total = (int) query.total();
212+
if (!query.noLimit()) {
213+
E.checkArgument(total == query.total(),
214+
"Invalid query limit %s", query.limit());
215+
} else {
216+
assert total == -1 : total;
217+
}
218+
212219
String page = query.page();
213220
if (page == null) {
214221
// Set limit
215-
select.limit((int) total);
222+
assert total > 0 : total;
223+
select.limit(total);
216224
} else {
217-
select.setFetchSize((int) total);
218-
// It's the first time if page is empty
225+
/*
226+
* NOTE: the `total` may be -1 when query.noLimit(),
227+
* setFetchSize(-1) means the default fetch size will be used.
228+
*/
229+
assert total > 0 || total == -1 : total;
230+
select.setFetchSize(total);
231+
232+
/*
233+
* Can't set limit here `select.limit(total)`
234+
* due to it will cause can't get the next page-state.
235+
* Also can't set `select.limit(total + 1)` due to it will
236+
* cause error "Paging state mismatch" when setPagingState().
237+
*/
238+
239+
// It's the first time if page is empty, skip setPagingState
219240
if (!page.isEmpty()) {
220241
byte[] position = PageState.fromString(page).position();
221242
try {

hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ private byte[] toBytes() {
7272
}
7373

7474
public static PageState fromString(String page) {
75+
E.checkNotNull(page, "page");
7576
return fromBytes(toBytes(page));
7677
}
7778

hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinaryEntryIterator.java

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,12 @@ public BinaryEntryIterator(BackendIterator<Elem> results, Query query,
4646
this.merger = m;
4747
this.next = null;
4848

49-
this.skipOffset();
50-
5149
if (query.paging()) {
50+
assert query.offset() == 0L;
51+
assert PageState.fromString(query.page()).offset() == 0;
5252
this.skipPageOffset(query.page());
53+
} else {
54+
this.skipOffset();
5355
}
5456
}
5557

@@ -98,22 +100,9 @@ protected final boolean fetch() {
98100
return this.current != null;
99101
}
100102

101-
public final static long sizeOfBackendEntry(BackendEntry entry) {
102-
/*
103-
* 3 cases:
104-
* 1) one vertex per entry
105-
* 2) one edge per column (one entry <==> a vertex),
106-
* 3) one element id per column (one entry <==> an index)
107-
*/
108-
if (entry.type().isEdge() || entry.type().isIndex()) {
109-
return entry.columnsSize();
110-
}
111-
return 1L;
112-
}
113-
114103
@Override
115104
protected final long sizeOf(BackendEntry entry) {
116-
return sizeOfBackendEntry(entry);
105+
return sizeOfEntry(entry);
117106
}
118107

119108
@Override
@@ -140,10 +129,16 @@ private void removeLastRecord() {
140129
((BinaryBackendEntry) this.current).removeColumn(lastOne);
141130
}
142131

143-
private void skipPageOffset(String page) {
144-
PageState pagestate = PageState.fromString(page);
145-
if (pagestate.offset() > 0 && this.fetch()) {
146-
this.skip(this.current, pagestate.offset());
132+
public final static long sizeOfEntry(BackendEntry entry) {
133+
/*
134+
* 3 cases:
135+
* 1) one vertex per entry
136+
* 2) one edge per column (one entry <==> a vertex),
137+
* 3) one element id per column (one entry <==> an index)
138+
*/
139+
if (entry.type().isEdge() || entry.type().isIndex()) {
140+
return entry.columnsSize();
147141
}
142+
return 1L;
148143
}
149144
}

hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntryIterator.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,28 +128,47 @@ protected final long fetched() {
128128
return this.count + ccount;
129129
}
130130

131+
protected final void skipPageOffset(String page) {
132+
PageState pageState = PageState.fromString(page);
133+
int pageOffset = pageState.offset();
134+
if (pageOffset > 0) {
135+
/*
136+
* Don't update this.count even if skipped page offset,
137+
* because the skipped records belongs to the last page.
138+
*/
139+
this.skipOffset(pageOffset);
140+
}
141+
}
142+
131143
protected void skipOffset() {
132144
long offset = this.query.offset() - this.query.actualOffset();
133145
if (offset <= 0L) {
134146
return;
135147
}
148+
long skipped = this.skipOffset(offset);
149+
this.count += skipped;
150+
this.query.goOffset(skipped);
151+
}
136152

153+
protected long skipOffset(long offset) {
154+
assert offset >= 0L;
155+
long skipped = 0L;
137156
// Skip offset
138-
while (this.count < offset && this.fetch()) {
157+
while (skipped < offset && this.fetch()) {
139158
assert this.current != null;
140159
final long size = this.sizeOf(this.current);
141-
this.count += size;
142-
if (this.count > offset) {
160+
skipped += size;
161+
if (skipped > offset) {
143162
// Skip part of sub-items in an entry
144-
final long skip = size - (this.count - offset);
145-
this.count -= this.skip(this.current, skip);
146-
assert this.count == offset;
163+
final long skip = size - (skipped - offset);
164+
skipped -= this.skip(this.current, skip);
165+
assert skipped == offset;
147166
} else {
148167
// Skip entry
149168
this.current = null;
150169
}
151170
}
152-
this.query.goOffset(this.count);
171+
return skipped;
153172
}
154173

155174
protected long sizeOf(BackendEntry entry) {

hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ protected static final BackendEntryIterator newEntryIterator(
271271
}
272272

273273
protected static final long sizeOfBackendEntry(BackendEntry entry) {
274-
return BinaryEntryIterator.sizeOfBackendEntry(entry);
274+
return BinaryEntryIterator.sizeOfEntry(entry);
275275
}
276276

277277
private static class RocksDBShardSpliter extends ShardSpliter<Session> {

0 commit comments

Comments
 (0)