Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions ftdc_exporter/ftdc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,25 @@ import (
)

func streamFTDCMetricsInBatches(ctx context.Context, path string, metricsIncludeFilePath string, batchSize, buffer int) (<-chan StreamBatch, <-chan error) {
metricsIncludeFile, err := os.Open(metricsIncludeFilePath)
if err != nil {
fmt.Errorf("couldn't open BSON file: %v", err)
out := make(chan StreamBatch, buffer)
errc := make(chan error, 1)

sendErr := func(err error) (<-chan StreamBatch, <-chan error) {
errc <- err
close(out)
close(errc)
return out, errc
}

metricsIncludeFile, err := os.Open(metricsIncludeFilePath)
if err != nil {
return sendErr(fmt.Errorf("couldn't open metrics include file %q: %w", metricsIncludeFilePath, err))
}
defer metricsIncludeFile.Close()

file, err := os.Open(path)
if err != nil {
fmt.Errorf("couldn't open BSON file: %v", err)

return sendErr(fmt.Errorf("couldn't open FTDC file %q: %w", path, err))
}

scanner := bufio.NewScanner(metricsIncludeFile)
Expand All @@ -32,9 +39,6 @@ func streamFTDCMetricsInBatches(ctx context.Context, path string, metricsInclude
}
}

out := make(chan StreamBatch, buffer)
errc := make(chan error, 1)

iter := readFTDCData(ctx, file)

go func() {
Expand Down
2 changes: 1 addition & 1 deletion ftdc_exporter/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func ParseFlags() *Config {
flag.IntVar(&cfg.Parallel, "parallel", 4, "Number of files to process in parallel")
flag.IntVar(&cfg.BatchSize, "batch-size", 1000, "Number of FTDC metrics per batch")
flag.IntVar(&cfg.BatchBuffer, "batch-buffer", 1, "Number of batches to queue before blocking")
flag.StringVar(&cfg.MetricsIncludeFile, "metrics-include-file", "", "Number of batches to queue before blocking")
flag.StringVar(&cfg.MetricsIncludeFile, "metrics-include-file", "", "Path to a file listing metric keys to include (one per line); if empty, all metrics are included")
flag.BoolVar(&cfg.Debug, "debug", false, "Enable debug logging")
flag.BoolVar(&cfg.WaitForever, "wait-forever", true, "Wait indefinitely")

Expand Down
15 changes: 8 additions & 7 deletions ftdc_exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"time"
)

func buildGrafanaURL(ctx context.Context, cfg *config.Config) (error, string) {
func buildGrafanaURL(ctx context.Context, cfg *config.Config) (string, error) {
client := influx.NewClient(ctx, influx.Config{
Org: cfg.InfluxOrg,
Bucket: cfg.InfluxBucket,
Expand All @@ -33,15 +33,15 @@ func buildGrafanaURL(ctx context.Context, cfg *config.Config) (error, string) {
defer client.Close()
err, earliest := client.FetchEarliestTimestamp()
if err != nil {
return err, ""
return "", err
}

err, latest := client.FetchLatestTimestamp()
if err != nil {
return err, ""
return "", err
}
baseURL := "http://localhost:3001/d/ddnw277huiv40ae/ftdc-dashboard"
return nil, fmt.Sprintf("%s?from=%s&to=%s&timezone=UTC", baseURL, earliest, latest)
return fmt.Sprintf("%s?from=%s&to=%s&timezone=UTC", baseURL, earliest, latest), nil
}

func ingestFTDCFromFile(absInputPath string, cfg *config.Config, counter *atomic.Int64) error {
Expand Down Expand Up @@ -97,6 +97,7 @@ func main() {

var processed atomic.Int64

logging.Info("Waiting for InfluxDB to become ready...")
time.Sleep(5 * time.Second)

done := make(chan struct{})
Expand All @@ -106,9 +107,9 @@ func main() {
for {
select {
case <-ticker.C:
duration := time.Duration(processed.Load()) * time.Second
count := processed.Load()
timestamp := time.Now().Format("15:04:05")
fmt.Printf("\r[%s] Ingested %-15s of diagnostics metrics", timestamp, duration)
fmt.Printf("\r[%s] Ingested %-10d data points", timestamp, count)
case <-done:
return
}
Expand Down Expand Up @@ -170,7 +171,7 @@ func main() {
// stop the periodic log updates
close(done)

err, grafanaUrl := buildGrafanaURL(ctx, cfg)
grafanaUrl, err := buildGrafanaURL(ctx, cfg)
if err != nil {
log.Fatal(err)
}
Expand Down
34 changes: 24 additions & 10 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export INFLUX_API_TOKEN="$(uuidgen)"
export GRAFANA_ADMIN_PASSWORD="$(uuidgen)"
export INFLUX_ORG="org"
export INFLUX_BUCKET="bucket"
export METRICS_INCLUDE_FILE="$(dirname "$0")/metrics_to_get.txt"

# --- Parse CLI args ---
while [[ "$#" -gt 0 ]]; do
Expand All @@ -21,9 +22,20 @@ while [[ "$#" -gt 0 ]]; do
--parallel) export PARALLEL="$2"; shift 2 ;;
--batch-size) export BATCH_SIZE="$2"; shift 2 ;;
--influx-data-directory) export INFLUX_DB_DATA_DIRECTORY="$2"; shift 2 ;;
--metrics-include-file) export METRICS_INCLUDE_FILE="$2"; shift 2 ;;
-h|--help)
echo "Usage: $0 --input-dir /path/to/data --influx-url http://influx:8086 \\"
echo " --influx-token mytoken --influx-org my-org --influx-bucket mybucket [--parallel 5 --batch-size 100]"
echo "Usage: $0 --input-dir /path/to/data [options]"
echo ""
echo "Required:"
echo " --input-dir DIR Path to the directory containing FTDC files"
echo ""
echo "Optional:"
echo " --metrics-include-file FILE Path to metrics filter list (default: ./metrics_to_get.txt)"
echo " --influx-org ORG InfluxDB organization (default: org)"
echo " --influx-bucket BUCKET InfluxDB bucket (default: bucket)"
echo " --influx-data-directory DIR InfluxDB data directory (default: /tmp/influxdb_data)"
echo " --parallel N Number of files to process in parallel (default: 10)"
echo " --batch-size N Number of FTDC metrics per batch (default: 200)"
exit 0
;;
*)
Expand All @@ -33,17 +45,19 @@ while [[ "$#" -gt 0 ]]; do
esac
done



if [ -z "$INPUT_DIR" ]; then
echo "Error: --input-dir not specified."
echo "Error: --input-dir not specified. Run '$0 --help' for usage."
exit 1
fi

if [ ! -d "$INPUT_DIR" ]; then
echo "Error: --input-dir '$INPUT_DIR' does not exist or is not a directory."
exit 1
fi

if [ -z "$BATCH_SIZE" ]; then
echo "Error: No batch size specified."
echo "Example: --batch-size 100"
if [ ! -f "$METRICS_INCLUDE_FILE" ]; then
echo "Error: metrics include file '$METRICS_INCLUDE_FILE' does not exist."
echo "Use --metrics-include-file to specify a different path."
exit 1
fi

Expand Down Expand Up @@ -99,5 +113,5 @@ echo "Grafana user = admin"
echo "Grafana Password = $GRAFANA_ADMIN_PASSWORD"


# Tail the logs of the metrics-processor service
docker attach mongodb_ftdc_viewer-ftdc_exporter-1
# Stream logs from the ftdc_exporter service until it exits
docker-compose logs -f ftdc_exporter