6 changes: 3 additions & 3 deletions src/query/service/src/catalogs/default/mutable_catalog.rs
@@ -307,7 +307,7 @@ impl Catalog for MutableCatalog {
// Create database.
let res = self.ctx.meta.create_database(req.clone()).await?;
info!(
"db name: {}, engine: {}",
"[CATALOG] Creating database: name={}, engine={}",
req.name_ident.database_name(),
&req.meta.engine
);
@@ -698,13 +698,13 @@ impl Catalog for MutableCatalog {
}

info!(
"updating multi table meta. number of tables: {}",
"[CATALOG] Updating multiple table metadata: table_count={}",
req.update_table_metas.len()
);
let begin = Instant::now();
let res = self.ctx.meta.update_multi_table_meta(req).await;
info!(
"update multi table meta done. time used {:?}",
"[CATALOG] Multiple table metadata update completed: elapsed_time={:?}",
begin.elapsed()
);
Ok(res?)
13 changes: 8 additions & 5 deletions src/query/service/src/interpreters/hook/compact_hook.rs
@@ -66,7 +66,10 @@ pub async fn hook_compact(
) {
let op_name = trace_ctx.operation_name.clone();
if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, lock_opt).await {
info!("compact hook ({}) with error (ignored): {}", op_name, e);
info!(
"[COMPACT-HOOK] Operation {} failed with error (ignored): {}",
op_name, e
);
}
}

@@ -86,13 +89,13 @@ async fn do_hook_compact(
if info.res.is_ok() {
let op_name = &trace_ctx.operation_name;
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);
info!("execute {op_name} finished successfully. running table optimization job.");
info!("[COMPACT-HOOK] Operation {op_name} completed successfully, starting table optimization job.");

let compact_start_at = Instant::now();
let compaction_limits = match compact_target.mutation_kind {
MutationKind::Insert => {
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
info!("[COMPACT-HOOK] Table {} requires compaction of {} blocks", compact_target.table, compaction_num_block_hint);
if compaction_num_block_hint == 0 {
return Ok(());
}
@@ -120,9 +123,9 @@
compact_table(ctx, compact_target, compaction_limits, lock_opt)
}) {
Ok(_) => {
info!("execute {op_name} finished successfully. table optimization job finished.");
info!("[COMPACT-HOOK] Operation {op_name} and table optimization job completed successfully.");
}
-Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e); }
+Err(e) => { info!("[COMPACT-HOOK] Operation {op_name} completed but table optimization job failed: {:?}", e); }
}

// reset the progress value
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/hook/hook.rs
@@ -76,12 +76,12 @@ impl HookOperator {
pub async fn execute_compact(&self, pipeline: &mut Pipeline) {
match self.ctx.get_settings().get_enable_compact_after_write() {
Ok(false) => {
info!("auto compaction disabled");
info!("[TABLE-HOOK] Auto compaction is disabled");
return;
}
Err(e) => {
// swallow the exception, compaction hook should not prevent the main operation.
warn!("failed to get compaction settings, ignored. {}", e);
warn!("[TABLE-HOOK] Failed to retrieve compaction settings, continuing without compaction: {}", e);
return;
}
Ok(true) => {
6 changes: 3 additions & 3 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
@@ -63,13 +63,13 @@ pub async fn hook_refresh(ctx: Arc<QueryContext>, pipeline: &mut Pipeline, desc:

pipeline.set_on_finished(move |info: &ExecutionInfo| {
if info.res.is_ok() {
info!("execute pipeline finished successfully, starting run refresh job.");
info!("[REFRESH-HOOK] Pipeline execution completed successfully, starting refresh job");
match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc)) {
Ok(_) => {
info!("execute refresh job successfully.");
info!("[REFRESH-HOOK] Refresh job completed successfully");
}
Err(e) => {
info!("execute refresh job failed. {:?}", e);
info!("[REFRESH-HOOK] Refresh job failed: {:?}", e);
}
}
}
9 changes: 6 additions & 3 deletions src/query/service/src/interpreters/hook/vacuum_hook.rs
@@ -61,7 +61,7 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
}

log::info!(
"Vacuum temporary files by hook, node files: {:?}",
"[VACUUM-HOOK] Cleaning temporary files from nodes: {:?}",
node_files
);

@@ -77,7 +77,7 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc<QueryContext>) -> Result<()> {
.await;

if let Err(cause) = &removed_files {
log::warn!("Vacuum temporary files has error: {:?}", cause);
log::warn!("[VACUUM-HOOK] Failed to clean temporary files: {:?}", cause);
}

Ok(())
@@ -96,7 +96,10 @@ pub fn hook_disk_temp_dir(query_ctx: &Arc<QueryContext>) -> Result<()> {
.get_spilling_to_disk_vacuum_unknown_temp_dirs_limit()?;
let deleted = mgr.drop_disk_spill_dir_unknown(limit)?;
if !deleted.is_empty() {
warn!("Deleted residual temporary directories: {:?}", deleted)
warn!(
"[VACUUM-HOOK] Removed residual temporary directories: {:?}",
deleted
)
}
}

12 changes: 7 additions & 5 deletions src/query/service/src/servers/http/middleware/session.rs
@@ -617,7 +617,7 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
} else {
let msg =
format!("sticky_node_id '{sticky_node_id}' not found in cluster",);
warn!("{}", msg);
warn!("[HTTP-SESSION] {}", msg);
Err(Error::from(HttpErrorCode::bad_request(
ErrorCode::BadArguments(msg),
)))
@@ -650,7 +650,7 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
}
Ok(None) => {
let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse <new_warehouse>`", warehouse);
warn!("{}", msg);
warn!("[HTTP-SESSION] {}", msg);
return Err(Error::from(HttpErrorCode::bad_request(
ErrorCode::UnknownWarehouse(msg),
)));
@@ -671,7 +671,9 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
}
}

log::warn!("Ignore header ({HEADER_WAREHOUSE}: {warehouse:?})");
log::warn!(
"[HTTP-SESSION] Ignoring warehouse header: {HEADER_WAREHOUSE}={warehouse:?}"
);
}
};

@@ -706,13 +708,13 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
let err = HttpErrorCode::error_code(err);
if err.status() == StatusCode::UNAUTHORIZED {
warn!(
"http auth failure: {method} {uri}, headers={:?}, error={}",
"[HTTP-SESSION] Authentication failure: {method} {uri}, headers={:?}, error={}",
sanitize_request_headers(&headers),
err
);
} else {
error!(
"http request err: {method} {uri}, headers={:?}, error={}",
"[HTTP-SESSION] Request error: {method} {uri}, headers={:?}, error={}",
sanitize_request_headers(&headers),
err
);
2 changes: 1 addition & 1 deletion src/query/service/src/servers/http/v1/query/http_query.rs
@@ -807,7 +807,7 @@ impl HttpQuery {
warnings: query_context.pop_warnings(),
};

-info!(
+error!(
"[HTTP-QUERY] Query state changed to Stopped, failed to start: {:?}",
e
);
10 changes: 8 additions & 2 deletions src/query/settings/src/settings_global.rs
@@ -108,15 +108,21 @@ impl Settings {
.insert(name.clone(), match default_settings.settings.get(&name) {
None => {
// the settings may be deprecated
warn!("Ignore deprecated global setting {} = {}", name, val);
warn!(
"[SETTINGS] Ignoring deprecated global setting: {} = {}",
name, val
);
continue;
}
Some(default_setting_value) => {
if DefaultSettings::check_setting_scope(&name, SettingScope::Global)
.is_err()
{
// the settings is session only, ignore the global setting
warn!("Ignore session only global setting {} = {}", name, val);
warn!(
"[SETTINGS] Ignoring session-only setting at global scope: {} = {}",
name, val
);
continue;
}
match &default_setting_value.value {
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/table.rs
@@ -366,7 +366,7 @@ impl Binder {
let columns = self.metadata.read().columns_by_table_index(table_index);
let scan_id = self.metadata.write().next_scan_id();
log::info!(
"[RUNTIME-FILTER]bind_base_table scan_id: {},table_entry: {:?}",
"[RUNTIME-FILTER] bind_base_table scan_id: {},table_entry: {:?}",
scan_id,
table
);
@@ -288,7 +288,10 @@ where F: SnapshotGenerator + Send + Sync + 'static
// Vacuum in a best-effort manner, errors are ignored
warn!("Vacuum table {} failed : {}", tbl.table_info.name, e);
} else {
info!("vacuum table {} done", tbl.table_info.name);
info!(
"[SINK-COMMIT] Vacuum completed for table {}",
tbl.table_info.name
);
}
}

@@ -307,7 +310,7 @@ where F: SnapshotGenerator + Send + Sync + 'static
if let Some(vacuum_handler) = &self.vacuum_handler {
self.exec_auto_vacuum2(tbl, vacuum_handler.as_ref()).await;
} else {
info!("no vacuum handler found for auto vacuuming, please re-check your license");
info!("[SINK-COMMIT] No vacuum handler available for auto vacuuming, please verify your license");
}

Ok(())
@@ -530,7 +533,7 @@ where F: SnapshotGenerator + Send + Sync + 'static
{
let elapsed_time = self.start_time.elapsed();
let status = format!(
"commit mutation success after {} retries, which took {:?}",
"[SINK-COMMIT] Mutation committed successfully after {} retries in {:?}",
self.retries, elapsed_time
);
metrics_inc_commit_milliseconds(elapsed_time.as_millis());
@@ -554,7 +557,10 @@ where F: SnapshotGenerator + Send + Sync + 'static
.collect::<Vec<_>>();
(tbl, stream_descriptions)
};
info!("commit mutation success, targets {:?}", target_descriptions);
info!(
"[SINK-COMMIT] Mutation committed successfully, targets: {:?}",
target_descriptions
);
self.state = State::Finish;
}
Err(e) if self.is_error_recoverable(&e) => {
@@ -563,7 +569,7 @@ where F: SnapshotGenerator + Send + Sync + 'static
Some(d) => {
let name = table_info.name.clone();
debug!(
"got error TableVersionMismatched, tx will be retried {} ms later. table name {}, identity {}",
"[SINK-COMMIT] TableVersionMismatched error detected, transaction will retry in {} ms. Table: {}, ID: {}",
d.as_millis(),
name.as_str(),
table_info.ident
@@ -167,7 +167,7 @@ impl MatchedAggregator {
// |----------------------------block----------------------------------------|
// |----partial-unmodified----|-----matched------|----partial-unmodified-----|
info!(
"duplicated block: segment_idx: {}, block_idx: {}",
"[MERGE-INTO] Duplicated block detected: segment_idx={}, block_idx={}",
meta_index.segment_idx, meta_index.block_idx
);
}
@@ -289,7 +289,7 @@ impl MatchedAggregator {
let segment_info = segment_infos.get(&item.0).unwrap();
let block_idx = segment_info.blocks.len() - block_idx - 1;
info!(
"target_build_optimization, merge into apply: segment_idx:{},blk_idx:{}",
"[MERGE-INTO] Target build optimization applying: segment_idx={}, block_idx={}",
segment_idx, block_idx
);
mutation_logs.push(MutationLogEntry::DeletedBlock {
@@ -316,7 +316,7 @@ impl MatchedAggregator {
let block_idx = segment_info.blocks.len() - block_idx as usize - 1;
assert!(block_idx < segment_info.blocks.len());
info!(
"merge into apply: segment_idx:{},blk_idx:{}",
"[MERGE-INTO] Applying mutation: segment_idx={}, block_idx={}",
segment_idx, block_idx
);
// the row_id is generated by block_id, not block_idx,reference to fill_internal_column_meta()
@@ -385,8 +385,8 @@ impl AggregationContext {
modified_offsets: HashSet<usize>,
) -> Result<Option<MutationLogEntry>> {
info!(
"apply update and delete to segment idx {}, block idx {}",
segment_idx, block_idx,
"[MERGE-INTO] Applying update and delete operations to segment_idx={}, block_idx={}",
segment_idx, block_idx
);
let mut origin_data_block = read_block(
self.write_settings.storage_format,
@@ -439,7 +439,7 @@ impl AggregationContext {
let cluster_stats =
generator.gen_with_origin_stats(&block, origin_stats.clone())?;
info!(
"serialize block after get cluster_stats:\n {:?}",
"[MERGE-INTO] Serializing block with cluster stats: {:?}",
cluster_stats
);
Ok((cluster_stats, block))
@@ -215,7 +215,7 @@ impl Transform for ReadParquetDataTransform<true> {
if unfinished_processors_count == 1 {
let blocks_total = self.stats.blocks_total.load(Ordering::Relaxed);
let blocks_pruned = self.stats.blocks_pruned.load(Ordering::Relaxed);
info!("[RUNTIME-FILTER]ReadParquetDataTransform finished, scan_id: {}, blocks_total: {}, blocks_pruned: {}", self.scan_id, blocks_total, blocks_pruned);
info!("[RUNTIME-FILTER] ReadParquetDataTransform finished, scan_id: {}, blocks_total: {}, blocks_pruned: {}", self.scan_id, blocks_total, blocks_pruned);
}
Ok(())
}
@@ -324,7 +324,7 @@ impl AsyncTransform for ReadParquetDataTransform<false> {
if unfinished_processors_count == 1 {
let blocks_total = self.stats.blocks_total.load(Ordering::Relaxed);
let blocks_pruned = self.stats.blocks_pruned.load(Ordering::Relaxed);
info!("[RUNTIME-FILTER]AsyncReadParquetDataTransform finished, scan_id: {}, blocks_total: {}, blocks_pruned: {}", self.scan_id, blocks_total, blocks_pruned);
info!("[RUNTIME-FILTER] AsyncReadParquetDataTransform finished, scan_id: {}, blocks_total: {}, blocks_pruned: {}", self.scan_id, blocks_total, blocks_pruned);
}
Ok(())
}
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/read_partitions.rs
@@ -378,7 +378,7 @@ impl FuseTable {
let pruning_stats = pruner.pruning_stats();

info!(
"prune snapshot block end, final block numbers:{}, cost:{:?}",
"[FUSE-PARTITIONS] prune snapshot block end, final block numbers:{}, cost:{:?}",
block_metas.len(),
start.elapsed()
);