@@ -36,6 +36,18 @@ import (
3636
3737 mcv1alpha3 "github.com/opea-project/GenAIInfra/microservices-connector/api/v1alpha3"
3838 flag "github.com/spf13/pflag"
39+
40+ // Prometheus and opentelemetry imports
41+ "github.com/prometheus/client_golang/prometheus/promhttp"
42+
43+ "go.opentelemetry.io/otel"
44+ "go.opentelemetry.io/otel/exporters/prometheus"
45+ api "go.opentelemetry.io/otel/metric"
46+ sdkmetric "go.opentelemetry.io/otel/sdk/metric"
47+
48+ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
49+
50+ "go.opentelemetry.io/otel/metric"
3951)
4052
4153const (
6072 TLSHandshakeTimeout : time .Minute ,
6173 ExpectContinueTimeout : 30 * time .Second ,
6274 }
63- callClient = & http.Client {
64- Transport : transport ,
75+ callClient = http.Client {
76+ Transport : otelhttp . NewTransport ( transport ) ,
6577 Timeout : 30 * time .Second ,
6678 }
6779)
@@ -80,6 +92,69 @@ type ReadCloser struct {
8092 * bytes.Reader
8193}
8294
95+ var (
96+ firstTokenLatencyMeasure metric.Float64Histogram
97+ nextTokenLatencyMeasure metric.Float64Histogram
98+ allTokenLatencyMeasure metric.Float64Histogram
99+ pipelineLatencyMeasure metric.Float64Histogram
100+ )
101+
102+ func init () {
103+
104+ // The exporter embeds a default OpenTelemetry Reader and
105+ // implements prometheus.Collector, allowing it to be used as
106+ // both a Reader and Collector.
107+ exporter , err := prometheus .New ()
108+ if err != nil {
109+ log .Error (err , "metrics: cannot init prometheus collector" )
110+ }
111+ provider := sdkmetric .NewMeterProvider (sdkmetric .WithReader (exporter ))
112+ otel .SetMeterProvider (provider )
113+
114+ // ppalucki: Own metrics defintion bellow
115+ const meterName = "entrag-telemetry"
116+ meter := provider .Meter (meterName )
117+
118+ firstTokenLatencyMeasure , err = meter .Float64Histogram (
119+ "llm.first.token.latency" ,
120+ metric .WithUnit ("ms" ),
121+ metric .WithDescription ("Measures the duration of first token generation." ),
122+ api .WithExplicitBucketBoundaries (1 , 64 , 128 , 256 , 512 , 1024 , 2048 , 4096 , 8192 , 16364 ),
123+ )
124+ if err != nil {
125+ log .Error (err , "metrics: cannot register first token histogram measure" )
126+ }
127+ nextTokenLatencyMeasure , err = meter .Float64Histogram (
128+ "llm.next.token.latency" ,
129+ metric .WithUnit ("ms" ),
130+ metric .WithDescription ("Measures the duration of generating all but first tokens." ),
131+ api .WithExplicitBucketBoundaries (1 , 64 , 128 , 256 , 512 , 1024 , 2048 , 4096 , 8192 , 16364 ),
132+ )
133+ if err != nil {
134+ log .Error (err , "metrics: cannot register next token histogram measure" )
135+ }
136+
137+ allTokenLatencyMeasure , err = meter .Float64Histogram (
138+ "llm.all.token.latency" ,
139+ metric .WithUnit ("ms" ),
140+ metric .WithDescription ("Measures the duration to generate response with all tokens." ),
141+ api .WithExplicitBucketBoundaries (1 , 64 , 128 , 256 , 512 , 1024 , 2048 , 4096 , 8192 , 16364 ),
142+ )
143+ if err != nil {
144+ log .Error (err , "metrics: cannot register all token histogram measure" )
145+ }
146+
147+ pipelineLatencyMeasure , err = meter .Float64Histogram (
148+ "llm.pipeline.latency" ,
149+ metric .WithUnit ("ms" ),
150+ metric .WithDescription ("Measures the duration to going through pipeline steps until first token is being generated (including read data time from client)." ),
151+ api .WithExplicitBucketBoundaries (1 , 64 , 128 , 256 , 512 , 1024 , 2048 , 4096 , 8192 , 16364 ),
152+ )
153+ if err != nil {
154+ log .Error (err , "metrics: cannot register pipeline histogram measure" )
155+ }
156+ }
157+
83158func (ReadCloser ) Close () error {
84159 // Typically, you would release resources here, but for bytes.Reader, there's nothing to do.
85160 return nil
@@ -536,6 +611,7 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) {
536611 go func () {
537612 defer close (done )
538613
614+ allTokensStartTime := time .Now ()
539615 inputBytes , err := io .ReadAll (req .Body )
540616 if err != nil {
541617 log .Error (err , "failed to read request body" )
@@ -544,6 +620,9 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) {
544620 }
545621
546622 responseBody , statusCode , err := routeStep (defaultNodeName , * mcGraph , inputBytes , inputBytes , req .Header )
623+
624+ pipelineLatencyMeasure .Record (ctx , float64 (time .Since (allTokensStartTime ))/ float64 (time .Millisecond ))
625+
547626 if err != nil {
548627 log .Error (err , "failed to process request" )
549628 w .Header ().Set ("Content-Type" , "application/json" )
@@ -561,9 +640,22 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) {
561640 }()
562641
563642 w .Header ().Set ("Content-Type" , "application/json" )
643+ firstTokenCollected := false
564644 buffer := make ([]byte , BufferSize )
565645 for {
646+
647+ // measure time of reading another portion of response
648+ tokenStartTime := time .Now ()
566649 n , err := responseBody .Read (buffer )
650+ elapsedTimeMilisecond := float64 (time .Since (tokenStartTime )) / float64 (time .Millisecond )
651+
652+ if ! firstTokenCollected {
653+ firstTokenCollected = true
654+ firstTokenLatencyMeasure .Record (ctx , elapsedTimeMilisecond )
655+ } else {
656+ nextTokenLatencyMeasure .Record (ctx , elapsedTimeMilisecond )
657+ }
658+
567659 if err != nil && err != io .EOF {
568660 log .Error (err , "failed to read from response body" )
569661 http .Error (w , "failed to read from response body" , http .StatusInternalServerError )
@@ -586,6 +678,10 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) {
586678 return
587679 }
588680 }
681+
682+ allTokensElapsedTimeMilisecond := float64 (time .Since (allTokensStartTime )) / float64 (time .Millisecond )
683+ allTokenLatencyMeasure .Record (ctx , allTokensElapsedTimeMilisecond )
684+
589685 }()
590686
591687 select {
@@ -729,8 +825,23 @@ func handleMultipartError(writer *multipart.Writer, err error) {
729825
730826func initializeRoutes () * http.ServeMux {
731827 mux := http .NewServeMux ()
732- mux .HandleFunc ("/" , mcGraphHandler )
733- mux .HandleFunc ("/dataprep" , mcDataHandler )
828+
829+ // Wrap connector handlers with otelhttp wrappers
830+ // "http.server.request.size" - Int64Counter - "Measures the size of HTTP request messages" (Incoming request bytes total)
831+ // "http.server.response.size" - Int64Counter - "Measures the size of HTTP response messages" (Incoming response bytes total)
832+ // "http.server.duration" - Float64histogram "Measures the duration of inbound HTTP requests." (Incoming end to end duration, milliseconds)
833+ handleFunc := func (pattern string , handlerFunc func (http.ResponseWriter , * http.Request ), operation string ) {
834+ handler := otelhttp .NewHandler (otelhttp .WithRouteTag (pattern , http .HandlerFunc (handlerFunc )), operation )
835+ mux .Handle (pattern , handler )
836+ }
837+
838+ handleFunc ("/" , mcGraphHandler , "mcGraphHandler" )
839+ handleFunc ("/dataprep" , mcDataHandler , "mcDataHandler" )
840+
841+ promHandler := promhttp .Handler ()
842+ handleFunc ("/metrics" , promHandler .ServeHTTP , "metrics" )
843+ log .Info ("Metrics exposed on /metrics." )
844+
734845 return mux
735846}
736847
0 commit comments