Skip to content

Commit 2e051a3

Browse files
pulltheflowerDev Agent
andauthored
Add temporal GetSystemInfo timeout config (#838)
Co-authored-by: Dev Agent <dev-agent@example.com>
1 parent 7948fd6 commit 2e051a3

4 files changed

Lines changed: 69 additions & 17 deletions

File tree

cmd/csghub-server/cmd/moderation/launch.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
11
package moderation
22

33
import (
4+
"context"
45
"fmt"
56
"log/slog"
7+
"time"
8+
9+
"go.temporal.io/sdk/client"
10+
"go.temporal.io/sdk/log"
11+
"opencsg.com/csghub-server/builder/instrumentation"
12+
"opencsg.com/csghub-server/builder/temporal"
613

714
"github.com/spf13/cobra"
815
"opencsg.com/csghub-server/api/httpbase"
916
"opencsg.com/csghub-server/builder/store/database"
1017
"opencsg.com/csghub-server/common/config"
1118
"opencsg.com/csghub-server/moderation/checker"
1219
"opencsg.com/csghub-server/moderation/router"
13-
"opencsg.com/csghub-server/moderation/workflow"
1420
)
1521

1622
var cmdLaunch = &cobra.Command{
@@ -23,6 +29,10 @@ var cmdLaunch = &cobra.Command{
2329
return err
2430
}
2531
slog.Debug("config", slog.Any("data", cfg))
32+
stopOtel, err := instrumentation.SetupOTelSDK(context.Background(), cfg, instrumentation.Moderation)
33+
if err != nil {
34+
panic(err)
35+
}
2636
// Check APIToken length
2737
if len(cfg.APIToken) < 128 {
2838
return fmt.Errorf("API token length is less than 128, please check")
@@ -36,11 +46,16 @@ var cmdLaunch = &cobra.Command{
3646
return fmt.Errorf("database initialization failed: %w", err)
3747
}
3848
checker.Init(cfg)
39-
40-
//init async moderation process
41-
err = workflow.StartWorker(cfg)
49+
slog.Info("starting temporal client")
50+
temporalClient, err := temporal.NewClient(client.Options{
51+
HostPort: cfg.WorkFLow.Endpoint,
52+
Logger: log.NewStructuredLogger(slog.Default()),
53+
ConnectionOptions: client.ConnectionOptions{
54+
GetSystemInfoTimeout: time.Duration(cfg.Temporal.GetSystemInfoTimeout) * time.Second,
55+
},
56+
}, instrumentation.Moderation)
4257
if err != nil {
43-
return fmt.Errorf("failed to start workflow worker,%w", err)
58+
return fmt.Errorf("unable to create temporal client, error: %w", err)
4459
}
4560

4661
r, err := router.NewRouter(cfg)
@@ -56,8 +71,8 @@ var cmdLaunch = &cobra.Command{
5671
)
5772
server.Run()
5873

59-
workflow.StopWorker()
60-
74+
_ = stopOtel(context.Background())
75+
temporalClient.Close()
6176
return nil
6277
},
6378
}

cmd/csghub-server/cmd/notification/launch.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
package notification
22

33
import (
4+
"context"
45
"fmt"
56
"log/slog"
7+
"time"
8+
9+
"go.temporal.io/sdk/client"
10+
"go.temporal.io/sdk/log"
11+
"opencsg.com/csghub-server/builder/instrumentation"
12+
"opencsg.com/csghub-server/builder/temporal"
613

714
"github.com/spf13/cobra"
815
"opencsg.com/csghub-server/api/httpbase"
916
"opencsg.com/csghub-server/builder/store/database"
1017
"opencsg.com/csghub-server/common/config"
1118
"opencsg.com/csghub-server/notification/router"
12-
"opencsg.com/csghub-server/notification/workflow"
1319
)
1420

1521
var launchCmd = &cobra.Command{
@@ -21,7 +27,10 @@ var launchCmd = &cobra.Command{
2127
if err != nil {
2228
return err
2329
}
24-
30+
stopOtel, err := instrumentation.SetupOTelSDK(context.Background(), cfg, instrumentation.Notification)
31+
if err != nil {
32+
panic(err)
33+
}
2534
// Check APIToken length
2635
if len(cfg.APIToken) < 128 {
2736
return fmt.Errorf("API token length is less than 128, please check")
@@ -34,10 +43,15 @@ var launchCmd = &cobra.Command{
3443
slog.Error("failed to initialize database", slog.Any("error", err))
3544
return fmt.Errorf("database initialization failed: %w", err)
3645
}
37-
38-
err = workflow.StartWorkflow(cfg)
46+
workflowClient, err := temporal.NewClient(client.Options{
47+
HostPort: cfg.WorkFLow.Endpoint,
48+
Logger: log.NewStructuredLogger(slog.Default()),
49+
ConnectionOptions: client.ConnectionOptions{
50+
GetSystemInfoTimeout: time.Duration(cfg.Temporal.GetSystemInfoTimeout) * time.Second,
51+
},
52+
}, "csghub-notification")
3953
if err != nil {
40-
slog.Warn("failed to start workflow", slog.Any("error", err))
54+
return fmt.Errorf("unable to create workflow client, error: %w", err)
4155
}
4256

4357
r, err := router.NewNotifierRouter(cfg)
@@ -53,6 +67,9 @@ var launchCmd = &cobra.Command{
5367
)
5468
server.Run()
5569

70+
_ = stopOtel(context.Background())
71+
workflowClient.Close()
72+
5673
return nil
5774
},
5875
}

cmd/csghub-server/cmd/temporal-worker/launch.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"log/slog"
7+
"time"
78

89
"opencsg.com/csghub-server/moderation/checker"
910

@@ -81,6 +82,9 @@ var cmdLaunch = &cobra.Command{
8182
temporalClient, err := temporal.NewClient(client.Options{
8283
HostPort: cfg.WorkFLow.Endpoint,
8384
Logger: log.NewStructuredLogger(slog.Default()),
85+
ConnectionOptions: client.ConnectionOptions{
86+
GetSystemInfoTimeout: time.Duration(cfg.Temporal.GetSystemInfoTimeout) * time.Second,
87+
},
8488
}, instrumentation.TemporalWorker)
8589
if err != nil {
8690
return fmt.Errorf("unable to create temporal client, error: %w", err)

cmd/csghub-server/cmd/user/launch.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
package user
22

33
import (
4+
"context"
45
"fmt"
56
"log/slog"
7+
"time"
8+
9+
"go.temporal.io/sdk/client"
10+
"go.temporal.io/sdk/log"
11+
"opencsg.com/csghub-server/builder/instrumentation"
12+
"opencsg.com/csghub-server/builder/temporal"
613

714
"github.com/spf13/cobra"
815
"opencsg.com/csghub-server/api/httpbase"
916
"opencsg.com/csghub-server/builder/store/database"
1017
"opencsg.com/csghub-server/common/config"
1118
"opencsg.com/csghub-server/user/router"
12-
"opencsg.com/csghub-server/user/workflow"
1319
)
1420

1521
var cmdLaunch = &cobra.Command{
@@ -22,6 +28,10 @@ var cmdLaunch = &cobra.Command{
2228
return err
2329
}
2430
slog.Debug("config", slog.Any("data", cfg))
31+
stopOtel, err := instrumentation.SetupOTelSDK(context.Background(), cfg, instrumentation.User)
32+
if err != nil {
33+
panic(err)
34+
}
2535
// Check APIToken length
2636
if len(cfg.APIToken) < 128 {
2737
return fmt.Errorf("API token length is less than 128, please check")
@@ -35,9 +45,15 @@ var cmdLaunch = &cobra.Command{
3545
return fmt.Errorf("database initialization failed: %w", err)
3646
}
3747

38-
err = workflow.StartWorker(cfg)
48+
wfClient, err := temporal.NewClient(client.Options{
49+
HostPort: cfg.WorkFLow.Endpoint,
50+
Logger: log.NewStructuredLogger(slog.Default()),
51+
ConnectionOptions: client.ConnectionOptions{
52+
GetSystemInfoTimeout: time.Duration(cfg.Temporal.GetSystemInfoTimeout) * time.Second,
53+
},
54+
}, "csghub-user")
3955
if err != nil {
40-
return fmt.Errorf("failed to start user workflow worker: %w", err)
56+
return fmt.Errorf("unable to create workflow client, error:%w", err)
4157
}
4258

4359
r, err := router.NewRouter(cfg)
@@ -53,8 +69,8 @@ var cmdLaunch = &cobra.Command{
5369
)
5470
server.Run()
5571

56-
workflow.StopWorker()
57-
72+
_ = stopOtel(context.Background())
73+
wfClient.Close()
5874
return nil
5975
},
6076
}

0 commit comments

Comments
 (0)