Skip to content

Commit bcdc03f

Browse files
authored
refactor(pubsub): remove timer factory (#6631)
* refactor(pubsub): remove timer factories, test using mock completion queue * refactor(pubsub): move SubscriptionSession test defaults into the test file * dont std::move a const&
1 parent 280fbd6 commit bcdc03f

6 files changed

Lines changed: 57 additions & 136 deletions

google/cloud/pubsub/internal/subscription_lease_management.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/pubsub/internal/subscription_lease_management.h"
16+
#include <chrono>
1617

1718
namespace google {
1819
namespace cloud {
@@ -135,8 +136,9 @@ void SubscriptionLeaseManagement::StartRefreshTimer(
135136

136137
shutdown_manager_->StartOperation(__func__, "OnRefreshTimer", [&] {
137138
if (refresh_timer_.valid()) refresh_timer_.cancel();
138-
refresh_timer_ = timer_factory_(deadline).then([weak](future<Status> f) {
139-
if (auto self = weak.lock()) self->OnRefreshTimer(!f.get().ok());
139+
using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;
140+
cq_.MakeDeadlineTimer(deadline).then([weak](TimerFuture tp) {
141+
if (auto self = weak.lock()) self->OnRefreshTimer(!tp.get());
140142
});
141143
});
142144
}

google/cloud/pubsub/internal/subscription_lease_management.h

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -37,46 +37,16 @@ class SubscriptionLeaseManagement
3737
static auto constexpr kAckDeadlineSlack = std::chrono::seconds(2);
3838
static auto constexpr kMinimumAckDeadline = std::chrono::seconds(10);
3939

40-
/**
41-
* A wrapper to create timers.
42-
*
43-
* TODO(#4718) - this is only needed because we cannot mock CompletionQueue.
44-
*/
45-
using TimerFactory =
46-
std::function<future<Status>(std::chrono::system_clock::time_point)>;
47-
4840
static std::shared_ptr<SubscriptionLeaseManagement> Create(
4941
google::cloud::CompletionQueue cq,
5042
std::shared_ptr<SessionShutdownManager> shutdown_manager,
5143
std::shared_ptr<SubscriptionBatchSource> child,
5244
std::chrono::seconds max_deadline_time,
5345
std::chrono::seconds max_deadline_extension) {
54-
auto timer_factory =
55-
[cq](std::chrono::system_clock::time_point tp) mutable {
56-
return cq.MakeDeadlineTimer(tp).then(
57-
[](future<StatusOr<std::chrono::system_clock::time_point>> f) {
58-
return f.get().status();
59-
});
60-
};
61-
return std::shared_ptr<SubscriptionLeaseManagement>(
62-
new SubscriptionLeaseManagement(
63-
std::move(cq), std::move(shutdown_manager),
64-
std::move(timer_factory), std::move(child), max_deadline_time,
65-
max_deadline_extension));
66-
}
67-
68-
static std::shared_ptr<SubscriptionLeaseManagement> CreateForTesting(
69-
google::cloud::CompletionQueue cq,
70-
std::shared_ptr<SessionShutdownManager> shutdown_manager,
71-
TimerFactory timer_factory,
72-
std::shared_ptr<SubscriptionBatchSource> child,
73-
std::chrono::seconds max_deadline_time,
74-
std::chrono::seconds max_deadline_extension) {
7546
return std::shared_ptr<SubscriptionLeaseManagement>(
7647
new SubscriptionLeaseManagement(
77-
std::move(cq), std::move(shutdown_manager),
78-
std::move(timer_factory), std::move(child), max_deadline_time,
79-
max_deadline_extension));
48+
std::move(cq), std::move(shutdown_manager), std::move(child),
49+
max_deadline_time, max_deadline_extension));
8050
}
8151

8252
void Start(BatchCallback cb) override;
@@ -91,12 +61,10 @@ class SubscriptionLeaseManagement
9161
SubscriptionLeaseManagement(
9262
google::cloud::CompletionQueue cq,
9363
std::shared_ptr<SessionShutdownManager> shutdown_manager,
94-
TimerFactory timer_factory,
9564
std::shared_ptr<SubscriptionBatchSource> child,
9665
std::chrono::seconds max_deadline_time,
9766
std::chrono::seconds max_deadline_extension)
9867
: cq_(std::move(cq)),
99-
timer_factory_(std::move(timer_factory)),
10068
child_(std::move(child)),
10169
shutdown_manager_(std::move(shutdown_manager)),
10270
max_deadline_time_(max_deadline_time),
@@ -119,7 +87,6 @@ class SubscriptionLeaseManagement
11987
void NackAll(std::unique_lock<std::mutex> lk);
12088

12189
google::cloud::CompletionQueue cq_;
122-
TimerFactory const timer_factory_;
12390
std::shared_ptr<SubscriptionBatchSource> const child_;
12491
std::shared_ptr<SessionShutdownManager> const shutdown_manager_;
12592
std::chrono::seconds const max_deadline_time_;

google/cloud/pubsub/internal/subscription_lease_management_test.cc

Lines changed: 35 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "google/cloud/pubsub/internal/subscription_lease_management.h"
1616
#include "google/cloud/pubsub/testing/mock_subscription_batch_source.h"
1717
#include "google/cloud/internal/background_threads_impl.h"
18+
#include "google/cloud/testing_util/fake_completion_queue_impl.h"
1819
#include "google/cloud/testing_util/status_matchers.h"
1920
#include <gmock/gmock.h>
2021
#include <vector>
@@ -25,9 +26,9 @@ namespace pubsub_internal {
2526
inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
2627
namespace {
2728

29+
using ::google::cloud::testing_util::FakeCompletionQueueImpl;
2830
using ::google::cloud::testing_util::IsOk;
2931
using ::google::cloud::testing_util::StatusIs;
30-
using ::testing::SizeIs;
3132
using ::testing::UnorderedElementsAre;
3233

3334
google::pubsub::v1::StreamingPullResponse GenerateMessages(
@@ -85,41 +86,35 @@ TEST(SubscriptionLeaseManagementTest, NormalLifecycle) {
8586
EXPECT_CALL(*mock, Shutdown).Times(1);
8687
}
8788

88-
google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
89-
std::vector<promise<Status>> timers;
90-
int cancel_count = 0;
91-
auto make_timer = [&](std::chrono::system_clock::time_point) {
92-
promise<Status> p([&cancel_count] { ++cancel_count; });
93-
auto f = p.get_future().then([](future<Status> f) { return f.get(); });
94-
timers.push_back(std::move(p));
95-
return f;
96-
};
89+
auto fake_cq = std::make_shared<FakeCompletionQueueImpl>();
90+
CompletionQueue cq(fake_cq);
91+
9792
auto shutdown_manager = std::make_shared<SessionShutdownManager>();
98-
auto uut = SubscriptionLeaseManagement::CreateForTesting(
99-
background.cq(), shutdown_manager, make_timer, mock, kTestDeadline,
100-
std::chrono::seconds(600));
93+
auto uut = SubscriptionLeaseManagement::Create(
94+
cq, shutdown_manager, mock, kTestDeadline, std::chrono::seconds(600));
10195

10296
auto done = shutdown_manager->Start({});
10397
uut->Start([](StatusOr<google::pubsub::v1::StreamingPullResponse> const&) {});
10498

10599
batch_callback(GenerateMessages("0-", 3));
106-
ASSERT_THAT(timers, SizeIs(1));
100+
ASSERT_EQ(1U, fake_cq->size());
107101

108102
// Ack one of the messages and then fire the timer. The expectations set above
109103
// will verify that only the remaining messages have their lease extended.
110104
uut->AckMessage("ack-0-1");
111-
timers[0].set_value({});
112-
ASSERT_THAT(timers, SizeIs(2));
105+
fake_cq->SimulateCompletion(true);
106+
ASSERT_EQ(1U, fake_cq->size());
113107

114108
// Ack one more message and trigger the new timer.
115109
uut->NackMessage("ack-0-2");
116-
timers[1].set_value({});
117-
ASSERT_THAT(timers, SizeIs(3));
110+
fake_cq->SimulateCompletion(true);
111+
ASSERT_EQ(1U, fake_cq->size());
118112

119113
shutdown_manager->MarkAsShutdown(__func__, Status{});
120114
uut->Shutdown();
121-
EXPECT_EQ(3, cancel_count);
122-
timers[2].set_value(Status(StatusCode::kCancelled, "test-cancel"));
115+
116+
fake_cq->SimulateCompletion(false);
117+
ASSERT_EQ(0U, fake_cq->size());
123118
EXPECT_THAT(done.get(), IsOk());
124119
}
125120

@@ -130,30 +125,25 @@ TEST(SubscriptionLeaseManagementTest, ShutdownOnError) {
130125
batch_callback = std::move(cb);
131126
});
132127

133-
google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
134-
std::vector<promise<Status>> timers;
135-
int cancel_count = 0;
136-
auto make_timer = [&](std::chrono::system_clock::time_point) {
137-
promise<Status> p([&cancel_count] { ++cancel_count; });
138-
auto f = p.get_future().then([](future<Status> f) { return f.get(); });
139-
timers.push_back(std::move(p));
140-
return f;
141-
};
128+
auto fake_cq = std::make_shared<FakeCompletionQueueImpl>();
129+
CompletionQueue cq(fake_cq);
130+
142131
auto shutdown_manager = std::make_shared<SessionShutdownManager>();
143-
auto uut = SubscriptionLeaseManagement::CreateForTesting(
144-
background.cq(), shutdown_manager, make_timer, mock,
145-
std::chrono::seconds(345), std::chrono::seconds(600));
132+
auto uut = SubscriptionLeaseManagement::Create(cq, shutdown_manager, mock,
133+
std::chrono::seconds(345),
134+
std::chrono::seconds(600));
146135

147136
auto done = shutdown_manager->Start({});
148137
uut->Start([](StatusOr<google::pubsub::v1::StreamingPullResponse> const&) {});
149138
batch_callback(GenerateMessages("0-", 3));
150-
EXPECT_THAT(timers, SizeIs(1));
139+
EXPECT_EQ(1U, fake_cq->size());
151140

152141
batch_callback(StatusOr<google::pubsub::v1::StreamingPullResponse>(
153142
Status(StatusCode::kPermissionDenied, "uh-oh")));
154-
ASSERT_THAT(timers, SizeIs(1));
143+
ASSERT_EQ(1U, fake_cq->size());
155144

156-
timers[0].set_value(Status(StatusCode::kCancelled, "test-cancel"));
145+
fake_cq->SimulateCompletion(false);
146+
ASSERT_EQ(0U, fake_cq->size());
157147
EXPECT_THAT(done.get(), StatusIs(StatusCode::kPermissionDenied));
158148
}
159149

@@ -198,32 +188,27 @@ TEST(SubscriptionLeaseManagementTest, UsesDeadlineExtension) {
198188
EXPECT_CALL(*mock, Shutdown).Times(1);
199189
}
200190

201-
google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
202-
std::vector<promise<Status>> timers;
203-
auto make_timer = [&](std::chrono::system_clock::time_point) {
204-
promise<Status> p;
205-
auto f = p.get_future().then([](future<Status> f) { return f.get(); });
206-
timers.push_back(std::move(p));
207-
return f;
208-
};
191+
auto fake_cq = std::make_shared<FakeCompletionQueueImpl>();
192+
CompletionQueue cq(fake_cq);
193+
209194
auto shutdown_manager = std::make_shared<SessionShutdownManager>();
210-
auto uut = SubscriptionLeaseManagement::CreateForTesting(
211-
background.cq(), shutdown_manager, make_timer, mock, kTestDeadline,
212-
kTestExtension);
195+
auto uut = SubscriptionLeaseManagement::Create(cq, shutdown_manager, mock,
196+
kTestDeadline, kTestExtension);
213197

214198
auto done = shutdown_manager->Start({});
215199
uut->Start([](StatusOr<google::pubsub::v1::StreamingPullResponse> const&) {});
216200

217201
batch_callback(GenerateMessages("0-", 1));
218-
ASSERT_THAT(timers, SizeIs(1));
202+
ASSERT_EQ(1U, fake_cq->size());
219203

220204
// Ignore message and then fire the timer. This will extend the deadline.
221-
timers[0].set_value({});
222-
ASSERT_THAT(timers, SizeIs(2));
205+
fake_cq->SimulateCompletion(true);
206+
ASSERT_EQ(1U, fake_cq->size());
223207

224208
shutdown_manager->MarkAsShutdown(__func__, Status{});
225209
uut->Shutdown();
226-
timers[1].set_value(Status(StatusCode::kCancelled, "test-cancel"));
210+
fake_cq->SimulateCompletion(false);
211+
ASSERT_EQ(0U, fake_cq->size());
227212
EXPECT_THAT(done.get(), IsOk());
228213
}
229214

google/cloud/pubsub/internal/subscription_session.cc

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -179,46 +179,6 @@ future<Status> CreateSubscriptionSession(
179179
std::move(lease_management), std::move(p));
180180
}
181181

182-
future<Status> CreateTestingSubscriptionSession(
183-
pubsub::Subscription const& subscription,
184-
pubsub::SubscriberOptions const& options,
185-
std::shared_ptr<pubsub_internal::SubscriberStub> const& stub,
186-
google::cloud::CompletionQueue const& executor,
187-
pubsub::SubscriberConnection::SubscribeParams p,
188-
std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
189-
std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
190-
if (!retry_policy) {
191-
retry_policy = pubsub::LimitedErrorCountRetryPolicy(3).clone();
192-
}
193-
if (!backoff_policy) {
194-
using us = std::chrono::microseconds;
195-
backoff_policy =
196-
pubsub::ExponentialBackoffPolicy(
197-
/*initial_delay=*/us(10), /*maximum_delay=*/us(20), /*scaling=*/2.0)
198-
.clone();
199-
}
200-
auto shutdown_manager = std::make_shared<SessionShutdownManager>();
201-
auto batch = std::make_shared<StreamingSubscriptionBatchSource>(
202-
executor, shutdown_manager, stub, subscription.FullName(),
203-
"test-client-id", options, std::move(retry_policy),
204-
std::move(backoff_policy));
205-
206-
auto cq = executor; // need a copy to make it mutable
207-
auto timer = [cq](std::chrono::system_clock::time_point) mutable {
208-
return cq.MakeRelativeTimer(std::chrono::milliseconds(50))
209-
.then([](future<StatusOr<std::chrono::system_clock::time_point>> f) {
210-
return f.get().status();
211-
});
212-
};
213-
auto lease_management = SubscriptionLeaseManagement::CreateForTesting(
214-
executor, shutdown_manager, timer, std::move(batch),
215-
options.max_deadline_time(), options.max_deadline_extension());
216-
217-
return SubscriptionSessionImpl::Create(
218-
options, std::move(executor), std::move(shutdown_manager),
219-
std::move(lease_management), std::move(p));
220-
}
221-
222182
} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
223183
} // namespace pubsub_internal
224184
} // namespace cloud

google/cloud/pubsub/internal/subscription_session.h

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,6 @@ future<Status> CreateSubscriptionSession(
4343
std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
4444
std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy);
4545

46-
future<Status> CreateTestingSubscriptionSession(
47-
pubsub::Subscription const& subscription,
48-
pubsub::SubscriberOptions const& options,
49-
std::shared_ptr<pubsub_internal::SubscriberStub> const& stub,
50-
google::cloud::CompletionQueue const& executor,
51-
pubsub::SubscriberConnection::SubscribeParams p,
52-
std::unique_ptr<pubsub::RetryPolicy const> retry_policy = {},
53-
std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy = {});
54-
5546
} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
5647
} // namespace pubsub_internal
5748
} // namespace cloud

google/cloud/pubsub/internal/subscription_session_test.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "google/cloud/testing_util/status_matchers.h"
2323
#include <gmock/gmock.h>
2424
#include <atomic>
25+
#include <chrono>
2526
#include <thread>
2627

2728
namespace google {
@@ -36,6 +37,21 @@ using ::testing::AtLeast;
3637
using ::testing::AtMost;
3738
using ::testing::InSequence;
3839

40+
future<Status> CreateTestingSubscriptionSession(
41+
pubsub::Subscription const& subscription,
42+
pubsub::SubscriberOptions const& options,
43+
std::shared_ptr<pubsub_internal::SubscriberStub> const& stub,
44+
CompletionQueue const& executor,
45+
pubsub::SubscriberConnection::SubscribeParams p) {
46+
using us = std::chrono::microseconds;
47+
return CreateSubscriptionSession(
48+
subscription, options, stub, executor, "test-client-id", std::move(p),
49+
pubsub::LimitedErrorCountRetryPolicy(3).clone(),
50+
pubsub::ExponentialBackoffPolicy(
51+
/*initial_delay=*/us(10), /*maximum_delay=*/us(20), /*scaling=*/2.0)
52+
.clone());
53+
}
54+
3955
/// @test Verify callbacks are scheduled in the background threads.
4056
TEST(SubscriptionSessionTest, ScheduleCallbacks) {
4157
auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();

0 commit comments

Comments
 (0)