Skip to content

Commit 6c56156

Browse files
committed
graph: Add LogStore backend implementations
Implements three log storage backends for querying logs:

- FileLogStore: streams JSON Lines files with bounded memory usage
- ElasticsearchLogStore: queries Elasticsearch indices with full-text search
- LokiLogStore: queries Grafana Loki using LogQL

All backends implement the LogStore trait and support:

- Filtering by log level, timestamp range, and text search
- Pagination via first/skip parameters
- Returning structured LogEntry objects

Dependencies added: reqwest and serde_json for the HTTP clients.
1 parent a2e6f5f commit 6c56156

5 files changed

Lines changed: 865 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

graph/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ clap.workspace = true
108108
maplit = "1.0.2"
109109
hex-literal = "1.1"
110110
wiremock = "0.6.5"
111+
tempfile = "3.8"
111112

112113
[build-dependencies]
113114
tonic-build = { workspace = true }
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
use async_trait::async_trait;
2+
use reqwest::Client;
3+
use serde::Deserialize;
4+
use serde_json::json;
5+
use std::collections::HashMap;
6+
use std::time::Duration;
7+
8+
use crate::log::elastic::ElasticLoggingConfig;
9+
use crate::prelude::DeploymentHash;
10+
11+
use super::{LogEntry, LogMeta, LogQuery, LogStore, LogStoreError};
12+
13+
/// `LogStore` backend that serves log queries from an Elasticsearch index.
pub struct ElasticsearchLogStore {
    // Base URL of the Elasticsearch cluster; search requests go to
    // `{endpoint}/{index}/_search`.
    endpoint: String,
    // Optional HTTP basic-auth username; auth is attached only when both
    // username and password are present.
    username: Option<String>,
    // Optional HTTP basic-auth password.
    password: Option<String>,
    // HTTP client (taken from the shared Elasticsearch logging config).
    client: Client,
    // Name of the index to search.
    index: String,
    // Per-request timeout applied to every search call.
    timeout: Duration,
}
21+
22+
impl ElasticsearchLogStore {
23+
pub fn new(config: ElasticLoggingConfig, index: String, timeout: Duration) -> Self {
24+
Self {
25+
endpoint: config.endpoint,
26+
username: config.username,
27+
password: config.password,
28+
client: config.client,
29+
index,
30+
timeout,
31+
}
32+
}
33+
34+
fn build_query(&self, query: &LogQuery) -> serde_json::Value {
35+
let mut must_clauses = Vec::new();
36+
37+
// Filter by subgraph ID
38+
must_clauses.push(json!({
39+
"term": {
40+
"subgraphId": query.subgraph_id.to_string()
41+
}
42+
}));
43+
44+
// Filter by log level
45+
if let Some(level) = &query.level {
46+
must_clauses.push(json!({
47+
"term": {
48+
"level": level.as_str()
49+
}
50+
}));
51+
}
52+
53+
// Filter by time range
54+
if query.from.is_some() || query.to.is_some() {
55+
let mut range = serde_json::Map::new();
56+
if let Some(from) = &query.from {
57+
range.insert("gte".to_string(), json!(from));
58+
}
59+
if let Some(to) = &query.to {
60+
range.insert("lte".to_string(), json!(to));
61+
}
62+
must_clauses.push(json!({
63+
"range": {
64+
"timestamp": range
65+
}
66+
}));
67+
}
68+
69+
// Filter by text search
70+
if let Some(search) = &query.search {
71+
must_clauses.push(json!({
72+
"match": {
73+
"text": search
74+
}
75+
}));
76+
}
77+
78+
json!({
79+
"query": {
80+
"bool": {
81+
"must": must_clauses
82+
}
83+
},
84+
"from": query.skip,
85+
"size": query.first,
86+
"sort": [
87+
{ "timestamp": { "order": "desc" } }
88+
]
89+
})
90+
}
91+
92+
async fn execute_search(
93+
&self,
94+
query_body: serde_json::Value,
95+
) -> Result<Vec<LogEntry>, LogStoreError> {
96+
let url = format!("{}/{}/_search", self.endpoint, self.index);
97+
98+
let mut request = self
99+
.client
100+
.post(&url)
101+
.json(&query_body)
102+
.timeout(self.timeout);
103+
104+
// Add basic auth if credentials provided
105+
if let (Some(username), Some(password)) = (&self.username, &self.password) {
106+
request = request.basic_auth(username, Some(password));
107+
}
108+
109+
let response = request.send().await.map_err(|e| {
110+
LogStoreError::QueryFailed(
111+
anyhow::Error::from(e).context("Elasticsearch request failed"),
112+
)
113+
})?;
114+
115+
if !response.status().is_success() {
116+
let status = response.status();
117+
// Include response body in error context for debugging
118+
// The body is part of the error chain but not the main error message to avoid
119+
// leaking sensitive Elasticsearch internals in logs
120+
let body_text = response
121+
.text()
122+
.await
123+
.unwrap_or_else(|_| "<failed to read response body>".to_string());
124+
return Err(LogStoreError::QueryFailed(
125+
anyhow::anyhow!("Elasticsearch query failed with status {}", status)
126+
.context(format!("Response body: {}", body_text)),
127+
));
128+
}
129+
130+
let response_body: ElasticsearchResponse = response.json().await.map_err(|e| {
131+
LogStoreError::QueryFailed(
132+
anyhow::Error::from(e).context(
133+
"failed to parse Elasticsearch search response: response format may have changed or be invalid",
134+
),
135+
)
136+
})?;
137+
138+
let entries = response_body
139+
.hits
140+
.hits
141+
.into_iter()
142+
.filter_map(|hit| self.parse_log_entry(hit.source))
143+
.collect();
144+
145+
Ok(entries)
146+
}
147+
148+
fn parse_log_entry(&self, source: ElasticsearchLogDocument) -> Option<LogEntry> {
149+
let level = source.level.parse().ok()?;
150+
let subgraph_id = DeploymentHash::new(&source.subgraph_id).ok()?;
151+
152+
// Convert arguments HashMap to Vec<(String, String)>
153+
let arguments: Vec<(String, String)> = source.arguments.into_iter().collect();
154+
155+
Some(LogEntry {
156+
id: source.id,
157+
subgraph_id,
158+
timestamp: source.timestamp,
159+
level,
160+
text: source.text,
161+
arguments,
162+
meta: LogMeta {
163+
module: source.meta.module,
164+
line: source.meta.line,
165+
column: source.meta.column,
166+
},
167+
})
168+
}
169+
}
170+
171+
#[async_trait]
172+
impl LogStore for ElasticsearchLogStore {
173+
async fn query_logs(&self, query: LogQuery) -> Result<Vec<LogEntry>, LogStoreError> {
174+
let query_body = self.build_query(&query);
175+
self.execute_search(query_body).await
176+
}
177+
178+
fn is_available(&self) -> bool {
179+
true
180+
}
181+
}
182+
183+
// Elasticsearch response types
184+
/// Top-level shape of an Elasticsearch `_search` response; only the
/// `hits` envelope is deserialized.
#[derive(Debug, Deserialize)]
struct ElasticsearchResponse {
    hits: ElasticsearchHits,
}
188+
189+
/// The `hits` envelope; only the inner list of matched documents is kept
/// (total counts and scores are ignored).
#[derive(Debug, Deserialize)]
struct ElasticsearchHits {
    hits: Vec<ElasticsearchHit>,
}
193+
194+
/// One matched document; the log payload lives under the `_source` key.
#[derive(Debug, Deserialize)]
struct ElasticsearchHit {
    #[serde(rename = "_source")]
    source: ElasticsearchLogDocument,
}
199+
200+
/// A stored log document as indexed in Elasticsearch; converted to a
/// `LogEntry` by `parse_log_entry`.
#[derive(Debug, Deserialize)]
struct ElasticsearchLogDocument {
    id: String,
    // Indexed in camelCase; parsed into a `DeploymentHash` on conversion.
    #[serde(rename = "subgraphId")]
    subgraph_id: String,
    // Kept as the raw indexed string; no timestamp parsing happens here.
    timestamp: String,
    // Log level as a string; parsed (and possibly rejected) on conversion.
    level: String,
    text: String,
    // Structured log arguments; flattened to key/value pairs on conversion.
    arguments: HashMap<String, String>,
    meta: ElasticsearchLogMeta,
}
211+
212+
/// Source-location metadata attached to a log document (module path plus
/// line/column of the logging call site).
#[derive(Debug, Deserialize)]
struct ElasticsearchLogMeta {
    module: String,
    line: i64,
    column: i64,
}

0 commit comments

Comments
 (0)