Skip to content

Commit fc7aa0d

Browse files
zxqfd555 (Manul from Pathway)
authored and committed
retry on pragma.data_version error in sqlite instead of failing (#9825)
GitOrigin-RevId: ec31b9d781ea4bd365368b93b68771416be59842
1 parent 39efa8f commit fc7aa0d

3 files changed

Lines changed: 236 additions & 212 deletions

File tree

src/connectors/data_storage.rs

Lines changed: 4 additions & 212 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,15 @@ use std::any::type_name;
88
use std::borrow::Borrow;
99
use std::borrow::Cow;
1010
use std::collections::HashMap;
11-
use std::collections::HashSet;
12-
use std::collections::VecDeque;
1311
use std::fmt;
1412
use std::fmt::{Debug, Display};
1513
use std::io;
1614
use std::io::BufRead;
1715
use std::io::BufWriter;
1816
use std::io::Write;
1917
use std::mem::take;
20-
use std::str::{from_utf8, Utf8Error};
18+
use std::str::Utf8Error;
2119
use std::sync::Arc;
22-
use std::thread::sleep;
2320
use std::time::Duration;
2421

2522
use arcstr::ArcStr;
@@ -60,18 +57,15 @@ use crate::connectors::data_format::{
6057
FormatterError, COMMIT_LITERAL,
6158
};
6259
use crate::connectors::data_lake::buffering::IncorrectSnapshotError;
63-
use crate::connectors::metadata::{KafkaMetadata, SQLiteMetadata, SourceMetadata};
64-
use crate::connectors::offset::EMPTY_OFFSET;
60+
use crate::connectors::metadata::{KafkaMetadata, SourceMetadata};
6561
use crate::connectors::posix_like::PosixLikeReader;
6662
use crate::connectors::scanner::s3::S3CommandName;
6763
use crate::connectors::{Offset, OffsetKey, OffsetValue, SPECIAL_FIELD_DIFF, SPECIAL_FIELD_TIME};
6864
use crate::engine::error::limit_length;
6965
use crate::engine::error::DynResult;
7066
use crate::engine::error::STANDARD_OBJECT_LENGTH_LIMIT;
7167
use crate::engine::time::DateTime;
72-
use crate::engine::DateTimeNaive;
73-
use crate::engine::Type;
74-
use crate::engine::{Duration as EngineDuration, Key, Value};
68+
use crate::engine::{DateTimeNaive, Duration as EngineDuration, Key, Type, Value};
7569
use crate::persistence::backends::Error as PersistenceBackendError;
7670
use crate::persistence::frontier::OffsetAntichain;
7771
use crate::persistence::tracker::WorkerPersistentStorage;
@@ -107,8 +101,6 @@ use rdkafka::producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedPr
107101
use rdkafka::topic_partition_list::Offset as KafkaOffset;
108102
use rdkafka::Message;
109103
use rdkafka::TopicPartitionList;
110-
use rusqlite::types::ValueRef as SqliteValue;
111-
use rusqlite::Connection as SqliteConnection;
112104
use rusqlite::Error as SqliteError;
113105
use serde::{Deserialize, Serialize};
114106

@@ -119,6 +111,7 @@ pub use super::data_lake::iceberg::IcebergReader;
119111
pub use super::data_lake::LakeWriter;
120112
pub use super::nats::NatsReader;
121113
pub use super::nats::NatsWriter;
114+
pub use super::sqlite::SqliteReader;
122115

123116
#[derive(Clone, Debug, Eq, PartialEq, Copy)]
124117
pub enum DataEventType {
@@ -1932,207 +1925,6 @@ impl Writer for NullWriter {
19321925
}
19331926
}
19341927

1935-
// Name of the SQLite pragma polled to detect that the database file changed.
const SQLITE_DATA_VERSION_PRAGMA: &str = "data_version";
1936-
1937-
pub struct SqliteReader {
1938-
connection: SqliteConnection,
1939-
table_name: String,
1940-
schema: Vec<(String, Type)>,
1941-
1942-
last_saved_data_version: Option<i64>,
1943-
stored_state: HashMap<i64, ValuesMap>,
1944-
queued_updates: VecDeque<ReadResult>,
1945-
}
1946-
1947-
impl SqliteReader {
1948-
pub fn new(
1949-
connection: SqliteConnection,
1950-
table_name: String,
1951-
schema: Vec<(String, Type)>,
1952-
) -> Self {
1953-
Self {
1954-
connection,
1955-
table_name,
1956-
schema,
1957-
1958-
last_saved_data_version: None,
1959-
queued_updates: VecDeque::new(),
1960-
stored_state: HashMap::new(),
1961-
}
1962-
}
1963-
1964-
/// Data version is required to check if there was an update in the database.
1965-
/// There are also hooks, but they only work for changes happened in the same
1966-
/// connection.
1967-
/// More details why hooks don't help here: <https://sqlite.org/forum/forumpost/3174b39eeb79b6a4>
1968-
pub fn data_version(&self) -> i64 {
1969-
let version: ::rusqlite::Result<i64> = self.connection.pragma_query_value(
1970-
Some(::rusqlite::DatabaseName::Main),
1971-
SQLITE_DATA_VERSION_PRAGMA,
1972-
|row| row.get(0),
1973-
);
1974-
version.expect("pragma.data_version request should not fail")
1975-
}
1976-
1977-
/// Convert raw `SQLite` field into one of internal value types
1978-
/// There are only five supported types: null, integer, real, text, blob
1979-
/// See also: <https://www.sqlite.org/datatype3.html>
1980-
fn convert_to_value(
1981-
orig_value: SqliteValue<'_>,
1982-
field_name: &str,
1983-
dtype: &Type,
1984-
) -> Result<Value, Box<ConversionError>> {
1985-
let value = match (dtype, orig_value) {
1986-
(Type::Optional(_) | Type::Any, SqliteValue::Null) => Some(Value::None),
1987-
(Type::Optional(arg), value) => Self::convert_to_value(value, field_name, arg).ok(),
1988-
(Type::Int | Type::Any, SqliteValue::Integer(val)) => Some(Value::Int(val)),
1989-
(Type::Float | Type::Any, SqliteValue::Real(val)) => Some(Value::Float(val.into())),
1990-
(Type::String | Type::Any, SqliteValue::Text(val)) => from_utf8(val)
1991-
.ok()
1992-
.map(|parsed_string| Value::String(parsed_string.into())),
1993-
(Type::Json, SqliteValue::Text(val)) => from_utf8(val)
1994-
.ok()
1995-
.and_then(|parsed_string| {
1996-
serde_json::from_str::<serde_json::Value>(parsed_string).ok()
1997-
})
1998-
.map(Value::from),
1999-
(Type::Bytes | Type::Any, SqliteValue::Blob(val)) => Some(Value::Bytes(val.into())),
2000-
_ => None,
2001-
};
2002-
if let Some(value) = value {
2003-
Ok(value)
2004-
} else {
2005-
let value_repr = limit_length(format!("{orig_value:?}"), STANDARD_OBJECT_LENGTH_LIMIT);
2006-
Err(Box::new(ConversionError::new(
2007-
value_repr,
2008-
field_name.to_owned(),
2009-
dtype.clone(),
2010-
None,
2011-
)))
2012-
}
2013-
}
2014-
2015-
fn load_table(&mut self) -> Result<(), ReadError> {
2016-
let column_names: Vec<&str> = self
2017-
.schema
2018-
.iter()
2019-
.map(|(name, _dtype)| name.as_str())
2020-
.collect();
2021-
let query = format!(
2022-
"SELECT {},_rowid_ FROM {}",
2023-
column_names.join(","),
2024-
self.table_name
2025-
);
2026-
2027-
let mut statement = self.connection.prepare(&query)?;
2028-
let mut rows = statement.query([])?;
2029-
2030-
let mut present_rowids = HashSet::new();
2031-
while let Some(row) = rows.next()? {
2032-
let rowid: i64 = row.get(self.schema.len())?;
2033-
let mut values = HashMap::with_capacity(self.schema.len());
2034-
for (column_idx, (column_name, column_dtype)) in self.schema.iter().enumerate() {
2035-
let value =
2036-
Self::convert_to_value(row.get_ref(column_idx)?, column_name, column_dtype);
2037-
values.insert(column_name.clone(), value);
2038-
}
2039-
let values: ValuesMap = values.into();
2040-
self.stored_state
2041-
.entry(rowid)
2042-
.and_modify(|current_values| {
2043-
if current_values != &values {
2044-
let key = vec![Value::Int(rowid)];
2045-
self.queued_updates.push_back(ReadResult::Data(
2046-
ReaderContext::from_diff(
2047-
DataEventType::Delete,
2048-
Some(key.clone()),
2049-
take(current_values),
2050-
),
2051-
EMPTY_OFFSET,
2052-
));
2053-
self.queued_updates.push_back(ReadResult::Data(
2054-
ReaderContext::from_diff(
2055-
DataEventType::Insert,
2056-
Some(key),
2057-
values.clone(),
2058-
),
2059-
EMPTY_OFFSET,
2060-
));
2061-
current_values.clone_from(&values);
2062-
}
2063-
})
2064-
.or_insert_with(|| {
2065-
let key = vec![Value::Int(rowid)];
2066-
self.queued_updates.push_back(ReadResult::Data(
2067-
ReaderContext::from_diff(DataEventType::Insert, Some(key), values.clone()),
2068-
EMPTY_OFFSET,
2069-
));
2070-
values
2071-
});
2072-
present_rowids.insert(rowid);
2073-
}
2074-
2075-
self.stored_state.retain(|rowid, values| {
2076-
if present_rowids.contains(rowid) {
2077-
true
2078-
} else {
2079-
let key = vec![Value::Int(*rowid)];
2080-
self.queued_updates.push_back(ReadResult::Data(
2081-
ReaderContext::from_diff(DataEventType::Delete, Some(key), take(values)),
2082-
EMPTY_OFFSET,
2083-
));
2084-
false
2085-
}
2086-
});
2087-
2088-
if !self.queued_updates.is_empty() {
2089-
self.queued_updates.push_back(ReadResult::FinishedSource {
2090-
commit_possibility: CommitPossibility::Possible,
2091-
});
2092-
}
2093-
2094-
Ok(())
2095-
}
2096-
2097-
fn wait_period() -> Duration {
2098-
Duration::from_millis(500)
2099-
}
2100-
}
2101-
2102-
impl Reader for SqliteReader {
2103-
fn seek(&mut self, _frontier: &OffsetAntichain) -> Result<(), ReadError> {
2104-
todo!("seek is not supported for Sqlite source: persistent history of changes unavailable")
2105-
}
2106-
2107-
fn read(&mut self) -> Result<ReadResult, ReadError> {
2108-
loop {
2109-
if let Some(queued_update) = self.queued_updates.pop_front() {
2110-
return Ok(queued_update);
2111-
}
2112-
2113-
let current_data_version = self.data_version();
2114-
if self.last_saved_data_version != Some(current_data_version) {
2115-
self.load_table()?;
2116-
self.last_saved_data_version = Some(current_data_version);
2117-
return Ok(ReadResult::NewSource(
2118-
SQLiteMetadata::new(current_data_version).into(),
2119-
));
2120-
}
2121-
// Sleep to avoid non-stop pragma requests of a table
2122-
// that did not change
2123-
sleep(Self::wait_period());
2124-
}
2125-
}
2126-
2127-
fn short_description(&self) -> Cow<'static, str> {
2128-
format!("SQLite({})", self.table_name).into()
2129-
}
2130-
2131-
fn storage_type(&self) -> StorageType {
2132-
StorageType::Sqlite
2133-
}
2134-
}
2135-
21361928
#[derive(Debug)]
21371929
enum BufferedMongoEvent {
21381930
Insert {

src/connectors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub mod nats;
2929
pub mod offset;
3030
pub mod posix_like;
3131
pub mod scanner;
32+
pub mod sqlite;
3233
pub mod synchronization;
3334

3435
use crate::connectors::monitoring::ConnectorMonitor;

0 commit comments

Comments
 (0)