Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/error-handling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
hive-router: patch
---

Cleanup Error Handling implementation
2 changes: 1 addition & 1 deletion bin/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async fn graphql_endpoint_dispatch(
// If the request handler returns an error, convert it to an HTTP response.
Err(err) => {
write_graphql_response_metric_status(request, GraphQLResponseStatus::Error);
handle_pipeline_error(err, &app_state, &response_mode)
handle_pipeline_error(&err, &app_state, &response_mode)
}
};

Expand Down
43 changes: 14 additions & 29 deletions bin/router/src/pipeline/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{sync::Arc, vec};

use graphql_tools::validation::utils::ValidationError;
use graphql_tools::{parser::query::MinifyError, validation::utils::ValidationError};
use hive_router_plan_executor::{
execution::{error::PlanExecutionError, jwt_forward::JwtForwardingError},
headers::errors::HeaderRuleRuntimeError,
Expand Down Expand Up @@ -69,13 +69,13 @@ pub enum PipelineError {
FailedToParseExtensions(sonic_rs::Error),
#[error("Failed to parse GraphQL operation: {0}")]
#[strum(serialize = "GRAPHQL_PARSE_FAILED")]
FailedToParseOperation(#[from] Arc<graphql_tools::parser::query::ParseError>),
FailedToParseOperation(#[from] graphql_tools::parser::query::ParseError),
#[error("Failed to minify parsed GraphQL operation: {0}")]
#[strum(serialize = "GRAPHQL_PARSE_MINIFY_FAILED")]
FailedToMinifyParsedOperation(String),
FailedToMinifyParsedOperation(#[from] MinifyError),
#[error("Failed to normalize GraphQL operation")]
#[strum(serialize = "OPERATION_RESOLUTION_FAILURE")]
NormalizationError(#[from] Arc<NormalizationError>),
NormalizationError(#[from] NormalizationError),
#[error("Failed to collect GraphQL variables: {0}")]
#[strum(serialize = "BAD_USER_INPUT")]
VariablesCoercionError(String),
Expand All @@ -90,7 +90,7 @@ pub enum PipelineError {
PlanExecutionError(#[from] PlanExecutionError),
#[error("Failed to produce a plan: {0}")]
#[strum(serialize = "QUERY_PLAN_BUILD_FAILED")]
PlannerError(#[from] Arc<PlannerError>),
PlannerError(#[from] PlannerError),
#[error(transparent)]
#[strum(serialize = "OVERRIDE_LABEL_EVALUATION_FAILED")]
LabelEvaluationError(#[from] LabelEvaluationError),
Expand Down Expand Up @@ -143,30 +143,9 @@ pub enum PipelineError {
#[error("No supergraph available yet, unable to process request")]
#[strum(serialize = "NO_SUPERGRAPH_AVAILABLE")]
NoSupergraphAvailable,
}

#[derive(Clone, Debug, thiserror::Error)]
pub enum ParserCacheError {
#[error("Failed to parse GraphQL operation: {0}")]
ParseError(Arc<graphql_tools::parser::query::ParseError>),
#[error("Failed to minify parsed GraphQL operation: {0}")]
MinifyError(String),
#[error("Validation errors")]
ValidationErrors(Arc<Vec<ValidationError>>),
}

impl From<Arc<ParserCacheError>> for PipelineError {
fn from(value: Arc<ParserCacheError>) -> Self {
match value.as_ref() {
ParserCacheError::ParseError(err) => PipelineError::FailedToParseOperation(err.clone()),
ParserCacheError::MinifyError(err) => {
PipelineError::FailedToMinifyParsedOperation(err.clone())
}
ParserCacheError::ValidationErrors(errs) => {
PipelineError::ValidationErrors(errs.clone())
}
}
}
#[error(transparent)]
PipelineErrorArc(#[from] Arc<PipelineError>),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please explain why we need PipelineErrorArc? why not put the Arc on higher level?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What part in the code makes us needing this kind or error?
It's a bit weird to be that we have PipelineError and then inside we have a variant named PipelineErrorArc that holds Arc<PipelineError>, why not just use Arc<PipelineError> in the relevant places instead?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inside the pipeline, for example the parser part of the pipeline impl, we have or_try_insert_with that is called if cache miss. But the return value is always Arc even if Error. So when it fails, it returns Arc in this case. But the actual error type is PipelineError in the flow. So I added this variant to catch Arc ones too.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if that's really needed in this case. When you have Arc<Error>, you can still do ? on it, so what the reason to have special handling for the Arc<Error>? 🤔

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arc is different than Error unfortunately :/ So it is considered as a different kind. That's why i put a variant for it with "from"

}

impl PipelineError {
Expand All @@ -175,13 +154,15 @@ impl PipelineError {
Self::JwtError(err) => err.error_code(),
Self::PlanExecutionError(err) => err.error_code(),
Self::ReadBodyStreamError(err) => err.error_code(),
Self::PipelineErrorArc(err) => err.graphql_error_code(),
_ => self.into(),
}
}

pub fn graphql_error_message(&self) -> String {
match self {
Self::PlannerError(_) => "Unexpected error".to_string(),
Self::PipelineErrorArc(err) => err.graphql_error_message(),
_ => self.to_string(),
}
}
Expand Down Expand Up @@ -226,6 +207,7 @@ impl PipelineError {
(Self::HeaderPropagation(_), _) => StatusCode::INTERNAL_SERVER_ERROR,
(Self::QueryPlanSerializationFailed(_), _) => StatusCode::INTERNAL_SERVER_ERROR,
(Self::NoSupergraphAvailable, _) => StatusCode::SERVICE_UNAVAILABLE,
(Self::PipelineErrorArc(err), _) => err.default_status_code(prefer_ok),
}
}
}
Expand All @@ -237,10 +219,13 @@ struct FailedExecutionResult {

#[inline]
pub fn handle_pipeline_error(
err: PipelineError,
err: &PipelineError,
shared_state: &RouterSharedState,
response_mode: &ResponseMode,
) -> web::HttpResponse {
if let PipelineError::PipelineErrorArc(inner) = &err {
return handle_pipeline_error(inner, shared_state, response_mode);
}
let single_content_type = response_mode.single_content_type();

let prefer_ok = response_mode.prefer_status_ok_for_errors();
Expand Down
8 changes: 3 additions & 5 deletions bin/router/src/pipeline/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use hive_router_plan_executor::hooks::on_graphql_params::GraphQLParams;
use hive_router_plan_executor::hooks::on_supergraph_load::SupergraphData;
use hive_router_plan_executor::introspection::partition::partition_operation;
use hive_router_plan_executor::projection::plan::FieldProjectionPlan;
use hive_router_query_planner::ast::normalization::error::NormalizationError;
use hive_router_query_planner::ast::normalization::normalize_operation;
use hive_router_query_planner::ast::operation::OperationDefinition;
use xxhash_rust::xxh3::Xxh3;
Expand Down Expand Up @@ -68,10 +67,10 @@ pub async fn normalize_request_with_cache(
None => parser_payload.cache_key,
};

schema_state
Ok(schema_state
.normalize_cache
.entry(cache_key)
.or_try_insert_with::<_, NormalizationError>(async {
.or_try_insert_with(async {
let doc = normalize_operation(
&supergraph.planner.supergraph,
&parser_payload.parsed_operation,
Expand Down Expand Up @@ -106,7 +105,6 @@ pub async fn normalize_request_with_cache(
Ok(Arc::new(payload))
})
.await
.map_err(PipelineError::from)
.into_result_with_hit_miss(|hit_miss| match hit_miss {
CacheHitMiss::Hit => {
normalize_span.record_cache_hit(true);
Expand All @@ -116,7 +114,7 @@ pub async fn normalize_request_with_cache(
normalize_span.record_cache_hit(false);
normalize_cache_capture.finish_miss();
}
})
})?)
}
.instrument(normalize_span.clone())
.await
Expand Down
44 changes: 6 additions & 38 deletions bin/router/src/pipeline/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use hive_router_query_planner::utils::parsing::{
use xxhash_rust::xxh3::Xxh3;

use crate::cache_state::{CacheHitMiss, EntryResultHitMissExt};
use crate::pipeline::error::{ParserCacheError, PipelineError};
use crate::pipeline::error::PipelineError;
use crate::pipeline::execution_request::GetQueryStr;
use crate::shared_state::RouterSharedState;
use tracing::{error, trace, Instrument};
Expand All @@ -33,35 +33,6 @@ pub struct ParseCacheEntry {
hive_operation_hash: Arc<String>,
}

impl ParseCacheEntry {
#[inline]
pub fn try_new(
parsed_arc: Arc<Document<'static, String>>,
query_str: &str,
) -> Result<Self, PipelineError> {
let minified_arc = {
Arc::new(minify_query(query_str).map_err(|err| {
error!("Failed to minify parsed GraphQL operation: {}", err);
PipelineError::FailedToMinifyParsedOperation(err.to_string())
})?)
};
let hive_normalized_operation = hive_sdk_normalize_operation(&parsed_arc);
let hive_minified =
minify_query(hive_normalized_operation.to_string().as_ref()).map_err(|err| {
error!(
"Failed to minify GraphQL operation normalized for Hive SDK: {}",
err
);
PipelineError::FailedToMinifyParsedOperation(err.to_string())
})?;
Ok(ParseCacheEntry {
document: parsed_arc,
document_minified_string: minified_arc,
hive_operation_hash: Arc::new(format!("{:x}", md5::compute(hive_minified))),
})
}
}

#[derive(Debug)]
pub struct GraphQLParserPayload {
pub parsed_operation: Arc<Document<'static, String>>,
Expand Down Expand Up @@ -136,7 +107,7 @@ pub async fn parse_operation_with_cache(
let parse_cache_item = app_state
.parse_cache
.entry(cache_key)
.or_try_insert_with::<_, ParserCacheError>(async {
.or_try_insert_with(async {
let parsed = match app_state.router_config.limits.max_tokens.as_ref() {
Some(cfg) => safe_parse_operation_with_token_limit(query_str, cfg.n),
_ => safe_parse_operation(query_str),
Expand All @@ -146,7 +117,7 @@ pub async fn parse_operation_with_cache(
err.0.errors.first()
{
if *msg == "Token limit exceeded" {
return ParserCacheError::ValidationErrors(
return PipelineError::ValidationErrors(
vec![ValidationError {
locations: vec![err.0.position],
message: "Token limit exceeded.".to_string(),
Expand All @@ -157,22 +128,20 @@ pub async fn parse_operation_with_cache(
}
}
error!("Failed to parse GraphQL operation: {}", err);
ParserCacheError::ParseError(Arc::new(err))
err.into()
})?;
trace!("successfully parsed GraphQL operation");
let parsed_arc = Arc::new(parsed);
let minified_arc = Arc::new(minify_query(query_str).map_err(|err| {
let minified_arc = Arc::new(minify_query(query_str).inspect_err(|err| {
error!("Failed to minify parsed GraphQL operation: {}", err);
ParserCacheError::MinifyError(err.to_string())
})?);
let hive_normalized_operation = hive_sdk_normalize_operation(&parsed_arc);
let hive_minified = minify_query(hive_normalized_operation.to_string().as_ref())
.map_err(|err| {
.inspect_err(|err| {
error!(
"Failed to minify GraphQL operation normalized for Hive SDK: {}",
err
);
ParserCacheError::MinifyError(err.to_string())
})?;

Ok(ParseCacheEntry {
Expand All @@ -182,7 +151,6 @@ pub async fn parse_operation_with_cache(
})
})
.await
.map_err(PipelineError::from)
.into_result_with_hit_miss(|hit_miss| match hit_miss {
CacheHitMiss::Hit => {
parse_span.record_cache_hit(true);
Expand Down
16 changes: 7 additions & 9 deletions bin/router/src/pipeline/query_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,15 @@ pub async fn plan_operation_with_cache(
return Ok(EMPTY_QUERY_PLAN.clone());
}

supergraph
.planner
.plan_from_normalized_operation(
filtered_operation_for_plan,
(&request_override_context.clone()).into(),
cancellation_token,
)
.map(Arc::new)
let query_plan = supergraph.planner.plan_from_normalized_operation(
filtered_operation_for_plan,
(&request_override_context.clone()).into(),
cancellation_token,
)?;

Ok(query_plan.into())
})
.await
.map_err(PipelineError::from)
.into_result_with_hit_miss(|hit_miss| match hit_miss {
CacheHitMiss::Hit => {
cache_hint = CacheHint::Hit;
Expand Down
2 changes: 1 addition & 1 deletion lib/graphql-tools/src/parser/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ mod minify;
pub use self::ast::*;
pub use self::error::ParseError;
pub use self::grammar::{consume_definition, parse_query, parse_query_with_token_limit};
pub use self::minify::{minify_query, minify_query_document};
pub use self::minify::{minify_query, minify_query_document, MinifyError};
Loading