@@ -41,8 +41,6 @@ StraxInserter::StraxInserter(){
4141 fFragmentSize = 0 ;
4242 fForceQuit = false ;
4343 fFullChunkLength = fChunkLength +fChunkOverlap ;
44- fFragmentsProcessed = 0 ;
45- fEventsProcessed = 0 ;
4644 fProcTimeDP = fProcTimeEv = fProcTimeCh = fCompTime = 0 .;
4745}
4846
@@ -74,17 +72,21 @@ StraxInserter::~StraxInserter(){
7472 fLog ->Entry (MongoLog::Message, " Still waiting for thread %lx to stop" , fThreadId );
7573 std::this_thread::sleep_for (std::chrono::seconds (2 ));
7674 }
77- long total_dps = std::accumulate (fBufferCounter .begin (), fBufferCounter .end (), 0L ,
78- [&](long tot, auto & p){return std::move (tot) + p.second ;});
79- fLog ->Entry (MongoLog::Local, " Thread %lx got events %.1f%% of the time" ,
80- fThreadId , (total_dps-fBufferCounter [0 ]+0.0 )/total_dps*100 .);
81- std::map<std::string, long > counters {
82- {" bytes" , fBytesProcessed },
83- {" fragments" , fFragmentsProcessed },
84- {" events" , fEventsProcessed },
85- {" data_packets" , total_dps - fBufferCounter [0 ]}};
86- fOptions ->SaveBenchmarks (counters, fBufferCounter ,
87- fProcTimeDP , fProcTimeEv , fProcTimeCh , fCompTime );
75+ auto accum = [&](long tot, std::pair<int , long > it){return std::move (tot) + pair.second ;};
76+ std::stringstream ss;
77+ ss << std::hex << fThreadId ;
78+ std::map<std::string, double > times {
79+ {" data_packets_us" , fProcTimeDP },
80+ {" events_us" , fProcTimeEv },
81+ {" fragments_us" , fProcTimeCh },
82+ {" compression_us" , fCompTime }
83+ };
84+ std::map<std::string, std::map<int , long >> counters {
85+ {" fragments" , fFragsPerEvent },
86+ {" events" , fEvPerDP },
87+ {" data_packets" , fBufferCounter }
88+ };
89+ fOptions ->SaveBenchmarks (counters, fBytesProcessed , ss.str (), times);
8890}
8991
9092int StraxInserter::Initialize (Options *options, MongoLog *log, DAQController *dataSource,
@@ -166,9 +168,10 @@ void StraxInserter::ProcessDatapacket(data_packet* dp){
166168
167169 // Take a buffer and break it up into one document per channel
168170
169- u_int32_t *buff = dp->buff ;
170- u_int32_t idx = 0 ;
171- unsigned total_words = dp->size /sizeof (u_int32_t );
171+ uint32_t *buff = dp->buff ;
172+ uint32_t idx = 0 ;
173+ unsigned total_words = dp->size /sizeof (uint32_t );
174+ int evs_this_dp (0 );
172175 clock_gettime (CLOCK_THREAD_CPUTIME_ID, &dp_start);
173176 while (idx < total_words && fForceQuit == false ){
174177
@@ -177,12 +180,14 @@ void StraxInserter::ProcessDatapacket(data_packet* dp){
177180 idx += ProcessEvent (buff+idx, total_words-idx, dp->clock_counter , dp->header_time , dp->bid );
178181 clock_gettime (CLOCK_THREAD_CPUTIME_ID, &ev_end);
179182 fProcTimeEv += timespec_subtract (ev_end, ev_start);
183+ evs_this_dp++;
180184 } else
181185 idx++;
182186 }
183187 clock_gettime (CLOCK_THREAD_CPUTIME_ID, &dp_end);
184188 fProcTimeDP += timespec_subtract (dp_end, dp_start);
185189 fBytesProcessed += dp->size ;
190+ fEvPerDP [evs_this_dp]++;
186191 delete dp;
187192}
188193
@@ -193,17 +198,16 @@ uint32_t StraxInserter::ProcessEvent(uint32_t* buff, unsigned total_words, long
193198 struct timespec ch_start, ch_end;
194199 std::map<std::string, int > fmt = fFmt [bid];
195200
196- u_int32_t words_in_event = std::min (buff[0 ]&0xFFFFFFF , total_words);
201+ uint32_t words_in_event = std::min (buff[0 ]&0xFFFFFFF , total_words);
197202 if (words_in_event < (buff[0 ]&0xFFFFFFF )) {
198203 fLog ->Entry (MongoLog::Local, " Board %i garbled event header: %x/%x" ,
199204 bid, buff[0 ]&0xFFFFFFF , total_words);
200205 }
201206
202- u_int32_t channel_mask = (buff[1 ]&0xFF );
207+ uint32_t channel_mask = (buff[1 ]&0xFF );
203208 if (fmt[" channel_mask_msb_idx" ] != -1 ) channel_mask |= ( ((buff[2 ]>>24 )&0xFF )<<8 );
204209
205- u_int32_t event_time = buff[3 ]&0x7FFFFFFF ;
206- fEventsProcessed ++;
210+ uint32_t event_time = buff[3 ]&0x7FFFFFFF ;
207211
208212 if (buff[1 ]&0x4000000 ){ // board fail
209213 const std::lock_guard<std::mutex> lg (fFC_mutex );
@@ -216,32 +220,34 @@ uint32_t StraxInserter::ProcessEvent(uint32_t* buff, unsigned total_words, long
216220
217221 unsigned idx = event_header_words;
218222 int ret;
223+ int frags (0 );
219224
220225 for (unsigned ch=0 ; ch<max_channels; ch++){
221226 if (channel_mask & (1 <<ch)) {
222227 clock_gettime (CLOCK_THREAD_CPUTIME_ID, &ch_start);
223228 ret = ProcessChannel (buff+idx, words_in_event, bid, ch, header_time, event_time,
224- clock_counter, channel_mask);
229+ clock_counter, channel_mask, frags );
225230 clock_gettime (CLOCK_THREAD_CPUTIME_ID, &ch_end);
226231 fProcTimeCh += timespec_subtract (ch_end, ch_start);
227232 if (ret == -1 )
228233 break ;
229234 idx += ret;
230235 }
231236 }
237+ fFragsPerEvent [frags]++;
232238 return idx;
233239}
234240
235241int StraxInserter::ProcessChannel (uint32_t * buff, unsigned words_in_event, int bid, int channel,
236- uint32_t header_time, uint32_t event_time, long clock_counter, int channel_mask) {
242+ uint32_t header_time, uint32_t event_time, long clock_counter, int channel_mask, int & frags ) {
237243 // buff points to the first word of the channel's data
238244
239245 // These defaults are valid for 'default' firmware where all channels are the same size
240246 int channels_in_event = std::bitset<max_channels>(channel_mask).count ();
241- u_int32_t channel_words = (words_in_event-event_header_words) / channels_in_event;
247+ uint32_t channel_words = (words_in_event-event_header_words) / channels_in_event;
242248 long channel_time = event_time;
243249 long channel_timeMSB = clock_counter<<31 ;
244- u_int16_t baseline_ch = 0 ;
250+ uint16_t baseline_ch = 0 ;
245251 std::map<std::string, int > fmt = fFmt [bid];
246252
247253 // Presence of a channel header indicates non-default firmware (DPP-DAW) so override
@@ -294,27 +300,27 @@ int StraxInserter::ProcessChannel(uint32_t* buff, unsigned words_in_event, int b
294300 }
295301 }
296302
297- u_int16_t *payload = reinterpret_cast <u_int16_t *>(buff+fmt[" channel_header_words" ]);
298- u_int32_t samples_in_pulse = (channel_words-fmt[" channel_header_words" ])<<1 ;
299- u_int16_t sw = fmt[" ns_per_sample" ];
303+ uint16_t *payload = reinterpret_cast <uint16_t *>(buff+fmt[" channel_header_words" ]);
304+ uint32_t samples_in_pulse = (channel_words-fmt[" channel_header_words" ])<<1 ;
305+ uint16_t sw = fmt[" ns_per_sample" ];
300306 int samples_per_fragment = fFragmentBytes >>1 ;
301307 int16_t cl = fOptions ->GetChannel (bid, channel);
302308 // Failing to discern which channel we're getting data from seems serious enough to throw
303309 if (cl==-1 )
304310 throw std::runtime_error (" Failed to parse channel map. I'm gonna just kms now." );
305311
306312 int num_frags = std::ceil (1 .*samples_in_pulse/samples_per_fragment);
313+ frags += num_frags;
307314 for (uint16_t frag_i = 0 ; frag_i < num_frags; frag_i++) {
308315 std::string fragment;
309316 fragment.reserve (fFragmentBytes + fStraxHeaderSize );
310317
311318 // How long is this fragment?
312- u_int32_t samples_this_fragment = samples_per_fragment;
319+ uint32_t samples_this_fragment = samples_per_fragment;
313320 if (frag_i == num_frags-1 )
314321 samples_this_fragment = samples_in_pulse - frag_i*samples_per_fragment;
315- fFragmentsProcessed ++;
316322
317- u_int64_t time_this_fragment = Time64 + samples_per_fragment*sw*frag_i;
323+ int64_t time_this_fragment = Time64 + samples_per_fragment*sw*frag_i;
318324 fragment.append ((char *)&time_this_fragment, sizeof (time_this_fragment));
319325 fragment.append ((char *)&samples_this_fragment, sizeof (samples_this_fragment));
320326 fragment.append ((char *)&sw, sizeof (sw));
@@ -327,7 +333,7 @@ int StraxInserter::ProcessChannel(uint32_t* buff, unsigned words_in_event, int b
327333 fragment.append ((char *)(payload + frag_i*samples_per_fragment), samples_this_fragment*2 );
328334 uint16_t zero_filler = 0 ;
329335 while ((int )fragment.size ()<fFragmentBytes +fStraxHeaderSize )
330- fragment.append ((char *)&zero_filler, 2 );
336+ fragment.append ((char *)&zero_filler, sizeof (zero_filler) );
331337
332338 AddFragmentToBuffer (fragment, time_this_fragment, event_time, clock_counter);
333339 } // loop over frag_i
0 commit comments