@@ -170,6 +170,11 @@ DEFINE_uint32(datastream_buffer_retention_size,
170170 gflags::Uint32FromEnv (" PL_DATASTREAM_BUFFER_SIZE" , 1024 * 1024 ),
171171 "The maximum size of a data stream buffer retained between cycles.");
172172
173+ DEFINE_uint64 (total_conn_tracker_mem_usage,
174+ gflags::Uint64FromEnv (" PX_TOTAL_CONN_TRACKER_MEM_USAGE" , 0 ),
175+ "The maximum size in bytes of the collective connection tracker buffers. Defaults to "
176+ "0, which corresponds to no limit. When data is beyond this limit, new data is "
177+ "dropped until the limit is no longer exceeded.");
173178DEFINE_uint64 (max_body_bytes, gflags::Uint64FromEnv(" PL_STIRLING_MAX_BODY_BYTES" , 512 ),
174179 "The maximum number of bytes in the body of protocols like HTTP");
175180
@@ -834,6 +839,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
834839 }
835840 }
836841
842+ size_t current_conn_tracker_buffer_size = 0 ;
837843 for (const auto & conn_tracker : conn_trackers_mgr_.active_trackers ()) {
838844 const auto & transfer_spec = protocol_transfer_specs_[conn_tracker->protocol ()];
839845
@@ -857,7 +863,8 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
857863 socket_info_mgr_.get ());
858864
859865 if (transfer_spec.transfer_fn != nullptr ) {
860- transfer_spec.transfer_fn (*this , ctx, conn_tracker, data_table);
866+ current_conn_tracker_buffer_size +=
867+ transfer_spec.transfer_fn (*this , ctx, conn_tracker, data_table);
861868 } else {
862869 // If there's no transfer function, then the tracker should not be holding any data.
863870 // http::ProtocolTraits is used as a placeholder; the frames deque is expected to be
@@ -868,6 +875,7 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) {
868875
869876 conn_tracker->IterationPostTick ();
870877 }
878+ total_conn_tracker_mem_usage_ = current_conn_tracker_buffer_size;
871879
872880 CheckTracerState ();
873881
@@ -1094,6 +1102,19 @@ void SocketTraceConnector::AcceptDataEvent(std::unique_ptr<SocketDataEvent> even
10941102 WriteDataEvent (*event);
10951103 }
10961104
1105+ auto msg_size = event->msg .size ();
1106+ auto protocol = event->attr .protocol ;
1107+ total_conn_tracker_mem_usage_ += msg_size;
1108+ if (FLAGS_total_conn_tracker_mem_usage != 0 &&
1109+ total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
1110+ VLOG_EVERY_N (1 , 1000 ) << absl::Substitute (
1111+ " Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
1112+ " Dropping data event of size $2 for protocol $3" ,
1113+ total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage, msg_size, protocol);
1114+ stats_.Increment (StatKey::kDroppedSocketDataEvent );
1115+ return ;
1116+ }
1117+
10971118 stats_.Increment (StatKey::kPollSocketDataEventCount );
10981119 stats_.Increment (StatKey::kPollSocketDataEventAttrSize , sizeof (event->attr ));
10991120 stats_.Increment (StatKey::kPollSocketDataEventDataSize , event->msg .size ());
@@ -1113,11 +1134,32 @@ void SocketTraceConnector::AcceptConnStatsEvent(conn_stats_event_t event) {
11131134}
11141135
11151136void SocketTraceConnector::AcceptHTTP2Header (std::unique_ptr<HTTP2HeaderEvent> event) {
1137+ if (FLAGS_total_conn_tracker_mem_usage != 0 &&
1138+ total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
1139+ VLOG_EVERY_N (1 , 1000 ) << absl::Substitute (
1140+ " Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
1141+ " Dropping header event" ,
1142+ total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage);
1143+ stats_.Increment (StatKey::kDroppedSocketDataEvent );
1144+ return ;
1145+ }
1146+
11161147 ConnTracker& tracker = GetOrCreateConnTracker (event->attr .conn_id );
11171148 tracker.AddHTTP2Header (std::move (event));
11181149}
11191150
11201151void SocketTraceConnector::AcceptHTTP2Data (std::unique_ptr<HTTP2DataEvent> event) {
1152+ auto payload_size = event->payload .size ();
1153+ total_conn_tracker_mem_usage_ += payload_size;
1154+ if (FLAGS_total_conn_tracker_mem_usage != 0 &&
1155+ total_conn_tracker_mem_usage_ > FLAGS_total_conn_tracker_mem_usage) {
1156+ VLOG_EVERY_N (1 , 1000 ) << absl::Substitute (
1157+ " Total buffer size of all active ConnTrackers $0 exceeds the limit $1. "
1158+ " Dropping data event of size $2" ,
1159+ total_conn_tracker_mem_usage_, FLAGS_total_conn_tracker_mem_usage, payload_size);
1160+ stats_.Increment (StatKey::kDroppedSocketDataEvent );
1161+ return ;
1162+ }
11211163 ConnTracker& tracker = GetOrCreateConnTracker (event->attr .conn_id );
11221164 tracker.AddHTTP2Data (std::move (event));
11231165}
@@ -1791,8 +1833,8 @@ void SocketTraceConnector::WriteDataEvent(const SocketDataEvent& event) {
17911833// -----------------------------------------------------------------------------
17921834
17931835template <typename TProtocolTraits>
1794- void SocketTraceConnector::TransferStream (ConnectorContext* ctx, ConnTracker* tracker,
1795- DataTable* data_table) {
1836+ size_t SocketTraceConnector::TransferStream (ConnectorContext* ctx, ConnTracker* tracker,
1837+ DataTable* data_table) {
17961838 using TFrameType = typename TProtocolTraits::frame_type;
17971839 using TKey = typename TProtocolTraits::key_type;
17981840
@@ -1821,6 +1863,7 @@ void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tr
18211863 tracker->Cleanup <TProtocolTraits>(FLAGS_messages_size_limit_bytes,
18221864 FLAGS_datastream_buffer_retention_size,
18231865 message_expiry_timestamp, buffer_expiry_timestamp);
1866+ return tracker->MemUsage <TProtocolTraits>();
18241867}
18251868
18261869void SocketTraceConnector::TransferConnStats (ConnectorContext* ctx, DataTable* data_table) {
0 commit comments