Skip to content

Commit eaacbe8

Browse files
author
Innocent
committed
feat: metrics implementation
1 parent 0a85180 commit eaacbe8

16 files changed

+446
-82
lines changed

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <algorithm>
2323
#include <iterator>
2424

25+
#include "iceberg/metrics_reporters.h"
2526
#include "iceberg/table.h"
2627
#include "iceberg/table_identifier.h"
2728
#include "iceberg/table_metadata.h"
@@ -351,7 +352,12 @@ InMemoryCatalog::InMemoryCatalog(
351352
properties_(std::move(properties)),
352353
file_io_(std::move(file_io)),
353354
warehouse_location_(std::move(warehouse_location)),
354-
root_namespace_(std::make_unique<InMemoryNamespace>()) {}
355+
root_namespace_(std::make_unique<InMemoryNamespace>()) {
356+
auto reporter_result = MetricsReporters::Load(properties_);
357+
if (reporter_result.has_value()) {
358+
reporter_ = std::move(reporter_result.value());
359+
}
360+
}
355361

356362
InMemoryCatalog::~InMemoryCatalog() = default;
357363

@@ -427,7 +433,8 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
427433
ICEBERG_RETURN_UNEXPECTED(
428434
root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location));
429435
return Table::Make(identifier, std::move(table_metadata),
430-
std::move(metadata_file_location), file_io_, shared_from_this());
436+
std::move(metadata_file_location), file_io_, shared_from_this(),
437+
reporter_);
431438
}
432439

433440
Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
@@ -478,7 +485,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
478485
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated);
479486

480487
return Table::Make(identifier, std::move(updated), std::move(new_metadata_location),
481-
file_io_, shared_from_this());
488+
file_io_, shared_from_this(), reporter_);
482489
}
483490

484491
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
@@ -499,7 +506,7 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
499506
TableMetadata::Make(*schema, *spec, *order, base_location, properties));
500507
ICEBERG_ASSIGN_OR_RAISE(
501508
auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_,
502-
shared_from_this()));
509+
shared_from_this(), reporter_));
503510
return Transaction::Make(std::move(table), Transaction::Kind::kCreate,
504511
/* auto_commit */ false);
505512
}
@@ -537,7 +544,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
537544
ICEBERG_ASSIGN_OR_RAISE(auto metadata,
538545
TableMetadataUtil::Read(*file_io_, metadata_location));
539546
return Table::Make(identifier, std::move(metadata), std::move(metadata_location),
540-
file_io_, shared_from_this());
547+
file_io_, shared_from_this(), reporter_);
541548
}
542549

543550
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
@@ -557,7 +564,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
557564
return UnknownError("The registry failed.");
558565
}
559566
return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_,
560-
shared_from_this());
567+
shared_from_this(), reporter_);
561568
}
562569

563570
} // namespace iceberg

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <shared_mutex>
2323

2424
#include "iceberg/catalog.h"
25+
#include "iceberg/metrics_reporter.h"
2526

2627
namespace iceberg {
2728

@@ -105,6 +106,7 @@ class ICEBERG_EXPORT InMemoryCatalog
105106
std::shared_ptr<FileIO> file_io_;
106107
std::string warehouse_location_;
107108
std::unique_ptr<class InMemoryNamespace> root_namespace_;
109+
std::shared_ptr<MetricsReporter> reporter_;
108110
mutable std::shared_mutex mutex_;
109111
};
110112

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "iceberg/catalog/rest/rest_util.h"
3838
#include "iceberg/catalog/rest/types.h"
3939
#include "iceberg/json_serde_internal.h"
40+
#include "iceberg/metrics_reporters.h"
4041
#include "iceberg/partition_spec.h"
4142
#include "iceberg/result.h"
4243
#include "iceberg/schema.h"
@@ -154,25 +155,35 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
154155
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
155156
auth_manager->CatalogSession(*client, final_config.configs()));
156157

157-
return std::shared_ptr<RestCatalog>(new RestCatalog(
158-
std::move(final_config), std::move(file_io), std::move(client), std::move(paths),
159-
std::move(endpoints), std::move(auth_manager), std::move(catalog_session)));
158+
// Load the metrics reporter from catalog properties. A failed load is ignored here: the error is dropped and `reporter` stays null.
159+
std::shared_ptr<MetricsReporter> reporter;
160+
auto reporter_result = MetricsReporters::Load(final_config.configs());
161+
if (reporter_result.has_value()) {
162+
reporter = std::move(reporter_result.value());
163+
}
164+
165+
return std::shared_ptr<RestCatalog>(
166+
new RestCatalog(std::move(final_config), std::move(file_io), std::move(client),
167+
std::move(paths), std::move(endpoints), std::move(auth_manager),
168+
std::move(catalog_session), std::move(reporter)));
160169
}
161170

162171
RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> file_io,
163172
std::unique_ptr<HttpClient> client,
164173
std::unique_ptr<ResourcePaths> paths,
165174
std::unordered_set<Endpoint> endpoints,
166175
std::unique_ptr<auth::AuthManager> auth_manager,
167-
std::shared_ptr<auth::AuthSession> catalog_session)
176+
std::shared_ptr<auth::AuthSession> catalog_session,
177+
std::shared_ptr<MetricsReporter> reporter)
168178
: config_(std::move(config)),
169179
file_io_(std::move(file_io)),
170180
client_(std::move(client)),
171181
paths_(std::move(paths)),
172182
name_(config_.Get(RestCatalogProperties::kName)),
173183
supported_endpoints_(std::move(endpoints)),
174184
auth_manager_(std::move(auth_manager)),
175-
catalog_session_(std::move(catalog_session)) {
185+
catalog_session_(std::move(catalog_session)),
186+
reporter_(std::move(reporter)) {
176187
ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be null");
177188
}
178189

@@ -338,7 +349,8 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
338349
CreateTableInternal(identifier, schema, spec, order, location,
339350
properties, /*stage_create=*/false));
340351
return Table::Make(identifier, std::move(result.metadata),
341-
std::move(result.metadata_location), file_io_, shared_from_this());
352+
std::move(result.metadata_location), file_io_, shared_from_this(),
353+
reporter_);
342354
}
343355

344356
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -369,7 +381,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
369381

370382
return Table::Make(identifier, std::move(commit_response.metadata),
371383
std::move(commit_response.metadata_location), file_io_,
372-
shared_from_this());
384+
shared_from_this(), reporter_);
373385
}
374386

375387
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
@@ -383,7 +395,7 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
383395
ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
384396
StagedTable::Make(identifier, std::move(result.metadata),
385397
std::move(result.metadata_location), file_io_,
386-
shared_from_this()));
398+
shared_from_this(), reporter_));
387399
return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate,
388400
/*auto_commit=*/false);
389401
}
@@ -446,7 +458,7 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
446458

447459
return Table::Make(identifier, std::move(load_result.metadata),
448460
std::move(load_result.metadata_location), file_io_,
449-
shared_from_this());
461+
shared_from_this(), reporter_);
450462
}
451463

452464
Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
@@ -469,7 +481,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
469481
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
470482
return Table::Make(identifier, std::move(load_result.metadata),
471483
std::move(load_result.metadata_location), file_io_,
472-
shared_from_this());
484+
shared_from_this(), reporter_);
473485
}
474486

475487
} // namespace iceberg::rest

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "iceberg/catalog/rest/endpoint.h"
2929
#include "iceberg/catalog/rest/iceberg_rest_export.h"
3030
#include "iceberg/catalog/rest/type_fwd.h"
31+
#include "iceberg/metrics_reporter.h"
3132
#include "iceberg/result.h"
3233

3334
/// \file iceberg/catalog/rest/rest_catalog.h
@@ -109,7 +110,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
109110
std::unique_ptr<HttpClient> client, std::unique_ptr<ResourcePaths> paths,
110111
std::unordered_set<Endpoint> endpoints,
111112
std::unique_ptr<auth::AuthManager> auth_manager,
112-
std::shared_ptr<auth::AuthSession> catalog_session);
113+
std::shared_ptr<auth::AuthSession> catalog_session,
114+
std::shared_ptr<MetricsReporter> reporter);
113115

114116
Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
115117

@@ -127,6 +129,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
127129
std::unordered_set<Endpoint> supported_endpoints_;
128130
std::unique_ptr<auth::AuthManager> auth_manager_;
129131
std::shared_ptr<auth::AuthSession> catalog_session_;
132+
std::shared_ptr<MetricsReporter> reporter_;
130133
};
131134

132135
} // namespace iceberg::rest

src/iceberg/manifest/manifest_group.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,30 +321,36 @@ ManifestGroup::ReadEntries() {
321321
ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest));
322322
if (!should_match) {
323323
// Skip this manifest because it doesn't match partition filter
324+
scan_counters_.skipped_data_manifests++;
324325
continue;
325326
}
326327

327328
if (ignore_deleted_) {
328329
// only scan manifests that have entries other than deletes
329330
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
331+
scan_counters_.skipped_data_manifests++;
330332
continue;
331333
}
332334
}
333335

334336
if (ignore_existing_) {
335337
// only scan manifests that have entries other than existing
336338
if (!manifest.has_added_files() && !manifest.has_deleted_files()) {
339+
scan_counters_.skipped_data_manifests++;
337340
continue;
338341
}
339342
}
340343

344+
scan_counters_.scanned_data_manifests++;
345+
341346
// Read manifest entries
342347
ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
343348
ICEBERG_ASSIGN_OR_RAISE(auto entries,
344349
ignore_deleted_ ? reader->LiveEntries() : reader->Entries());
345350

346351
for (auto& entry : entries) {
347352
if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
353+
scan_counters_.skipped_data_files++;
348354
continue;
349355
}
350356

@@ -354,6 +360,7 @@ ManifestGroup::ReadEntries() {
354360
}
355361

356362
if (!manifest_entry_predicate_(entry)) {
363+
scan_counters_.skipped_data_files++;
357364
continue;
358365
}
359366

src/iceberg/manifest/manifest_group.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@
3939

4040
namespace iceberg {
4141

42+
/// \brief Counters for tracking scan metrics during manifest processing.
43+
struct ICEBERG_EXPORT ScanMetricsCounters {
44+
int64_t scanned_data_manifests = 0;
45+
int64_t skipped_data_manifests = 0;
46+
int64_t scanned_delete_manifests = 0;
47+
int64_t skipped_delete_manifests = 0;
48+
int64_t skipped_data_files = 0;
49+
int64_t skipped_delete_files = 0;
50+
};
51+
4252
/// \brief Context passed to task creation functions.
4353
struct ICEBERG_EXPORT TaskContext {
4454
public:
@@ -120,6 +130,9 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector {
120130
/// \param column_ids Field IDs of columns whose statistics should be preserved.
121131
ManifestGroup& ColumnsToKeepStats(std::unordered_set<int32_t> column_ids);
122132

133+
/// \brief Returns the scan metrics counters accumulated during plan operations.
134+
const ScanMetricsCounters& scan_counters() const { return scan_counters_; }
135+
123136
/// \brief Plan scan tasks for all matching data files.
124137
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles();
125138

@@ -162,6 +175,7 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector {
162175
bool ignore_deleted_ = false;
163176
bool ignore_existing_ = false;
164177
bool ignore_residuals_ = false;
178+
ScanMetricsCounters scan_counters_;
165179
};
166180

167181
} // namespace iceberg

src/iceberg/metrics_reporter.h

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
#include <memory>
2424
#include <string>
2525
#include <string_view>
26+
#include <unordered_map>
2627
#include <utility>
2728
#include <variant>
29+
#include <vector>
2830

2931
#include "iceberg/iceberg_export.h"
3032

@@ -85,6 +87,25 @@ struct ICEBERG_EXPORT ScanReport {
8587

8688
/// \brief Number of delete manifests that were skipped.
8789
int64_t skipped_delete_manifests = 0;
90+
91+
/// \brief Projected field IDs from the scan schema.
92+
std::vector<int32_t> projected_field_ids;
93+
/// \brief Projected field names from the scan schema.
94+
std::vector<std::string> projected_field_names;
95+
/// \brief Total size in bytes of all result data files.
96+
int64_t total_file_size_in_bytes = 0;
97+
/// \brief Total size in bytes of all result delete files.
98+
int64_t total_delete_file_size_in_bytes = 0;
99+
/// \brief Number of indexed delete files.
100+
int64_t indexed_delete_files = 0;
101+
/// \brief Number of equality delete files in the scan result.
102+
int64_t equality_delete_files = 0;
103+
/// \brief Number of positional delete files in the scan result.
104+
int64_t positional_delete_files = 0;
105+
/// \brief Number of deletion vectors in the scan result.
106+
int64_t dvs = 0;
107+
/// \brief Additional key-value metadata.
108+
std::unordered_map<std::string, std::string> metadata;
88109
};
89110

90111
/// \brief Report generated after a commit operation.
@@ -136,6 +157,47 @@ struct ICEBERG_EXPORT CommitReport {
136157

137158
/// \brief Size in bytes of files removed.
138159
int64_t removed_files_size = 0;
160+
161+
/// \brief Total duration of the commit operation.
162+
DurationMs total_duration{0};
163+
/// \brief Total records after this commit.
164+
int64_t total_records = 0;
165+
/// \brief Total file size in bytes after this commit.
166+
int64_t total_files_size = 0;
167+
/// \brief Equality delete files added.
168+
int64_t added_equality_delete_files = 0;
169+
/// \brief Equality delete files removed.
170+
int64_t removed_equality_delete_files = 0;
171+
/// \brief Positional delete files added.
172+
int64_t added_positional_delete_files = 0;
173+
/// \brief Positional delete files removed.
174+
int64_t removed_positional_delete_files = 0;
175+
/// \brief Position delete records added.
176+
int64_t added_positional_deletes = 0;
177+
/// \brief Position delete records removed.
178+
int64_t removed_positional_deletes = 0;
179+
/// \brief Total position delete records.
180+
int64_t total_positional_deletes = 0;
181+
/// \brief Equality delete records added.
182+
int64_t added_equality_deletes = 0;
183+
/// \brief Equality delete records removed.
184+
int64_t removed_equality_deletes = 0;
185+
/// \brief Total equality delete records.
186+
int64_t total_equality_deletes = 0;
187+
/// \brief Deletion vectors added.
188+
int64_t added_dvs = 0;
189+
/// \brief Deletion vectors removed.
190+
int64_t removed_dvs = 0;
191+
/// \brief Manifests created in this commit.
192+
int64_t manifests_created = 0;
193+
/// \brief Manifests replaced in this commit.
194+
int64_t manifests_replaced = 0;
195+
/// \brief Manifests kept in this commit.
196+
int64_t manifests_kept = 0;
197+
/// \brief Manifest entries processed.
198+
int64_t manifest_entries_processed = 0;
199+
/// \brief Additional key-value metadata.
200+
std::unordered_map<std::string, std::string> metadata;
139201
};
140202

141203
/// \brief The type of a metrics report.

0 commit comments

Comments
 (0)