Skip to content

Commit 70f4a1a

Browse files
authored
CP for #28624 into release 2.51.0 (Bigtable Python timestamp bug fix) (#28634)
1 parent 34ff286 commit 70f4a1a

File tree

4 files changed

+39
-18
lines changed

4 files changed

+39
-18
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,13 @@ public KV<ByteString, Iterable<Mutation>> apply(Row row) {
179179
.setColumnQualifier(
180180
ByteString.copyFrom(ofNullable(mutation.get("column_qualifier")).get()))
181181
.setFamilyNameBytes(
182-
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()));
183-
if (mutation.containsKey("timestamp_micros")) {
184-
setMutation =
185-
setMutation.setTimestampMicros(
186-
Longs.fromByteArray(ofNullable(mutation.get("timestamp_micros")).get()));
187-
}
182+
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()))
183+
// Use timestamp if provided, else default to -1 (current Bigtable server time)
184+
.setTimestampMicros(
185+
mutation.containsKey("timestamp_micros")
186+
? Longs.fromByteArray(
187+
ofNullable(mutation.get("timestamp_micros")).get())
188+
: -1);
188189
bigtableMutation = Mutation.newBuilder().setSetCell(setMutation.build()).build();
189190
break;
190191
case "DeleteFromColumn":

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ public void tearDown() {
154154
public void testSetMutationsExistingColumn() {
155155
RowMutation rowMutation =
156156
RowMutation.create(tableId, "key-1")
157-
.setCell(COLUMN_FAMILY_NAME_1, "col_a", "val-1-a")
158-
.setCell(COLUMN_FAMILY_NAME_2, "col_c", "val-1-c");
157+
.setCell(COLUMN_FAMILY_NAME_1, "col_a", 1000, "val-1-a")
158+
.setCell(COLUMN_FAMILY_NAME_2, "col_c", 1000, "val-1-c");
159159
dataClient.mutateRow(rowMutation);
160160

161161
List<Map<String, byte[]>> mutations = new ArrayList<>();
@@ -165,13 +165,15 @@ public void testSetMutationsExistingColumn() {
165165
"type", "SetCell".getBytes(StandardCharsets.UTF_8),
166166
"value", "new-val-1-a".getBytes(StandardCharsets.UTF_8),
167167
"column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8),
168-
"family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)));
168+
"family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8),
169+
"timestamp_micros", Longs.toByteArray(2000)));
169170
mutations.add(
170171
ImmutableMap.of(
171172
"type", "SetCell".getBytes(StandardCharsets.UTF_8),
172173
"value", "new-val-1-c".getBytes(StandardCharsets.UTF_8),
173174
"column_qualifier", "col_c".getBytes(StandardCharsets.UTF_8),
174-
"family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8)));
175+
"family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8),
176+
"timestamp_micros", Longs.toByteArray(2000)));
175177
Row mutationRow =
176178
Row.withSchema(SCHEMA)
177179
.withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8))
@@ -202,10 +204,11 @@ public void testSetMutationsExistingColumn() {
202204
.collect(Collectors.toList());
203205
assertEquals(2, cellsColA.size());
204206
assertEquals(2, cellsColC.size());
205-
System.out.println(cellsColA);
206-
System.out.println(cellsColC);
207-
assertEquals("new-val-1-a", cellsColA.get(1).getValue().toStringUtf8());
208-
assertEquals("new-val-1-c", cellsColC.get(1).getValue().toStringUtf8());
207+
// Bigtable keeps cell history ordered by descending timestamp
208+
assertEquals("new-val-1-a", cellsColA.get(0).getValue().toStringUtf8());
209+
assertEquals("new-val-1-c", cellsColC.get(0).getValue().toStringUtf8());
210+
assertEquals("val-1-a", cellsColA.get(1).getValue().toStringUtf8());
211+
assertEquals("val-1-c", cellsColC.get(1).getValue().toStringUtf8());
209212
}
210213

211214
@Test

sdks/python/apache_beam/io/gcp/bigtableio.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,10 @@ def process(self, direct_row):
252252
"type": b'SetCell',
253253
"family_name": mutation.set_cell.family_name.encode('utf-8'),
254254
"column_qualifier": mutation.set_cell.column_qualifier,
255-
"value": mutation.set_cell.value
255+
"value": mutation.set_cell.value,
256+
"timestamp_micros": struct.pack(
257+
'>q', mutation.set_cell.timestamp_micros)
256258
}
257-
micros = mutation.set_cell.timestamp_micros
258-
if micros > -1:
259-
mutation_dict['timestamp_micros'] = struct.pack('>q', micros)
260259
elif mutation.__contains__("delete_from_column"):
261260
mutation_dict = {
262261
"type": b'DeleteFromColumn',

sdks/python/apache_beam/io/gcp/bigtableio_it_test.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,9 @@ def test_set_mutation(self):
223223
row1_col2_cell = Cell(b'val1-2', 200_000_000)
224224
row2_col1_cell = Cell(b'val2-1', 100_000_000)
225225
row2_col2_cell = Cell(b'val2-2', 200_000_000)
226+
# When setting this cell, we won't set a timestamp. We expect the timestamp
227+
# to default to -1, and Bigtable will set it to system time at insertion.
228+
row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time())
226229
# rows sent to write transform
227230
row1.set_cell(
228231
'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp)
@@ -232,6 +235,8 @@ def test_set_mutation(self):
232235
'col_fam', b'col-1', row2_col1_cell.value, row2_col1_cell.timestamp)
233236
row2.set_cell(
234237
'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp)
238+
# Don't set a timestamp here; it should default to -1.
239+
row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value)
235240

236241
self.run_pipeline([row1, row2])
237242

@@ -249,6 +254,19 @@ def test_set_mutation(self):
249254
self.assertEqual(
250255
row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0])
251256

257+
# check mutation that doesn't have a timestamp set is handled properly:
258+
self.assertEqual(
259+
row2_col1_no_timestamp.value,
260+
actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value)
261+
# Bigtable sets timestamp as insertion time, which is later than the
262+
# time.time() we set when creating this test case
263+
cell_timestamp = actual_row2.find_cells('col_fam',
264+
b'col-no-timestamp')[0].timestamp
265+
self.assertTrue(
266+
row2_col1_no_timestamp.timestamp < cell_timestamp,
267+
msg="Expected cell with unset timestamp to have ingestion time "
268+
f"attached, but was {cell_timestamp}")
269+
252270
def test_delete_cells_mutation(self):
253271
col_fam = self.table.column_family('col_fam')
254272
col_fam.create()

0 commit comments

Comments
 (0)