diff --git a/src/query/service/src/catalogs/default/mutable_catalog.rs b/src/query/service/src/catalogs/default/mutable_catalog.rs index 7ec0e04f42a2c..42257e0c5e185 100644 --- a/src/query/service/src/catalogs/default/mutable_catalog.rs +++ b/src/query/service/src/catalogs/default/mutable_catalog.rs @@ -307,7 +307,7 @@ impl Catalog for MutableCatalog { // Create database. let res = self.ctx.meta.create_database(req.clone()).await?; info!( - "db name: {}, engine: {}", + "[CATALOG] Creating database: name={}, engine={}", req.name_ident.database_name(), &req.meta.engine ); @@ -698,13 +698,13 @@ impl Catalog for MutableCatalog { } info!( - "updating multi table meta. number of tables: {}", + "[CATALOG] Updating multiple table metadata: table_count={}", req.update_table_metas.len() ); let begin = Instant::now(); let res = self.ctx.meta.update_multi_table_meta(req).await; info!( - "update multi table meta done. time used {:?}", + "[CATALOG] Multiple table metadata update completed: elapsed_time={:?}", begin.elapsed() ); Ok(res?) diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index 0029a2b0ff0b1..36c49b255e251 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -66,7 +66,10 @@ pub async fn hook_compact( ) { let op_name = trace_ctx.operation_name.clone(); if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, lock_opt).await { - info!("compact hook ({}) with error (ignored): {}", op_name, e); + info!( + "[COMPACT-HOOK] Operation {} failed with error (ignored): {}", + op_name, e + ); } } @@ -86,13 +89,13 @@ async fn do_hook_compact( if info.res.is_ok() { let op_name = &trace_ctx.operation_name; metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64); - info!("execute {op_name} finished successfully. running table optimization job."); + info!("[COMPACT-HOOK] Operation {op_name} completed successfully, starting table optimization job."); let compact_start_at = Instant::now(); let compaction_limits = match compact_target.mutation_kind { MutationKind::Insert => { let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table); - info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint); + info!("[COMPACT-HOOK] Table {} requires compaction of {} blocks", compact_target.table, compaction_num_block_hint); if compaction_num_block_hint == 0 { return Ok(()); } @@ -120,9 +123,9 @@ async fn do_hook_compact( compact_table(ctx, compact_target, compaction_limits, lock_opt) }) { Ok(_) => { - info!("execute {op_name} finished successfully. table optimization job finished."); + info!("[COMPACT-HOOK] Operation {op_name} and table optimization job completed successfully."); } - Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e); } + Err(e) => { info!("[COMPACT-HOOK] Operation {op_name} completed but table optimization job failed: {:?}", e); } } // reset the progress value diff --git a/src/query/service/src/interpreters/hook/hook.rs b/src/query/service/src/interpreters/hook/hook.rs index 24b81b2ace411..91566a7210ad9 100644 --- a/src/query/service/src/interpreters/hook/hook.rs +++ b/src/query/service/src/interpreters/hook/hook.rs @@ -76,12 +76,12 @@ impl HookOperator { pub async fn execute_compact(&self, pipeline: &mut Pipeline) { match self.ctx.get_settings().get_enable_compact_after_write() { Ok(false) => { - info!("auto compaction disabled"); + info!("[TABLE-HOOK] Auto compaction is disabled"); return; } Err(e) => { // swallow the exception, compaction hook should not prevent the main operation. - warn!("failed to get compaction settings, ignored. {}", e); + warn!("[TABLE-HOOK] Failed to retrieve compaction settings, continuing without compaction: {}", e); return; } Ok(true) => { diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs index 3589c79459a59..d00872646f8ea 100644 --- a/src/query/service/src/interpreters/hook/refresh_hook.rs +++ b/src/query/service/src/interpreters/hook/refresh_hook.rs @@ -63,13 +63,13 @@ pub async fn hook_refresh(ctx: Arc, pipeline: &mut Pipeline, desc: pipeline.set_on_finished(move |info: &ExecutionInfo| { if info.res.is_ok() { - info!("execute pipeline finished successfully, starting run refresh job."); + info!("[REFRESH-HOOK] Pipeline execution completed successfully, starting refresh job"); match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc)) { Ok(_) => { - info!("execute refresh job successfully."); + info!("[REFRESH-HOOK] Refresh job completed successfully"); } Err(e) => { - info!("execute refresh job failed. {:?}", e); + info!("[REFRESH-HOOK] Refresh job failed: {:?}", e); } } } diff --git a/src/query/service/src/interpreters/hook/vacuum_hook.rs b/src/query/service/src/interpreters/hook/vacuum_hook.rs index b1f0aaa977ecb..c3eed71fc475e 100644 --- a/src/query/service/src/interpreters/hook/vacuum_hook.rs +++ b/src/query/service/src/interpreters/hook/vacuum_hook.rs @@ -61,7 +61,7 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc) -> Result<()> { } log::info!( - "Vacuum temporary files by hook, node files: {:?}", + "[VACUUM-HOOK] Cleaning temporary files from nodes: {:?}", node_files ); @@ -77,7 +77,7 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc) -> Result<()> { .await; if let Err(cause) = &removed_files { - log::warn!("Vacuum temporary files has error: {:?}", cause); + log::warn!("[VACUUM-HOOK] Failed to clean temporary files: {:?}", cause); } Ok(()) @@ -96,7 +96,10 @@ pub fn hook_disk_temp_dir(query_ctx: &Arc) -> Result<()> { .get_spilling_to_disk_vacuum_unknown_temp_dirs_limit()?; let deleted = mgr.drop_disk_spill_dir_unknown(limit)?; if !deleted.is_empty() { - warn!("Deleted residual temporary directories: {:?}", deleted) + warn!( + "[VACUUM-HOOK] Removed residual temporary directories: {:?}", + deleted + ) } } diff --git a/src/query/service/src/servers/http/middleware/session.rs b/src/query/service/src/servers/http/middleware/session.rs index 99bea6200d4de..fbb81dcfa22fa 100644 --- a/src/query/service/src/servers/http/middleware/session.rs +++ b/src/query/service/src/servers/http/middleware/session.rs @@ -617,7 +617,7 @@ impl Endpoint for HTTPSessionEndpoint { } else { let msg = format!("sticky_node_id '{sticky_node_id}' not found in cluster",); - warn!("{}", msg); + warn!("[HTTP-SESSION] {}", msg); Err(Error::from(HttpErrorCode::bad_request( ErrorCode::BadArguments(msg), ))) @@ -650,7 +650,7 @@ impl Endpoint for HTTPSessionEndpoint { } Ok(None) => { let msg = format!("Not find the '{}' warehouse; it is possible that all nodes of the warehouse have gone offline. Please exit the client and reconnect, or use `use warehouse `", warehouse); - warn!("{}", msg); + warn!("[HTTP-SESSION] {}", msg); return Err(Error::from(HttpErrorCode::bad_request( ErrorCode::UnknownWarehouse(msg), ))); @@ -671,7 +671,9 @@ impl Endpoint for HTTPSessionEndpoint { } } - log::warn!("Ignore header ({HEADER_WAREHOUSE}: {warehouse:?})"); + log::warn!( + "[HTTP-SESSION] Ignoring warehouse header: {HEADER_WAREHOUSE}={warehouse:?}" + ); } }; @@ -706,13 +708,13 @@ impl Endpoint for HTTPSessionEndpoint { let err = HttpErrorCode::error_code(err); if err.status() == StatusCode::UNAUTHORIZED { warn!( - "http auth failure: {method} {uri}, headers={:?}, error={}", + "[HTTP-SESSION] Authentication failure: {method} {uri}, headers={:?}, error={}", sanitize_request_headers(&headers), err ); } else { error!( - "http request err: {method} {uri}, headers={:?}, error={}", + "[HTTP-SESSION] Request error: {method} {uri}, headers={:?}, error={}", sanitize_request_headers(&headers), err ); diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 147f38520d77a..4c07eddf12b62 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -807,7 +807,7 @@ impl HttpQuery { warnings: query_context.pop_warnings(), }; - info!( + error!( "[HTTP-QUERY] Query state changed to Stopped, failed to start: {:?}", e ); diff --git a/src/query/settings/src/settings_global.rs b/src/query/settings/src/settings_global.rs index b3566e19671b3..55a1706f82f38 100644 --- a/src/query/settings/src/settings_global.rs +++ b/src/query/settings/src/settings_global.rs @@ -108,7 +108,10 @@ impl Settings { .insert(name.clone(), match default_settings.settings.get(&name) { None => { // the settings may be deprecated - warn!("Ignore deprecated global setting {} = {}", name, val); + warn!( + "[SETTINGS] Ignoring deprecated global setting: {} = {}", + name, val + ); continue; } Some(default_setting_value) => { @@ -116,7 +119,10 @@ impl Settings { .is_err() { // the settings is session only, ignore the global setting - warn!("Ignore session only global setting {} = {}", name, val); + warn!( + "[SETTINGS] Ignoring session-only setting at global scope: {} = {}", + name, val + ); continue; } match &default_setting_value.value { diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index 23ff11114564a..c659a4b12910a 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -366,7 +366,7 @@ impl Binder { let columns = self.metadata.read().columns_by_table_index(table_index); let scan_id = self.metadata.write().next_scan_id(); log::info!( - "[RUNTIME-FILTER]bind_base_table scan_id: {},table_entry: {:?}", + "[RUNTIME-FILTER] bind_base_table scan_id: {},table_entry: {:?}", scan_id, table ); diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 658ffc8ca67e6..97c87c95cc69b 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -288,7 +288,10 @@ where F: SnapshotGenerator + Send + Sync + 'static // Vacuum in a best-effort manner, errors are ignored warn!("Vacuum table {} failed : {}", tbl.table_info.name, e); } else { - info!("vacuum table {} done", tbl.table_info.name); + info!( + "[SINK-COMMIT] Vacuum completed for table {}", + tbl.table_info.name + ); } } @@ -307,7 +310,7 @@ where F: SnapshotGenerator + Send + Sync + 'static if let Some(vacuum_handler) = &self.vacuum_handler { self.exec_auto_vacuum2(tbl, vacuum_handler.as_ref()).await; } else { - info!("no vacuum handler found for auto vacuuming, please re-check your license"); + info!("[SINK-COMMIT] No vacuum handler available for auto vacuuming, please verify your license"); } Ok(()) @@ -530,7 +533,7 @@ where F: SnapshotGenerator + Send + Sync + 'static { let elapsed_time = self.start_time.elapsed(); let status = format!( - "commit mutation success after {} retries, which took {:?}", + "[SINK-COMMIT] Mutation committed successfully after {} retries in {:?}", self.retries, elapsed_time ); metrics_inc_commit_milliseconds(elapsed_time.as_millis()); @@ -554,7 +557,10 @@ where F: SnapshotGenerator + Send + Sync + 'static .collect::>(); (tbl, stream_descriptions) }; - info!("commit mutation success, targets {:?}", target_descriptions); + info!( + "[SINK-COMMIT] Mutation committed successfully, targets: {:?}", + target_descriptions + ); self.state = State::Finish; } Err(e) if self.is_error_recoverable(&e) => { @@ -563,7 +569,7 @@ where F: SnapshotGenerator + Send + Sync + 'static Some(d) => { let name = table_info.name.clone(); debug!( - "got error TableVersionMismatched, tx will be retried {} ms later. table name {}, identity {}", + "[SINK-COMMIT] TableVersionMismatched error detected, transaction will retry in {} ms. Table: {}, ID: {}", d.as_millis(), name.as_str(), table_info.ident diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs index 7b52c32acac84..3bbc2e873264d 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs @@ -167,7 +167,7 @@ impl MatchedAggregator { // |----------------------------block----------------------------------------| // |----partial-unmodified----|-----matched------|----partial-unmodified-----| info!( - "duplicated block: segment_idx: {}, block_idx: {}", + "[MERGE-INTO] Duplicated block detected: segment_idx={}, block_idx={}", meta_index.segment_idx, meta_index.block_idx ); } @@ -289,7 +289,7 @@ impl MatchedAggregator { let segment_info = segment_infos.get(&item.0).unwrap(); let block_idx = segment_info.blocks.len() - block_idx - 1; info!( - "target_build_optimization, merge into apply: segment_idx:{},blk_idx:{}", + "[MERGE-INTO] Target build optimization applying: segment_idx={}, block_idx={}", segment_idx, block_idx ); mutation_logs.push(MutationLogEntry::DeletedBlock { @@ -316,7 +316,7 @@ impl MatchedAggregator { let block_idx = segment_info.blocks.len() - block_idx as usize - 1; assert!(block_idx < segment_info.blocks.len()); info!( - "merge into apply: segment_idx:{},blk_idx:{}", + "[MERGE-INTO] Applying mutation: segment_idx={}, block_idx={}", segment_idx, block_idx ); // the row_id is generated by block_id, not block_idx,reference to fill_internal_column_meta() @@ -385,8 +385,8 @@ impl AggregationContext { modified_offsets: HashSet, ) -> Result> { info!( - "apply update and delete to segment idx {}, block idx {}", - segment_idx, block_idx, + "[MERGE-INTO] Applying update and delete operations to segment_idx={}, block_idx={}", + segment_idx, block_idx ); let mut origin_data_block = read_block( self.write_settings.storage_format, @@ -439,7 +439,7 @@ impl AggregationContext { let cluster_stats = generator.gen_with_origin_stats(&block, origin_stats.clone())?; info!( - "serialize block after get cluster_stats:\n {:?}", + "[MERGE-INTO] Serializing block with cluster stats: {:?}", cluster_stats ); Ok((cluster_stats, block)) diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs index cdd92e0980cdd..cf8fb61eb077f 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs @@ -215,7 +215,7 @@ impl Transform for ReadParquetDataTransform { if unfinished_processors_count == 1 { let blocks_total = self.stats.blocks_total.load(Ordering::Relaxed); let blocks_pruned = self.stats.blocks_pruned.load(Ordering::Relaxed); - info!("[RUNTIME-FILTER]ReadParquetDataTransform finished, scan_id: {}, blocks_total: {}, blocks_pruned: {}", self.scan_id, blocks_total, blocks_pruned); + info!("[RUNTIME-FILTER] ReadParquetDataTransform finished, scan_id: {}, blocks_total: {}, blocks_pruned: {}", self.scan_id, blocks_total, blocks_pruned); } Ok(()) } @@ -324,7 +324,7 @@ impl AsyncTransform for ReadParquetDataTransform { if unfinished_processors_count == 1 { let blocks_total = self.stats.blocks_total.load(Ordering::Relaxed); let blocks_pruned = self.stats.blocks_pruned.load(Ordering::Relaxed); - info!("[RUNTIME-FILTER]AsyncReadParquetDataTransform finished, scan_id: {}, blocks_total: {}, blocks_pruned: {}", self.scan_id, blocks_total, blocks_pruned); + info!("[RUNTIME-FILTER] AsyncReadParquetDataTransform finished, scan_id: {}, blocks_total: {}, blocks_pruned: {}", self.scan_id, blocks_total, blocks_pruned); } Ok(()) } diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index f279915cebba7..7eeb531d43e5d 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -378,7 +378,7 @@ impl FuseTable { let pruning_stats = pruner.pruning_stats(); info!( - "prune snapshot block end, final block numbers:{}, cost:{:?}", + "[FUSE-PARTITIONS] prune snapshot block end, final block numbers:{}, cost:{:?}", block_metas.len(), start.elapsed() );