diff --git a/Cargo.toml b/Cargo.toml index 9c429ac..be221a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,6 @@ categories = ["database", "parser-implementations"] readme = "README.md" [dependencies] -regex = "1.0" serde = { version = "1.0", features = ["derive"] } bincode = "1.0" rsp-rs = "0.3.5" diff --git a/src/parsing/janusql_parser.rs b/src/parsing/janusql_parser.rs index dd765b7..b44cc57 100644 --- a/src/parsing/janusql_parser.rs +++ b/src/parsing/janusql_parser.rs @@ -1,5 +1,4 @@ -use regex::Regex; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; #[derive(Debug, Clone, PartialEq)] /// Different types of windows supported in JanusQL. @@ -51,9 +50,60 @@ pub struct R2SOperator { pub name: String, } +#[derive(Debug, Clone, PartialEq)] +/// Prefix declaration captured from JanusQL. +pub struct PrefixDeclaration { + pub prefix: String, + pub namespace: String, +} + +#[derive(Debug, Clone, PartialEq)] +/// REGISTER clause captured from JanusQL. +pub struct RegisterClause { + pub operator: String, + pub name: String, +} + +#[derive(Debug, Clone, PartialEq)] +/// Structured window specification used by the AST. +pub enum WindowSpec { + LiveSliding { range: u64, step: u64 }, + HistoricalSliding { offset: u64, range: u64, step: u64 }, + HistoricalFixed { start: u64, end: u64 }, +} + +#[derive(Debug, Clone, PartialEq)] +/// Structured `FROM NAMED WINDOW` clause in the AST. +pub struct WindowClause { + pub window_name: String, + pub source_kind: SourceKind, + pub source_name: String, + pub spec: WindowSpec, +} + +#[derive(Debug, Clone, PartialEq)] +/// One parsed `WINDOW foo { ... }` block from the WHERE clause. +pub struct WhereWindowClause { + pub identifier: String, + pub body: String, +} + +#[derive(Debug, Clone, PartialEq)] +/// Abstract syntax tree for a JanusQL query. +pub struct JanusQueryAst { + pub prefixes: Vec, + pub register: Option, + pub select_clause: String, + pub windows: Vec, + pub where_clause: String, + pub where_windows: Vec, +} + /// Parsed JanusQL query structure containing all components extracted from the query. #[derive(Debug, Clone)] pub struct ParsedJanusQuery { + /// Structured AST representation of the parsed JanusQL query. + pub ast: JanusQueryAst, /// R2S operator if present pub r2s: Option, /// Live windows defined in the query @@ -72,111 +122,29 @@ pub struct ParsedJanusQuery { pub select_clause: String, } -/// Parser for JanusQL queries -pub struct JanusQLParser { - historical_sliding_window: Regex, - historical_fixed_window: Regex, - live_sliding_window: Regex, - register: Regex, - prefix: Regex, -} +/// Parser for JanusQL queries. +pub struct JanusQLParser; -/// Implement methods for JanusQLParser struct. impl JanusQLParser { /// Creates a new JanusQLParser instance. pub fn new() -> Result> { - Ok(JanusQLParser { - historical_sliding_window: Regex::new( - r"FROM\s+NAMED\s+WINDOW\s+([^\s]+)\s+ON\s+(STREAM|LOG)\s+([^\s]+)\s+\[OFFSET\s+(\d+)\s+RANGE\s+(\d+)\s+STEP\s+(\d+)\]", - )?, - historical_fixed_window: Regex::new( - r"FROM\s+NAMED\s+WINDOW\s+([^\s]+)\s+ON\s+(STREAM|LOG)\s+([^\s]+)\s+\[START\s+(\d+)\s+END\s+(\d+)\]", - )?, - live_sliding_window: Regex::new( - r"FROM\s+NAMED\s+WINDOW\s+([^\s]+)\s+ON\s+STREAM\s+([^\s]+)\s+\[RANGE\s+(\d+)\s+STEP\s+(\d+)\]", - )?, - register: Regex::new(r"REGISTER\s+(\w+)\s+([^\s]+)\s+AS")?, - prefix: Regex::new(r"PREFIX\s+([^\s]+):\s*<([^>]+)>")?, - }) - } - - fn parse_window( - &self, - line: &str, - prefix_mapper: &HashMap, - ) -> Result, Box> { - if let Some(captures) = self.historical_sliding_window.captures(line) { - return Ok(Some(WindowDefinition { - window_name: self.unwrap_iri(&captures[1], prefix_mapper), - source_kind: self.parse_source_kind(&captures[2]), - stream_name: self.unwrap_iri(&captures[3], prefix_mapper), - offset: Some(captures[4].parse()?), - width: captures[5].parse()?, - slide: captures[6].parse()?, - start: None, - end: None, - window_type: WindowType::HistoricalSliding, - })); - } - - if let Some(captures) = self.historical_fixed_window.captures(line) { - return Ok(Some(WindowDefinition { - window_name: self.unwrap_iri(&captures[1], prefix_mapper), - source_kind: self.parse_source_kind(&captures[2]), - stream_name: self.unwrap_iri(&captures[3], prefix_mapper), - start: Some(captures[4].parse()?), - end: Some(captures[5].parse()?), - width: 0, - slide: 0, - offset: None, - window_type: WindowType::HistoricalFixed, - })); - } - - if let Some(captures) = self.live_sliding_window.captures(line) { - return Ok(Some(WindowDefinition { - window_name: self.unwrap_iri(&captures[1], prefix_mapper), - source_kind: SourceKind::Stream, - stream_name: self.unwrap_iri(&captures[2], prefix_mapper), - width: captures[3].parse()?, - slide: captures[4].parse()?, - offset: None, - start: None, - end: None, - window_type: WindowType::Live, - })); - } - - Ok(None) + Ok(Self) } - fn parse_source_kind(&self, raw: &str) -> SourceKind { - if raw.eq_ignore_ascii_case("LOG") { - SourceKind::Log - } else { - SourceKind::Stream - } - } - - /// Parses a JanusQL query string. - pub fn parse(&self, query: &str) -> Result> { - let mut parsed = ParsedJanusQuery { - r2s: None, - live_windows: Vec::new(), - historical_windows: Vec::new(), - rspql_query: String::new(), - sparql_queries: Vec::new(), - prefixes: HashMap::new(), - where_clause: String::new(), - select_clause: String::new(), - }; - - let lines: Vec<&str> = query.lines().collect(); - let mut prefix_lines: Vec = Vec::new(); + /// Parse JanusQL into an explicit AST without regex-based clause matching. + pub fn parse_ast(&self, query: &str) -> Result> { + let mut prefixes = Vec::new(); + let mut prefix_mapper = HashMap::new(); + let mut register = None; + let mut select_clause = String::new(); + let mut windows = Vec::new(); let mut in_where_clause = false; let mut where_lines: Vec<&str> = Vec::new(); + let lines = query.lines().collect::>(); + let mut index = 0; - for line in lines { + while index < lines.len() { + let line = lines[index]; let trimmed_line = line.trim(); if trimmed_line.is_empty() @@ -187,43 +155,92 @@ impl JanusQLParser { if in_where_clause && !trimmed_line.is_empty() { where_lines.push(trimmed_line); } + index += 1; continue; } if trimmed_line.starts_with("REGISTER") { - if let Some(captures) = self.register.captures(trimmed_line) { - let operator = captures.get(1).unwrap().as_str().to_string(); - let name_raw = captures.get(2).unwrap().as_str(); - let name = self.unwrap_iri(name_raw, &parsed.prefixes); - parsed.r2s = Some(R2SOperator { operator, name }); - } + register = Some(self.parse_register_clause(trimmed_line, &prefix_mapper)?); } else if trimmed_line.starts_with("PREFIX") { - if let Some(captures) = self.prefix.captures(trimmed_line) { - let prefix = captures.get(1).unwrap().as_str().to_string(); - let namespace = captures.get(2).unwrap().as_str().to_string(); - parsed.prefixes.insert(prefix, namespace); - prefix_lines.push(trimmed_line.to_string()); - } + let prefix = self.parse_prefix_declaration(trimmed_line)?; + prefix_mapper.insert(prefix.prefix.clone(), prefix.namespace.clone()); + prefixes.push(prefix); } else if trimmed_line.starts_with("SELECT") { - parsed.select_clause = trimmed_line.to_string(); + select_clause = trimmed_line.to_string(); } else if trimmed_line.starts_with("FROM NAMED WINDOW") { - if let Some(window) = self.parse_window(trimmed_line, &parsed.prefixes)? { - match window.window_type { - WindowType::Live => parsed.live_windows.push(window), - WindowType::HistoricalSliding | WindowType::HistoricalFixed => { - parsed.historical_windows.push(window); - } - } + let mut clause = trimmed_line.to_string(); + while !clause.contains(']') && index + 1 < lines.len() { + index += 1; + clause.push(' '); + clause.push_str(lines[index].trim()); } + windows.push(self.parse_window_clause(&clause, &prefix_mapper)?); } else if trimmed_line.starts_with("WHERE") { in_where_clause = true; where_lines.push(line); } else if in_where_clause { where_lines.push(line); } + + index += 1; } - parsed.where_clause = where_lines.join("\n"); + let where_clause = where_lines.join("\n"); + let where_windows = self.extract_where_windows(&where_clause); + + Ok(JanusQueryAst { + prefixes, + register, + select_clause, + windows, + where_clause, + where_windows, + }) + } + + /// Parses a JanusQL query string. + pub fn parse(&self, query: &str) -> Result> { + let ast = self.parse_ast(query)?; + let prefixes = ast + .prefixes + .iter() + .map(|prefix| (prefix.prefix.clone(), prefix.namespace.clone())) + .collect::>(); + let prefix_lines = ast + .prefixes + .iter() + .map(|prefix| format!("PREFIX {}: <{}>", prefix.prefix, prefix.namespace)) + .collect::>(); + + let mut live_windows = Vec::new(); + let mut historical_windows = Vec::new(); + + for window in &ast.windows { + let definition = self.lower_window_clause(window); + match definition.window_type { + WindowType::Live => live_windows.push(definition), + WindowType::HistoricalSliding | WindowType::HistoricalFixed => { + historical_windows.push(definition); + } + } + } + + let r2s = ast + .register + .clone() + .map(|register| R2SOperator { operator: register.operator, name: register.name }); + + let mut parsed = ParsedJanusQuery { + ast: ast.clone(), + r2s, + live_windows, + historical_windows, + rspql_query: String::new(), + sparql_queries: Vec::new(), + prefixes, + where_clause: ast.where_clause.clone(), + select_clause: ast.select_clause.clone(), + }; if !parsed.live_windows.is_empty() { parsed.rspql_query = self.generate_rspql_query(&parsed, &prefix_lines); @@ -233,17 +250,159 @@ impl JanusQLParser { Ok(parsed) } + fn parse_register_clause( + &self, + line: &str, + prefix_mapper: &HashMap, + ) -> Result> { + let rest = line + .strip_prefix("REGISTER") + .ok_or_else(|| self.parse_error("REGISTER clause must start with REGISTER"))? + .trim(); + let parts = rest.split_whitespace().collect::>(); + + if parts.len() != 3 || parts[2] != "AS" { + return Err(self.parse_error(format!("Invalid REGISTER clause: {line}"))); + } + + Ok(RegisterClause { + operator: parts[0].to_string(), + name: self.unwrap_iri(parts[1], prefix_mapper), + }) + } + + fn parse_prefix_declaration( + &self, + line: &str, + ) -> Result> { + let rest = line + .strip_prefix("PREFIX") + .ok_or_else(|| self.parse_error("PREFIX clause must start with PREFIX"))? + .trim(); + let (prefix, namespace) = rest + .split_once(':') + .ok_or_else(|| self.parse_error(format!("Invalid PREFIX clause: {line}")))?; + let namespace = namespace.trim(); + + if !namespace.starts_with('<') || !namespace.ends_with('>') { + return Err(self.parse_error(format!( + "PREFIX namespace must be enclosed in angle brackets: {line}" + ))); + } + + Ok(PrefixDeclaration { + prefix: prefix.trim().to_string(), + namespace: namespace[1..namespace.len() - 1].to_string(), + }) + } + + fn parse_window_clause( + &self, + line: &str, + prefix_mapper: &HashMap, + ) -> Result> { + let (header, spec) = line + .split_once('[') + .ok_or_else(|| self.parse_error(format!("Missing window spec in clause: {line}")))?; + let spec = spec + .trim() + .strip_suffix(']') + .ok_or_else(|| self.parse_error(format!("Window spec must end with ']': {line}")))?; + let header_parts = header.split_whitespace().collect::>(); + + if header_parts.len() != 7 + || header_parts[0] != "FROM" + || header_parts[1] != "NAMED" + || header_parts[2] != "WINDOW" + || header_parts[4] != "ON" + { + return Err(self.parse_error(format!("Invalid window clause header: {line}"))); + } + + let source_kind = self.parse_source_kind(header_parts[5])?; + let window_name = self.unwrap_iri(header_parts[3], prefix_mapper); + let source_name = self.unwrap_iri(header_parts[6], prefix_mapper); + let spec_parts = spec.split_whitespace().collect::>(); + let spec = match spec_parts.as_slice() { + ["RANGE", range, "STEP", step] => { + if source_kind != SourceKind::Stream { + return Err(self.parse_error( + "Live RANGE/STEP windows are only supported on STREAM sources", + )); + } + WindowSpec::LiveSliding { range: range.parse()?, step: step.parse()? } + } + ["OFFSET", offset, "RANGE", range, "STEP", step] => WindowSpec::HistoricalSliding { + offset: offset.parse()?, + range: range.parse()?, + step: step.parse()?, + }, + ["START", start, "END", end] => { + WindowSpec::HistoricalFixed { start: start.parse()?, end: end.parse()? } + } + _ => { + return Err(self.parse_error(format!("Unsupported window specification: [{spec}]"))); + } + }; + + Ok(WindowClause { window_name, source_kind, source_name, spec }) + } + + fn parse_source_kind(&self, raw: &str) -> Result> { + match raw { + "STREAM" => Ok(SourceKind::Stream), + "LOG" => Ok(SourceKind::Log), + _ => Err(self.parse_error(format!("Unsupported source kind: {raw}"))), + } + } + + fn lower_window_clause(&self, window: &WindowClause) -> WindowDefinition { + match window.spec { + WindowSpec::LiveSliding { range, step } => WindowDefinition { + window_name: window.window_name.clone(), + source_kind: window.source_kind.clone(), + stream_name: window.source_name.clone(), + width: range, + slide: step, + offset: None, + start: None, + end: None, + window_type: WindowType::Live, + }, + WindowSpec::HistoricalSliding { offset, range, step } => WindowDefinition { + window_name: window.window_name.clone(), + source_kind: window.source_kind.clone(), + stream_name: window.source_name.clone(), + width: range, + slide: step, + offset: Some(offset), + start: None, + end: None, + window_type: WindowType::HistoricalSliding, + }, + WindowSpec::HistoricalFixed { start, end } => WindowDefinition { + window_name: window.window_name.clone(), + source_kind: window.source_kind.clone(), + stream_name: window.source_name.clone(), + width: 0, + slide: 0, + offset: None, + start: Some(start), + end: Some(end), + window_type: WindowType::HistoricalFixed, + }, + } + } + fn generate_rspql_query(&self, parsed: &ParsedJanusQuery, prefix_lines: &[String]) -> String { let mut lines: Vec = Vec::new(); - // Add prefixes for prefix in prefix_lines { lines.push(prefix.clone()); } lines.push(String::new()); - // Adding the R2S Operator if let Some(ref r2s) = parsed.r2s { let wrapped_name = self.wrap_iri(&r2s.name, &parsed.prefixes); lines.push(format!("REGISTER {} {} AS", r2s.operator, wrapped_name)); @@ -255,7 +414,6 @@ impl JanusQLParser { lines.push(String::new()); - // Adding live windows for window in &parsed.live_windows { let wrapped_window_name = self.wrap_iri(&window.window_name, &parsed.prefixes); let wrapped_stream_name = self.wrap_iri(&window.stream_name, &parsed.prefixes); @@ -266,15 +424,16 @@ impl JanusQLParser { )); } - // Adding WHERE clause with only live window patterns if !parsed.where_clause.is_empty() { let adapted_where = self.adapt_where_clause_for_live( + &parsed.ast.where_windows, &parsed.where_clause, &parsed.live_windows, &parsed.prefixes, ); lines.push(adapted_where); } + lines.join("\n") } @@ -288,21 +447,19 @@ impl JanusQLParser { for window in &parsed.historical_windows { let mut lines: Vec = Vec::new(); - // Adding the prefixes. for prefix in prefix_lines { lines.push(prefix.clone()); } lines.push(String::new()); - // Generate WHERE clause and extract bound variables let (where_clause, bound_vars) = self.generate_where_and_extract_vars( + &parsed.ast.where_windows, &parsed.where_clause, window, &parsed.prefixes, ); - // Clean SELECT clause based on bound variables if !parsed.select_clause.is_empty() { let clean_select = self.filter_select_clause(&parsed.select_clause, &bound_vars); lines.push(clean_select); @@ -312,53 +469,24 @@ impl JanusQLParser { lines.push(where_clause); queries.push(lines.join("\n")); } + queries } fn generate_where_and_extract_vars( &self, + where_windows: &[WhereWindowClause], where_clause: &str, window: &WindowDefinition, prefixes: &HashMap, - ) -> (String, std::collections::HashSet) { - let window_uri = &window.window_name; - let mut prefixed_name = None; - for (prefix, uri_base) in prefixes { - if window_uri.starts_with(uri_base) { - let local_name = &window_uri[uri_base.len()..]; - prefixed_name = Some(format!("{}:{}", prefix, local_name)); - break; - } - } - - let window_identifier = prefixed_name.as_ref().unwrap_or(window_uri); - let window_pattern = format!("WINDOW {} {{", window_identifier); - let mut bound_vars = std::collections::HashSet::new(); - - let where_string = if let Some(start_pos) = where_clause.find(&window_pattern) { - let after_opening = start_pos + window_pattern.len(); - let mut brace_count = 1; - let mut end_pos = after_opening; - let chars: Vec = where_clause[after_opening..].chars().collect(); - - for (i, ch) in chars.iter().enumerate() { - if *ch == '{' { - brace_count += 1; - } else if *ch == '}' { - brace_count -= 1; - if brace_count == 0 { - end_pos = after_opening + i; - break; - } - } - } - - let inner_pattern = where_clause[after_opening..end_pos].trim(); - - // Extract variables from inner pattern - let var_regex = Regex::new(r"\?[\w]+").unwrap(); - for cap in var_regex.captures_iter(inner_pattern) { - bound_vars.insert(cap[0].to_string()); + ) -> (String, HashSet) { + let mut bound_vars = HashSet::new(); + + let where_string = if let Some(inner_pattern) = + self.find_window_body(where_windows, window, prefixes) + { + for variable in self.extract_variables(inner_pattern) { + bound_vars.insert(variable); } match window.source_kind { @@ -380,11 +508,7 @@ impl JanusQLParser { (where_string, bound_vars) } - fn filter_select_clause( - &self, - select_clause: &str, - allowed_vars: &std::collections::HashSet, - ) -> String { + fn filter_select_clause(&self, select_clause: &str, allowed_vars: &HashSet) -> String { if allowed_vars.is_empty() { return select_clause.to_string(); } @@ -395,56 +519,17 @@ impl JanusQLParser { } let content = trimmed[6..].trim(); - - // Regex to capture projection items: - // 1. Aliased expressions: (expression AS ?var) - handle nested parens by matching until AS ?var) - // 2. Simple variables: ?var - let item_regex = Regex::new(r"(\(.*?\s+AS\s+\?[\w]+\))|(\?[\w]+)").unwrap(); - let var_regex = Regex::new(r"\?[\w]+").unwrap(); - + let projection_items = self.extract_projection_items(content); let mut kept_items = Vec::new(); - for cap in item_regex.captures_iter(content) { - let item = cap[0].to_string(); - - // Check if item uses allowed variables - let mut is_valid = false; - - // If it's an expression, check if input vars are allowed - // Note: We check if ANY of the variables inside are bound. - // For AVG(?a), if ?a is bound, we keep it. - // If it's a simple var ?a, check if bound. - - let mut vars_in_item = Vec::new(); - for var_cap in var_regex.captures_iter(&item) { - vars_in_item.push(var_cap[0].to_string()); - } - - // Special case: AS ?alias - the alias is a new variable, not a bound one. - // But usually expressions are like (AVG(?a) AS ?b). ?a must be bound. - // We only care about input variables. - // A simple heuristic: check if at least one variable in the item (excluding the alias if possible) is bound. - // Since parsing "AS ?alias" is hard with regex, we just check if ANY variable in the item is bound. - // If the item is just "?alias" (output of previous), it might be tricky if this is a subquery. - // But here we are filtering the top-level SELECT. - - for var in vars_in_item { - if allowed_vars.contains(&var) { - is_valid = true; - break; - } - } - - if is_valid { + for item in projection_items { + let vars_in_item = self.extract_variables(&item); + if vars_in_item.iter().any(|var| allowed_vars.contains(var)) { kept_items.push(item); } } if kept_items.is_empty() { - // Fallback: if nothing matches, return original (might fail) or SELECT * - // Given the issue, returning "SELECT *" might be safer if pattern is not empty, - // but "SELECT *" is invalid if we have GROUP BY (implied by AVG). - // Let's return original and hope for best if filtering failed. return select_clause.to_string(); } @@ -453,61 +538,195 @@ impl JanusQLParser { fn adapt_where_clause_for_live( &self, + where_windows: &[WhereWindowClause], where_clause: &str, live_windows: &[WindowDefinition], prefixes: &HashMap, ) -> String { - // Extract patterns for all live windows and combine them let mut where_patterns = Vec::new(); for window in live_windows { - // Find the window name in prefixed form - let window_uri = &window.window_name; - let mut prefixed_name = None; - for (prefix, uri_base) in prefixes { - if window_uri.starts_with(uri_base) { - let local_name = &window_uri[uri_base.len()..]; - prefixed_name = Some(format!("{}:{}", prefix, local_name)); + if let Some(inner_pattern) = self.find_window_body(where_windows, window, prefixes) { + let window_identifier = self.wrap_iri(&window.window_name, prefixes); + where_patterns + .push(format!("WINDOW {} {{\n {}\n }}", window_identifier, inner_pattern)); + } + } + + if where_patterns.is_empty() { + where_clause.to_string() + } else { + format!("WHERE {{\n {}\n}}", where_patterns.join("\n ")) + } + } + + fn find_window_body<'a>( + &self, + where_windows: &'a [WhereWindowClause], + window: &WindowDefinition, + prefixes: &HashMap, + ) -> Option<&'a str> { + let mut candidates = Vec::new(); + let wrapped = self.wrap_iri(&window.window_name, prefixes); + candidates.push(wrapped.clone()); + candidates.push(window.window_name.clone()); + + if let Some(local) = self.local_name(&window.window_name) { + candidates.push(format!(":{}", local)); + } + + where_windows + .iter() + .find(|clause| candidates.iter().any(|candidate| candidate == &clause.identifier)) + .map(|clause| clause.body.as_str()) + } + + fn extract_where_windows(&self, where_clause: &str) -> Vec { + let mut clauses = Vec::new(); + let mut offset = 0; + + while let Some(found) = where_clause[offset..].find("WINDOW") { + let start = offset + found; + let after_keyword = start + "WINDOW".len(); + let mut cursor = after_keyword; + + while let Some(ch) = where_clause[cursor..].chars().next() { + if ch.is_whitespace() { + cursor += ch.len_utf8(); + } else { break; } } - let window_identifier = prefixed_name.as_ref().unwrap_or(window_uri); - let window_pattern = format!("WINDOW {} {{", window_identifier); - - if let Some(start_pos) = where_clause.find(&window_pattern) { - let after_opening = start_pos + window_pattern.len(); - - // Find matching closing brace - let mut brace_count = 1; - let mut end_pos = after_opening; - let chars: Vec = where_clause[after_opening..].chars().collect(); - - for (i, ch) in chars.iter().enumerate() { - if *ch == '{' { - brace_count += 1; - } else if *ch == '}' { - brace_count -= 1; - if brace_count == 0 { - end_pos = after_opening + i; - break; - } + let identifier_start = cursor; + while let Some(ch) = where_clause[cursor..].chars().next() { + if ch.is_whitespace() || ch == '{' { + break; + } + cursor += ch.len_utf8(); + } + + let identifier = where_clause[identifier_start..cursor].trim().to_string(); + while let Some(ch) = where_clause[cursor..].chars().next() { + if ch.is_whitespace() { + cursor += ch.len_utf8(); + } else { + break; + } + } + + if !where_clause[cursor..].starts_with('{') { + offset = cursor; + continue; + } + + let body_start = cursor + 1; + let Some(body_end) = self.find_matching_brace(where_clause, cursor) else { + break; + }; + + clauses.push(WhereWindowClause { + identifier, + body: where_clause[body_start..body_end].trim().to_string(), + }); + offset = body_end + 1; + } + + clauses + } + + fn find_matching_brace(&self, input: &str, open_brace_index: usize) -> Option { + let mut depth = 0usize; + for (relative_index, ch) in input[open_brace_index..].char_indices() { + match ch { + '{' => depth += 1, + '}' => { + depth -= 1; + if depth == 0 { + return Some(open_brace_index + relative_index); } } + _ => {} + } + } + None + } - let inner_pattern = where_clause[after_opening..end_pos].trim(); - where_patterns - .push(format!("WINDOW {} {{\n {}\n }}", window_identifier, inner_pattern)); + fn extract_variables(&self, input: &str) -> Vec { + let mut variables = Vec::new(); + let chars = input.chars().collect::>(); + let mut index = 0; + + while index < chars.len() { + if chars[index] == '?' { + let start = index; + index += 1; + while index < chars.len() + && (chars[index].is_ascii_alphanumeric() || chars[index] == '_') + { + index += 1; + } + if index > start + 1 { + variables.push(chars[start..index].iter().collect()); + continue; + } } + index += 1; } - if where_patterns.is_empty() { - // Fallback to original - where_clause.to_string() - } else { - // Combine all live window patterns - format!("WHERE {{\n {}\n}}", where_patterns.join("\n ")) + variables + } + + fn extract_projection_items(&self, input: &str) -> Vec { + let chars = input.chars().collect::>(); + let mut items = Vec::new(); + let mut index = 0; + + while index < chars.len() { + while index < chars.len() && chars[index].is_whitespace() { + index += 1; + } + + if index >= chars.len() { + break; + } + + if chars[index] == '(' { + let start = index; + let mut depth = 0usize; + while index < chars.len() { + match chars[index] { + '(' => depth += 1, + ')' => { + depth -= 1; + if depth == 0 { + index += 1; + break; + } + } + _ => {} + } + index += 1; + } + items.push(chars[start..index].iter().collect::()); + } else { + let start = index; + while index < chars.len() && !chars[index].is_whitespace() { + index += 1; + } + items.push(chars[start..index].iter().collect::()); + } } + + items + } + + fn local_name<'a>(&self, iri: &'a str) -> Option<&'a str> { + iri.rsplit(['#', '/']).next().filter(|local| !local.is_empty()) + } + + fn parse_error(&self, message: impl Into) -> Box { + Box::new(std::io::Error::new(std::io::ErrorKind::InvalidInput, message.into())) } fn unwrap_iri(&self, prefixed_iri: &str, prefix_mapper: &HashMap) -> String { diff --git a/tests/janusql_parser_test.rs b/tests/janusql_parser_test.rs index 7bff4fc..12bbfab 100644 --- a/tests/janusql_parser_test.rs +++ b/tests/janusql_parser_test.rs @@ -3,7 +3,7 @@ //! Tests for the JanusQL query parser, verifying parsing of window definitions, //! R2S operators, and query generation. -use janus::parsing::janusql_parser::{JanusQLParser, SourceKind}; +use janus::parsing::janusql_parser::{JanusQLParser, SourceKind, WindowSpec}; #[test] fn test_basic_live_window() { @@ -102,3 +102,114 @@ fn test_on_log_historical_windows_are_parsed_as_logs() { "ON LOG queries should target historical named graphs" ); } + +#[test] +fn test_parse_ast_exposes_structured_window_specs() { + let parser = JanusQLParser::new().unwrap(); + let query = r" + PREFIX ex: + REGISTER RStream ex:out AS + SELECT ?sensor + FROM NAMED WINDOW ex:live ON STREAM ex:stream [RANGE 500 STEP 100] + FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1000 END 2000] + WHERE { + WINDOW ex:live { ?sensor ex:value ?value } + WINDOW ex:hist { ?sensor ex:value ?value } + } + "; + + let ast = parser.parse_ast(query).unwrap(); + assert_eq!(ast.windows.len(), 2); + assert_eq!(ast.where_windows.len(), 2); + assert_eq!(ast.prefixes.len(), 1); + + assert!(matches!(ast.windows[0].spec, WindowSpec::LiveSliding { range: 500, step: 100 })); + assert!(matches!( + ast.windows[1].spec, + WindowSpec::HistoricalFixed { start: 1000, end: 2000 } + )); +} + +#[test] +fn test_parse_ast_register_clause_is_structured() { + let parser = JanusQLParser::new().unwrap(); + let query = r" + PREFIX ex: + REGISTER RStream ex:out AS + SELECT ?sensor + FROM NAMED WINDOW ex:live ON STREAM ex:stream [RANGE 500 STEP 100] + WHERE { + WINDOW ex:live { ?sensor ex:value ?value } + } + "; + + let ast = parser.parse_ast(query).unwrap(); + let register = ast.register.expect("expected register clause"); + assert_eq!(register.operator, "RStream"); + assert_eq!(register.name, "http://example.org/out"); +} + +#[test] +fn test_parse_ast_multiline_window_clause_is_supported() { + let parser = JanusQLParser::new().unwrap(); + let query = r" + PREFIX ex: + SELECT ?sensor + FROM NAMED WINDOW ex:hist ON LOG ex:store + [START 1000 END 2000] + WHERE { + WINDOW ex:hist { ?sensor ex:value ?value } + } + "; + + let ast = parser.parse_ast(query).unwrap(); + assert_eq!(ast.windows.len(), 1); + assert!(matches!( + ast.windows[0].spec, + WindowSpec::HistoricalFixed { start: 1000, end: 2000 } + )); +} + +#[test] +fn test_parse_ast_on_log_historical_sliding_window() { + let parser = JanusQLParser::new().unwrap(); + let query = r" + PREFIX ex: + SELECT ?sensor + FROM NAMED WINDOW ex:hist ON LOG ex:store [OFFSET 3000 RANGE 1000 STEP 250] + WHERE { + WINDOW ex:hist { ?sensor ex:value ?value } + } + "; + + let ast = parser.parse_ast(query).unwrap(); + assert_eq!(ast.windows.len(), 1); + assert_eq!(ast.windows[0].source_kind, SourceKind::Log); + assert!(matches!( + ast.windows[0].spec, + WindowSpec::HistoricalSliding { offset: 3000, range: 1000, step: 250 } + )); +} + +#[test] +fn test_parse_ast_extracts_window_body_with_nested_braces() { + let parser = JanusQLParser::new().unwrap(); + let query = r#" + PREFIX ex: + SELECT ?sensor + FROM NAMED WINDOW ex:live ON STREAM ex:stream [RANGE 500 STEP 100] + WHERE { + WINDOW ex:live { + ?sensor ex:value ?value . + FILTER(EXISTS { + ?sensor ex:meta ?meta . + }) + } + } + "#; + + let ast = parser.parse_ast(query).unwrap(); + assert_eq!(ast.where_windows.len(), 1); + assert!(ast.where_windows[0].body.contains("FILTER(EXISTS")); + assert!(ast.where_windows[0].body.contains("?sensor ex:meta ?meta")); +}