From 50e2110f342fa740b64a2c38c12838154033abb9 Mon Sep 17 00:00:00 2001 From: VishalDalwadi Date: Fri, 22 May 2026 00:04:55 +0530 Subject: [PATCH] feat(go): move flow client to netmaker; --- flow/exporter/grpc.go | 243 ------------------------------------------ flow/manager_linux.go | 8 +- go.mod | 4 +- go.sum | 4 +- 4 files changed, 8 insertions(+), 251 deletions(-) delete mode 100644 flow/exporter/grpc.go diff --git a/flow/exporter/grpc.go b/flow/exporter/grpc.go deleted file mode 100644 index 4fccaeb9..00000000 --- a/flow/exporter/grpc.go +++ /dev/null @@ -1,243 +0,0 @@ -package exporter - -import ( - "context" - "crypto/tls" - "fmt" - "io" - "sync" - "time" - - pbflow "github.com/gravitl/netmaker/grpc/flow" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" -) - -const ( - DefaultBatchSize = 200 - DefaultBatchTime = 30 * time.Second - DefaultRetryCount = 3 - DefaultRetryBackoff = 300 * time.Millisecond -) - -type Options struct { - tlsCreds credentials.TransportCredentials - batchSize int - batchTime time.Duration - retryCount int - retryBackoff time.Duration -} - -func WithTLS(cfg *tls.Config) func(*Options) { - return func(o *Options) { o.tlsCreds = credentials.NewTLS(cfg) } -} -func WithBatchSize(n int) func(*Options) { - return func(o *Options) { o.batchSize = n } -} -func WithBatchTime(t time.Duration) func(*Options) { - return func(o *Options) { o.batchTime = t } -} -func WithRetryPolicy(count int, backoff time.Duration) func(*Options) { - return func(o *Options) { - o.retryCount = count - o.retryBackoff = backoff - } -} - -type FlowGrpcClient struct { - serverAddr string - opts Options - - conn *grpc.ClientConn - stream pbflow.FlowService_StreamFlowsClient - - mu sync.Mutex - events []*pbflow.FlowEvent - - stopCh chan struct{} - wg sync.WaitGroup -} - -func NewFlowGrpcClient(serverURL string, optFns ...func(*Options)) *FlowGrpcClient { - opts := Options{ - tlsCreds: nil, - batchSize: DefaultBatchSize, - batchTime: DefaultBatchTime, - retryCount: DefaultRetryCount, - retryBackoff: DefaultRetryBackoff, - } - for _, fn := range optFns { - fn(&opts) - } - - return &FlowGrpcClient{ - serverAddr: serverURL, - opts: opts, - stopCh: make(chan struct{}), - } -} - -func (c *FlowGrpcClient) Start() error { - if err := c.connect(); err != nil { - return err - } - - c.wg.Add(1) - go c.batchLoop() - - return nil -} - -func (c *FlowGrpcClient) Stop() error { - close(c.stopCh) - c.wg.Wait() - - if c.conn != nil { - return c.conn.Close() - } - return nil -} - -func (c *FlowGrpcClient) Export(event *pbflow.FlowEvent) error { - c.mu.Lock() - defer c.mu.Unlock() - - c.events = append(c.events, event) - if len(c.events) >= c.opts.batchSize { - go c.flush() - } - return nil -} - -func (c *FlowGrpcClient) batchLoop() { - defer c.wg.Done() - - ticker := time.NewTicker(c.opts.batchTime) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - c.flush() - case <-c.stopCh: - c.flush() - return - } - } -} - -func (c *FlowGrpcClient) flush() { - c.mu.Lock() - if len(c.events) == 0 { - c.mu.Unlock() - return - } - evs := c.events - c.events = nil - c.mu.Unlock() - - env := &pbflow.FlowEnvelope{ - Events: evs, - } - - err := c.sendWithRetries(env) - if err != nil { - fmt.Println("[flow] permanently dropped batch:", err) - } -} - -func (c *FlowGrpcClient) sendWithRetries(env *pbflow.FlowEnvelope) error { - var err error - - for attempt := 1; attempt <= c.opts.retryCount; attempt++ { - err = c.sendOnce(env) - if err == nil { - return nil - } - - fmt.Printf("[flow] send attempt %d failed: %v\n", attempt, err) - time.Sleep(c.opts.retryBackoff) - } - - return fmt.Errorf("retry limit exceeded: %w", err) -} - -func (c *FlowGrpcClient) sendOnce(env *pbflow.FlowEnvelope) error { - if c.stream == nil { - err := c.reconnect() - if err != nil { - return err - } - } - - err := c.stream.Send(env) - if err != nil { - return c.handleStreamError(err) - } - - resp, err := c.stream.Recv() - if err != nil { - return c.handleStreamError(err) - } - - if !resp.Success { - return fmt.Errorf("server rejected: %s", resp.Error) - } - - return nil -} - -func (c *FlowGrpcClient) connect() error { - var dialOpts []grpc.DialOption - - if c.opts.tlsCreds != nil { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.opts.tlsCreds)) - } else { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) - } - - conn, err := grpc.NewClient(c.serverAddr, dialOpts...) - if err != nil { - return fmt.Errorf("dial: %w", err) - } - - c.conn = conn - - client := pbflow.NewFlowServiceClient(conn) - - // The stream should live beyond dial timeout → use background context - stream, err := client.StreamFlows(context.Background()) - if err != nil { - return fmt.Errorf("open stream: %w", err) - } - - c.stream = stream - return nil -} - -func (c *FlowGrpcClient) reconnect() error { - if c.conn != nil { - _ = c.conn.Close() - } - c.stream = nil - time.Sleep(300 * time.Millisecond) - return c.connect() -} - -func (c *FlowGrpcClient) handleStreamError(err error) error { - if err == io.EOF { - recErr := c.reconnect() - if recErr != nil { - return recErr - } - - return fmt.Errorf("stream closed: %w", err) - } - st, ok := status.FromError(err) - if ok { - return fmt.Errorf("grpc status: %s", st.Message()) - } - return err -} diff --git a/flow/manager_linux.go b/flow/manager_linux.go index ec82aab4..0c6fceb6 100644 --- a/flow/manager_linux.go +++ b/flow/manager_linux.go @@ -12,9 +12,9 @@ import ( "github.com/gravitl/netclient/config" "github.com/gravitl/netclient/dns/querycache" - "github.com/gravitl/netclient/flow/exporter" "github.com/gravitl/netclient/flow/tracker" pbflow "github.com/gravitl/netmaker/grpc/flow" + grpcoptions "github.com/gravitl/netmaker/grpc/options" "github.com/gravitl/netmaker/models" ct "github.com/ti-mo/conntrack" ) @@ -23,7 +23,7 @@ const RefreshDuration = 10 * time.Minute type Manager struct { participantIdentifiers map[string]models.PeerIdentity - flowClient *exporter.FlowGrpcClient + flowClient *pbflow.GrpcClient flowTracker *tracker.FlowTracker startOnce sync.Once mu sync.RWMutex @@ -50,9 +50,9 @@ func (m *Manager) Start(participantIdentifiers map[string]models.PeerIdentity) e querycache.GetManager().Enable() - flowClient := exporter.NewFlowGrpcClient( + flowClient := pbflow.NewGrpcClient( config.GetServer(config.CurrServer).GRPC, - exporter.WithTLS(&tls.Config{}), + grpcoptions.WithTLS(&tls.Config{}), ) err = flowClient.Start() diff --git a/go.mod b/go.mod index f1f45f68..2676a84f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/google/nftables v0.3.0 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 - github.com/gravitl/netmaker v1.5.2-0.20260515175015-282d7e8072f0 + github.com/gravitl/netmaker v1.5.2-0.20260521182620-bc618a9533e0 github.com/gravitl/tcping v0.1.2-0.20230801110928-546055ebde06 github.com/hashicorp/go-retryablehttp v0.7.8 github.com/hashicorp/go-version v1.9.0 @@ -35,7 +35,6 @@ require ( golang.zx2c4.com/wireguard v0.0.0-20220920152132-bb719d3a6e2c golang.zx2c4.com/wireguard/wgctrl v0.0.0-20221104135756-97bc4ad4a1cb golang.zx2c4.com/wireguard/windows v1.0.1 - google.golang.org/grpc v1.81.1 gopkg.in/yaml.v3 v3.0.1 gortc.io/stun v1.23.0 ) @@ -94,6 +93,7 @@ require ( golang.org/x/tools v0.44.0 // indirect golang.zx2c4.com/wintun v0.0.0-20211104114900-415007cec224 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260311181403-84a4fc48630c // indirect + google.golang.org/grpc v1.81.1 // indirect google.golang.org/protobuf v1.36.11 // indirect gorm.io/datatypes v1.2.7 // indirect gorm.io/driver/mysql v1.5.6 // indirect diff --git a/go.sum b/go.sum index 42e0a7ad..442116d0 100644 --- a/go.sum +++ b/go.sum @@ -75,8 +75,8 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/gravitl/netmaker v1.5.2-0.20260515175015-282d7e8072f0 h1:IpEh9pDQDtqVExz2Qo4Wsl7iEkhF43CJ7B0Sw82Ac8M= -github.com/gravitl/netmaker v1.5.2-0.20260515175015-282d7e8072f0/go.mod h1:9J5VlW1h9wnAWJT5xQZCcP5bKEBq76wkRRPtg0dSnKk= +github.com/gravitl/netmaker v1.5.2-0.20260521182620-bc618a9533e0 h1:zmIcisohgH/6hyRAc29O8thjK+amzfHciUyq0B1isMw= +github.com/gravitl/netmaker v1.5.2-0.20260521182620-bc618a9533e0/go.mod h1:hXnsTXqY0X/bZmClQ4+gtaybiJZYdAyRe0MMz/78sxg= github.com/gravitl/tcping v0.1.2-0.20230801110928-546055ebde06 h1:g2fBXRNT9eiQohyHcoME3SVmeG7OKoJPWrs7A+009kU= github.com/gravitl/tcping v0.1.2-0.20230801110928-546055ebde06/go.mod h1:12iViYKWAzRPj5/oEGAaD7Wje+Nuz8M9eDJbV7qhKAA= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=