diff --git a/ftdc_exporter/ftdc/stream.go b/ftdc_exporter/ftdc/stream.go index ef6c8d3..5e12593 100644 --- a/ftdc_exporter/ftdc/stream.go +++ b/ftdc_exporter/ftdc/stream.go @@ -9,18 +9,25 @@ import ( ) func streamFTDCMetricsInBatches(ctx context.Context, path string, metricsIncludeFilePath string, batchSize, buffer int) (<-chan StreamBatch, <-chan error) { - metricsIncludeFile, err := os.Open(metricsIncludeFilePath) - if err != nil { - fmt.Errorf("couldn't open BSON file: %v", err) + out := make(chan StreamBatch, buffer) + errc := make(chan error, 1) + sendErr := func(err error) (<-chan StreamBatch, <-chan error) { + errc <- err + close(out) + close(errc) + return out, errc } + metricsIncludeFile, err := os.Open(metricsIncludeFilePath) + if err != nil { + return sendErr(fmt.Errorf("couldn't open metrics include file %q: %w", metricsIncludeFilePath, err)) + } defer metricsIncludeFile.Close() file, err := os.Open(path) if err != nil { - fmt.Errorf("couldn't open BSON file: %v", err) - + return sendErr(fmt.Errorf("couldn't open FTDC file %q: %w", path, err)) } scanner := bufio.NewScanner(metricsIncludeFile) @@ -32,9 +39,6 @@ func streamFTDCMetricsInBatches(ctx context.Context, path string, metricsInclude } } - out := make(chan StreamBatch, buffer) - errc := make(chan error, 1) - iter := readFTDCData(ctx, file) go func() { diff --git a/ftdc_exporter/internal/config/config.go b/ftdc_exporter/internal/config/config.go index 517fc02..cbe5307 100644 --- a/ftdc_exporter/internal/config/config.go +++ b/ftdc_exporter/internal/config/config.go @@ -38,7 +38,7 @@ func ParseFlags() *Config { flag.IntVar(&cfg.Parallel, "parallel", 4, "Number of files to process in parallel") flag.IntVar(&cfg.BatchSize, "batch-size", 1000, "Number of FTDC metrics per batch") flag.IntVar(&cfg.BatchBuffer, "batch-buffer", 1, "Number of batches to queue before blocking") - flag.StringVar(&cfg.MetricsIncludeFile, "metrics-include-file", "", "Number of batches to queue before blocking") + flag.StringVar(&cfg.MetricsIncludeFile, "metrics-include-file", "", "Path to a file listing metric keys to include (one per line); if empty, all metrics are included") flag.BoolVar(&cfg.Debug, "debug", false, "Enable debug logging") flag.BoolVar(&cfg.WaitForever, "wait-forever", true, "Wait indefinitely") diff --git a/ftdc_exporter/main.go b/ftdc_exporter/main.go index 364e37b..1f54630 100644 --- a/ftdc_exporter/main.go +++ b/ftdc_exporter/main.go @@ -21,7 +21,7 @@ import ( "time" ) -func buildGrafanaURL(ctx context.Context, cfg *config.Config) (error, string) { +func buildGrafanaURL(ctx context.Context, cfg *config.Config) (string, error) { client := influx.NewClient(ctx, influx.Config{ Org: cfg.InfluxOrg, Bucket: cfg.InfluxBucket, @@ -33,15 +33,15 @@ func buildGrafanaURL(ctx context.Context, cfg *config.Config) (error, string) { defer client.Close() err, earliest := client.FetchEarliestTimestamp() if err != nil { - return err, "" + return "", err } err, latest := client.FetchLatestTimestamp() if err != nil { - return err, "" + return "", err } baseURL := "http://localhost:3001/d/ddnw277huiv40ae/ftdc-dashboard" - return nil, fmt.Sprintf("%s?from=%s&to=%s&timezone=UTC", baseURL, earliest, latest) + return fmt.Sprintf("%s?from=%s&to=%s&timezone=UTC", baseURL, earliest, latest), nil } func ingestFTDCFromFile(absInputPath string, cfg *config.Config, counter *atomic.Int64) error { @@ -97,6 +97,7 @@ func main() { var processed atomic.Int64 + logging.Info("Waiting for InfluxDB to become ready...") time.Sleep(5 * time.Second) done := make(chan struct{}) @@ -106,9 +107,9 @@ func main() { for { select { case <-ticker.C: - duration := time.Duration(processed.Load()) * time.Second + count := processed.Load() timestamp := time.Now().Format("15:04:05") - fmt.Printf("\r[%s] Ingested %-15s of diagnostics metrics", timestamp, duration) + fmt.Printf("\r[%s] Ingested %-10d data points", timestamp, count) case <-done: return } @@ -170,7 +171,7 @@ func main() { // stop the periodic log updates close(done) - err, grafanaUrl := buildGrafanaURL(ctx, cfg) + grafanaUrl, err := buildGrafanaURL(ctx, cfg) if err != nil { log.Fatal(err) } diff --git a/run.sh b/run.sh index 682d2ea..1f77c6e 100755 --- a/run.sh +++ b/run.sh @@ -10,6 +10,7 @@ export INFLUX_API_TOKEN="$(uuidgen)" export GRAFANA_ADMIN_PASSWORD="$(uuidgen)" export INFLUX_ORG="org" export INFLUX_BUCKET="bucket" +export METRICS_INCLUDE_FILE="$(dirname "$0")/metrics_to_get.txt" # --- Parse CLI args --- while [[ "$#" -gt 0 ]]; do @@ -21,9 +22,20 @@ while [[ "$#" -gt 0 ]]; do --parallel) export PARALLEL="$2"; shift 2 ;; --batch-size) export BATCH_SIZE="$2"; shift 2 ;; --influx-data-directory) export INFLUX_DB_DATA_DIRECTORY="$2"; shift 2 ;; + --metrics-include-file) export METRICS_INCLUDE_FILE="$2"; shift 2 ;; -h|--help) - echo "Usage: $0 --input-dir /path/to/data --influx-url http://influx:8086 \\" - echo " --influx-token mytoken --influx-org my-org --influx-bucket mybucket [--parallel 5 --batch-size 100]" + echo "Usage: $0 --input-dir /path/to/data [options]" + echo "" + echo "Required:" + echo " --input-dir DIR Path to the directory containing FTDC files" + echo "" + echo "Optional:" + echo " --metrics-include-file FILE Path to metrics filter list (default: ./metrics_to_get.txt)" + echo " --influx-org ORG InfluxDB organization (default: org)" + echo " --influx-bucket BUCKET InfluxDB bucket (default: bucket)" + echo " --influx-data-directory DIR InfluxDB data directory (default: /tmp/influxdb_data)" + echo " --parallel N Number of files to process in parallel (default: 10)" + echo " --batch-size N Number of FTDC metrics per batch (default: 200)" exit 0 ;; *) @@ -33,17 +45,19 @@ while [[ "$#" -gt 0 ]]; do esac done - - if [ -z "$INPUT_DIR" ]; then - echo "Error: --input-dir not specified." + echo "Error: --input-dir not specified. Run '$0 --help' for usage." exit 1 fi +if [ ! -d "$INPUT_DIR" ]; then + echo "Error: --input-dir '$INPUT_DIR' does not exist or is not a directory." + exit 1 +fi -if [ -z "$BATCH_SIZE" ]; then - echo "Error: No batch size specified." - echo "Example: --batch-size 100" +if [ ! -f "$METRICS_INCLUDE_FILE" ]; then + echo "Error: metrics include file '$METRICS_INCLUDE_FILE' does not exist." + echo "Use --metrics-include-file to specify a different path." exit 1 fi @@ -99,5 +113,5 @@ echo "Grafana user = admin" echo "Grafana Password = $GRAFANA_ADMIN_PASSWORD" -# Tail the logs of the metrics-processor service -docker attach mongodb_ftdc_viewer-ftdc_exporter-1 +# Stream logs from the ftdc_exporter service until it exits +docker-compose logs -f ftdc_exporter