1515#include " google/cloud/pubsub/internal/subscription_lease_management.h"
1616#include " google/cloud/pubsub/testing/mock_subscription_batch_source.h"
1717#include " google/cloud/internal/background_threads_impl.h"
18+ #include " google/cloud/testing_util/fake_completion_queue_impl.h"
1819#include " google/cloud/testing_util/status_matchers.h"
1920#include < gmock/gmock.h>
2021#include < vector>
@@ -25,9 +26,9 @@ namespace pubsub_internal {
2526inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
2627namespace {
2728
29+ using ::google::cloud::testing_util::FakeCompletionQueueImpl;
2830using ::google::cloud::testing_util::IsOk;
2931using ::google::cloud::testing_util::StatusIs;
30- using ::testing::SizeIs;
3132using ::testing::UnorderedElementsAre;
3233
3334google::pubsub::v1::StreamingPullResponse GenerateMessages (
@@ -85,41 +86,35 @@ TEST(SubscriptionLeaseManagementTest, NormalLifecycle) {
8586 EXPECT_CALL (*mock, Shutdown).Times (1 );
8687 }
8788
88- google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
89- std::vector<promise<Status>> timers;
90- int cancel_count = 0 ;
91- auto make_timer = [&](std::chrono::system_clock::time_point) {
92- promise<Status> p ([&cancel_count] { ++cancel_count; });
93- auto f = p.get_future ().then ([](future<Status> f) { return f.get (); });
94- timers.push_back (std::move (p));
95- return f;
96- };
89+ auto fake_cq = std::make_shared<FakeCompletionQueueImpl>();
90+ CompletionQueue cq (fake_cq);
91+
9792 auto shutdown_manager = std::make_shared<SessionShutdownManager>();
98- auto uut = SubscriptionLeaseManagement::CreateForTesting (
99- background.cq (), shutdown_manager, make_timer, mock, kTestDeadline ,
100- std::chrono::seconds (600 ));
93+ auto uut = SubscriptionLeaseManagement::Create (
94+ cq, shutdown_manager, mock, kTestDeadline , std::chrono::seconds (600 ));
10195
10296 auto done = shutdown_manager->Start ({});
10397 uut->Start ([](StatusOr<google::pubsub::v1::StreamingPullResponse> const &) {});
10498
10599 batch_callback (GenerateMessages (" 0-" , 3 ));
106- ASSERT_THAT (timers, SizeIs ( 1 ));
100+ ASSERT_EQ ( 1U , fake_cq-> size ( ));
107101
108102 // Ack one of the messages and then fire the timer. The expectations set above
109103 // will verify that only the remaining messages have their lease extended.
110104 uut->AckMessage (" ack-0-1" );
111- timers[ 0 ]. set_value ({} );
112- ASSERT_THAT (timers, SizeIs ( 2 ));
105+ fake_cq-> SimulateCompletion ( true );
106+ ASSERT_EQ ( 1U , fake_cq-> size ( ));
113107
114108 // Ack one more message and trigger the new timer.
115109 uut->NackMessage (" ack-0-2" );
116- timers[ 1 ]. set_value ({} );
117- ASSERT_THAT (timers, SizeIs ( 3 ));
110+ fake_cq-> SimulateCompletion ( true );
111+ ASSERT_EQ ( 1U , fake_cq-> size ( ));
118112
119113 shutdown_manager->MarkAsShutdown (__func__, Status{});
120114 uut->Shutdown ();
121- EXPECT_EQ (3 , cancel_count);
122- timers[2 ].set_value (Status (StatusCode::kCancelled , " test-cancel" ));
115+
116+ fake_cq->SimulateCompletion (false );
117+ ASSERT_EQ (0U , fake_cq->size ());
123118 EXPECT_THAT (done.get (), IsOk ());
124119}
125120
@@ -130,30 +125,25 @@ TEST(SubscriptionLeaseManagementTest, ShutdownOnError) {
130125 batch_callback = std::move (cb);
131126 });
132127
133- google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
134- std::vector<promise<Status>> timers;
135- int cancel_count = 0 ;
136- auto make_timer = [&](std::chrono::system_clock::time_point) {
137- promise<Status> p ([&cancel_count] { ++cancel_count; });
138- auto f = p.get_future ().then ([](future<Status> f) { return f.get (); });
139- timers.push_back (std::move (p));
140- return f;
141- };
128+ auto fake_cq = std::make_shared<FakeCompletionQueueImpl>();
129+ CompletionQueue cq (fake_cq);
130+
142131 auto shutdown_manager = std::make_shared<SessionShutdownManager>();
143- auto uut = SubscriptionLeaseManagement::CreateForTesting (
144- background. cq (), shutdown_manager, make_timer, mock ,
145- std::chrono::seconds ( 345 ), std::chrono::seconds (600 ));
132+ auto uut = SubscriptionLeaseManagement::Create (cq, shutdown_manager, mock,
133+ std::chrono::seconds ( 345 ) ,
134+ std::chrono::seconds (600 ));
146135
147136 auto done = shutdown_manager->Start ({});
148137 uut->Start ([](StatusOr<google::pubsub::v1::StreamingPullResponse> const &) {});
149138 batch_callback (GenerateMessages (" 0-" , 3 ));
150- EXPECT_THAT (timers, SizeIs ( 1 ));
139+ EXPECT_EQ ( 1U , fake_cq-> size ( ));
151140
152141 batch_callback (StatusOr<google::pubsub::v1::StreamingPullResponse>(
153142 Status (StatusCode::kPermissionDenied , " uh-oh" )));
154- ASSERT_THAT (timers, SizeIs ( 1 ));
143+ ASSERT_EQ ( 1U , fake_cq-> size ( ));
155144
156- timers[0 ].set_value (Status (StatusCode::kCancelled , " test-cancel" ));
145+ fake_cq->SimulateCompletion (false );
146+ ASSERT_EQ (0U , fake_cq->size ());
157147 EXPECT_THAT (done.get (), StatusIs (StatusCode::kPermissionDenied ));
158148}
159149
@@ -198,32 +188,27 @@ TEST(SubscriptionLeaseManagementTest, UsesDeadlineExtension) {
198188 EXPECT_CALL (*mock, Shutdown).Times (1 );
199189 }
200190
201- google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
202- std::vector<promise<Status>> timers;
203- auto make_timer = [&](std::chrono::system_clock::time_point) {
204- promise<Status> p;
205- auto f = p.get_future ().then ([](future<Status> f) { return f.get (); });
206- timers.push_back (std::move (p));
207- return f;
208- };
191+ auto fake_cq = std::make_shared<FakeCompletionQueueImpl>();
192+ CompletionQueue cq (fake_cq);
193+
209194 auto shutdown_manager = std::make_shared<SessionShutdownManager>();
210- auto uut = SubscriptionLeaseManagement::CreateForTesting (
211- background.cq (), shutdown_manager, make_timer, mock, kTestDeadline ,
212- kTestExtension );
195+ auto uut = SubscriptionLeaseManagement::Create (cq, shutdown_manager, mock,
196+ kTestDeadline , kTestExtension );
213197
214198 auto done = shutdown_manager->Start ({});
215199 uut->Start ([](StatusOr<google::pubsub::v1::StreamingPullResponse> const &) {});
216200
217201 batch_callback (GenerateMessages (" 0-" , 1 ));
218- ASSERT_THAT (timers, SizeIs ( 1 ));
202+ ASSERT_EQ ( 1U , fake_cq-> size ( ));
219203
220204 // Ignore message and then fire the timer. This will extend the deadline.
221- timers[ 0 ]. set_value ({} );
222- ASSERT_THAT (timers, SizeIs ( 2 ));
205+ fake_cq-> SimulateCompletion ( true );
206+ ASSERT_EQ ( 1U , fake_cq-> size ( ));
223207
224208 shutdown_manager->MarkAsShutdown (__func__, Status{});
225209 uut->Shutdown ();
226- timers[1 ].set_value (Status (StatusCode::kCancelled , " test-cancel" ));
210+ fake_cq->SimulateCompletion (false );
211+ ASSERT_EQ (0U , fake_cq->size ());
227212 EXPECT_THAT (done.get (), IsOk ());
228213}
229214
0 commit comments