Skip to content

Commit aea0a95

Browse files
committed
Limit data stored from conn trackers
Signed-off-by: Dom Del Nano <[email protected]>
1 parent 49563a3 commit aea0a95

File tree

3 files changed

+51
-5
lines changed

3 files changed

+51
-5
lines changed

src/stirling/source_connectors/socket_tracer/protocols/mqtt/types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ struct Message : public FrameBase {
6969
std::map<std::string, uint32_t> header_fields;
7070
std::map<std::string, std::string> properties, payload;
7171

72+
// TODO(ddelnano): This should take into account header_fields and properties
7273
size_t ByteSize() const override { return sizeof(Message) + payload.size(); }
7374

7475
std::string ToString() const override {

src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ DEFINE_uint32(
5757
"Ratio of how frequently conn_stats_table is populated relative to the base sampling period.");
5858

5959
DEFINE_uint32(stirling_socket_tracer_stats_logging_ratio,
60-
std::chrono::minutes(10) / px::stirling::SocketTraceConnector::kSamplingPeriod,
60+
std::chrono::minutes(3) / px::stirling::SocketTraceConnector::kSamplingPeriod,
6161
"Ratio of how frequently summary logging information is displayed.");
6262

6363
DEFINE_bool(stirling_enable_periodic_bpf_map_cleanup, true,
@@ -170,6 +170,9 @@ DEFINE_uint32(datastream_buffer_retention_size,
170170
gflags::Uint32FromEnv("PL_DATASTREAM_BUFFER_SIZE", 1024 * 1024),
171171
"The maximum size of a data stream buffer retained between cycles.");
172172

173+
DEFINE_uint64(total_conn_tracker_mem_usage,
174+
gflags::Uint64FromEnv("PX_TOTAL_CONN_TRACKER_MEM_USAGE", 100 * 1024 * 1024), /* 100 MiB */
175+
"The maximum size of the collective connection tracker buffer.");
173176
DEFINE_uint64(max_body_bytes, gflags::Uint64FromEnv("PL_STIRLING_MAX_BODY_BYTES", 512),
174177
"The maximum number of bytes in the body of protocols like HTTP");
175178

@@ -834,6 +837,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
834837
}
835838
}
836839

840+
size_t current_conn_tracker_buffer_size = 0;
837841
for (const auto& conn_tracker : conn_trackers_mgr_.active_trackers()) {
838842
const auto& transfer_spec = protocol_transfer_specs_[conn_tracker->protocol()];
839843

@@ -857,7 +861,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
857861
socket_info_mgr_.get());
858862

859863
if (transfer_spec.transfer_fn != nullptr) {
860-
transfer_spec.transfer_fn(*this, ctx, conn_tracker, data_table);
864+
current_conn_tracker_buffer_size += transfer_spec.transfer_fn(*this, ctx, conn_tracker, data_table);
861865
} else {
862866
// If there's no transfer function, then the tracker should not be holding any data.
863867
// http::ProtocolTraits is used as a placeholder; the frames deque is expected to be
@@ -868,6 +872,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
868872

869873
conn_tracker->IterationPostTick();
870874
}
875+
total_conn_tracker_mem_usage_ = current_conn_tracker_buffer_size;
871876

872877
CheckTracerState();
873878

@@ -1094,6 +1099,20 @@ void SocketTraceConnector::AcceptDataEvent(std::unique_ptr<SocketDataEvent> even
10941099
WriteDataEvent(*event);
10951100
}
10961101

1102+
auto msg_size = event->msg.size();
1103+
auto protocol = event->attr.protocol;
1104+
total_conn_tracker_mem_usage_ += msg_size;
1105+
if (FLAGS_total_conn_tracker_mem_usage != 0 &&
1106+
total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
1107+
LOG_EVERY_N(WARNING, 1000) << absl::Substitute(
1108+
"Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
1109+
"Dropping data event of size $2 for protocol $3",
1110+
total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage, msg_size,
1111+
protocol);
1112+
stats_.Increment(StatKey::kDroppedSocketDataEventCount);
1113+
return;
1114+
}
1115+
10971116
stats_.Increment(StatKey::kPollSocketDataEventCount);
10981117
stats_.Increment(StatKey::kPollSocketDataEventAttrSize, sizeof(event->attr));
10991118
stats_.Increment(StatKey::kPollSocketDataEventDataSize, event->msg.size());
@@ -1113,11 +1132,32 @@ void SocketTraceConnector::AcceptConnStatsEvent(conn_stats_event_t event) {
11131132
}
11141133

11151134
void SocketTraceConnector::AcceptHTTP2Header(std::unique_ptr<HTTP2HeaderEvent> event) {
1135+
if (FLAGS_total_conn_tracker_mem_usage != 0 &&
1136+
total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
1137+
LOG_EVERY_N(WARNING, 1000) << absl::Substitute(
1138+
"Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
1139+
"Dropping header event",
1140+
total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage);
1141+
stats_.Increment(StatKey::kDroppedSocketDataEventCount);
1142+
return;
1143+
}
1144+
11161145
ConnTracker& tracker = GetOrCreateConnTracker(event->attr.conn_id);
11171146
tracker.AddHTTP2Header(std::move(event));
11181147
}
11191148

11201149
void SocketTraceConnector::AcceptHTTP2Data(std::unique_ptr<HTTP2DataEvent> event) {
1150+
auto payload_size = event->payload.size();
1151+
total_conn_tracker_mem_usage_ += payload_size;
1152+
if (FLAGS_total_conn_tracker_mem_usage != 0 &&
1153+
total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
1154+
LOG_EVERY_N(WARNING, 1000) << absl::Substitute(
1155+
"Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
1156+
"Dropping data event of size $2",
1157+
total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage, payload_size);
1158+
stats_.Increment(StatKey::kDroppedSocketDataEventCount);
1159+
return;
1160+
}
11211161
ConnTracker& tracker = GetOrCreateConnTracker(event->attr.conn_id);
11221162
tracker.AddHTTP2Data(std::move(event));
11231163
}
@@ -1791,7 +1831,7 @@ void SocketTraceConnector::WriteDataEvent(const SocketDataEvent& event) {
17911831
//-----------------------------------------------------------------------------
17921832

17931833
template <typename TProtocolTraits>
1794-
void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tracker,
1834+
size_t SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tracker,
17951835
DataTable* data_table) {
17961836
using TFrameType = typename TProtocolTraits::frame_type;
17971837
using TKey = typename TProtocolTraits::key_type;
@@ -1821,6 +1861,7 @@ void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tr
18211861
tracker->Cleanup<TProtocolTraits>(FLAGS_messages_size_limit_bytes,
18221862
FLAGS_datastream_buffer_retention_size,
18231863
message_expiry_timestamp, buffer_expiry_timestamp);
1864+
return tracker->MemUsage<TProtocolTraits>();
18241865
}
18251866

18261867
void SocketTraceConnector::TransferConnStats(ConnectorContext* ctx, DataTable* data_table) {

src/stirling/source_connectors/socket_tracer/socket_trace_connector.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ class SocketTraceConnector : public BCCSourceConnector {
213213
/* OUT */ struct go_grpc_http2_header_event_t* header_event_data_go_style);
214214

215215
template <typename TProtocolTraits>
216-
void TransferStream(ConnectorContext* ctx, ConnTracker* tracker, DataTable* data_table);
216+
size_t TransferStream(ConnectorContext* ctx, ConnTracker* tracker, DataTable* data_table);
217217
void TransferConnStats(ConnectorContext* ctx, DataTable* data_table);
218218

219219
void set_iteration_time(std::chrono::time_point<std::chrono::steady_clock> time) {
@@ -256,7 +256,7 @@ class SocketTraceConnector : public BCCSourceConnector {
256256
int32_t trace_mode = TraceMode::Off;
257257
uint32_t table_num = 0;
258258
std::vector<endpoint_role_t> trace_roles;
259-
std::function<void(SocketTraceConnector&, ConnectorContext*, ConnTracker*, DataTable*)>
259+
std::function<size_t(SocketTraceConnector&, ConnectorContext*, ConnTracker*, DataTable*)>
260260
transfer_fn = nullptr;
261261
bool enabled = false;
262262
};
@@ -300,6 +300,8 @@ class SocketTraceConnector : public BCCSourceConnector {
300300

301301
UProbeManager uprobe_mgr_;
302302

303+
size_t total_conn_tracker_mem_usage_ = 0;
304+
303305
enum class StatKey {
304306
kLossSocketDataEvent,
305307
kLossSocketControlEvent,
@@ -314,6 +316,8 @@ class SocketTraceConnector : public BCCSourceConnector {
314316
kPollSocketDataEventAttrSize,
315317
kPollSocketDataEventDataSize,
316318
kPollSocketDataEventSize,
319+
320+
kDroppedSocketDataEventCount,
317321
};
318322

319323
utils::StatCounter<StatKey> stats_;

0 commit comments

Comments
 (0)