@@ -36,6 +36,7 @@ use tokio_util::sync::CancellationToken;
3636
3737use crate :: config:: Config ;
3838use crate :: helpers:: watch_subgraph_updates;
39+ use crate :: log_config_provider:: { LogStoreConfigProvider , LogStoreConfigSources } ;
3940use crate :: network_setup:: Networks ;
4041use crate :: opt:: Opt ;
4142use crate :: store_builder:: StoreBuilder ;
@@ -364,6 +365,7 @@ fn build_graphql_server(
364365 metrics_registry : Arc < MetricsRegistry > ,
365366 network_store : & Arc < Store > ,
366367 logger_factory : & LoggerFactory ,
368+ log_store : Arc < dyn graph:: components:: log_store:: LogStore > ,
367369) -> GraphQLQueryServer < GraphQlRunner < Store > > {
368370 let shards: Vec < _ > = config. stores . keys ( ) . cloned ( ) . collect ( ) ;
369371 let load_manager = Arc :: new ( LoadManager :: new (
@@ -377,6 +379,7 @@ fn build_graphql_server(
377379 network_store. clone ( ) ,
378380 load_manager,
379381 metrics_registry,
382+ log_store,
380383 ) ) ;
381384
382385 GraphQLQueryServer :: new ( logger_factory, graphql_runner. clone ( ) )
@@ -440,20 +443,121 @@ pub async fn run(
440443
441444 info ! ( logger, "Starting up" ; "node_id" => & node_id) ;
442445
443- // Optionally, identify the Elasticsearch logging configuration
444- let elastic_config = opt
445- . elasticsearch_url
446- . clone ( )
447- . map ( |endpoint| ElasticLoggingConfig {
448- endpoint,
449- username : opt. elasticsearch_user . clone ( ) ,
450- password : opt. elasticsearch_password . clone ( ) ,
451- client : reqwest:: Client :: new ( ) ,
452- } ) ;
446+ // Create log store configuration provider
447+ // Build LogStoreConfig from CLI args with backward compatibility
448+ let cli_config = if let Some ( backend) = opt. log_store_backend . as_ref ( ) {
449+ // New generic CLI args used
450+ match backend. to_lowercase ( ) . as_str ( ) {
451+ "elasticsearch" | "elastic" | "es" => {
452+ let url = opt
453+ . log_store_elasticsearch_url
454+ . clone ( )
455+ . or_else ( || {
456+ if opt. elasticsearch_url . is_some ( ) {
457+ warn ! (
458+ logger,
459+ "Using deprecated --elasticsearch-url, use --log-store-elasticsearch-url instead"
460+ ) ;
461+ }
462+ opt. elasticsearch_url . clone ( )
463+ } ) ;
464+
465+ url. map ( |endpoint| {
466+ let index = opt
467+ . log_store_elasticsearch_index
468+ . clone ( )
469+ . or_else ( || std:: env:: var ( "GRAPH_LOG_STORE_ELASTICSEARCH_INDEX" ) . ok ( ) )
470+ . or_else ( || std:: env:: var ( "GRAPH_ELASTIC_SEARCH_INDEX" ) . ok ( ) )
471+ . unwrap_or_else ( || "subgraph" . to_string ( ) ) ;
472+
473+ let timeout_secs = std:: env:: var ( "GRAPH_LOG_STORE_ELASTICSEARCH_TIMEOUT" )
474+ . or_else ( |_| std:: env:: var ( "GRAPH_ELASTICSEARCH_TIMEOUT" ) )
475+ . ok ( )
476+ . and_then ( |s| s. parse :: < u64 > ( ) . ok ( ) )
477+ . unwrap_or ( 10 ) ;
478+
479+ graph:: components:: log_store:: LogStoreConfig :: Elasticsearch {
480+ endpoint,
481+ username : opt
482+ . log_store_elasticsearch_user
483+ . clone ( )
484+ . or_else ( || opt. elasticsearch_user . clone ( ) ) ,
485+ password : opt
486+ . log_store_elasticsearch_password
487+ . clone ( )
488+ . or_else ( || opt. elasticsearch_password . clone ( ) ) ,
489+ index,
490+ timeout_secs,
491+ }
492+ } )
493+ }
494+
495+ "loki" => opt. log_store_loki_url . clone ( ) . map ( |endpoint| {
496+ graph:: components:: log_store:: LogStoreConfig :: Loki {
497+ endpoint,
498+ tenant_id : opt. log_store_loki_tenant_id . clone ( ) ,
499+ }
500+ } ) ,
501+
502+ "file" | "files" => opt. log_store_file_dir . clone ( ) . map ( |directory| {
503+ graph:: components:: log_store:: LogStoreConfig :: File {
504+ directory : std:: path:: PathBuf :: from ( directory) ,
505+ max_file_size : opt. log_store_file_max_size . unwrap_or ( 100 * 1024 * 1024 ) ,
506+ retention_days : opt. log_store_file_retention_days . unwrap_or ( 30 ) ,
507+ }
508+ } ) ,
509+
510+ "disabled" | "none" => None ,
511+
512+ other => {
513+ warn ! ( logger, "Invalid log store backend: {}" , other) ;
514+ None
515+ }
516+ }
517+ } else if opt. elasticsearch_url . is_some ( ) {
518+ // Old Elasticsearch-specific CLI args used (backward compatibility)
519+ warn ! (
520+ logger,
521+ "Using deprecated --elasticsearch-url CLI argument, \
522+ please use --log-store-backend elasticsearch --log-store-elasticsearch-url instead"
523+ ) ;
524+
525+ let index = opt
526+ . log_store_elasticsearch_index
527+ . clone ( )
528+ . or_else ( || std:: env:: var ( "GRAPH_LOG_STORE_ELASTICSEARCH_INDEX" ) . ok ( ) )
529+ . or_else ( || std:: env:: var ( "GRAPH_ELASTIC_SEARCH_INDEX" ) . ok ( ) )
530+ . unwrap_or_else ( || "subgraph" . to_string ( ) ) ;
531+
532+ let timeout_secs = std:: env:: var ( "GRAPH_LOG_STORE_ELASTICSEARCH_TIMEOUT" )
533+ . or_else ( |_| std:: env:: var ( "GRAPH_ELASTICSEARCH_TIMEOUT" ) )
534+ . ok ( )
535+ . and_then ( |s| s. parse :: < u64 > ( ) . ok ( ) )
536+ . unwrap_or ( 10 ) ;
537+
538+ Some (
539+ graph:: components:: log_store:: LogStoreConfig :: Elasticsearch {
540+ endpoint : opt. elasticsearch_url . clone ( ) . unwrap ( ) ,
541+ username : opt. elasticsearch_user . clone ( ) ,
542+ password : opt. elasticsearch_password . clone ( ) ,
543+ index,
544+ timeout_secs,
545+ } ,
546+ )
547+ } else {
548+ // No CLI config provided
549+ None
550+ } ;
551+
552+ let log_config_provider = LogStoreConfigProvider :: new ( LogStoreConfigSources { cli_config } ) ;
553+
554+ // Resolve log store (for querying) and config (for drains)
555+ // Priority: GRAPH_LOG_STORE env var → CLI config → NoOp/None
556+ let ( log_store, log_store_config) = log_config_provider. resolve ( & logger) ;
453557
454558 // Create a component and subgraph logger factory
455559 let logger_factory =
456- LoggerFactory :: new ( logger. clone ( ) , elastic_config , metrics_registry. clone ( ) ) ;
560+ LoggerFactory :: new ( logger. clone ( ) , log_store_config , metrics_registry. clone ( ) ) ;
457561
458562 let arweave_resolver = Arc :: new ( ArweaveClient :: new (
459563 logger. cheap_clone ( ) ,
@@ -560,6 +664,7 @@ pub async fn run(
560664 metrics_registry. clone ( ) ,
561665 & network_store,
562666 & logger_factory,
667+ log_store. clone ( ) ,
563668 ) ;
564669
565670 let index_node_server = IndexNodeServer :: new (
0 commit comments