2828import com .google .api .gax .core .ExecutorProvider ;
2929import com .google .api .gax .core .FixedCredentialsProvider ;
3030import com .google .api .gax .core .InstantiatingExecutorProvider ;
31- import com .google .api .gax .grpc .ChannelProvider ;
3231import com .google .auth .oauth2 .ServiceAccountCredentials ;
3332import com .google .cloud .pubsub .v1 .AckReplyConsumer ;
3433import com .google .cloud .pubsub .v1 .MessageReceiver ;
3534import com .google .cloud .pubsub .v1 .Subscriber ;
36- import com .google .cloud .pubsub .v1 .TopicAdminSettings ;
35+ import com .google .cloud .pubsub .v1 .SubscriptionAdminSettings ;
36+ import com .google .cloud .pubsub .v1 .stub .GrpcSubscriberStub ;
37+ import com .google .cloud .pubsub .v1 .stub .SubscriberStub ;
3738import com .google .common .util .concurrent .MoreExecutors ;
39+ import com .google .pubsub .v1 .AcknowledgeRequest ;
3840import com .google .pubsub .v1 .PubsubMessage ;
41+ import com .google .pubsub .v1 .PullRequest ;
42+ import com .google .pubsub .v1 .PullResponse ;
43+ import com .google .pubsub .v1 .ReceivedMessage ;
3944import com .google .pubsub .v1 .SubscriptionName ;
4045import java .io .FileInputStream ;
46+ import java .util .ArrayList ;
47+ import java .util .List ;
4148import java .util .concurrent .Executor ;
4249
4350/** This class contains snippets for the {@link Subscriber} interface. */
4451public class SubscriberSnippets {
4552
4653 private final SubscriptionName subscriptionName ;
54+
4755 private final MessageReceiver receiver ;
56+
4857 private final ApiFuture <Void > done ;
58+
4959 private final Executor executor ;
5060
5161 public SubscriberSnippets (
@@ -62,11 +72,13 @@ public SubscriberSnippets(
6272 // [TARGET startAsync()]
6373 public void startAndWait () throws Exception {
6474 Subscriber subscriber = Subscriber .defaultBuilder (subscriptionName , receiver ).build ();
65- subscriber .addListener (new Subscriber .Listener () {
66- public void failed (Subscriber .State from , Throwable failure ) {
67- // Handle error.
68- }
69- }, executor );
75+ subscriber .addListener (
76+ new Subscriber .Listener () {
77+ public void failed (Subscriber .State from , Throwable failure ) {
78+ // Handle error.
79+ }
80+ },
81+ executor );
7082 subscriber .startAsync ();
7183
7284 // Wait for a stop signal.
@@ -81,7 +93,8 @@ private void createSubscriber() throws Exception {
8193
8294 SubscriptionName subscriptionName = SubscriptionName .create (projectId , subscriptionId );
8395 // Instantiate an asynchronous message receiver
84- MessageReceiver receiver = new MessageReceiver () {
96+ MessageReceiver receiver =
97+ new MessageReceiver () {
8598 @ Override
8699 public void receiveMessage (PubsubMessage message , AckReplyConsumer consumer ) {
87100 // handle incoming message, then ack/nack the received message
@@ -108,53 +121,94 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
108121
109122 private Subscriber createSubscriberWithErrorListener (Subscriber subscriber ) throws Exception {
110123 // [START pubsub_subscriber_error_listener]
111- subscriber .addListener (new Subscriber .Listener () {
112- public void failed (Subscriber .State from , Throwable failure ) {
113- // Handle error.
114- }
115- }, MoreExecutors .directExecutor ());
124+ subscriber .addListener (
125+ new Subscriber .Listener () {
126+ public void failed (Subscriber .State from , Throwable failure ) {
127+ // Handle error.
128+ }
129+ },
130+ MoreExecutors .directExecutor ());
116131 // [END pubsub_subscriber_error_listener]
117132 return subscriber ;
118133 }
119134
120135 private Subscriber createSingleThreadedSubscriber () throws Exception {
121136 // [START pubsub_subscriber_single_threaded]
122137 // provide a separate executor service for polling
123- ExecutorProvider executorProvider = InstantiatingExecutorProvider . newBuilder ()
124- .setExecutorThreadCount (1 ).build ();
138+ ExecutorProvider executorProvider =
139+ InstantiatingExecutorProvider . newBuilder () .setExecutorThreadCount (1 ).build ();
125140
126- Subscriber subscriber = Subscriber .defaultBuilder (subscriptionName , receiver )
127- .setExecutorProvider (executorProvider )
128- .build ();
141+ Subscriber subscriber =
142+ Subscriber .defaultBuilder (subscriptionName , receiver )
143+ .setExecutorProvider (executorProvider )
144+ .build ();
129145 // [END pubsub_subscriber_single_threaded]
130146 return subscriber ;
131147 }
132148
133149 private Subscriber createSubscriberWithCustomFlowSettings () throws Exception {
134150 // [START pubsub_subscriber_flow_settings]
135- int maxMessageCount = 10 ;
151+ long maxMessageCount = 10L ;
136152 // Configure max number of messages to be pulled
137- FlowControlSettings flowControlSettings = FlowControlSettings . newBuilder ()
138- . setMaxOutstandingElementCount (maxMessageCount )
139- . build ();
140- Subscriber subscriber = Subscriber .defaultBuilder (subscriptionName , receiver )
141- .setFlowControlSettings (flowControlSettings )
142- .build ();
153+ FlowControlSettings flowControlSettings =
154+ FlowControlSettings . newBuilder (). setMaxOutstandingElementCount (maxMessageCount ). build ();
155+ Subscriber subscriber =
156+ Subscriber .defaultBuilder (subscriptionName , receiver )
157+ .setFlowControlSettings (flowControlSettings )
158+ .build ();
143159 // [END pubsub_subscriber_flow_settings]
144160 return subscriber ;
145161 }
146162
147163 private Subscriber createSubscriberWithCustomCredentials () throws Exception {
148164 // [START pubsub_subscriber_custom_credentials]
149165 CredentialsProvider credentialsProvider =
150- FixedCredentialsProvider
151- .create (ServiceAccountCredentials .fromStream (
152- new FileInputStream ("credentials.json" )));
166+ FixedCredentialsProvider .create (
167+ ServiceAccountCredentials .fromStream (new FileInputStream ("credentials.json" )));
153168
154- Subscriber subscriber = Subscriber .defaultBuilder (subscriptionName , receiver )
155- .setCredentialsProvider (credentialsProvider )
156- .build ();
169+ Subscriber subscriber =
170+ Subscriber .defaultBuilder (subscriptionName , receiver )
171+ .setCredentialsProvider (credentialsProvider )
172+ .build ();
157173 // [START pubsub_subscriber_custom_credentials]
158174 return subscriber ;
159175 }
176+
177+ static List <ReceivedMessage > createSubscriberWithSyncPull (
178+ String projectId , String subscriptionId , int numOfMessages ) throws Exception {
179+ // [START subscriber_sync_pull]
180+ SubscriptionAdminSettings subscriptionAdminSettings =
181+ SubscriptionAdminSettings .newBuilder ().build ();
182+ try (SubscriberStub subscriber = GrpcSubscriberStub .create (subscriptionAdminSettings )) {
183+ // String projectId = "my-project-id";
184+ // String subscriptionId = "my-subscription-id";
185+ // int numOfMessages = 10; // max number of messages to be pulled
186+ String subscriptionName = SubscriptionName .create (projectId , subscriptionId ).toString ();
187+ PullRequest pullRequest =
188+ PullRequest .newBuilder ()
189+ .setMaxMessages (numOfMessages )
190+ .setReturnImmediately (false ) // return immediately if messages are not available
191+ .setSubscription (subscriptionName )
192+ .build ();
193+
194+ // use pullCallable().futureCall to asynchronously perform this operation
195+ PullResponse pullResponse = subscriber .pullCallable ().call (pullRequest );
196+ List <String > ackIds = new ArrayList <>();
197+ for (ReceivedMessage message : pullResponse .getReceivedMessagesList ()) {
198+ // handle received message
199+ // ...
200+ ackIds .add (message .getAckId ());
201+ }
202+ // acknowledge received messages
203+ AcknowledgeRequest acknowledgeRequest =
204+ AcknowledgeRequest .newBuilder ()
205+ .setSubscription (subscriptionName )
206+ .addAllAckIds (ackIds )
207+ .build ();
208+ // use acknowledgeCallable().futureCall to asynchronously perform this operation
209+ subscriber .acknowledgeCallable ().call (acknowledgeRequest );
210+ return pullResponse .getReceivedMessagesList ();
211+ }
212+ // [END subscriber_sync_pull]
213+ }
160214}
0 commit comments