@@ -85,17 +85,7 @@ To make authenticated requests to Google Cloud Pub/Sub, you must create a servic
8585credentials. You can then make API calls by calling methods on the Pub/Sub service object. The
8686simplest way to authenticate is to use
8787[ Application Default Credentials] ( https://developers.google.com/identity/protocols/application-default-credentials ) .
88- These credentials are automatically inferred from your environment, so you only need the following
89- code to create your service object:
90-
91- ``` java
92- import com.google.cloud.pubsub.PubSub ;
93- import com.google.cloud.pubsub.PubSubOptions ;
94-
95- try (PubSub pubsub = PubSubOptions . getDefaultInstance(). getService()) {
96- // use pubsub here
97- }
98- ```
88+ These credentials are automatically inferred from your environment.
9989
10090For other authentication options, see the
10191[ Authentication] ( https://github.com/GoogleCloudPlatform/google-cloud-java#authentication ) page.
@@ -105,65 +95,108 @@ With Pub/Sub you can create topics. A topic is a named resource to which message
10595publishers. Add the following imports at the top of your file:
10696
10797``` java
108- import com.google.cloud.pubsub.Topic ;
109- import com.google.cloud. pubsub.TopicInfo ;
98+ import com.google.cloud.pubsub.spi.v1.PublisherClient ;
99+ import com.google.pubsub.v1.TopicName ;
110100```
111101Then, to create the topic, use the following code:
112102
113103``` java
114- Topic topic = pubsub. create(TopicInfo . of(" test-topic" ));
104+ TopicName topic = TopicName . create(" test-project" , " test-topic" );
105+ try (PublisherClient publisherClient = PublisherClient . create()) {
106+ publisherClient. createTopic(topic);
107+ }
115108```
116109
117110#### Publishing messages
118111With Pub/Sub you can publish messages to a topic. Add the following import at the top of your file:
119112
120113``` java
121- import com.google.cloud.pubsub.Message ;
114+ import com.google.api.gax.core.RpcFuture ;
115+ import com.google.cloud.pubsub.spi.v1.Publisher ;
116+ import com.google.protobuf.ByteString ;
117+ import com.google.pubsub.v1.PubsubMessage ;
122118```
123119Then, to publish messages asynchronously, use the following code:
124120
125121``` java
126- Message message1 = Message . of(" First message" );
127- Message message2 = Message . of(" Second message" );
128- topic. publishAsync(message1, message2);
122+ Publisher publisher = null ;
123+ try {
124+ publisher = Publisher . newBuilder(topic). build();
125+ ByteString data = ByteString . copyFromUtf8(" my-message" );
126+ PubsubMessage pubsubMessage = PubsubMessage . newBuilder(). setData(data). build();
127+ RpcFuture<String > messageIdFuture = publisher. publish(pubsubMessage);
128+ } finally {
129+ if (publisher != null ) {
130+ publisher. shutdown();
131+ }
132+ }
129133```
130134
131135#### Creating a subscription
132136With Pub/Sub you can create subscriptions. A subscription represents the stream of messages from a
133137single, specific topic. Add the following imports at the top of your file:
134138
135139``` java
136- import com.google.cloud.pubsub.Subscription ;
137- import com.google.cloud.pubsub.SubscriptionInfo ;
140+ import com.google.cloud.pubsub.spi.v1.SubscriberClient ;
141+ import com.google.pubsub.v1.PushConfig ;
142+ import com.google.pubsub.v1.SubscriptionName ;
143+ import com.google.pubsub.v1.TopicName ;
138144```
139145Then, to create the subscription, use the following code:
140146
141147``` java
142- Subscription subscription =
143- pubsub. create(SubscriptionInfo . of(" test-topic" , " test-subscription" ));
148+ TopicName topic = TopicName . create(" test-project" , " test-topic" );
149+ SubscriptionName subscription = SubscriptionName . create(" test-project" , " test-subscription" );
150+
151+ try (SubscriberClient subscriberClient = SubscriberClient . create()) {
152+ subscriberClient. createSubscription(subscription, topic, PushConfig . getDefaultInstance(), 0 );
153+ }
144154```
145155
146156#### Pulling messages
147157With Pub/Sub you can pull messages from a subscription. Add the following imports at the top of your
148158file:
149159
150160``` java
151- import com.google.cloud.pubsub.Message ;
152- import com.google.cloud.pubsub.PubSub.MessageConsumer ;
153- import com.google.cloud.pubsub.PubSub.MessageProcessor ;
161+ import com.google.cloud.pubsub.spi.v1.AckReply ;
162+ import com.google.cloud.pubsub.spi.v1.AckReplyConsumer ;
163+ import com.google.cloud.pubsub.spi.v1.MessageReceiver ;
164+ import com.google.cloud.pubsub.spi.v1.Subscriber ;
165+ import com.google.common.util.concurrent.MoreExecutors ;
166+ import com.google.pubsub.v1.PubsubMessage ;
167+ import com.google.pubsub.v1.SubscriptionName ;
168+ import com.google.pubsub.v1.TopicName ;
154169```
155170Then, to pull messages asynchronously, use the following code:
156171
157172``` java
158- MessageProcessor callback = new MessageProcessor () {
159- @Override
160- public void process (Message message ) throws Exception {
161- System . out. printf(" Received message \" %s\" %n" , message. getPayloadAsString());
173+ MessageReceiver receiver =
174+ new MessageReceiver () {
175+ @Override
176+ public void receiveMessage (PubsubMessage message , AckReplyConsumer consumer ) {
177+ System . out. println(" got message: " + message. getData(). toStringUtf8());
178+ consumer. accept(AckReply . ACK , null );
179+ }
180+ };
181+ Subscriber subscriber = null ;
182+ try {
183+ subscriber = Subscriber . newBuilder(subscription, receiver). build();
184+ subscriber. addListener(
185+ new Subscriber .SubscriberListener () {
186+ @Override
187+ public void failed (Subscriber .State from , Throwable failure ) {
188+ // Handle failure.
189+ System . err. println(failure);
190+ }
191+ },
192+ MoreExecutors . directExecutor());
193+ subscriber. startAsync(). awaitRunning();
194+ // Pull messages for 60 seconds.
195+ Thread . sleep(60000 );
196+ } finally {
197+ if (subscriber != null ) {
198+ subscriber. stopAsync();
162199 }
163- };
164- // Create a message consumer and pull messages (for 60 seconds)
165- try (MessageConsumer consumer = subscription. pullAsync(callback)) {
166- Thread . sleep(60_000 );
167200}
168201```
169202#### Complete source code
0 commit comments