Skip to content

Commit b7540e9

Browse files
core: Add nats handler for spaces
1 parent f1d6e3a commit b7540e9

5 files changed

Lines changed: 100 additions & 1 deletion

File tree

integrations/spaces-go-sdk/spaces/spaces.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (s *Service) GetSpace(id string) (*InternalSpace, error) {
2828
return spaceFromCache, nil
2929
}
3030

31-
resp, err := s.broker.Request("fancyspaces.space.get", []byte(id))
31+
resp, err := s.broker.Request("fancyspaces.core.spaces.get", []byte(id))
3232
if err != nil {
3333
return nil, err
3434
}

services/core/cmd/local/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"syscall"
1111
"time"
1212

13+
"github.com/OliverSchlueter/goutils/broker"
1314
"github.com/OliverSchlueter/goutils/containers"
1415
"github.com/OliverSchlueter/goutils/middleware"
1516
"github.com/OliverSchlueter/goutils/sloki"
@@ -21,6 +22,9 @@ import (
2122
)
2223

2324
func main() {
25+
nc := containers.ConnectToNatsE2E()
26+
b := broker.NewNatsBroker(&broker.NatsConfiguration{Nats: nc})
27+
2428
// Feature flags
2529
fflags.DisableIssueSyncer.Enable()
2630

@@ -64,6 +68,7 @@ func main() {
6468
app.Start(app.Configuration{
6569
Mux: mux,
6670
MavenMux: mavenMux,
71+
Broker: b,
6772
Mongo: mc,
6873
ClickHouse: ch,
6974
MinIO: mio,
@@ -101,6 +106,7 @@ func main() {
101106
case os.Interrupt:
102107
slog.Info("Received interrupt signal, shutting down...")
103108

109+
containers.DisconnectNats(nc)
104110
containers.DisconnectMongo(mc)
105111
containers.DisconnectClickhouse(ch)
106112
containers.DisconnectMinIO(mio)

services/core/cmd/prod/main.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strings"
1111
"syscall"
1212

13+
"github.com/OliverSchlueter/goutils/broker"
1314
"github.com/OliverSchlueter/goutils/containers"
1415
"github.com/OliverSchlueter/goutils/env"
1516
"github.com/OliverSchlueter/goutils/middleware"
@@ -21,6 +22,9 @@ import (
2122
)
2223

2324
const (
25+
natsUrlEnv = "NATS_URL"
26+
natsAuthTokenEnv = "NATS_AUTH_TOKEN"
27+
2428
usersPathEnv = "USERS_PATH"
2529

2630
mongodbUrlEnv = "MONGODB_URL"
@@ -34,6 +38,9 @@ const (
3438
)
3539

3640
func main() {
41+
nc := containers.ConnectToNats(env.MustGetStr(natsUrlEnv), env.MustGetStr(natsAuthTokenEnv))
42+
b := broker.NewNatsBroker(&broker.NatsConfiguration{Nats: nc})
43+
3744
// Setup logging
3845
logService := sloki.NewService(sloki.Configuration{
3946
URL: "http://localhost:3100/loki/api/v1/push",
@@ -72,6 +79,7 @@ func main() {
7279
app.Start(app.Configuration{
7380
Mux: mux,
7481
MavenMux: mavenMux,
82+
Broker: b,
7583
Mongo: mc,
7684
ClickHouse: ch,
7785
MinIO: mio,
@@ -116,6 +124,7 @@ func main() {
116124
case os.Interrupt:
117125
slog.Info("Received interrupt signal, shutting down...")
118126

127+
containers.DisconnectNats(nc)
119128
containers.DisconnectMongo(mc)
120129
containers.DisconnectClickhouse(ch)
121130
containers.DisconnectMinIO(mio)

services/core/internal/app/app.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net/http"
77

88
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
9+
"github.com/OliverSchlueter/goutils/broker"
910
"github.com/OliverSchlueter/goutils/ratelimit"
1011
"github.com/OliverSchlueter/goutils/sitemapgen"
1112
"github.com/fancyinnovations/fancyspaces/core/internal/analytics"
@@ -42,6 +43,7 @@ const apiPrefix = "/api/v1"
4243
type Configuration struct {
4344
Mux *http.ServeMux
4445
MavenMux *http.ServeMux
46+
Broker broker.Broker
4547
Mongo *mongo.Database
4648
ClickHouse driver.Conn
4749
MinIO *minio.Client
@@ -72,6 +74,13 @@ func Start(cfg Configuration) {
7274
Analytics: as,
7375
})
7476
sh.Register(apiPrefix, cfg.Mux)
77+
snh := spacesHandler.NewNatsHandler(spacesHandler.NatsConfiguration{
78+
Broker: cfg.Broker,
79+
Store: spacesStore,
80+
})
81+
if err := snh.Register(); err != nil {
82+
panic(fmt.Errorf("could not register spaces nats handler: %w", err))
83+
}
7584

7685
// Versions
7786
versionsDB := mongoVersionsDB.NewDB(&mongoVersionsDB.Configuration{
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package handler
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"fmt"
7+
"log/slog"
8+
9+
"github.com/OliverSchlueter/goutils/broker"
10+
"github.com/OliverSchlueter/goutils/middleware"
11+
"github.com/OliverSchlueter/goutils/problems"
12+
"github.com/OliverSchlueter/goutils/sloki"
13+
"github.com/fancyinnovations/fancyspaces/core/internal/spaces"
14+
spaces2 "github.com/fancyinnovations/fancyspaces/integrations/spaces-go-sdk/spaces"
15+
"github.com/nats-io/nats.go"
16+
)
17+
18+
type NatsHandler struct {
19+
broker broker.Broker
20+
store *spaces.Store
21+
}
22+
23+
type NatsConfiguration struct {
24+
Broker broker.Broker
25+
Store *spaces.Store
26+
}
27+
28+
func NewNatsHandler(cfg NatsConfiguration) *NatsHandler {
29+
return &NatsHandler{
30+
broker: cfg.Broker,
31+
store: cfg.Store,
32+
}
33+
}
34+
35+
func (h *NatsHandler) Register() error {
36+
if err := h.broker.SubscribeQueue("fancyspaces.core.spaces.get", "fancyspaces.core.spaces.get", middleware.NatsLogging(h.handleGet)); err != nil {
37+
return fmt.Errorf("could not subscribe to nats subject: %w", err)
38+
}
39+
40+
return nil
41+
}
42+
43+
func (h *NatsHandler) handleGet(msg *nats.Msg) {
44+
id := string(msg.Data)
45+
46+
if id == "" {
47+
problems.ValidationError("space_id", "Space ID is required").WriteToBroker(h.broker, msg.Reply)
48+
return
49+
}
50+
51+
space, err := h.store.GetByID(id)
52+
if err != nil {
53+
if errors.Is(err, spaces.ErrSpaceNotFound) {
54+
problems.NotFound("Space", id).WriteToBroker(h.broker, msg.Reply)
55+
return
56+
}
57+
slog.Error("Could not get space", sloki.WrapError(err))
58+
problems.InternalServerError("").WriteToBroker(h.broker, msg.Reply)
59+
return
60+
}
61+
62+
response, err := json.Marshal(spaces2.InternalSpace{
63+
Space: *space,
64+
AnalyticsWriteKey: space.AnalyticsSettings.WriteKey,
65+
})
66+
if err != nil {
67+
slog.Error("failed to marshal space", sloki.WrapError(err))
68+
problems.InternalServerError("Could not marshal space").WriteToBroker(h.broker, msg.Reply)
69+
return
70+
}
71+
72+
if err := h.broker.Publish(msg.Reply, response); err != nil {
73+
slog.Error("failed to publish space response", sloki.WrapError(err))
74+
}
75+
}

0 commit comments

Comments
 (0)