chore(vector): add vector pipeline to ship clickhouse events from the edge#2526
Conversation
Deploying rivet-studio with
|
| Latest commit: |
526f37e
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://124bf5a3.rivet-studio.pages.dev |
| Branch Preview URL: | https://06-02-chore-vector-add-vecto.rivet-studio.pages.dev |
b9cc460 to
8d1f2c3
Compare
Deploying rivet with
|
| Latest commit: |
526f37e
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://4f91f438.rivet.pages.dev |
| Branch Preview URL: | https://06-02-chore-vector-add-vecto.rivet.pages.dev |
Deploying rivet-hub with
|
| Latest commit: |
526f37e
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://04f65c11.rivet-hub-7jb.pages.dev |
| Branch Preview URL: | https://06-02-chore-vector-add-vecto.rivet-hub-7jb.pages.dev |
71023be to
58a5eca
Compare
| let clickhouse = crate::db::clickhouse::setup(config.clone())?; | ||
| let sqlite = SqlitePoolManager::new(fdb.clone()).await?; | ||
|
|
||
| // Create the ClickHouse inserter if ClickHouse is enabled |
There was a problem hiding this comment.
| // Create the ClickHouse inserter if ClickHouse is enabled | |
| // Create the ClickHouse inserter if vector is enabled |
| mut target: RouteTarget, | ||
| request_context: &mut RequestContext, | ||
| ) -> GlobalResult<Response<Full<Bytes>>> { | ||
| let _request_start = Instant::now(); |
| let request_id = Uuid::new_v4(); | ||
| let mut request_context = | ||
| RequestContext::new_with_request_id(request_id, self.state.clickhouse_inserter.clone()); |
There was a problem hiding this comment.
Uses `new_with_request_id` with a freshly generated UUID, which is equivalent to just calling `new`
| let request_id = Uuid::new_v4(); | |
| let mut request_context = | |
| RequestContext::new_with_request_id(request_id, self.state.clickhouse_inserter.clone()); | |
| let mut request_context = | |
| RequestContext::new(self.state.clickhouse_inserter.clone()); |
| network_ports_ingress: HashMap<String, ActorClickHouseRowPortIngress>, | ||
| network_ports_host: HashMap<String, ActorClickHouseRowPortHost>, | ||
| network_ports_proxied: HashMap<String, ActorClickHouseRowPortProxied>, | ||
| client_id: Uuid, |
There was a problem hiding this comment.
Should we add runner_id now or later?
| if let Some(actor_input) = &input.actor_input { | ||
| // Get metadata | ||
| let meta = ctx | ||
| .activity(GetMetaInput { |
There was a problem hiding this comment.
Shouldn't this also be .v(2)?
|
|
||
| ctx.v(2).activity(InsertClickHouseInput { | ||
| input: input.clone(), | ||
| meta, |
There was a problem hiding this comment.
This metadata includes build data, which changes if the actor's build is changed. The metadata is re-fetched on reschedule in `reschedule_actor` as `actor_setup.meta`.
43b5230 to
64194de
Compare
64194de to
e120ace
Compare
There was a problem hiding this comment.
PR Summary
This PR adds a Vector pipeline to ship ClickHouse events from edge services, with significant changes across the codebase. Here are the key points:
- Introduces new `clickhouse-inserter` package for batched event insertion with configurable batch sizes and intervals
- Adds `http_requests` table in ClickHouse for guard analytics with 30-day TTL and comprehensive request metadata
- Creates new `actor_logs2` table with 14-day TTL and materialized view `actor_logs2_with_metadata` for enhanced actor logging
- Implements request context tracking in guard service with detailed metrics collection and WebSocket support
- Adds non-interactive route management capabilities to CLI with auto-create and auto-sync options
58 file(s) reviewed, 14 comment(s)
Edit PR Review Bot Settings | Greptile
| dynamic_events_http: | ||
| type: http_server | ||
| address: 0.0.0.0:5022 | ||
| encoding: ndjson |
There was a problem hiding this comment.
logic: No transform is defined for dynamic_events_http before sending to sink. Consider adding a transform to ensure consistent metadata tagging like other sources.
| if events.len() >= BATCH_SIZE { | ||
| if let Err(e) = self.send_events(&events).await { | ||
| tracing::error!(?e, "failed to send events to Vector"); | ||
| } | ||
| events.clear(); | ||
| } |
There was a problem hiding this comment.
logic: Events are cleared even if send_events fails. This could result in data loss. Consider only clearing after successful send or implementing a retry mechanism.
| if events.len() >= BATCH_SIZE { | |
| if let Err(e) = self.send_events(&events).await { | |
| tracing::error!(?e, "failed to send events to Vector"); | |
| } | |
| events.clear(); | |
| } | |
| if events.len() >= BATCH_SIZE { | |
| if let Err(e) = self.send_events(&events).await { | |
| tracing::error!(?e, "failed to send events to Vector"); | |
| } else { | |
| events.clear(); | |
| } | |
| } |
| use global_error::GlobalResult; | ||
| use lazy_static::lazy_static; | ||
| use std::{net::IpAddr, time::SystemTime}; | ||
| use tracing::warn; |
There was a problem hiding this comment.
style: tracing::warn is imported but never used in the code
| use tracing::warn; | |
| use global_error::GlobalResult; | |
| use lazy_static::lazy_static; | |
| use std::{net::IpAddr, time::SystemTime}; | |
| use uuid::Uuid; |
| let request_send_start = Instant::now(); | ||
| match timeout(timeout_duration, self.client.request(proxied_req)).await { |
There was a problem hiding this comment.
logic: request_send_start is captured but response_receive_time is calculated and never used. Consider either removing the unused timing or using it for metrics/logging.
| ts DateTime64 (9), | ||
| stream_type UInt8, -- pegboard::types::LogsStreamType | ||
| message String | ||
| ) ENGINE = ReplicatedMergeTree () |
There was a problem hiding this comment.
logic: ReplicatedMergeTree() missing required parameters for zookeeper path and replica name
| ALTER TABLE state ADD project_id BLOB DEFAULT X'00000000000000000000000000000000'; -- UUID | ||
| ALTER TABLE state ADD root_user_enabled INT DEFAULT false; | ||
| ALTER TABLE state ADD build_kind INT DEFAULT -1; | ||
| ALTER TABLE state ADD build_compression INT DEFAULT -1; |
There was a problem hiding this comment.
logic: The default values for build_kind and build_compression are set to -1, which may be invalid enum values. Consider using 0 or NULL if these are meant to represent unset states.
| let state_row = sql_fetch_one!( | ||
| [ctx, StateRow, &pool] | ||
| " | ||
| SELECT | ||
| project_id, | ||
| env_id, | ||
| json(tags) AS tags, | ||
| resources_cpu_millicores, | ||
| resources_memory_mib, | ||
| selected_resources_cpu_millicores, | ||
| selected_resources_memory_mib, | ||
| client_id, | ||
| client_workflow_id, | ||
| client_wan_hostname, | ||
| lifecycle_kill_timeout_ms, | ||
| lifecycle_durable, | ||
| create_ts, | ||
| start_ts, | ||
| connectable_ts, | ||
| finish_ts, | ||
| destroy_ts, | ||
| image_id, | ||
| build_kind, | ||
| build_compression, | ||
| root_user_enabled, | ||
| json(args) AS args, | ||
| network_mode, | ||
| json(environment) AS environment | ||
| FROM state | ||
| ", | ||
| ) | ||
| .await?; |
There was a problem hiding this comment.
logic: SQL query lacks a WHERE clause to filter by actor_id, which could return incorrect data if multiple actors exist.
| let state_row = sql_fetch_one!( | |
| [ctx, StateRow, &pool] | |
| " | |
| SELECT | |
| project_id, | |
| env_id, | |
| json(tags) AS tags, | |
| resources_cpu_millicores, | |
| resources_memory_mib, | |
| selected_resources_cpu_millicores, | |
| selected_resources_memory_mib, | |
| client_id, | |
| client_workflow_id, | |
| client_wan_hostname, | |
| lifecycle_kill_timeout_ms, | |
| lifecycle_durable, | |
| create_ts, | |
| start_ts, | |
| connectable_ts, | |
| finish_ts, | |
| destroy_ts, | |
| image_id, | |
| build_kind, | |
| build_compression, | |
| root_user_enabled, | |
| json(args) AS args, | |
| network_mode, | |
| json(environment) AS environment | |
| FROM state | |
| ", | |
| ) | |
| .await?; | |
| let state_row = sql_fetch_one!( | |
| [ctx, StateRow, &pool] | |
| " | |
| SELECT | |
| project_id, | |
| env_id, | |
| json(tags) AS tags, | |
| resources_cpu_millicores, | |
| resources_memory_mib, | |
| selected_resources_cpu_millicores, | |
| selected_resources_memory_mib, | |
| client_id, | |
| client_workflow_id, | |
| client_wan_hostname, | |
| lifecycle_kill_timeout_ms, | |
| lifecycle_durable, | |
| create_ts, | |
| start_ts, | |
| connectable_ts, | |
| finish_ts, | |
| destroy_ts, | |
| image_id, | |
| build_kind, | |
| build_compression, | |
| root_user_enabled, | |
| json(args) AS args, | |
| network_mode, | |
| json(environment) AS environment | |
| FROM state | |
| WHERE actor_id = ? | |
| ", | |
| input.actor_id | |
| ) | |
| .await?; |
| sql_execute!( | ||
| [ctx, pool] | ||
| " | ||
| UPDATE state | ||
| SET | ||
| project_id = ?, | ||
| build_kind = ?, | ||
| build_compression = ?, | ||
| root_user_enabled = ? | ||
| ", | ||
| input.meta.project_id, | ||
| input.meta.build_kind as i64, | ||
| input.meta.build_compression as i64, | ||
| input.root_user_enabled, | ||
| ) | ||
| .await?; |
There was a problem hiding this comment.
logic: SQL update lacks a WHERE clause - could update all state records unintentionally
| let result = apis::routes_api::routes_update( | ||
| &ctx.openapi_config_cloud, | ||
| &self.name, | ||
| update_route_body.clone(), |
There was a problem hiding this comment.
style: Unnecessary clone() of update_route_body since it's not used after the API call
| update_route_body.clone(), | |
| update_route_body, |
| let matching_route = routes_response | ||
| .routes | ||
| .iter() | ||
| .find(|route| route.id == *route_id) |
There was a problem hiding this comment.
logic: Route lookup uses ID equality but name field may not be an ID. Consider documenting the expected format or validating input.
b43c653 to
a32e202
Compare
9bc9e25 to
21b6b12
Compare
21b6b12 to
ec7ede8
Compare
a32e202 to
526f37e
Compare
Merge activity
|
… edge (#2526) <!-- Please make sure there is an issue that this PR is correlated to. --> ## Changes <!-- If there are frontend changes, please include screenshots. -->

Changes