From a0f1a91e026f68aa2101618a95b12c0aeef9f363 Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Fri, 24 Apr 2026 17:32:28 +0000 Subject: [PATCH 1/5] Clean up parameter grouping for various output handlers. --- .../core/src/clp_s/CommandLineArguments.cpp | 139 +++++++++--------- .../core/src/clp_s/CommandLineArguments.hpp | 91 ++++++------ components/core/src/clp_s/clp-s.cpp | 56 +++---- components/core/src/clp_s/kv_ir_search.cpp | 4 +- 4 files changed, 154 insertions(+), 136 deletions(-) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 6b1bee3ce0..3e7c1dd216 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -633,31 +633,18 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { // clang-format on search_options.add(match_options); - po::options_description aggregation_options("Aggregation Options"); - // clang-format off - aggregation_options.add_options()( - "count", - po::bool_switch(&m_do_count_results_aggregation), - "Count the number of results" - )( - "count-by-time", - po::value(&m_count_by_time_bucket_size)->value_name("SIZE"), - "Count the number of results in each time span of the given size (ms)" - ); - // clang-format on - search_options.add(aggregation_options); - po::options_description network_output_handler_options( "Network Output Handler Options" ); // clang-format off network_output_handler_options.add_options()( "host", - po::value(&m_network_dest_host)->value_name("HOST"), + po::value( + &m_network_output_handler_options.host)->value_name("HOST"), "Network destination host" )( "port", - po::value(&m_network_dest_port)->value_name("PORT"), + po::value(&m_network_output_handler_options.port)->value_name("PORT"), "Network destination port" ); // clang-format on @@ -668,16 +655,27 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { // clang-format off reducer_output_handler_options.add_options()( "host", - po::value(&m_reducer_host)->value_name("HOST"), + po::value( + &m_reducer_output_handler_options.host)->value_name("HOST"), "Host the reducer is running on" )( "port", - po::value(&m_reducer_port)->value_name("PORT"), + po::value(&m_reducer_output_handler_options.port)->value_name("PORT"), "Port the reducer is listening on" )( "job-id", - po::value(&m_job_id)->value_name("ID"), + po::value( + &m_reducer_output_handler_options.job_id)->value_name("ID"), "Job ID for the requested aggregation operation" + )( + "count", + "Count the number of results" + )( + "count-by-time", + po::value( + &m_reducer_output_handler_options.count_by_time_bucket_size + )->value_name("SIZE"), + "Count the number of results in each time span of the given size (ms)" ); // clang-format on @@ -686,32 +684,38 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); results_cache_output_handler_options.add_options()( "uri", - po::value(&m_mongodb_uri)->value_name("URI"), + po::value( + &m_results_cache_output_handler_options.uri)->value_name("URI"), "MongoDB URI for the results cache" )( "collection", - po::value(&m_mongodb_collection)->value_name("COLLECTION"), + po::value( + &m_results_cache_output_handler_options.collection)->value_name("COLLECTION"), "MongoDB collection to output to" )( "batch-size", - po::value(&m_batch_size)->value_name("SIZE")-> - default_value(m_batch_size), + po::value( + &m_results_cache_output_handler_options.batch_size)->value_name("SIZE")-> + default_value(m_results_cache_output_handler_options.batch_size), "The number of documents to insert into MongoDB per batch" )( "max-num-results", - po::value(&m_max_num_results)->value_name("MAX")-> - default_value(m_max_num_results), + po::value( + &m_results_cache_output_handler_options.max_num_results)->value_name("MAX")-> + default_value(m_results_cache_output_handler_options.max_num_results), "The maximum number of results to output" )( "dataset", - po::value(&m_dataset)->value_name("DATASET"), + po::value( + &m_results_cache_output_handler_options.dataset)->value_name("DATASET"), "The dataset name to include in each result document" ); po::options_description file_output_handler_options("File Output Handler Options"); file_output_handler_options.add_options()( "path", - po::value(&m_file_output_path)->value_name("PATH"), + po::value(&m_file_output_handler_options.output_path) + ->value_name("PATH"), "File output path" ); @@ -795,7 +799,6 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { po::options_description visible_options; visible_options.add(general_options); visible_options.add(match_options); - visible_options.add(aggregation_options); visible_options.add(file_output_handler_options); visible_options.add(network_output_handler_options); visible_options.add(results_cache_output_handler_options); @@ -828,15 +831,6 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); } - if (parsed_command_line_options.count("count-by-time") > 0) { - m_do_count_by_time_aggregation = true; - if (m_count_by_time_bucket_size <= 0) { - throw std::invalid_argument( - "Value for count-by-time must be greater than zero." - ); - } - } - if (parsed_command_line_options.count("output-handler") > 0) { if (static_cast(cNetworkOutputHandlerName) == output_handler_name) { m_output_handler_type = OutputHandlerType::Network; @@ -893,27 +887,6 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { + std::to_string(clp::enum_to_underlying_type(m_output_handler_type)) ); } - - bool aggregation_was_specified - = m_do_count_by_time_aggregation || m_do_count_results_aggregation; - if (aggregation_was_specified && OutputHandlerType::Reducer != m_output_handler_type) { - throw std::invalid_argument( - "Aggregations are only supported with the reducer output handler." - ); - } else if ((false == aggregation_was_specified - && OutputHandlerType::Reducer == m_output_handler_type)) - { - throw std::invalid_argument( - "The reducer output handler currently only supports count and" - " count-by-time aggregations." - ); - } - - if (m_do_count_by_time_aggregation && m_do_count_results_aggregation) { - throw std::invalid_argument( - "The --count-by-time and --count options are mutually exclusive." - ); - } } } catch (std::exception& e) { SPDLOG_ERROR("{}", e.what()); @@ -936,14 +909,14 @@ void CommandLineArguments::parse_network_dest_output_handler_options( if (parsed_options.count("host") == 0) { throw std::invalid_argument("host must be specified."); } - if (m_network_dest_host.empty()) { + if (m_network_output_handler_options.host.empty()) { throw std::invalid_argument("host cannot be an empty string."); } if (parsed_options.count("port") == 0) { throw std::invalid_argument("port must be specified."); } - if (m_network_dest_port <= 0) { + if (m_network_output_handler_options.port <= 0) { throw std::invalid_argument("port must be greater than zero."); } } @@ -958,23 +931,53 @@ void CommandLineArguments::parse_reducer_output_handler_options( if (parsed_options.count("host") == 0) { throw std::invalid_argument("host must be specified."); } - if (m_reducer_host.empty()) { + + if (m_reducer_output_handler_options.host.empty()) { throw std::invalid_argument("host cannot be an empty string."); } if (parsed_options.count("port") == 0) { throw std::invalid_argument("port must be specified."); } - if (m_reducer_port <= 0) { + if (m_reducer_output_handler_options.port <= 0) { throw std::invalid_argument("port must be greater than zero."); } if (parsed_options.count("job-id") == 0) { throw std::invalid_argument("job-id must be specified."); } - if (m_job_id < 0) { + if (m_reducer_output_handler_options.job_id < 0) { throw std::invalid_argument("job-id cannot be negative."); } + + bool has_aggregation{}; + if (parsed_options.count("count")) { + m_reducer_output_handler_options.aggregation_type = AggregationType::Count; + has_aggregation = true; + } + if (parsed_options.count("count-by-time")) { + if (has_aggregation) { + throw std::invalid_argument( + "The --count-by-time and --count options are mutually exclusive." + ); + } + + if (m_reducer_output_handler_options.count_by_time_bucket_size <= 0) { + throw std::invalid_argument( + "Value for count-by-time must be greater than zero." + ); + } + + m_reducer_output_handler_options.aggregation_type = AggregationType::CountByTime; + has_aggregation = true; + } + + if (false == has_aggregation) { + throw std::invalid_argument( + "The reducer output handler currently only supports count and" + " count-by-time aggregations." + ); + } } void CommandLineArguments::parse_results_cache_output_handler_options( @@ -987,22 +990,22 @@ void CommandLineArguments::parse_results_cache_output_handler_options( if (parsed_options.count("uri") == 0) { throw std::invalid_argument("uri must be specified."); } - if (m_mongodb_uri.empty()) { + if (m_results_cache_output_handler_options.uri.empty()) { throw std::invalid_argument("uri cannot be an empty string."); } if (parsed_options.count("collection") == 0) { throw std::invalid_argument("collection must be specified."); } - if (m_mongodb_collection.empty()) { + if (m_results_cache_output_handler_options.collection.empty()) { throw std::invalid_argument("collection cannot be an empty string."); } - if (0 == m_batch_size) { + if (0 == m_results_cache_output_handler_options.batch_size) { throw std::invalid_argument("batch-size cannot be 0."); } - if (0 == m_max_num_results) { + if (0 == m_results_cache_output_handler_options.max_num_results) { throw std::invalid_argument("max-num-results cannot be 0."); } } @@ -1016,7 +1019,7 @@ void CommandLineArguments::parse_file_output_handler_options( if (parsed_options.count("path") == 0) { throw std::invalid_argument("path must be specified."); } - if (m_file_output_path.empty()) { + if (m_file_output_handler_options.output_path.empty()) { throw std::invalid_argument("path cannot be an empty string."); } } diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index c82380d853..ae11795cc9 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -38,6 +39,36 @@ class CommandLineArguments { Stdout, }; + enum class AggregationType : uint8_t { + Count, + CountByTime, + }; + + struct ResultsCacheOutputHandlerOptions { + std::string uri; + std::string collection; + std::string dataset; + uint64_t batch_size{1000}; + uint64_t max_num_results{1000}; + }; + + struct FileOutputHandlerOptions { + std::string output_path; + }; + + struct NetworkOutputHandlerOptions { + std::string host; + int port{}; + }; + + struct ReducerOutputHandlerOptions { + std::string host; + int port{-1}; + reducer::job_id_t job_id{-1}; + AggregationType aggregation_type{AggregationType::Count}; + int64_t count_by_time_bucket_size{}; // Milliseconds + }; + // Constructors explicit CommandLineArguments(std::string const& program_name) : m_program_name(program_name) {} @@ -79,16 +110,6 @@ class CommandLineArguments { std::string const& get_mongodb_collection() const { return m_mongodb_collection; } - uint64_t get_batch_size() const { return m_batch_size; } - - uint64_t get_max_num_results() const { return m_max_num_results; } - - std::string const& get_network_dest_host() const { return m_network_dest_host; } - - int const& get_network_dest_port() const { return m_network_dest_port; } - - std::string const& get_file_output_path() const { return m_file_output_path; } - std::string const& get_query() const { return m_query; } std::optional get_search_begin_ts() const { return m_search_begin_ts; } @@ -97,17 +118,22 @@ class CommandLineArguments { bool get_ignore_case() const { return m_ignore_case; } - std::string const& get_reducer_host() const { return m_reducer_host; } - - int get_reducer_port() const { return m_reducer_port; } - - reducer::job_id_t get_job_id() const { return m_job_id; } + auto get_results_cache_output_handler_options() const + -> ResultsCacheOutputHandlerOptions const& { + return m_results_cache_output_handler_options; + } - bool do_count_results_aggregation() const { return m_do_count_results_aggregation; } + auto get_file_output_handler_options() const -> FileOutputHandlerOptions const& { + return m_file_output_handler_options; + } - bool do_count_by_time_aggregation() const { return m_do_count_by_time_aggregation; } + auto get_network_output_handler_options() const -> NetworkOutputHandlerOptions const& { + return m_network_output_handler_options; + } - int64_t get_count_by_time_bucket_size() const { return m_count_by_time_bucket_size; } + auto get_reducer_output_handler_options() const -> ReducerOutputHandlerOptions const& { + return m_reducer_output_handler_options; + } OutputHandlerType get_output_handler_type() const { return m_output_handler_type; } @@ -129,8 +155,6 @@ class CommandLineArguments { bool get_record_log_order() const { return false == m_disable_log_order; } - std::string const& get_dataset() const { return m_dataset; } - private: // Methods /** @@ -216,19 +240,15 @@ class CommandLineArguments { bool m_print_ordered_chunk_stats{false}; size_t m_minimum_table_size{1ULL * 1024 * 1024}; // 1 MiB bool m_disable_log_order{false}; - - // MongoDB configuration variables std::string m_mongodb_uri; std::string m_mongodb_collection; - uint64_t m_batch_size{1000}; - uint64_t m_max_num_results{1000}; - // Network configuration variables - std::string m_network_dest_host; - int m_network_dest_port; - - // File output configuration variables - std::string m_file_output_path; + // Search output handler options + OutputHandlerType m_output_handler_type{OutputHandlerType::Stdout}; + ResultsCacheOutputHandlerOptions m_results_cache_output_handler_options{}; + NetworkOutputHandlerOptions m_network_output_handler_options{}; + FileOutputHandlerOptions m_file_output_handler_options{}; + ReducerOutputHandlerOptions m_reducer_output_handler_options{}; // Search variables std::string m_query; @@ -236,17 +256,6 @@ class CommandLineArguments { std::optional m_search_end_ts; bool m_ignore_case{false}; std::vector m_projection_columns; - std::string m_dataset; - - // Search aggregation variables - std::string m_reducer_host; - int m_reducer_port{-1}; - reducer::job_id_t m_job_id{-1}; - bool m_do_count_results_aggregation{false}; - bool m_do_count_by_time_aggregation{false}; - int64_t m_count_by_time_bucket_size{0}; // Milliseconds - - OutputHandlerType m_output_handler_type{OutputHandlerType::Stdout}; }; } // namespace clp_s diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index f7bec78f07..0f71d5165a 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -226,40 +226,48 @@ bool search_archive( std::unique_ptr output_handler; try { switch (command_line_arguments.get_output_handler_type()) { - case CommandLineArguments::OutputHandlerType::File: - output_handler = std::make_unique( - command_line_arguments.get_file_output_path(), - true - ); + case CommandLineArguments::OutputHandlerType::File: { + auto const& options{command_line_arguments.get_file_output_handler_options()}; + output_handler + = std::make_unique(options.output_path, true); break; - case CommandLineArguments::OutputHandlerType::Network: - output_handler = std::make_unique( - command_line_arguments.get_network_dest_host(), - command_line_arguments.get_network_dest_port() - ); + } + case CommandLineArguments::OutputHandlerType::Network: { + auto const& options{command_line_arguments.get_network_output_handler_options()}; + output_handler + = std::make_unique(options.host, options.port); break; - case CommandLineArguments::OutputHandlerType::Reducer: - if (command_line_arguments.do_count_results_aggregation()) { + } + case CommandLineArguments::OutputHandlerType::Reducer: { + auto const& options{command_line_arguments.get_reducer_output_handler_options()}; + if (CommandLineArguments::AggregationType::Count == options.aggregation_type) { output_handler = std::make_unique(reducer_socket_fd); - } else if (command_line_arguments.do_count_by_time_aggregation()) { + } else if (CommandLineArguments::AggregationType::CountByTime + == options.aggregation_type) + { output_handler = std::make_unique( reducer_socket_fd, - command_line_arguments.get_count_by_time_bucket_size() + options.count_by_time_bucket_size ); } else { SPDLOG_ERROR("Unhandled aggregation type."); return false; } break; - case CommandLineArguments::OutputHandlerType::ResultsCache: + } + case CommandLineArguments::OutputHandlerType::ResultsCache: { + auto const& options{ + command_line_arguments.get_results_cache_output_handler_options() + }; output_handler = std::make_unique( - command_line_arguments.get_mongodb_uri(), - command_line_arguments.get_mongodb_collection(), - command_line_arguments.get_batch_size(), - command_line_arguments.get_max_num_results(), - command_line_arguments.get_dataset() + options.uri, + options.collection, + options.batch_size, + options.max_num_results, + options.dataset ); break; + } case CommandLineArguments::OutputHandlerType::Stdout: output_handler = std::make_unique(); break; @@ -359,11 +367,9 @@ int main(int argc, char const* argv[]) { if (command_line_arguments.get_output_handler_type() == CommandLineArguments::OutputHandlerType::Reducer) { - reducer_socket_fd = reducer::connect_to_reducer( - command_line_arguments.get_reducer_host(), - command_line_arguments.get_reducer_port(), - command_line_arguments.get_job_id() - ); + auto const& options{command_line_arguments.get_reducer_output_handler_options()}; + reducer_socket_fd + = reducer::connect_to_reducer(options.host, options.port, options.job_id); if (-1 == reducer_socket_fd) { SPDLOG_ERROR("Failed to connect to reducer"); return 1; diff --git a/components/core/src/clp_s/kv_ir_search.cpp b/components/core/src/clp_s/kv_ir_search.cpp index dc77efa1a2..da4964b11f 100644 --- a/components/core/src/clp_s/kv_ir_search.cpp +++ b/components/core/src/clp_s/kv_ir_search.cpp @@ -249,8 +249,8 @@ auto search_kv_ir_stream( return KvIrSearchError{KvIrSearchErrorEnum::ProjectionSupportNotImplemented}; } - if (command_line_arguments.do_count_by_time_aggregation() - || command_line_arguments.do_count_results_aggregation()) + if (CommandLineArguments::OutputHandlerType::Reducer + == command_line_arguments.get_output_handler_type()) { SPDLOG_ERROR("kv-ir search: Count support is not implemented."); return KvIrSearchError{KvIrSearchErrorEnum::CountSupportNotImplemented}; From 530dd7e935f7b21788cb912fc4e3f206064d596c Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Tue, 28 Apr 2026 17:09:40 +0000 Subject: [PATCH 2/5] Refactor output handler options parsing to make it easier to support multiple output handlers. --- components/core/src/clp_s/CMakeLists.txt | 2 - .../core/src/clp_s/CommandLineArguments.cpp | 248 ++++++++++++------ .../core/src/clp_s/CommandLineArguments.hpp | 17 +- 3 files changed, 171 insertions(+), 96 deletions(-) diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index 6230d58529..af64b1450b 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -13,8 +13,6 @@ set( ../clp/BufferedReader.hpp ../clp/BufferReader.cpp ../clp/BufferReader.hpp - ../clp/cli_utils.cpp - ../clp/cli_utils.hpp ../clp/Defs.h ../clp/EncodedVariableInterpreter.cpp ../clp/EncodedVariableInterpreter.hpp diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 3e7c1dd216..2fdf2ab08d 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -1,5 +1,6 @@ #include "CommandLineArguments.hpp" +#include #include #include #include @@ -8,7 +9,6 @@ #include #include -#include "../clp/cli_utils.hpp" #include "../clp/type_utils.hpp" #include "../reducer/types.hpp" #include "FileReader.hpp" @@ -21,6 +21,37 @@ namespace { constexpr std::string_view cNoAuth{"none"}; constexpr std::string_view cS3Auth{"s3"}; +// Output handler constants +constexpr std::string_view cFileOutputHandlerName{"file"}; +constexpr std::string_view cNetworkOutputHandlerName{"network"}; +constexpr std::string_view cReducerOutputHandlerName{"reducer"}; +constexpr std::string_view cResultsCacheOutputHandlerName{"results-cache"}; +constexpr std::string_view cStdoutCacheOutputHandlerName{"stdout"}; + +/** + * Splits unrecognized options into lists of arguments for one or more known subcommands. + * @param subcommand_names + * @param options A vector of arguments that should correspond to a known set of subcommands. + * @return A map of subcommand names to a vector of corresponding arguments. + * @throws std::invalid_argument if a string of options begins with an unknown subcommand name. + */ +auto collect_subcommands( + std::set const& subcommand_names, + std::vector const& options +) -> std::map>; + +/** + * Parses options for a subcommand according to an options_description. + * @param options_description + * @param options + * @param parsed_options The parsed options. + */ +auto parse_subcommand_options( + po::options_description const& options_description, + std::vector const& options, + po::variables_map& parsed_options +) -> void; + /** * Read a list of newline-delimited paths from a file and put them into a vector passed by reference * TODO: deduplicate this code with the version in clp @@ -113,6 +144,54 @@ void validate_archive_paths( throw std::invalid_argument("No archive paths specified"); } } + +auto collect_subcommands( + std::set const& subcommand_names, + std::vector const& options +) -> std::map> { + std::optional current_subcommand; + std::optional> current_subcommand_arguments; + std::map> collected_subcommands; + + auto collect_subcommand = [&]() -> void { + if (current_subcommand.has_value() && current_subcommand_arguments.has_value()) { + collected_subcommands.emplace( + std::move(current_subcommand.value()), + std::move(current_subcommand_arguments.value()) + ); + current_subcommand.reset(); + current_subcommand_arguments.reset(); + } + }; + + for (auto const& option : options) { + if (subcommand_names.contains(option)) { + collect_subcommand(); + current_subcommand = option; + current_subcommand_arguments.emplace(); + continue; + } + + if (false == current_subcommand.has_value() + || false == current_subcommand_arguments.has_value()) + { + throw std::invalid_argument(fmt::format("unrecognized option \"{}\"", option)); + } + + current_subcommand_arguments.value().emplace_back(option); + } + collect_subcommand(); + return collected_subcommands; +} + +auto parse_subcommand_options( + po::options_description const& options_description, + std::vector const& options, + po::variables_map& parsed_options +) -> void { + po::store(po::command_line_parser(options).options(options_description).run(), parsed_options); + po::notify(parsed_options); +} } // namespace CommandLineArguments::ParsingResult @@ -568,7 +647,6 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { std::string query; po::options_description search_options; - std::string output_handler_name; std::string archive_path; // clang-format off search_options.add_options()( @@ -579,18 +657,15 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "query,q", po::value(&m_query), "Query to perform" - )( - "output-handler", - po::value(&output_handler_name) )( "output-handler-args", po::value>() ); // clang-format on + constexpr size_t cNumRequiredPositionalArguments{2}; po::positional_options_description positional_options; positional_options.add("archive-path", 1); positional_options.add("query", 1); - positional_options.add("output-handler", 1); positional_options.add("output-handler-args", -1); po::options_description match_options("Match Controls"); @@ -731,25 +806,18 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { po::notify(parsed_command_line_options); - constexpr char cFileOutputHandlerName[] = "file"; - constexpr char cNetworkOutputHandlerName[] = "network"; - constexpr char cReducerOutputHandlerName[] = "reducer"; - constexpr char cResultsCacheOutputHandlerName[] = "results-cache"; - constexpr char cStdoutCacheOutputHandlerName[] = "stdout"; - if (parsed_command_line_options.count("help")) { print_search_usage(); std::cerr << "OUTPUT_HANDLER is one of:" << std::endl; - std::cerr << " " << static_cast(cStdoutCacheOutputHandlerName) + std::cerr << " " << cStdoutCacheOutputHandlerName << " (default) - Output to stdout" << std::endl; - std::cerr << " " << static_cast(cFileOutputHandlerName) - << " - Output to a file" << std::endl; - std::cerr << " " << static_cast(cNetworkOutputHandlerName) + std::cerr << " " << cFileOutputHandlerName << " - Output to a file" << std::endl; + std::cerr << " " << cNetworkOutputHandlerName << " - Output to a network destination" << std::endl; - std::cerr << " " << static_cast(cResultsCacheOutputHandlerName) + std::cerr << " " << cResultsCacheOutputHandlerName << " - Output to the results cache" << std::endl; - std::cerr << " " << static_cast(cReducerOutputHandlerName) - << " - Output to the reducer" << std::endl; + std::cerr << " " << cReducerOutputHandlerName << " - Output to the reducer" + << std::endl; std::cerr << std::endl; std::cerr << "Examples:" << std::endl; @@ -807,6 +875,38 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { return ParsingResult::InfoCommand; } + if (false == parsed_command_line_options.count("archive-path")) { + throw std::invalid_argument( + "missing required positional argument \"ARCHIVES_DIR\"" + ); + } + + if (false == parsed_command_line_options.count("query")) { + throw std::invalid_argument("missing required positional argument \"KQL_QUERY\""); + } + + auto unrecognized_output_options{ + po::collect_unrecognized(search_parsed.options, po::include_positional) + }; + unrecognized_output_options.erase( + unrecognized_output_options.begin(), + unrecognized_output_options.begin() + + std::min( + cNumRequiredPositionalArguments, + unrecognized_output_options.size() + ) + ); + std::set const subcommand_names{ + std::string{cFileOutputHandlerName}, + std::string{cNetworkOutputHandlerName}, + std::string{cReducerOutputHandlerName}, + std::string{cResultsCacheOutputHandlerName}, + std::string{cStdoutCacheOutputHandlerName} + }; + auto const output_options_map{ + collect_subcommands(subcommand_names, unrecognized_output_options) + }; + validate_archive_paths(archive_path, archive_id, m_input_paths); validate_network_auth(auth, m_network_auth); @@ -831,62 +931,50 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); } - if (parsed_command_line_options.count("output-handler") > 0) { - if (static_cast(cNetworkOutputHandlerName) == output_handler_name) { + if (output_options_map.size() > 1) { + throw std::invalid_argument("clp-s only supports one output handler at a time"); + } + + for (auto const& [output_handler_name, output_handler_options] : output_options_map) { + if (cNetworkOutputHandlerName == output_handler_name) { m_output_handler_type = OutputHandlerType::Network; - } else if ((static_cast(cReducerOutputHandlerName) - == output_handler_name)) - { + parse_network_dest_output_handler_options( + network_output_handler_options, + output_handler_options + ); + } else if (cReducerOutputHandlerName == output_handler_name) { m_output_handler_type = OutputHandlerType::Reducer; - } else if ((static_cast(cResultsCacheOutputHandlerName) - == output_handler_name)) - { + parse_reducer_output_handler_options( + reducer_output_handler_options, + output_handler_options + ); + } else if (cResultsCacheOutputHandlerName == output_handler_name) { m_output_handler_type = OutputHandlerType::ResultsCache; - } else if ((static_cast(cStdoutCacheOutputHandlerName) - == output_handler_name)) - { + parse_results_cache_output_handler_options( + results_cache_output_handler_options, + output_handler_options + ); + } else if (cStdoutCacheOutputHandlerName == output_handler_name) { m_output_handler_type = OutputHandlerType::Stdout; - } else if ((static_cast(cFileOutputHandlerName) - == output_handler_name)) - { + if (false == output_handler_options.empty()) { + std::string error_msg{fmt::format( + "stdout output handler does not support \"{}\"", + output_handler_options.front() + )}; + throw std::invalid_argument(error_msg); + } + } else if (cFileOutputHandlerName == output_handler_name) { m_output_handler_type = OutputHandlerType::File; + parse_file_output_handler_options( + file_output_handler_options, + output_handler_options + ); } else if (output_handler_name.empty()) { throw std::invalid_argument("OUTPUT_HANDLER cannot be an empty string."); } else { throw std::invalid_argument("Unknown OUTPUT_HANDLER: " + output_handler_name); } } - - if (OutputHandlerType::Network == m_output_handler_type) { - parse_network_dest_output_handler_options( - network_output_handler_options, - search_parsed.options, - parsed_command_line_options - ); - } else if (OutputHandlerType::Reducer == m_output_handler_type) { - parse_reducer_output_handler_options( - reducer_output_handler_options, - search_parsed.options, - parsed_command_line_options - ); - } else if (OutputHandlerType::ResultsCache == m_output_handler_type) { - parse_results_cache_output_handler_options( - results_cache_output_handler_options, - search_parsed.options, - parsed_command_line_options - ); - } else if (OutputHandlerType::File == m_output_handler_type) { - parse_file_output_handler_options( - file_output_handler_options, - search_parsed.options, - parsed_command_line_options - ); - } else if (m_output_handler_type != OutputHandlerType::Stdout) { - throw std::invalid_argument( - "Unhandled OutputHandlerType=" - + std::to_string(clp::enum_to_underlying_type(m_output_handler_type)) - ); - } } } catch (std::exception& e) { SPDLOG_ERROR("{}", e.what()); @@ -901,10 +989,10 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { void CommandLineArguments::parse_network_dest_output_handler_options( po::options_description const& options_description, - std::vector const& options, - po::variables_map& parsed_options + std::vector const& options ) { - clp::parse_unrecognized_options(options_description, options, parsed_options); + po::variables_map parsed_options; + parse_subcommand_options(options_description, options, parsed_options); if (parsed_options.count("host") == 0) { throw std::invalid_argument("host must be specified."); @@ -923,10 +1011,10 @@ void CommandLineArguments::parse_network_dest_output_handler_options( void CommandLineArguments::parse_reducer_output_handler_options( po::options_description const& options_description, - std::vector const& options, - po::variables_map& parsed_options + std::vector const& options ) { - clp::parse_unrecognized_options(options_description, options, parsed_options); + po::variables_map parsed_options; + parse_subcommand_options(options_description, options, parsed_options); if (parsed_options.count("host") == 0) { throw std::invalid_argument("host must be specified."); @@ -963,9 +1051,7 @@ void CommandLineArguments::parse_reducer_output_handler_options( } if (m_reducer_output_handler_options.count_by_time_bucket_size <= 0) { - throw std::invalid_argument( - "Value for count-by-time must be greater than zero." - ); + throw std::invalid_argument("Value for count-by-time must be greater than zero."); } m_reducer_output_handler_options.aggregation_type = AggregationType::CountByTime; @@ -974,18 +1060,18 @@ void CommandLineArguments::parse_reducer_output_handler_options( if (false == has_aggregation) { throw std::invalid_argument( - "The reducer output handler currently only supports count and" - " count-by-time aggregations." + "The reducer output handler currently only supports count and" + " count-by-time aggregations." ); } } void CommandLineArguments::parse_results_cache_output_handler_options( po::options_description const& options_description, - std::vector const& options, - po::variables_map& parsed_options + std::vector const& options ) { - clp::parse_unrecognized_options(options_description, options, parsed_options); + po::variables_map parsed_options; + parse_subcommand_options(options_description, options, parsed_options); if (parsed_options.count("uri") == 0) { throw std::invalid_argument("uri must be specified."); @@ -1012,10 +1098,10 @@ void CommandLineArguments::parse_results_cache_output_handler_options( void CommandLineArguments::parse_file_output_handler_options( po::options_description const& options_description, - std::vector const& options, - po::variables_map& parsed_options + std::vector const& options ) { - clp::parse_unrecognized_options(options_description, options, parsed_options); + po::variables_map parsed_options; + parse_subcommand_options(options_description, options, parsed_options); if (parsed_options.count("path") == 0) { throw std::invalid_argument("path must be specified."); } diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index ae11795cc9..3a9f31448b 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -7,7 +7,6 @@ #include #include -#include #include #include @@ -162,12 +161,10 @@ class CommandLineArguments { * @param options_description * @param options Vector of options previously parsed by boost::program_options and which may * contain options that have the unrecognized flag set - * @param parsed_options Returns any parsed options that were newly recognized */ void parse_network_dest_output_handler_options( boost::program_options::options_description const& options_description, - std::vector const& options, - boost::program_options::variables_map& parsed_options + std::vector const& options ); /** @@ -175,12 +172,10 @@ class CommandLineArguments { * @param options_description * @param options Vector of options previously parsed by boost::program_options and which may * contain options that have the unrecognized flag set - * @param parsed_options Returns any parsed options that were newly recognized */ void parse_reducer_output_handler_options( boost::program_options::options_description const& options_description, - std::vector const& options, - boost::program_options::variables_map& parsed_options + std::vector const& options ); /** @@ -188,12 +183,10 @@ class CommandLineArguments { * @param options_description * @param options Vector of options previously parsed by boost::program_options and which may * contain options that have the unrecognized flag set - * @param parsed_options Returns any parsed options that were newly recognized */ void parse_results_cache_output_handler_options( boost::program_options::options_description const& options_description, - std::vector const& options, - boost::program_options::variables_map& parsed_options + std::vector const& options ); /** @@ -201,12 +194,10 @@ class CommandLineArguments { * @param options_description * @param options Vector of options previously parsed by boost::program_options and which may * contain options that have the unrecognized flag set - * @param parsed_options Returns any parsed options that were newly recognized */ void parse_file_output_handler_options( boost::program_options::options_description const& options_description, - std::vector const& options, - boost::program_options::variables_map& parsed_options + std::vector const& options ); void print_basic_usage() const; From 0984f724e62a6934757854db2f5e3748eb3b21a7 Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Tue, 28 Apr 2026 19:03:31 +0000 Subject: [PATCH 3/5] Group aggregation options with reducer output handler options. --- .../executor/query/fs_search_task.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py index 5f9c4af5a0..7b7e6aadd1 100644 --- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py @@ -142,12 +142,6 @@ def _make_command_and_env_vars( if search_config.aggregation_config is not None: aggregation_config = search_config.aggregation_config - if aggregation_config.do_count_aggregation is not None: - command.append("--count") - if aggregation_config.count_by_time_bucket_size is not None: - command.append("--count-by-time") - command.append(str(aggregation_config.count_by_time_bucket_size)) - # fmt: off command.extend(( "reducer", @@ -156,6 +150,11 @@ def _make_command_and_env_vars( "--job-id", str(aggregation_config.job_id) )) # fmt: on + if aggregation_config.do_count_aggregation is not None: + command.append("--count") + if aggregation_config.count_by_time_bucket_size is not None: + command.append("--count-by-time") + command.append(str(aggregation_config.count_by_time_bucket_size)) elif search_config.network_address is not None: # fmt: off command.extend(( From 359ab95fea5c0ca02dce8c5de9da9bb63991387c Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Tue, 28 Apr 2026 20:47:14 +0000 Subject: [PATCH 4/5] Use the options_description for each subcommand to more accurately separate options related to a given subcommand. --- .../core/src/clp_s/CommandLineArguments.cpp | 85 ++++++++++++++++--- 1 file changed, 72 insertions(+), 13 deletions(-) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 2fdf2ab08d..fcbc4dbc64 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -30,16 +30,29 @@ constexpr std::string_view cStdoutCacheOutputHandlerName{"stdout"}; /** * Splits unrecognized options into lists of arguments for one or more known subcommands. - * @param subcommand_names + * @param subcommands A map of subcommands characterized by the positional subcommand name and a + * description of the valid arguments for the subcommand. * @param options A vector of arguments that should correspond to a known set of subcommands. * @return A map of subcommand names to a vector of corresponding arguments. * @throws std::invalid_argument if a string of options begins with an unknown subcommand name. */ auto collect_subcommands( - std::set const& subcommand_names, + std::map const& subcommands, std::vector const& options ) -> std::map>; +/** + * Gets the maximum number of tokens for an option. + * @param options_description The options description potentially containing the option. + * @param option + * @return The maximum number of tokens for the option, or an std::nullopt if the maximum number of + * tokens could not be determined. + */ +auto get_max_tokens_for_option( + po::options_description const* options_description, + std::string const& option +) -> std::optional; + /** * Parses options for a subcommand according to an options_description. * @param options_description @@ -146,29 +159,40 @@ void validate_archive_paths( } auto collect_subcommands( - std::set const& subcommand_names, + std::map const& subcommands, std::vector const& options ) -> std::map> { std::optional current_subcommand; std::optional> current_subcommand_arguments; + po::options_description const* current_subcommand_options_description{}; + unsigned remaining_tokens_in_current_argument{}; std::map> collected_subcommands; auto collect_subcommand = [&]() -> void { - if (current_subcommand.has_value() && current_subcommand_arguments.has_value()) { + if (current_subcommand.has_value()) { collected_subcommands.emplace( std::move(current_subcommand.value()), std::move(current_subcommand_arguments.value()) ); current_subcommand.reset(); current_subcommand_arguments.reset(); + current_subcommand_options_description = nullptr; + remaining_tokens_in_current_argument = 0; } }; for (auto const& option : options) { - if (subcommand_names.contains(option)) { + if (remaining_tokens_in_current_argument > 0) { + current_subcommand_arguments.value().emplace_back(option); + --remaining_tokens_in_current_argument; + continue; + } + + if (subcommands.contains(option)) { collect_subcommand(); current_subcommand = option; current_subcommand_arguments.emplace(); + current_subcommand_options_description = subcommands.at(option); continue; } @@ -179,17 +203,51 @@ auto collect_subcommands( } current_subcommand_arguments.value().emplace_back(option); + remaining_tokens_in_current_argument + = get_max_tokens_for_option(current_subcommand_options_description, option) + .value_or(0); } collect_subcommand(); return collected_subcommands; } +auto get_max_tokens_for_option( + po::options_description const* options_description, + std::string const& option +) -> std::optional { + if (nullptr == options_description) { + return std::nullopt; + } + + std::string trimmed_option{ + std::find_if( + option.begin(), + option.end(), + [](unsigned char c) -> bool { return c != '-'; } + ), + option.end() + }; + if (auto const option_description{options_description->find_nothrow(trimmed_option, true)}; + nullptr != option_description) + { + auto const semantic{option_description->semantic()}; + if (nullptr == semantic) { + return 0; + } + return semantic->max_tokens(); + } + return std::nullopt; +} + auto parse_subcommand_options( po::options_description const& options_description, std::vector const& options, po::variables_map& parsed_options ) -> void { - po::store(po::command_line_parser(options).options(options_description).run(), parsed_options); + po::store( + po::command_line_parser(options).options(options_description).positional({}).run(), + parsed_options + ); po::notify(parsed_options); } } // namespace @@ -896,15 +954,16 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { unrecognized_output_options.size() ) ); - std::set const subcommand_names{ - std::string{cFileOutputHandlerName}, - std::string{cNetworkOutputHandlerName}, - std::string{cReducerOutputHandlerName}, - std::string{cResultsCacheOutputHandlerName}, - std::string{cStdoutCacheOutputHandlerName} + std::map const subcommands{ + {std::string{cFileOutputHandlerName}, &file_output_handler_options}, + {std::string{cNetworkOutputHandlerName}, &network_output_handler_options}, + {std::string{cReducerOutputHandlerName}, &reducer_output_handler_options}, + {std::string{cResultsCacheOutputHandlerName}, + &results_cache_output_handler_options}, + {std::string{cStdoutCacheOutputHandlerName}, nullptr} }; auto const output_options_map{ - collect_subcommands(subcommand_names, unrecognized_output_options) + collect_subcommands(subcommands, unrecognized_output_options) }; validate_archive_paths(archive_path, archive_id, m_input_paths); From fa3eb39f653629e53ec6f10f92ce3839eecbc2b8 Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Thu, 21 May 2026 15:54:51 +0000 Subject: [PATCH 5/5] Update search output destination options to use variant with visitor pattern. --- .../core/src/clp_s/CommandLineArguments.cpp | 101 ++++++++------- .../core/src/clp_s/CommandLineArguments.hpp | 68 +++++------ components/core/src/clp_s/clp-s.cpp | 115 ++++++++++-------- components/core/src/clp_s/kv_ir_search.cpp | 28 ++--- 4 files changed, 166 insertions(+), 146 deletions(-) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index fcbc4dbc64..752f12ffa3 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -766,6 +766,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { // clang-format on search_options.add(match_options); + NetworkOutputHandlerOptions network_options{}; po::options_description network_output_handler_options( "Network Output Handler Options" ); @@ -773,15 +774,16 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { network_output_handler_options.add_options()( "host", po::value( - &m_network_output_handler_options.host)->value_name("HOST"), + &network_options.host)->value_name("HOST"), "Network destination host" )( "port", - po::value(&m_network_output_handler_options.port)->value_name("PORT"), + po::value(&network_options.port)->value_name("PORT"), "Network destination port" ); // clang-format on + ReducerOutputHandlerOptions reducer_options{}; po::options_description reducer_output_handler_options( "Reducer Output Handler Options" ); @@ -789,16 +791,16 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { reducer_output_handler_options.add_options()( "host", po::value( - &m_reducer_output_handler_options.host)->value_name("HOST"), + &reducer_options.host)->value_name("HOST"), "Host the reducer is running on" )( "port", - po::value(&m_reducer_output_handler_options.port)->value_name("PORT"), + po::value(&reducer_options.port)->value_name("PORT"), "Port the reducer is listening on" )( "job-id", po::value( - &m_reducer_output_handler_options.job_id)->value_name("ID"), + &reducer_options.job_id)->value_name("ID"), "Job ID for the requested aggregation operation" )( "count", @@ -806,49 +808,50 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { )( "count-by-time", po::value( - &m_reducer_output_handler_options.count_by_time_bucket_size + &reducer_options.count_by_time_bucket_size )->value_name("SIZE"), "Count the number of results in each time span of the given size (ms)" ); // clang-format on + ResultsCacheOutputHandlerOptions results_cache_options{}; po::options_description results_cache_output_handler_options( "Results Cache Output Handler Options" ); results_cache_output_handler_options.add_options()( "uri", po::value( - &m_results_cache_output_handler_options.uri)->value_name("URI"), + &results_cache_options.uri)->value_name("URI"), "MongoDB URI for the results cache" )( "collection", po::value( - &m_results_cache_output_handler_options.collection)->value_name("COLLECTION"), + &results_cache_options.collection)->value_name("COLLECTION"), "MongoDB collection to output to" )( "batch-size", po::value( - &m_results_cache_output_handler_options.batch_size)->value_name("SIZE")-> - default_value(m_results_cache_output_handler_options.batch_size), + &results_cache_options.batch_size)->value_name("SIZE")-> + default_value(results_cache_options.batch_size), "The number of documents to insert into MongoDB per batch" )( "max-num-results", po::value( - &m_results_cache_output_handler_options.max_num_results)->value_name("MAX")-> - default_value(m_results_cache_output_handler_options.max_num_results), + &results_cache_options.max_num_results)->value_name("MAX")-> + default_value(results_cache_options.max_num_results), "The maximum number of results to output" )( "dataset", po::value( - &m_results_cache_output_handler_options.dataset)->value_name("DATASET"), + &results_cache_options.dataset)->value_name("DATASET"), "The dataset name to include in each result document" ); + FileOutputHandlerOptions file_options{}; po::options_description file_output_handler_options("File Output Handler Options"); file_output_handler_options.add_options()( "path", - po::value(&m_file_output_handler_options.output_path) - ->value_name("PATH"), + po::value(&file_options.output_path)->value_name("PATH"), "File output path" ); @@ -996,25 +999,34 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { for (auto const& [output_handler_name, output_handler_options] : output_options_map) { if (cNetworkOutputHandlerName == output_handler_name) { - m_output_handler_type = OutputHandlerType::Network; parse_network_dest_output_handler_options( network_output_handler_options, - output_handler_options + output_handler_options, + network_options + ); + m_output_handler_options.emplace( + std::move(network_options) ); } else if (cReducerOutputHandlerName == output_handler_name) { - m_output_handler_type = OutputHandlerType::Reducer; parse_reducer_output_handler_options( reducer_output_handler_options, - output_handler_options + output_handler_options, + reducer_options + ); + m_output_handler_options.emplace( + std::move(reducer_options) ); } else if (cResultsCacheOutputHandlerName == output_handler_name) { - m_output_handler_type = OutputHandlerType::ResultsCache; parse_results_cache_output_handler_options( results_cache_output_handler_options, - output_handler_options + output_handler_options, + results_cache_options + ); + m_output_handler_options.emplace( + std::move(results_cache_options) ); } else if (cStdoutCacheOutputHandlerName == output_handler_name) { - m_output_handler_type = OutputHandlerType::Stdout; + m_output_handler_options.emplace(); if (false == output_handler_options.empty()) { std::string error_msg{fmt::format( "stdout output handler does not support \"{}\"", @@ -1023,10 +1035,13 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { throw std::invalid_argument(error_msg); } } else if (cFileOutputHandlerName == output_handler_name) { - m_output_handler_type = OutputHandlerType::File; parse_file_output_handler_options( file_output_handler_options, - output_handler_options + output_handler_options, + file_options + ); + m_output_handler_options.emplace( + std::move(file_options) ); } else if (output_handler_name.empty()) { throw std::invalid_argument("OUTPUT_HANDLER cannot be an empty string."); @@ -1048,7 +1063,8 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { void CommandLineArguments::parse_network_dest_output_handler_options( po::options_description const& options_description, - std::vector const& options + std::vector const& options, + NetworkOutputHandlerOptions& network_options ) { po::variables_map parsed_options; parse_subcommand_options(options_description, options, parsed_options); @@ -1056,21 +1072,22 @@ void CommandLineArguments::parse_network_dest_output_handler_options( if (parsed_options.count("host") == 0) { throw std::invalid_argument("host must be specified."); } - if (m_network_output_handler_options.host.empty()) { + if (network_options.host.empty()) { throw std::invalid_argument("host cannot be an empty string."); } if (parsed_options.count("port") == 0) { throw std::invalid_argument("port must be specified."); } - if (m_network_output_handler_options.port <= 0) { + if (network_options.port <= 0) { throw std::invalid_argument("port must be greater than zero."); } } void CommandLineArguments::parse_reducer_output_handler_options( po::options_description const& options_description, - std::vector const& options + std::vector const& options, + ReducerOutputHandlerOptions& reducer_options ) { po::variables_map parsed_options; parse_subcommand_options(options_description, options, parsed_options); @@ -1079,27 +1096,27 @@ void CommandLineArguments::parse_reducer_output_handler_options( throw std::invalid_argument("host must be specified."); } - if (m_reducer_output_handler_options.host.empty()) { + if (reducer_options.host.empty()) { throw std::invalid_argument("host cannot be an empty string."); } if (parsed_options.count("port") == 0) { throw std::invalid_argument("port must be specified."); } - if (m_reducer_output_handler_options.port <= 0) { + if (reducer_options.port <= 0) { throw std::invalid_argument("port must be greater than zero."); } if (parsed_options.count("job-id") == 0) { throw std::invalid_argument("job-id must be specified."); } - if (m_reducer_output_handler_options.job_id < 0) { + if (reducer_options.job_id < 0) { throw std::invalid_argument("job-id cannot be negative."); } bool has_aggregation{}; if (parsed_options.count("count")) { - m_reducer_output_handler_options.aggregation_type = AggregationType::Count; + reducer_options.aggregation_type = AggregationType::Count; has_aggregation = true; } if (parsed_options.count("count-by-time")) { @@ -1109,11 +1126,11 @@ void CommandLineArguments::parse_reducer_output_handler_options( ); } - if (m_reducer_output_handler_options.count_by_time_bucket_size <= 0) { + if (reducer_options.count_by_time_bucket_size <= 0) { throw std::invalid_argument("Value for count-by-time must be greater than zero."); } - m_reducer_output_handler_options.aggregation_type = AggregationType::CountByTime; + reducer_options.aggregation_type = AggregationType::CountByTime; has_aggregation = true; } @@ -1127,7 +1144,8 @@ void CommandLineArguments::parse_reducer_output_handler_options( void CommandLineArguments::parse_results_cache_output_handler_options( po::options_description const& options_description, - std::vector const& options + std::vector const& options, + ResultsCacheOutputHandlerOptions& results_cache_options ) { po::variables_map parsed_options; parse_subcommand_options(options_description, options, parsed_options); @@ -1135,36 +1153,37 @@ void CommandLineArguments::parse_results_cache_output_handler_options( if (parsed_options.count("uri") == 0) { throw std::invalid_argument("uri must be specified."); } - if (m_results_cache_output_handler_options.uri.empty()) { + if (results_cache_options.uri.empty()) { throw std::invalid_argument("uri cannot be an empty string."); } if (parsed_options.count("collection") == 0) { throw std::invalid_argument("collection must be specified."); } - if (m_results_cache_output_handler_options.collection.empty()) { + if (results_cache_options.collection.empty()) { throw std::invalid_argument("collection cannot be an empty string."); } - if (0 == m_results_cache_output_handler_options.batch_size) { + if (0 == results_cache_options.batch_size) { throw std::invalid_argument("batch-size cannot be 0."); } - if (0 == m_results_cache_output_handler_options.max_num_results) { + if (0 == results_cache_options.max_num_results) { throw std::invalid_argument("max-num-results cannot be 0."); } } void CommandLineArguments::parse_file_output_handler_options( po::options_description const& options_description, - std::vector const& options + std::vector const& options, + FileOutputHandlerOptions& file_options ) { po::variables_map parsed_options; parse_subcommand_options(options_description, options, parsed_options); if (parsed_options.count("path") == 0) { throw std::invalid_argument("path must be specified."); } - if (m_file_output_handler_options.output_path.empty()) { + if (file_options.output_path.empty()) { throw std::invalid_argument("path cannot be an empty string."); } } diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 3a9f31448b..ee4f7c6a94 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -30,14 +31,6 @@ class CommandLineArguments { Search = 's' }; - enum class OutputHandlerType : uint8_t { - File = 0, - Network, - Reducer, - ResultsCache, - Stdout, - }; - enum class AggregationType : uint8_t { Count, CountByTime, @@ -68,6 +61,15 @@ class CommandLineArguments { int64_t count_by_time_bucket_size{}; // Milliseconds }; + struct StdoutOutputHandlerOptions {}; + + using OutputHandlerOptionsVariant = std:: + variant; + // Constructors explicit CommandLineArguments(std::string const& program_name) : m_program_name(program_name) {} @@ -117,25 +119,10 @@ class CommandLineArguments { bool get_ignore_case() const { return m_ignore_case; } - auto get_results_cache_output_handler_options() const - -> ResultsCacheOutputHandlerOptions const& { - return m_results_cache_output_handler_options; - } - - auto get_file_output_handler_options() const -> FileOutputHandlerOptions const& { - return m_file_output_handler_options; + auto get_output_handler_options() const -> OutputHandlerOptionsVariant const& { + return m_output_handler_options; } - auto get_network_output_handler_options() const -> NetworkOutputHandlerOptions const& { - return m_network_output_handler_options; - } - - auto get_reducer_output_handler_options() const -> ReducerOutputHandlerOptions const& { - return m_reducer_output_handler_options; - } - - OutputHandlerType get_output_handler_type() const { return m_output_handler_type; } - [[nodiscard]] auto get_retain_float_format() const -> bool { return false == m_no_retain_float_format; } @@ -160,44 +147,53 @@ class CommandLineArguments { * Validates output options related to the Network Destination output handler. * @param options_description * @param options Vector of options previously parsed by boost::program_options and which may - * contain options that have the unrecognized flag set + * contain options that have the unrecognized flag set. + * @param network_options The parsed representation of the network output handler options. */ void parse_network_dest_output_handler_options( boost::program_options::options_description const& options_description, - std::vector const& options + std::vector const& options, + NetworkOutputHandlerOptions& network_options ); /** * Validates output options related to the Reducer output handler. * @param options_description * @param options Vector of options previously parsed by boost::program_options and which may - * contain options that have the unrecognized flag set + * contain options that have the unrecognized flag set. + * @param reducer_options The parsed representation of the reducer output handler options. */ void parse_reducer_output_handler_options( boost::program_options::options_description const& options_description, - std::vector const& options + std::vector const& options, + ReducerOutputHandlerOptions& reducer_options ); /** * Validates output options related to the Results Cache output handler. * @param options_description * @param options Vector of options previously parsed by boost::program_options and which may - * contain options that have the unrecognized flag set + * contain options that have the unrecognized flag set. + * @param results_cache_options The parsed representation of the results cache output handler + * options. */ void parse_results_cache_output_handler_options( boost::program_options::options_description const& options_description, - std::vector const& options + std::vector const& options, + ResultsCacheOutputHandlerOptions& results_cache_options ); /** * Validates output options related to the File output handler. * @param options_description * @param options Vector of options previously parsed by boost::program_options and which may - * contain options that have the unrecognized flag set + * contain options that have the unrecognized flag set. + * @param file_options The parsed representation of the file output handler options. */ void parse_file_output_handler_options( boost::program_options::options_description const& options_description, - std::vector const& options + std::vector const& options, + FileOutputHandlerOptions& file_options ); void print_basic_usage() const; @@ -235,11 +231,7 @@ class CommandLineArguments { std::string m_mongodb_collection; // Search output handler options - OutputHandlerType m_output_handler_type{OutputHandlerType::Stdout}; - ResultsCacheOutputHandlerOptions m_results_cache_output_handler_options{}; - NetworkOutputHandlerOptions m_network_output_handler_options{}; - FileOutputHandlerOptions m_file_output_handler_options{}; - ReducerOutputHandlerOptions m_reducer_output_handler_options{}; + OutputHandlerOptionsVariant m_output_handler_options{StdoutOutputHandlerOptions{}}; // Search variables std::string m_query; diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 0f71d5165a..31308b6ad0 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -16,6 +17,8 @@ #if CLP_BUILD_CLP_S_ENABLE_CURL #include "../clp/CurlGlobalInstance.hpp" #endif +#include + #include "../clp/ir/constants.hpp" #include "../clp/streaming_archive/ArchiveMetadata.hpp" #include "../reducer/network_utils.hpp" @@ -225,55 +228,60 @@ bool search_archive( std::unique_ptr output_handler; try { - switch (command_line_arguments.get_output_handler_type()) { - case CommandLineArguments::OutputHandlerType::File: { - auto const& options{command_line_arguments.get_file_output_handler_options()}; - output_handler - = std::make_unique(options.output_path, true); - break; - } - case CommandLineArguments::OutputHandlerType::Network: { - auto const& options{command_line_arguments.get_network_output_handler_options()}; - output_handler - = std::make_unique(options.host, options.port); - break; - } - case CommandLineArguments::OutputHandlerType::Reducer: { - auto const& options{command_line_arguments.get_reducer_output_handler_options()}; - if (CommandLineArguments::AggregationType::Count == options.aggregation_type) { - output_handler = std::make_unique(reducer_socket_fd); - } else if (CommandLineArguments::AggregationType::CountByTime - == options.aggregation_type) - { - output_handler = std::make_unique( - reducer_socket_fd, - options.count_by_time_bucket_size - ); - } else { - SPDLOG_ERROR("Unhandled aggregation type."); - return false; - } - break; - } - case CommandLineArguments::OutputHandlerType::ResultsCache: { - auto const& options{ - command_line_arguments.get_results_cache_output_handler_options() - }; - output_handler = std::make_unique( - options.uri, - options.collection, - options.batch_size, - options.max_num_results, - options.dataset - ); - break; - } - case CommandLineArguments::OutputHandlerType::Stdout: - output_handler = std::make_unique(); - break; - default: - SPDLOG_ERROR("Unhandled OutputHandlerType."); - return false; + std::visit( + clp::overloaded{ + [&](CommandLineArguments::FileOutputHandlerOptions const& options) -> void { + output_handler = std::make_unique( + options.output_path, + true + ); + }, + [&](CommandLineArguments::NetworkOutputHandlerOptions const& options) + -> void { + output_handler = std::make_unique( + options.host, + options.port + ); + }, + [&](CommandLineArguments::ReducerOutputHandlerOptions const& options) + -> void { + if (CommandLineArguments::AggregationType::Count + == options.aggregation_type) { + output_handler = std::make_unique( + reducer_socket_fd + ); + } else if (CommandLineArguments::AggregationType::CountByTime + == options.aggregation_type) + { + output_handler = std::make_unique( + reducer_socket_fd, + options.count_by_time_bucket_size + ); + } else { + SPDLOG_ERROR("Unhandled aggregation type."); + output_handler = nullptr; + } + }, + [&](CommandLineArguments::ResultsCacheOutputHandlerOptions const& options) + -> void { + output_handler = std::make_unique( + options.uri, + options.collection, + options.batch_size, + options.max_num_results, + options.dataset + ); + }, + [&](CommandLineArguments::StdoutOutputHandlerOptions const& options) + -> void { + output_handler = std::make_unique(); + } + }, + command_line_arguments.get_output_handler_options() + ); + if (nullptr == output_handler) { + SPDLOG_ERROR("Failed to create output handler."); + return false; } } catch (std::exception const& e) { SPDLOG_ERROR("Failed to create output handler - {}", e.what()); @@ -364,10 +372,13 @@ int main(int argc, char const* argv[]) { } int reducer_socket_fd{-1}; - if (command_line_arguments.get_output_handler_type() - == CommandLineArguments::OutputHandlerType::Reducer) + if (std::holds_alternative( + command_line_arguments.get_output_handler_options() + )) { - auto const& options{command_line_arguments.get_reducer_output_handler_options()}; + auto const& options{std::get( + command_line_arguments.get_output_handler_options() + )}; reducer_socket_fd = reducer::connect_to_reducer(options.host, options.port, options.job_id); if (-1 == reducer_socket_fd) { diff --git a/components/core/src/clp_s/kv_ir_search.cpp b/components/core/src/clp_s/kv_ir_search.cpp index da4964b11f..be344120f2 100644 --- a/components/core/src/clp_s/kv_ir_search.cpp +++ b/components/core/src/clp_s/kv_ir_search.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -132,19 +133,15 @@ auto IrUnitHandler::create( CommandLineArguments const& command_line_arguments, [[maybe_unused]] int reducer_socket_fd ) -> ystdlib::error_handling::Result { - switch (command_line_arguments.get_output_handler_type()) { - case CommandLineArguments::OutputHandlerType::Stdout: - break; - case CommandLineArguments::OutputHandlerType::Network: - case CommandLineArguments::OutputHandlerType::Reducer: - case CommandLineArguments::OutputHandlerType::ResultsCache: - SPDLOG_ERROR( - "kv-ir search: Only stdout output is supported in the current implementation." - ); - return KvIrSearchError{KvIrSearchErrorEnum::UnsupportedOutputHandlerType}; - default: - SPDLOG_ERROR("kv-ir search: Unknown output method."); - return KvIrSearchError{KvIrSearchErrorEnum::UnsupportedOutputHandlerType}; + if (false + == std::holds_alternative( + command_line_arguments.get_output_handler_options() + )) + { + SPDLOG_ERROR( + "kv-ir search: Only stdout output is supported in the current implementation." + ); + return KvIrSearchError{KvIrSearchErrorEnum::UnsupportedOutputHandlerType}; } return IrUnitHandler{}; } @@ -249,8 +246,9 @@ auto search_kv_ir_stream( return KvIrSearchError{KvIrSearchErrorEnum::ProjectionSupportNotImplemented}; } - if (CommandLineArguments::OutputHandlerType::Reducer - == command_line_arguments.get_output_handler_type()) + if (std::holds_alternative( + command_line_arguments.get_output_handler_options() + )) { SPDLOG_ERROR("kv-ir search: Count support is not implemented."); return KvIrSearchError{KvIrSearchErrorEnum::CountSupportNotImplemented};