4747import java .util .ArrayList ;
4848import java .util .Collection ;
4949import java .util .List ;
50- import java .util .concurrent .atomic .AtomicBoolean ;
50+ import java .util .Map ;
51+ import java .util .Set ;
52+ import java .util .concurrent .ConcurrentHashMap ;
5153import java .util .function .Supplier ;
5254import java .util .logging .Level ;
5355import java .util .logging .Logger ;
56+ import java .util .stream .Collectors ;
5457import javax .annotation .Nonnull ;
5558import javax .annotation .Nullable ;
5659
@@ -68,21 +71,12 @@ class SpannerCloudMonitoringExporter implements MetricExporter {
6871 // This the quota limit from Cloud Monitoring. More details in
6972 // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
7073 private static final int EXPORT_BATCH_SIZE_LIMIT = 200 ;
71- private final AtomicBoolean spannerExportFailureLogged = new AtomicBoolean ( false );
74+ private final Set < String > spannerExportFailureLoggedProjects = ConcurrentHashMap . newKeySet ( );
7275 private final MetricServiceClient client ;
73- private final Supplier <String > spannerProjectIdSupplier ;
76+ private final Supplier <String > fallbackProjectIdSupplier ;
7477
7578 static SpannerCloudMonitoringExporter create (
76- String projectId ,
77- @ Nullable Credentials credentials ,
78- @ Nullable String monitoringHost ,
79- String universeDomain )
80- throws IOException {
81- return create (() -> projectId , credentials , monitoringHost , universeDomain );
82- }
83-
84- static SpannerCloudMonitoringExporter create (
85- Supplier <String > projectIdSupplier ,
79+ Supplier <String > fallbackProjectIdSupplier ,
8680 @ Nullable Credentials credentials ,
8781 @ Nullable String monitoringHost ,
8882 String universeDomain )
@@ -121,18 +115,19 @@ static SpannerCloudMonitoringExporter create(
121115 settingsBuilder .createServiceTimeSeriesSettings ().setSimpleTimeoutNoRetriesDuration (timeout );
122116
123117 return new SpannerCloudMonitoringExporter (
124- projectIdSupplier , MetricServiceClient .create (settingsBuilder .build ()));
118+ fallbackProjectIdSupplier , MetricServiceClient .create (settingsBuilder .build ()));
125119 }
126120
127121 @ VisibleForTesting
128- SpannerCloudMonitoringExporter (String projectId , MetricServiceClient client ) {
129- this (() -> projectId , client );
122+ SpannerCloudMonitoringExporter (MetricServiceClient client ) {
123+ this (() -> null , client );
130124 }
131125
132126 @ VisibleForTesting
133- SpannerCloudMonitoringExporter (Supplier <String > projectIdSupplier , MetricServiceClient client ) {
127+ SpannerCloudMonitoringExporter (
128+ Supplier <String > fallbackProjectIdSupplier , MetricServiceClient client ) {
134129 this .client = client ;
135- this .spannerProjectIdSupplier = projectIdSupplier ;
130+ this .fallbackProjectIdSupplier = fallbackProjectIdSupplier ;
136131 }
137132
138133 @ Override
@@ -152,10 +147,6 @@ MetricServiceClient getMetricServiceClient() {
152147
153148 /** Export client built in metrics */
154149 private CompletableResultCode exportSpannerClientMetrics (Collection <MetricData > collection ) {
155- String spannerProjectId = spannerProjectIdSupplier .get ();
156- if (Strings .isNullOrEmpty (spannerProjectId )) {
157- return CompletableResultCode .ofSuccess ();
158- }
159150 // Skips exporting if there's none
160151 if (collection .isEmpty ()) {
161152 return CompletableResultCode .ofSuccess ();
@@ -165,7 +156,7 @@ private CompletableResultCode exportSpannerClientMetrics(Collection<MetricData>
165156 try {
166157 spannerTimeSeries =
167158 SpannerCloudMonitoringExporterUtils .convertToSpannerTimeSeries (
168- collection , spannerProjectId );
159+ collection , fallbackProjectIdSupplier . get () );
169160 } catch (Throwable e ) {
170161 logger .log (
171162 Level .WARNING ,
@@ -174,37 +165,60 @@ private CompletableResultCode exportSpannerClientMetrics(Collection<MetricData>
174165 return CompletableResultCode .ofFailure ();
175166 }
176167
177- ProjectName projectName = ProjectName .of (spannerProjectId );
168+ if (spannerTimeSeries .isEmpty ()) {
169+ return CompletableResultCode .ofSuccess ();
170+ }
178171
179- ApiFuture <List <Empty >> futureList = exportTimeSeriesInBatch (projectName , spannerTimeSeries );
172+ Map <String , List <TimeSeries >> timeSeriesByProject =
173+ spannerTimeSeries .stream ()
174+ .collect (
175+ Collectors .groupingBy (
176+ timeSeries ->
177+ timeSeries
178+ .getResource ()
179+ .getLabelsMap ()
180+ .get (BuiltInMetricsConstant .PROJECT_ID_KEY .getKey ())));
181+
182+ List <ApiFuture <List <Empty >>> futures = new ArrayList <>();
183+ for (Map .Entry <String , List <TimeSeries >> entry : timeSeriesByProject .entrySet ()) {
184+ ProjectName projectName = ProjectName .of (entry .getKey ());
185+ ApiFuture <List <Empty >> future = exportTimeSeriesInBatch (projectName , entry .getValue ());
186+ ApiFutures .addCallback (
187+ future ,
188+ new ApiFutureCallback <List <Empty >>() {
189+ @ Override
190+ public void onFailure (Throwable throwable ) {
191+ logExportFailure (throwable , projectName );
192+ }
193+
194+ @ Override
195+ public void onSuccess (List <Empty > ignored ) {
196+ spannerExportFailureLoggedProjects .remove (projectName .getProject ());
197+ }
198+ },
199+ MoreExecutors .directExecutor ());
200+ futures .add (future );
201+ }
202+
203+ ApiFuture <List <List <Empty >>> groupedFuture = ApiFutures .allAsList (futures );
204+ ApiFuture <List <Empty >> futureList =
205+ ApiFutures .transform (
206+ groupedFuture ,
207+ groupedResults ->
208+ groupedResults .stream ().flatMap (List ::stream ).collect (Collectors .toList ()),
209+ MoreExecutors .directExecutor ());
180210
181211 CompletableResultCode spannerExportCode = new CompletableResultCode ();
182212 ApiFutures .addCallback (
183213 futureList ,
184214 new ApiFutureCallback <List <Empty >>() {
185215 @ Override
186216 public void onFailure (Throwable throwable ) {
187- if (spannerExportFailureLogged .compareAndSet (false , true )) {
188- String msg = "createServiceTimeSeries request failed for spanner metrics." ;
189- if (throwable instanceof PermissionDeniedException ) {
190- // TODO: Add the link of public documentation when available in the log message.
191- msg +=
192- String .format (
193- " Need monitoring metric writer permission on project=%s. Follow"
194- + " https://cloud.google.com/spanner/docs/view-manage-client-side-metrics#access-client-side-metrics"
195- + " to set up permissions" ,
196- projectName .getProject ());
197- }
198- logger .log (Level .WARNING , msg , throwable );
199- }
200217 spannerExportCode .fail ();
201218 }
202219
203220 @ Override
204221 public void onSuccess (List <Empty > empty ) {
205- // When an export succeeded reset the export failure flag to false so if there's a
206- // transient failure it'll be logged.
207- spannerExportFailureLogged .set (false );
208222 spannerExportCode .succeed ();
209223 }
210224 },
@@ -213,6 +227,25 @@ public void onSuccess(List<Empty> empty) {
213227 return spannerExportCode ;
214228 }
215229
230+ private void logExportFailure (Throwable throwable , ProjectName projectName ) {
231+ if (spannerExportFailureLoggedProjects .add (projectName .getProject ())) {
232+ String msg = "createServiceTimeSeries request failed for spanner metrics." ;
233+ if (throwable instanceof PermissionDeniedException ) {
234+ // TODO: Add the link of public documentation when available in the log message.
235+ msg +=
236+ String .format (
237+ " Need monitoring metric writer permission on project=%s. Follow"
238+ + " https://cloud.google.com/spanner/docs/view-manage-client-side-metrics"
239+ + "#access-client-side-metrics"
240+ + " to set up permissions" ,
241+ projectName .getProject ());
242+ } else {
243+ msg += String .format (" project=%s." , projectName .getProject ());
244+ }
245+ logger .log (Level .WARNING , msg , throwable );
246+ }
247+ }
248+
216249 private ApiFuture <List <Empty >> exportTimeSeriesInBatch (
217250 ProjectName projectName , List <TimeSeries > timeSeries ) {
218251 List <ApiFuture <Empty >> batchResults = new ArrayList <>();
0 commit comments