Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions bazel/external/opentelemetry.BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,42 @@ cc_grpc_library(
grpc_only = True,
deps = [":metrics_service_proto_cc"],
)

proto_library(
name = "logs_proto",
srcs = [
"opentelemetry/proto/logs/v1/logs.proto",
],
deps = [
":common_proto",
":resource_proto",
],
)

cc_proto_library(
name = "logs_proto_cc",
deps = [":logs_proto"],
)

proto_library(
name = "logs_service_proto",
srcs = [
"opentelemetry/proto/collector/logs/v1/logs_service.proto",
],
deps = [
":logs_proto",
],
)

cc_proto_library(
name = "logs_service_proto_cc",
deps = [":logs_service_proto"],
)

cc_grpc_library(
name = "logs_service_grpc_cc",
srcs = [":logs_service_proto"],
generate_mocks = True,
grpc_only = True,
deps = [":logs_service_proto_cc"],
)
9 changes: 9 additions & 0 deletions src/carnot/engine_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class EngineState : public NotCopyable {
[this](const std::string& remote_addr, bool insecure) {
return TraceStubGenerator(remote_addr, insecure);
},
[this](const std::string& remote_addr, bool insecure) {
return LogsStubGenerator(remote_addr, insecure);
},
query_id, model_pool_.get(), grpc_router_, add_auth_to_grpc_context_func_, metrics_.get());
}
std::shared_ptr<grpc::Channel> CreateChannel(const std::string& remote_addr, bool insecure) {
Expand Down Expand Up @@ -115,6 +118,12 @@ class EngineState : public NotCopyable {
CreateChannel(remote_addr, insecure));
}

std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface>
LogsStubGenerator(const std::string& remote_addr, bool insecure) {
return opentelemetry::proto::collector::logs::v1::LogsService::NewStub(
CreateChannel(remote_addr, insecure));
}

std::unique_ptr<plan::PlanState> CreatePlanState() {
return std::make_unique<plan::PlanState>(func_registry_.get());
}
Expand Down
1 change: 1 addition & 0 deletions src/carnot/exec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pl_cc_library(
"//src/table_store/table:cc_library",
"@com_github_apache_arrow//:arrow",
"@com_github_grpc_grpc//:grpc++",
"@com_github_opentelemetry_proto//:logs_service_grpc_cc",
"@com_github_opentelemetry_proto//:metrics_service_grpc_cc",
"@com_github_opentelemetry_proto//:trace_service_grpc_cc",
"@com_github_rlyeh_sole//:sole",
Expand Down
2 changes: 1 addition & 1 deletion src/carnot/exec/agg_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ std::unique_ptr<ExecState> MakeTestExecState(udf::Registry* registry) {
auto table_store = std::make_shared<table_store::TableStore>();
return std::make_unique<ExecState>(registry, table_store, MockResultSinkStubGenerator,
MockMetricsStubGenerator, MockTraceStubGenerator,
sole::uuid4(), nullptr);
MockLogStubGenerator, sole::uuid4(), nullptr);
}

std::unique_ptr<plan::Operator> PlanNodeFromPbtxt(const std::string& pbtxt) {
Expand Down
6 changes: 3 additions & 3 deletions src/carnot/exec/empty_source_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class EmptySourceNodeTest : public ::testing::Test {
void SetUp() override {
func_registry_ = std::make_unique<udf::Registry>("test_registry");
auto table_store = std::make_shared<table_store::TableStore>();
exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
}

std::unique_ptr<ExecState> exec_state_;
Expand Down
6 changes: 3 additions & 3 deletions src/carnot/exec/equijoin_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class JoinNodeTest : public ::testing::Test {
JoinNodeTest() {
func_registry_ = std::make_unique<udf::Registry>("test_registry");
auto table_store = std::make_shared<table_store::TableStore>();
exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
}

protected:
Expand Down
20 changes: 10 additions & 10 deletions src/carnot/exec/exec_graph_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ class BaseExecGraphTest : public ::testing::Test {
func_registry_->RegisterOrDie<MultiplyUDF>("multiply");

auto table_store = std::make_shared<table_store::TableStore>();
exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
}

std::unique_ptr<udf::Registry> func_registry_;
Expand Down Expand Up @@ -174,7 +174,7 @@ TEST_P(ExecGraphExecuteTest, execute) {
table_store->AddTable("numbers", table);
auto exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);

EXPECT_OK(exec_state_->AddScalarUDF(
0, "add", std::vector<types::DataType>({types::DataType::INT64, types::DataType::FLOAT64})));
Expand Down Expand Up @@ -255,7 +255,7 @@ TEST_F(ExecGraphTest, execute_time) {

auto exec_state_ = std::make_unique<ExecState>(
func_registry.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);

EXPECT_OK(exec_state_->AddScalarUDF(
0, "add", std::vector<types::DataType>({types::DataType::INT64, types::DataType::FLOAT64})));
Expand Down Expand Up @@ -322,7 +322,7 @@ TEST_F(ExecGraphTest, two_limits_dont_interfere) {
table_store->AddTable("numbers", table);
auto exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);

ExecutionGraph e;
auto s = e.Init(schema.get(), plan_state.get(), exec_state_.get(), plan_fragment_.get(),
Expand Down Expand Up @@ -390,7 +390,7 @@ TEST_F(ExecGraphTest, limit_w_multiple_srcs) {
table_store->AddTable("numbers", table);
auto exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);

ExecutionGraph e;
auto s = e.Init(schema.get(), plan_state.get(), exec_state_.get(), plan_fragment_.get(),
Expand Down Expand Up @@ -452,7 +452,7 @@ TEST_F(ExecGraphTest, two_sequential_limits) {
table_store->AddTable("numbers", table);
auto exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);

ExecutionGraph e;
auto s = e.Init(schema.get(), plan_state.get(), exec_state_.get(), plan_fragment_.get(),
Expand Down Expand Up @@ -515,7 +515,7 @@ TEST_F(ExecGraphTest, execute_with_two_limits) {
table_store->AddTable("numbers", table);
auto exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);

ExecutionGraph e;
auto s = e.Init(schema.get(), plan_state.get(), exec_state_.get(), plan_fragment_.get(),
Expand Down Expand Up @@ -702,7 +702,7 @@ class GRPCExecGraphTest : public ::testing::Test {
auto table_store = std::make_shared<table_store::TableStore>();
exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr, grpc_router_.get());
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr, grpc_router_.get());
}

void SetUpPlanFragment() {
Expand Down
32 changes: 30 additions & 2 deletions src/carnot/exec/exec_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "src/shared/metadata/metadata_state.h"
#include "src/table_store/table/table_store.h"

#include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h"
#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"
#include "src/carnot/carnotpb/carnot.grpc.pb.h"
Expand All @@ -54,6 +55,9 @@ using MetricsStubGenerator = std::function<
using TraceStubGenerator = std::function<
std::unique_ptr<opentelemetry::proto::collector::trace::v1::TraceService::StubInterface>(
const std::string& address, bool insecure)>;
using LogsStubGenerator = std::function<
std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface>(
const std::string& address, bool insecure)>;

/**
* ExecState manages the execution state for a single query. A new one will
Expand All @@ -69,15 +73,16 @@ class ExecState {
udf::Registry* func_registry, std::shared_ptr<table_store::TableStore> table_store,
const ResultSinkStubGenerator& stub_generator,
const MetricsStubGenerator& metrics_stub_generator,
const TraceStubGenerator& trace_stub_generator, const sole::uuid& query_id,
udf::ModelPool* model_pool, GRPCRouter* grpc_router = nullptr,
const TraceStubGenerator& trace_stub_generator, const LogsStubGenerator& logs_stub_generator,
const sole::uuid& query_id, udf::ModelPool* model_pool, GRPCRouter* grpc_router = nullptr,
std::function<void(grpc::ClientContext*)> add_auth_func = [](grpc::ClientContext*) {},
ExecMetrics* exec_metrics = nullptr)
: func_registry_(func_registry),
table_store_(std::move(table_store)),
stub_generator_(stub_generator),
metrics_stub_generator_(metrics_stub_generator),
trace_stub_generator_(trace_stub_generator),
logs_stub_generator_(logs_stub_generator),
query_id_(query_id),
model_pool_(model_pool),
grpc_router_(grpc_router),
Expand Down Expand Up @@ -157,6 +162,19 @@ class ExecState {
trace_service_stubs_pool_.push_back(std::move(stub_));
return raw;
}
opentelemetry::proto::collector::logs::v1::LogsService::StubInterface* LogsServiceStub(
const std::string& remote_address, bool insecure) {
if (logs_service_stub_map_.contains(remote_address)) {
return logs_service_stub_map_[remote_address];
}
std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface> stub_ =
logs_stub_generator_(remote_address, insecure);
opentelemetry::proto::collector::logs::v1::LogsService::StubInterface* raw = stub_.get();
logs_service_stub_map_[remote_address] = raw;
// Push to the pool.
logs_service_stubs_pool_.push_back(std::move(stub_));
return raw;
}

udf::ScalarUDFDefinition* GetScalarUDFDefinition(int64_t id) { return id_to_scalar_udf_map_[id]; }

Expand Down Expand Up @@ -209,6 +227,7 @@ class ExecState {
const ResultSinkStubGenerator stub_generator_;
const MetricsStubGenerator metrics_stub_generator_;
const TraceStubGenerator trace_stub_generator_;
const LogsStubGenerator logs_stub_generator_;
std::map<int64_t, udf::ScalarUDFDefinition*> id_to_scalar_udf_map_;
std::map<int64_t, udf::UDADefinition*> id_to_uda_map_;
const sole::uuid query_id_;
Expand Down Expand Up @@ -239,6 +258,15 @@ class ExecState {
absl::flat_hash_map<std::string,
opentelemetry::proto::collector::trace::v1::TraceService::StubInterface*>
trace_service_stub_map_;

std::vector<
std::unique_ptr<opentelemetry::proto::collector::logs::v1::LogsService::StubInterface>>
logs_service_stubs_pool_;
absl::flat_hash_map<std::string,
opentelemetry::proto::collector::logs::v1::LogsService::StubInterface*>
logs_service_stub_map_;

types::Time64NSValue time_now_;
};

} // namespace exec
Expand Down
3 changes: 2 additions & 1 deletion src/carnot/exec/expression_evaluator_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
using ScalarExpression = px::carnot::plan::ScalarExpression;
using ScalarExpressionVector = std::vector<std::shared_ptr<ScalarExpression>>;
using px::carnot::exec::ExecState;
using px::carnot::exec::MockLogStubGenerator;
using px::carnot::exec::MockMetricsStubGenerator;
using px::carnot::exec::MockResultSinkStubGenerator;
using px::carnot::exec::MockTraceStubGenerator;
Expand Down Expand Up @@ -85,7 +86,7 @@ void BM_ScalarExpressionTwoCols(benchmark::State& state,
PX_CHECK_OK(func_registry->Register<AddUDF>("add"));
auto exec_state = std::make_unique<ExecState>(
func_registry.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
EXPECT_OK(exec_state->AddScalarUDF(
0, "add",
std::vector<px::types::DataType>({px::types::DataType::INT64, px::types::DataType::INT64})));
Expand Down
6 changes: 3 additions & 3 deletions src/carnot/exec/expression_evaluator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ class ScalarExpressionTest : public ::testing::TestWithParam<ScalarExpressionEva

EXPECT_TRUE(func_registry_->Register<AddUDF>("add").ok());
EXPECT_TRUE(func_registry_->Register<InitArgUDF>("init_arg").ok());
exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
EXPECT_OK(exec_state_->AddScalarUDF(
0, "add", std::vector<types::DataType>({types::DataType::INT64, types::DataType::INT64})));
EXPECT_OK(
Expand Down
6 changes: 3 additions & 3 deletions src/carnot/exec/filter_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ class FilterNodeTest : public ::testing::Test {
EXPECT_OK(func_registry_->Register<StrEqUDF>("eq"));
auto table_store = std::make_shared<table_store::TableStore>();

exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
EXPECT_OK(exec_state_->AddScalarUDF(
0, "eq", std::vector<types::DataType>({types::DataType::INT64, types::DataType::INT64})));
EXPECT_OK(exec_state_->AddScalarUDF(
Expand Down
4 changes: 2 additions & 2 deletions src/carnot/exec/grpc_router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ TEST_F(GRPCRouterTest, threaded_router_test) {
auto table_store = std::make_shared<table_store::TableStore>();
auto exec_state = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);

MockExecNode mock_child;

Expand Down Expand Up @@ -520,7 +520,7 @@ TEST_F(GRPCRouterTest, threaded_router_test_multi_writer) {
auto table_store = std::make_shared<table_store::TableStore>();
auto exec_state = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);

MockExecNode mock_child0;
MockExecNode mock_child1;
Expand Down
5 changes: 3 additions & 2 deletions src/carnot/exec/grpc_sink_node_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "src/shared/types/types.h"
#include "src/shared/types/typespb/types.pb.h"

using px::carnot::exec::MockLogStubGenerator;
using px::carnot::exec::MockMetricsStubGenerator;
using px::carnot::exec::MockTraceStubGenerator;
using px::carnotpb::MockResultSinkServiceStub;
Expand All @@ -61,8 +62,8 @@ void BM_GRPCSinkNodeSplitting(benchmark::State& state) {
func_registry.get(), table_store,
[&](const std::string&, const std::string&)
-> std::unique_ptr<ResultSinkService::StubInterface> { return std::move(mock_unique); },
MockMetricsStubGenerator, MockTraceStubGenerator, sole::uuid4(), nullptr, nullptr,
[&](grpc::ClientContext*) {});
MockMetricsStubGenerator, MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(),
nullptr, nullptr, [&](grpc::ClientContext*) {});
TransferResultChunkResponse resp;
resp.set_success(true);
auto writer =
Expand Down
4 changes: 2 additions & 2 deletions src/carnot/exec/grpc_sink_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class GRPCSinkNodeTest : public ::testing::Test {
const std::string&) -> std::unique_ptr<ResultSinkService::StubInterface> {
return std::move(mock_unique_);
},
MockMetricsStubGenerator, MockTraceStubGenerator, sole::uuid4(), nullptr, nullptr,
[this](grpc::ClientContext*) { add_metadata_called_ = true; });
MockMetricsStubGenerator, MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(),
nullptr, nullptr, [this](grpc::ClientContext*) { add_metadata_called_ = true; });

table_store::schema::Relation rel({types::DataType::BOOLEAN, types::DataType::TIME64NS},
{"col1", "time_"});
Expand Down
6 changes: 3 additions & 3 deletions src/carnot/exec/grpc_source_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ class GRPCSourceNodeTest : public ::testing::Test {
func_registry_ = std::make_unique<udf::Registry>("test_registry");
auto table_store = std::make_shared<table_store::TableStore>();

exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);

table_store::schema::Relation rel({types::DataType::BOOLEAN, types::DataType::TIME64NS},
{"col1", "time_"});
Expand Down
6 changes: 3 additions & 3 deletions src/carnot/exec/limit_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class LimitNodeTest : public ::testing::Test {

auto table_store = std::make_shared<table_store::TableStore>();

exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
}

protected:
Expand Down
6 changes: 3 additions & 3 deletions src/carnot/exec/map_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ class MapNodeTest : public ::testing::Test {
EXPECT_OK(func_registry_->Register<AddUDF>("add"));
auto table_store = std::make_shared<table_store::TableStore>();

exec_state_ = std::make_unique<ExecState>(func_registry_.get(), table_store,
MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, sole::uuid4(), nullptr);
exec_state_ = std::make_unique<ExecState>(
func_registry_.get(), table_store, MockResultSinkStubGenerator, MockMetricsStubGenerator,
MockTraceStubGenerator, MockLogStubGenerator, sole::uuid4(), nullptr);
EXPECT_OK(exec_state_->AddScalarUDF(
0, "add", std::vector<types::DataType>({types::DataType::INT64, types::DataType::INT64})));
}
Expand Down
Loading
Loading