Skip to content

Commit 2db4c86

Browse files
committed
Add OTel log support to Carnot plan protos and implement
exec node support Signed-off-by: Dom Del Nano <ddelnano@gmail.com> (cherry picked from commit ac0d014)
1 parent eb2722d commit 2db4c86

9 files changed

Lines changed: 1116 additions & 225 deletions

File tree

bazel/external/opentelemetry.BUILD

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,42 @@ cc_grpc_library(
123123
grpc_only = True,
124124
deps = [":metrics_service_proto_cc"],
125125
)
126+
127+
proto_library(
128+
name = "logs_proto",
129+
srcs = [
130+
"opentelemetry/proto/logs/v1/logs.proto",
131+
],
132+
deps = [
133+
":common_proto",
134+
":resource_proto",
135+
],
136+
)
137+
138+
cc_proto_library(
139+
name = "logs_proto_cc",
140+
deps = [":logs_proto"],
141+
)
142+
143+
proto_library(
144+
name = "logs_service_proto",
145+
srcs = [
146+
"opentelemetry/proto/collector/logs/v1/logs_service.proto",
147+
],
148+
deps = [
149+
":logs_proto",
150+
],
151+
)
152+
153+
cc_proto_library(
154+
name = "logs_service_proto_cc",
155+
deps = [":logs_service_proto"],
156+
)
157+
158+
cc_grpc_library(
159+
name = "logs_service_grpc_cc",
160+
srcs = [":logs_service_proto"],
161+
generate_mocks = True,
162+
grpc_only = True,
163+
deps = [":logs_service_proto_cc"],
164+
)

src/carnot/exec/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pl_cc_library(
4747
"//src/table_store/table:cc_library",
4848
"@com_github_apache_arrow//:arrow",
4949
"@com_github_grpc_grpc//:grpc++",
50+
"@com_github_opentelemetry_proto//:logs_service_grpc_cc",
5051
"@com_github_opentelemetry_proto//:metrics_service_grpc_cc",
5152
"@com_github_opentelemetry_proto//:trace_service_grpc_cc",
5253
"@com_github_rlyeh_sole//:sole",

src/carnot/exec/exec_state.h

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "src/shared/metadata/metadata_state.h"
3838
#include "src/table_store/table/table_store.h"
3939

40+
#include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h"
4041
#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
4142
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"
4243
#include "src/carnot/carnotpb/carnot.grpc.pb.h"
@@ -54,6 +55,9 @@ using MetricsStubGenerator = std::function<
5455
using TraceStubGenerator = std::function<
5556
std::unique_ptr<opentelemetry::proto::collector::trace::v1::TraceService::StubInterface>(
5657
const std::string& address, bool insecure)>;
58+
using LogsStubGenerator = std::function<
59+
std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface>(
60+
const std::string& address, bool insecure)>;
5761

5862
/**
5963
* ExecState manages the execution state for a single query. A new one will
@@ -69,15 +73,16 @@ class ExecState {
6973
udf::Registry* func_registry, std::shared_ptr<table_store::TableStore> table_store,
7074
const ResultSinkStubGenerator& stub_generator,
7175
const MetricsStubGenerator& metrics_stub_generator,
72-
const TraceStubGenerator& trace_stub_generator, const sole::uuid& query_id,
73-
udf::ModelPool* model_pool, GRPCRouter* grpc_router = nullptr,
76+
const TraceStubGenerator& trace_stub_generator, const LogsStubGenerator& logs_stub_generator,
77+
const sole::uuid& query_id, udf::ModelPool* model_pool, GRPCRouter* grpc_router = nullptr,
7478
std::function<void(grpc::ClientContext*)> add_auth_func = [](grpc::ClientContext*) {},
7579
ExecMetrics* exec_metrics = nullptr)
7680
: func_registry_(func_registry),
7781
table_store_(std::move(table_store)),
7882
stub_generator_(stub_generator),
7983
metrics_stub_generator_(metrics_stub_generator),
8084
trace_stub_generator_(trace_stub_generator),
85+
logs_stub_generator_(logs_stub_generator),
8186
query_id_(query_id),
8287
model_pool_(model_pool),
8388
grpc_router_(grpc_router),
@@ -157,6 +162,19 @@ class ExecState {
157162
trace_service_stubs_pool_.push_back(std::move(stub_));
158163
return raw;
159164
}
165+
opentelemetry::proto::collector::logs::v1::LogsService::StubInterface* LogsServiceStub(
166+
const std::string& remote_address, bool insecure) {
167+
if (logs_service_stub_map_.contains(remote_address)) {
168+
return logs_service_stub_map_[remote_address];
169+
}
170+
std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface> stub_ =
171+
logs_stub_generator_(remote_address, insecure);
172+
opentelemetry::proto::collector::logs::v1::LogsService::StubInterface* raw = stub_.get();
173+
logs_service_stub_map_[remote_address] = raw;
174+
// Push to the pool.
175+
logs_service_stubs_pool_.push_back(std::move(stub_));
176+
return raw;
177+
}
160178

161179
udf::ScalarUDFDefinition* GetScalarUDFDefinition(int64_t id) { return id_to_scalar_udf_map_[id]; }
162180

@@ -209,6 +227,7 @@ class ExecState {
209227
const ResultSinkStubGenerator stub_generator_;
210228
const MetricsStubGenerator metrics_stub_generator_;
211229
const TraceStubGenerator trace_stub_generator_;
230+
const LogsStubGenerator logs_stub_generator_;
212231
std::map<int64_t, udf::ScalarUDFDefinition*> id_to_scalar_udf_map_;
213232
std::map<int64_t, udf::UDADefinition*> id_to_uda_map_;
214233
const sole::uuid query_id_;
@@ -239,6 +258,15 @@ class ExecState {
239258
absl::flat_hash_map<std::string,
240259
opentelemetry::proto::collector::trace::v1::TraceService::StubInterface*>
241260
trace_service_stub_map_;
261+
262+
std::vector<
263+
std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface>>
264+
logs_service_stubs_pool_;
265+
absl::flat_hash_map<std::string,
266+
opentelemetry::proto::collector::logs::v1::LogsService::StubInterface*>
267+
logs_service_stub_map_;
268+
269+
types::Time64NSValue time_now_;
242270
};
243271

244272
} // namespace exec

src/carnot/exec/otel_export_sink_node.cc

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ Status OTelExportSinkNode::OpenImpl(ExecState* exec_state) {
7777
if (plan_node_->spans().size()) {
7878
trace_service_stub_ = exec_state->TraceServiceStub(plan_node_->url(), plan_node_->insecure());
7979
}
80+
if (plan_node_->logs().size()) {
81+
logs_service_stub_ = exec_state->LogsServiceStub(plan_node_->url(), plan_node_->insecure());
82+
}
8083
return Status::OK();
8184
}
8285

@@ -438,13 +441,79 @@ Status OTelExportSinkNode::ConsumeSpans(ExecState* exec_state, const RowBatch& r
438441
return Status::OK();
439442
}
440443

444+
using ::opentelemetry::proto::logs::v1::ResourceLogs;
445+
Status OTelExportSinkNode::ConsumeLogs(ExecState* exec_state, const RowBatch& rb) {
446+
grpc::ClientContext context;
447+
for (const auto& header : plan_node_->endpoint_headers()) {
448+
context.AddMetadata(header.first, header.second);
449+
}
450+
context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
451+
452+
logs_response_.Clear();
453+
opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest request;
454+
455+
for (int64_t row_idx = 0; row_idx < rb.ColumnAt(0)->length(); ++row_idx) {
456+
// TODO(ddelnano) aggregate spans by resource.
457+
::opentelemetry::proto::logs::v1::ResourceLogs resource_logs;
458+
auto resource = resource_logs.mutable_resource();
459+
AddAttributes(resource->mutable_attributes(), plan_node_->resource_attributes_normal_encoding(),
460+
rb, row_idx);
461+
auto scope_logs = resource_logs.add_scope_logs();
462+
for (const auto& log_pb : plan_node_->logs()) {
463+
auto log = scope_logs->add_log_records();
464+
465+
AddAttributes(log->mutable_attributes(), log_pb.attributes(), rb, row_idx);
466+
467+
auto time_col = rb.ColumnAt(log_pb.time_column_index()).get();
468+
log->set_time_unix_nano(types::GetValueFromArrowArray<types::TIME64NS>(time_col, row_idx));
469+
if (log_pb.observed_time_column_index() >= 0) {
470+
auto observed_time_col = rb.ColumnAt(log_pb.observed_time_column_index()).get();
471+
log->set_observed_time_unix_nano(
472+
types::GetValueFromArrowArray<types::TIME64NS>(observed_time_col, row_idx));
473+
} else {
474+
log->set_observed_time_unix_nano(
475+
types::GetValueFromArrowArray<types::TIME64NS>(time_col, row_idx));
476+
}
477+
log->set_severity_number(
478+
static_cast<::opentelemetry::proto::logs::v1::SeverityNumber>(log_pb.severity_number()));
479+
log->set_severity_text(log_pb.severity_text());
480+
log->mutable_body()->set_string_value(types::GetValueFromArrowArray<types::STRING>(
481+
rb.ColumnAt(log_pb.body_column_index()).get(), row_idx));
482+
}
483+
484+
ReplicateData<ResourceLogs>(
485+
plan_node_->resource_attributes_optional_json_encoded(),
486+
[&request](ResourceLogs log) { *request.add_resource_logs() = std::move(log); },
487+
std::move(resource_logs), rb, row_idx);
488+
}
489+
// Set timeout, to avoid blocking on query.
490+
if (plan_node_->timeout() > 0) {
491+
std::chrono::system_clock::time_point deadline =
492+
std::chrono::system_clock::now() + std::chrono::seconds{plan_node_->timeout()};
493+
context.set_deadline(deadline);
494+
}
495+
496+
grpc::Status status = logs_service_stub_->Export(&context, request, &logs_response_);
497+
if (!status.ok()) {
498+
if (status.error_code() == grpc::StatusCode::DEADLINE_EXCEEDED) {
499+
exec_state->exec_metrics()->otlp_spans_timeout_counter.Increment();
500+
}
501+
502+
return FormatOTelStatus(plan_node_->id(), status);
503+
}
504+
return Status::OK();
505+
}
506+
441507
Status OTelExportSinkNode::ConsumeNextImpl(ExecState* exec_state, const RowBatch& rb, size_t) {
442508
if (plan_node_->metrics().size()) {
443509
PX_RETURN_IF_ERROR(ConsumeMetrics(exec_state, rb));
444510
}
445511
if (plan_node_->spans().size()) {
446512
PX_RETURN_IF_ERROR(ConsumeSpans(exec_state, rb));
447513
}
514+
if (plan_node_->logs().size()) {
515+
PX_RETURN_IF_ERROR(ConsumeLogs(exec_state, rb));
516+
}
448517
if (rb.eos()) {
449518
sent_eos_ = true;
450519
}

src/carnot/exec/otel_export_sink_node.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@
2121
#include <memory>
2222
#include <string>
2323

24-
#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
25-
#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h"
26-
2724
#include "src/carnot/exec/exec_node.h"
2825
#include "src/carnot/planpb/plan.pb.h"
2926
#include "src/common/base/base.h"
@@ -51,6 +48,7 @@ class OTelExportSinkNode : public SinkNode {
5148
size_t parent_index) override;
5249

5350
private:
51+
Status ConsumeLogs(ExecState* exec_state, const table_store::schema::RowBatch& rb);
5452
Status ConsumeMetrics(ExecState* exec_state, const table_store::schema::RowBatch& rb);
5553
Status ConsumeSpans(ExecState* exec_state, const table_store::schema::RowBatch& rb);
5654

@@ -60,6 +58,8 @@ class OTelExportSinkNode : public SinkNode {
6058
metrics_service_stub_;
6159
opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse trace_response_;
6260
opentelemetry::proto::collector::trace::v1::TraceService::StubInterface* trace_service_stub_;
61+
opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse logs_response_;
62+
opentelemetry::proto::collector::logs::v1::LogsService::StubInterface* logs_service_stub_;
6363
std::unique_ptr<plan::OTelExportSinkOperator> plan_node_;
6464

6565
std::unique_ptr<SpanConfig> span_config_;

0 commit comments

Comments
 (0)