Skip to content

Commit fe8432e

Browse files
committed
refactor: reorganize agent, generator, sqlutil, and error_status packages
Replace agent/ with runner/, move generator/ to recipe/scaffold, relocate plugins/sqlutil to plugins/internal/sqlutil, and move utils/error_status to metrics/otelgrpc/status. Update all imports across extractors, cmd, metrics, and recipe packages.
1 parent 227c0d4 commit fe8432e

File tree

36 files changed

+156
-165
lines changed

36 files changed

+156
-165
lines changed

cmd/lint.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"strings"
88

99
"github.com/MakeNowJust/heredoc"
10-
"github.com/raystack/meteor/agent"
10+
"github.com/raystack/meteor/runner"
1111
"github.com/raystack/meteor/config"
1212
"github.com/raystack/meteor/plugins"
1313
"github.com/raystack/meteor/recipe"
@@ -61,7 +61,7 @@ func LintCmd() *cobra.Command {
6161
lg := log.NewLogrus(log.LogrusWithLevel(cfg.LogLevel))
6262
plugins.SetLog(lg)
6363

64-
runner := agent.NewAgent(agent.Config{
64+
rnr := runner.NewRunner(runner.Config{
6565
ExtractorFactory: registry.Extractors,
6666
ProcessorFactory: registry.Processors,
6767
SinkFactory: registry.Sinks,
@@ -81,7 +81,7 @@ func LintCmd() *cobra.Command {
8181

8282
// Run linters and generate report
8383
for _, recipe := range recipes {
84-
errs := runner.Validate(recipe)
84+
errs := rnr.Validate(recipe)
8585
var row []string
8686
var icon string
8787

cmd/recipe.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/AlecAivazis/survey/v2"
1010
"github.com/MakeNowJust/heredoc"
11-
"github.com/raystack/meteor/generator"
1211
"github.com/raystack/meteor/recipe"
1312
"github.com/raystack/meteor/registry"
1413
"github.com/spf13/cobra"
@@ -90,7 +89,7 @@ func recipeInitCmd() *cobra.Command {
9089
}
9190
}
9291

93-
return generator.RecipeWriteTo(generator.RecipeParams{
92+
return recipe.ScaffoldWriteTo(recipe.ScaffoldParams{
9493
Name: args[0],
9594
Source: extractor,
9695
Scope: scope,

cmd/run.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/MakeNowJust/heredoc"
15-
"github.com/raystack/meteor/agent"
15+
"github.com/raystack/meteor/runner"
1616
"github.com/raystack/meteor/config"
1717
"github.com/raystack/meteor/metrics"
1818
"github.com/raystack/meteor/plugins"
@@ -84,7 +84,7 @@ func RunCmd() *cobra.Command {
8484
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
8585
defer stop()
8686

87-
var mts agent.Monitor
87+
var mts runner.Monitor
8888

8989
if cfg.OtelEnabled {
9090
doneOtlp, err := metrics.InitOtel(ctx, cfg, lg, Version)
@@ -96,7 +96,7 @@ func RunCmd() *cobra.Command {
9696
mts = metrics.NewOtelMonitor()
9797
}
9898

99-
runner := agent.NewAgent(agent.Config{
99+
rnr := runner.NewRunner(runner.Config{
100100
ExtractorFactory: registry.Extractors,
101101
ProcessorFactory: registry.Processors,
102102
SinkFactory: registry.Sinks,
@@ -137,7 +137,7 @@ func RunCmd() *cobra.Command {
137137
)
138138

139139
// Run recipes and collect results
140-
runs := runner.RunMultiple(ctx, recipes)
140+
runs := rnr.RunMultiple(ctx, recipes)
141141
for _, run := range runs {
142142
lg.Debug("recipe details", "recipe", run.Recipe)
143143
var row []string
@@ -201,7 +201,7 @@ func formatEntityTypes(types map[string]int) string {
201201
}
202202

203203
// printRunErrors prints error details for failed runs.
204-
func printRunErrors(runs []agent.Run) {
204+
func printRunErrors(runs []runner.Run) {
205205
for _, run := range runs {
206206
if run.Error != nil {
207207
fmt.Printf(" %s %s: %s\n", printer.Icon("failure"), run.Recipe.Name, run.Error)

metrics/opentelemetry_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"testing"
66

7-
"github.com/raystack/meteor/agent"
7+
"github.com/raystack/meteor/runner"
88
"github.com/raystack/meteor/config"
99
"github.com/raystack/meteor/metrics"
1010
"github.com/raystack/meteor/recipe"
@@ -34,7 +34,7 @@ func TestOtelMonitor_RecordRun(t *testing.T) {
3434

3535
monitor := metrics.NewOtelMonitor()
3636

37-
monitor.RecordRun(ctx, agent.Run{Recipe: recipe, DurationInMs: duration, RecordCount: recordCount, Success: false})
37+
monitor.RecordRun(ctx, runner.Run{Recipe: recipe, DurationInMs: duration, RecordCount: recordCount, Success: false})
3838

3939
assert.NotNil(t, monitor)
4040
assert.NotNil(t, done)
@@ -64,7 +64,7 @@ func TestOtelMonitor_RecordPlugin(t *testing.T) {
6464
monitor := metrics.NewOtelMonitor()
6565

6666
monitor.RecordPlugin(context.Background(),
67-
agent.PluginInfo{
67+
runner.PluginInfo{
6868
RecipeName: recipe.Name,
6969
PluginName: recipe.Sinks[0].Name,
7070
PluginType: "sink",

metrics/otel_monitor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package metrics
33
import (
44
"context"
55

6-
"github.com/raystack/meteor/agent"
6+
"github.com/raystack/meteor/runner"
77
"go.opentelemetry.io/otel"
88
"go.opentelemetry.io/otel/attribute"
99
"go.opentelemetry.io/otel/metric"
@@ -41,7 +41,7 @@ func NewOtelMonitor() *OtelMonitor {
4141
}
4242

4343
// RecordRun records a run behavior
44-
func (m *OtelMonitor) RecordRun(ctx context.Context, run agent.Run) {
44+
func (m *OtelMonitor) RecordRun(ctx context.Context, run runner.Run) {
4545
m.recipeDuration.Record(ctx,
4646
float64(run.DurationInMs)/1000.0,
4747
metric.WithAttributes(
@@ -70,9 +70,9 @@ func (m *OtelMonitor) RecordRun(ctx context.Context, run agent.Run) {
7070
}
7171

7272
// RecordPlugin records a individual plugin behavior in a run, this is being handled in otelmw
73-
func (*OtelMonitor) RecordPlugin(context.Context, agent.PluginInfo) {}
73+
func (*OtelMonitor) RecordPlugin(context.Context, runner.PluginInfo) {}
7474

75-
func (m *OtelMonitor) RecordSinkRetryCount(ctx context.Context, pluginInfo agent.PluginInfo) {
75+
func (m *OtelMonitor) RecordSinkRetryCount(ctx context.Context, pluginInfo runner.PluginInfo) {
7676
m.sinkRetries.Add(ctx,
7777
1,
7878
metric.WithAttributes(

metrics/otelgrpc/otelgrpc.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"strings"
77
"time"
88

9-
"github.com/raystack/meteor/utils"
109
"go.opentelemetry.io/otel"
1110
"go.opentelemetry.io/otel/attribute"
1211
"go.opentelemetry.io/otel/metric"
@@ -73,7 +72,7 @@ func (m *Monitor) RecordUnary(ctx context.Context, p UnaryParams) {
7372

7473
attrs := make([]attribute.KeyValue, len(m.attributes))
7574
copy(attrs, m.attributes)
76-
attrs = append(attrs, attribute.String("rpc.grpc.status_text", utils.StatusText(p.Err)))
75+
attrs = append(attrs, attribute.String("rpc.grpc.status_text", StatusText(p.Err)))
7776
attrs = append(attrs, attribute.String("network.type", netTypeFromCtx(ctx)))
7877
attrs = append(attrs, ParseFullMethod(p.Method)...)
7978

@@ -93,7 +92,7 @@ func (m *Monitor) RecordUnary(ctx context.Context, p UnaryParams) {
9392
func (m *Monitor) RecordStream(ctx context.Context, start time.Time, method string, err error) {
9493
attrs := make([]attribute.KeyValue, len(m.attributes))
9594
copy(attrs, m.attributes)
96-
attrs = append(attrs, attribute.String("rpc.grpc.status_text", utils.StatusText(err)))
95+
attrs = append(attrs, attribute.String("rpc.grpc.status_text", StatusText(err)))
9796
attrs = append(attrs, attribute.String("network.type", netTypeFromCtx(ctx)))
9897
attrs = append(attrs, ParseFullMethod(method)...)
9998

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package utils
1+
package otelgrpc
22

33
import (
44
"errors"
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
package utils
1+
package otelgrpc_test
22

33
import (
4+
"errors"
45
"fmt"
56
"testing"
67

7-
"errors"
8-
8+
"github.com/raystack/meteor/metrics/otelgrpc"
99
"github.com/stretchr/testify/assert"
1010
"google.golang.org/grpc/codes"
1111
"google.golang.org/grpc/status"
@@ -40,7 +40,7 @@ func TestStatusCode(t *testing.T) {
4040
}
4141
for _, tc := range cases {
4242
t.Run(tc.name, func(t *testing.T) {
43-
assert.Equal(t, tc.expected, StatusCode(tc.err))
43+
assert.Equal(t, tc.expected, otelgrpc.StatusCode(tc.err))
4444
})
4545
}
4646
}

plugins/extractors/bigtable/middleware.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
"cloud.google.com/go/bigtable"
99
"github.com/googleapis/gax-go/v2/apierror"
10-
"github.com/raystack/meteor/utils"
10+
"github.com/raystack/meteor/metrics/otelgrpc"
1111
"go.opentelemetry.io/otel"
1212
"go.opentelemetry.io/otel/attribute"
1313
"go.opentelemetry.io/otel/metric"
@@ -107,7 +107,7 @@ func (o *InstanceAdminClientMW) Instances(ctx context.Context) (res []*bigtable.
107107
}
108108

109109
func apiErrReason(err error) string {
110-
reason := utils.StatusCode(err).String()
110+
reason := otelgrpc.StatusCode(err).String()
111111
var apiErr *apierror.APIError
112112
if errors.As(err, &apiErr) {
113113
reason = apiErr.Reason()

plugins/extractors/cassandra/cassandra.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
"github.com/gocql/gocql"
99
"github.com/raystack/meteor/models"
10-
"github.com/raystack/meteor/plugins/sqlutil"
10+
"github.com/raystack/meteor/plugins/internal/sqlutil"
1111

1212
"github.com/raystack/meteor/plugins"
1313
"github.com/raystack/meteor/registry"

0 commit comments

Comments
 (0)