diff --git a/Cargo.lock b/Cargo.lock index 9c56b98a..9e06c379 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3314,6 +3314,9 @@ dependencies = [ "http 1.4.0", "http-body-util", "lambda_http", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "serde", "serde_json", "serde_urlencoded", @@ -3321,6 +3324,8 @@ dependencies = [ "tower 0.5.2", "tower-http", "tracing", + "tracing-allocations", + "tracing-opentelemetry", "tracing-subscriber", "uuid", ] diff --git a/config/.env.example b/config/.env.example new file mode 100644 index 00000000..e456ae9c --- /dev/null +++ b/config/.env.example @@ -0,0 +1,7 @@ +METASTORE_CONFIG=config/metastore.yaml +JWT_SECRET=secret +RUST_LOG=info +TRACING_LEVEL=info +OTEL_SERVICE_NAME=embucket-lambda-api +HONEYCOMB_API_KEY=full_ingest_key +HONEYCOMB_ENDPOINT_URL=api.honeycomb.io:443 \ No newline at end of file diff --git a/crates/api-snowflake-rest/src/tests/create_test_server.rs b/crates/api-snowflake-rest/src/tests/create_test_server.rs index ebd0f535..96b3a1be 100644 --- a/crates/api-snowflake-rest/src/tests/create_test_server.rs +++ b/crates/api-snowflake-rest/src/tests/create_test_server.rs @@ -30,6 +30,7 @@ pub fn executor_default_cfg() -> UtilsConfig { UtilsConfig::default().with_max_concurrency_level(2) } +#[must_use] pub fn metastore_default_settings_cfg() -> MetastoreSettingsConfig { MetastoreSettingsConfig::default() .with_object_store_connect_timeout(1) diff --git a/crates/embucket-lambda/Cargo.toml b/crates/embucket-lambda/Cargo.toml index e03e8192..b99aa8bb 100644 --- a/crates/embucket-lambda/Cargo.toml +++ b/crates/embucket-lambda/Cargo.toml @@ -12,8 +12,6 @@ executor = { path = "../executor" } build-info = { path = "../build-info" } lambda_http = "0.17" tokio = { workspace = true } -tracing = { workspace = true } -tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } base64 = "0.22" serde = { workspace = true } serde_json = { workspace = true } @@ -26,6 +24,13 @@ flate2 = { version = "1", default-features = false, features = ["rust_backend"] tower = { workspace = true } tower-http = { workspace = true } cfg-if = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3.20", features = ["env-filter", "registry", "fmt", "json"] } +tracing-opentelemetry = { version = "0.31.0" } +tracing-allocations = { version = "0.1.0", optional = true } +opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic"] } +opentelemetry_sdk = { version = "0.30.0" } +opentelemetry = { version = "0.30.0" } [lints] workspace = true @@ -54,7 +59,3 @@ tracing = "Active" # Note: include path is relative to workspace root # Must run deploy from workspace root: cargo lambda deploy --binary-name bootstrap include = ["config"] - -[package.metadata.lambda.deploy.env] -LOG_FORMAT = "json" -METASTORE_CONFIG = "config/metastore.yaml" diff --git a/crates/embucket-lambda/Makefile b/crates/embucket-lambda/Makefile index d60d679b..b4fb3dc6 100644 --- a/crates/embucket-lambda/Makefile +++ b/crates/embucket-lambda/Makefile @@ -2,12 +2,21 @@ # Function name (override with: make deploy FUNCTION_NAME=your-function) FUNCTION_NAME ?= embucket-lambda -ENV_FILE ?= .env + +ENV_FILE ?= config/.env +IAM_ROLE ?= arn:aws:iam::767397688925:role/EmbucketLambdaExecRole +# --layer-arn required for flexibiity +# otel-collector-config goes second as it overrides original config from opentelemetry-collector +OTEL_COLLECTOR_LAYERS ?= \ + --layer-arn arn:aws:lambda:us-east-2:184161586896:layer:opentelemetry-collector-arm64-0_19_0:1\ + --layer-arn arn:aws:lambda:us-east-2:767397688925:layer:otel-collector-config:22\ + --layer-arn arn:aws:lambda:us-east-2:580247275435:layer:LambdaInsightsExtension-Arm64:33 + # supported features: "streaming" FEATURES_PARAM := $(if $(FEATURES),--features $(FEATURES)) build: - cd ../.. && cargo lambda build --release --arm64 --manifest-path crates/embucket-lambda/Cargo.toml $(FEATURES_PARAM) + cd ../.. && cargo lambda build --release -p embucket-lambda --arm64 -o zip --include config/metastore.yaml --manifest-path crates/embucket-lambda/Cargo.toml $(FEATURES_PARAM) # Deploy to AWS (must run from workspace root for include paths to work) deploy: build deploy-only @@ -15,7 +24,25 @@ deploy: build deploy-only # Quick deploy without rebuild deploy-only: - cd ../.. && cargo lambda deploy --binary-name bootstrap $(FUNCTION_NAME) --env-file $(ENV_FILE) + @test -n "$(FUNCTION_NAME)" || (echo "ERROR: Missing function name. Use: make deploy " >&2; exit 1) + cd ../.. && cargo lambda deploy $(OTEL_COLLECTOR_LAYERS) --iam-role $(IAM_ROLE) --env-file $(ENV_FILE) --binary-name bootstrap $(FUNCTION_NAME) + aws logs create-log-group --log-group-name "/aws/lambda/$(FUNCTION_NAME)" >/dev/null 2>&1 || true + @$(MAKE) url FUNCTION_NAME=$(FUNCTION_NAME) + +url: + @test -n "$(FUNCTION_NAME)" || (echo "ERROR: Missing function name. Use: make url " >&2; exit 1) + @set -e; \ + echo "Ensuring Function URL config exists for $(FUNCTION_NAME) (auth: NONE)..." ; \ + aws lambda create-function-url-config --function-name "$(FUNCTION_NAME)" --auth-type NONE >/dev/null 2>&1 || \ + aws lambda update-function-url-config --function-name "$(FUNCTION_NAME)" --auth-type NONE >/dev/null ; \ + echo "Ensuring public invoke permission exists..." ; \ + aws lambda add-permission --function-name "$(FUNCTION_NAME)" \ + --statement-id AllowPublicURLInvoke \ + --action lambda:InvokeFunctionUrl \ + --principal "*" \ + --function-url-auth-type NONE >/dev/null 2>&1 || true ; \ + URL="$$(aws lambda get-function-url-config --function-name "$(FUNCTION_NAME)" --query 'FunctionUrl' --output text)"; \ + echo "$$URL" # Watch locally for development watch: @@ -27,6 +54,7 @@ test: # Tail lambda logs logs: + @test -n "$(FUNCTION_NAME)" || (echo "ERROR: Missing function name. Use: make deploy " >&2; exit 1) aws logs tail /aws/lambda/$(FUNCTION_NAME) --since 5m --follow # Verify deployment with snow CLI diff --git a/crates/embucket-lambda/README.md b/crates/embucket-lambda/README.md index 9cb44dbb..c196e54f 100644 --- a/crates/embucket-lambda/README.md +++ b/crates/embucket-lambda/README.md @@ -32,6 +32,9 @@ make verify make logs ``` +`make deploy` will build, deploy with .env vars and with the iam-role in the makefile, add logs group, create a url, add allow uri access policy. And volia - it works. +P.S. Don't forget to change host url in the `snowcli` config. + The function name defaults to `embucket-lambda` but can be overridden: - Via Makefile variable: `make deploy FUNCTION_NAME=my-function` - Via environment variable: `export FUNCTION_NAME=my-function && make deploy` @@ -77,6 +80,21 @@ cargo lambda deploy --binary-name bootstrap ``` - It will deploy envs from `.env` if `ENV_FILE` not specified +### Observability + + +#### AWS traces +We send events, spans to stdout log in json format, and in case if AWS X-Ray is enabled it enhances traces. +- `RUST_LOG` - Controls verbosity log level. Default to "INFO", possible values: "OFF", "ERROR", "WARN", "INFO", "DEBUG", "TRACE". + +#### Exporting telemetry spans to [**honeycomb.io**](https://docs.honeycomb.io/send-data/opentelemetry/collector/) +- Required environment variables configuring remote Observability platform: + * `HONEYCOMB_API_KEY` + * `HONEYCOMB_ENDPOINT_URL` +- Optional: + * `OTEL_SERVICE_NAME` + - `TRACING_LEVEL` - verbosity level, default to "INFO", possible values: "OFF", "ERROR", "WARN", "INFO", "DEBUG", "TRACE". + ### Test locally ```bash @@ -107,5 +125,6 @@ aws logs tail /aws/lambda/embucket-lambda --since 5m --follow - `LOG_FORMAT`: json - `METASTORE_CONFIG`: config/metastore.yaml - `RUST_LOG`: (optional) Set logging level, defaults to "info" +- `TRACING_LEVEL`: (optional) Set tracing level, defaults to "info" diff --git a/crates/embucket-lambda/extensions/collector-config/README.md b/crates/embucket-lambda/extensions/collector-config/README.md new file mode 100644 index 00000000..372b8400 --- /dev/null +++ b/crates/embucket-lambda/extensions/collector-config/README.md @@ -0,0 +1,9 @@ +To update the layer config: +1. `cd crates/embucket-lambda/extensions` +2. `zip -r otel-collector-config-layer.zip collector-config` +3. `cd ../../..` +4. `aws lambda publish-layer-version +--layer-name otel-collector-config +--zip-file fileb://config/layer-root/otel-collector-config-layer.zip +--compatible-runtimes provided.al2 provided.al2023 +--compatible-architectures arm64` \ No newline at end of file diff --git a/crates/embucket-lambda/extensions/collector-config/config.yml b/crates/embucket-lambda/extensions/collector-config/config.yml new file mode 100644 index 00000000..958d0fb6 --- /dev/null +++ b/crates/embucket-lambda/extensions/collector-config/config.yml @@ -0,0 +1,23 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + +exporters: + otlp: + endpoint: "${env:HONEYCOMB_ENDPOINT_URL}" + headers: + x-honeycomb-team: "${env:HONEYCOMB_API_KEY}" + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [otlp] \ No newline at end of file diff --git a/crates/embucket-lambda/src/config.rs b/crates/embucket-lambda/src/config.rs index 800eb54b..90d4194f 100644 --- a/crates/embucket-lambda/src/config.rs +++ b/crates/embucket-lambda/src/config.rs @@ -25,6 +25,10 @@ pub struct EnvConfig { pub iceberg_catalog_timeout_secs: u64, pub object_store_timeout_secs: u64, pub object_store_connect_timeout_secs: u64, + pub otel_grpc: bool, + pub tracing_level: String, + pub log_format: String, + pub log_filter: String, } impl EnvConfig { @@ -58,6 +62,10 @@ impl EnvConfig { object_store_timeout_secs: parse_env("OBJECT_STORE_TIMEOUT_SECS").unwrap_or(30), object_store_connect_timeout_secs: parse_env("OBJECT_STORE_CONNECT_TIMEOUT_SECS") .unwrap_or(3), + otel_grpc: parse_env("OTEL_GRPC").unwrap_or(true), + tracing_level: env_or_default("TRACING_LEVEL", "INFO"), + log_format: env_or_default("LOG_FORMAT", "json"), + log_filter: parse_env("RUST_LOG").unwrap_or_else(|| "INFO".to_string()), } } diff --git a/crates/embucket-lambda/src/main.rs b/crates/embucket-lambda/src/main.rs index 8fe894d2..43d06fe8 100644 --- a/crates/embucket-lambda/src/main.rs +++ b/crates/embucket-lambda/src/main.rs @@ -15,11 +15,20 @@ use catalog_metastore::metastore_settings_config::MetastoreSettingsConfig; use http::HeaderMap; use http_body_util::BodyExt; use lambda_http::{Body as LambdaBody, Error as LambdaError, Request, Response, service_fn}; +use opentelemetry::trace::TracerProvider; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::trace::BatchSpanProcessor; +use opentelemetry_sdk::trace::SdkTracerProvider; +use std::fs; use std::io::IsTerminal; use std::net::{IpAddr, SocketAddr}; +use std::path::Path; use std::sync::Arc; use tower::ServiceExt; use tracing::{error, info}; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::filter::{LevelFilter, Targets}; +use tracing_subscriber::{Layer, layer::SubscriberExt, util::SubscriberInitExt}; cfg_if::cfg_if! { if #[cfg(feature = "streaming")] { use lambda_http::run_with_streaming_response as run; @@ -30,20 +39,29 @@ cfg_if::cfg_if! { type InitResult = Result>; +const DISABLED_TARGETS: [&str; 2] = ["h2", "aws_smithy_runtime"]; + +fn list_dir(path: &str) -> std::io::Result> { + let mut files = Vec::new(); + for entry in fs::read_dir(Path::new(path))? { + files.push(entry?.path().display().to_string()); + } + Ok(files) +} + #[tokio::main] async fn main() -> Result<(), LambdaError> { - init_tracing(); + let env_config = EnvConfig::from_env(); + + let tracing_provider = init_tracing_and_logs(&env_config); - // Log version and build information on startup + // for unknown reason following printed in some strange way info!( - version = %BuildInfo::GIT_DESCRIBE, - git_sha = %BuildInfo::GIT_SHA_SHORT, - git_branch = %BuildInfo::GIT_BRANCH, - build_timestamp = %BuildInfo::BUILD_TIMESTAMP, - "embucket-lambda started" + lambda_configs = ?list_dir("/var/task/config").unwrap_or_default(), + lambda_extensions_configs = ?list_dir("/opt").unwrap_or_default(), + "Lambda files" ); - let env_config = EnvConfig::from_env(); info!( data_format = %env_config.data_format, max_concurrency = env_config.max_concurrency_level, @@ -65,11 +83,18 @@ async fn main() -> Result<(), LambdaError> { err })?); - run(service_fn(move |event: Request| { + let err = run(service_fn(move |event: Request| { let app = Arc::clone(&app); async move { app.handle_event(event).await } })) - .await + .await; + + tracing_provider.shutdown().map_err(|err| { + error!(error = %err, "Failed to shutdown TracerProvider"); + err + })?; + + err } struct LambdaApp { @@ -154,10 +179,6 @@ impl LambdaApp { ); } - // if let Err(err) = ensure_session_header(&mut parts.headers, &self.state).await { - // return Ok(snowflake_error_response(&err)); - // } - let mut axum_request = to_axum_request(parts, body_bytes); if let Some(addr) = extract_socket_addr(axum_request.headers()) { axum_request.extensions_mut().insert(ConnectInfo(addr)); @@ -230,31 +251,75 @@ fn extract_socket_addr(headers: &HeaderMap) -> Option { .map(|ip| SocketAddr::new(ip, 0)) } -fn init_tracing() { - let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()); - let emit_ansi = std::io::stdout().is_terminal(); - - // Use json format if requested via env var, otherwise use pretty format with span events - let format = std::env::var("LOG_FORMAT").unwrap_or_else(|_| "pretty".to_string()); - - if format == "json" { - let _ = tracing_subscriber::fmt() - .with_env_filter(filter) - .with_target(true) - .with_ansi(false) - .json() - .with_current_span(true) - .with_span_list(true) - .try_init(); +#[allow(clippy::expect_used, clippy::redundant_closure_for_method_calls)] +fn init_tracing_and_logs(config: &EnvConfig) -> SdkTracerProvider { + let exporter = if config.otel_grpc { + // Initialize OTLP exporter using gRPC (Tonic) + opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .build() + .expect("Failed to create OTLP gRPC exporter") + } else { + // Initialize OTLP exporter using HTTP + opentelemetry_otlp::SpanExporter::builder() + .with_http() + .build() + .expect("Failed to create OTLP HTTP exporter") + }; + + let resource = Resource::builder().build(); + + let tracing_provider = SdkTracerProvider::builder() + .with_span_processor(BatchSpanProcessor::builder(exporter).build()) + .with_resource(resource) + .build(); + + let targets_with_level = + |targets: &[&'static str], level: LevelFilter| -> Vec<(&str, LevelFilter)> { + // let default_log_targets: Vec<(String, LevelFilter)> = + targets.iter().map(|t| ((*t), level)).collect() + }; + + let registry = tracing_subscriber::registry() + // Telemetry filtering + .with( + tracing_opentelemetry::OpenTelemetryLayer::new(tracing_provider.tracer("embucket")) + .with_level(true) + .with_filter( + Targets::default() + .with_targets(targets_with_level(&DISABLED_TARGETS, LevelFilter::OFF)) + .with_default(config.tracing_level.parse().unwrap_or(tracing::Level::INFO)), + ), + ); + // Logs filtering + // fmt::layer has different types for json vs plain + if config.log_format == "json" { + registry + .with( + tracing_subscriber::fmt::layer() + .json() + .with_target(false) + .with_ansi(false) + .with_current_span(false) + .with_span_list(false) + .without_time(), + ) + .with(EnvFilter::new(config.log_filter.clone())) + .init(); } else { - let _ = tracing_subscriber::fmt() - .with_env_filter(filter) - .with_target(true) - .with_ansi(emit_ansi) - .with_span_events( - tracing_subscriber::fmt::format::FmtSpan::ENTER - | tracing_subscriber::fmt::format::FmtSpan::CLOSE, + registry + .with( + tracing_subscriber::fmt::layer() + .with_target(true) + .with_ansi(std::io::stdout().is_terminal()) + .with_span_events( + tracing_subscriber::fmt::format::FmtSpan::ENTER + | tracing_subscriber::fmt::format::FmtSpan::CLOSE, + ), ) - .try_init(); + .with(EnvFilter::new(config.log_filter.clone())) + .init(); } + + tracing_provider } diff --git a/crates/embucketd/src/cli.rs b/crates/embucketd/src/cli.rs index f6db5875..6c8b3b83 100644 --- a/crates/embucketd/src/cli.rs +++ b/crates/embucketd/src/cli.rs @@ -118,6 +118,14 @@ pub struct CliOpts { )] pub auth_demo_password: Option, + #[arg( + long, + env = "OTEL_GRPC", + default_value = "true", + help = "Enable OTLP gRPC exporter (requires 'otel-grpc' feature)" + )] + pub otel_grpc: bool, + #[arg( long, value_enum, diff --git a/crates/embucketd/src/main.rs b/crates/embucketd/src/main.rs index 7f315aea..20aea838 100644 --- a/crates/embucketd/src/main.rs +++ b/crates/embucketd/src/main.rs @@ -207,11 +207,19 @@ async fn async_main( #[allow(clippy::expect_used, clippy::redundant_closure_for_method_calls)] fn setup_tracing(opts: &cli::CliOpts) -> SdkTracerProvider { - // Initialize OTLP exporter using gRPC (Tonic) - let exporter = opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .build() - .expect("Failed to create OTLP exporter"); + let exporter = if opts.otel_grpc { + // Initialize OTLP exporter using gRPC (Tonic) + opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .build() + .expect("Failed to create OTLP gRPC exporter") + } else { + // Initialize OTLP exporter using HTTP + opentelemetry_otlp::SpanExporter::builder() + .with_http() + .build() + .expect("Failed to create OTLP HTTP exporter") + }; let resource = Resource::builder().with_service_name("Em").build();