diff --git a/Cargo.lock b/Cargo.lock index 297b566f46..40d794f2dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,21 +97,6 @@ dependencies = [ "libc", ] -[[package]] -name = "anstream" -version = "0.6.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" -dependencies = [ - "anstyle", - "anstyle-parse 0.2.7", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "is_terminal_polyfill", - "utf8parse", -] - [[package]] name = "anstream" version = "1.0.0" @@ -119,7 +104,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" dependencies = [ "anstyle", - "anstyle-parse 1.0.0", + "anstyle-parse", "anstyle-query", "anstyle-wincon", "colorchoice", @@ -133,15 +118,6 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" -[[package]] -name = "anstyle-parse" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" -dependencies = [ - "utf8parse", -] - [[package]] name = "anstyle-parse" version = "1.0.0" @@ -378,7 +354,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "indexmap 2.13.0", + "indexmap 2.14.0", "itoa", "lexical-core", "memchr", @@ -604,9 +580,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.39.0" +version = "0.39.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fa7e52a4c5c547c741610a2c6f123f3881e409b714cd27e6798ef020c514f0a" +checksum = "83a25cf98105baa966497416dbd42565ce3a8cf8dbfd59803ec9ad46f3126399" dependencies = [ "cc", "cmake", @@ -641,9 +617,9 @@ dependencies = [ [[package]] name = "aws-sdk-glue" -version = "1.142.0" +version = "1.142.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3962675ec1f2012ae6439814e784557550fa239a4a291bd4f33d8f514d4fdb5b" +checksum = "cc77647907307c70ffd751db85653804552e4a3c27f054d3af7a0874ef4dfe22" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1012,9 +988,9 @@ checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" [[package]] name = "bitflags" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" dependencies = [ "serde_core", ] @@ -1030,16 +1006,16 @@ dependencies = [ [[package]] name = "blake3" -version = "1.8.3" +version = "1.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2468ef7d57b3fb7e16b576e8377cdbde2320c60e1491e961d11da40fc4f02a2d" +checksum = "4d2d5991425dfd0785aed03aedcf0b321d61975c9b5b3689c774a2610ae0b51e" dependencies = [ "arrayref", "arrayvec", "cc", "cfg-if", "constant_time_eq", - "cpufeatures 0.2.17", + "cpufeatures 0.3.0", ] [[package]] @@ -1173,9 +1149,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.57" +version = "1.2.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" +checksum = "43c5703da9466b66a946814e1adf53ea2c90f10063b86290cc9eb67ce3478a20" dependencies = [ "find-msvc-tools", "jobserver", @@ -1203,7 +1179,7 @@ checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" dependencies = [ "cfg-if", "cpufeatures 0.3.0", - "rand_core 0.10.0", + "rand_core 0.10.1", ] [[package]] @@ -1242,9 +1218,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.6.0" +version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" dependencies = [ "clap_builder", "clap_derive", @@ -1256,7 +1232,7 @@ version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ - "anstream 1.0.0", + "anstream", "anstyle", "clap_lex", "strsim", @@ -1264,9 +1240,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.6.0" +version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" +checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9" dependencies = [ "heck", "proc-macro2", @@ -1291,9 +1267,9 @@ dependencies = [ [[package]] name = "cmake" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" dependencies = [ "cc", ] @@ -1773,7 +1749,7 @@ dependencies = [ "half", "hashbrown 0.16.1", "hex", - "indexmap 2.13.0", + "indexmap 2.14.0", "itertools 0.14.0", "libc", "log", @@ -1998,7 +1974,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr-common", - "indexmap 2.13.0", + "indexmap 2.14.0", "itertools 0.14.0", "paste", "recursive", @@ -2014,7 +1990,7 @@ checksum = "ab05fdd00e05d5a6ee362882546d29d6d3df43a6c55355164a7fbee12d163bc9" dependencies = [ "arrow", "datafusion-common", - "indexmap 2.13.0", + "indexmap 2.14.0", "itertools 0.14.0", "paste", ] @@ -2178,7 +2154,7 @@ dependencies = [ "datafusion-expr", "datafusion-expr-common", "datafusion-physical-expr", - "indexmap 2.13.0", + "indexmap 2.14.0", "itertools 0.14.0", "log", "recursive", @@ -2201,7 +2177,7 @@ dependencies = [ "datafusion-physical-expr-common", "half", "hashbrown 0.16.1", - "indexmap 2.13.0", + "indexmap 2.14.0", "itertools 0.14.0", "parking_lot", "paste", @@ -2237,7 +2213,7 @@ dependencies = [ "datafusion-common", "datafusion-expr-common", "hashbrown 0.16.1", - "indexmap 2.13.0", + "indexmap 2.14.0", "itertools 0.14.0", "parking_lot", ] @@ -2284,7 +2260,7 @@ dependencies = [ "futures", "half", "hashbrown 0.16.1", - "indexmap 2.13.0", + "indexmap 2.14.0", "itertools 0.14.0", "log", "num-traits", @@ -2363,7 +2339,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-functions-nested", - "indexmap 2.13.0", + "indexmap 2.14.0", "log", "recursive", "regex", @@ -2612,9 +2588,9 @@ dependencies = [ [[package]] name = "env_filter" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" +checksum = "32e90c2accc4b07a8456ea0debdc2e7587bdd890680d71173a15d4ae604f6eef" dependencies = [ "log", "regex", @@ -2622,11 +2598,11 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.9" +version = "0.11.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" +checksum = "0621c04f2196ac3f488dd583365b9c09be011a4ab8b9f37248ffcc8f6198b56a" dependencies = [ - "anstream 0.6.21", + "anstream", "anstyle", "env_filter", "jiff", @@ -2728,9 +2704,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.3.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" [[package]] name = "faststr" @@ -2828,9 +2804,12 @@ dependencies = [ [[package]] name = "fragile" -version = "2.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" +checksum = "8878864ba14bb86e818a412bfd6f18f9eabd4ec0f008a28e8f7eb61db532fcf9" +dependencies = [ + "futures-core", +] [[package]] name = "fs-err" @@ -2998,7 +2977,7 @@ dependencies = [ "cfg-if", "libc", "r-efi 6.0.0", - "rand_core 0.10.0", + "rand_core 0.10.1", "wasip2", "wasip3", ] @@ -3043,7 +3022,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.4.0", - "indexmap 2.13.0", + "indexmap 2.14.0", "slab", "tokio", "tokio-util", @@ -3096,6 +3075,12 @@ dependencies = [ "foldhash 0.2.0", ] +[[package]] +name = "hashbrown" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" + [[package]] name = "hashlink" version = "0.10.0" @@ -3231,9 +3216,9 @@ checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" [[package]] name = "hyper" -version = "1.8.1" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" dependencies = [ "atomic-waker", "bytes", @@ -3246,7 +3231,6 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "pin-utils", "smallvec", "tokio", "want", @@ -3254,16 +3238,15 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.27.7" +version = "0.27.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" +checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f" dependencies = [ "http 1.4.0", "hyper", "hyper-util", "rustls", "rustls-native-certs", - "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", @@ -3611,12 +3594,13 @@ dependencies = [ [[package]] name = "icu_collections" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c" dependencies = [ "displaydoc", "potential_utf", + "utf8_iter", "yoke", "zerofrom", "zerovec", @@ -3624,9 +3608,9 @@ dependencies = [ [[package]] name = "icu_locale_core" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29" dependencies = [ "displaydoc", "litemap", @@ -3637,9 +3621,9 @@ dependencies = [ [[package]] name = "icu_normalizer" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4" dependencies = [ "icu_collections", "icu_normalizer_data", @@ -3651,15 +3635,15 @@ dependencies = [ [[package]] name = "icu_normalizer_data" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" +checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38" [[package]] name = "icu_properties" -version = "2.1.2" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de" dependencies = [ "icu_collections", "icu_locale_core", @@ -3671,15 +3655,15 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.1.2" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" +checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14" [[package]] name = "icu_provider" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421" dependencies = [ "displaydoc", "icu_locale_core", @@ -3736,12 +3720,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.13.0" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" dependencies = [ "equivalent", - "hashbrown 0.16.1", + "hashbrown 0.17.0", "serde", "serde_core", ] @@ -3787,9 +3771,9 @@ dependencies = [ [[package]] name = "inventory" -version = "0.3.22" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "009ae045c87e7082cb72dab0ccd01ae075dd00141ddc108f43a0ea150a9e7227" +checksum = "a4f0c30c76f2f4ccee3fe55a2435f691ca00c0e4bd87abe4f4a851b1d4dac39b" dependencies = [ "rustversion", ] @@ -3802,9 +3786,9 @@ checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" [[package]] name = "iri-string" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8e7418f59cc01c88316161279a7f665217ae316b388e58a0d10e29f54f1e5eb" +checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20" dependencies = [ "memchr", "serde", @@ -3893,10 +3877,12 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.91" +version = "0.3.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" +checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" dependencies = [ + "cfg-if", + "futures-util", "once_cell", "wasm-bindgen", ] @@ -3996,9 +3982,9 @@ checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" [[package]] name = "libc" -version = "0.2.183" +version = "0.2.185" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" [[package]] name = "liblzma" @@ -4011,9 +3997,9 @@ dependencies = [ [[package]] name = "liblzma-sys" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f2db66f3268487b5033077f266da6777d057949b8f93c8ad82e441df25e6186" +checksum = "1a60851d15cd8c5346eca4ab8babff585be2ae4bc8097c067291d3ffe2add3b6" dependencies = [ "cc", "libc", @@ -4038,14 +4024,14 @@ dependencies = [ [[package]] name = "libredox" -version = "0.1.14" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" +checksum = "e02f3bb43d335493c96bf3fd3a321600bf6bd07ed34bc64118e9293bdffea46c" dependencies = [ "bitflags", "libc", "plain", - "redox_syscall 0.7.3", + "redox_syscall 0.7.4", ] [[package]] @@ -4065,7 +4051,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14e6ba06f0ade6e504aff834d7c34298e5155c6baca353cc6a4aaff2f9fd7f33" dependencies = [ - "anstream 1.0.0", + "anstream", "anstyle", "clap", "escape8259", @@ -4096,9 +4082,9 @@ checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" [[package]] name = "litemap" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" [[package]] name = "lock_api" @@ -4427,9 +4413,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" [[package]] name = "num-integer" @@ -4784,7 +4770,7 @@ checksum = "8701b58ea97060d5e5b155d383a69952a60943f0e6dfe30b04c287beb0b27455" dependencies = [ "fixedbitset", "hashbrown 0.15.5", - "indexmap 2.13.0", + "indexmap 2.14.0", "serde", ] @@ -4900,9 +4886,9 @@ dependencies = [ [[package]] name = "pkg-config" -version = "0.3.32" +version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" [[package]] name = "plain" @@ -4939,9 +4925,9 @@ dependencies = [ [[package]] name = "potential_utf" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" dependencies = [ "zerovec", ] @@ -5013,7 +4999,7 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" dependencies = [ - "toml_edit 0.25.5+spec-1.1.0", + "toml_edit 0.25.11+spec-1.1.0", ] [[package]] @@ -5267,7 +5253,7 @@ checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" dependencies = [ "chacha20", "getrandom 0.4.2", - "rand_core 0.10.0", + "rand_core 0.10.1", ] [[package]] @@ -5311,9 +5297,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" [[package]] name = "recursive" @@ -5346,9 +5332,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" +checksum = "f450ad9c3b1da563fb6948a8e0fb0fb9269711c9c73d9ea1de5058c79c8d643a" dependencies = [ "bitflags", ] @@ -5538,7 +5524,7 @@ checksum = "1a30e631b7f4a03dee9056b8ef6982e8ba371dd5bedb74d3ec86df4499132c70" dependencies = [ "bytes", "hashbrown 0.16.1", - "indexmap 2.13.0", + "indexmap 2.14.0", "munge", "ptr_meta", "rancor", @@ -5631,9 +5617,9 @@ dependencies = [ [[package]] name = "rustc-hash" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" +checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" dependencies = [ "rand 0.8.5", ] @@ -5662,9 +5648,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.37" +version = "0.23.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" +checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" dependencies = [ "aws-lc-rs", "once_cell", @@ -5860,9 +5846,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" dependencies = [ "serde", "serde_core", @@ -6001,7 +5987,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.13.0", + "indexmap 2.14.0", "schemars 0.9.0", "schemars 1.2.1", "serde_core", @@ -6028,7 +6014,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.14.0", "itoa", "ryu", "serde", @@ -6094,9 +6080,9 @@ dependencies = [ [[package]] name = "simd-adler32" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" +checksum = "703d5c7ef118737c72f1af64ad2f6f8c5e1921f818cdcb97b8fe6fc69bf66214" [[package]] name = "simdutf8" @@ -6171,9 +6157,9 @@ dependencies = [ [[package]] name = "sonic-number" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5661364b38abad49cf1ade6631fcc35d2ccf882a7d68616b4228b7717feb5fba" +checksum = "3775c3390edf958191f1ab1e8c5c188907feebd0f3ce1604cb621f72961dbf32" dependencies = [ "cfg-if", ] @@ -6200,9 +6186,9 @@ dependencies = [ [[package]] name = "sonic-simd" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9f944718c33623919878cf74b4c9361eb3024f635733922b26722b14cd3f8cc" +checksum = "f99e664ecd2d85a68c87e3c7a3cfe691f647ea9e835de984aba4d54a41f817d4" dependencies = [ "cfg-if", ] @@ -6304,7 +6290,7 @@ dependencies = [ "futures-util", "hashbrown 0.15.5", "hashlink", - "indexmap 2.13.0", + "indexmap 2.14.0", "log", "memchr", "once_cell", @@ -6716,9 +6702,9 @@ dependencies = [ [[package]] name = "tinystr" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" dependencies = [ "displaydoc", "zerovec", @@ -6741,9 +6727,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.51.1" +version = "1.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c" +checksum = "a91135f59b1cbf38c91e73cf3386fca9bb77915c45ce2771460c9d92f0f3d776" dependencies = [ "bytes", "libc", @@ -6825,9 +6811,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "1.0.1+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b320e741db58cac564e26c607d3cc1fdc4a88fd36c879568c07856ed83ff3e9" +checksum = "3165f65f62e28e0115a00b2ebdd37eb6f3b641855f9d636d3cd4103767159ad7" dependencies = [ "serde_core", ] @@ -6838,7 +6824,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.14.0", "serde", "serde_spanned", "toml_datetime 0.6.11", @@ -6848,23 +6834,23 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.25.5+spec-1.1.0" +version = "0.25.11+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ca1a40644a28bce036923f6a431df0b34236949d111cc07cb6dca830c9ef2e1" +checksum = "0b59c4d22ed448339746c59b905d24568fcbb3ab65a500494f7b8c3e97739f2b" dependencies = [ - "indexmap 2.13.0", - "toml_datetime 1.0.1+spec-1.1.0", + "indexmap 2.14.0", + "toml_datetime 1.1.1+spec-1.1.0", "toml_parser", - "winnow 1.0.0", + "winnow 1.0.1", ] [[package]] name = "toml_parser" -version = "1.0.10+spec-1.1.0" +version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7df25b4befd31c4816df190124375d5a20c6b6921e2cad937316de3fccd63420" +checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" dependencies = [ - "winnow 1.0.0", + "winnow 1.0.1", ] [[package]] @@ -7120,9 +7106,9 @@ checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d" [[package]] name = "unicode-segmentation" -version = "1.12.0" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" [[package]] name = "unicode-width" @@ -7343,9 +7329,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.114" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" +checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" dependencies = [ "cfg-if", "once_cell", @@ -7356,23 +7342,19 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.64" +version = "0.4.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8" +checksum = "f371d383f2fb139252e0bfac3b81b265689bf45b6874af544ffa4c975ac1ebf8" dependencies = [ - "cfg-if", - "futures-util", "js-sys", - "once_cell", "wasm-bindgen", - "web-sys", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.114" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" +checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -7380,9 +7362,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.114" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" +checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" dependencies = [ "bumpalo", "proc-macro2", @@ -7393,9 +7375,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.114" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" +checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" dependencies = [ "unicode-ident", ] @@ -7417,7 +7399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" dependencies = [ "anyhow", - "indexmap 2.13.0", + "indexmap 2.14.0", "wasm-encoder", "wasmparser", ] @@ -7443,15 +7425,15 @@ checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ "bitflags", "hashbrown 0.15.5", - "indexmap 2.13.0", + "indexmap 2.14.0", "semver", ] [[package]] name = "web-sys" -version = "0.3.91" +version = "0.3.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9" +checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" dependencies = [ "js-sys", "wasm-bindgen", @@ -7805,9 +7787,9 @@ dependencies = [ [[package]] name = "winnow" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a90e88e4667264a994d34e6d1ab2d26d398dcdca8b7f52bec8668957517fc7d8" +checksum = "09dac053f1cd375980747450bfc7250c264eaae0583872e845c0c7cd578872b5" dependencies = [ "memchr", ] @@ -7840,7 +7822,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" dependencies = [ "anyhow", "heck", - "indexmap 2.13.0", + "indexmap 2.14.0", "prettyplease", "syn", "wasm-metadata", @@ -7871,7 +7853,7 @@ checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", "bitflags", - "indexmap 2.13.0", + "indexmap 2.14.0", "log", "serde", "serde_derive", @@ -7890,7 +7872,7 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" dependencies = [ "anyhow", "id-arena", - "indexmap 2.13.0", + "indexmap 2.14.0", "log", "semver", "serde", @@ -7902,9 +7884,9 @@ dependencies = [ [[package]] name = "writeable" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" [[package]] name = "xmlparser" @@ -7920,9 +7902,9 @@ checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" [[package]] name = "yoke" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" dependencies = [ "stable_deref_trait", "yoke-derive", @@ -7931,9 +7913,9 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" dependencies = [ "proc-macro2", "quote", @@ -7943,18 +7925,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.47" +version = "0.8.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efbb2a062be311f2ba113ce66f697a4dc589f85e78a4aea276200804cea0ed87" +checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.47" +version = "0.8.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e8bc7269b54418e7aeeef514aa68f8690b8c0489a06b0136e5f57c4c5ccab89" +checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" dependencies = [ "proc-macro2", "quote", @@ -7963,18 +7945,18 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" dependencies = [ "zerofrom-derive", ] [[package]] name = "zerofrom-derive" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" dependencies = [ "proc-macro2", "quote", @@ -7990,9 +7972,9 @@ checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" [[package]] name = "zerotrie" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" dependencies = [ "displaydoc", "yoke", @@ -8001,9 +7983,9 @@ dependencies = [ [[package]] name = "zerovec" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" dependencies = [ "yoke", "zerofrom", @@ -8012,9 +7994,9 @@ dependencies = [ [[package]] name = "zerovec-derive" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" dependencies = [ "proc-macro2", "quote", diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index aa28ffd5a2..64c4b550f1 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -33,6 +33,14 @@ use crate::spec::{ }; use crate::{Error, ErrorKind, Result}; +/// Filter applied to each [`ManifestFile`] before fetching it. +/// Returns `true` to include the manifest, `false` to skip it. +pub(crate) type ManifestFileFilter = Arc bool + Send + Sync>; + +/// Filter applied to each manifest entry after loading a manifest. +/// Returns `true` to include the entry, `false` to skip it. +pub(crate) type ManifestEntryFilter = Arc bool + Send + Sync>; + /// Wraps a [`ManifestFile`] alongside the objects that are needed /// to process it in a thread-safe manner pub(crate) struct ManifestFileContext { @@ -47,6 +55,7 @@ pub(crate) struct ManifestFileContext { expression_evaluator_cache: Arc, delete_file_index: DeleteFileIndex, case_sensitive: bool, + entry_filter: Option, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -76,12 +85,19 @@ impl ManifestFileContext { mut sender, expression_evaluator_cache, delete_file_index, - .. + case_sensitive, + entry_filter, } = self; let manifest = object_cache.get_manifest(&manifest_file).await?; for manifest_entry in manifest.entries() { + if let Some(ref filter) = entry_filter + && !filter(manifest_entry) + { + continue; + } + let manifest_entry_context = ManifestEntryContext { // TODO: refactor to avoid the expensive ManifestEntry clone manifest_entry: manifest_entry.clone(), @@ -91,7 +107,7 @@ impl ManifestFileContext { bound_predicates: bound_predicates.clone(), snapshot_schema: snapshot_schema.clone(), delete_file_index: delete_file_index.clone(), - case_sensitive: self.case_sensitive, + case_sensitive, }; sender @@ -146,7 +162,6 @@ impl ManifestEntryContext { /// PlanContext wraps a [`SnapshotRef`] alongside all the other /// objects that are required to perform a scan file plan. -#[derive(Debug)] pub(crate) struct PlanContext { pub snapshot: SnapshotRef, @@ -161,6 +176,25 @@ pub(crate) struct PlanContext { pub partition_filter_cache: Arc, pub manifest_evaluator_cache: Arc, pub expression_evaluator_cache: Arc, + pub manifest_file_filter: Option, + pub manifest_entry_filter: Option, +} + +impl std::fmt::Debug for PlanContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PlanContext") + .field("snapshot", &self.snapshot) + .field("case_sensitive", &self.case_sensitive) + .field( + "manifest_file_filter", + &self.manifest_file_filter.as_ref().map(|_| "..."), + ) + .field( + "manifest_entry_filter", + &self.manifest_entry_filter.as_ref().map(|_| "..."), + ) + .finish_non_exhaustive() + } } impl PlanContext { @@ -214,6 +248,12 @@ impl PlanContext { // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; for manifest_file in manifest_files { + if let Some(ref filter) = self.manifest_file_filter + && !filter(manifest_file) + { + continue; + } + let tx = if manifest_file.content == ManifestContentType::Deletes { delete_file_tx.clone() } else { @@ -283,6 +323,7 @@ impl PlanContext { expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, case_sensitive: self.case_sensitive, + entry_filter: self.manifest_entry_filter.clone(), } } } diff --git a/crates/iceberg/src/scan/incremental.rs b/crates/iceberg/src/scan/incremental.rs new file mode 100644 index 0000000000..316a797c07 --- /dev/null +++ b/crates/iceberg/src/scan/incremental.rs @@ -0,0 +1,802 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Incremental append scan for reading only newly added data between snapshots. + +use std::collections::HashSet; +use std::sync::Arc; + +use crate::expr::Predicate; +use crate::scan::context::{ManifestEntryFilter, ManifestFileFilter}; +use crate::scan::{ScanConfig, TableScan, build_table_scan}; +use crate::spec::{ManifestContentType, ManifestStatus, Operation, TableMetadataRef}; +use crate::table::Table; +use crate::util::available_parallelism; +use crate::util::snapshot::ancestors_between; +use crate::{Error, ErrorKind, Result}; + +/// Represents a validated range of snapshots for incremental scanning. +/// +/// This struct is used to track which snapshot IDs are included in an incremental +/// scan range, allowing efficient filtering of manifest entries. +#[derive(Debug, Clone)] +pub(crate) struct AppendSnapshotSet { + /// Snapshot IDs in the range + snapshot_ids: HashSet, +} + +impl AppendSnapshotSet { + /// Build a snapshot range by walking the snapshot ancestry chain. + /// + /// Validates that `from_snapshot_id` is an ancestor of `to_snapshot_id` and + /// collects all snapshot IDs in between. Also validates that all snapshots + /// in the range have APPEND operations. + /// + /// # Arguments + /// * `table_metadata` - The table metadata containing snapshot information + /// * `from_snapshot_id` - The starting snapshot ID + /// * `to_snapshot_id` - The ending snapshot ID + /// * `from_inclusive` - Whether to include the from_snapshot in the range + pub(crate) fn build( + table_metadata: &TableMetadataRef, + from_snapshot_id: i64, + to_snapshot_id: i64, + from_inclusive: bool, + ) -> Result { + // Determine the exclusive stop point for the ancestry walk. + // For inclusive mode the from-snapshot must exist so we can look up + // its parent. For exclusive mode the snapshot may have been expired + // (the parent pointer on its child still references it), so we only + // need the ID — matching Java's BaseIncrementalScan semantics. + let oldest_exclusive = if from_inclusive { + let from_snapshot = + table_metadata + .snapshot_by_id(from_snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot {from_snapshot_id} not found"), + ) + })?; + from_snapshot.parent_snapshot_id() + } else { + Some(from_snapshot_id) + }; + + let snapshots: Vec<_> = + ancestors_between(table_metadata, to_snapshot_id, oldest_exclusive).collect(); + + // ancestors_between silently returns the full chain to root if + // oldest_exclusive isn't in the ancestry chain. Detect this: + // if we got snapshots but from_snapshot_id wasn't encountered as + // the stop point, the chain doesn't connect. + if from_snapshot_id == to_snapshot_id { + // Edge case: from == to. In exclusive mode, range is empty. + // In inclusive mode, we should have exactly one snapshot. + if !from_inclusive { + return Ok(Self { + snapshot_ids: HashSet::new(), + }); + } + } else if snapshots.is_empty() { + // to_snapshot_id doesn't exist + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "from_snapshot {from_snapshot_id} is not an ancestor of to_snapshot {to_snapshot_id}", + ), + )); + } else { + // Verify the oldest snapshot in our walk is actually connected + // to from_snapshot_id. The last snapshot's parent (for exclusive) + // or the last snapshot itself (for inclusive) should be from_snapshot_id. + let oldest_collected = snapshots.last().unwrap(); + let connects = if from_inclusive { + oldest_collected.snapshot_id() == from_snapshot_id + } else { + oldest_collected.parent_snapshot_id() == Some(from_snapshot_id) + }; + if !connects { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "from_snapshot {from_snapshot_id} is not an ancestor of to_snapshot {to_snapshot_id}", + ), + )); + } + } + + // Collect only APPEND snapshot IDs, silently skipping non-APPEND + // snapshots (e.g. replace/compaction, overwrite, delete). This matches + // the Java BaseIncrementalAppendScan behavior — only append operations + // contribute new data files to an incremental append scan. + let mut snapshot_ids = HashSet::with_capacity(snapshots.len()); + for snapshot in &snapshots { + if snapshot.summary().operation == Operation::Append { + snapshot_ids.insert(snapshot.snapshot_id()); + } + } + + Ok(Self { snapshot_ids }) + } + + /// Check if a snapshot_id is within this set + pub(crate) fn contains(&self, snapshot_id: i64) -> bool { + self.snapshot_ids.contains(&snapshot_id) + } + + /// Create a manifest file filter that skips delete manifests and data + /// manifests whose `added_snapshot_id` is outside this set. + pub(crate) fn manifest_file_filter(self: &Arc) -> ManifestFileFilter { + let set = self.clone(); + Arc::new(move |manifest_file| { + manifest_file.content != ManifestContentType::Deletes + && set.contains(manifest_file.added_snapshot_id) + }) + } + + /// Create a manifest entry filter that includes only entries with + /// status ADDED and a snapshot_id within this set. + pub(crate) fn manifest_entry_filter(self: &Arc) -> ManifestEntryFilter { + let set = self.clone(); + Arc::new(move |entry| { + entry.status() == ManifestStatus::Added + && entry.snapshot_id().is_some_and(|id| set.contains(id)) + }) + } +} + +/// Builder to create an incremental append scan between two snapshots. +/// +/// An incremental append scan returns only data files that were added in +/// snapshots between `from_snapshot_id` and the target snapshot. Only +/// snapshots with APPEND operations are supported. +/// +/// Use [`Table::incremental_append_scan`] or +/// [`Table::incremental_append_scan_inclusive`] to create an instance. +pub struct IncrementalAppendScanBuilder<'a> { + table: &'a Table, + from_snapshot_id: i64, + from_inclusive: bool, + to_snapshot_id: Option, + column_names: Option>, + batch_size: Option, + case_sensitive: bool, + filter: Option, + concurrency_limit_data_files: usize, + concurrency_limit_manifest_entries: usize, + concurrency_limit_manifest_files: usize, + row_group_filtering_enabled: bool, + row_selection_enabled: bool, +} + +impl<'a> IncrementalAppendScanBuilder<'a> { + pub(crate) fn new( + table: &'a Table, + from_snapshot_id: i64, + to_snapshot_id: Option, + from_inclusive: bool, + ) -> Self { + let num_cpus = available_parallelism().get(); + + Self { + table, + from_snapshot_id, + from_inclusive, + to_snapshot_id, + column_names: None, + batch_size: None, + case_sensitive: true, + filter: None, + concurrency_limit_data_files: num_cpus, + concurrency_limit_manifest_entries: num_cpus, + concurrency_limit_manifest_files: num_cpus, + row_group_filtering_enabled: true, + row_selection_enabled: false, + } + } + + /// Sets the desired size of batches in the response + /// to something other than the default + pub fn with_batch_size(mut self, batch_size: Option) -> Self { + self.batch_size = batch_size; + self + } + + /// Sets the scan's case sensitivity + pub fn with_case_sensitive(mut self, case_sensitive: bool) -> Self { + self.case_sensitive = case_sensitive; + self + } + + /// Specifies a predicate to use as a filter + pub fn with_filter(mut self, predicate: Predicate) -> Self { + self.filter = Some(predicate.rewrite_not()); + self + } + + /// Select all columns. + pub fn select_all(mut self) -> Self { + self.column_names = None; + self + } + + /// Select empty columns. + pub fn select_empty(mut self) -> Self { + self.column_names = Some(vec![]); + self + } + + /// Select some columns of the table. + pub fn select(mut self, column_names: impl IntoIterator) -> Self { + self.column_names = Some( + column_names + .into_iter() + .map(|item| item.to_string()) + .collect(), + ); + self + } + + /// Sets the concurrency limit for both manifest files and manifest + /// entries for this scan + pub fn with_concurrency_limit(mut self, limit: usize) -> Self { + self.concurrency_limit_manifest_files = limit; + self.concurrency_limit_manifest_entries = limit; + self.concurrency_limit_data_files = limit; + self + } + + /// Sets the data file concurrency limit for this scan + pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self { + self.concurrency_limit_data_files = limit; + self + } + + /// Sets the manifest entry concurrency limit for this scan + pub fn with_manifest_entry_concurrency_limit(mut self, limit: usize) -> Self { + self.concurrency_limit_manifest_entries = limit; + self + } + + /// Determines whether to enable row group filtering. + pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self { + self.row_group_filtering_enabled = row_group_filtering_enabled; + self + } + + /// Determines whether to enable row selection. + pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self { + self.row_selection_enabled = row_selection_enabled; + self + } + + /// Build the incremental append scan. + pub fn build(self) -> Result { + let to_snapshot = match self.to_snapshot_id { + Some(snapshot_id) => self + .table + .metadata() + .snapshot_by_id(snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("to_snapshot with id {snapshot_id} not found"), + ) + })? + .clone(), + None => { + let Some(current_snapshot) = self.table.metadata().current_snapshot() else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot perform incremental scan: table has no snapshots", + )); + }; + current_snapshot.clone() + } + }; + + let append_set = Arc::new(AppendSnapshotSet::build( + &self.table.metadata_ref(), + self.from_snapshot_id, + to_snapshot.snapshot_id(), + self.from_inclusive, + )?); + + build_table_scan( + ScanConfig { + table: self.table, + column_names: self.column_names, + batch_size: self.batch_size, + case_sensitive: self.case_sensitive, + filter: self.filter, + concurrency_limit_data_files: self.concurrency_limit_data_files, + concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries, + concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, + row_group_filtering_enabled: self.row_group_filtering_enabled, + row_selection_enabled: self.row_selection_enabled, + }, + to_snapshot, + Some(append_set.manifest_file_filter()), + Some(append_set.manifest_entry_filter()), + ) + } +} + +#[cfg(test)] +mod tests { + use futures::TryStreamExt; + + use super::AppendSnapshotSet; + use crate::scan::tests::TableTestFixture; + + #[test] + fn test_incremental_scan_invalid_from_snapshot_exclusive() { + let table = TableTestFixture::new().table; + + // Exclusive mode doesn't require from-snapshot to exist, but it must + // be an ancestor of the to-snapshot. 999999999 is not in the ancestry + // chain so this should fail. + let result = table.incremental_append_scan(999999999, None).build(); + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("not an ancestor"), + "Expected ancestry error, got: {err}" + ); + } + + #[test] + fn test_incremental_scan_invalid_from_snapshot_inclusive() { + let table = TableTestFixture::new().table; + + // Inclusive mode requires from-snapshot to exist (we need its parent ID). + let result = table + .incremental_append_scan_inclusive(999999999, None) + .build(); + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("not found"), + "Expected 'not found' error, got: {err}" + ); + } + + #[test] + fn test_incremental_scan_exclusive_from_expired_snapshot() { + // Fixture has S1 (append) -> S2 (append, current). + // Simulate S1 being expired: use S1's ID as from-snapshot in exclusive + // mode even though it wouldn't exist in metadata after expiration. + // Since exclusive mode only needs the ID (not the snapshot object), + // this should succeed — the child (S2) still has parent_snapshot_id = S1. + let table = TableTestFixture::new().table; + + let s1_id = 3051729675574597004_i64; + let s2_id = 3055729675574597004_i64; + + // Verify S2's parent is S1 (simulating the expired-parent scenario) + assert_eq!( + table + .metadata() + .snapshot_by_id(s2_id) + .unwrap() + .parent_snapshot_id(), + Some(s1_id) + ); + + let result = table.incremental_append_scan(s1_id, Some(s2_id)).build(); + + assert!( + result.is_ok(), + "Exclusive scan from an (effectively expired) parent should succeed" + ); + } + + #[test] + fn test_incremental_scan_invalid_to_snapshot() { + let table = TableTestFixture::new().table; + + let result = table + .incremental_append_scan(3051729675574597004, Some(999999999)) + .build(); + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err.to_string().contains("not found")); + } + + #[test] + fn test_incremental_scan_appends_after() { + // Fixture has S1 (append) -> S2 (append, current) + let table = TableTestFixture::new().table; + + let result = table + .incremental_append_scan(3051729675574597004, None) + .build(); + assert!( + result.is_ok(), + "appends_after should succeed when all snapshots are appends" + ); + + let scan = result.unwrap(); + assert!( + scan.plan_context.is_some(), + "Incremental scan should have a plan context" + ); + } + + #[test] + fn test_incremental_scan_appends_between() { + // Fixture has S1 (append) -> S2 (append, current) + let table = TableTestFixture::new().table; + + let current_snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id(); + let parent_id = table + .metadata() + .current_snapshot() + .unwrap() + .parent_snapshot_id() + .expect("Current snapshot should have a parent"); + + let result = table + .incremental_append_scan(parent_id, Some(current_snapshot_id)) + .build(); + + assert!( + result.is_ok(), + "appends_between should succeed for two append snapshots" + ); + } + + #[test] + fn test_incremental_scan_from_snapshot_inclusive() { + // Fixture has S1 (append) -> S2 (append, current) + let table = TableTestFixture::new().table; + let current_snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id(); + + // Verify the scan builds successfully + let result = table + .incremental_append_scan_inclusive(current_snapshot_id, Some(current_snapshot_id)) + .build(); + assert!( + result.is_ok(), + "Inclusive scan of a single append snapshot should succeed" + ); + + // Verify AppendSnapshotSet directly + let set = AppendSnapshotSet::build( + &table.metadata_ref(), + current_snapshot_id, + current_snapshot_id, + true, + ) + .unwrap(); + assert!( + set.contains(current_snapshot_id), + "Inclusive set should contain the from_snapshot" + ); + } + + #[test] + fn test_incremental_scan_from_snapshot_exclusive() { + // Fixture has S1 (append) -> S2 (append, current) + let table = TableTestFixture::new().table; + let current_snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id(); + + // Verify the scan builds successfully + let result = table + .incremental_append_scan(current_snapshot_id, Some(current_snapshot_id)) + .build(); + assert!( + result.is_ok(), + "Exclusive scan from=to should succeed with empty range" + ); + + // Verify AppendSnapshotSet directly + let set = AppendSnapshotSet::build( + &table.metadata_ref(), + current_snapshot_id, + current_snapshot_id, + false, + ) + .unwrap(); + assert!( + !set.contains(current_snapshot_id), + "Exclusive set should not contain the from_snapshot" + ); + } + + #[test] + fn test_incremental_scan_skips_non_append_operations() { + // Deep history fixture: S1 (append) -> S2 (append) -> S3 (append) + // -> S4 (overwrite) -> S5 (append, current) + let table = TableTestFixture::new_with_deep_history().table; + + // Scanning from S1 to S5 crosses S4 (overwrite) — should succeed + // but only include APPEND snapshots (S2, S3, S5), skipping S4 + let result = table + .incremental_append_scan(3051729675574597004, Some(3059729675574597004)) + .build(); + + assert!( + result.is_ok(), + "Should succeed, skipping non-APPEND snapshots" + ); + + let set = AppendSnapshotSet::build( + &table.metadata_ref(), + 3051729675574597004, + 3059729675574597004, + false, + ) + .unwrap(); + assert!( + !set.contains(3051729675574597004), + "S1 (from) should be excluded" + ); + assert!( + set.contains(3055729675574597004), + "S2 (append) should be in set" + ); + assert!( + set.contains(3056729675574597004), + "S3 (append) should be in set" + ); + assert!( + !set.contains(3057729675574597004), + "S4 (overwrite) should be skipped" + ); + assert!( + set.contains(3059729675574597004), + "S5 (append) should be in set" + ); + } + + #[test] + fn test_incremental_scan_append_only_range() { + // Deep history fixture: S1 (append) -> S2 (append) -> S3 (append) + // -> S4 (overwrite) -> S5 (append, current) + let table = TableTestFixture::new_with_deep_history().table; + + // Scanning from S1 to S3 (all appends) + let set = AppendSnapshotSet::build( + &table.metadata_ref(), + 3051729675574597004, + 3056729675574597004, + false, + ) + .unwrap(); + assert!( + !set.contains(3051729675574597004), + "from_snapshot should be excluded" + ); + assert!(set.contains(3055729675574597004), "S2 should be in range"); + assert!(set.contains(3056729675574597004), "S3 should be in range"); + } + + #[tokio::test] + async fn test_incremental_scan_returns_only_added_files_in_range() { + // Fixture has S1 (append) -> S2 (append, current) + // Manifest contains: + // 1.parquet: status=Added, snapshot=S2 + // 2.parquet: status=Deleted, snapshot=S1 + // 3.parquet: status=Existing, snapshot=S1 + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + let current_snapshot = fixture.table.metadata().current_snapshot().unwrap(); + let parent_snapshot_id = current_snapshot.parent_snapshot_id().unwrap(); + + // Incremental scan from S1 (exclusive) to S2 should return only 1.parquet + let table_scan = fixture + .table + .incremental_append_scan(parent_snapshot_id, Some(current_snapshot.snapshot_id())) + .build() + .unwrap(); + + let tasks: Vec<_> = table_scan + .plan_files() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert_eq!( + tasks.len(), + 1, + "Incremental scan should return exactly 1 file" + ); + assert_eq!( + tasks[0].data_file_path, + format!("{}/1.parquet", &fixture.table_location), + "Should only return the file added in S2" + ); + } + + #[tokio::test] + async fn test_incremental_scan_exclusive_same_snapshot_returns_empty() { + // Fixture has S1 (append) -> S2 (append, current) + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + let current_snapshot_id = fixture + .table + .metadata() + .current_snapshot() + .unwrap() + .snapshot_id(); + + // Incremental scan from S2 to S2 (exclusive) should return nothing + let table_scan = fixture + .table + .incremental_append_scan(current_snapshot_id, Some(current_snapshot_id)) + .build() + .unwrap(); + + let tasks: Vec<_> = table_scan + .plan_files() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert!( + tasks.is_empty(), + "Exclusive scan from=to should return no files" + ); + } + + #[tokio::test] + async fn test_incremental_scan_deep_history_skips_overwrite_files() { + // Deep history fixture: + // S1 (append) -> S2 (append) -> S3 (append) -> S4 (overwrite) -> S5 (append, current) + // Each snapshot adds one file: s1.parquet .. s5.parquet + // + // Incremental scan from S1 (exclusive) to S5 should return only files + // from APPEND snapshots: s2.parquet, s3.parquet, s5.parquet + // s4.parquet (added in overwrite S4) must be skipped. + let mut fixture = TableTestFixture::new_with_deep_history(); + fixture.setup_manifest_files_deep_history().await; + + let s1_id = 3051729675574597004_i64; + let s5_id = 3059729675574597004_i64; + + let table_scan = fixture + .table + .incremental_append_scan(s1_id, Some(s5_id)) + .build() + .unwrap(); + + let mut tasks: Vec<_> = table_scan + .plan_files() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + // Sort by path for deterministic assertions + tasks.sort_by(|a, b| a.data_file_path.cmp(&b.data_file_path)); + + assert_eq!( + tasks.len(), + 3, + "Should return 3 files (s2, s3, s5), skipping s4 (overwrite)" + ); + + let file_names: Vec<&str> = tasks + .iter() + .map(|t| { + t.data_file_path + .rsplit('/') + .next() + .unwrap_or(&t.data_file_path) + }) + .collect(); + + assert_eq!( + file_names, + vec!["s2.parquet", "s3.parquet", "s5.parquet"], + "Only files from APPEND snapshots should be returned" + ); + } + + #[tokio::test] + async fn test_incremental_scan_deep_history_partial_range() { + // Scan from S2 (exclusive) to S3 — both appends, should return only s3.parquet + let mut fixture = TableTestFixture::new_with_deep_history(); + fixture.setup_manifest_files_deep_history().await; + + let s2_id = 3055729675574597004_i64; + let s3_id = 3056729675574597004_i64; + + let table_scan = fixture + .table + .incremental_append_scan(s2_id, Some(s3_id)) + .build() + .unwrap(); + + let tasks: Vec<_> = table_scan + .plan_files() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert_eq!(tasks.len(), 1, "Should return exactly 1 file"); + assert!( + tasks[0].data_file_path.ends_with("s3.parquet"), + "Should return s3.parquet, got: {}", + tasks[0].data_file_path + ); + } + + #[tokio::test] + async fn test_incremental_scan_deep_history_inclusive_with_overwrite() { + // Inclusive scan from S3 to S5: + // S3 (append) -> S4 (overwrite) -> S5 (append) + // Should return s3.parquet and s5.parquet, skipping s4.parquet + let mut fixture = TableTestFixture::new_with_deep_history(); + fixture.setup_manifest_files_deep_history().await; + + let s3_id = 3056729675574597004_i64; + let s5_id = 3059729675574597004_i64; + + let table_scan = fixture + .table + .incremental_append_scan_inclusive(s3_id, Some(s5_id)) + .build() + .unwrap(); + + let mut tasks: Vec<_> = table_scan + .plan_files() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + tasks.sort_by(|a, b| a.data_file_path.cmp(&b.data_file_path)); + + assert_eq!( + tasks.len(), + 2, + "Should return 2 files (s3, s5), skipping s4 (overwrite)" + ); + + let file_names: Vec<&str> = tasks + .iter() + .map(|t| { + t.data_file_path + .rsplit('/') + .next() + .unwrap_or(&t.data_file_path) + }) + .collect(); + + assert_eq!( + file_names, + vec!["s3.parquet", "s5.parquet"], + "Only files from APPEND snapshots should be returned" + ); + } +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 4a1e27bdc1..8e6549b3d4 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -21,6 +21,7 @@ mod cache; use cache::*; mod context; use context::*; +mod incremental; mod task; use std::sync::Arc; @@ -29,6 +30,7 @@ use arrow_array::RecordBatch; use futures::channel::mpsc::{Sender, channel}; use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; +pub use incremental::IncrementalAppendScanBuilder; pub use task::*; use crate::arrow::ArrowReaderBuilder; @@ -46,6 +48,119 @@ use crate::{Error, ErrorKind, Result}; /// A stream of arrow [`RecordBatch`]es. pub type ArrowRecordBatchStream = BoxStream<'static, Result>; +/// Shared configuration extracted from scan builders, used by both +/// [`TableScanBuilder`] and [`IncrementalAppendScanBuilder`]. +pub(crate) struct ScanConfig<'a> { + table: &'a Table, + column_names: Option>, + batch_size: Option, + case_sensitive: bool, + filter: Option, + concurrency_limit_data_files: usize, + concurrency_limit_manifest_entries: usize, + concurrency_limit_manifest_files: usize, + row_group_filtering_enabled: bool, + row_selection_enabled: bool, +} + +/// Shared build logic: validates columns, resolves field IDs, binds predicates, +/// and constructs [`PlanContext`] + [`TableScan`]. +pub(crate) fn build_table_scan( + config: ScanConfig<'_>, + snapshot: SnapshotRef, + manifest_file_filter: Option, + manifest_entry_filter: Option, +) -> Result { + let schema = snapshot.schema(config.table.metadata())?; + + // Check that all column names exist in the schema (skip reserved columns). + if let Some(column_names) = config.column_names.as_ref() { + for column_name in column_names { + if is_metadata_column_name(column_name) { + continue; + } + if schema.field_by_name(column_name).is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Column {column_name} not found in table. Schema: {schema}"), + )); + } + } + } + + let mut field_ids = vec![]; + let column_names = config.column_names.clone().unwrap_or_else(|| { + schema + .as_struct() + .fields() + .iter() + .map(|f| f.name.clone()) + .collect() + }); + + for column_name in column_names.iter() { + if is_metadata_column_name(column_name) { + field_ids.push(get_metadata_field_id(column_name)?); + continue; + } + + let field_id = schema.field_id_by_name(column_name).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Column {column_name} not found in table. Schema: {schema}"), + ) + })?; + + schema + .as_struct() + .field_by_id(field_id) + .ok_or_else(|| { + Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}" + ), + ) + })?; + + field_ids.push(field_id); + } + + let snapshot_bound_predicate = if let Some(ref predicates) = config.filter { + Some(predicates.bind(schema.clone(), true)?) + } else { + None + }; + + let plan_context = PlanContext { + snapshot, + table_metadata: config.table.metadata_ref(), + snapshot_schema: schema, + case_sensitive: config.case_sensitive, + predicate: config.filter.map(Arc::new), + snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new), + object_cache: config.table.object_cache(), + field_ids: Arc::new(field_ids), + partition_filter_cache: Arc::new(PartitionFilterCache::new()), + manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), + expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), + manifest_file_filter, + manifest_entry_filter, + }; + + Ok(TableScan { + batch_size: config.batch_size, + column_names: config.column_names, + file_io: config.table.file_io().clone(), + plan_context: Some(plan_context), + concurrency_limit_data_files: config.concurrency_limit_data_files, + concurrency_limit_manifest_entries: config.concurrency_limit_manifest_entries, + concurrency_limit_manifest_files: config.concurrency_limit_manifest_files, + row_group_filtering_enabled: config.row_group_filtering_enabled, + row_selection_enabled: config.row_selection_enabled, + }) +} + /// Builder to create table scan. pub struct TableScanBuilder<'a> { table: &'a Table, @@ -199,7 +314,7 @@ impl<'a> TableScanBuilder<'a> { })? .clone(), None => { - let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else { + let Some(current_snapshot) = self.table.metadata().current_snapshot() else { return Ok(TableScan { batch_size: self.batch_size, column_names: self.column_names, @@ -212,98 +327,27 @@ impl<'a> TableScanBuilder<'a> { row_selection_enabled: self.row_selection_enabled, }); }; - current_snapshot_id.clone() - } - }; - - let schema = snapshot.schema(self.table.metadata())?; - - // Check that all column names exist in the schema (skip reserved columns). - if let Some(column_names) = self.column_names.as_ref() { - for column_name in column_names { - // Skip reserved columns that don't exist in the schema - if is_metadata_column_name(column_name) { - continue; - } - if schema.field_by_name(column_name).is_none() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Column {column_name} not found in table. Schema: {schema}"), - )); - } + current_snapshot.clone() } - } - - let mut field_ids = vec![]; - let column_names = self.column_names.clone().unwrap_or_else(|| { - schema - .as_struct() - .fields() - .iter() - .map(|f| f.name.clone()) - .collect() - }); - - for column_name in column_names.iter() { - // Handle metadata columns (like "_file") - if is_metadata_column_name(column_name) { - field_ids.push(get_metadata_field_id(column_name)?); - continue; - } - - let field_id = schema.field_id_by_name(column_name).ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Column {column_name} not found in table. Schema: {schema}"), - ) - })?; - - schema - .as_struct() - .field_by_id(field_id) - .ok_or_else(|| { - Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}" - ), - ) - })?; - - field_ids.push(field_id); - } - - let snapshot_bound_predicate = if let Some(ref predicates) = self.filter { - Some(predicates.bind(schema.clone(), true)?) - } else { - None }; - let plan_context = PlanContext { + build_table_scan( + ScanConfig { + table: self.table, + column_names: self.column_names, + batch_size: self.batch_size, + case_sensitive: self.case_sensitive, + filter: self.filter, + concurrency_limit_data_files: self.concurrency_limit_data_files, + concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries, + concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, + row_group_filtering_enabled: self.row_group_filtering_enabled, + row_selection_enabled: self.row_selection_enabled, + }, snapshot, - table_metadata: self.table.metadata_ref(), - snapshot_schema: schema, - case_sensitive: self.case_sensitive, - predicate: self.filter.map(Arc::new), - snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new), - object_cache: self.table.object_cache(), - field_ids: Arc::new(field_ids), - partition_filter_cache: Arc::new(PartitionFilterCache::new()), - manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), - expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), - }; - - Ok(TableScan { - batch_size: self.batch_size, - column_names: self.column_names, - file_io: self.table.file_io().clone(), - plan_context: Some(plan_context), - concurrency_limit_data_files: self.concurrency_limit_data_files, - concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries, - concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, - row_group_filtering_enabled: self.row_group_filtering_enabled, - row_selection_enabled: self.row_selection_enabled, - }) + None, + None, + ) } } @@ -592,8 +636,8 @@ pub mod tests { use crate::scan::FileScanTask; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry, - ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec, - PrimitiveType, Schema, Struct, StructType, TableMetadata, Type, + ManifestFile, ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, + PartitionSpec, PrimitiveType, Schema, Struct, StructType, TableMetadata, Type, }; use crate::table::Table; @@ -684,22 +728,37 @@ pub mod tests { } /// Creates a fixture with 5 snapshots chained as: - /// S1 (root) -> S2 -> S3 -> S4 -> S5 (current) - /// Useful for testing snapshot history traversal. + /// S1 (append) -> S2 (append) -> S3 (append) -> S4 (overwrite) -> S5 (append, current) + /// Useful for testing snapshot history traversal and incremental scans + /// with non-append operations in the chain. pub fn new_with_deep_history() -> Self { let tmp_dir = TempDir::new().unwrap(); let table_location = tmp_dir.path().join("table1"); let table_metadata1_location = table_location.join("metadata/v1.json"); + let manifest_list_s1 = table_location.join("metadata/snap-3051729675574597004.avro"); + let manifest_list_s2 = table_location.join("metadata/snap-3055729675574597004.avro"); + let manifest_list_s3 = table_location.join("metadata/snap-3056729675574597004.avro"); + let manifest_list_s4 = table_location.join("metadata/snap-3057729675574597004.avro"); + let manifest_list_s5 = table_location.join("metadata/snap-3059729675574597004.avro"); + let file_io = FileIO::new_with_fs(); let table_metadata = { - let json_str = fs::read_to_string(format!( + let template_json_str = fs::read_to_string(format!( "{}/testdata/example_table_metadata_v2_deep_history.json", env!("CARGO_MANIFEST_DIR") )) .unwrap(); - serde_json::from_str::(&json_str).unwrap() + let metadata_json = render_template(&template_json_str, context! { + table_location => &table_location, + manifest_list_s1_location => &manifest_list_s1, + manifest_list_s2_location => &manifest_list_s2, + manifest_list_s3_location => &manifest_list_s3, + manifest_list_s4_location => &manifest_list_s4, + manifest_list_s5_location => &manifest_list_s5, + }); + serde_json::from_str::(&metadata_json).unwrap() }; let table = Table::builder() @@ -873,6 +932,147 @@ pub mod tests { manifest_list_write.close().await.unwrap(); } + /// Sets up manifest files for the deep history fixture. + /// + /// Creates one data file per snapshot (s1.parquet through s5.parquet), + /// each with a manifest and manifest list. Manifest lists are cumulative + /// (each snapshot's list includes all prior manifests), matching real + /// Iceberg behavior. The incremental scan should skip s4.parquet + /// (added in the overwrite snapshot S4). + pub async fn setup_manifest_files_deep_history(&mut self) { + let parquet_file_size = self.write_parquet_data_files_deep_history(); + let partition_spec = self.table.metadata().default_partition_spec(); + + // Snapshot chain: S1 -> S2 -> S3 -> S4 (overwrite) -> S5 + let snapshot_ids: Vec = vec![ + 3051729675574597004, + 3055729675574597004, + 3056729675574597004, + 3057729675574597004, + 3059729675574597004, + ]; + + // Accumulate manifests across snapshots (each manifest list is cumulative) + let mut all_manifests: Vec = Vec::new(); + + for (i, &snap_id) in snapshot_ids.iter().enumerate() { + let snapshot = self + .table + .metadata() + .snapshot_by_id(snap_id) + .unwrap() + .clone(); + let schema = snapshot.schema(self.table.metadata()).unwrap(); + + let file_name = format!("s{}.parquet", i + 1); + let partition_value = (i + 1) as i64 * 100; + + let mut writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snap_id), + None, + schema, + partition_spec.as_ref().clone(), + ) + .build_v2_data(); + + writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(format!("{}/{}", &self.table_location, file_name)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(parquet_file_size) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long( + partition_value, + ))])) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + let mut data_file_manifest = writer.write_manifest_file().await.unwrap(); + // Assign sequence numbers so the manifest can be included in + // later snapshots' cumulative manifest lists without triggering + // the "unassigned sequence number" validation. + data_file_manifest.sequence_number = snapshot.sequence_number(); + data_file_manifest.min_sequence_number = snapshot.sequence_number(); + all_manifests.push(data_file_manifest); + + // Write cumulative manifest list for this snapshot + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(snapshot.manifest_list()) + .unwrap(), + snap_id, + snapshot.parent_snapshot_id(), + snapshot.sequence_number(), + ); + manifest_list_write + .add_manifests(all_manifests.clone().into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + } + } + + /// Writes parquet data files for the deep history fixture (3-column schema: x, y, z). + fn write_parquet_data_files_deep_history(&self) -> u64 { + std::fs::create_dir_all(&self.table_location).unwrap(); + + let schema = { + let fields = vec![ + arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + + let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 10])) as ArrayRef; + let col2 = Arc::new(Int64Array::from_iter_values(vec![2; 10])) as ArrayRef; + let col3 = Arc::new(Int64Array::from_iter_values(vec![3; 10])) as ArrayRef; + + let batch = RecordBatch::try_new(schema.clone(), vec![col1, col2, col3]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + for i in 1..=5 { + let file = + File::create(format!("{}/s{}.parquet", &self.table_location, i)).unwrap(); + let mut writer = + ArrowWriter::try_new(file, batch.schema(), Some(props.clone())).unwrap(); + writer.write(&batch).expect("Writing batch"); + writer.close().unwrap(); + } + + std::fs::metadata(format!("{}/s1.parquet", &self.table_location)) + .unwrap() + .len() + } + /// Writes identical Parquet data files (1.parquet, 2.parquet, 3.parquet) /// and returns the file size in bytes. fn write_parquet_data_files(&self) -> u64 { diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 56ddbd51ba..a0558a679f 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -23,7 +23,7 @@ use crate::arrow::ArrowReaderBuilder; use crate::inspect::MetadataTable; use crate::io::FileIO; use crate::io::object_cache::ObjectCache; -use crate::scan::TableScanBuilder; +use crate::scan::{IncrementalAppendScanBuilder, TableScanBuilder}; use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -224,6 +224,30 @@ impl Table { TableScanBuilder::new(self) } + /// Creates an incremental append scan starting from the given snapshot (exclusive). + /// + /// Returns only data files added in APPEND snapshots after `from_snapshot_id`, + /// up to `to_snapshot_id` or the current snapshot if `None`. + pub fn incremental_append_scan( + &self, + from_snapshot_id: i64, + to_snapshot_id: Option, + ) -> IncrementalAppendScanBuilder<'_> { + IncrementalAppendScanBuilder::new(self, from_snapshot_id, to_snapshot_id, false) + } + + /// Creates an incremental append scan starting from the given snapshot (inclusive). + /// + /// Returns only data files added in APPEND snapshots from `from_snapshot_id` (inclusive), + /// up to `to_snapshot_id` or the current snapshot if `None`. + pub fn incremental_append_scan_inclusive( + &self, + from_snapshot_id: i64, + to_snapshot_id: Option, + ) -> IncrementalAppendScanBuilder<'_> { + IncrementalAppendScanBuilder::new(self, from_snapshot_id, to_snapshot_id, true) + } + /// Creates a metadata table which provides table-like APIs for inspecting metadata. /// See [`MetadataTable`] for more details. pub fn inspect(&self) -> MetadataTable<'_> { @@ -312,6 +336,26 @@ impl StaticTable { self.0.scan() } + /// Creates an incremental append scan starting from the given snapshot (exclusive). + pub fn incremental_append_scan( + &self, + from_snapshot_id: i64, + to_snapshot_id: Option, + ) -> IncrementalAppendScanBuilder<'_> { + self.0 + .incremental_append_scan(from_snapshot_id, to_snapshot_id) + } + + /// Creates an incremental append scan starting from the given snapshot (inclusive). + pub fn incremental_append_scan_inclusive( + &self, + from_snapshot_id: i64, + to_snapshot_id: Option, + ) -> IncrementalAppendScanBuilder<'_> { + self.0 + .incremental_append_scan_inclusive(from_snapshot_id, to_snapshot_id) + } + /// Get TableMetadataRef for the static table pub fn metadata(&self) -> TableMetadataRef { self.0.metadata_ref() diff --git a/crates/iceberg/testdata/example_table_metadata_v2_deep_history.json b/crates/iceberg/testdata/example_table_metadata_v2_deep_history.json index a354958697..bd192ca6e2 100644 --- a/crates/iceberg/testdata/example_table_metadata_v2_deep_history.json +++ b/crates/iceberg/testdata/example_table_metadata_v2_deep_history.json @@ -1,7 +1,7 @@ { "format-version": 2, "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", - "location": "s3://bucket/test/location", + "location": "{{ table_location }}", "last-sequence-number": 34, "last-updated-ms": 1602638573590, "last-column-id": 3, @@ -53,7 +53,7 @@ "timestamp-ms": 1515100955770, "sequence-number": 0, "summary": {"operation": "append"}, - "manifest-list": "s3://bucket/metadata/snap-3051729675574597004.avro" + "manifest-list": "{{ manifest_list_s1_location }}" }, { "snapshot-id": 3055729675574597004, @@ -61,7 +61,7 @@ "timestamp-ms": 1555100955770, "sequence-number": 1, "summary": {"operation": "append"}, - "manifest-list": "s3://bucket/metadata/snap-3055729675574597004.avro", + "manifest-list": "{{ manifest_list_s2_location }}", "schema-id": 1 }, { @@ -70,7 +70,7 @@ "timestamp-ms": 1575100955770, "sequence-number": 2, "summary": {"operation": "append"}, - "manifest-list": "s3://bucket/metadata/snap-3056729675574597004.avro", + "manifest-list": "{{ manifest_list_s3_location }}", "schema-id": 1 }, { @@ -79,7 +79,7 @@ "timestamp-ms": 1595100955770, "sequence-number": 3, "summary": {"operation": "overwrite"}, - "manifest-list": "s3://bucket/metadata/snap-3057729675574597004.avro", + "manifest-list": "{{ manifest_list_s4_location }}", "schema-id": 1 }, { @@ -88,7 +88,7 @@ "timestamp-ms": 1602638573590, "sequence-number": 4, "summary": {"operation": "append"}, - "manifest-list": "s3://bucket/metadata/snap-3059729675574597004.avro", + "manifest-list": "{{ manifest_list_s5_location }}", "schema-id": 1 } ], diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 234ab26470..1176e0867d 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -36,14 +36,29 @@ use iceberg::table::Table; use super::expr_to_predicate::convert_filters_to_predicate; use crate::to_datafusion_error; +/// Describes which snapshot(s) to scan. +#[derive(Debug, Clone)] +pub(crate) enum ScanRange { + /// Scan the current (latest) snapshot. + Latest, + /// Scan a specific point-in-time snapshot. + PointInTime(i64), + /// Incremental append scan between two snapshots. + Incremental { + from: i64, + to: Option, + from_inclusive: bool, + }, +} + /// Manages the scanning process of an Iceberg [`Table`], encapsulating the /// necessary details and computed properties required for execution planning. #[derive(Debug)] pub struct IcebergTableScan { /// A table in the catalog. table: Table, - /// Snapshot of the table to scan. - snapshot_id: Option, + /// Which snapshot(s) to scan. + scan_range: ScanRange, /// Stores certain, often expensive to compute, /// plan properties used in query optimization. plan_properties: Arc, @@ -59,7 +74,7 @@ impl IcebergTableScan { /// Creates a new [`IcebergTableScan`] object. pub(crate) fn new( table: Table, - snapshot_id: Option, + scan_range: ScanRange, schema: ArrowSchemaRef, projection: Option<&Vec>, filters: &[Expr], @@ -75,7 +90,7 @@ impl IcebergTableScan { Self { table, - snapshot_id, + scan_range, plan_properties, projection, predicates, @@ -87,8 +102,9 @@ impl IcebergTableScan { &self.table } - pub fn snapshot_id(&self) -> Option { - self.snapshot_id + #[cfg(test)] + pub(crate) fn scan_range(&self) -> &ScanRange { + &self.scan_range } pub fn projection(&self) -> Option<&[String]> { @@ -148,7 +164,7 @@ impl ExecutionPlan for IcebergTableScan { ) -> DFResult { let fut = get_batch_stream( self.table.clone(), - self.snapshot_id, + self.scan_range.clone(), self.projection.clone(), self.predicates.clone(), ); @@ -205,25 +221,47 @@ impl DisplayAs for IcebergTableScan { /// /// This function initializes a [`TableScan`], builds it, /// and then converts it into a stream of Arrow [`RecordBatch`]es. +/// +/// Supports both regular point-in-time scans and incremental scans. async fn get_batch_stream( table: Table, - snapshot_id: Option, + scan_range: ScanRange, column_names: Option>, predicates: Option, ) -> DFResult> + Send>>> { - let scan_builder = match snapshot_id { - Some(snapshot_id) => table.scan().snapshot_id(snapshot_id), - None => table.scan(), - }; + // Apply column selection, predicates, and build a TableScan. + macro_rules! configure_and_build { + ($builder:expr) => {{ + let mut b = $builder; + b = match column_names { + Some(names) => b.select(names), + None => b.select_all(), + }; + if let Some(pred) = predicates { + b = b.with_filter(pred); + } + b.build().map_err(to_datafusion_error)? + }}; + } - let mut scan_builder = match column_names { - Some(column_names) => scan_builder.select(column_names), - None => scan_builder.select_all(), + let table_scan = match scan_range { + ScanRange::Incremental { + from, + to, + from_inclusive, + } => { + let scan_builder = if from_inclusive { + table.incremental_append_scan_inclusive(from, to) + } else { + table.incremental_append_scan(from, to) + }; + configure_and_build!(scan_builder) + } + ScanRange::Latest => configure_and_build!(table.scan()), + ScanRange::PointInTime(snapshot_id) => { + configure_and_build!(table.scan().snapshot_id(snapshot_id)) + } }; - if let Some(pred) = predicates { - scan_builder = scan_builder.with_filter(pred); - } - let table_scan = scan_builder.build().map_err(to_datafusion_error)?; let stream = table_scan .to_arrow() diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..d3e421cee8 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -53,7 +53,7 @@ use crate::error::to_datafusion_error; use crate::physical_plan::commit::IcebergCommitExec; use crate::physical_plan::project::project_with_partition; use crate::physical_plan::repartition::repartition; -use crate::physical_plan::scan::IcebergTableScan; +use crate::physical_plan::scan::{IcebergTableScan, ScanRange}; use crate::physical_plan::sort::sort_by_partition; use crate::physical_plan::write::IcebergWriteExec; @@ -139,7 +139,7 @@ impl TableProvider for IcebergTableProvider { // Create scan with fresh metadata (always use current snapshot) Ok(Arc::new(IcebergTableScan::new( table, - None, // Always use current snapshot for catalog-backed provider + ScanRange::Latest, self.schema.clone(), projection, filters, @@ -239,7 +239,7 @@ impl TableProvider for IcebergTableProvider { /// /// This provider holds a cached table instance and does not refresh metadata or support /// write operations. Use this for consistent analytical queries, time-travel scenarios, -/// or when you want to avoid catalog overhead. +/// incremental reads, or when you want to avoid catalog overhead. /// /// For catalog-backed tables with write support and automatic refresh, use /// [`IcebergTableProvider`] instead. @@ -247,10 +247,10 @@ impl TableProvider for IcebergTableProvider { pub struct IcebergStaticTableProvider { /// The static table instance (never refreshed) table: Table, - /// Optional snapshot ID for this static view - snapshot_id: Option, /// A reference-counted arrow `Schema` schema: ArrowSchemaRef, + /// Which snapshot(s) to scan + scan_range: ScanRange, } impl IcebergStaticTableProvider { @@ -261,8 +261,8 @@ impl IcebergStaticTableProvider { let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergStaticTableProvider { table, - snapshot_id: None, schema, + scan_range: ScanRange::Latest, }) } @@ -287,8 +287,131 @@ impl IcebergStaticTableProvider { let schema = Arc::new(schema_to_arrow_schema(&table_schema)?); Ok(IcebergStaticTableProvider { table, - snapshot_id: Some(snapshot_id), schema, + scan_range: ScanRange::PointInTime(snapshot_id), + }) + } + + /// Creates a provider for incremental scanning between two snapshots. + /// + /// Returns only data files that were added in snapshots between `from_snapshot_id` + /// (exclusive) and `to_snapshot_id` (inclusive). Only APPEND operations are supported. + /// + /// # Arguments + /// * `table` - The table to scan + /// * `from_snapshot_id` - Starting snapshot (exclusive - changes after this are included) + /// * `to_snapshot_id` - Ending snapshot (inclusive) + /// + /// # Example + /// ```ignore + /// let provider = IcebergStaticTableProvider::try_new_incremental(table, 100, 200).await?; + /// ctx.register_table("changes", Arc::new(provider))?; + /// let df = ctx.sql("SELECT * FROM changes").await?; + /// ``` + pub async fn try_new_incremental( + table: Table, + from_snapshot_id: i64, + to_snapshot_id: i64, + ) -> Result { + let snapshot = table + .metadata() + .snapshot_by_id(to_snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "to_snapshot id {to_snapshot_id} not found in table {}", + table.identifier().name() + ), + ) + })?; + let table_schema = snapshot.schema(table.metadata())?; + let schema = Arc::new(schema_to_arrow_schema(&table_schema)?); + Ok(IcebergStaticTableProvider { + table, + schema, + scan_range: ScanRange::Incremental { + from: from_snapshot_id, + to: Some(to_snapshot_id), + from_inclusive: false, + }, + }) + } + + /// Creates a provider for incremental scanning between two snapshots (inclusive). + /// + /// Returns only data files that were added in snapshots between `from_snapshot_id` + /// (inclusive) and `to_snapshot_id` (inclusive). Only APPEND operations are supported. + /// + /// # Arguments + /// * `table` - The table to scan + /// * `from_snapshot_id` - Starting snapshot (inclusive - changes from this snapshot are included) + /// * `to_snapshot_id` - Ending snapshot (inclusive) + pub async fn try_new_incremental_inclusive( + table: Table, + from_snapshot_id: i64, + to_snapshot_id: i64, + ) -> Result { + let snapshot = table + .metadata() + .snapshot_by_id(to_snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "to_snapshot id {to_snapshot_id} not found in table {}", + table.identifier().name() + ), + ) + })?; + let table_schema = snapshot.schema(table.metadata())?; + let schema = Arc::new(schema_to_arrow_schema(&table_schema)?); + Ok(IcebergStaticTableProvider { + table, + schema, + scan_range: ScanRange::Incremental { + from: from_snapshot_id, + to: Some(to_snapshot_id), + from_inclusive: true, + }, + }) + } + + /// Creates a provider for scanning all appends after a snapshot up to the current snapshot. + /// + /// Returns only data files that were added in snapshots after `from_snapshot_id` + /// up to and including the current snapshot. Only APPEND operations are supported. + /// + /// # Arguments + /// * `table` - The table to scan + /// * `from_snapshot_id` - Starting snapshot (exclusive - changes after this are included) + /// + /// # Example + /// ```ignore + /// let provider = IcebergStaticTableProvider::try_new_appends_after(table, 100).await?; + /// ctx.register_table("new_data", Arc::new(provider))?; + /// let df = ctx.sql("SELECT * FROM new_data").await?; + /// ``` + pub async fn try_new_appends_after(table: Table, from_snapshot_id: i64) -> Result { + let current_snapshot = table.metadata().current_snapshot().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "table {} has no current snapshot", + table.identifier().name() + ), + ) + })?; + let table_schema = current_snapshot.schema(table.metadata())?; + let schema = Arc::new(schema_to_arrow_schema(&table_schema)?); + Ok(IcebergStaticTableProvider { + table, + schema, + scan_range: ScanRange::Incremental { + from: from_snapshot_id, + to: None, + from_inclusive: false, + }, }) } } @@ -317,7 +440,7 @@ impl TableProvider for IcebergStaticTableProvider { // Use cached table (no refresh) Ok(Arc::new(IcebergTableScan::new( self.table.clone(), - self.snapshot_id, + self.scan_range.clone(), self.schema.clone(), projection, filters, @@ -865,4 +988,125 @@ mod tests { "Limit should be None when not specified" ); } + + // Tests for incremental scan providers + + #[tokio::test] + async fn test_static_provider_incremental_creates_scan() { + use datafusion::datasource::TableProvider; + + let table = get_test_table_from_metadata_file().await; + let snapshots: Vec<_> = table.metadata().snapshots().collect(); + + assert!(snapshots.len() >= 2); + let from_id = snapshots[0].snapshot_id(); + let to_id = snapshots[snapshots.len() - 1].snapshot_id(); + + let provider = + IcebergStaticTableProvider::try_new_incremental(table.clone(), from_id, to_id) + .await + .unwrap(); + + let ctx = SessionContext::new(); + let state = ctx.state(); + + let scan_plan = provider.scan(&state, None, &[], None).await.unwrap(); + let iceberg_scan = scan_plan + .as_any() + .downcast_ref::() + .expect("Expected IcebergTableScan"); + + assert!(matches!( + iceberg_scan.scan_range(), + ScanRange::Incremental { + from, + to: Some(to), + from_inclusive: false + } if *from == from_id && *to == to_id + )); + } + + #[tokio::test] + async fn test_static_provider_incremental_inclusive() { + use datafusion::datasource::TableProvider; + + let table = get_test_table_from_metadata_file().await; + let snapshots: Vec<_> = table.metadata().snapshots().collect(); + + assert!(snapshots.len() >= 2); + let from_id = snapshots[0].snapshot_id(); + let to_id = snapshots[snapshots.len() - 1].snapshot_id(); + + let provider = IcebergStaticTableProvider::try_new_incremental_inclusive( + table.clone(), + from_id, + to_id, + ) + .await + .unwrap(); + + let ctx = SessionContext::new(); + let state = ctx.state(); + + let scan_plan = provider.scan(&state, None, &[], None).await.unwrap(); + let iceberg_scan = scan_plan + .as_any() + .downcast_ref::() + .expect("Expected IcebergTableScan"); + + assert!(matches!( + iceberg_scan.scan_range(), + ScanRange::Incremental { + from, + to: Some(to), + from_inclusive: true + } if *from == from_id && *to == to_id + )); + } + + #[tokio::test] + async fn test_static_provider_appends_after() { + use datafusion::datasource::TableProvider; + + let table = get_test_table_from_metadata_file().await; + let snapshots: Vec<_> = table.metadata().snapshots().collect(); + + assert!(!snapshots.is_empty()); + let from_id = snapshots[0].snapshot_id(); + + let provider = IcebergStaticTableProvider::try_new_appends_after(table.clone(), from_id) + .await + .unwrap(); + + let ctx = SessionContext::new(); + let state = ctx.state(); + + let scan_plan = provider.scan(&state, None, &[], None).await.unwrap(); + let iceberg_scan = scan_plan + .as_any() + .downcast_ref::() + .expect("Expected IcebergTableScan"); + + assert!(matches!( + iceberg_scan.scan_range(), + ScanRange::Incremental { + from, + to: None, + from_inclusive: false + } if *from == from_id + )); + } + + #[tokio::test] + async fn test_static_provider_incremental_invalid_snapshot() { + let table = get_test_table_from_metadata_file().await; + + // Test with invalid to_snapshot_id + let result = + IcebergStaticTableProvider::try_new_incremental(table.clone(), 1, 999999999).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err.to_string().contains("not found")); + } }