|
| 1 | +"""Interceptor for collecting Cloud Spanner metrics.""" |
| 2 | + |
| 3 | +from grpc_interceptor import ClientInterceptor |
| 4 | +from .constants import ( |
| 5 | + GOOGLE_CLOUD_RESOURCE_KEY, |
| 6 | + SPANNER_METHOD_PREFIX, |
| 7 | +) |
| 8 | +from .metrics_tracer import MetricsTracer |
| 9 | +from typing import Dict |
| 10 | +from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory |
| 11 | +import re |
| 12 | + |
| 13 | + |
| 14 | +class MetricsInterceptor(ClientInterceptor): |
| 15 | + """Interceptor that collects metrics for Cloud Spanner operations.""" |
| 16 | + |
| 17 | + @staticmethod |
| 18 | + def _parse_resource_path(path: str) -> dict: |
| 19 | + """Parse the resource path to extract project, instance and database. |
| 20 | +
|
| 21 | + Args: |
| 22 | + path (str): The resource path from the request |
| 23 | +
|
| 24 | + Returns: |
| 25 | + dict: Extracted resource components |
| 26 | + """ |
| 27 | + # Match paths like: |
| 28 | + # projects/{project}/instances/{instance}/databases/{database}/sessions/{session} |
| 29 | + # projects/{project}/instances/{instance}/databases/{database} |
| 30 | + # projects/{project}/instances/{instance} |
| 31 | + pattern = r"^projects/(?P<project>[^/]+)(/instances/(?P<instance>[^/]+))?(/databases/(?P<database>[^/]+))?(/sessions/(?P<session>[^/]+))?.*$" |
| 32 | + match = re.match(pattern, path) |
| 33 | + if match: |
| 34 | + return {k: v for k, v in match.groupdict().items() if v is not None} |
| 35 | + return {} |
| 36 | + |
| 37 | + @staticmethod |
| 38 | + def _extract_resource_from_path(metadata: Dict[str, str]) -> Dict[str, str]: |
| 39 | + """ |
| 40 | + Extracts resource information from the metadata based on the path. |
| 41 | +
|
| 42 | + This method iterates through the metadata dictionary to find the first tuple containing the key 'google-cloud-resource-prefix'. It then extracts the path from this tuple and parses it to extract project, instance, and database information using the _parse_resource_path method. |
| 43 | +
|
| 44 | + Args: |
| 45 | + metadata (Dict[str, str]): A dictionary containing metadata information. |
| 46 | +
|
| 47 | + Returns: |
| 48 | + Dict[str, str]: A dictionary containing extracted project, instance, and database information. |
| 49 | + """ |
| 50 | + # Extract resource info from the first metadata tuple containing :path |
| 51 | + path = next( |
| 52 | + (value for key, value in metadata if key == GOOGLE_CLOUD_RESOURCE_KEY), "" |
| 53 | + ) |
| 54 | + |
| 55 | + resources = MetricsInterceptor._parse_resource_path(path) |
| 56 | + return resources |
| 57 | + |
| 58 | + @staticmethod |
| 59 | + def _remove_prefix(s: str, prefix: str) -> str: |
| 60 | + """ |
| 61 | + This function removes the prefix from the given string. |
| 62 | +
|
| 63 | + Args: |
| 64 | + s (str): The string from which the prefix is to be removed. |
| 65 | + prefix (str): The prefix to be removed from the string. |
| 66 | +
|
| 67 | + Returns: |
| 68 | + str: The string with the prefix removed. |
| 69 | +
|
| 70 | + Note: |
| 71 | + This function is used because the `removeprefix` method does not exist in Python 3.8. |
| 72 | + """ |
| 73 | + if s.startswith(prefix): |
| 74 | + return s[len(prefix) :] |
| 75 | + return s |
| 76 | + |
| 77 | + def _set_metrics_tracer_attributes(self, resources: Dict[str, str]) -> None: |
| 78 | + """ |
| 79 | + Sets the metric tracer attributes based on the provided resources. |
| 80 | +
|
| 81 | + This method updates the current metric tracer's attributes with the project, instance, and database information extracted from the resources dictionary. If the current metric tracer is not set, the method does nothing. |
| 82 | +
|
| 83 | + Args: |
| 84 | + resources (Dict[str, str]): A dictionary containing project, instance, and database information. |
| 85 | + """ |
| 86 | + if SpannerMetricsTracerFactory.current_metrics_tracer is None: |
| 87 | + return |
| 88 | + |
| 89 | + if resources: |
| 90 | + if "project" in resources: |
| 91 | + SpannerMetricsTracerFactory.current_metrics_tracer.set_project(resources["project"]) |
| 92 | + if "instance" in resources: |
| 93 | + SpannerMetricsTracerFactory.current_metrics_tracer.set_instance(resources["instance"]) |
| 94 | + if "database" in resources: |
| 95 | + SpannerMetricsTracerFactory.current_metrics_tracer.set_database(resources["database"]) |
| 96 | + |
| 97 | + def intercept(self, invoked_method, request_or_iterator, call_details): |
| 98 | + """Intercept gRPC calls to collect metrics. |
| 99 | +
|
| 100 | + Args: |
| 101 | + invoked_method: The RPC method |
| 102 | + request_or_iterator: The RPC request |
| 103 | + call_details: Details about the RPC call |
| 104 | +
|
| 105 | + Returns: |
| 106 | + The RPC response |
| 107 | + """ |
| 108 | + if SpannerMetricsTracerFactory.current_metrics_tracer is None: |
| 109 | + return invoked_method(request_or_iterator, call_details) |
| 110 | + |
| 111 | + # Setup Metric Tracer attributes from call details |
| 112 | + ## Extract Project / Instance / Databse from header information |
| 113 | + resources = self._extract_resource_from_path(call_details.metadata) |
| 114 | + self._set_metrics_tracer_attributes(resources) |
| 115 | + ## Format method to be be spanner.<method name> |
| 116 | + method_name = self._remove_prefix( |
| 117 | + call_details.method, SPANNER_METHOD_PREFIX |
| 118 | + ).replace("/", ".") |
| 119 | + SpannerMetricsTracerFactory.current_metrics_tracer.set_method(method_name) |
| 120 | + |
| 121 | + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start() |
| 122 | + response = invoked_method(request_or_iterator, call_details) |
| 123 | + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion() |
| 124 | + |
| 125 | + return response |
0 commit comments