Skip to content

Commit 40b4550

Browse files
committed
Extract cache invalidation logic
1 parent 4c8bd64 commit 40b4550

1 file changed

Lines changed: 28 additions & 14 deletions

File tree

  • datafusion/core/src/execution/context

datafusion/core/src/execution/context/mod.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,22 +1429,29 @@ impl SessionContext {
14291429
&& table_provider.table_type() == table_type
14301430
{
14311431
schema.deregister_table(&table)?;
1432-
if table_type == TableType::Base {
1433-
if let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache()
1434-
{
1435-
lfc.drop_table_entries(&Some(table_ref.clone()))?;
1436-
}
1437-
if let Some(fsc) =
1438-
self.runtime_env().cache_manager.get_file_statistic_cache()
1439-
{
1440-
fsc.drop_table_entries(&Some(table_ref.clone()))?;
1441-
}
1442-
}
1432+
self.invalidate_caches(&Some(table_ref.clone()), table_type)?;
14431433
return Ok(true);
14441434
}
14451435
Ok(false)
14461436
}
14471437

1438+
fn invalidate_caches(
1439+
&self,
1440+
table_ref: &Option<TableReference>,
1441+
table_type: TableType,
1442+
) -> Result<()> {
1443+
if table_type == TableType::Base {
1444+
if let Some(lfc) = self.runtime_env().cache_manager.get_list_files_cache() {
1445+
lfc.drop_table_entries(table_ref)?;
1446+
}
1447+
if let Some(fsc) = self.runtime_env().cache_manager.get_file_statistic_cache()
1448+
{
1449+
fsc.drop_table_entries(table_ref)?;
1450+
}
1451+
}
1452+
Ok(())
1453+
}
1454+
14481455
async fn create_function(&self, stmt: CreateFunction) -> Result<DataFrame> {
14491456
let function = {
14501457
let state = self.state.read().clone();
@@ -1879,10 +1886,17 @@ impl SessionContext {
18791886
) -> Result<Option<Arc<dyn TableProvider>>> {
18801887
let table_ref = table_ref.into();
18811888
let table = table_ref.table().to_owned();
1882-
self.state
1889+
let result = self
1890+
.state
18831891
.read()
1884-
.schema_for_ref(table_ref)?
1885-
.deregister_table(&table)
1892+
.schema_for_ref(table_ref.clone())?
1893+
.deregister_table(&table);
1894+
1895+
if let Ok(Some(ref table_provider)) = result {
1896+
self.invalidate_caches(&Some(table_ref), table_provider.table_type())?;
1897+
}
1898+
1899+
result
18861900
}
18871901

18881902
/// Return `true` if the specified table exists in the schema provider.

0 commit comments

Comments
 (0)