@@ -229,13 +229,18 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo
229229// happened to *the last stream in the request*. Ex: if three streams are part of the PushRequest
230230// and all three failed, the returned error only describes what happened to the last processed stream.
231231func (i * instance ) Push (ctx context.Context , req * logproto.PushRequest ) error {
232+ i .metrics .pushInflight .Inc ()
233+ defer i .metrics .pushInflight .Dec ()
234+
232235 record := recordPool .GetRecord ()
233236 record .UserID = i .instanceID
234237 defer recordPool .PutRecord (record )
235238 rateLimitWholeStream := i .limiter .limits .ShardStreams (i .instanceID ).Enabled
236239
237240 var appendErr error
241+ var chunkLockTotal time.Duration
238242 for _ , reqStream := range req .Streams {
243+ lockStart := time .Now ()
239244
240245 s , _ , err := i .streams .LoadOrStoreNew (reqStream .Labels ,
241246 func () (* stream , error ) {
@@ -259,9 +264,12 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
259264
260265 _ , appendErr = s .Push (ctx , reqStream .Entries , record , 0 , false , rateLimitWholeStream , i .customStreamsTracker , req .Format )
261266 s .chunkMtx .Unlock ()
267+ chunkLockTotal += time .Since (lockStart )
262268 }
269+ i .metrics .pushChunkMemoryDuration .Observe (chunkLockTotal .Seconds ())
263270
264271 if ! record .IsEmpty () {
272+ walStart := time .Now ()
265273 if err := i .wal .Log (record ); err != nil {
266274 if e , ok := err .(* os.PathError ); ok && e .Err == syscall .ENOSPC {
267275 i .metrics .walDiskFullFailures .Inc ()
@@ -272,9 +280,15 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
272280 )
273281 })
274282 } else {
283+ i .metrics .pushWALDuration .Observe (time .Since (walStart ).Seconds ())
275284 return err
276285 }
277286 }
287+ i .metrics .pushWALDuration .Observe (time .Since (walStart ).Seconds ())
288+ }
289+
290+ if ctx .Err () != nil {
291+ i .metrics .pushContextAlreadyDone .Inc ()
278292 }
279293
280294 return appendErr
0 commit comments