Skip to content

Commit e12139c

Browse files
author
Otto van der Schaaf
committed
Review feedback
Signed-off-by: Otto van der Schaaf <[email protected]>
1 parent 98f96fe commit e12139c

File tree

3 files changed

+93
-87
lines changed

3 files changed

+93
-87
lines changed

source/sink/service_impl.cc

Lines changed: 29 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace Nighthawk {
1111

1212
using ::Envoy::Protobuf::util::MessageDifferencer;
1313

14-
SinkServiceImpl::SinkServiceImpl(std::unique_ptr<Sink>&& sink) : sink_(std::move(sink)) {}
14+
SinkServiceImpl::SinkServiceImpl(std::unique_ptr<Sink> sink) : sink_(std::move(sink)) {}
1515

1616
grpc::Status SinkServiceImpl::StoreExecutionResponseStream(
1717
grpc::ServerContext*, grpc::ServerReader<nighthawk::StoreExecutionRequest>* request_reader,
@@ -20,7 +20,7 @@ grpc::Status SinkServiceImpl::StoreExecutionResponseStream(
2020
while (request_reader->Read(&request)) {
2121
ENVOY_LOG(info, "StoreExecutionResponseStream request {}", request.DebugString());
2222
const nighthawk::client::ExecutionResponse& response_to_store = request.execution_response();
23-
const auto status = sink_->StoreExecutionResultPiece(response_to_store);
23+
const absl::Status status = sink_->StoreExecutionResultPiece(response_to_store);
2424
if (!status.ok()) {
2525
ENVOY_LOG(error, "StoreExecutionResponseStream failure: {}", status.ToString());
2626
return grpc::Status(grpc::StatusCode::INTERNAL, status.ToString());
@@ -29,52 +29,48 @@ grpc::Status SinkServiceImpl::StoreExecutionResponseStream(
2929
return grpc::Status::OK;
3030
};
3131

32+
grpc::Status SinkServiceImpl::abslStatusToGrpcStatus(const absl::Status status) {
33+
const grpc::Status grpc_status =
34+
status.ok() ? grpc::Status::OK : grpc::Status(grpc::StatusCode::INTERNAL, status.ToString());
35+
ENVOY_LOG(trace, "Finishing stream with status {} / message {}.", grpc_status.error_code(),
36+
grpc_status.error_message());
37+
return grpc_status;
38+
}
39+
3240
grpc::Status SinkServiceImpl::SinkRequestStream(
3341
grpc::ServerContext*,
3442
grpc::ServerReaderWriter<nighthawk::SinkResponse, nighthawk::SinkRequest>* stream) {
3543
nighthawk::SinkRequest request;
36-
absl::Status status = absl::OkStatus();
3744
while (stream->Read(&request)) {
3845
ENVOY_LOG(trace, "Inbound SinkRequest {}", request.DebugString());
39-
absl::StatusOr<std::vector<nighthawk::client::ExecutionResponse>>
40-
status_or_execution_responses = sink_->LoadExecutionResult(request.execution_id());
41-
status.Update(status_or_execution_responses.status());
42-
if (status.ok()) {
43-
const std::vector<nighthawk::client::ExecutionResponse>& responses =
44-
status_or_execution_responses.value();
45-
absl::StatusOr<nighthawk::client::ExecutionResponse> response =
46-
mergeExecutionResponses(request.execution_id(), responses);
47-
status.Update(response.status());
48-
if (status.ok()) {
49-
nighthawk::SinkResponse sink_response;
50-
*(sink_response.mutable_execution_response()) = response.value();
51-
if (!stream->Write(sink_response)) {
52-
status.Update(
53-
absl::Status(absl::StatusCode::kInternal, "Failure writing response to stream."));
54-
}
55-
}
46+
absl::StatusOr<std::vector<nighthawk::client::ExecutionResponse>> execution_responses =
47+
sink_->LoadExecutionResult(request.execution_id());
48+
if (!execution_responses.status().ok()) {
49+
return abslStatusToGrpcStatus(execution_responses.status());
5650
}
57-
if (!status.ok()) {
58-
ENVOY_LOG(error, "Failure while handling SinkRequest: {} -> {}", request.DebugString(),
59-
status.ToString());
60-
break;
51+
absl::StatusOr<nighthawk::client::ExecutionResponse> response =
52+
mergeExecutionResponses(request.execution_id(), *execution_responses);
53+
if (!response.status().ok()) {
54+
return abslStatusToGrpcStatus(response.status());
55+
}
56+
nighthawk::SinkResponse sink_response;
57+
*(sink_response.mutable_execution_response()) = *response;
58+
if (!stream->Write(sink_response)) {
59+
return abslStatusToGrpcStatus(
60+
absl::Status(absl::StatusCode::kInternal, "Failure writing response to stream."));
6161
}
6262
}
63-
grpc::Status grpc_status =
64-
status.ok() ? grpc::Status::OK : grpc::Status(grpc::StatusCode::INTERNAL, status.ToString());
65-
ENVOY_LOG(trace, "Finishing stream with status {} / message {}.", grpc_status.error_code(),
66-
grpc_status.error_message());
67-
return grpc_status;
63+
return abslStatusToGrpcStatus(absl::OkStatus());
6864
}
6965

7066
absl::Status mergeOutput(const nighthawk::client::Output& input_to_merge,
7167
nighthawk::client::Output& merge_target) {
7268
if (!merge_target.has_options()) {
7369
// If no options are set, that means this is the first part of the merge.
7470
// Set some properties that shouldbe equal amongst all Output instances.
75-
*(merge_target.mutable_options()) = input_to_merge.options();
76-
*(merge_target.mutable_timestamp()) = input_to_merge.timestamp();
77-
*(merge_target.mutable_version()) = input_to_merge.version();
71+
*merge_target.mutable_options() = input_to_merge.options();
72+
*merge_target.mutable_timestamp() = input_to_merge.timestamp();
73+
*merge_target.mutable_version() = input_to_merge.version();
7874
} else {
7975
// Options used should not diverge for a executions under a single execution id.
8076
// Versions probably shouldn't either. We sanity check these things here, and
@@ -93,7 +89,7 @@ absl::Status mergeOutput(const nighthawk::client::Output& input_to_merge,
9389
}
9490
}
9591
// Append all input results into our own results.
96-
for (const auto& result : input_to_merge.results()) {
92+
for (const nighthawk::client::Result& result : input_to_merge.results()) {
9793
merge_target.add_results()->MergeFrom(result);
9894
}
9995
return absl::OkStatus();

source/sink/service_impl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class SinkServiceImpl final : public nighthawk::NighthawkSink::Service,
5252
*
5353
* @param sink Sink backend that will be used to load and store.
5454
*/
55-
SinkServiceImpl(std::unique_ptr<Sink>&& sink);
55+
SinkServiceImpl(std::unique_ptr<Sink> sink);
5656

5757
grpc::Status
5858
StoreExecutionResponseStream(grpc::ServerContext* context,
@@ -64,6 +64,7 @@ class SinkServiceImpl final : public nighthawk::NighthawkSink::Service,
6464
grpc::ServerReaderWriter<nighthawk::SinkResponse, nighthawk::SinkRequest>* stream) override;
6565

6666
private:
67+
grpc::Status abslStatusToGrpcStatus(const absl::Status status);
6768
std::unique_ptr<Sink> sink_;
6869
};
6970

test/sink/sink_service_test.cc

Lines changed: 62 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,15 @@ TEST_P(SinkServiceTest, LoadSingleResultWithJustExecutionResponse) {
7777
response.mutable_execution_id()->assign(kTestId);
7878
response.mutable_output();
7979
request_.set_execution_id(kTestId);
80-
auto r = stub_->SinkRequestStream(&context_);
80+
std::unique_ptr<grpc::ClientReaderWriter<SinkRequest, SinkResponse>> reader_writer =
81+
stub_->SinkRequestStream(&context_);
8182
EXPECT_CALL(*sink_, LoadExecutionResult(kTestId)).WillOnce(Return(response_from_mock_sink));
82-
r->Write(request_, {});
83-
EXPECT_TRUE(r->WritesDone());
84-
ASSERT_TRUE(r->Read(&response_));
83+
reader_writer->Write(request_, {});
84+
EXPECT_TRUE(reader_writer->WritesDone());
85+
ASSERT_TRUE(reader_writer->Read(&response_));
8586
EXPECT_TRUE(response_.has_execution_response());
8687
EXPECT_EQ(response_.execution_response().execution_id(), kTestId);
87-
EXPECT_TRUE(r->Finish().ok());
88+
EXPECT_TRUE(reader_writer->Finish().ok());
8889
}
8990

9091
TEST_P(SinkServiceTest, LoadSingleSinkYieldsWrongExecutionId) {
@@ -94,12 +95,13 @@ TEST_P(SinkServiceTest, LoadSingleSinkYieldsWrongExecutionId) {
9495
std::vector<ExecutionResponse>{{}};
9596
response_from_mock_sink.value().at(0).mutable_execution_id()->assign("wrong-id");
9697
request_.set_execution_id(kTestId);
97-
auto r = stub_->SinkRequestStream(&context_);
98+
std::unique_ptr<grpc::ClientReaderWriter<SinkRequest, SinkResponse>> reader_writer =
99+
stub_->SinkRequestStream(&context_);
98100
EXPECT_CALL(*sink_, LoadExecutionResult(kTestId)).WillOnce(Return(response_from_mock_sink));
99-
r->Write(request_, {});
100-
EXPECT_TRUE(r->WritesDone());
101-
EXPECT_FALSE(r->Read(&response_));
102-
grpc::Status status = r->Finish();
101+
reader_writer->Write(request_, {});
102+
EXPECT_TRUE(reader_writer->WritesDone());
103+
EXPECT_FALSE(reader_writer->Read(&response_));
104+
grpc::Status status = reader_writer->Finish();
103105
EXPECT_FALSE(status.ok());
104106
EXPECT_EQ(status.error_message(), "INTERNAL: Expected execution_id 'test-id' got 'wrong-id'");
105107
}
@@ -110,12 +112,13 @@ TEST_P(SinkServiceTest, LoadSingleSinkYieldsEmptyResultSet) {
110112
absl::StatusOr<std::vector<ExecutionResponse>> response_from_mock_sink =
111113
std::vector<ExecutionResponse>{};
112114
request_.set_execution_id(kTestId);
113-
auto r = stub_->SinkRequestStream(&context_);
115+
std::unique_ptr<grpc::ClientReaderWriter<SinkRequest, SinkResponse>> reader_writer =
116+
stub_->SinkRequestStream(&context_);
114117
EXPECT_CALL(*sink_, LoadExecutionResult(kTestId)).WillOnce(Return(response_from_mock_sink));
115-
r->Write(request_, {});
116-
EXPECT_TRUE(r->WritesDone());
117-
EXPECT_FALSE(r->Read(&response_));
118-
grpc::Status status = r->Finish();
118+
reader_writer->Write(request_, {});
119+
EXPECT_TRUE(reader_writer->WritesDone());
120+
EXPECT_FALSE(reader_writer->Read(&response_));
121+
grpc::Status status = reader_writer->Finish();
119122
EXPECT_FALSE(status.ok());
120123
EXPECT_EQ(status.error_message(), "NOT_FOUND: No results");
121124
}
@@ -139,13 +142,14 @@ TEST_P(SinkServiceTest, LoadTwoResultsWithExecutionResponseWhereOneHasErrorDetai
139142

140143
request_.set_execution_id(kTestId);
141144

142-
auto r = stub_->SinkRequestStream(&context_);
145+
std::unique_ptr<grpc::ClientReaderWriter<SinkRequest, SinkResponse>> reader_writer =
146+
stub_->SinkRequestStream(&context_);
143147
EXPECT_CALL(*sink_, LoadExecutionResult(kTestId)).WillOnce(Return(response_from_mock_sink));
144-
r->Write(request_, {});
145-
EXPECT_TRUE(r->WritesDone());
148+
reader_writer->Write(request_, {});
149+
EXPECT_TRUE(reader_writer->WritesDone());
146150

147151
// Make sure that the response we get reflects what the mock sink's Load call returned.
148-
ASSERT_TRUE(r->Read(&response_));
152+
ASSERT_TRUE(reader_writer->Read(&response_));
149153
EXPECT_TRUE(response_.has_execution_response());
150154
EXPECT_EQ(response_.execution_response().execution_id(), kTestId);
151155
ASSERT_TRUE(response_.execution_response().has_error_detail());
@@ -158,31 +162,33 @@ TEST_P(SinkServiceTest, LoadTwoResultsWithExecutionResponseWhereOneHasErrorDetai
158162
Envoy::MessageUtil::unpackTo(response_.execution_response().error_detail().details(0), status);
159163
// TODO(XXX): proper equivalence test.
160164
EXPECT_EQ(status.DebugString(), error_detail->DebugString());
161-
EXPECT_TRUE(r->Finish().ok());
165+
EXPECT_TRUE(reader_writer->Finish().ok());
162166
}
163167

164168
TEST_P(SinkServiceTest, LoadWhenSinkYieldsFailureStatus) {
165169
absl::StatusOr<std::vector<ExecutionResponse>> response_from_mock_sink =
166170
absl::InvalidArgumentError("test");
167-
auto r = stub_->SinkRequestStream(&context_);
171+
std::unique_ptr<grpc::ClientReaderWriter<SinkRequest, SinkResponse>> reader_writer =
172+
stub_->SinkRequestStream(&context_);
168173
EXPECT_CALL(*sink_, LoadExecutionResult(_)).WillOnce(Return(response_from_mock_sink));
169-
r->Write(request_, {});
170-
EXPECT_TRUE(r->WritesDone());
171-
EXPECT_FALSE(r->Read(&response_));
172-
grpc::Status status = r->Finish();
174+
reader_writer->Write(request_, {});
175+
EXPECT_TRUE(reader_writer->WritesDone());
176+
EXPECT_FALSE(reader_writer->Read(&response_));
177+
grpc::Status status = reader_writer->Finish();
173178
EXPECT_FALSE(status.ok());
174179
EXPECT_EQ(status.error_message(), "INVALID_ARGUMENT: test");
175180
}
176181

177182
TEST_P(SinkServiceTest, ResultWriteFailure) {
178183
// This test covers the flow where the gRPC service fails while writing a reply message to the
179184
// stream. We don't have any expectations other then that the service doesn't crash in that flow.
180-
auto r = stub_->SinkRequestStream(&context_);
185+
std::unique_ptr<grpc::ClientReaderWriter<SinkRequest, SinkResponse>> reader_writer =
186+
stub_->SinkRequestStream(&context_);
181187
absl::Notification notification;
182188
EXPECT_CALL(*sink_, LoadExecutionResult(_))
183189
.WillOnce(testing::DoAll(Invoke([&notification]() { notification.Notify(); }),
184190
Return(std::vector<ExecutionResponse>{{}, {}})));
185-
EXPECT_TRUE(r->Write(request_, {}));
191+
EXPECT_TRUE(reader_writer->Write(request_, {}));
186192
// Wait for the expected invokation to avoid a race with test execution end.
187193
notification.WaitForNotification();
188194
context_.TryCancel();
@@ -205,53 +211,58 @@ TEST_P(SinkServiceTest, LoadWithOutputMergeFailure) {
205211
request_.set_execution_id(kTestId);
206212
nighthawk::client::CommandLineOptions* options_2 = response_2.mutable_output()->mutable_options();
207213
options_2->mutable_requests_per_second()->set_value(2);
208-
auto r = stub_->SinkRequestStream(&context_);
214+
std::unique_ptr<grpc::ClientReaderWriter<SinkRequest, SinkResponse>> reader_writer =
215+
stub_->SinkRequestStream(&context_);
209216
EXPECT_CALL(*sink_, LoadExecutionResult(kTestId)).WillOnce(Return(response_from_mock_sink));
210-
r->Write(request_, {});
211-
EXPECT_TRUE(r->WritesDone());
212-
ASSERT_FALSE(r->Read(&response_));
217+
reader_writer->Write(request_, {});
218+
EXPECT_TRUE(reader_writer->WritesDone());
219+
ASSERT_FALSE(reader_writer->Read(&response_));
213220
EXPECT_FALSE(response_.has_execution_response());
214-
grpc::Status status = r->Finish();
221+
grpc::Status status = reader_writer->Finish();
215222
EXPECT_FALSE(status.ok());
216223
EXPECT_THAT(status.error_message(), HasSubstr("INTERNAL: Options divergence detected"));
217224
}
218225

219226
TEST_P(SinkServiceTest, StoreExecutionResponseStreamOK) {
220227
StoreExecutionResponse response;
221228
ExecutionResponse result_to_store;
222-
auto r = stub_->StoreExecutionResponseStream(&context_, &response);
229+
std::unique_ptr<::grpc::ClientWriter<::nighthawk::StoreExecutionRequest>> writer =
230+
stub_->StoreExecutionResponseStream(&context_, &response);
223231
EXPECT_CALL(*sink_, StoreExecutionResultPiece(_))
224232
.WillOnce(Return(absl::OkStatus()))
225233
.WillOnce(Return(absl::OkStatus()));
226-
EXPECT_TRUE(r->Write({}));
227-
EXPECT_TRUE(r->Write({}));
228-
EXPECT_TRUE(r->WritesDone());
229-
grpc::Status status = r->Finish();
234+
EXPECT_TRUE(writer->Write({}));
235+
EXPECT_TRUE(writer->Write({}));
236+
EXPECT_TRUE(writer->WritesDone());
237+
grpc::Status status = writer->Finish();
230238
EXPECT_TRUE(status.ok());
231239
}
232240

233241
TEST_P(SinkServiceTest, StoreExecutionResponseStreamFailure) {
234242
StoreExecutionResponse response;
235243
ExecutionResponse result_to_store;
236-
auto r = stub_->StoreExecutionResponseStream(&context_, &response);
244+
std::unique_ptr<::grpc::ClientWriter<::nighthawk::StoreExecutionRequest>> writer =
245+
stub_->StoreExecutionResponseStream(&context_, &response);
237246
EXPECT_CALL(*sink_, StoreExecutionResultPiece(_))
238247
.WillOnce(Return(absl::InvalidArgumentError("test")));
239-
EXPECT_TRUE(r->Write({}));
240-
EXPECT_TRUE(r->WritesDone());
241-
grpc::Status status = r->Finish();
248+
EXPECT_TRUE(writer->Write({}));
249+
EXPECT_TRUE(writer->WritesDone());
250+
grpc::Status status = writer->Finish();
242251
EXPECT_FALSE(status.ok());
243252
}
244253

245254
TEST(ResponseVectorHandling, EmptyVectorYieldsNotOK) {
246255
std::vector<ExecutionResponse> responses;
247-
absl::StatusOr<ExecutionResponse> response = mergeExecutionResponses("foo", responses);
256+
absl::StatusOr<ExecutionResponse> response =
257+
mergeExecutionResponses(/*execution_id=*/"foo", responses);
248258
EXPECT_FALSE(response.ok());
249259
}
250260

251261
TEST(ResponseVectorHandling, NoResultsInOutputYieldsNone) {
252262
ExecutionResponse result;
253263
std::vector<ExecutionResponse> responses{result, result, result};
254-
absl::StatusOr<ExecutionResponse> response = mergeExecutionResponses("", responses);
264+
absl::StatusOr<ExecutionResponse> response =
265+
mergeExecutionResponses(/*execution_id=*/"", responses);
255266
EXPECT_TRUE(response.ok());
256267
EXPECT_EQ(response.value().output().results().size(), 0);
257268
}
@@ -260,26 +271,24 @@ TEST(ResponseVectorHandling, MergeThreeYieldsThree) {
260271
ExecutionResponse result;
261272
result.mutable_output()->add_results();
262273
std::vector<ExecutionResponse> responses{result, result, result};
263-
absl::StatusOr<ExecutionResponse> response = mergeExecutionResponses("", responses);
274+
absl::StatusOr<ExecutionResponse> response =
275+
mergeExecutionResponses(/*execution_id=*/"", responses);
264276
EXPECT_TRUE(response.ok());
265277
EXPECT_EQ(response.value().output().results().size(), 3);
266278
}
267279

268280
TEST(MergeOutputs, MergeDivergingOptionsInResultsFails) {
269-
const std::string kTestId = "test-id";
270281
std::vector<ExecutionResponse> responses;
271-
ExecutionResponse response_1;
272-
response_1.mutable_execution_id()->assign(kTestId);
273-
nighthawk::client::CommandLineOptions* options_1 = response_1.mutable_output()->mutable_options();
282+
nighthawk::client::Output output_1;
283+
nighthawk::client::CommandLineOptions* options_1 = output_1.mutable_options();
274284
options_1->mutable_requests_per_second()->set_value(1);
275-
ExecutionResponse response_2;
276-
response_2.mutable_execution_id()->assign(kTestId);
277-
nighthawk::client::CommandLineOptions* options_2 = response_2.mutable_output()->mutable_options();
285+
nighthawk::client::Output output_2;
286+
nighthawk::client::CommandLineOptions* options_2 = output_2.mutable_options();
278287
options_2->mutable_requests_per_second()->set_value(2);
279288
nighthawk::client::Output merged_output;
280-
absl::Status status_1 = mergeOutput(response_1.output(), merged_output);
289+
absl::Status status_1 = mergeOutput(output_1, merged_output);
281290
EXPECT_TRUE(status_1.ok());
282-
absl::Status status_2 = mergeOutput(response_2.output(), merged_output);
291+
absl::Status status_2 = mergeOutput(output_2, merged_output);
283292
EXPECT_FALSE(status_2.ok());
284293
EXPECT_THAT(status_2.message(), HasSubstr("Options divergence detected"));
285294
}

0 commit comments

Comments
 (0)