Skip to content

Commit d545a7b

Browse files
Merge branch 'apache:main' into main
2 parents aaa900d + 22c4214 commit d545a7b

34 files changed

Lines changed: 1096 additions & 262 deletions

File tree

.github/workflows/audit.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ jobs:
4242
steps:
4343
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
4444
- name: Install cargo-audit
45-
uses: taiki-e/install-action@2cdf2d81f4edfc3b41d5ddf56083c70b48ac2475 # v2.62.35
45+
uses: taiki-e/install-action@ebb229c6baa68383264f2822689b07b4916d9177 # v2.62.36
4646
with:
4747
tool: cargo-audit
4848
- name: Run audit check

.github/workflows/rust.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ jobs:
425425
sudo apt-get update -qq
426426
sudo apt-get install -y -qq clang
427427
- name: Setup wasm-pack
428-
uses: taiki-e/install-action@2cdf2d81f4edfc3b41d5ddf56083c70b48ac2475 # v2.62.35
428+
uses: taiki-e/install-action@ebb229c6baa68383264f2822689b07b4916d9177 # v2.62.36
429429
with:
430430
tool: wasm-pack
431431
- name: Run tests with headless mode
@@ -752,7 +752,7 @@ jobs:
752752
- name: Setup Rust toolchain
753753
uses: ./.github/actions/setup-builder
754754
- name: Install cargo-msrv
755-
uses: taiki-e/install-action@2cdf2d81f4edfc3b41d5ddf56083c70b48ac2475 # v2.62.35
755+
uses: taiki-e/install-action@ebb229c6baa68383264f2822689b07b4916d9177 # v2.62.36
756756
with:
757757
tool: cargo-msrv
758758

benchmarks/src/nlj.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,45 @@ const NLJ_QUERIES: &[&str] = &[
146146
FULL JOIN range(30000) AS t2
147147
ON (t1.value > t2.value);
148148
"#,
149+
// Q13: LEFT SEMI 30K x 30K | HIGH 99.9%
150+
r#"
151+
SELECT t1.*
152+
FROM range(30000) AS t1
153+
LEFT SEMI JOIN range(30000) AS t2
154+
ON t1.value < t2.value;
155+
"#,
156+
// Q14: LEFT ANTI 30K x 30K | LOW 0.003%
157+
r#"
158+
SELECT t1.*
159+
FROM range(30000) AS t1
160+
LEFT ANTI JOIN range(30000) AS t2
161+
ON t1.value < t2.value;
162+
"#,
163+
// Q15: RIGHT SEMI 30K x 30K | HIGH 99.9%
164+
r#"
165+
SELECT t1.*
166+
FROM range(30000) AS t2
167+
RIGHT SEMI JOIN range(30000) AS t1
168+
ON t2.value < t1.value;
169+
"#,
170+
// Q16: RIGHT ANTI 30K x 30K | LOW 0.003%
171+
r#"
172+
SELECT t1.*
173+
FROM range(30000) AS t2
174+
RIGHT ANTI JOIN range(30000) AS t1
175+
ON t2.value < t1.value;
176+
"#,
177+
// Q17: LEFT MARK | HIGH 99.9%
178+
r#"
179+
SELECT *
180+
FROM range(30000) AS t2(k2)
181+
WHERE k2 > 0
182+
OR EXISTS (
183+
SELECT 1
184+
FROM range(30000) AS t1(k1)
185+
WHERE t2.k2 > t1.k1
186+
);
187+
"#,
149188
];
150189

151190
impl RunOpt {

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 125 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub enum InstrumentedObjectStoreMode {
5858

5959
impl fmt::Display for InstrumentedObjectStoreMode {
6060
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61-
write!(f, "{:?}", self)
61+
write!(f, "{self:?}")
6262
}
6363
}
6464

@@ -255,6 +255,48 @@ impl InstrumentedObjectStore {
255255
Ok(ret)
256256
}
257257

258+
async fn instrumented_copy(&self, from: &Path, to: &Path) -> Result<()> {
259+
let timestamp = Utc::now();
260+
let start = Instant::now();
261+
self.inner.copy(from, to).await?;
262+
let elapsed = start.elapsed();
263+
264+
self.requests.lock().push(RequestDetails {
265+
op: Operation::Copy,
266+
path: from.clone(),
267+
timestamp,
268+
duration: Some(elapsed),
269+
size: None,
270+
range: None,
271+
extra_display: Some(format!("copy_to: {to}")),
272+
});
273+
274+
Ok(())
275+
}
276+
277+
async fn instrumented_copy_if_not_exists(
278+
&self,
279+
from: &Path,
280+
to: &Path,
281+
) -> Result<()> {
282+
let timestamp = Utc::now();
283+
let start = Instant::now();
284+
self.inner.copy_if_not_exists(from, to).await?;
285+
let elapsed = start.elapsed();
286+
287+
self.requests.lock().push(RequestDetails {
288+
op: Operation::Copy,
289+
path: from.clone(),
290+
timestamp,
291+
duration: Some(elapsed),
292+
size: None,
293+
range: None,
294+
extra_display: Some(format!("copy_to: {to}")),
295+
});
296+
297+
Ok(())
298+
}
299+
258300
async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> {
259301
let timestamp = Utc::now();
260302
let start = Instant::now();
@@ -347,10 +389,18 @@ impl ObjectStore for InstrumentedObjectStore {
347389
}
348390

349391
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
392+
if self.enabled() {
393+
return self.instrumented_copy(from, to).await;
394+
}
395+
350396
self.inner.copy(from, to).await
351397
}
352398

353399
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
400+
if self.enabled() {
401+
return self.instrumented_copy_if_not_exists(from, to).await;
402+
}
403+
354404
self.inner.copy_if_not_exists(from, to).await
355405
}
356406

@@ -366,7 +416,7 @@ impl ObjectStore for InstrumentedObjectStore {
366416
/// Object store operation types tracked by [`InstrumentedObjectStore`]
367417
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
368418
pub enum Operation {
369-
_Copy,
419+
Copy,
370420
Delete,
371421
Get,
372422
Head,
@@ -376,7 +426,7 @@ pub enum Operation {
376426

377427
impl fmt::Display for Operation {
378428
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379-
write!(f, "{:?}", self)
429+
write!(f, "{self:?}")
380430
}
381431
}
382432

@@ -506,11 +556,11 @@ impl RequestSummaries {
506556
let size_stats = s.size_stats.as_ref();
507557
let dur_avg = duration_stats.map(|d| {
508558
let avg = d.sum.as_secs_f32() / count;
509-
format!("{:.6}s", avg)
559+
format!("{avg:.6}s")
510560
});
511561
let size_avg = size_stats.map(|s| {
512562
let avg = s.sum as f32 / count;
513-
format!("{} B", avg)
563+
format!("{avg} B")
514564
});
515565
[dur_avg, size_avg]
516566
})
@@ -947,6 +997,76 @@ mod tests {
947997
assert!(request.extra_display.is_none());
948998
}
949999

1000+
#[tokio::test]
1001+
async fn instrumented_store_copy() {
1002+
let (instrumented, path) = setup_test_store().await;
1003+
let copy_to = Path::from("test/copied");
1004+
1005+
// By default no requests should be instrumented/stored
1006+
assert!(instrumented.requests.lock().is_empty());
1007+
instrumented.copy(&path, &copy_to).await.unwrap();
1008+
assert!(instrumented.requests.lock().is_empty());
1009+
1010+
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1011+
assert!(instrumented.requests.lock().is_empty());
1012+
instrumented.copy(&path, &copy_to).await.unwrap();
1013+
assert_eq!(instrumented.requests.lock().len(), 1);
1014+
1015+
let mut requests = instrumented.take_requests();
1016+
assert_eq!(requests.len(), 1);
1017+
assert!(instrumented.requests.lock().is_empty());
1018+
1019+
let request = requests.pop().unwrap();
1020+
assert_eq!(request.op, Operation::Copy);
1021+
assert_eq!(request.path, path);
1022+
assert!(request.duration.is_some());
1023+
assert!(request.size.is_none());
1024+
assert!(request.range.is_none());
1025+
assert_eq!(
1026+
request.extra_display.unwrap(),
1027+
format!("copy_to: {copy_to}")
1028+
);
1029+
}
1030+
1031+
#[tokio::test]
1032+
async fn instrumented_store_copy_if_not_exists() {
1033+
let (instrumented, path) = setup_test_store().await;
1034+
let mut copy_to = Path::from("test/copied");
1035+
1036+
// By default no requests should be instrumented/stored
1037+
assert!(instrumented.requests.lock().is_empty());
1038+
instrumented
1039+
.copy_if_not_exists(&path, &copy_to)
1040+
.await
1041+
.unwrap();
1042+
assert!(instrumented.requests.lock().is_empty());
1043+
1044+
// Use a new destination since the previous one already exists
1045+
copy_to = Path::from("test/copied_again");
1046+
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
1047+
assert!(instrumented.requests.lock().is_empty());
1048+
instrumented
1049+
.copy_if_not_exists(&path, &copy_to)
1050+
.await
1051+
.unwrap();
1052+
assert_eq!(instrumented.requests.lock().len(), 1);
1053+
1054+
let mut requests = instrumented.take_requests();
1055+
assert_eq!(requests.len(), 1);
1056+
assert!(instrumented.requests.lock().is_empty());
1057+
1058+
let request = requests.pop().unwrap();
1059+
assert_eq!(request.op, Operation::Copy);
1060+
assert_eq!(request.path, path);
1061+
assert!(request.duration.is_some());
1062+
assert!(request.size.is_none());
1063+
assert!(request.range.is_none());
1064+
assert_eq!(
1065+
request.extra_display.unwrap(),
1066+
format!("copy_to: {copy_to}")
1067+
);
1068+
}
1069+
9501070
#[tokio::test]
9511071
async fn instrumented_store_head() {
9521072
let (instrumented, path) = setup_test_store().await;

datafusion-cli/src/print_options.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ impl PrintOptions {
206206

207207
writeln!(writer, "Summaries:")?;
208208
let summaries = RequestSummaries::new(&requests);
209-
writeln!(writer, "{}", summaries)?;
209+
writeln!(writer, "{summaries}")?;
210210
}
211211
}
212212
}

datafusion/common/src/table_reference.rs

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -269,24 +269,41 @@ impl TableReference {
269269
}
270270

271271
/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
272-
/// identifier. See docs on [`TableReference`] for more details.
272+
/// identifier, normalizing `s` to lowercase.
273+
/// See docs on [`TableReference`] for more details.
273274
pub fn parse_str(s: &str) -> Self {
274-
let mut parts = parse_identifiers_normalized(s, false);
275+
Self::parse_str_normalized(s, false)
276+
}
277+
278+
/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
279+
/// identifier, normalizing `s` to lowercase if `ignore_case` is `false`.
280+
/// See docs on [`TableReference`] for more details.
281+
pub fn parse_str_normalized(s: &str, ignore_case: bool) -> Self {
282+
let table_parts = parse_identifiers_normalized(s, ignore_case);
275283

284+
Self::from_vec(table_parts).unwrap_or_else(|| Self::Bare { table: s.into() })
285+
}
286+
287+
/// Consume a vector of identifier parts to compose a [`TableReference`]. The input vector
288+
/// should contain 1 <= N <= 3 elements in the following sequence:
289+
/// ```no_rust
290+
/// [<catalog>, <schema>, table]
291+
/// ```
292+
fn from_vec(mut parts: Vec<String>) -> Option<Self> {
276293
match parts.len() {
277-
1 => Self::Bare {
278-
table: parts.remove(0).into(),
279-
},
280-
2 => Self::Partial {
281-
schema: parts.remove(0).into(),
282-
table: parts.remove(0).into(),
283-
},
284-
3 => Self::Full {
285-
catalog: parts.remove(0).into(),
286-
schema: parts.remove(0).into(),
287-
table: parts.remove(0).into(),
288-
},
289-
_ => Self::Bare { table: s.into() },
294+
1 => Some(Self::Bare {
295+
table: parts.pop()?.into(),
296+
}),
297+
2 => Some(Self::Partial {
298+
table: parts.pop()?.into(),
299+
schema: parts.pop()?.into(),
300+
}),
301+
3 => Some(Self::Full {
302+
table: parts.pop()?.into(),
303+
schema: parts.pop()?.into(),
304+
catalog: parts.pop()?.into(),
305+
}),
306+
_ => None,
290307
}
291308
}
292309

datafusion/common/src/utils/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,9 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
285285
Ok(idents)
286286
}
287287

288+
/// Parse a string into a vector of identifiers.
289+
///
290+
/// Note: If ignore_case is false, the string will be normalized to lowercase.
288291
#[cfg(feature = "sql")]
289292
pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
290293
parse_identifiers(s)

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -989,7 +989,7 @@ impl DefaultPhysicalPlanner {
989989
struct_type_columns.clone(),
990990
schema,
991991
options.clone(),
992-
))
992+
)?)
993993
}
994994

995995
// 2 Children

datafusion/core/tests/core_integration.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ mod sql;
2121
/// Run all tests that are found in the `dataframe` directory
2222
mod dataframe;
2323

24+
/// Run all tests that are found in the `datasource` directory
25+
mod datasource;
26+
2427
/// Run all tests that are found in the `macro_hygiene` directory
2528
mod macro_hygiene;
2629

datafusion/core/tests/dataframe/dataframe_functions.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,33 @@ async fn test_nvl2() -> Result<()> {
274274

275275
Ok(())
276276
}
277+
278+
#[tokio::test]
279+
async fn test_nvl2_short_circuit() -> Result<()> {
280+
let expr = nvl2(
281+
col("a"),
282+
arrow_cast(lit("1"), lit("Int32")),
283+
arrow_cast(col("a"), lit("Int32")),
284+
);
285+
286+
let batches = get_batches(expr).await?;
287+
288+
assert_snapshot!(
289+
batches_to_string(&batches),
290+
@r#"
291+
+-----------------------------------------------------------------------------------+
292+
| nvl2(test.a,arrow_cast(Utf8("1"),Utf8("Int32")),arrow_cast(test.a,Utf8("Int32"))) |
293+
+-----------------------------------------------------------------------------------+
294+
| 1 |
295+
| 1 |
296+
| 1 |
297+
| 1 |
298+
+-----------------------------------------------------------------------------------+
299+
"#
300+
);
301+
302+
Ok(())
303+
}
277304
#[tokio::test]
278305
async fn test_fn_arrow_typeof() -> Result<()> {
279306
let expr = arrow_typeof(col("l"));

0 commit comments

Comments
 (0)