Skip to content

Commit 39c7369

Browse files
committed
Merge remote-tracking branch 'origin/master' into guava_force
2 parents 6ca9251 + 3930d12 commit 39c7369

6 files changed

Lines changed: 111 additions & 57 deletions

File tree

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ class BeamModulePlugin implements Plugin<Project> {
422422
// a dependency version which should match across multiple
423423
// Maven artifacts.
424424
def activemq_version = "5.14.5"
425-
def autovalue_version = "1.7.4"
425+
def autovalue_version = "1.7.2"
426426
def aws_java_sdk_version = "1.11.718"
427427
def aws_java_sdk2_version = "2.13.54"
428428
def cassandra_driver_version = "3.10.2"
@@ -435,7 +435,7 @@ class BeamModulePlugin implements Plugin<Project> {
435435
def google_code_gson_version = "2.8.6"
436436
def google_oauth_clients_version = "1.31.0"
437437
// Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom
438-
def grpc_version = "1.35.0"
438+
def grpc_version = "1.32.2"
439439
def guava_version = "30.1-jre"
440440
def hadoop_version = "2.10.1"
441441
def hamcrest_version = "2.1"
@@ -447,7 +447,7 @@ class BeamModulePlugin implements Plugin<Project> {
447447
def jsr305_version = "3.0.2"
448448
def kafka_version = "2.4.1"
449449
def nemo_version = "0.1"
450-
def netty_version = "4.1.52.Final"
450+
def netty_version = "4.1.51.Final"
451451
def postgres_version = "42.2.16"
452452
def powermock_version = "2.0.9"
453453
def protobuf_version = "3.12.0"
@@ -509,7 +509,7 @@ class BeamModulePlugin implements Plugin<Project> {
509509
google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version",
510510
google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version",
511511
google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
512-
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20201030-$google_clients_version",
512+
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20200719-$google_clients_version",
513513
google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20200501-$google_clients_version",
514514
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20200720-$google_clients_version",
515515
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20200713-$google_clients_version",
@@ -519,7 +519,7 @@ class BeamModulePlugin implements Plugin<Project> {
519519
google_auth_library_credentials : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version
520520
google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http", // google_cloud_platform_libraries_bom sets version
521521
google_cloud_bigquery : "com.google.cloud:google-cloud-bigquery", // google_cloud_platform_libraries_bom sets version
522-
google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage:1.8.5",
522+
google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage", // google_cloud_platform_libraries_bom sets version
523523
google_cloud_bigtable_client_core : "com.google.cloud.bigtable:bigtable-client-core:1.16.0",
524524
google_cloud_bigtable_emulator : "com.google.cloud:google-cloud-bigtable-emulator:0.125.2",
525525
google_cloud_core : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version
@@ -530,9 +530,9 @@ class BeamModulePlugin implements Plugin<Project> {
530530
google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub:$google_cloud_pubsub_version",
531531
google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite:$google_cloud_pubsublite_version",
532532
// The GCP Libraries BOM dashboard shows the versions set by the BOM:
533-
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/16.3.0/artifact_details.html
533+
// https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/13.2.0/artifact_details.html
534534
// Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml
535-
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:16.3.0",
535+
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:13.2.0",
536536
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
537537
google_code_gson : "com.google.code.gson:gson:$google_code_gson_version",
538538
// google-http-client's version is explicitly declared for sdks/java/maven-archetypes/examples
@@ -583,7 +583,7 @@ class BeamModulePlugin implements Plugin<Project> {
583583
jackson_module_scala : "com.fasterxml.jackson.module:jackson-module-scala_2.11:$jackson_version",
584584
jaxb_api : "jakarta.xml.bind:jakarta.xml.bind-api:$jaxb_api_version",
585585
jaxb_impl : "com.sun.xml.bind:jaxb-impl:$jaxb_api_version",
586-
joda_time : "joda-time:joda-time:2.10.5",
586+
joda_time : "joda-time:joda-time:2.10.10",
587587
jsonassert : "org.skyscreamer:jsonassert:1.5.0",
588588
jsr305 : "com.google.code.findbugs:jsr305:$jsr305_version",
589589
junit : "junit:junit:4.13.1",
@@ -601,7 +601,7 @@ class BeamModulePlugin implements Plugin<Project> {
601601
powermock_mockito : "org.powermock:powermock-api-mockito2:$powermock_version",
602602
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
603603
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",
604-
proto_google_cloud_bigquerybeta2_storage_v1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2", // google_cloud_platform_libraries_bom sets version
604+
proto_google_cloud_bigquery_storage_v1beta1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1", // google_cloud_platform_libraries_bom sets version
605605
proto_google_cloud_bigtable_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-v2", // google_cloud_platform_libraries_bom sets version
606606
proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version
607607
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version

sdks/java/container/license_scripts/dep_urls_java.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ jaxen:
4141
'1.1.6':
4242
type: "3-Clause BSD"
4343
libraries-bom:
44-
'16.3.0':
44+
'13.2.0':
4545
license: "https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE"
4646
type: "Apache License 2.0"
4747
paranamer:

sdks/java/io/google-cloud-platform/build.gradle

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ dependencies {
6161
compile library.java.google_http_client
6262
compile library.java.google_http_client_jackson2
6363
compile library.java.grpc_alts
64-
compile library.java.grpc_api
6564
compile library.java.grpc_auth
6665
compile library.java.grpc_core
6766
compile library.java.grpc_context
@@ -78,7 +77,7 @@ dependencies {
7877
compile library.java.junit
7978
compile library.java.netty_handler
8079
compile library.java.netty_tcnative_boringssl_static
81-
compile library.java.proto_google_cloud_bigquerybeta2_storage_v1
80+
compile library.java.proto_google_cloud_bigquery_storage_v1beta1
8281
compile library.java.proto_google_cloud_bigtable_v2
8382
compile library.java.proto_google_cloud_datastore_v1
8483
compile library.java.proto_google_cloud_pubsub_v1

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,12 @@ public void processElement(ProcessContext ctx) throws Exception {
7070

7171
producer.send(
7272
new ProducerRecord<>(
73-
topicName, null, timestampMillis, record.key(), record.value(), record.headers()),
73+
topicName,
74+
record.partition(),
75+
timestampMillis,
76+
record.key(),
77+
record.value(),
78+
record.headers()),
7479
new SendCallback());
7580

7681
elementsWritten.inc();

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,9 +1435,49 @@ public void testSinkProducerRecordsWithCustomTS() throws Exception {
14351435
}
14361436
}
14371437

1438+
@Test
1439+
public void testSinkProducerRecordsWithCustomPartition() throws Exception {
1440+
int numElements = 1000;
1441+
1442+
try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
1443+
1444+
ProducerSendCompletionThread completionThread =
1445+
new ProducerSendCompletionThread(producerWrapper.mockProducer).start();
1446+
1447+
final String defaultTopic = "test";
1448+
final Integer partition = 1;
1449+
1450+
p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
1451+
.apply(ParDo.of(new KV2ProducerRecord(defaultTopic, partition)))
1452+
.setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of()))
1453+
.apply(
1454+
KafkaIO.<Integer, Long>writeRecords()
1455+
.withBootstrapServers("none")
1456+
.withKeySerializer(IntegerSerializer.class)
1457+
.withValueSerializer(LongSerializer.class)
1458+
.withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
1459+
1460+
p.run();
1461+
1462+
completionThread.shutdown();
1463+
1464+
// Verify that messages are written with user-defined timestamp
1465+
List<ProducerRecord<Integer, Long>> sent = producerWrapper.mockProducer.history();
1466+
1467+
for (int i = 0; i < numElements; i++) {
1468+
ProducerRecord<Integer, Long> record = sent.get(i);
1469+
assertEquals(defaultTopic, record.topic());
1470+
assertEquals(partition, record.partition());
1471+
assertEquals(i, record.key().intValue());
1472+
assertEquals(i, record.value().longValue());
1473+
}
1474+
}
1475+
}
1476+
14381477
private static class KV2ProducerRecord
14391478
extends DoFn<KV<Integer, Long>, ProducerRecord<Integer, Long>> {
14401479
final String topic;
1480+
final Integer partition;
14411481
final boolean isSingleTopic;
14421482
final Long ts;
14431483
final SimpleEntry<String, String> header;
@@ -1446,6 +1486,10 @@ private static class KV2ProducerRecord
14461486
this(topic, true);
14471487
}
14481488

1489+
KV2ProducerRecord(String topic, Integer partition) {
1490+
this(topic, true, null, null, partition);
1491+
}
1492+
14491493
KV2ProducerRecord(String topic, Long ts) {
14501494
this(topic, true, ts);
14511495
}
@@ -1455,12 +1499,22 @@ private static class KV2ProducerRecord
14551499
}
14561500

14571501
KV2ProducerRecord(String topic, boolean isSingleTopic, Long ts) {
1458-
this(topic, isSingleTopic, ts, null);
1502+
this(topic, isSingleTopic, ts, null, null);
14591503
}
14601504

14611505
KV2ProducerRecord(
14621506
String topic, boolean isSingleTopic, Long ts, SimpleEntry<String, String> header) {
1507+
this(topic, isSingleTopic, ts, header, null);
1508+
}
1509+
1510+
KV2ProducerRecord(
1511+
String topic,
1512+
boolean isSingleTopic,
1513+
Long ts,
1514+
SimpleEntry<String, String> header,
1515+
Integer partition) {
14631516
this.topic = topic;
1517+
this.partition = partition;
14641518
this.isSingleTopic = isSingleTopic;
14651519
this.ts = ts;
14661520
this.header = header;
@@ -1477,14 +1531,16 @@ public void processElement(ProcessContext ctx) {
14771531
header.getKey(), header.getValue().getBytes(StandardCharsets.UTF_8)));
14781532
}
14791533
if (isSingleTopic) {
1480-
ctx.output(new ProducerRecord<>(topic, null, ts, kv.getKey(), kv.getValue(), headers));
1534+
ctx.output(new ProducerRecord<>(topic, partition, ts, kv.getKey(), kv.getValue(), headers));
14811535
} else {
14821536
if (kv.getKey() % 2 == 0) {
14831537
ctx.output(
1484-
new ProducerRecord<>(topic + "_2", null, ts, kv.getKey(), kv.getValue(), headers));
1538+
new ProducerRecord<>(
1539+
topic + "_2", partition, ts, kv.getKey(), kv.getValue(), headers));
14851540
} else {
14861541
ctx.output(
1487-
new ProducerRecord<>(topic + "_1", null, ts, kv.getKey(), kv.getValue(), headers));
1542+
new ProducerRecord<>(
1543+
topic + "_1", partition, ts, kv.getKey(), kv.getValue(), headers));
14881544
}
14891545
}
14901546
}

sdks/python/apache_beam/runners/interactive/interactive_environment_test.py

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -182,75 +182,69 @@ def test_cleanup_invoked_when_new_env_replace_not_none_env(self):
182182
mocked_cleanup.assert_called_once()
183183

184184
def test_cleanup_not_invoked_when_cm_changed_from_none(self):
185-
ie.new_env()
185+
env = ie.InteractiveEnvironment()
186186
with patch('apache_beam.runners.interactive.interactive_environment'
187187
'.InteractiveEnvironment.cleanup') as mocked_cleanup:
188188
dummy_pipeline = 'dummy'
189-
self.assertIsNone(ie.current_env().get_cache_manager(dummy_pipeline))
189+
self.assertIsNone(env.get_cache_manager(dummy_pipeline))
190190
cache_manager = cache.FileBasedCacheManager()
191-
ie.current_env().set_cache_manager(cache_manager, dummy_pipeline)
191+
env.set_cache_manager(cache_manager, dummy_pipeline)
192192
mocked_cleanup.assert_not_called()
193-
self.assertIs(
194-
ie.current_env().get_cache_manager(dummy_pipeline), cache_manager)
193+
self.assertIs(env.get_cache_manager(dummy_pipeline), cache_manager)
195194

196195
def test_cleanup_invoked_when_not_none_cm_changed(self):
197-
ie.new_env()
196+
env = ie.InteractiveEnvironment()
198197
with patch('apache_beam.runners.interactive.interactive_environment'
199198
'.InteractiveEnvironment.cleanup') as mocked_cleanup:
200199
dummy_pipeline = 'dummy'
201-
ie.current_env().set_cache_manager(
202-
cache.FileBasedCacheManager(), dummy_pipeline)
200+
env.set_cache_manager(cache.FileBasedCacheManager(), dummy_pipeline)
203201
mocked_cleanup.assert_not_called()
204-
ie.current_env().set_cache_manager(
205-
cache.FileBasedCacheManager(), dummy_pipeline)
202+
env.set_cache_manager(cache.FileBasedCacheManager(), dummy_pipeline)
206203
mocked_cleanup.assert_called_once()
207204

208205
def test_noop_when_cm_is_not_changed(self):
209206
cache_manager = cache.FileBasedCacheManager()
210207
dummy_pipeline = 'dummy'
211-
ie.new_env()
208+
env = ie.InteractiveEnvironment()
212209
with patch('apache_beam.runners.interactive.interactive_environment'
213210
'.InteractiveEnvironment.cleanup') as mocked_cleanup:
214-
ie.current_env()._cache_managers[str(id(dummy_pipeline))] = cache_manager
211+
env._cache_managers[str(id(dummy_pipeline))] = cache_manager
215212
mocked_cleanup.assert_not_called()
216-
ie.current_env().set_cache_manager(cache_manager, dummy_pipeline)
213+
env.set_cache_manager(cache_manager, dummy_pipeline)
217214
mocked_cleanup.assert_not_called()
218215

219216
def test_get_cache_manager_creates_cache_manager_if_absent(self):
220-
ie.new_env()
217+
env = ie.InteractiveEnvironment()
221218
dummy_pipeline = 'dummy'
222-
self.assertIsNone(ie.current_env().get_cache_manager(dummy_pipeline))
219+
self.assertIsNone(env.get_cache_manager(dummy_pipeline))
223220
self.assertIsNotNone(
224-
ie.current_env().get_cache_manager(
225-
dummy_pipeline, create_if_absent=True))
221+
env.get_cache_manager(dummy_pipeline, create_if_absent=True))
226222

227223
def test_track_user_pipeline_cleanup_non_inspectable_pipeline(self):
228224
ie.new_env()
225+
dummy_pipeline_1 = beam.Pipeline()
226+
dummy_pipeline_2 = beam.Pipeline()
227+
dummy_pipeline_3 = beam.Pipeline()
228+
dummy_pipeline_4 = beam.Pipeline()
229+
dummy_pcoll = dummy_pipeline_4 | beam.Create([1])
230+
dummy_pipeline_5 = beam.Pipeline()
231+
dummy_non_inspectable_pipeline = 'dummy'
232+
ie.current_env().watch(locals())
233+
from apache_beam.runners.interactive.background_caching_job import BackgroundCachingJob
234+
ie.current_env().set_background_caching_job(
235+
dummy_pipeline_1,
236+
BackgroundCachingJob(
237+
runner.PipelineResult(runner.PipelineState.DONE), limiters=[]))
238+
ie.current_env().set_test_stream_service_controller(dummy_pipeline_2, None)
239+
ie.current_env().set_cache_manager(
240+
cache.FileBasedCacheManager(), dummy_pipeline_3)
241+
ie.current_env().mark_pcollection_computed([dummy_pcoll])
242+
ie.current_env().set_cached_source_signature(
243+
dummy_non_inspectable_pipeline, None)
244+
ie.current_env().set_pipeline_result(
245+
dummy_pipeline_5, runner.PipelineResult(runner.PipelineState.RUNNING))
229246
with patch('apache_beam.runners.interactive.interactive_environment'
230247
'.InteractiveEnvironment.cleanup') as mocked_cleanup:
231-
dummy_pipeline_1 = beam.Pipeline()
232-
dummy_pipeline_2 = beam.Pipeline()
233-
dummy_pipeline_3 = beam.Pipeline()
234-
dummy_pipeline_4 = beam.Pipeline()
235-
dummy_pcoll = dummy_pipeline_4 | beam.Create([1])
236-
dummy_pipeline_5 = beam.Pipeline()
237-
dummy_non_inspectable_pipeline = 'dummy'
238-
ie.current_env().watch(locals())
239-
from apache_beam.runners.interactive.background_caching_job import BackgroundCachingJob
240-
ie.current_env().set_background_caching_job(
241-
dummy_pipeline_1,
242-
BackgroundCachingJob(
243-
runner.PipelineResult(runner.PipelineState.DONE), limiters=[]))
244-
ie.current_env().set_test_stream_service_controller(
245-
dummy_pipeline_2, None)
246-
ie.current_env().set_cache_manager(
247-
cache.FileBasedCacheManager(), dummy_pipeline_3)
248-
ie.current_env().mark_pcollection_computed([dummy_pcoll])
249-
ie.current_env().set_cached_source_signature(
250-
dummy_non_inspectable_pipeline, None)
251-
ie.current_env().set_pipeline_result(
252-
dummy_pipeline_5, runner.PipelineResult(runner.PipelineState.RUNNING))
253-
mocked_cleanup.assert_not_called()
254248
ie.current_env().track_user_pipelines()
255249
mocked_cleanup.assert_called_once()
256250

0 commit comments

Comments
 (0)