Skip to content

Commit b25bce2

Browse files
authored
fix: more permissive deserialization, arrow v58 (#985)
### Description We have to patch to a lot of branches, so we might not even want to merge it. But I _think_ we'll need v58 to get native geometry reading (see stac-utils/rustac-py#251), which GDAL is writing. Includes several relaxes of our **stac-geoparquet** parsing to handle common mistakes. We should probably make a validator.
1 parent 2dfc36e commit b25bce2

10 files changed

Lines changed: 113 additions & 78 deletions

File tree

Cargo.toml

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ rust-version = "1.88"
3434

3535
[workspace.dependencies]
3636
anyhow = "1.0"
37-
arrow-array = "57.0.0"
38-
arrow-cast = "57.0.0"
39-
arrow-json = "57.0.0"
40-
arrow-schema = "57.0.0"
37+
arrow-array = "58.0.0"
38+
arrow-cast = "58.0.0"
39+
arrow-json = "58.0.0"
40+
arrow-schema = "58.0.0"
4141
assert-json-diff = "2.0"
4242
assert_cmd = "2.1"
4343
async-recursion = "1.1.1"
@@ -51,7 +51,7 @@ chrono = "0.4.39"
5151
clap = "4.5"
5252
clap_complete = "4.5"
5353
cql2 = "0.5.0"
54-
duckdb = { git = "https://github.com/duckdb/duckdb-rs", rev = "a2639608d946690e04b1d5f6448302d57bb99d7e" }
54+
duckdb = { git = "https://github.com/gadomski/duckdb-rs", branch = "arrow-v58" }
5555
fluent-uri = "0.4.1"
5656
futures = "0.3.31"
5757
futures-core = "0.3.31"
@@ -73,8 +73,8 @@ libduckdb-sys = "1.3.0"
7373
log = "0.4.25"
7474
mime = "0.3.17"
7575
mockito = "1.5"
76-
object_store = "0.12.0"
77-
parquet = { version = "57.0.0" }
76+
object_store = "0.13.0"
77+
parquet = { version = "58.0.0" }
7878
quote = "1.0"
7979
reqwest = { version = "0.13.1", features = ["query"] }
8080
referencing = { version = "0.45.0", features = ["retrieve-async"] }
@@ -102,3 +102,8 @@ tracing-indicatif = "0.3.9"
102102
url = "2.3"
103103
webpki-roots = "1.0.0"
104104
wkb = "0.9.0"
105+
106+
[patch.crates-io]
107+
geoarrow-array = { git = "https://github.com/geoarrow/geoarrow-rs", branch = "arrow-58" }
108+
geoarrow-schema = { git = "https://github.com/geoarrow/geoarrow-rs", branch = "arrow-58" }
109+
geoparquet = { git = "https://github.com/geoarrow/geoarrow-rs", branch = "arrow-58" }

crates/cli/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,11 @@ pub struct Rustac {
106106
/// When records are ordered spatially or temporally, lower values result in smaller row groups (better for selective queries),
107107
/// while higher values result in larger row groups (better compression).
108108
#[arg(
109-
long = "parquet-max-row-group-size",
109+
long = "parquet-max-row-group-row-count",
110110
global = true,
111111
verbatim_doc_comment
112112
)]
113-
parquet_max_row_group_size: Option<usize>,
113+
parquet_max_row_group_row_count: Option<usize>,
114114

115115
#[arg(
116116
long,
@@ -398,6 +398,7 @@ impl Rustac {
398398
);
399399
}
400400
let input_format = self.input_format(infile.as_deref());
401+
tracing::debug!("Reading as {input_format}");
401402
let can_stream = matches!(input_format, Format::NdJson | Format::Geoparquet(_));
402403
if can_stream {
403404
let items = self.get_item_stream(infile.as_deref()).await?;
@@ -826,8 +827,9 @@ impl Rustac {
826827
let mut writer_options = WriterOptions::new()
827828
.with_compression(self.parquet_compression.or(Some(default_compression())));
828829

829-
if let Some(max_row_group_size) = self.parquet_max_row_group_size {
830-
writer_options = writer_options.with_max_row_group_size(max_row_group_size);
830+
if let Some(max_row_group_row_count) = self.parquet_max_row_group_row_count {
831+
writer_options =
832+
writer_options.with_max_row_group_row_count(max_row_group_row_count);
831833
}
832834

833835
Format::Geoparquet(writer_options)

crates/cli/tests/test_cli.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,13 @@ fn output_format() {
137137
"rustac",
138138
"--output-format",
139139
"parquet",
140-
"--parquet-max-row-group-size",
140+
"--parquet-max-row-group-row-count",
141141
"50000",
142142
"translate",
143143
]);
144144
assert_eq!(
145145
rustac.output_format(None),
146-
Format::Geoparquet(WriterOptions::new().with_max_row_group_size(50000))
146+
Format::Geoparquet(WriterOptions::new().with_max_row_group_row_count(50000))
147147
);
148148

149149
let rustac = Rustac::parse_from([
@@ -152,7 +152,7 @@ fn output_format() {
152152
"parquet",
153153
"--parquet-compression",
154154
"snappy",
155-
"--parquet-max-row-group-size",
155+
"--parquet-max-row-group-row-count",
156156
"100000",
157157
"translate",
158158
]);
@@ -161,7 +161,7 @@ fn output_format() {
161161
Format::Geoparquet(
162162
WriterOptions::new()
163163
.with_compression(Some(Compression::SNAPPY))
164-
.with_max_row_group_size(100000)
164+
.with_max_row_group_row_count(100000)
165165
)
166166
);
167167
}

crates/core/src/datetime.rs

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
//! Datetime utilities.
22
33
use crate::{Error, Result};
4-
use chrono::{DateTime, FixedOffset};
4+
use chrono::{DateTime, NaiveDateTime, Utc};
55

66
/// A start and end datetime.
7-
pub type Interval = (Option<DateTime<FixedOffset>>, Option<DateTime<FixedOffset>>);
7+
pub type Interval = (Option<DateTime<Utc>>, Option<DateTime<Utc>>);
88

9-
/// Parse a datetime or datetime interval into a start and end datetime.
9+
/// Parses a datetime or datetime interval into a start and end datetime.
1010
///
1111
/// Returns `None` to indicate an open interval.
1212
///
@@ -35,21 +35,38 @@ pub fn parse(datetime: &str) -> Result<Interval> {
3535
} else if datetime == ".." {
3636
Err(Error::InvalidDatetime(datetime.to_string()))
3737
} else {
38-
let datetime = DateTime::parse_from_rfc3339(datetime).map(Some)?;
38+
let datetime = parse_datetime_permissively(datetime).map(Some)?;
3939
Ok((datetime, datetime))
4040
}
4141
}
4242

43-
fn parse_one(s: &str) -> Result<Option<DateTime<FixedOffset>>> {
43+
/// Parses a single datetime permissively.
44+
pub fn parse_datetime_permissively(s: &str) -> Result<DateTime<Utc>> {
45+
match DateTime::parse_from_rfc3339(&s) {
46+
Ok(datetime) => Ok(datetime.to_utc()),
47+
Err(err) => {
48+
log::warn!(
49+
"error when parsing item datetime as rfc3339 ({err}), trying to parse as naive datetime"
50+
);
51+
let (mut datetime, remainder) =
52+
NaiveDateTime::parse_and_remainder(&s, "%Y-%m-%dT%H:%M:%S")?;
53+
// This isn't super efficient but we're in a read-invalid-data path, so I think it's fine.
54+
if !remainder.is_empty() && remainder.starts_with(".") {
55+
datetime = NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S%.f")?;
56+
}
57+
Ok(datetime.and_utc())
58+
}
59+
}
60+
}
61+
62+
fn parse_one(s: &str) -> Result<Option<DateTime<Utc>>> {
4463
if s == ".." {
4564
Ok(None)
4665
} else if s.is_empty() {
4766
log::warn!("an empty string in a datetime interval are invalid, converting to \"..\"");
4867
Ok(None)
4968
} else {
50-
DateTime::parse_from_rfc3339(s)
51-
.map(Some)
52-
.map_err(Error::from)
69+
parse_datetime_permissively(s).map(Some)
5370
}
5471
}
5572

crates/core/src/geoarrow/json.rs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,11 @@ const TOP_LEVEL_KEYS: [&str; 10] = [
3838
"collection",
3939
];
4040

41-
use crate::Error;
41+
use crate::{Error, datetime::parse_datetime_permissively};
4242
use arrow_array::{RecordBatchReader, cast::*, types::*, *};
4343
use arrow_cast::display::{ArrayFormatter, FormatOptions};
4444
use arrow_json::JsonSerializable;
4545
use arrow_schema::*;
46-
use chrono::DateTime;
4746
use geo_traits::to_geo::{
4847
ToGeoGeometry, ToGeoGeometryCollection, ToGeoLineString, ToGeoMultiLineString, ToGeoMultiPoint,
4948
ToGeoMultiPolygon, ToGeoPoint, ToGeoPolygon, ToGeoRect,
@@ -535,7 +534,33 @@ pub(crate) fn record_batch_to_json_rows(
535534
}
536535
rows.into_iter()
537536
.map(|row| {
538-
let row = row.unwrap();
537+
let mut row = row.unwrap();
538+
// Force it, in case
539+
if row.contains_key("type") {
540+
row.insert("type".to_string(), "Feature".to_string().into());
541+
}
542+
if let Some(id) = row.remove("id") {
543+
if id.is_string() {
544+
row.insert("id".to_string(), id);
545+
} else {
546+
tracing::warn!("id field is not a string, coercing");
547+
row.insert("id".to_string(), id.to_string().into());
548+
}
549+
}
550+
if let Some(stac_extensions) = row.remove("stac_extensions") {
551+
if stac_extensions.is_string() {
552+
tracing::warn!(
553+
"stac_extensions field is a string, coercing to list of strings"
554+
);
555+
let _ = row.insert(
556+
"stac_extensions".to_string(),
557+
serde_json::from_str::<Vec<String>>(stac_extensions.as_str().unwrap())?
558+
.into(),
559+
);
560+
} else {
561+
row.insert("stac_extensions".to_string(), stac_extensions);
562+
}
563+
}
539564
unflatten(row)
540565
})
541566
.collect::<Result<_, _>>()
@@ -562,13 +587,8 @@ fn unflatten(
562587
if let Some(value) = item.remove(&key) {
563588
if DATETIME_COLUMNS.contains(&key.as_str()) {
564589
if let Some(value) = value.as_str() {
565-
let _ = properties.insert(
566-
key,
567-
DateTime::parse_from_rfc3339(value)?
568-
.to_utc()
569-
.to_rfc3339()
570-
.into(),
571-
);
590+
let _ = properties
591+
.insert(key, parse_datetime_permissively(value)?.to_rfc3339().into());
572592
}
573593
} else {
574594
let _ = properties.insert(key, value);

crates/core/src/geoparquet.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ pub fn default_compression() -> Compression {
2323
Compression::ZSTD(ZstdLevel::try_new(15).unwrap())
2424
}
2525

26-
/// Default stac-geoparquet max row group size
27-
pub const DEFAULT_STAC_MAX_ROW_GROUP_SIZE: usize = 150_000;
26+
/// Default stac-geoparquet max row group row count
27+
pub const DEFAULT_STAC_MAX_ROW_GROUP_ROW_COUNT: usize = 150_000;
2828

2929
/// The stac-geoparquet metadata key.
3030
pub const METADATA_KEY: &str = "stac-geoparquet";
@@ -39,7 +39,7 @@ pub struct WriterOptions {
3939
pub compression: Option<Compression>,
4040

4141
/// Maximum number of rows in a row group
42-
pub max_row_group_size: usize,
42+
pub max_row_group_row_count: usize,
4343
}
4444

4545
/// An encoder for writing stac-geoparquet
@@ -84,10 +84,10 @@ impl WriterOptions {
8484
/// ```
8585
/// use stac::geoparquet::WriterOptions;
8686
///
87-
/// let options = WriterOptions::new().with_max_row_group_size(50000);
87+
/// let options = WriterOptions::new().with_max_row_group_row_count(50000);
8888
/// ```
89-
pub fn with_max_row_group_size(mut self, size: usize) -> Self {
90-
self.max_row_group_size = size;
89+
pub fn with_max_row_group_row_count(mut self, size: usize) -> Self {
90+
self.max_row_group_row_count = size;
9191
self
9292
}
9393
}
@@ -96,7 +96,7 @@ impl Default for WriterOptions {
9696
fn default() -> Self {
9797
Self {
9898
compression: Some(default_compression()),
99-
max_row_group_size: DEFAULT_STAC_MAX_ROW_GROUP_SIZE,
99+
max_row_group_row_count: DEFAULT_STAC_MAX_ROW_GROUP_ROW_COUNT,
100100
}
101101
}
102102
}
@@ -277,7 +277,7 @@ impl<W: Write + Send> WriterBuilder<W> {
277277
/// let cursor = Cursor::new(Vec::new());
278278
/// let options = WriterOptions::new()
279279
/// .with_compression(Compression::SNAPPY)
280-
/// .with_max_row_group_size(50000);
280+
/// .with_max_row_group_row_count(50000);
281281
/// let writer = WriterBuilder::new(cursor)
282282
/// .writer_options(options)
283283
/// .build(vec![item])
@@ -733,7 +733,7 @@ impl From<WriterOptions> for WriterProperties {
733733
if let Some(compression) = value.compression {
734734
builder = builder.set_compression(compression);
735735
}
736-
builder = builder.set_max_row_group_size(value.max_row_group_size);
736+
builder = builder.set_max_row_group_row_count(Some(value.max_row_group_row_count));
737737
builder.build()
738738
}
739739
}
@@ -877,7 +877,7 @@ mod tests {
877877
let items: Vec<Item> = (0..100).map(|_| item.clone()).collect();
878878

879879
let mut cursor = Cursor::new(Vec::new());
880-
let options = super::WriterOptions::new().with_max_row_group_size(25);
880+
let options = super::WriterOptions::new().with_max_row_group_row_count(25);
881881
WriterBuilder::new(&mut cursor)
882882
.writer_options(options)
883883
.build(items)

crates/core/src/item.rs

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
//! STAC Items.
22
3-
use crate::{Asset, Assets, Bbox, Error, Fields, Link, Result, STAC_VERSION, Version};
4-
use chrono::{DateTime, FixedOffset, NaiveDateTime, Utc};
3+
use crate::{
4+
Asset, Assets, Bbox, Error, Fields, Link, Result, STAC_VERSION, Version,
5+
datetime::parse_datetime_permissively,
6+
};
7+
use chrono::{DateTime, Utc};
58
use cql2::Expr;
69
use geojson::{Feature, Geometry, feature::Id};
710
use indexmap::IndexMap;
@@ -523,8 +526,8 @@ impl Item {
523526
/// ```
524527
pub fn intersects_datetimes(
525528
&self,
526-
start: Option<DateTime<FixedOffset>>,
527-
end: Option<DateTime<FixedOffset>>,
529+
start: Option<DateTime<Utc>>,
530+
end: Option<DateTime<Utc>>,
528531
) -> Result<bool> {
529532
let (item_start, item_end) = self.datetimes();
530533
let mut intersects = true;
@@ -722,23 +725,9 @@ where
722725
use serde::de::Error;
723726

724727
if let Some(s) = Option::<String>::deserialize(deserializer)? {
725-
match DateTime::parse_from_rfc3339(&s) {
726-
Ok(datetime) => Ok(Some(datetime.to_utc())),
727-
Err(err) => {
728-
log::warn!(
729-
"error when parsing item datetime as rfc3339 ({err}), trying to parse as naive datetime"
730-
);
731-
let (mut datetime, remainder) =
732-
NaiveDateTime::parse_and_remainder(&s, "%Y-%m-%dT%H:%M:%S")
733-
.map_err(D::Error::custom)?;
734-
// This isn't super efficient but we're in a read-invalid-data path, so I think it's fine.
735-
if !remainder.is_empty() && remainder.starts_with(".") {
736-
datetime = NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S%.f")
737-
.map_err(D::Error::custom)?;
738-
}
739-
Ok(Some(datetime.and_utc()))
740-
}
741-
}
728+
parse_datetime_permissively(&s)
729+
.map(Some)
730+
.map_err(D::Error::custom)
742731
} else {
743732
Ok(None)
744733
}

crates/io/src/store.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{Format, Readable, Result, Writeable};
2-
use object_store::{ObjectStore, ObjectStoreScheme, PutResult, path::Path};
2+
use object_store::{ObjectStore, ObjectStoreExt, ObjectStoreScheme, PutResult, path::Path};
33
use std::{fmt::Debug, sync::Arc};
44
use tracing::instrument;
55
use url::Url;
@@ -206,7 +206,7 @@ impl StacStore {
206206
}
207207
#[cfg(feature = "geoparquet")]
208208
Format::Geoparquet(writer_options) => {
209-
let batch_size = writer_options.max_row_group_size;
209+
let batch_size = writer_options.max_row_group_row_count;
210210
let mut batch = Vec::with_capacity(batch_size);
211211
let mut writer: Option<geoparquet::StacGeoparquetObjectWriter> = None;
212212
for item in items {
@@ -368,7 +368,7 @@ mod tests {
368368
#[tokio::test]
369369
#[cfg(feature = "geoparquet")]
370370
async fn write_parquet() {
371-
use object_store::ObjectStore;
371+
use object_store::ObjectStoreExt;
372372

373373
use super::geoparquet::StacGeoparquetObjectWriter;
374374

@@ -400,7 +400,7 @@ mod tests {
400400
#[tokio::test]
401401
#[cfg(feature = "geoparquet")]
402402
async fn write_parquet_with_collection() {
403-
use object_store::ObjectStore;
403+
use object_store::ObjectStoreExt;
404404
use parquet::file::reader::{FileReader, SerializedFileReader};
405405

406406
use super::geoparquet::StacGeoparquetObjectWriter;

0 commit comments

Comments
 (0)