Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions source/client/process_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ class ClusterManagerFactory : public Envoy::Upstream::ProdClusterManagerFactory
bool prefetch_connections_{};
};

ProcessImpl::ProcessImpl(const Options& options, Envoy::Event::TimeSystem& time_system)
: time_system_(time_system), stats_allocator_(symbol_table_), store_root_(stats_allocator_),
ProcessImpl::ProcessImpl(const Options& options, Envoy::Event::TimeSystem& time_system,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way how we can add test coverage for this change to keep the fixed functionality working?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, yes. Lacking C++ mocks for the logging, we can't easily unit-test this issue.
But I could add an end-to-end integration test for this, see 619c05e

const std::shared_ptr<Envoy::ProcessWide>& process_wide)
: process_wide_(process_wide == nullptr ? std::make_shared<Envoy::ProcessWide>()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsure about the context - do we need to worry about thread safety here or is ProcessImpl guaranteed to be constructed from a single thread?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All ProcessImpl clients should ensure there's only a single active instance at any given time. We don't have to worry about it.

: process_wide),
time_system_(time_system), stats_allocator_(symbol_table_), store_root_(stats_allocator_),
api_(std::make_unique<Envoy::Api::Impl>(platform_impl_.threadFactory(), store_root_,
time_system_, platform_impl_.fileSystem())),
dispatcher_(api_->allocateDispatcher("main_thread")), benchmark_client_factory_(options),
Expand Down
15 changes: 13 additions & 2 deletions source/client/process_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,18 @@ class ClusterManagerFactory;
*/
class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
public:
ProcessImpl(const Options& options, Envoy::Event::TimeSystem& time_system);
/**
* Instantiates a ProcessImpl
* @param options provides the options configuration to be used.
* @param time_system provides the Envoy::Event::TimeSystem implementation that will be used.
* @param process_wide optional parameter which can be used to pass a pre-setup reference to
* an active Envoy::ProcessWide instance. ProcessImpl will add a reference to this when passed,
* and hold on that that throughout its lifetime.
* If this parameter is not supplied, ProcessImpl will contruct its own Envoy::ProcessWide
* instance.
*/
ProcessImpl(const Options& options, Envoy::Event::TimeSystem& time_system,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a doc comment? We could also explain the optional process_wide argument and say what happens if it isn't provided.

const std::shared_ptr<Envoy::ProcessWide>& process_wide = nullptr);
~ProcessImpl() override;

/**
Expand Down Expand Up @@ -96,7 +107,7 @@ class ProcessImpl : public Process, public Envoy::Logger::Loggable<Envoy::Logger
bool runInternal(OutputCollector& collector, const std::vector<UriPtr>& uris,
const UriPtr& request_source_uri, const UriPtr& tracing_uri);

Envoy::ProcessWide process_wide_;
std::shared_ptr<Envoy::ProcessWide> process_wide_;
Envoy::PlatformImpl platform_impl_;
Envoy::Event::TimeSystem& time_system_;
Envoy::Stats::SymbolTableImpl symbol_table_;
Expand Down
6 changes: 1 addition & 5 deletions source/client/service_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ void ServiceImpl::handleExecutionRequest(const nighthawk::client::ExecutionReque
return;
}

ProcessImpl process(*options, time_system_);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: the fix is to not do this here. We do this once now at construction time, and hold on to the logging context throughout the lifetime of the ServiceImpl.
Note that ProcessImpl instantiations will change the log level per inbound configured options. We only allow one ProcessImpl instantiation at a time today, so this might be allright for now.

auto logging_context = std::make_unique<Envoy::Logger::Context>(
spdlog::level::from_str(
nighthawk::client::Verbosity::VerbosityOptions_Name(options->verbosity())),
"[%T.%f][%t][%L] %v", log_lock_, false);
ProcessImpl process(*options, time_system_, process_wide_);
OutputCollectorImpl output_collector(time_system_, *options);
const bool ok = process.run(output_collector);
if (!ok) {
Expand Down
15 changes: 15 additions & 0 deletions source/client/service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,34 @@
#endif

#include <future>
#include <memory>

#include "external/envoy/source/common/common/logger.h"
#include "external/envoy/source/common/common/thread.h"
#include "external/envoy/source/common/event/real_time_system.h"
#include "external/envoy/source/exe/process_wide.h"

#include "nighthawk/client/process.h"
#include "nighthawk/common/request_source.h"

namespace Nighthawk {
namespace Client {

/**
* Implements Nighthawk's gRPC service. This service allows load generation to be
* controlled by gRPC clients.
*/
class ServiceImpl final : public nighthawk::client::NighthawkService::Service,
public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {

public:
/**
* Constructs a new ServiceImpl instance
*/
ServiceImpl() : process_wide_(std::make_shared<Envoy::ProcessWide>()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a doc comment for the public constructor?

logging_context_ = std::make_unique<Envoy::Logger::Context>(
spdlog::level::from_str("info"), "[%T.%f][%t][%L] %v", log_lock_, false);
}
::grpc::Status ExecutionStream(
::grpc::ServerContext* context,
::grpc::ServerReaderWriter<::nighthawk::client::ExecutionResponse,
Expand All @@ -36,6 +49,8 @@ class ServiceImpl final : public nighthawk::client::NighthawkService::Service,
void writeResponse(const nighthawk::client::ExecutionResponse& response);
::grpc::Status finishGrpcStream(const bool success, absl::string_view description = "");

std::unique_ptr<Envoy::Logger::Context> logging_context_;
std::shared_ptr<Envoy::ProcessWide> process_wide_;
Envoy::Event::RealTimeSystem time_system_; // NO_CHECK_FORMAT(real_time)
Envoy::Thread::MutexBasicLockable log_lock_;
::grpc::ServerReaderWriter<::nighthawk::client::ExecutionResponse,
Expand Down
8 changes: 6 additions & 2 deletions test/integration/nighthawk_grpc_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class NighthawkGrpcService(object):
Attributes:
server_ip: IP address used by the gRPC service to listen.
server_port: An integer, indicates the port used by the gRPC service to listen. 0 means that the server is not listening.
log_lines: An array of log lines emitted by the service. Available after stop() is called, reset to None on start().
"""

def __init__(self,
Expand All @@ -39,6 +40,7 @@ def __init__(self,
assert ip_version != IpVersion.UNKNOWN
self.server_port = 0
self.server_ip = server_ip
self.log_lines = None
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Does this need to be public or would self._log_lines do? If public, can we document it above in the Attributes: section of the class docstring?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 593fe3c (also moved the line that resets the log to None to a better place).

self._server_process = None
self._ip_version = ip_version
self._server_binary_path = server_binary_path
Expand All @@ -55,8 +57,9 @@ def _serverThreadRunner(self):
"%s:0" % str(self.server_ip), "--service", self._service_name
]
logging.info("Nighthawk grpc service popen() args: [%s]" % args)
self._server_process = subprocess.Popen(args)
self._server_process.communicate()
self._server_process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
_, stderr = self._server_process.communicate()
self.log_lines = stderr.decode("utf-8").splitlines()
self._address_file = None

def _waitUntilServerListening(self):
Expand Down Expand Up @@ -87,6 +90,7 @@ def start(self):
can be queried to get the listening port.
"""

self.log_lines = None
self._server_thread.daemon = True
self._server_thread.start()
return self._waitUntilServerListening()
Expand Down
17 changes: 13 additions & 4 deletions test/integration/test_remote_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@ def test_remote_execution_basics(http_test_server_fixture):
"%s:%s" % (http_test_server_fixture.grpc_service.server_ip,
http_test_server_fixture.grpc_service.server_port)
]
parsed_json, _ = http_test_server_fixture.runNighthawkClient(args)
counters = http_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json)
assertCounterGreaterEqual(counters, "benchmark.http_2xx", 25)
repeats = 3
for i in range(repeats):
parsed_json, _ = http_test_server_fixture.runNighthawkClient(args)
counters = http_test_server_fixture.getNighthawkCounterMapFromJson(parsed_json)
assertCounterGreaterEqual(counters, "benchmark.http_2xx", 25)

http_test_server_fixture.grpc_service.stop()
# Ensure the gRPC service logs looks right. Specifically these logs ought to have sentinels
# indicative of the right number of executions. (Avoids regression of #289).
assertEqual(
repeats,
sum("Starting 1 threads / event loops" in line
for line in http_test_server_fixture.grpc_service.log_lines))

# As a control step, prove we are actually performing remote execution: re-run the command without an
# operational gRPC service. That ought to fail.
http_test_server_fixture.grpc_service.stop()
http_test_server_fixture.runNighthawkClient(args, expect_failure=True)


Expand Down