-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy patherr.rs
More file actions
209 lines (197 loc) · 8.22 KB
/
err.rs
File metadata and controls
209 lines (197 loc) · 8.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
use super::{BlockNumber, DeploymentSchemaVersion};
use crate::prelude::QueryExecutionError;
use crate::{data::store::EntityValidationError, prelude::DeploymentHash};
use anyhow::{anyhow, Error};
use diesel::result::Error as DieselError;
use thiserror::Error;
use tokio::task::JoinError;
#[derive(Error, Debug)]
pub enum StoreError {
#[error("store error: {0:#}")]
Unknown(Error),
#[error("Entity validation failed: {0}")]
EntityValidationError(EntityValidationError),
#[error(
"tried to set entity of type `{0}` with ID \"{1}\" but an entity of type `{2}`, \
which has an interface in common with `{0}`, exists with the same ID"
)]
ConflictingId(String, String, String), // (entity, id, conflicting_entity)
#[error("table '{0}' does not have a field '{1}'")]
UnknownField(String, String),
#[error("unknown table '{0}'")]
UnknownTable(String),
#[error("entity type '{0}' does not have an attribute '{0}'")]
UnknownAttribute(String, String),
#[error("malformed directive '{0}'")]
MalformedDirective(String),
#[error("query execution failed: {0}")]
QueryExecutionError(String),
#[error("Child filter nesting not supported by value `{0}`: `{1}`")]
ChildFilterNestingNotSupportedError(String, String),
#[error("invalid identifier: {0}")]
InvalidIdentifier(String),
#[error(
"subgraph `{0}` has already processed block `{1}`; \
there are most likely two (or more) nodes indexing this subgraph"
)]
DuplicateBlockProcessing(DeploymentHash, BlockNumber),
/// An internal error where we expected the application logic to enforce
/// some constraint, e.g., that subgraph names are unique, but found that
/// constraint to not hold
#[error("internal constraint violated: {0}")]
ConstraintViolation(String),
#[error("deployment not found: {0}")]
DeploymentNotFound(String),
#[error("shard not found: {0} (this usually indicates a misconfiguration)")]
UnknownShard(String),
#[error("Fulltext search not yet deterministic")]
FulltextSearchNonDeterministic,
#[error("Fulltext search column missing configuration")]
FulltextColumnMissingConfig,
#[error("operation was canceled")]
Canceled,
#[error("database unavailable")]
DatabaseUnavailable,
#[error("database disabled")]
DatabaseDisabled,
#[error("subgraph forking failed: {0}")]
ForkFailure(String),
#[error("subgraph writer poisoned by previous error")]
Poisoned,
#[error("panic in subgraph writer: {0}")]
WriterPanic(JoinError),
#[error(
"found schema version {0} but this graph node only supports versions up to {}. \
Did you downgrade Graph Node?",
DeploymentSchemaVersion::LATEST
)]
UnsupportedDeploymentSchemaVersion(i32),
#[error("pruning failed: {0}")]
PruneFailure(String),
#[error("unsupported filter `{0}` for value `{1}`")]
UnsupportedFilter(String, String),
#[error("writing {0} entities at block {1} failed: {2} Query: {3}")]
WriteFailure(String, BlockNumber, String, String),
#[error("database query timed out")]
StatementTimeout,
}
// Convenience to report a constraint violation
#[macro_export]
macro_rules! constraint_violation {
($msg:expr) => {{
$crate::prelude::StoreError::ConstraintViolation(format!("{}", $msg))
}};
($fmt:expr, $($arg:tt)*) => {{
$crate::prelude::StoreError::ConstraintViolation(format!($fmt, $($arg)*))
}}
}
/// We can't derive `Clone` because some variants use non-cloneable data.
/// For those cases, produce an `Unknown` error with some details about the
/// original error
impl Clone for StoreError {
fn clone(&self) -> Self {
match self {
Self::Unknown(arg0) => Self::Unknown(anyhow!("{}", arg0)),
Self::EntityValidationError(arg0) => Self::EntityValidationError(arg0.clone()),
Self::ConflictingId(arg0, arg1, arg2) => {
Self::ConflictingId(arg0.clone(), arg1.clone(), arg2.clone())
}
Self::UnknownField(arg0, arg1) => Self::UnknownField(arg0.clone(), arg1.clone()),
Self::UnknownTable(arg0) => Self::UnknownTable(arg0.clone()),
Self::UnknownAttribute(arg0, arg1) => {
Self::UnknownAttribute(arg0.clone(), arg1.clone())
}
Self::MalformedDirective(arg0) => Self::MalformedDirective(arg0.clone()),
Self::QueryExecutionError(arg0) => Self::QueryExecutionError(arg0.clone()),
Self::ChildFilterNestingNotSupportedError(arg0, arg1) => {
Self::ChildFilterNestingNotSupportedError(arg0.clone(), arg1.clone())
}
Self::InvalidIdentifier(arg0) => Self::InvalidIdentifier(arg0.clone()),
Self::DuplicateBlockProcessing(arg0, arg1) => {
Self::DuplicateBlockProcessing(arg0.clone(), arg1.clone())
}
Self::ConstraintViolation(arg0) => Self::ConstraintViolation(arg0.clone()),
Self::DeploymentNotFound(arg0) => Self::DeploymentNotFound(arg0.clone()),
Self::UnknownShard(arg0) => Self::UnknownShard(arg0.clone()),
Self::FulltextSearchNonDeterministic => Self::FulltextSearchNonDeterministic,
Self::FulltextColumnMissingConfig => Self::FulltextColumnMissingConfig,
Self::Canceled => Self::Canceled,
Self::DatabaseUnavailable => Self::DatabaseUnavailable,
Self::DatabaseDisabled => Self::DatabaseDisabled,
Self::ForkFailure(arg0) => Self::ForkFailure(arg0.clone()),
Self::Poisoned => Self::Poisoned,
Self::WriterPanic(arg0) => Self::Unknown(anyhow!("writer panic: {}", arg0)),
Self::UnsupportedDeploymentSchemaVersion(arg0) => {
Self::UnsupportedDeploymentSchemaVersion(arg0.clone())
}
Self::PruneFailure(arg0) => Self::PruneFailure(arg0.clone()),
Self::UnsupportedFilter(arg0, arg1) => {
Self::UnsupportedFilter(arg0.clone(), arg1.clone())
}
Self::WriteFailure(arg0, arg1, arg2, arg3) => {
Self::WriteFailure(arg0.clone(), arg1.clone(), arg2.clone(), arg3.clone())
}
Self::StatementTimeout => Self::StatementTimeout,
}
}
}
impl StoreError {
pub fn from_diesel_error(e: &DieselError) -> Option<Self> {
const CONN_CLOSE: &str = "server closed the connection unexpectedly";
const STMT_TIMEOUT: &str = "canceling statement due to statement timeout";
let DieselError::DatabaseError(_, info) = e else {
return None;
};
if info.message().contains(CONN_CLOSE) {
// When the error is caused by a closed connection, treat the error
// as 'database unavailable'. When this happens during indexing, the
// indexing machinery will retry in that case rather than fail the
// subgraph
Some(StoreError::DatabaseUnavailable)
} else if info.message().contains(STMT_TIMEOUT) {
Some(StoreError::StatementTimeout)
} else {
None
}
}
pub fn write_failure(
error: DieselError,
entity: &str,
block: BlockNumber,
query: String,
) -> Self {
Self::from_diesel_error(&error).unwrap_or_else(|| {
StoreError::WriteFailure(entity.to_string(), block, error.to_string(), query)
})
}
}
impl From<DieselError> for StoreError {
fn from(e: DieselError) -> Self {
Self::from_diesel_error(&e).unwrap_or_else(|| StoreError::Unknown(e.into()))
}
}
impl From<::diesel::r2d2::PoolError> for StoreError {
fn from(e: ::diesel::r2d2::PoolError) -> Self {
StoreError::Unknown(e.into())
}
}
impl From<Error> for StoreError {
fn from(e: Error) -> Self {
StoreError::Unknown(e)
}
}
impl From<serde_json::Error> for StoreError {
fn from(e: serde_json::Error) -> Self {
StoreError::Unknown(e.into())
}
}
impl From<QueryExecutionError> for StoreError {
fn from(e: QueryExecutionError) -> Self {
StoreError::QueryExecutionError(e.to_string())
}
}
impl From<std::fmt::Error> for StoreError {
fn from(e: std::fmt::Error) -> Self {
StoreError::Unknown(anyhow!("{}", e.to_string()))
}
}