@@ -57,7 +57,7 @@ DEFINE_uint32(
5757 " Ratio of how frequently conn_stats_table is populated relative to the base sampling period." );
5858
5959DEFINE_uint32 (stirling_socket_tracer_stats_logging_ratio,
60- std::chrono::minutes (10 ) / px::stirling::SocketTraceConnector::kSamplingPeriod,
60+ std::chrono::minutes (3 ) / px::stirling::SocketTraceConnector::kSamplingPeriod,
6161 "Ratio of how frequently summary logging information is displayed.");
6262
6363DEFINE_bool (stirling_enable_periodic_bpf_map_cleanup, true ,
@@ -170,6 +170,9 @@ DEFINE_uint32(datastream_buffer_retention_size,
170170 gflags::Uint32FromEnv (" PL_DATASTREAM_BUFFER_SIZE" , 1024 * 1024 ),
171171 "The maximum size of a data stream buffer retained between cycles.");
172172
173+ DEFINE_uint64 (total_conn_tracker_mem_usage,
174+ gflags::Uint64FromEnv (" PX_TOTAL_CONN_TRACKER_MEM_USAGE" , 100 * 1024 * 1024 ), /* 100 MiB */
175+ "The maximum size of the collective connection tracker buffer.");
173176DEFINE_uint64 (max_body_bytes, gflags::Uint64FromEnv(" PL_STIRLING_MAX_BODY_BYTES" , 512 ),
174177 "The maximum number of bytes in the body of protocols like HTTP");
175178
@@ -834,6 +837,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
834837 }
835838 }
836839
840+ size_t current_conn_tracker_buffer_size = 0 ;
837841 for (const auto & conn_tracker : conn_trackers_mgr_.active_trackers ()) {
838842 const auto & transfer_spec = protocol_transfer_specs_[conn_tracker->protocol ()];
839843
@@ -857,7 +861,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
857861 socket_info_mgr_.get ());
858862
859863 if (transfer_spec.transfer_fn != nullptr ) {
860- transfer_spec.transfer_fn (*this , ctx, conn_tracker, data_table);
864+ current_conn_tracker_buffer_size += transfer_spec.transfer_fn (*this , ctx, conn_tracker, data_table);
861865 } else {
862866 // If there's no transfer function, then the tracker should not be holding any data.
863867 // http::ProtocolTraits is used as a placeholder; the frames deque is expected to be
@@ -868,6 +872,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
868872
869873 conn_tracker->IterationPostTick ();
870874 }
875+ total_conn_tracker_mem_usage_ = current_conn_tracker_buffer_size;
871876
872877 CheckTracerState ();
873878
@@ -1094,6 +1099,20 @@ void SocketTraceConnector::AcceptDataEvent(std::unique_ptr<SocketDataEvent> even
10941099 WriteDataEvent (*event);
10951100 }
10961101
1102+ auto msg_size = event->msg .size ();
1103+ auto protocol = event->attr .protocol ;
1104+ total_conn_tracker_mem_usage_ += msg_size;
1105+ if (FLAGS_total_conn_tracker_mem_usage != 0 &&
1106+ total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
1107+ LOG_EVERY_N (WARNING, 1000 ) << absl::Substitute (
1108+ " Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
1109+ " Dropping data event of size $2 for protocol $3" ,
1110+ total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage, msg_size,
1111+ protocol);
1112+ stats_.Increment (StatKey::kDroppedSocketDataEventCount );
1113+ return ;
1114+ }
1115+
10971116 stats_.Increment (StatKey::kPollSocketDataEventCount );
10981117 stats_.Increment (StatKey::kPollSocketDataEventAttrSize , sizeof (event->attr ));
10991118 stats_.Increment (StatKey::kPollSocketDataEventDataSize , event->msg .size ());
@@ -1113,11 +1132,32 @@ void SocketTraceConnector::AcceptConnStatsEvent(conn_stats_event_t event) {
11131132}
11141133
11151134void SocketTraceConnector::AcceptHTTP2Header (std::unique_ptr<HTTP2HeaderEvent> event) {
1135+ if (FLAGS_total_conn_tracker_mem_usage != 0 &&
1136+ total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
1137+ LOG_EVERY_N (WARNING, 1000 ) << absl::Substitute (
1138+ " Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
1139+ " Dropping header event" ,
1140+ total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage);
1141+ stats_.Increment (StatKey::kDroppedSocketDataEventCount );
1142+ return ;
1143+ }
1144+
11161145 ConnTracker& tracker = GetOrCreateConnTracker (event->attr .conn_id );
11171146 tracker.AddHTTP2Header (std::move (event));
11181147}
11191148
11201149void SocketTraceConnector::AcceptHTTP2Data (std::unique_ptr<HTTP2DataEvent> event) {
1150+ auto payload_size = event->payload .size ();
1151+ total_conn_tracker_mem_usage_ += payload_size;
1152+ if (FLAGS_total_conn_tracker_mem_usage != 0 &&
1153+ total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
1154+ LOG_EVERY_N (WARNING, 1000 ) << absl::Substitute (
1155+ " Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
1156+ " Dropping data event of size $2" ,
1157+ total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage, payload_size);
1158+ stats_.Increment (StatKey::kDroppedSocketDataEventCount );
1159+ return ;
1160+ }
11211161 ConnTracker& tracker = GetOrCreateConnTracker (event->attr .conn_id );
11221162 tracker.AddHTTP2Data (std::move (event));
11231163}
@@ -1791,7 +1831,7 @@ void SocketTraceConnector::WriteDataEvent(const SocketDataEvent& event) {
17911831// -----------------------------------------------------------------------------
17921832
17931833template <typename TProtocolTraits>
1794- void SocketTraceConnector::TransferStream (ConnectorContext* ctx, ConnTracker* tracker,
1834+ size_t SocketTraceConnector::TransferStream (ConnectorContext* ctx, ConnTracker* tracker,
17951835 DataTable* data_table) {
17961836 using TFrameType = typename TProtocolTraits::frame_type;
17971837 using TKey = typename TProtocolTraits::key_type;
@@ -1821,6 +1861,7 @@ void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tr
18211861 tracker->Cleanup <TProtocolTraits>(FLAGS_messages_size_limit_bytes,
18221862 FLAGS_datastream_buffer_retention_size,
18231863 message_expiry_timestamp, buffer_expiry_timestamp);
1864+ return tracker->MemUsage <TProtocolTraits>();
18241865}
18251866
18261867void SocketTraceConnector::TransferConnStats (ConnectorContext* ctx, DataTable* data_table) {
0 commit comments