Skip to content

Commit 4a67099

Browse files
committed
Allow push source updates
1 parent a47b153 commit 4a67099

6 files changed

Lines changed: 457 additions & 27 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ Recommendation: for ease of reading, use the following order:
1111
- Fixed
1212
-->
1313

14+
## Unreleased
15+
### Added
16+
- Support for re-defining the `AddPushSource`
17+
1418
## [0.254.1] - 2025-12-08
1519
### Fixed
1620
- Fixed bug in applying projected offsets for flow states in a situation when delayed transaction

src/domain/core/src/entities/writer_metadata_state.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ impl From<odf::dataset::AcceptVisitorError<ScanMetadataError>> for ScanMetadataE
198198

199199
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
200200

201+
// TODO: Convert into an enum to differentiate between not finding a source and
202+
// ambiguity between several sources.
201203
#[derive(Debug, thiserror::Error)]
202204
#[error("{message}")]
203205
pub struct SourceNotFoundError {
@@ -216,7 +218,7 @@ impl SourceNotFoundError {
216218

217219
impl From<SourceNotFoundError> for PushSourceNotFoundError {
218220
fn from(val: SourceNotFoundError) -> Self {
219-
PushSourceNotFoundError::new(val.source_name)
221+
PushSourceNotFoundError::new_with_messaage(val.source_name, val.message)
220222
}
221223
}
222224

src/domain/core/src/entities/writer_source_visitor.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,21 +81,29 @@ impl<'a> WriterSourceEventVisitor<'a> {
8181
&mut self,
8282
e: &odf::metadata::AddPushSource,
8383
) -> Result<(), ScanMetadataError> {
84-
if self.maybe_source_event.is_none() {
85-
if self.maybe_source_name.is_none()
86-
|| self.maybe_source_name == Some(e.source_name.as_str())
87-
{
88-
self.maybe_source_event = Some(e.clone().into());
84+
match &self.maybe_source_event {
85+
None => {
86+
if self.maybe_source_name.is_none()
87+
|| self.maybe_source_name == Some(e.source_name.as_str())
88+
{
89+
self.maybe_source_event = Some(e.clone().into());
90+
}
91+
}
92+
Some(odf::MetadataEvent::AddPushSource(s)) if s.source_name == e.source_name => {
93+
// Encountered previous definition of the same source - the
94+
// one found first takes precedence
8995
}
90-
} else {
91-
// Encountered another source - if `source_name` was not specified we
92-
// return ambiguity error
93-
if self.maybe_source_name.is_none() {
94-
return Err(SourceNotFoundError::new(
95-
None::<&str>,
96-
"Explicit source name is required to pick between several active push sources",
97-
)
98-
.into());
96+
Some(_) => {
97+
// Encountered another source - if `source_name` was not specified we
98+
// return ambiguity error
99+
if self.maybe_source_name.is_none() {
100+
return Err(SourceNotFoundError::new(
101+
None::<&str>,
102+
"Explicit source name is required to pick between several active push \
103+
sources",
104+
)
105+
.into());
106+
}
99107
}
100108
}
101109

src/domain/core/src/services/ingest/push_ingest_planner.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -107,27 +107,35 @@ pub enum PushIngestPlanningError {
107107
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
108108

109109
#[derive(Debug, Error, Default)]
110+
#[error("{message}")]
110111
pub struct PushSourceNotFoundError {
111112
source_name: Option<String>,
113+
message: String,
112114
}
113115

114116
impl PushSourceNotFoundError {
115117
pub fn new(source_name: Option<impl Into<String>>) -> Self {
116-
Self {
117-
source_name: source_name.map(Into::into),
118+
match source_name {
119+
None => Self::new_with_messaage(
120+
source_name,
121+
"Dataset does not define a default push source, consider specifying the source \
122+
name",
123+
),
124+
Some(source_name) => {
125+
let source_name = source_name.into();
126+
let message = format!("Dataset does not define a push source '{source_name}'");
127+
Self::new_with_messaage(Some(source_name), message)
128+
}
118129
}
119130
}
120-
}
121131

122-
impl std::fmt::Display for PushSourceNotFoundError {
123-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124-
match &self.source_name {
125-
None => write!(
126-
f,
127-
"Dataset does not define a default push source, consider specifying the source \
128-
name"
129-
),
130-
Some(s) => write!(f, "Dataset does not define a push source '{s}'"),
132+
pub fn new_with_messaage(
133+
source_name: Option<impl Into<String>>,
134+
message: impl Into<String>,
135+
) -> Self {
136+
Self {
137+
source_name: source_name.map(Into::into),
138+
message: message.into(),
131139
}
132140
}
133141
}

src/infra/core/tests/tests/ingest/test_push_ingest.rs

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,6 +1045,170 @@ async fn test_ingest_push_with_predefined_data_schema() {
10451045

10461046
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
10471047

1048+
#[test_group::group(engine, ingest, datafusion)]
1049+
#[test_log::test(tokio::test)]
1050+
async fn test_ingest_push_schema_and_source_evolution() {
1051+
use odf::metadata::*;
1052+
1053+
let harness = IngestTestHarness::new();
1054+
1055+
let dataset_snapshot = MetadataFactory::dataset_snapshot()
1056+
.name("foo.bar")
1057+
.kind(odf::DatasetKind::Root)
1058+
.push_event(SetDataSchema::new(DataSchema {
1059+
fields: vec![
1060+
DataField::i64("offset"),
1061+
DataField::i32("op"),
1062+
DataField::timestamp_millis_utc("system_time"),
1063+
DataField::timestamp_millis_utc("event_time"),
1064+
DataField::string("city"),
1065+
DataField::i64("population"),
1066+
],
1067+
extra: None,
1068+
}))
1069+
.push_event(
1070+
MetadataFactory::add_push_source()
1071+
.read(ReadStepNdJson {
1072+
schema: Some(vec![
1073+
"event_time TIMESTAMP".to_string(),
1074+
"city STRING".to_string(),
1075+
"population BIGINT".to_string(),
1076+
]),
1077+
..Default::default()
1078+
})
1079+
.merge(MergeStrategyLedger {
1080+
primary_key: vec!["event_time".to_string(), "city".to_string()],
1081+
})
1082+
.build(),
1083+
)
1084+
.build();
1085+
1086+
let dataset_alias = dataset_snapshot.name.clone();
1087+
let stored = harness.create_dataset(dataset_snapshot).await;
1088+
let target = ResolvedDataset::from_stored(&stored, &dataset_alias);
1089+
1090+
let data_helper = harness.dataset_data_helper(&dataset_alias).await;
1091+
1092+
// 1: Initial data
1093+
let data = std::io::Cursor::new(indoc!(
1094+
r#"
1095+
{ "event_time": "2020-01-01", "city": "A", "population": 1000 }
1096+
{ "event_time": "2020-01-02", "city": "B", "population": 2000 }
1097+
{ "event_time": "2020-01-03", "city": "C", "population": 3000 }
1098+
"#
1099+
));
1100+
1101+
harness
1102+
.ingest_from(
1103+
target.clone(),
1104+
None,
1105+
DataSource::Stream(Box::new(data)),
1106+
PushIngestOpts::default(),
1107+
)
1108+
.await
1109+
.unwrap();
1110+
1111+
data_helper
1112+
.assert_last_data_records_eq(indoc!(
1113+
r#"
1114+
+--------+----+----------------------+----------------------+------+------------+
1115+
| offset | op | system_time | event_time | city | population |
1116+
+--------+----+----------------------+----------------------+------+------------+
1117+
| 0 | 0 | 2050-01-01T12:00:00Z | 2020-01-01T00:00:00Z | A | 1000 |
1118+
| 1 | 0 | 2050-01-01T12:00:00Z | 2020-01-02T00:00:00Z | B | 2000 |
1119+
| 2 | 0 | 2050-01-01T12:00:00Z | 2020-01-03T00:00:00Z | C | 3000 |
1120+
+--------+----+----------------------+----------------------+------+------------+
1121+
"#
1122+
))
1123+
.await;
1124+
1125+
// 2: Migrate schema by adding optional `census_url` column
1126+
target
1127+
.commit_event(
1128+
SetDataSchema::new(DataSchema::new(vec![
1129+
DataField::i64("offset"),
1130+
DataField::i32("op"),
1131+
DataField::timestamp_millis_utc("system_time"),
1132+
DataField::timestamp_millis_utc("event_time"),
1133+
DataField::string("city"),
1134+
DataField::i64("population"),
1135+
DataField::string("census_url").optional(),
1136+
]))
1137+
.into(),
1138+
odf::dataset::CommitOpts {
1139+
system_time: Some(harness.time_source.now()),
1140+
..Default::default()
1141+
},
1142+
)
1143+
.await
1144+
.unwrap();
1145+
1146+
// 3: Update push source to match
1147+
target
1148+
.commit_event(
1149+
AddPushSource {
1150+
source_name: SourceState::DEFAULT_SOURCE_NAME.to_string(),
1151+
read: ReadStepNdJson {
1152+
schema: Some(vec![
1153+
"event_time TIMESTAMP".to_string(),
1154+
"city STRING".to_string(),
1155+
"population BIGINT".to_string(),
1156+
"census_url STRING".to_string(),
1157+
]),
1158+
..Default::default()
1159+
}
1160+
.into(),
1161+
preprocess: None,
1162+
merge: MergeStrategyLedger {
1163+
primary_key: vec!["event_time".to_string(), "city".to_string()],
1164+
}
1165+
.into(),
1166+
}
1167+
.into(),
1168+
odf::dataset::CommitOpts {
1169+
system_time: Some(harness.time_source.now()),
1170+
..Default::default()
1171+
},
1172+
)
1173+
.await
1174+
.unwrap();
1175+
1176+
// 4: Ingest new data
1177+
let data = std::io::Cursor::new(indoc!(
1178+
r#"
1179+
{ "event_time": "2020-01-04", "city": "A", "population": 2000 }
1180+
{ "event_time": "2020-01-05", "city": "B", "population": 3000, "census_url": null }
1181+
{ "event_time": "2020-01-06", "city": "C", "population": 4000, "census_url": "https://c.ca/census" }
1182+
"#
1183+
));
1184+
1185+
harness
1186+
.ingest_from(
1187+
target.clone(),
1188+
None,
1189+
DataSource::Stream(Box::new(data)),
1190+
PushIngestOpts::default(),
1191+
)
1192+
.await
1193+
.unwrap();
1194+
1195+
data_helper
1196+
.assert_last_data_records_eq(indoc!(
1197+
r#"
1198+
+--------+----+----------------------+----------------------+------+------------+---------------------+
1199+
| offset | op | system_time | event_time | city | population | census_url |
1200+
+--------+----+----------------------+----------------------+------+------------+---------------------+
1201+
| 3 | 0 | 2050-01-01T12:00:00Z | 2020-01-04T00:00:00Z | A | 2000 | |
1202+
| 4 | 0 | 2050-01-01T12:00:00Z | 2020-01-05T00:00:00Z | B | 3000 | |
1203+
| 5 | 0 | 2050-01-01T12:00:00Z | 2020-01-06T00:00:00Z | C | 4000 | https://c.ca/census |
1204+
+--------+----+----------------------+----------------------+------+------------+---------------------+
1205+
"#
1206+
))
1207+
.await;
1208+
}
1209+
1210+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1211+
10481212
struct IngestTestHarness {
10491213
temp_dir: TempDir,
10501214
dataset_registry: Arc<dyn DatasetRegistry>,

0 commit comments

Comments
 (0)