Skip to content

Commit fc4a736

Browse files
committed
stats: move message sending logic to RawSender
`RawSender` is inspired by `UDPSender` from `vthiery/cpp-statsd-client` and separating it out of `StatsdClient` is needed to implement queueing and batching support in upcoming commits. This is the start of migrating our Statsd codebase to `cpp-statsd-client`.
1 parent 9269068 commit fc4a736

File tree

5 files changed

+192
-56
lines changed

5 files changed

+192
-56
lines changed

src/Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ BITCOIN_CORE_H = \
314314
stacktraces.h \
315315
streams.h \
316316
stats/client.h \
317+
stats/rawsender.h \
317318
support/allocators/mt_pooled_secure.h \
318319
support/allocators/pool.h \
319320
support/allocators/pooled_secure.h \
@@ -527,6 +528,7 @@ libbitcoin_server_a_SOURCES = \
527528
shutdown.cpp \
528529
spork.cpp \
529530
stats/client.cpp \
531+
stats/rawsender.cpp \
530532
timedata.cpp \
531533
torcontrol.cpp \
532534
txdb.cpp \

src/stats/client.cpp

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Copyright (c) 2014-2017 Statoshi Developers
2+
// Copyright (c) 2017-2023 Vincent Thiery
23
// Copyright (c) 2020-2024 The Dash Core developers
34
// Distributed under the MIT software license, see the accompanying
45
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@@ -35,8 +36,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3536

3637
#include <stats/client.h>
3738

38-
#include <compat.h>
39-
#include <netbase.h>
39+
#include <stats/rawsender.h>
4040
#include <util/system.h>
4141

4242
#include <cmath>
@@ -60,10 +60,8 @@ bool StatsdClient::ShouldSend(float sample_rate)
6060
return sample_rate > std::uniform_real_distribution<float>(0.f, 1.f)(insecure_rand);
6161
}
6262

63-
StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, const std::string& ns,
64-
bool enabled) :
65-
m_port{port},
66-
m_host{host},
63+
StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port,
64+
const std::string& ns, bool enabled) :
6765
m_nodename{nodename},
6866
m_ns{ns}
6967
{
@@ -72,31 +70,19 @@ StatsdClient::StatsdClient(const std::string& host, const std::string& nodename,
7270
return;
7371
}
7472

75-
if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) {
76-
if (!netaddr->IsIPv4()) {
77-
LogPrintf("ERROR: Host %s on unsupported network, cannot init StatsdClient\n", m_host);
78-
return;
79-
}
80-
if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast<struct sockaddr*>(&m_server.first), &m_server.second)) {
81-
LogPrintf("ERROR: Cannot get socket address for %s, cannot init StatsdClient\n", m_host);
82-
return;
83-
}
84-
} else {
85-
LogPrintf("Unable to lookup host %s, cannot init StatsdClient\n", m_host);
73+
std::optional<std::string> error_opt;
74+
m_sender = std::make_unique<RawSender>(host, port, error_opt);
75+
if (error_opt.has_value()) {
76+
LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", error_opt.value());
77+
m_sender.reset();
8678
return;
8779
}
8880

89-
SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
90-
if (hSocket == INVALID_SOCKET) {
91-
LogPrintf("ERROR: Cannot create socket (socket() returned error %s), cannot init StatsdClient\n",
92-
NetworkErrorString(WSAGetLastError()));
93-
return;
94-
}
95-
m_sock = std::make_unique<Sock>(hSocket);
96-
97-
LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", m_host, m_port);
81+
LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", host, port);
9882
}
9983

84+
StatsdClient::~StatsdClient() {}
85+
10086
/* will change the original string */
10187
void StatsdClient::cleanup(std::string& key)
10288
{
@@ -133,7 +119,7 @@ bool StatsdClient::timing(const std::string& key, int64_t ms, float sample_rate)
133119

134120
bool StatsdClient::send(std::string key, int64_t value, const std::string& type, float sample_rate)
135121
{
136-
if (!m_sock) {
122+
if (!m_sender) {
137123
return false;
138124
}
139125

@@ -147,17 +133,22 @@ bool StatsdClient::send(std::string key, int64_t value, const std::string& type,
147133

148134
cleanup(key);
149135

150-
std::string buf{strprintf("%s%s:%d|%s", m_ns, key, value, type)};
136+
RawMessage msg{strprintf("%s%s:%d|%s", m_ns, key, value, type)};
151137
if (sample_rate < 1.f) {
152-
buf += strprintf("|@%.2f", sample_rate);
138+
msg += strprintf("|@%.2f", sample_rate);
139+
}
140+
141+
if (auto error_opt = m_sender->Send(msg); error_opt.has_value()) {
142+
LogPrintf("ERROR: %s.\n", error_opt.value());
143+
return false;
153144
}
154145

155-
return send(buf);
146+
return true;
156147
}
157148

158149
bool StatsdClient::sendDouble(std::string key, double value, const std::string& type, float sample_rate)
159150
{
160-
if (!m_sock) {
151+
if (!m_sender) {
161152
return false;
162153
}
163154

@@ -171,22 +162,13 @@ bool StatsdClient::sendDouble(std::string key, double value, const std::string&
171162

172163
cleanup(key);
173164

174-
std::string buf{strprintf("%s%s:%f|%s", m_ns, key, value, type)};
165+
RawMessage msg{strprintf("%s%s:%f|%s", m_ns, key, value, type)};
175166
if (sample_rate < 1.f) {
176-
buf += strprintf("|@%.2f", sample_rate);
167+
msg += strprintf("|@%.2f", sample_rate);
177168
}
178169

179-
return send(buf);
180-
}
181-
182-
bool StatsdClient::send(const std::string& message)
183-
{
184-
assert(m_sock);
185-
186-
if (::sendto(m_sock->Get(), message.data(), message.size(), /*flags=*/0,
187-
reinterpret_cast<struct sockaddr*>(&m_server.first), m_server.second) == SOCKET_ERROR) {
188-
LogPrintf("ERROR: Unable to send message (sendto() returned error %s), host=%s:%d\n",
189-
NetworkErrorString(WSAGetLastError()), m_host, m_port);
170+
if (auto error_opt = m_sender->Send(msg); error_opt.has_value()) {
171+
LogPrintf("ERROR: %s.\n", error_opt.value());
190172
return false;
191173
}
192174

src/stats/client.h

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Copyright (c) 2014-2017 Statoshi Developers
2+
// Copyright (c) 2017-2023 Vincent Thiery
23
// Copyright (c) 2020-2023 The Dash Core developers
34
// Distributed under the MIT software license, see the accompanying
45
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@@ -7,12 +8,13 @@
78
#define BITCOIN_STATS_CLIENT_H
89

910
#include <random.h>
10-
#include <threadsafety.h>
11-
#include <util/sock.h>
11+
#include <sync.h>
1212

1313
#include <memory>
1414
#include <string>
1515

16+
class RawSender;
17+
1618
static constexpr bool DEFAULT_STATSD_ENABLE{false};
1719
static constexpr uint16_t DEFAULT_STATSD_PORT{8125};
1820
static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"};
@@ -29,6 +31,7 @@ class StatsdClient
2931
public:
3032
explicit StatsdClient(const std::string& host, const std::string& nodename, uint16_t port,
3133
const std::string& ns, bool enabled);
34+
~StatsdClient();
3235

3336
public:
3437
bool inc(const std::string& key, float sample_rate = 1.f);
@@ -45,24 +48,15 @@ class StatsdClient
4548
bool sendDouble(std::string key, double value, const std::string& type, float sample_rate);
4649

4750
private:
48-
/**
49-
* (Low Level Api) manually send a message
50-
* which might be composed of several lines.
51-
*/
52-
bool send(const std::string& message);
53-
5451
void cleanup(std::string& key);
5552
bool ShouldSend(float sample_rate);
5653

5754
private:
5855
mutable Mutex cs;
5956
mutable FastRandomContext insecure_rand GUARDED_BY(cs);
6057

61-
std::unique_ptr<Sock> m_sock{nullptr};
62-
std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)};
58+
std::unique_ptr<RawSender> m_sender{nullptr};
6359

64-
const uint16_t m_port;
65-
const std::string m_host;
6660
const std::string m_nodename;
6761
const std::string m_ns;
6862
};

src/stats/rawsender.cpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright (c) 2017-2023 Vincent Thiery
2+
// Copyright (c) 2024 The Dash Core developers
3+
// Distributed under the MIT software license, see the accompanying
4+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5+
6+
#include <stats/rawsender.h>
7+
8+
#include <netaddress.h>
9+
#include <netbase.h>
10+
#include <util/sock.h>
11+
12+
RawSender::RawSender(const std::string& host, uint16_t port, std::optional<std::string>& error) :
13+
m_host{host},
14+
m_port{port}
15+
{
16+
if (host.empty()) {
17+
error = "No host specified";
18+
return;
19+
}
20+
21+
if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) {
22+
if (!netaddr->IsIPv4()) {
23+
error = strprintf("Host %s on unsupported network", m_host);
24+
return;
25+
}
26+
if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast<struct sockaddr*>(&m_server.first), &m_server.second)) {
27+
error = strprintf("Cannot get socket address for %s", m_host);
28+
return;
29+
}
30+
} else {
31+
error = strprintf("Unable to lookup host %s", m_host);
32+
return;
33+
}
34+
35+
SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
36+
if (hSocket == INVALID_SOCKET) {
37+
error = strprintf("Cannot create socket (socket() returned error %s)", NetworkErrorString(WSAGetLastError()));
38+
return;
39+
}
40+
m_sock = std::make_unique<Sock>(hSocket);
41+
42+
LogPrintf("Started RawSender sending messages to %s:%d\n", m_host, m_port);
43+
}
44+
45+
RawSender::~RawSender()
46+
{
47+
LogPrintf("Stopping RawSender instance sending messages to %s:%d. %d successes, %d failures.\n",
48+
m_host, m_port, m_successes, m_failures);
49+
}
50+
51+
std::optional<std::string> RawSender::Send(const RawMessage& msg)
52+
{
53+
if (!m_sock) {
54+
m_failures++;
55+
return "Socket not initialized, cannot send message";
56+
}
57+
58+
if (::sendto(m_sock->Get(), reinterpret_cast<const char*>(msg.data()),
59+
#ifdef WIN32
60+
static_cast<int>(msg.size()),
61+
#else
62+
msg.size(),
63+
#endif // WIN32
64+
/*flags=*/0, reinterpret_cast<struct sockaddr*>(&m_server.first), m_server.second) == SOCKET_ERROR) {
65+
m_failures++;
66+
return strprintf("Unable to send message to %s (sendto() returned error %s)", this->ToStringHostPort(),
67+
NetworkErrorString(WSAGetLastError()));
68+
}
69+
70+
m_successes++;
71+
return std::nullopt;
72+
}
73+
74+
std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_host, m_port); }

src/stats/rawsender.h

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright (c) 2017-2023 Vincent Thiery
2+
// Copyright (c) 2024 The Dash Core developers
3+
// Distributed under the MIT software license, see the accompanying
4+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5+
6+
#ifndef BITCOIN_STATS_RAWSENDER_H
7+
#define BITCOIN_STATS_RAWSENDER_H
8+
9+
#include <compat.h>
10+
#include <sync.h>
11+
12+
#include <memory>
13+
#include <optional>
14+
#include <string>
15+
#include <vector>
16+
17+
class Sock;
18+
19+
struct RawMessage : public std::vector<uint8_t>
20+
{
21+
using parent_type = std::vector<value_type>;
22+
using parent_type::parent_type;
23+
24+
explicit RawMessage(const std::string& data) : parent_type{data.begin(), data.end()} {}
25+
26+
parent_type& operator+=(value_type rhs) { return append(rhs); }
27+
parent_type& operator+=(std::string::value_type rhs) { return append(rhs); }
28+
parent_type& operator+=(const parent_type& rhs) { return append(rhs); }
29+
parent_type& operator+=(const std::string& rhs) { return append(rhs); }
30+
31+
parent_type& append(value_type rhs)
32+
{
33+
push_back(rhs);
34+
return *this;
35+
}
36+
parent_type& append(std::string::value_type rhs)
37+
{
38+
push_back(static_cast<value_type>(rhs));
39+
return *this;
40+
}
41+
parent_type& append(const parent_type& rhs)
42+
{
43+
insert(end(), rhs.begin(), rhs.end());
44+
return *this;
45+
}
46+
parent_type& append(const std::string& rhs)
47+
{
48+
insert(end(), rhs.begin(), rhs.end());
49+
return *this;
50+
}
51+
};
52+
53+
class RawSender
54+
{
55+
public:
56+
RawSender(const std::string& host, uint16_t port, std::optional<std::string>& error);
57+
~RawSender();
58+
59+
RawSender(const RawSender&) = delete;
60+
RawSender& operator=(const RawSender&) = delete;
61+
RawSender(RawSender&&) = delete;
62+
63+
std::optional<std::string> Send(const RawMessage& msg);
64+
65+
std::string ToStringHostPort() const;
66+
67+
private:
68+
/* Socket used to communicate with host */
69+
std::unique_ptr<Sock> m_sock{nullptr};
70+
/* Socket address containing host information */
71+
std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)};
72+
73+
/* Hostname of server receiving messages */
74+
const std::string m_host;
75+
/* Port of server receiving messages */
76+
const uint16_t m_port;
77+
78+
/* Number of messages sent */
79+
uint64_t m_successes{0};
80+
/* Number of messages not sent */
81+
uint64_t m_failures{0};
82+
};
83+
84+
#endif // BITCOIN_STATS_RAWSENDER_H

0 commit comments

Comments
 (0)