Skip to content

Commit f86d984

Browse files
majnemertensorflow-copybara
authored andcommitted
Remove usage of tensorflow::BlockingCounter.
I'm migrating users to absl::BlockingCouner. In this case, we are using the BlockingCounter as a barrier so let's use absl::Barrier. PiperOrigin-RevId: 723714825
1 parent 018da28 commit f86d984

File tree

2 files changed

+32
-31
lines changed

2 files changed

+32
-31
lines changed

tensorflow_serving/core/BUILD

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,11 +511,12 @@ cc_test(
511511
"//tensorflow_serving/util:any_ptr",
512512
"//tensorflow_serving/util:event_bus",
513513
"//tensorflow_serving/util:threadpool_executor",
514+
"@com_google_absl//absl/status",
515+
"@com_google_absl//absl/synchronization",
514516
"@com_google_absl//absl/types:optional",
515517
"@org_tensorflow//tensorflow/core:lib",
516518
"@org_tensorflow//tensorflow/core:protos_all_cc",
517519
"@org_tensorflow//tensorflow/core:test",
518-
"@org_tensorflow//tensorflow/core/platform:blocking_counter",
519520
],
520521
)
521522

tensorflow_serving/core/basic_manager_test.cc

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ limitations under the License.
2727
#include <gmock/gmock.h>
2828
#include <gtest/gtest.h>
2929
#include "absl/status/status.h"
30+
#include "absl/synchronization/barrier.h"
31+
#include "absl/synchronization/notification.h"
3032
#include "absl/types/optional.h"
3133
#include "tensorflow/core/lib/core/errors.h"
3234
#include "tensorflow/core/lib/core/status_test_util.h"
3335
#include "tensorflow/core/lib/strings/strcat.h"
34-
#include "tensorflow/core/platform/blocking_counter.h"
3536
#include "tensorflow/core/platform/errors.h"
3637
#include "tensorflow/core/platform/null_file_system.h"
3738
#include "tensorflow/core/protobuf/error_codes.pb.h"
@@ -317,13 +318,13 @@ TEST_P(BasicManagerTest, UpdateServingMapServableHandleLatest) {
317318
// We will now try to unload v1, but we only allow it to move out from kReady
318319
// state, and not complete the unload. Also, after it moves out from kReady,
319320
// the serving map is also updated, so v0 would be the latest.
320-
Notification unload_started;
321-
Notification finish_unload;
321+
absl::Notification unload_started;
322+
absl::Notification finish_unload;
322323
EXPECT_CALL(*notify_to_unload, Unload()).WillOnce(Invoke([&]() {
323324
unload_started.Notify();
324325
finish_unload.WaitForNotification();
325326
}));
326-
Notification unload_finished;
327+
absl::Notification unload_finished;
327328
std::unique_ptr<Thread> unload_last_servable(
328329
Env::Default()->StartThread({}, "UnloadLastServable", [&]() {
329330
basic_manager_->UnloadServable(id1, [&](const Status& status) {
@@ -554,7 +555,7 @@ TEST_P(BasicManagerTest, DestructOnNonServingThread) {
554555
TF_ASSERT_OK(status);
555556
EXPECT_EQ(7, **latest_handle);
556557

557-
Notification done_unload_servable;
558+
absl::Notification done_unload_servable;
558559
std::unique_ptr<Thread> unload_servable(
559560
Env::Default()->StartThread({}, "UnloadServable", [&]() {
560561
// Unload the servable.
@@ -702,8 +703,8 @@ TEST_P(BasicManagerTest, EventBusServableLifecycle) {
702703
EXPECT_THAT(*servable_state_monitor_.GetState(id),
703704
EqualsServableState(start_state));
704705

705-
Notification load_called;
706-
Notification load_continue;
706+
absl::Notification load_called;
707+
absl::Notification load_continue;
707708
EXPECT_CALL(*loader, LoadWithMetadata(Loader::Metadata{id}))
708709
.WillOnce(InvokeWithoutArgs([&]() {
709710
load_called.Notify();
@@ -732,8 +733,8 @@ TEST_P(BasicManagerTest, EventBusServableLifecycle) {
732733
EXPECT_THAT(*servable_state_monitor_.GetState(id),
733734
EqualsServableState(available_state));
734735

735-
Notification unload_called;
736-
Notification unload_continue;
736+
absl::Notification unload_called;
737+
absl::Notification unload_continue;
737738
EXPECT_CALL(*loader, Unload()).WillOnce(Invoke([&]() {
738739
unload_called.Notify();
739740
unload_continue.WaitForNotification();
@@ -824,7 +825,7 @@ TEST_P(BasicManagerTest, InterleavedLoadsAndUnloads) {
824825
executor.Schedule([this, i]() {
825826
const ServableId id = {kServableName3, i};
826827
TF_ASSERT_OK(basic_manager_->ManageServable(CreateServable(id)));
827-
Notification load_done;
828+
absl::Notification load_done;
828829
basic_manager_->LoadServable(id, [&load_done](const Status& status) {
829830
TF_ASSERT_OK(status);
830831
load_done.Notify();
@@ -893,8 +894,8 @@ TEST_F(SetNumLoadThreadsBasicManagerTest, ThreadPoolsNotAliveSimultaneously) {
893894

894895
const ServableId id0 = {kServableName3, 0};
895896
TF_ASSERT_OK(basic_manager_->ManageServable(CreateServable(id0)));
896-
Notification notify_for_setting;
897-
Notification continue_load;
897+
absl::Notification notify_for_setting;
898+
absl::Notification continue_load;
898899
basic_manager_->LoadServable(id0, [&](const Status& status) {
899900
notify_for_setting.Notify();
900901
continue_load.WaitForNotification();
@@ -1147,8 +1148,8 @@ TEST_P(BasicManagerTest, RetryOnLoadErrorCancelledLoad) {
11471148
TF_ASSERT_OK(
11481149
basic_manager_->ManageServable({id, std::unique_ptr<Loader>(loader)}));
11491150

1150-
Notification load_called;
1151-
Notification load_should_return;
1151+
absl::Notification load_called;
1152+
absl::Notification load_should_return;
11521153
EXPECT_CALL(*loader, LoadWithMetadata(Loader::Metadata{id}))
11531154
.WillOnce(InvokeWithoutArgs([&load_called, &load_should_return]() {
11541155
load_called.Notify();
@@ -1175,8 +1176,8 @@ TEST_P(BasicManagerTest, LoadAfterCancelledLoad) {
11751176
TF_ASSERT_OK(
11761177
basic_manager_->ManageServable({id, std::unique_ptr<Loader>(loader)}));
11771178

1178-
Notification load_called;
1179-
Notification load_should_return;
1179+
absl::Notification load_called;
1180+
absl::Notification load_should_return;
11801181
EXPECT_CALL(*loader, LoadWithMetadata(Loader::Metadata{id}))
11811182
.WillOnce(InvokeWithoutArgs([&load_called, &load_should_return]() {
11821183
load_called.Notify();
@@ -1277,7 +1278,7 @@ class ResourceConstrainedBasicManagerTest : public ::testing::Test {
12771278
// resource units, i.e. half of the total system resources.
12781279
class BarrierLoader : public Loader {
12791280
public:
1280-
explicit BarrierLoader(BlockingCounter* counter) : counter_(counter) {}
1281+
explicit BarrierLoader(absl::Barrier* barrier) : barrier_(barrier) {}
12811282
~BarrierLoader() override = default;
12821283

12831284
Status EstimateResources(ResourceAllocation* estimate) const override {
@@ -1286,8 +1287,7 @@ class BarrierLoader : public Loader {
12861287
}
12871288

12881289
Status Load() override {
1289-
counter_->DecrementCount();
1290-
counter_->Wait();
1290+
barrier_->Block();
12911291
return OkStatus();
12921292
}
12931293

@@ -1296,7 +1296,7 @@ class BarrierLoader : public Loader {
12961296
AnyPtr servable() override { return AnyPtr(); }
12971297

12981298
private:
1299-
BlockingCounter* const counter_;
1299+
absl::Barrier* const barrier_;
13001300

13011301
TF_DISALLOW_COPY_AND_ASSIGN(BarrierLoader);
13021302
};
@@ -1306,7 +1306,7 @@ TEST_F(ResourceConstrainedBasicManagerTest, ConcurrentLoads) {
13061306
// concurrently (i.e. the manager should not serialize them needlessly).
13071307
// BarrierLoader verifies that the Load() calls occur concurrently.
13081308
int kNumLoaders = 2;
1309-
BlockingCounter barrier(kNumLoaders);
1309+
absl::Barrier barrier(kNumLoaders);
13101310
for (int i = 0; i < kNumLoaders; ++i) {
13111311
std::unique_ptr<Loader> loader(new BarrierLoader(&barrier));
13121312
const ServableId id = {"barrier", i};
@@ -1333,7 +1333,7 @@ TEST_F(ResourceConstrainedBasicManagerTest, InsufficientResources) {
13331333
.WillOnce(Return(OkStatus()));
13341334
TF_ASSERT_OK(basic_manager_->ManageServable(
13351335
CreateServableData(hogging_id, std::unique_ptr<Loader>(hogging_loader))));
1336-
Notification hogging_loaded;
1336+
absl::Notification hogging_loaded;
13371337
basic_manager_->LoadServable(hogging_id,
13381338
[&hogging_loaded](const Status& status) {
13391339
TF_EXPECT_OK(status);
@@ -1351,7 +1351,7 @@ TEST_F(ResourceConstrainedBasicManagerTest, InsufficientResources) {
13511351
}));
13521352
TF_ASSERT_OK(basic_manager_->ManageServable(CreateServableData(
13531353
rejected_id, std::unique_ptr<Loader>(rejected_loader))));
1354-
Notification rejection_received;
1354+
absl::Notification rejection_received;
13551355
Status rejected_status;
13561356
basic_manager_->LoadServable(
13571357
rejected_id,
@@ -1386,7 +1386,7 @@ TEST_F(ResourceConstrainedBasicManagerTest, ResourcesReleasedIfLoadFails) {
13861386
.WillOnce(Return(errors::Unknown("Load failure")));
13871387
TF_ASSERT_OK(basic_manager_->ManageServable(
13881388
CreateServableData(failing_id, std::unique_ptr<Loader>(failing_loader))));
1389-
Notification failing_failed;
1389+
absl::Notification failing_failed;
13901390
basic_manager_->LoadServable(failing_id,
13911391
[&failing_failed](const Status& status) {
13921392
EXPECT_FALSE(status.ok());
@@ -1440,7 +1440,7 @@ TEST_F(ResourceConstrainedBasicManagerTest,
14401440
}
14411441
TF_ASSERT_OK(basic_manager_->ManageServable(CreateServableData(
14421442
overestimating_id, std::unique_ptr<Loader>(overestimating_loader))));
1443-
Notification overestimating_loaded;
1443+
absl::Notification overestimating_loaded;
14441444
basic_manager_->LoadServable(overestimating_id,
14451445
[&overestimating_loaded](const Status& status) {
14461446
TF_EXPECT_OK(status);
@@ -1476,7 +1476,7 @@ TEST_F(ResourceConstrainedBasicManagerTest, ResourcesReleasedAfterUnload) {
14761476
*estimate = CreateResourceQuantity(10);
14771477
return OkStatus();
14781478
}));
1479-
Notification load_done;
1479+
absl::Notification load_done;
14801480
EXPECT_CALL(*unloading_loader,
14811481
LoadWithMetadata(Loader::Metadata{unloading_id}))
14821482
.WillOnce(Return(OkStatus()));
@@ -1488,8 +1488,8 @@ TEST_F(ResourceConstrainedBasicManagerTest, ResourcesReleasedAfterUnload) {
14881488
load_done.Notify();
14891489
});
14901490
load_done.WaitForNotification();
1491-
Notification unload_started;
1492-
Notification finish_unload;
1491+
absl::Notification unload_started;
1492+
absl::Notification finish_unload;
14931493
EXPECT_CALL(*unloading_loader, Unload())
14941494
.WillOnce(Invoke([&unload_started, &finish_unload] {
14951495
unload_started.Notify();
@@ -1531,8 +1531,8 @@ TEST_F(ResourceConstrainedBasicManagerTest, FirstLoadDeniedSecondOneApproved) {
15311531
// A first loader that gets rejected due to insufficient resources.
15321532
const ServableId denied_id = {"denied", 0};
15331533
test_util::MockLoader* denied_loader = new NiceMock<test_util::MockLoader>;
1534-
Notification denied_estimate_started;
1535-
Notification finish_denied_estimate;
1534+
absl::Notification denied_estimate_started;
1535+
absl::Notification finish_denied_estimate;
15361536
EXPECT_CALL(*denied_loader, EstimateResources(_))
15371537
.WillOnce(Invoke([&denied_estimate_started,
15381538
&finish_denied_estimate](ResourceAllocation* estimate) {

0 commit comments

Comments
 (0)