Skip to content

Commit 374210e

Browse files
committed
merge pubsub-hp into master
2 parents 8f4828f + 709d21d commit 374210e

29 files changed

Lines changed: 4804 additions & 444 deletions

google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 Google Inc. All Rights Reserved.
2+
* Copyright 2017 Google Inc. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,13 +16,15 @@
1616

1717
package com.google.cloud.examples.pubsub.snippets;
1818

19-
import com.google.cloud.pubsub.Message;
20-
import com.google.cloud.pubsub.PubSub;
21-
import com.google.cloud.pubsub.PubSub.MessageConsumer;
22-
import com.google.cloud.pubsub.PubSub.MessageProcessor;
23-
import com.google.cloud.pubsub.PubSubOptions;
24-
import com.google.cloud.pubsub.Subscription;
25-
import com.google.cloud.pubsub.SubscriptionInfo;
19+
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
20+
import com.google.cloud.pubsub.spi.v1.Subscriber;
21+
import com.google.cloud.pubsub.spi.v1.SubscriberClient;
22+
import com.google.common.util.concurrent.MoreExecutors;
23+
import com.google.common.util.concurrent.SettableFuture;
24+
import com.google.pubsub.v1.PubsubMessage;
25+
import com.google.pubsub.v1.PushConfig;
26+
import com.google.pubsub.v1.SubscriptionName;
27+
import com.google.pubsub.v1.TopicName;
2628

2729
/**
2830
* A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull subscription and
@@ -31,18 +33,38 @@
3133
public class CreateSubscriptionAndPullMessages {
3234

3335
public static void main(String... args) throws Exception {
34-
try (PubSub pubsub = PubSubOptions.getDefaultInstance().getService()) {
35-
Subscription subscription =
36-
pubsub.create(SubscriptionInfo.of("test-topic", "test-subscription"));
37-
MessageProcessor callback = new MessageProcessor() {
38-
@Override
39-
public void process(Message message) throws Exception {
40-
System.out.printf("Received message \"%s\"%n", message.getPayloadAsString());
41-
}
42-
};
43-
// Create a message consumer and pull messages (for 60 seconds)
44-
try (MessageConsumer consumer = subscription.pullAsync(callback)) {
45-
Thread.sleep(60_000);
36+
TopicName topic = TopicName.create("test-project", "test-topic");
37+
SubscriptionName subscription = SubscriptionName.create("test-project", "test-subscription");
38+
39+
try (SubscriberClient subscriberClient = SubscriberClient.create()) {
40+
subscriberClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);
41+
}
42+
43+
MessageReceiver receiver =
44+
new MessageReceiver() {
45+
@Override
46+
public void receiveMessage(
47+
PubsubMessage message, SettableFuture<MessageReceiver.AckReply> response) {
48+
System.out.println("got message: " + message.getData().toStringUtf8());
49+
response.set(MessageReceiver.AckReply.ACK);
50+
}
51+
};
52+
Subscriber subscriber = null;
53+
try {
54+
subscriber = Subscriber.newBuilder(subscription, receiver).build();
55+
subscriber.addListener(
56+
new Subscriber.SubscriberListener() {
57+
@Override
58+
public void failed(Subscriber.State from, Throwable failure) {
59+
System.err.println(failure);
60+
}
61+
},
62+
MoreExecutors.directExecutor());
63+
subscriber.startAsync().awaitRunning();
64+
Thread.sleep(60000);
65+
} finally {
66+
if (subscriber != null) {
67+
subscriber.stopAsync();
4668
}
4769
}
4870
}
Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016 Google Inc. All Rights Reserved.
2+
* Copyright 2017 Google Inc. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,24 +16,46 @@
1616

1717
package com.google.cloud.examples.pubsub.snippets;
1818

19-
import com.google.cloud.pubsub.Message;
20-
import com.google.cloud.pubsub.PubSub;
21-
import com.google.cloud.pubsub.PubSubOptions;
22-
import com.google.cloud.pubsub.Topic;
23-
import com.google.cloud.pubsub.TopicInfo;
19+
import com.google.cloud.pubsub.spi.v1.Publisher;
20+
import com.google.cloud.pubsub.spi.v1.PublisherClient;
21+
import com.google.common.util.concurrent.Futures;
22+
import com.google.common.util.concurrent.ListenableFuture;
23+
import com.google.protobuf.ByteString;
24+
import com.google.pubsub.v1.PubsubMessage;
25+
import com.google.pubsub.v1.TopicName;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.List;
2429

2530
/**
2631
* A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub topic and asynchronously
2732
* publish messages to it.
2833
*/
2934
public class CreateTopicAndPublishMessages {
30-
3135
public static void main(String... args) throws Exception {
32-
try (PubSub pubsub = PubSubOptions.getDefaultInstance().getService()) {
33-
Topic topic = pubsub.create(TopicInfo.of("test-topic"));
34-
Message message1 = Message.of("First message");
35-
Message message2 = Message.of("Second message");
36-
topic.publishAsync(message1, message2);
36+
TopicName topic = TopicName.create("test-project", "test-topic");
37+
try (PublisherClient publisherClient = PublisherClient.create()) {
38+
publisherClient.createTopic(topic);
39+
}
40+
41+
Publisher publisher = null;
42+
try {
43+
publisher = Publisher.newBuilder(topic).build();
44+
List<String> messages = Arrays.asList("first message", "second message");
45+
List<ListenableFuture<String>> messageIds = new ArrayList<>();
46+
for (String message : messages) {
47+
ByteString data = ByteString.copyFromUtf8(message);
48+
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
49+
ListenableFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
50+
messageIds.add(messageIdFuture);
51+
}
52+
for (String messageId : Futures.allAsList(messageIds).get()) {
53+
System.out.println("published with message ID: " + messageId);
54+
}
55+
} finally {
56+
if (publisher != null) {
57+
publisher.shutdown();
58+
}
3759
}
3860
}
3961
}

google-cloud-pubsub/pom.xml

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,36 @@
5353
<artifactId>grpc-auth</artifactId>
5454
<version>${grpc.version}</version>
5555
</dependency>
56+
<dependency>
57+
<groupId>com.google.guava</groupId>
58+
<artifactId>guava</artifactId>
59+
<version>20.0</version>
60+
</dependency>
61+
<dependency>
62+
<groupId>com.google.errorprone</groupId>
63+
<artifactId>error_prone_annotations</artifactId>
64+
<version>2.0.15</version>
65+
</dependency>
66+
<dependency>
67+
<groupId>joda-time</groupId>
68+
<artifactId>joda-time</artifactId>
69+
<version>2.9.4</version>
70+
</dependency>
71+
<dependency>
72+
<groupId>org.slf4j</groupId>
73+
<artifactId>slf4j-api</artifactId>
74+
<version>1.7.21</version>
75+
</dependency>
76+
<dependency>
77+
<groupId>org.slf4j</groupId>
78+
<artifactId>slf4j-log4j12</artifactId>
79+
<version>1.7.21</version>
80+
</dependency>
81+
<dependency>
82+
<groupId>log4j</groupId>
83+
<artifactId>log4j</artifactId>
84+
<version>1.2.17</version>
85+
</dependency>
5686
<dependency>
5787
<groupId>${project.groupId}</groupId>
5888
<artifactId>google-cloud-core</artifactId>
@@ -72,6 +102,12 @@
72102
<version>3.4</version>
73103
<scope>test</scope>
74104
</dependency>
105+
<dependency>
106+
<groupId>org.mockito</groupId>
107+
<artifactId>mockito-all</artifactId>
108+
<version>1.9.5</version>
109+
<scope>test</scope>
110+
</dependency>
75111
</dependencies>
76112
<profiles>
77113
<profile>
@@ -86,6 +122,13 @@
86122
</profile>
87123
</profiles>
88124
<build>
125+
<extensions>
126+
<extension>
127+
<groupId>kr.motd.maven</groupId>
128+
<artifactId>os-maven-plugin</artifactId>
129+
<version>1.4.0.Final</version>
130+
</extension>
131+
</extensions>
89132
<plugins>
90133
<plugin>
91134
<groupId>org.codehaus.mojo</groupId>
@@ -121,10 +164,7 @@
121164
</plugin>
122165
<plugin>
123166
<artifactId>maven-compiler-plugin</artifactId>
124-
<!-- Downgrading to 3.1 because of https://issues.apache.org/jira/browse/MCOMPILER-236 -->
125-
<!-- Upgrade to 3.5.1 which fixes the problem when available -->
126-
<!-- <version>3.5.1</version> -->
127-
<version>3.1</version>
167+
<version>3.5.1</version>
128168
<configuration>
129169
<source>1.7</source>
130170
<target>1.7</target>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2017 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub.spi.v1;
18+
19+
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Preconditions;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
/** Provides a simplistic round robin, guarding for overflow. */
24+
class AtomicRoundRobin {
25+
private final int max;
26+
private final AtomicInteger current;
27+
28+
AtomicRoundRobin(int max) {
29+
Preconditions.checkArgument(max > 0);
30+
this.max = max;
31+
current = new AtomicInteger(0);
32+
}
33+
34+
int next() {
35+
int next = current.getAndIncrement() % max;
36+
if (next < 0) {
37+
next += max;
38+
}
39+
return next;
40+
}
41+
42+
@VisibleForTesting
43+
void set(int i) {
44+
current.set(i);
45+
}
46+
}

0 commit comments

Comments
 (0)