diff --git a/Cargo.lock b/Cargo.lock index 2322771..e6893fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,9 +63,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" [[package]] name = "anyhow" @@ -462,9 +462,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.56" +version = "1.2.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aebf35691d1bfb0ac386a69bac2fde4dd276fb618cf8bf4f5318fe285e821bb2" +checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" dependencies = [ "find-msvc-tools", "jobserver", @@ -501,18 +501,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.60" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" +checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.60" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ "anstyle", "clap_lex", @@ -521,9 +521,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" [[package]] name = "codespan-reporting" @@ -738,9 +738,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "503f1f4a9060ae6e650d3dff5dc7a21266fea1302d890768d45b4b28586e830f" +checksum = "ea28305c211e3541c9cfcf06a23d0d8c7c824b4502ed1fdf0a6ff4ad24ee531c" dependencies = [ "arrow", "arrow-schema", @@ -793,9 +793,9 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14417a3ee4ae3d092b56cd6c1d32e8ff3e2c9ec130ecb2276ec91c89fd599399" +checksum = "78ab99b6df5f60a6ddbc515e4c05caee1192d395cf3cb67ce5d1c17e3c9b9b74" dependencies = [ "arrow", "async-trait", @@ -818,9 +818,9 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d0eba824adb45a4b3ac6f0251d40df3f6a9382371cad136f4f14ac9ebc6bc10" +checksum = "77ae3d14912c0d779ada98d30dc60f3244f3c26c2446b87394629ea5c076a31c" dependencies = [ "arrow", "async-trait", @@ -841,9 +841,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0039deefbd00c56adf5168b7ca58568fb058e4ba4c5a03b09f8be371b4e434b6" +checksum = "ea2df29b9592a5d55b8238eaf67d2f21963d5a08cd1a8b7670134405206caabd" dependencies = [ "ahash", "arrow", @@ -865,9 +865,9 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ec7e3e60b813048331f8fb9673583173e5d2dd8fef862834ee871fc98b57ca7" +checksum = "42639baa0049d5fffd7e283504b9b5e7b9b2e7a2dea476eed60ab0d40d999b85" dependencies = [ "futures", "log", @@ -876,9 +876,9 @@ dependencies = [ [[package]] name = "datafusion-datasource" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "802068957f620302ecf05f84ff4019601aeafd36f5f3f1334984af2e34265129" +checksum = "25951b617bb22a9619e1520450590cb2004bfcad10bcb396b961f4a1a10dcec5" dependencies = [ "arrow", "async-compression", @@ -911,9 +911,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-arrow" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90fc387d5067c62d494a6647d29c5ad4fcdd5a6e50ab4ea1d2568caa2d66f2cc" +checksum = "dc0b28226960ba99c50d78ac6f736ebe09eb5cb3bb9bb58194266278000ca41f" dependencies = [ "arrow", "arrow-ipc", @@ -935,9 +935,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-csv" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd5e20579bb6c8bd4e6c620253972fb723822030c280dd6aa047f660d09eeba" +checksum = "f538b57b052a678b1ce860181c65d3ace5a8486312dc50b41c01dd585a773a51" dependencies = [ "arrow", "async-trait", @@ -958,9 +958,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-json" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0788b0d48fcef31880a02013ea3cc18e5a4e0eacc3b0abdd2cd0597b99dc96e" +checksum = "89fbc1d32b1b03c9734e27c0c5f041232b68621c8455f22769838634750a196c" dependencies = [ "arrow", "async-trait", @@ -980,9 +980,9 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66639b70f1f363f5f0950733170100e588f1acfacac90c1894e231194aa35957" +checksum = "203271d31fe5613a5943181db70ec98162121d1de94a9a300d5e5f19f9500a32" dependencies = [ "arrow", "async-trait", @@ -1010,15 +1010,15 @@ dependencies = [ [[package]] name = "datafusion-doc" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e44b41f3e8267c6cf3eec982d63f34db9f1dd5f30abfd2e1f124f0871708952e" +checksum = "5b6450dc702b3d39e8ced54c3356abb453bd2f3cea86d90d555a4b92f7a38462" [[package]] name = "datafusion-execution" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e456f60e5d38db45335e84617006d90af14a8c8c5b8e959add708b2daaa0e2c" +checksum = "e66a02fa601de49da5181dbdcf904a18b16a184db2b31f5e5534552ea2d5e660" dependencies = [ "arrow", "async-trait", @@ -1037,9 +1037,9 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6507c719804265a58043134580c1c20767e7c23ba450724393f03ec982769ad9" +checksum = "cdf59a9b308a1a07dc2eb2f85e6366bc0226dc390b40f3aa0a72d79f1cfe2465" dependencies = [ "arrow", "async-trait", @@ -1060,9 +1060,9 @@ dependencies = [ [[package]] name = "datafusion-expr-common" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a413caa9c5885072b539337aed68488f0291653e8edd7d676c92df2480f6cab0" +checksum = "bd99eac4c6538c708638db43e7a3bd88e0e57955ddb722d420fb9a6d38dfc28f" dependencies = [ "arrow", "datafusion-common", @@ -1073,9 +1073,9 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "189256495dc9cbbb8e20dbcf161f60422e628d201a78df8207e44bd4baefadb6" +checksum = "11aa2c492ac046397b36d57c62a72982aad306495bbcbcdbcabd424d4a2fe245" dependencies = [ "arrow", "arrow-buffer", @@ -1104,9 +1104,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12e73dfee4cd67c4a507ffff4c5a711d39983adf544adbc09c09bf06f789f413" +checksum = "325a00081898945d48d6194d9ca26120e523c993be3bb7c084061a5a2a72e787" dependencies = [ "ahash", "arrow", @@ -1125,9 +1125,9 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87727bd9e65f4f9ac6d608c9810b7da9eaa3b18b26a4a4b76520592d49020acf" +checksum = "809bbcb1e0dbec5d0ce30d493d135aea7564f1ba4550395f7f94321223df2dae" dependencies = [ "ahash", "arrow", @@ -1138,9 +1138,9 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e5ef761359224b7c2b5a1bfad6296ac63225f8583d08ad18af9ba1a89ac3887" +checksum = "29ebaa5d7024ef45973e0a7db1e9aeaa647936496f4d4061c0448f23d77d6320" dependencies = [ "arrow", "arrow-ord", @@ -1161,9 +1161,9 @@ dependencies = [ [[package]] name = "datafusion-functions-table" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b17dac25dfda2d2a90ff0ad1c054a11fb1523766226bec6e9bd8c410daee2ae" +checksum = "60eab6f39df9ee49a2c7fa38eddc01fa0086ee31b29c7d19f38e72f479609752" dependencies = [ "arrow", "async-trait", @@ -1177,9 +1177,9 @@ dependencies = [ [[package]] name = "datafusion-functions-window" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c594a29ddb22cbdbce500e4d99b5b2392c5cecb4c1086298b41d1ffec14dbb77" +checksum = "e00b2c15e342a90e65a846199c9e49293dd09fe1bcd63d8be2544604892f7eb8" dependencies = [ "arrow", "datafusion-common", @@ -1195,9 +1195,9 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aa1b15ed81c7543f62264a30dd49dec4b1b0b698053b968f53be32dfba4f729" +checksum = "493e2e1d1f4753dfc139a5213f1b5d0b97eea46a82d9bda3c7908aa96981b74b" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -1205,9 +1205,9 @@ dependencies = [ [[package]] name = "datafusion-macros" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c00c31c4795597aa25b74cab5174ac07a53051f27ce1e011ecaffa9eaeecef81" +checksum = "ba01c55ade8278a791b429f7bf5cb1de64de587a342d084b18245edfae7096e2" dependencies = [ "datafusion-doc", "quote", @@ -1216,9 +1216,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80ccf60767c09302b2e0fc3afebb3761a6d508d07316fab8c5e93312728a21bb" +checksum = "a80c6dfbba6a2163a9507f6353ac78c69d8deb26232c9e419160e58ff7c3e047" dependencies = [ "arrow", "chrono", @@ -1236,9 +1236,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c64b7f277556944e4edd3558da01d9e9ff9f5416f1c0aa7fee088e57bd141a7e" +checksum = "5d3a86264bb9163e7360b6622e789bc7fcbb43672e78a8493f0bc369a41a57c6" dependencies = [ "ahash", "arrow", @@ -1260,9 +1260,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-adapter" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7abaee372ea2d19c016ee9ef8629c4415257d291cdd152bc7f0b75f28af1b63" +checksum = "3f5e00e524ac33500be6c5eeac940bd3f6b984ba9b7df0cd5f6c34a8a2cc4d6b" dependencies = [ "arrow", "datafusion-common", @@ -1275,9 +1275,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42237efe621f92adc22d111b531fdbc2cc38ca9b5e02327535628fb103ae2157" +checksum = "2ae769ea5d688b4e74e9be5cad6f9d9f295b540825355868a3ab942380dd97ce" dependencies = [ "ahash", "arrow", @@ -1292,9 +1292,9 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd093498bd1319c6e5c76e9dfa905e78486f01b34579ce97f2e3a49f84c37fac" +checksum = "f3588753ab2b47b0e43cd823fe5e7944df6734dabd6dafb72e2cc1c2a22f1944" dependencies = [ "arrow", "datafusion-common", @@ -1311,9 +1311,9 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cbe61b12daf81a9f20ba03bd3541165d51f86e004ef37426b11881330eed261" +checksum = "79949cbb109c2a45c527bfe0d956b9f2916807c05d4d2e66f3fd0af827ac2b61" dependencies = [ "ahash", "arrow", @@ -1342,9 +1342,9 @@ dependencies = [ [[package]] name = "datafusion-pruning" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0124331116db7f79df92ebfd2c3b11a8f90240f253555c9bb084f10b6fecf1dd" +checksum = "6434e2ee8a39d04b95fed688ff34dc251af6e4a0c2e1714716b6e3846690d589" dependencies = [ "arrow", "datafusion-common", @@ -1359,9 +1359,9 @@ dependencies = [ [[package]] name = "datafusion-session" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1673e3c58ba618a6ea0568672f00664087b8982c581e9afd5aa6c3c79c9b431f" +checksum = "c91efb8302b4877d499c37e9a71886b90236ab27d9cc42fd51112febf341abd6" dependencies = [ "async-trait", "datafusion-common", @@ -1373,9 +1373,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "52.2.0" +version = "52.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5272d256dab5347bb39d2040589f45d8c6b715b27edcb5fffe88cc8b9c3909cb" +checksum = "3f01eef7bcf4d00e87305b55f1b75792384e130fe0258bac02cd48378ae5ff87" dependencies = [ "arrow", "bigdecimal", @@ -1396,8 +1396,14 @@ dependencies = [ "arrow-array", "arrow-schema", "async-trait", + "bytes", "datafusion", "futures", + "object_store", + "parquet", + "rusqlite", + "serde_json", + "tempfile", "tokio", "tracing", "usearch", @@ -1447,6 +1453,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.3.0" @@ -1624,19 +1642,19 @@ checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", "libc", - "r-efi", + "r-efi 5.3.0", "wasip2", ] [[package]] name = "getrandom" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" dependencies = [ "cfg-if", "libc", - "r-efi", + "r-efi 6.0.0", "wasip2", "wasip3", ] @@ -1664,6 +1682,9 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] [[package]] name = "hashbrown" @@ -1685,6 +1706,15 @@ dependencies = [ "foldhash 0.2.0", ] +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.5.0" @@ -1969,9 +1999,9 @@ checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" [[package]] name = "libc" -version = "0.2.182" +version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" [[package]] name = "liblzma" @@ -1999,6 +2029,17 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "link-cplusplus" version = "1.0.12" @@ -2037,9 +2078,9 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "lz4_flex" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab6473172471198271ff72e9379150e9dfd70d8e533e0752a27e515b48dd375e" +checksum = "98c23545df7ecf1b16c303910a69b079e8e251d60f7dd2cc9b4177f2afaf1746" dependencies = [ "twox-hash", ] @@ -2143,9 +2184,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.21.3" +version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "ordered-float" @@ -2319,9 +2360,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.44" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" dependencies = [ "proc-macro2", ] @@ -2332,6 +2373,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "rand" version = "0.9.2" @@ -2419,6 +2466,20 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc_version" version = "0.4.1" @@ -2659,12 +2720,12 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.26.0" +version = "3.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.4.1", + "getrandom 0.4.2", "once_cell", "rustix", "windows-sys 0.61.2", @@ -2731,9 +2792,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.49.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" dependencies = [ "bytes", "pin-project-lite", @@ -2861,15 +2922,21 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.21.0" +version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b672338555252d43fd2240c714dc444b8c6fb0a5c5335e65a07bba7742735ddb" +checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" dependencies = [ - "getrandom 0.4.1", + "getrandom 0.4.2", "js-sys", "wasm-bindgen", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -3292,18 +3359,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.40" +version = "0.8.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a789c6e490b576db9f7e6b6d661bcc9799f7c0ac8352f56ea20193b2681532e5" +checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.40" +version = "0.8.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f65c489a7071a749c849713807783f70672b28094011623e200cb86dcb835953" +checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index bdd195f..c6f38fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,15 +5,30 @@ edition = "2024" description = "DataFusion extension for USearch HNSW vector similarity search with adaptive WHERE clause filtering" license = "MIT OR Apache-2.0" +[features] +parquet-provider = ["dep:parquet", "dep:object_store", "dep:bytes"] +sqlite-provider = ["dep:rusqlite", "dep:serde_json", "dep:parquet"] + [dependencies] -tracing = "0.1" -datafusion = "52.2.0" -usearch = "2.24.0" -arrow-array = "57.2.0" +tracing = "0.1" +datafusion = "52.2.0" +usearch = "2.24.0" +arrow-array = "57.2.0" arrow-schema = "57.2.0" -async-trait = "0.1" -futures = "0.3" -tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +async-trait = "0.1" +futures = "0.3" +# "sync" adds tokio::sync::Semaphore, used by SqliteLookupProvider's connection pool +tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] } + +# parquet-provider +parquet = { version = "57.2.0", optional = true, features = ["async", "object_store"] } +object_store = { version = "0.12", optional = true } +bytes = { version = "1", optional = true } + +# sqlite-provider +rusqlite = { version = "0.32", optional = true, features = ["bundled"] } +serde_json = { version = "1", optional = true } [dev-dependencies] -tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +tempfile = "3" diff --git a/src/keys.rs b/src/keys.rs new file mode 100644 index 0000000..87f33ef --- /dev/null +++ b/src/keys.rs @@ -0,0 +1,164 @@ +// keys.rs — Key encoding utilities for packed row addresses. +// +// Bit layout: +// bits 63-48 (16 bits) file_idx — which parquet file (up to 65,536) +// bits 47-32 (16 bits) rg_idx — which row group (up to 65,536) +// bits 31-0 (32 bits) local_offset — row offset within row group (up to 4 B) + +/// Pack a physical row address into a single `u64` key for use with USearch +/// and the lookup providers. +/// +/// # Panics +/// Panics if any component exceeds its allocated bit range: +/// `file_idx` < 65 536, `rg_idx` < 65 536, `local_offset` ≤ 4 294 967 295. +#[inline] +pub fn pack_key(file_idx: usize, rg_idx: usize, local_offset: usize) -> u64 { + assert!( + file_idx < (1 << 16), + "file_idx {file_idx} overflows 16 bits" + ); + assert!(rg_idx < (1 << 16), "rg_idx {rg_idx} overflows 16 bits"); + assert!( + local_offset <= u32::MAX as usize, + "local_offset {local_offset} overflows 32 bits" + ); + ((file_idx as u64) << 48) | ((rg_idx as u64) << 32) | (local_offset as u64) +} + +/// Unpack a `u64` key back to `(file_idx, rg_idx, local_offset)`. +#[inline] +pub fn unpack_key(key: u64) -> (usize, usize, usize) { + let file_idx = (key >> 48) as usize; + let rg_idx = ((key >> 32) & 0xFFFF) as usize; + let local_offset = (key & 0xFFFF_FFFF) as usize; + (file_idx, rg_idx, local_offset) +} + +/// Physical layout of a sharded parquet dataset. +/// +/// Computed once at startup from parquet file footers (reads only the last few +/// KB of each file). Not persisted to disk. +pub struct DatasetLayout { + /// Object-store path (or S3 key) for each file, indexed by `file_idx`. + pub file_keys: Vec, + /// Cumulative row counts at the start of each file: `file_cum_rows[i]` is + /// the total number of rows in files 0..i. `file_cum_rows[n_files]` is the + /// total row count of the dataset. + pub file_cum_rows: Vec, + /// For each file, cumulative row count at the start of each row group: + /// `rg_cum_rows[file][rg]` = rows in row groups 0..rg within that file. + pub rg_cum_rows: Vec>, +} + +impl DatasetLayout { + /// Convert a packed usearch key back to a global (dataset-wide) row index. + #[inline] + pub fn packed_key_to_global(&self, key: u64) -> u64 { + let (file_idx, rg_idx, local_offset) = unpack_key(key); + self.file_cum_rows[file_idx] + self.rg_cum_rows[file_idx][rg_idx] + local_offset as u64 + } + + /// Scan parquet footers to build the layout. No vector data is read. + /// + /// `key_prefix` is prepended to each bare filename when storing the + /// object-store path in `file_keys`. It must match the prefix used when + /// constructing the `ObjectStore` passed to `ParquetLookupProvider`, so + /// that `store.get("{key_prefix}{filename}")` resolves to the correct + /// object at query time. + /// + /// # Examples + /// ```text + /// // Files at /data/shard_00.parquet, store rooted at /data + /// DatasetLayout::from_files(&["/data/shard_00.parquet"], "") + /// + /// // Files under a parquet/ subdirectory, store rooted at /data + /// DatasetLayout::from_files(&["/data/parquet/shard_00.parquet"], "parquet/") + /// + /// // Local footer reads, but keys point at S3 prefix + /// DatasetLayout::from_files(&["/local/cache/shard_00.parquet"], "year=2024/") + /// ``` + /// + /// Only compiled when the `parquet-provider` or `sqlite-provider` feature is enabled. + #[cfg(any(feature = "parquet-provider", feature = "sqlite-provider"))] + pub fn from_files(local_paths: &[&str], key_prefix: &str) -> datafusion::common::Result { + use datafusion::error::DataFusionError; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use std::fs; + use std::path::Path; + + let mut file_keys = Vec::with_capacity(local_paths.len()); + let mut file_cum_rows = vec![0u64]; + let mut rg_cum_rows: Vec> = Vec::with_capacity(local_paths.len()); + + let mut running_total = 0u64; + for &path in local_paths { + let file_name = Path::new(path) + .file_name() + .and_then(|n| n.to_str()) + .ok_or_else(|| DataFusionError::Execution(format!("invalid path: {path}")))?; + file_keys.push(format!("{key_prefix}{file_name}")); + + let f = fs::File::open(path) + .map_err(|e| DataFusionError::Execution(format!("open {path}: {e}")))?; + let builder = ParquetRecordBatchReaderBuilder::try_new(f) + .map_err(|e| DataFusionError::Execution(format!("read footer {path}: {e}")))?; + let meta = builder.metadata(); + + let mut rg_cum = vec![0u64]; + let mut file_rows = 0u64; + for rg in 0..meta.num_row_groups() { + let n = meta.row_group(rg).num_rows() as u64; + file_rows += n; + rg_cum.push(file_rows); + } + rg_cum_rows.push(rg_cum); + running_total += file_rows; + file_cum_rows.push(running_total); + } + + Ok(Self { + file_keys, + file_cum_rows, + rg_cum_rows, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_pack_unpack_roundtrip() { + let cases = [ + (0, 0, 0), + (1, 2, 3), + (65535, 65535, u32::MAX as usize), + (0, 0, u32::MAX as usize), + ]; + for (fi, rg, lo) in cases { + let key = pack_key(fi, rg, lo); + assert_eq!(unpack_key(key), (fi, rg, lo)); + } + } + + #[test] + fn test_pack_key_boundary_values() { + // u32::MAX as local_offset is within range and must round-trip cleanly. + let key = pack_key(0, 0, u32::MAX as usize); + let (_, _, lo) = unpack_key(key); + assert_eq!(lo, u32::MAX as usize); + } + + #[test] + #[should_panic(expected = "overflows 32 bits")] + fn test_pack_key_local_offset_overflow_panics() { + pack_key(0, 0, u32::MAX as usize + 1); + } + + #[test] + #[should_panic(expected = "overflows 16 bits")] + fn test_pack_key_file_idx_overflow_panics() { + pack_key(1 << 16, 0, 0); + } +} diff --git a/src/lib.rs b/src/lib.rs index 129c04c..538752d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,6 +56,7 @@ //! LIMIT 10 //! ``` +pub mod keys; pub mod lookup; pub mod node; pub mod planner; @@ -64,6 +65,12 @@ pub mod rule; pub mod udf; pub mod udtf; +#[cfg(feature = "parquet-provider")] +pub mod parquet_provider; +#[cfg(feature = "sqlite-provider")] +pub mod sqlite_provider; + +pub use keys::{DatasetLayout, pack_key, unpack_key}; pub use lookup::{HashKeyProvider, PointLookupProvider}; pub use node::{DistanceType, USearchNode}; pub use planner::{USearchExec, USearchExecPlanner, USearchQueryPlanner}; @@ -72,6 +79,11 @@ pub use rule::USearchRule; pub use udf::{cosine_distance_udf, l2_distance_udf, negative_dot_product_udf}; pub use udtf::USearchUDTF; +#[cfg(feature = "parquet-provider")] +pub use parquet_provider::ParquetLookupProvider; +#[cfg(feature = "sqlite-provider")] +pub use sqlite_provider::SqliteLookupProvider; + use std::sync::Arc; use datafusion::common::Result; diff --git a/src/parquet_provider.rs b/src/parquet_provider.rs new file mode 100644 index 0000000..be4c770 --- /dev/null +++ b/src/parquet_provider.rs @@ -0,0 +1,391 @@ +// parquet_provider.rs — ParquetLookupProvider backed by any ObjectStore. +// +// No rg_map needed. Each usearch key directly encodes (file_idx, rg_idx, local_offset) +// via pack_key / unpack_key. Decoding is O(1) bitwise — no binary search. + +use std::any::Any; +use std::collections::BTreeMap; +use std::fmt; +use std::sync::Arc; + +use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array}; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use bytes::Bytes; +use datafusion::catalog::Session; +use datafusion::common::Result; +use datafusion::error::DataFusionError; +use datafusion::logical_expr::{Expr, TableType}; +use datafusion::physical_plan::ExecutionPlan; +use futures::StreamExt; +use futures::future::{BoxFuture, try_join_all}; +use object_store::ObjectStore; +use object_store::path::Path; +use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector}; +use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::file::metadata::ParquetMetaData; + +use crate::keys::unpack_key; +use crate::lookup::PointLookupProvider; + +pub struct ParquetLookupProvider { + /// Object-store path per file, indexed by file_idx encoded in the usearch key. + file_keys: Vec, + store: Arc, + pub schema: SchemaRef, + /// Parquet column indices for the provider schema positions 1, 2, … + parquet_col_indices: Vec, + /// When true, use `with_row_selection` to skip pages for non-target rows. + use_row_selection: bool, + /// Decoded parquet footer per file, cached at init time so fetch_by_keys + /// never reads the footer over the network. + metadata_cache: Vec>, +} + +impl fmt::Debug for ParquetLookupProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "ParquetLookupProvider(files={}, schema_cols={})", + self.file_keys.len(), + self.schema.fields().len() + ) + } +} + +impl ParquetLookupProvider { + /// Build the provider and eagerly cache parquet footers (one round-trip per file). + pub async fn new( + file_keys: Vec, + store: Arc, + schema: SchemaRef, + parquet_col_indices: Vec, + ) -> Result { + let metadata_cache = load_metadata_cache(&file_keys, &store).await?; + Ok(Self { + file_keys, + store, + schema, + parquet_col_indices, + use_row_selection: false, + metadata_cache, + }) + } + + /// Same as [`new`], but enables row-selection to skip non-target pages during fetch. + /// + /// [`new`]: ParquetLookupProvider::new + pub async fn new_with_row_selection( + file_keys: Vec, + store: Arc, + schema: SchemaRef, + parquet_col_indices: Vec, + ) -> Result { + let metadata_cache = load_metadata_cache(&file_keys, &store).await?; + Ok(Self { + file_keys, + store, + schema, + parquet_col_indices, + use_row_selection: true, + metadata_cache, + }) + } +} + +/// Wraps `ParquetObjectReader` and short-circuits `get_metadata` with a +/// pre-loaded footer, eliminating 2 HTTP round trips per query. +struct CachedMetaReader { + inner: ParquetObjectReader, + meta: Arc, +} + +impl AsyncFileReader for CachedMetaReader { + fn get_bytes( + &mut self, + range: std::ops::Range, + ) -> BoxFuture<'_, parquet::errors::Result> { + self.inner.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> { + self.inner.get_byte_ranges(ranges) + } + + fn get_metadata<'a>( + &'a mut self, + _options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture<'a, parquet::errors::Result>> { + let meta = self.meta.clone(); + Box::pin(async move { Ok(meta) }) + } +} + +/// Read the parquet footer for every file in parallel and cache it. +/// Called once at provider init — eliminates all per-query footer fetches. +async fn load_metadata_cache( + file_keys: &[String], + store: &Arc, +) -> Result>> { + let futs = file_keys.iter().map(|key| { + let store = store.clone(); + let key = key.clone(); + async move { + let path = Path::parse(&key).map_err(|e| DataFusionError::Execution(format!("{e}")))?; + let reader = ParquetObjectReader::new(store, path); + let builder = ParquetRecordBatchStreamBuilder::new(reader) + .await + .map_err(|e| DataFusionError::Execution(format!("{e}")))?; + Ok::, DataFusionError>(builder.metadata().clone()) + } + }); + try_join_all(futs).await +} + +#[async_trait] +impl PointLookupProvider for ParquetLookupProvider { + async fn fetch_by_keys( + &self, + keys: &[u64], + _key_col: &str, + projection: Option<&[usize]>, + ) -> Result> { + if keys.is_empty() { + return Ok(vec![]); + } + + // Decode every key in O(1) and group by (file_idx, rg_idx). + let mut groups: BTreeMap<(usize, usize), Vec<(u64, usize)>> = BTreeMap::new(); + for &key in keys { + let (file_idx, rg_idx, local_offset) = unpack_key(key); + groups + .entry((file_idx, rg_idx)) + .or_default() + .push((key, local_offset)); + } + + // Build parquet column projection (same for every row group read). + let selected_parquet_cols: Arc> = Arc::new(match projection { + None => self.parquet_col_indices.clone(), + Some(idxs) => idxs + .iter() + .filter(|&&i| i > 0 && (i - 1) < self.parquet_col_indices.len()) + .map(|&i| self.parquet_col_indices[i - 1]) + .collect(), + }); + let out_schema: SchemaRef = match projection { + None => self.schema.clone(), + Some(idxs) => Arc::new(arrow_schema::Schema::new( + idxs.iter() + .map(|&i| self.schema.field(i).clone()) + .collect::>(), + )), + }; + let projection_owned: Option>> = projection.map(|p| Arc::new(p.to_vec())); + let use_row_selection = self.use_row_selection; + + // Fan out all row-group reads concurrently. + let futures: Vec<_> = groups + .into_iter() + .map(|((file_idx, rg_idx), mut kv_pairs)| { + let store = self.store.clone(); + let file_key = self.file_keys.get(file_idx).cloned(); + let cached_meta = self.metadata_cache.get(file_idx).cloned(); + let n_files = self.file_keys.len(); + let selected_parquet_cols = selected_parquet_cols.clone(); + let out_schema = out_schema.clone(); + let projection_owned = projection_owned.clone(); + + async move { + let file_key = file_key.ok_or_else(|| { + DataFusionError::Execution(format!( + "packed key references file_idx={file_idx} but provider has {n_files} files" + )) + })?; + let cached_meta = cached_meta.ok_or_else(|| { + DataFusionError::Execution(format!( + "packed key references file_idx={file_idx} but metadata cache has {n_files} entries" + )) + })?; + + let path = Path::parse(&file_key) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + let reader = CachedMetaReader { + inner: ParquetObjectReader::new(store, path), + meta: cached_meta, + }; + let builder = ParquetRecordBatchStreamBuilder::new(reader) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + let mask = ProjectionMask::roots( + builder.parquet_schema(), + selected_parquet_cols.iter().copied(), + ); + + // Sort kv_pairs by local_offset so row_idx values are in order. + kv_pairs.sort_by_key(|&(_, l)| l); + + let (stream_builder, global_keys) = if use_row_selection { + let mut sel: Vec = Vec::new(); + let mut prev = 0usize; + for &(_, off) in &kv_pairs { + if off > prev { + sel.push(RowSelector::skip(off - prev)); + } + sel.push(RowSelector::select(1)); + prev = off + 1; + } + let row_selection = RowSelection::from(sel); + let gkeys: Vec = kv_pairs.iter().map(|&(k, _)| k).collect(); + ( + builder + .with_projection(mask) + .with_row_groups(vec![rg_idx]) + .with_row_selection(row_selection), + gkeys, + ) + } else { + ( + builder.with_projection(mask).with_row_groups(vec![rg_idx]), + Vec::new(), + ) + }; + + let mut stream = stream_builder + .build() + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + let mut rg_batches: Vec = Vec::new(); + while let Some(r) = stream.next().await { + rg_batches.push(r.map_err(|e| DataFusionError::External(Box::new(e)))?); + } + if rg_batches.is_empty() { + return Ok::, DataFusionError>(None); + } + + let combined = if rg_batches.len() == 1 { + rg_batches.remove(0) + } else { + let schema = rg_batches[0].schema(); + datafusion::arrow::compute::concat_batches(&schema, &rg_batches) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))? + }; + + let (filtered, global_keys) = if use_row_selection { + let cols: Vec> = combined.columns().to_vec(); + (cols, global_keys) + } else { + let local_set: std::collections::HashSet = + kv_pairs.iter().map(|&(_, l)| l).collect(); + let n_rows = combined.num_rows(); + let mask_bools: Vec = + (0..n_rows).map(|i| local_set.contains(&i)).collect(); + if !mask_bools.iter().any(|&b| b) { + return Ok(None); + } + let bool_arr = BooleanArray::from(mask_bools); + let cols: Vec> = combined + .columns() + .iter() + .map(|col| { + datafusion::arrow::compute::filter(col.as_ref(), &bool_arr) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + }) + .collect::>()?; + let gkeys: Vec = (0..n_rows) + .filter(|i| local_set.contains(i)) + .map(|local| { + kv_pairs + .iter() + .find(|&&(_, l)| l == local) + .map(|&(k, _)| k) + .unwrap_or(u64::MAX) + }) + .collect(); + (cols, gkeys) + }; + + // `filtered` is in parquet schema order (ascending column index), + // but `selected_parquet_cols` is in `idxs` order. Reorder so that + // filtered[i] corresponds to selected_parquet_cols[i]. + let filtered = { + let mut order: Vec<(usize, usize)> = selected_parquet_cols + .iter() + .enumerate() + .map(|(pos, &pc)| (pc, pos)) + .collect(); + order.sort_by_key(|&(pc, _)| pc); + let mut reordered = vec![None::>; filtered.len()]; + for (j, (_, out_pos)) in order.into_iter().enumerate() { + reordered[out_pos] = Some(filtered[j].clone()); + } + reordered.into_iter().map(|c| c.unwrap()).collect::>() + }; + + let row_idx_arr: Arc = Arc::new(UInt64Array::from(global_keys)); + + let out_cols: Vec> = match projection_owned.as_deref() { + None => { + let mut cols = vec![row_idx_arr]; + cols.extend(filtered); + cols + } + Some(idxs) => { + let mut content = filtered.into_iter(); + idxs.iter() + .map(|&i| { + if i == 0 { + Ok(row_idx_arr.clone()) + } else { + content.next().ok_or_else(|| { + DataFusionError::Execution( + "projection column mismatch".into(), + ) + }) + } + }) + .collect::>>()? + } + }; + + let batch = RecordBatch::try_new(out_schema, out_cols) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + Ok(Some(batch)) + } + }) + .collect(); + + let results: Vec> = try_join_all(futures).await?; + Ok(results.into_iter().flatten().collect()) + } +} + +#[async_trait] +impl datafusion::catalog::TableProvider for ParquetLookupProvider { + fn as_any(&self) -> &dyn Any { + self + } + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + fn table_type(&self) -> TableType { + TableType::Base + } + async fn scan( + &self, + _state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Err(DataFusionError::NotImplemented( + "ParquetLookupProvider does not support full table scans; use fetch_by_keys".into(), + )) + } +} diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs new file mode 100644 index 0000000..b636093 --- /dev/null +++ b/src/sqlite_provider.rs @@ -0,0 +1,682 @@ +// sqlite_provider.rs — SQLite-backed PointLookupProvider. +// +// Stores all non-embedding columns in a local SQLite database (bundled libsqlite3). +// Scalar columns map to INTEGER/TEXT/REAL; list columns are serialised as JSON TEXT. +// Lookups use `WHERE row_idx IN (?, ...)` against the INTEGER PRIMARY KEY B-tree. +// +// Schema: row_idx INTEGER PRIMARY KEY, TEXT/INTEGER/REAL, ... +// +// Persistence: the database is written once to the given path and reused on +// subsequent runs. The first build reads all parquet files and inserts rows +// inside a single transaction. + +use std::any::Any; +use std::fmt; +use std::sync::{Arc, Mutex}; + +use arrow_array::builder::{Int32Builder, Int64Builder, ListBuilder, StringBuilder}; +use arrow_array::{ + Array, ArrayRef, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, + UInt32Array, UInt64Array, +}; +use arrow_schema::{DataType, SchemaRef}; +use async_trait::async_trait; +use datafusion::catalog::{Session, TableProvider}; +use datafusion::common::Result as DFResult; +use datafusion::error::DataFusionError; +use datafusion::logical_expr::{Expr, TableType}; +use datafusion::physical_plan::ExecutionPlan; +use rusqlite::{Connection, types::Value as SqlValue}; +use tokio::sync::Semaphore; + +use crate::keys::{DatasetLayout, pack_key}; +use crate::lookup::PointLookupProvider; + +// ── Provider ────────────────────────────────────────────────────────────────── + +pub struct SqliteLookupProvider { + schema: SchemaRef, + table_name: String, + pool: Arc>>, + sem: Arc, +} + +impl fmt::Debug for SqliteLookupProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "SqliteLookupProvider(table={}, schema_cols={})", + self.table_name, + self.schema.fields().len() + ) + } +} + +/// RAII guard that returns the connection to the pool on drop, even on panic. +struct ConnGuard { + pool: Arc>>, + conn: Option, +} + +impl ConnGuard { + fn new(pool: Arc>>, conn: Connection) -> Self { + Self { + pool, + conn: Some(conn), + } + } +} + +impl Drop for ConnGuard { + fn drop(&mut self) { + if let Some(c) = self.conn.take() { + // best-effort: ignore poison so a panicking query doesn't + // permanently shrink the pool. + let _ = self.pool.lock().map(|mut p| p.push(c)); + } + } +} + +/// Double-quote a SQLite identifier, escaping embedded double-quotes by +/// doubling them. This prevents SQL injection when a caller-supplied name +/// is interpolated into a statement as an identifier. +fn quote_ident(name: &str) -> String { + format!("\"{}\"", name.replace('"', "\"\"")) +} + +fn open_conn(db_path: &str) -> DFResult { + let conn = Connection::open(db_path).map_err(|e| DataFusionError::Execution(e.to_string()))?; + conn.execute_batch( + "PRAGMA journal_mode = WAL; + PRAGMA synchronous = NORMAL; + PRAGMA cache_size = -65536;", + ) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + Ok(conn) +} + +impl SqliteLookupProvider { + /// Open the existing SQLite database at `db_path`, or build it from + /// parquet files on first run. Opens a pool of `pool_size` read + /// connections (WAL allows N concurrent readers). + /// + /// `local_parquet_files`, `layout`, `schema`, and `parquet_col_indices` + /// are only used if the table does not yet exist. + #[allow(clippy::too_many_arguments)] + pub fn open_or_build( + db_path: &str, + table_name: &str, + pool_size: usize, + local_parquet_files: &[String], + layout: &DatasetLayout, + schema: SchemaRef, + parquet_col_indices: &[usize], + ) -> DFResult { + if pool_size == 0 { + return Err(DataFusionError::Execution( + "pool_size must be at least 1".into(), + )); + } + let conn = open_conn(db_path)?; + + let table_exists: bool = conn + .query_row( + "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", + rusqlite::params![table_name], + |row| row.get::<_, i64>(0), + ) + .map_err(|e| DataFusionError::Execution(e.to_string()))? + > 0; + + if table_exists { + let n: i64 = conn + .query_row( + &format!("SELECT COUNT(*) FROM {}", quote_ident(table_name)), + [], + |row| row.get(0), + ) + .unwrap_or(0); + tracing::info!( + "SQLite table '{}' already exists ({} rows), skipping build.", + table_name, + n + ); + } else { + tracing::info!( + "First run: building SQLite table '{}' (one-time).", + table_name + ); + build_table( + &conn, + table_name, + local_parquet_files, + layout, + &schema, + parquet_col_indices, + )?; + } + + let mut conns = vec![conn]; + for _ in 1..pool_size { + conns.push(open_conn(db_path)?); + } + Ok(Self { + schema, + table_name: table_name.to_string(), + pool: Arc::new(Mutex::new(conns)), + sem: Arc::new(Semaphore::new(pool_size)), + }) + } +} + +// ── PointLookupProvider ─────────────────────────────────────────────────────── + +#[async_trait] +impl PointLookupProvider for SqliteLookupProvider { + async fn fetch_by_keys( + &self, + keys: &[u64], + _key_col: &str, + projection: Option<&[usize]>, + ) -> DFResult> { + if keys.is_empty() { + return Ok(vec![]); + } + + let out_schema = match projection { + None => self.schema.clone(), + Some(idxs) => Arc::new(arrow_schema::Schema::new( + idxs.iter() + .map(|&i| self.schema.field(i).clone()) + .collect::>(), + )), + }; + let keys_vec = keys.to_vec(); + let pool = self.pool.clone(); + let table_name = self.table_name.clone(); + + // Acquire a semaphore permit to bound concurrency to the pool size, + // then run the synchronous SQLite query on a blocking thread. + let _permit = self + .sem + .acquire() + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let result = tokio::task::spawn_blocking(move || { + let conn = pool + .lock() + .map_err(|e| { + DataFusionError::Execution(format!("connection pool mutex poisoned: {e}")) + })? + .pop() + .ok_or_else(|| { + DataFusionError::Execution("connection pool unexpectedly empty".into()) + })?; + let guard = ConnGuard::new(pool, conn); + let res = execute_query_sync( + guard.conn.as_ref().unwrap(), + &keys_vec, + &out_schema, + &table_name, + ); + drop(guard); // explicit but not required — Drop handles it + res + }) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))??; + + Ok(result) + } +} + +fn execute_query_sync( + conn: &Connection, + keys: &[u64], + out_schema: &SchemaRef, + table_name: &str, +) -> DFResult> { + let placeholders = keys.iter().map(|_| "?").collect::>().join(", "); + // Select only the columns in out_schema (already projection-applied by the + // caller) so we don't fetch unused columns from SQLite. + let col_list = out_schema + .fields() + .iter() + .map(|f| quote_ident(f.name())) + .collect::>() + .join(", "); + let sql = format!( + "SELECT {col_list} FROM {tn} WHERE row_idx IN ({placeholders}) ORDER BY row_idx", + tn = quote_ident(table_name) + ); + + let n_out = out_schema.fields().len(); + let mut col_bufs: Vec> = vec![Vec::with_capacity(keys.len()); n_out]; + + let key_params: Vec = keys.iter().map(|&k| SqlValue::Integer(k as i64)).collect(); + + let mut stmt = conn + .prepare(&sql) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut rows = stmt + .query(rusqlite::params_from_iter(key_params.iter())) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + while let Some(row) = rows + .next() + .map_err(|e| DataFusionError::Execution(e.to_string()))? + { + for (out_idx, buf) in col_bufs.iter_mut().enumerate() { + let v: SqlValue = row + .get(out_idx) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + buf.push(v); + } + } + + if col_bufs.first().is_none_or(|v| v.is_empty()) { + return Ok(vec![]); + } + + let arrays: Vec = out_schema + .fields() + .iter() + .zip(col_bufs) + .map(|(field, values)| sql_values_to_arrow(field.data_type(), values)) + .collect::>()?; + + let batch = RecordBatch::try_new(out_schema.clone(), arrays) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + Ok(vec![batch]) +} + +// ── TableProvider ───────────────────────────────────────────────────────────── + +#[async_trait] +impl TableProvider for SqliteLookupProvider { + fn as_any(&self) -> &dyn Any { + self + } + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + fn table_type(&self) -> TableType { + TableType::Base + } + async fn scan( + &self, + _state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + Err(DataFusionError::NotImplemented( + "SqliteLookupProvider does not support full table scans; use fetch_by_keys".into(), + )) + } +} + +// ── Build helpers ───────────────────────────────────────────────────────────── + +fn build_table( + conn: &Connection, + table_name: &str, + parquet_files: &[String], + layout: &DatasetLayout, + schema: &SchemaRef, + parquet_col_indices: &[usize], +) -> DFResult<()> { + let col_defs = schema + .fields() + .iter() + .map(|f| { + let sql_type = arrow_type_to_sql(f.data_type()); + if f.name() == "row_idx" { + "row_idx INTEGER PRIMARY KEY".to_string() + } else { + format!("{} {}", quote_ident(f.name()), sql_type) + } + }) + .collect::>() + .join(", "); + + let placeholders = schema + .fields() + .iter() + .map(|_| "?") + .collect::>() + .join(", "); + let insert_sql = format!( + "INSERT INTO {} VALUES ({placeholders})", + quote_ident(table_name) + ); + + // CREATE TABLE and all INSERTs share one transaction so a mid-build crash + // leaves no half-built table. If the table exists with zero rows on the + // next startup, open_or_build would wrongly skip the build; atomicity + // ensures the table either doesn't exist or is fully populated. + let tx = conn + .unchecked_transaction() + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + { + tx.execute_batch(&format!( + "CREATE TABLE {} ({col_defs});", + quote_ident(table_name) + )) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut stmt = tx + .prepare(&insert_sql) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + for (file_idx, file_path) in parquet_files.iter().enumerate() { + let f = std::fs::File::open(file_path) + .map_err(|e| DataFusionError::Execution(format!("open {file_path}: {e}")))?; + let builder = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(f) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + let reader = builder + .with_batch_size(2048) + .build() + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + let mut file_row: u64 = 0; + + for batch_result in reader { + let batch = batch_result.map_err(|e| DataFusionError::Execution(e.to_string()))?; + let n = batch.num_rows(); + + for row_i in 0..n { + let r = file_row + row_i as u64; + let rg = layout.rg_cum_rows[file_idx].partition_point(|&s| s <= r) - 1; + let lo = (r - layout.rg_cum_rows[file_idx][rg]) as usize; + let packed_key = pack_key(file_idx, rg, lo); + + let mut params: Vec = Vec::with_capacity(schema.fields().len()); + params.push(SqlValue::Integer(packed_key as i64)); + + for &ci in parquet_col_indices { + params.push(arrow_cell_to_sql(batch.column(ci), row_i)); + } + + stmt.execute(rusqlite::params_from_iter(params.iter())) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + } + file_row += n as u64; + } + } + } + tx.commit() + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + tracing::info!("SQLite table '{}' built and committed.", table_name); + Ok(()) +} + +// ── Type conversion helpers ─────────────────────────────────────────────────── + +fn arrow_type_to_sql(dt: &DataType) -> &'static str { + match dt { + DataType::UInt64 | DataType::UInt32 | DataType::Int32 | DataType::Int64 => "INTEGER", + DataType::Float32 | DataType::Float64 => "REAL", + _ => "TEXT", // Utf8, LargeUtf8, List variants → TEXT (JSON for lists) + } +} + +fn arrow_cell_to_sql(col: &ArrayRef, row: usize) -> SqlValue { + if col.is_null(row) { + return SqlValue::Null; + } + match col.data_type() { + DataType::Utf8 => { + let v = col + .as_any() + .downcast_ref::() + .unwrap() + .value(row); + SqlValue::Text(v.to_string()) + } + DataType::LargeUtf8 => { + let v = col + .as_any() + .downcast_ref::() + .unwrap() + .value(row); + SqlValue::Text(v.to_string()) + } + DataType::Int32 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::Int64 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row), + ), + DataType::UInt32 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + // UInt64 values > i64::MAX (2^63-1) will wrap to negative when cast to + // SQLite INTEGER. This is acceptable for packed usearch keys (which use + // only 63 bits) but callers storing arbitrary u64 data should be aware. + DataType::UInt64 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::Float32 => SqlValue::Real( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as f64, + ), + DataType::Float64 => SqlValue::Real( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row), + ), + DataType::List(_) | DataType::LargeList(_) => SqlValue::Text(serialize_list(col, row)), + _ => SqlValue::Null, + } +} + +fn serialize_list(col: &ArrayRef, row: usize) -> String { + use serde_json::Value as JV; + + let list_val: ArrayRef = + if let Some(arr) = col.as_any().downcast_ref::() { + arr.value(row) + } else if let Some(arr) = col.as_any().downcast_ref::() { + arr.value(row) + } else { + return "[]".to_string(); + }; + + let items: Vec = (0..list_val.len()) + .map(|i| { + if list_val.is_null(i) { + return JV::Null; + } + match list_val.data_type() { + DataType::Utf8 => { + let s = list_val + .as_any() + .downcast_ref::() + .unwrap() + .value(i); + JV::String(s.to_string()) + } + DataType::LargeUtf8 => { + let s = list_val + .as_any() + .downcast_ref::() + .unwrap() + .value(i); + JV::String(s.to_string()) + } + DataType::Int64 => { + let v = list_val + .as_any() + .downcast_ref::() + .unwrap() + .value(i); + JV::Number(v.into()) + } + DataType::Int32 => { + let v = list_val + .as_any() + .downcast_ref::() + .unwrap() + .value(i); + JV::Number(v.into()) + } + _ => JV::Null, + } + }) + .collect(); + + serde_json::to_string(&items).unwrap_or_else(|_| "[]".to_string()) +} + +fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult { + Ok(match dt { + DataType::UInt64 => { + let arr: UInt64Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as u64), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::UInt32 => { + let arr: UInt32Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as u32), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::Int32 => { + let mut b = Int32Builder::with_capacity(values.len()); + for v in &values { + match v { + SqlValue::Integer(i) => b.append_value(*i as i32), + _ => b.append_null(), + } + } + Arc::new(b.finish()) + } + DataType::Int64 => { + let mut b = Int64Builder::with_capacity(values.len()); + for v in &values { + match v { + SqlValue::Integer(i) => b.append_value(*i), + _ => b.append_null(), + } + } + Arc::new(b.finish()) + } + DataType::Utf8 => { + let mut b = StringBuilder::with_capacity(values.len(), values.len() * 32); + for v in &values { + match v { + SqlValue::Text(s) => b.append_value(s), + _ => b.append_null(), + } + } + Arc::new(b.finish()) + } + DataType::List(item_field) => match item_field.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => { + let mut b = + ListBuilder::new(StringBuilder::new()).with_field(item_field.as_ref().clone()); + for v in &values { + match v { + SqlValue::Text(s) => { + let items: Vec> = + serde_json::from_str(s).unwrap_or_default(); + for item in items { + b.values().append_option(item); + } + b.append(true); + } + _ => b.append(false), + } + } + Arc::new(b.finish()) + } + DataType::Int64 => { + let mut b = + ListBuilder::new(Int64Builder::new()).with_field(item_field.as_ref().clone()); + for v in &values { + match v { + SqlValue::Text(s) => { + let items: Vec> = + serde_json::from_str(s).unwrap_or_default(); + for item in items { + b.values().append_option(item); + } + b.append(true); + } + _ => b.append(false), + } + } + Arc::new(b.finish()) + } + DataType::Int32 => { + let mut b = + ListBuilder::new(Int32Builder::new()).with_field(item_field.as_ref().clone()); + for v in &values { + match v { + SqlValue::Text(s) => { + let items: Vec> = + serde_json::from_str(s).unwrap_or_default(); + for item in items { + b.values().append_option(item); + } + b.append(true); + } + _ => b.append(false), + } + } + Arc::new(b.finish()) + } + inner => Err(DataFusionError::Execution(format!( + "SqliteLookupProvider: unsupported list item type {inner:?}" + )))?, + }, + DataType::Float64 => { + let arr: Float64Array = values + .iter() + .map(|v| match v { + SqlValue::Real(f) => Some(*f), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::Float32 => { + let arr: Float32Array = values + .iter() + .map(|v| match v { + SqlValue::Real(f) => Some(*f as f32), + _ => None, + }) + .collect(); + Arc::new(arr) + } + other => Err(DataFusionError::Execution(format!( + "SqliteLookupProvider: unsupported Arrow type {other:?}" + )))?, + }) +} diff --git a/tests/parquet_provider_test.rs b/tests/parquet_provider_test.rs new file mode 100644 index 0000000..fad36ea --- /dev/null +++ b/tests/parquet_provider_test.rs @@ -0,0 +1,297 @@ +#![cfg(feature = "parquet-provider")] + +use std::sync::Arc; + +use arrow_array::{RecordBatch, StringArray, UInt32Array, UInt64Array}; +use arrow_schema::{DataType, Field, Schema}; +use datafusion::catalog::TableProvider; +use datafusion::prelude::SessionContext; +use datafusion_vector_search_ext::{ParquetLookupProvider, PointLookupProvider, pack_key}; +use object_store::local::LocalFileSystem; +use parquet::arrow::ArrowWriter; +use tempfile::tempdir; + +/// Write a small 3-row, 1-column parquet file to `dir/test.parquet` and +/// return a `ParquetLookupProvider` wrapping it. +async fn make_provider(dir: &tempfile::TempDir) -> ParquetLookupProvider { + // Parquet schema: just one column (without synthesised row_idx). + let parquet_schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)])); + + // Provider schema = row_idx (synthesised) + name. + let provider_schema = Arc::new(Schema::new(vec![ + Field::new("row_idx", DataType::UInt64, false), + Field::new("name", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + parquet_schema.clone(), + vec![Arc::new(StringArray::from(vec![ + Some("alice"), + Some("bob"), + Some("carol"), + ]))], + ) + .unwrap(); + + let file_path = dir.path().join("test.parquet"); + let file = std::fs::File::create(&file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, parquet_schema, None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap()); + // parquet_col_indices: provider col 1 (name) maps to parquet col 0 + ParquetLookupProvider::new( + vec!["test.parquet".to_string()], + store, + provider_schema, + vec![0], + ) + .await + .unwrap() +} + +#[tokio::test] +async fn test_fetch_existing_keys() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir).await; + + // Fetch rows 0 and 2 (alice and carol). + let key0 = pack_key(0, 0, 0); + let key2 = pack_key(0, 0, 2); + let batches = provider + .fetch_by_keys(&[key0, key2], "row_idx", None) + .await + .unwrap(); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2); + + // Both batches combined should contain the correct names. + let names: Vec = batches + .iter() + .flat_map(|b| { + b.column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .map(|s| s.to_string()) + }) + .collect(); + assert!(names.contains(&"alice".to_string())); + assert!(names.contains(&"carol".to_string())); + assert!(!names.contains(&"bob".to_string())); +} + +#[tokio::test] +async fn test_projection() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir).await; + + let key1 = pack_key(0, 0, 1); + // Project only row_idx (index 0). + let batches = provider + .fetch_by_keys(&[key1], "row_idx", Some(&[0])) + .await + .unwrap(); + + assert!(!batches.is_empty()); + // Returned schema should have only row_idx. + assert_eq!(batches[0].schema().fields().len(), 1); + assert_eq!(batches[0].schema().field(0).name(), "row_idx"); + + let row_idx_col = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(row_idx_col.value(0), key1); +} + +#[tokio::test] +async fn test_missing_keys_return_empty() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir).await; + + // Keys that reference file_idx=1 which doesn't exist — provider would + // panic on index, so use a local_offset beyond the 3-row file instead. + // Actually the easiest way is to request a row that passes filter but + // whose local_offset is beyond the 3 rows → bool mask is all false → None. + let missing = pack_key(0, 0, 99); // local_offset 99, file has only 3 rows + let batches = provider + .fetch_by_keys(&[missing], "row_idx", None) + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 0); +} + +#[tokio::test] +async fn test_empty_key_slice() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir).await; + + let batches = provider.fetch_by_keys(&[], "row_idx", None).await.unwrap(); + assert!(batches.is_empty()); +} + +#[tokio::test] +async fn test_row_selection_variant() { + let dir = tempdir().unwrap(); + let parquet_schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)])); + let provider_schema = Arc::new(Schema::new(vec![ + Field::new("row_idx", DataType::UInt64, false), + Field::new("name", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + parquet_schema.clone(), + vec![Arc::new(StringArray::from(vec![ + Some("alice"), + Some("bob"), + Some("carol"), + ]))], + ) + .unwrap(); + + let file_path = dir.path().join("test_rs.parquet"); + let file = std::fs::File::create(&file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, parquet_schema, None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap()); + let provider = ParquetLookupProvider::new_with_row_selection( + vec!["test_rs.parquet".to_string()], + store, + provider_schema, + vec![0], + ) + .await + .unwrap(); + + let key0 = pack_key(0, 0, 0); + let key2 = pack_key(0, 0, 2); + let batches = provider + .fetch_by_keys(&[key0, key2], "row_idx", None) + .await + .unwrap(); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2); +} + +/// Regression test for the projection column ordering bug: +/// when `selected_parquet_cols` is not monotonically increasing, +/// `ProjectionMask::roots` still returns columns in parquet schema order. +/// The provider must reorder them back to match the requested `idxs` order. +#[tokio::test] +async fn test_projection_non_monotonic_column_order() { + let dir = tempdir().unwrap(); + + // Parquet schema: col_a (UInt32, parquet idx 0), col_b (Utf8, parquet idx 1). + let parquet_schema = Arc::new(Schema::new(vec![ + Field::new("col_a", DataType::UInt32, false), + Field::new("col_b", DataType::Utf8, true), + ])); + // Provider schema: row_idx (idx 0), col_a (idx 1, parquet 0), col_b (idx 2, parquet 1). + let provider_schema = Arc::new(Schema::new(vec![ + Field::new("row_idx", DataType::UInt64, false), + Field::new("col_a", DataType::UInt32, false), + Field::new("col_b", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + parquet_schema.clone(), + vec![ + Arc::new(UInt32Array::from(vec![10u32, 20, 30])), + Arc::new(StringArray::from(vec![ + Some("alice"), + Some("bob"), + Some("carol"), + ])), + ], + ) + .unwrap(); + + let file_path = dir.path().join("two_col.parquet"); + let file = std::fs::File::create(&file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, parquet_schema, None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap()); + // parquet_col_indices: provider col 1 → parquet col 0 (col_a), + // provider col 2 → parquet col 1 (col_b). + let provider = ParquetLookupProvider::new( + vec!["two_col.parquet".to_string()], + store, + provider_schema, + vec![0, 1], + ) + .await + .unwrap(); + + // Project [col_b (2), col_a (1)] — reverse order, non-monotonic parquet indices. + // selected_parquet_cols becomes [1, 0]; without the reorder fix the values + // would be swapped (col_a value 10 returned as col_b, etc.). + let key0 = pack_key(0, 0, 0); + let batches = provider + .fetch_by_keys(&[key0], "row_idx", Some(&[2, 1])) + .await + .unwrap(); + + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.schema().field(0).name(), "col_b"); + assert_eq!(batch.schema().field(1).name(), "col_a"); + + let col_b = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let col_a = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col_b.value(0), "alice"); + assert_eq!(col_a.value(0), 10); +} + +/// Regression test for the stale-key bounds check: +/// a packed key referencing a file_idx beyond the provider's file list must +/// return Err, not panic via an out-of-bounds index. +#[tokio::test] +async fn test_stale_file_idx_returns_error() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir).await; // single-file provider (file_idx 0 only) + + let stale_key = pack_key(1, 0, 0); // file_idx=1 doesn't exist + let result = provider.fetch_by_keys(&[stale_key], "row_idx", None).await; + assert!(result.is_err(), "expected Err for out-of-bounds file_idx"); + assert!(result.unwrap_err().to_string().contains("file_idx=1")); +} + +/// Regression test for the silent-empty-scan bug: +/// scan() used to return an empty MemTable, producing zero rows with no error. +/// It must now return NotImplemented so callers get a clear failure. +#[tokio::test] +async fn test_scan_returns_not_implemented() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir).await; + + let ctx = SessionContext::new(); + let state = ctx.state(); + let result = provider.scan(&state, None, &[], None).await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + err.contains("not support full table scans"), + "expected NotImplemented error, got: {err}" + ); +} diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs new file mode 100644 index 0000000..a08121d --- /dev/null +++ b/tests/sqlite_provider_test.rs @@ -0,0 +1,211 @@ +#![cfg(feature = "sqlite-provider")] + +use std::sync::Arc; + +use arrow_array::{RecordBatch, StringArray, UInt64Array}; +use arrow_schema::{DataType, Field, Schema}; +use datafusion::catalog::TableProvider; +use datafusion::prelude::SessionContext; +use datafusion_vector_search_ext::{ + DatasetLayout, PointLookupProvider, SqliteLookupProvider, pack_key, +}; +use parquet::arrow::ArrowWriter; +use tempfile::tempdir; + +/// Write a small 3-row parquet file and build a `SqliteLookupProvider` from it. +fn make_provider(dir: &tempfile::TempDir) -> SqliteLookupProvider { + let parquet_schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)])); + + // Provider schema = row_idx (synthesised) + name. + let provider_schema = Arc::new(Schema::new(vec![ + Field::new("row_idx", DataType::UInt64, false), + Field::new("name", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + parquet_schema.clone(), + vec![Arc::new(StringArray::from(vec![ + Some("alice"), + Some("bob"), + Some("carol"), + ]))], + ) + .unwrap(); + + let parquet_path = dir.path().join("test.parquet"); + let file = std::fs::File::create(&parquet_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, parquet_schema, None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + // Build a minimal DatasetLayout for 1 file with 1 row group of 3 rows. + let layout = DatasetLayout { + file_keys: vec!["parquet/test.parquet".to_string()], + file_cum_rows: vec![0, 3], + rg_cum_rows: vec![vec![0, 3]], + }; + + let db_path = dir.path().join("test.db"); + let parquet_files = vec![parquet_path.to_str().unwrap().to_string()]; + + SqliteLookupProvider::open_or_build( + db_path.to_str().unwrap(), + "models", + 4, + &parquet_files, + &layout, + provider_schema, + &[0], // parquet col 0 (name) → provider col 1 + ) + .unwrap() +} + +#[tokio::test] +async fn test_fetch_existing_keys() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir); + + let key0 = pack_key(0, 0, 0); + let key2 = pack_key(0, 0, 2); + let batches = provider + .fetch_by_keys(&[key0, key2], "row_idx", None) + .await + .unwrap(); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2); + + let names: Vec = batches + .iter() + .flat_map(|b| { + b.column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .flatten() + .map(|s| s.to_string()) + }) + .collect(); + assert!(names.contains(&"alice".to_string())); + assert!(names.contains(&"carol".to_string())); + assert!(!names.contains(&"bob".to_string())); +} + +#[tokio::test] +async fn test_projection() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir); + + let key1 = pack_key(0, 0, 1); + // Project only row_idx (index 0). + let batches = provider + .fetch_by_keys(&[key1], "row_idx", Some(&[0])) + .await + .unwrap(); + + assert!(!batches.is_empty()); + assert_eq!(batches[0].schema().fields().len(), 1); + assert_eq!(batches[0].schema().field(0).name(), "row_idx"); + + let row_idx_col = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(row_idx_col.value(0), key1); +} + +#[tokio::test] +async fn test_missing_keys_return_empty() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir); + + let missing = pack_key(0, 0, 99); // offset 99 doesn't exist + let batches = provider + .fetch_by_keys(&[missing], "row_idx", None) + .await + .unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 0); +} + +#[tokio::test] +async fn test_empty_key_slice() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir); + + let batches = provider.fetch_by_keys(&[], "row_idx", None).await.unwrap(); + assert!(batches.is_empty()); +} + +/// Regression test for the silent-empty-scan bug: +/// scan() used to return an empty MemTable, producing zero rows with no error. +/// It must now return NotImplemented so callers get a clear failure. +#[tokio::test] +async fn test_scan_returns_not_implemented() { + let dir = tempdir().unwrap(); + let provider = make_provider(&dir); + + let ctx = SessionContext::new(); + let state = ctx.state(); + let result = provider.scan(&state, None, &[], None).await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + err.contains("not support full table scans"), + "expected NotImplemented error, got: {err}" + ); +} + +/// Regression test for the SQL injection fix via quote_ident: +/// a table name containing spaces (and thus requiring quoting) must work +/// correctly rather than producing a SQL syntax error. +#[tokio::test] +async fn test_table_name_with_spaces() { + let dir = tempdir().unwrap(); + let parquet_schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)])); + let provider_schema = Arc::new(Schema::new(vec![ + Field::new("row_idx", DataType::UInt64, false), + Field::new("name", DataType::Utf8, true), + ])); + + let batch = RecordBatch::try_new( + parquet_schema.clone(), + vec![Arc::new(StringArray::from(vec![Some("alice")]))], + ) + .unwrap(); + + let parquet_path = dir.path().join("test.parquet"); + let file = std::fs::File::create(&parquet_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, parquet_schema, None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let layout = DatasetLayout { + file_keys: vec!["parquet/test.parquet".to_string()], + file_cum_rows: vec![0, 1], + rg_cum_rows: vec![vec![0, 1]], + }; + + let db_path = dir.path().join("test.db"); + // Table name with spaces — previously this would have produced a SQL syntax error. + let provider = SqliteLookupProvider::open_or_build( + db_path.to_str().unwrap(), + "my models", + 2, + &[parquet_path.to_str().unwrap().to_string()], + &layout, + provider_schema, + &[0], + ) + .unwrap(); + + let key0 = pack_key(0, 0, 0); + let batches = provider + .fetch_by_keys(&[key0], "row_idx", None) + .await + .unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 1); +}