@@ -36,34 +36,36 @@ public ODSBaseWriter(MetricsCollector metricsCollector, InfluxCache influxCache)
3636 @ BeforeWrite
3737 public void beforeWrite () {
3838 LocalDateTime startWriteTime = LocalDateTime .now ();
39- this .writeStartTimes .put (Thread .currentThread ().getId (), startWriteTime );
39+ this .writeStartTimes .put (Thread .currentThread ().threadId (), startWriteTime );
4040 }
4141
4242 @ AfterWrite
4343 public void afterWrite (Chunk <? extends DataChunk > chunk ) {
4444 List <? extends DataChunk > items = chunk .getItems ();
4545 LocalDateTime writeEndTime = LocalDateTime .now ();
4646 long totalBytes = items .stream ().mapToLong (DataChunk ::getSize ).sum ();
47- LocalDateTime writeStartTime = this .writeStartTimes .get (Thread .currentThread ().getId ());
47+ long threadId = Thread .currentThread ().threadId ();
48+ LocalDateTime writeStartTime = this .writeStartTimes .remove (threadId );
4849 //this is a cache for the optimizer directly in. This i actually think should be deleted and all data querying maybe ideally is done through the monitoring interface
49- influxCache .addMetric (Thread . currentThread (). getId () , stepExecution , totalBytes , writeStartTime , writeEndTime , InfluxCache .ThroughputType .WRITER , items .get (0 ).getSize ());
50+ influxCache .addMetric (threadId , stepExecution , totalBytes , writeStartTime , writeEndTime , InfluxCache .ThroughputType .WRITER , items .get (0 ).getSize ());
5051
5152 }
5253
5354 @ BeforeRead
5455 public void beforeRead () {
5556 LocalDateTime startReadTime = LocalDateTime .now ();
56- this .readStartTimes .put (Thread .currentThread ().getId (), startReadTime );
57+ this .readStartTimes .put (Thread .currentThread ().threadId (), startReadTime );
5758 }
5859
5960 @ AfterRead
6061 public void afterRead (DataChunk item ) {
6162 LocalDateTime endTime = LocalDateTime .now ();
63+ long threadId = Thread .currentThread ().threadId ();
6264 if (item == null ) {
6365 return ;
6466 }
65- LocalDateTime readStartTime = this .readStartTimes .get ( Thread . currentThread (). getId () );
67+ LocalDateTime readStartTime = this .readStartTimes .remove ( threadId );
6668 if (readStartTime == null ) return ;
67- influxCache .addMetric (Thread . currentThread (). getId () , stepExecution , item .getSize (), readStartTime , endTime , InfluxCache .ThroughputType .READER , item .getSize ());
69+ influxCache .addMetric (threadId , stepExecution , item .getSize (), readStartTime , endTime , InfluxCache .ThroughputType .READER , item .getSize ());
6870 }
6971}
0 commit comments