1919import com .google .api .core .ApiFuture ;
2020import com .google .api .core .ApiFutureCallback ;
2121import com .google .api .core .ApiFutures ;
22- import com .google .api .gax .core .CredentialsProvider ;
2322import com .google .api .gax .core .FixedCredentialsProvider ;
24- import com .google .api .gax .core .NoCredentialsProvider ;
2523import com .google .api .gax .grpc .InstantiatingGrpcChannelProvider ;
2624import com .google .api .gax .rpc .PermissionDeniedException ;
2725import com .google .auth .Credentials ;
28- import com .google .cloud .NoCredentials ;
2926import com .google .cloud .monitoring .v3 .MetricServiceClient ;
3027import com .google .cloud .monitoring .v3 .MetricServiceSettings ;
3128import com .google .common .annotations .VisibleForTesting ;
4542import java .util .ArrayList ;
4643import java .util .Collection ;
4744import java .util .List ;
45+ import java .util .Map ;
46+ import java .util .concurrent .ConcurrentHashMap ;
4847import java .util .concurrent .atomic .AtomicBoolean ;
48+ import java .util .concurrent .atomic .AtomicInteger ;
4949import java .util .logging .Level ;
5050import java .util .logging .Logger ;
5151import javax .annotation .Nonnull ;
@@ -71,7 +71,29 @@ class DatastoreCloudMonitoringExporter implements MetricExporter {
7171 private static final Logger logger =
7272 Logger .getLogger (DatastoreCloudMonitoringExporter .class .getName ());
7373
74+ /**
75+ * Wrapper class to hold a {@link MetricServiceClient} and its reference count. This is used to
76+ * share the client across multiple exporter instances.
77+ */
78+ static class CachedMetricsClient {
79+ final MetricServiceClient client ;
80+ final AtomicInteger refCount = new AtomicInteger (0 );
81+
82+ CachedMetricsClient (MetricServiceClient client ) {
83+ this .client = client ;
84+ }
85+ }
86+
87+ /**
88+ * Shared cache for {@link MetricServiceClient} instances, keyed by "projectId:databaseId". This
89+ * prevents creating a new gRPC client for every exporter instance, reducing resource usage.
90+ * Reference counting is used to safely shut down the client when no longer needed.
91+ */
92+ static final ConcurrentHashMap <String , CachedMetricsClient > METRICS_CLIENT_CACHE =
93+ new ConcurrentHashMap <>();
94+
7495 private final MetricServiceClient client ;
96+ private final Map <String , String > clientAttributes ;
7597
7698 // This is the quota limit from Cloud Monitoring. More details in
7799 // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
@@ -84,7 +106,8 @@ class DatastoreCloudMonitoringExporter implements MetricExporter {
84106 // Flag to prevent log spam of any export failures
85107 private final AtomicBoolean datastoreExportFailureLogged = new AtomicBoolean (false );
86108
87- private final String datastoreProjectId ;
109+ private final String projectId ;
110+ private final String databaseId ;
88111
89112 /**
90113 * Creates a new instance of the exporter.
@@ -102,10 +125,47 @@ class DatastoreCloudMonitoringExporter implements MetricExporter {
102125 * @param projectId the GCP project ID where metrics will be exported.
103126 * @param credentials the credentials used to authenticate with Cloud Monitoring.
104127 * @return a new {@link DatastoreCloudMonitoringExporter} instance.
105- * @throws IOException if the {@link MetricServiceClient} fails to initialize.
106128 */
129+ @ Nullable
107130 static DatastoreCloudMonitoringExporter create (
108- String projectId , @ Nullable Credentials credentials ) throws IOException {
131+ String projectId ,
132+ String databaseId ,
133+ Credentials credentials ,
134+ Map <String , String > clientAttributes ) {
135+ String key = projectId + ":" + databaseId ;
136+
137+ // Use compute to acquire or create the client atomically with reference counting.
138+ // If creation fails, we log the error and return null so it's not added to the map.
139+ CachedMetricsClient cachedMetricsClient =
140+ METRICS_CLIENT_CACHE .compute (
141+ key ,
142+ (k , v ) -> {
143+ if (v == null ) {
144+ try {
145+ v = new CachedMetricsClient (createMetricServiceClient (credentials ));
146+ } catch (IOException e ) {
147+ logger .log (
148+ Level .WARNING ,
149+ "Failed to create MetricServiceClient for metrics export. Monitoring will be disabled." ,
150+ e );
151+ return null ; // Do not add to map
152+ }
153+ }
154+ v .refCount .incrementAndGet ();
155+ return v ;
156+ });
157+
158+ // If there is no client in the cache (creation failed), return null.
159+ if (cachedMetricsClient == null ) {
160+ return null ;
161+ }
162+
163+ return new DatastoreCloudMonitoringExporter (
164+ projectId , databaseId , cachedMetricsClient .client , clientAttributes );
165+ }
166+
167+ private static MetricServiceClient createMetricServiceClient (Credentials credentials )
168+ throws IOException {
109169 MetricServiceSettings .Builder settingsBuilder = MetricServiceSettings .newBuilder ();
110170
111171 InstantiatingGrpcChannelProvider transportChannelProvider =
@@ -114,26 +174,25 @@ static DatastoreCloudMonitoringExporter create(
114174 .build ();
115175 settingsBuilder .setTransportChannelProvider (transportChannelProvider );
116176
117- CredentialsProvider credentialsProvider ;
118- if (credentials == null || credentials instanceof NoCredentials ) {
119- credentialsProvider = NoCredentialsProvider .create ();
120- } else {
121- credentialsProvider = FixedCredentialsProvider .create (credentials );
122- }
123- settingsBuilder .setCredentialsProvider (credentialsProvider );
177+ settingsBuilder .setCredentialsProvider (FixedCredentialsProvider .create (credentials ));
124178
125179 settingsBuilder
126180 .createTimeSeriesSettings ()
127181 .setSimpleTimeoutNoRetriesDuration (Duration .ofMinutes (1 ));
128182
129- return new DatastoreCloudMonitoringExporter (
130- projectId , MetricServiceClient .create (settingsBuilder .build ()));
183+ return MetricServiceClient .create (settingsBuilder .build ());
131184 }
132185
133186 @ VisibleForTesting
134- DatastoreCloudMonitoringExporter (String projectId , MetricServiceClient client ) {
187+ DatastoreCloudMonitoringExporter (
188+ String projectId ,
189+ String databaseId ,
190+ MetricServiceClient client ,
191+ Map <String , String > clientAttributes ) {
135192 this .client = client ;
136- this .datastoreProjectId = projectId ;
193+ this .projectId = projectId ;
194+ this .databaseId = databaseId ;
195+ this .clientAttributes = clientAttributes ;
137196 }
138197
139198 /**
@@ -159,8 +218,7 @@ public CompletableResultCode export(@Nonnull Collection<MetricData> collection)
159218 // Convert OTel MetricData to Cloud Monitoring TimeSeries.
160219 datastoreTimeSeries =
161220 DatastoreCloudMonitoringExporterUtils .convertToDatastoreTimeSeries (
162- new ArrayList <>(collection ),
163- BuiltInDatastoreMetricsProvider .INSTANCE .getClientAttributes ());
221+ new ArrayList <>(collection ), clientAttributes );
164222 } catch (Throwable e ) {
165223 logger .log (
166224 Level .WARNING ,
@@ -169,7 +227,7 @@ public CompletableResultCode export(@Nonnull Collection<MetricData> collection)
169227 return CompletableResultCode .ofFailure ();
170228 }
171229
172- ProjectName projectName = ProjectName .of (datastoreProjectId );
230+ ProjectName projectName = ProjectName .of (projectId );
173231
174232 // Perform the actual network call to Cloud Monitoring.
175233 ApiFuture <List <Empty >> futureList = exportTimeSeriesInBatch (projectName , datastoreTimeSeries );
@@ -249,7 +307,18 @@ public CompletableResultCode shutdown() {
249307 }
250308 CompletableResultCode shutdownResult = new CompletableResultCode ();
251309 try {
252- client .shutdown ();
310+ String key = projectId + ":" + databaseId ;
311+ // Atomically decrement reference count and cleanup if zero.
312+ METRICS_CLIENT_CACHE .compute (
313+ key ,
314+ (k , v ) -> {
315+ if (v != null && v .refCount .decrementAndGet () == 0 ) {
316+ v .client .shutdown ();
317+ return null ; // Remove from map to prevent leaks
318+ }
319+
320+ return v ;
321+ });
253322 shutdownResult .succeed ();
254323 } catch (Throwable e ) {
255324 logger .log (Level .WARNING , "failed to shutdown the monitoring client" , e );
0 commit comments