Skip to content

Commit 405f490

Browse files
committed
fix(agent,installer): hybrid NetFlow decoder and postgres email via docker exec
1 parent 7f1aaa4 commit 405f490

File tree

11 files changed

+694
-166
lines changed

11 files changed

+694
-166
lines changed

agent/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/glebarez/sqlite v1.11.0
99
github.com/google/uuid v1.6.0
1010
github.com/kardianos/service v1.2.4
11+
github.com/netsampler/goflow2 v1.3.7
1112
github.com/tehmaze/netflow v0.0.0-20240303214733-8c13bb004068
1213
github.com/threatwinds/go-sdk v1.1.7
1314
github.com/threatwinds/logger v1.2.3

agent/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
8787
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
8888
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
8989
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
90+
github.com/netsampler/goflow2 v1.3.7 h1:XZaTy8kkMnGXpJ9hS3KbO1McyrFTpVNhVFEx9rNhMmc=
91+
github.com/netsampler/goflow2 v1.3.7/go.mod h1:4UZsVGVAs//iMCptUHn3WNScztJeUhZH7kDW2+/vDdQ=
9092
github.com/opensearch-project/opensearch-go/v4 v4.6.0 h1:Ac8aLtDSmLEyOmv0r1qhQLw3b4vcUhE42NE9k+Z4cRc=
9193
github.com/opensearch-project/opensearch-go/v4 v4.6.0/go.mod h1:3iZtb4SNt3IzaxavKq0dURh1AmtVgYW71E4XqmYnIiQ=
9294
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=

agent/main.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"fmt"
55
"os"
6+
"path/filepath"
67
"time"
78

89
pb "github.com/utmstack/UTMStack/agent/agent"
@@ -214,6 +215,20 @@ func main() {
214215
case "uninstall":
215216
fmt.Println("Uninstalling UTMStackAgent service ...")
216217

218+
fmt.Print("Stopping UTMStackUpdater service... ")
219+
updaterPath := filepath.Join(utils.GetMyPath(), fmt.Sprintf(config.UpdaterFile, ""))
220+
if utils.CheckIfPathExist(updaterPath) {
221+
err := utils.Execute(updaterPath, utils.GetMyPath(), "uninstall")
222+
if err != nil {
223+
fmt.Printf("Warning: %v\n", err)
224+
} else {
225+
fmt.Println("[OK]")
226+
}
227+
time.Sleep(2 * time.Second)
228+
} else {
229+
fmt.Println("[SKIPPED - not found]")
230+
}
231+
217232
cnf, err := config.GetCurrentConfig()
218233
if err != nil {
219234
fmt.Println("Error getting config: ", err)

agent/modules/configuration.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func ChangeIntegrationStatus(logTyp string, proto string, isEnabled bool, tlsOpt
7777
mod := GetModule(logTyp)
7878
if mod != nil && mod.IsPortListen(proto) {
7979
mod.DisablePort(proto)
80-
time.Sleep(100 * time.Millisecond)
80+
time.Sleep(200 * time.Millisecond)
8181
err := mod.EnablePort(proto, true)
8282
if err != nil {
8383
return "", fmt.Errorf("error enabling TLS on running module: %v", err)
@@ -89,7 +89,7 @@ func ChangeIntegrationStatus(logTyp string, proto string, isEnabled bool, tlsOpt
8989
mod := GetModule(logTyp)
9090
if mod != nil && mod.IsPortListen(proto) {
9191
mod.DisablePort(proto)
92-
time.Sleep(100 * time.Millisecond)
92+
time.Sleep(200 * time.Millisecond)
9393
err := mod.EnablePort(proto, false)
9494
if err != nil {
9595
return "", fmt.Errorf("error disabling TLS on running module: %v", err)
@@ -248,7 +248,7 @@ func EnableTLSForIntegration(logTyp string, proto string) (string, error) {
248248
mod := GetModule(logTyp)
249249
if mod != nil && mod.IsPortListen(proto) {
250250
mod.DisablePort(proto)
251-
time.Sleep(100 * time.Millisecond)
251+
time.Sleep(200 * time.Millisecond)
252252
err := mod.EnablePort(proto, true)
253253
if err != nil {
254254
return port, fmt.Errorf("error enabling TLS on running module: %v", err)
@@ -278,7 +278,7 @@ func DisableTLSForIntegration(logTyp string, proto string) error {
278278
mod := GetModule(logTyp)
279279
if mod != nil && mod.IsPortListen(proto) {
280280
mod.DisablePort(proto)
281-
time.Sleep(100 * time.Millisecond)
281+
time.Sleep(200 * time.Millisecond)
282282
err := mod.EnablePort(proto, false)
283283
if err != nil {
284284
return fmt.Errorf("error disabling TLS on running module: %v", err)

agent/modules/modules.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ func StartModules() {
8484
}
8585
if conf[0] {
8686
moCache[index].DisablePort(proto)
87+
if conf[1] {
88+
time.Sleep(200 * time.Millisecond)
89+
}
8790
}
8891
if changeAllowed {
8992
moCache[index].SetNewPort(proto, port)

agent/modules/netflow.go

Lines changed: 216 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@ package modules
33
import (
44
"bytes"
55
"context"
6+
"encoding/binary"
67
"errors"
78
"fmt"
89
"net"
910
"strconv"
11+
"strings"
1012
"sync"
1113
"time"
1214

13-
"github.com/tehmaze/netflow"
15+
"github.com/netsampler/goflow2/decoders/netflow"
16+
"github.com/netsampler/goflow2/decoders/netflowlegacy"
17+
tehmaze "github.com/tehmaze/netflow"
1418
"github.com/tehmaze/netflow/session"
1519
"github.com/utmstack/UTMStack/agent/config"
1620
"github.com/utmstack/UTMStack/agent/logservice"
@@ -23,44 +27,128 @@ var (
2327
netflowOnce sync.Once
2428
)
2529

30+
// templateSystem implements netflow.NetFlowTemplateSystem for goflow2
31+
type templateSystem struct {
32+
templates map[uint16]map[uint32]map[uint16]interface{}
33+
mu sync.RWMutex
34+
}
35+
36+
func newTemplateSystem() *templateSystem {
37+
return &templateSystem{
38+
templates: make(map[uint16]map[uint32]map[uint16]interface{}),
39+
}
40+
}
41+
42+
func (t *templateSystem) GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) {
43+
t.mu.RLock()
44+
defer t.mu.RUnlock()
45+
46+
if versionMap, ok := t.templates[version]; ok {
47+
if domainMap, ok := versionMap[obsDomainId]; ok {
48+
if template, ok := domainMap[templateId]; ok {
49+
return template, nil
50+
}
51+
}
52+
}
53+
return nil, fmt.Errorf("template not found: version=%d, obsDomainId=%d, templateId=%d", version, obsDomainId, templateId)
54+
}
55+
56+
func (t *templateSystem) AddTemplate(version uint16, obsDomainId uint32, template interface{}) {
57+
t.mu.Lock()
58+
defer t.mu.Unlock()
59+
60+
if _, ok := t.templates[version]; !ok {
61+
t.templates[version] = make(map[uint32]map[uint16]interface{})
62+
}
63+
if _, ok := t.templates[version][obsDomainId]; !ok {
64+
t.templates[version][obsDomainId] = make(map[uint16]interface{})
65+
}
66+
67+
// Extract template ID based on type
68+
var templateId uint16
69+
switch tmpl := template.(type) {
70+
case netflow.TemplateRecord:
71+
templateId = tmpl.TemplateId
72+
case netflow.IPFIXOptionsTemplateRecord:
73+
templateId = tmpl.TemplateId
74+
case netflow.NFv9OptionsTemplateRecord:
75+
templateId = tmpl.TemplateId
76+
default:
77+
return
78+
}
79+
80+
t.templates[version][obsDomainId][templateId] = template
81+
}
82+
2683
type NetflowModule struct {
27-
DataType string
28-
Parser parser.Parser
29-
Decoders map[string]*netflow.Decoder
30-
Listener *net.UDPConn
31-
CTX context.Context
32-
Cancel context.CancelFunc
33-
IsEnabled bool
84+
DataType string
85+
Parser parser.Parser
86+
LegacyDecoders map[string]*tehmaze.Decoder // For v1, v6, v7 (tehmaze/netflow)
87+
TemplateSystem map[string]*templateSystem // For v5, v9, IPFIX (goflow2)
88+
Listener *net.UDPConn
89+
CTX context.Context
90+
Cancel context.CancelFunc
91+
IsEnabled bool
92+
mu sync.RWMutex
3493
}
3594

3695
func GetNetflowModule() *NetflowModule {
3796
netflowOnce.Do(func() {
3897
netflowModule = &NetflowModule{
39-
Parser: parser.GetParser("netflow"),
40-
DataType: "netflow",
41-
IsEnabled: false,
42-
Decoders: make(map[string]*netflow.Decoder),
98+
Parser: parser.GetParser("netflow"),
99+
DataType: "netflow",
100+
IsEnabled: false,
101+
LegacyDecoders: make(map[string]*tehmaze.Decoder),
102+
TemplateSystem: make(map[string]*templateSystem),
43103
}
44104
})
45105
return netflowModule
46106
}
47107

108+
func (m *NetflowModule) getOrCreateTemplateSystem(addr string) *templateSystem {
109+
m.mu.Lock()
110+
defer m.mu.Unlock()
111+
112+
if ts, ok := m.TemplateSystem[addr]; ok {
113+
return ts
114+
}
115+
ts := newTemplateSystem()
116+
m.TemplateSystem[addr] = ts
117+
return ts
118+
}
119+
120+
func (m *NetflowModule) getOrCreateLegacyDecoder(addr string) *tehmaze.Decoder {
121+
m.mu.Lock()
122+
defer m.mu.Unlock()
123+
124+
if d, ok := m.LegacyDecoders[addr]; ok {
125+
return d
126+
}
127+
s := session.New()
128+
d := tehmaze.NewDecoder(s)
129+
m.LegacyDecoders[addr] = d
130+
return d
131+
}
132+
133+
func (m *NetflowModule) removeLegacyDecoder(addr string) {
134+
m.mu.Lock()
135+
defer m.mu.Unlock()
136+
delete(m.LegacyDecoders, addr)
137+
}
138+
48139
func (m *NetflowModule) EnablePort(proto string, enableTLS bool) error {
49140
if enableTLS {
50141
return fmt.Errorf("TLS not supported for NetFlow protocol")
51142
}
52143

53144
if proto == "udp" && !m.IsEnabled {
54-
utils.Logger.Info("Server %s listening in port: %s protocol: UDP", m.DataType, config.ProtoPorts[config.DataTypeNetflow].UDP)
55-
m.IsEnabled = true
56-
57145
port, err := strconv.Atoi(config.ProtoPorts[config.DataTypeNetflow].UDP)
58146
if err != nil {
59147
utils.Logger.ErrorF("error converting port to int: %v", err)
60148
return err
61149
}
62150

63-
m.Listener, err = net.ListenUDP("udp", &net.UDPAddr{
151+
listener, err := net.ListenUDP("udp", &net.UDPAddr{
64152
Port: port,
65153
IP: net.ParseIP("0.0.0.0"),
66154
})
@@ -69,9 +157,13 @@ func (m *NetflowModule) EnablePort(proto string, enableTLS bool) error {
69157
return err
70158
}
71159

160+
m.IsEnabled = true
161+
m.Listener = listener
72162
m.CTX, m.Cancel = context.WithCancel(context.Background())
73163

74-
buffer := make([]byte, 2048)
164+
utils.Logger.Info("Server %s listening in port: %s protocol: UDP", m.DataType, config.ProtoPorts[config.DataTypeNetflow].UDP)
165+
166+
buffer := make([]byte, 65535)
75167

76168
go func() {
77169
for {
@@ -97,16 +189,54 @@ func (m *NetflowModule) EnablePort(proto string, enableTLS bool) error {
97189
continue
98190
}
99191

100-
d, found := m.Decoders[addr.String()]
101-
if !found {
102-
s := session.New()
103-
d = netflow.NewDecoder(s)
104-
m.Decoders[addr.String()] = d
192+
// Validate packet structure before attempting to decode
193+
packetData := buffer[:length]
194+
packetInfo, validationErr := validateNetflowPacket(packetData)
195+
if validationErr != nil {
196+
utils.Logger.ErrorF("invalid NetFlow packet from %s (length: %d bytes): %v", addr.String(), length, validationErr)
197+
continue
105198
}
106199

107-
message, err := d.Read(bytes.NewBuffer(buffer[:length]))
108-
if err != nil {
109-
utils.Logger.ErrorF("error decoding NetFlow message: %v", err)
200+
var message interface{}
201+
202+
// Use hybrid approach: goflow2 for v5/v9/IPFIX, tehmaze for v1/v6/v7
203+
switch packetInfo.version {
204+
case 5:
205+
// Use goflow2 for NetFlow v5
206+
msg, err := netflowlegacy.DecodeMessage(bytes.NewBuffer(packetData))
207+
if err != nil {
208+
utils.Logger.ErrorF("error decoding %s message from %s: %v", packetInfo.versionName, addr.String(), err)
209+
continue
210+
}
211+
message = msg
212+
213+
case 9, 10:
214+
// Use goflow2 for NetFlow v9 and IPFIX
215+
ts := m.getOrCreateTemplateSystem(addr.String())
216+
msg, err := netflow.DecodeMessage(bytes.NewBuffer(packetData), ts)
217+
if err != nil {
218+
// Template not found is expected when data arrives before template
219+
// This is normal NetFlow v9/IPFIX behavior, don't log as error
220+
if !strings.Contains(err.Error(), "template not found") {
221+
utils.Logger.ErrorF("error decoding %s message from %s: %v", packetInfo.versionName, addr.String(), err)
222+
}
223+
continue
224+
}
225+
message = msg
226+
227+
case 1, 6, 7:
228+
// Use tehmaze/netflow for legacy versions (v1, v6, v7)
229+
d := m.getOrCreateLegacyDecoder(addr.String())
230+
msg, err := d.Read(bytes.NewBuffer(packetData))
231+
if err != nil {
232+
utils.Logger.ErrorF("error decoding %s message from %s: %v", packetInfo.versionName, addr.String(), err)
233+
m.removeLegacyDecoder(addr.String())
234+
continue
235+
}
236+
message = msg
237+
238+
default:
239+
utils.Logger.ErrorF("unsupported NetFlow version %d from %s", packetInfo.version, addr.String())
110240
continue
111241
}
112242

@@ -159,3 +289,64 @@ func (m *NetflowModule) GetPort(proto string) string {
159289
return ""
160290
}
161291
}
292+
293+
// netflowPacketInfo contains basic information about a NetFlow packet for validation
294+
type netflowPacketInfo struct {
295+
version uint16
296+
count uint16
297+
minSize int
298+
versionName string
299+
}
300+
301+
// validateNetflowPacket checks if a NetFlow packet has valid structure before decoding
302+
// Returns packet info if valid, error otherwise
303+
func validateNetflowPacket(data []byte) (*netflowPacketInfo, error) {
304+
if len(data) < 4 {
305+
return nil, fmt.Errorf("packet too small: %d bytes (minimum 4 bytes for version and count)", len(data))
306+
}
307+
308+
version := binary.BigEndian.Uint16(data[0:2])
309+
count := binary.BigEndian.Uint16(data[2:4])
310+
311+
info := &netflowPacketInfo{
312+
version: version,
313+
count: count,
314+
}
315+
316+
switch version {
317+
case 1:
318+
info.versionName = "NetFlow v1"
319+
info.minSize = 24 + int(count)*48 // header (24) + records (48 each)
320+
case 5:
321+
info.versionName = "NetFlow v5"
322+
info.minSize = 24 + int(count)*48 // header (24) + records (48 each)
323+
case 6:
324+
info.versionName = "NetFlow v6"
325+
info.minSize = 24 + int(count)*52 // header (24) + records (52 each)
326+
case 7:
327+
info.versionName = "NetFlow v7"
328+
info.minSize = 24 + int(count)*52 // header (24) + records (52 each)
329+
case 9:
330+
info.versionName = "NetFlow v9"
331+
// NetFlow v9 header is 20 bytes, minimum packet size is just the header
332+
info.minSize = 20
333+
case 10:
334+
info.versionName = "IPFIX"
335+
// IPFIX header is 16 bytes, field at offset 2-4 is the total message length
336+
info.minSize = 16
337+
ipfixLength := binary.BigEndian.Uint16(data[2:4])
338+
if int(ipfixLength) != len(data) {
339+
return nil, fmt.Errorf("IPFIX length mismatch: header says %d bytes, received %d bytes", ipfixLength, len(data))
340+
}
341+
return info, nil
342+
default:
343+
return nil, fmt.Errorf("unsupported NetFlow version: %d", version)
344+
}
345+
346+
if len(data) < info.minSize {
347+
return nil, fmt.Errorf("%s packet too small: received %d bytes, minimum expected %d bytes (count=%d)",
348+
info.versionName, len(data), info.minSize, count)
349+
}
350+
351+
return info, nil
352+
}

0 commit comments

Comments
 (0)