Skip to content

Commit 29e20c4

Browse files
committed
"Replace geolocation update logic with efficient data loading and improved error handling."
Signed-off-by: Osmany Montero <osmontero@icloud.com>
1 parent 9628e84 commit 29e20c4

3 files changed

Lines changed: 165 additions & 92 deletions

File tree

plugins/geolocation/.vscode/launch.json

Lines changed: 0 additions & 16 deletions
This file was deleted.
Lines changed: 78 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -7,78 +7,94 @@ import (
77
"time"
88

99
"github.com/threatwinds/go-sdk/catcher"
10-
"github.com/threatwinds/go-sdk/plugins"
1110
"github.com/threatwinds/go-sdk/utils"
1211
)
1312

14-
func update() {
15-
first := true
16-
for {
17-
workdir, err := utils.MkdirJoin(plugins.WorkDir, "geolocation")
18-
if err != nil {
19-
_ = catcher.Error("could not create geolocation directory", err, nil)
20-
continue
21-
}
22-
var files = map[string]string{
23-
workdir.FileJoin("asn-blocks-v4.csv"): "https://cdn.utmstack.com/geoip/asn-blocks-v4.csv",
24-
workdir.FileJoin("asn-blocks-v6.csv"): "https://cdn.utmstack.com/geoip/asn-blocks-v6.csv",
25-
workdir.FileJoin("blocks-v4.csv"): "https://cdn.utmstack.com/geoip/blocks-v4.csv",
26-
workdir.FileJoin("blocks-v6.csv"): "https://cdn.utmstack.com/geoip/blocks-v6.csv",
27-
workdir.FileJoin("locations-en.csv"): "https://cdn.utmstack.com/geoip/locations-en.csv",
28-
}
29-
30-
mode := os.Getenv("MODE")
31-
if mode == "manager" {
32-
if _, err := os.Stat(workdir.FileJoin("locations-en.csv")); os.IsNotExist(err) || !first {
33-
for file, url := range files {
34-
if err := utils.Download(url, file); err != nil {
35-
_ = catcher.Error("could not download geolocation file", err, nil)
36-
continue
37-
}
38-
}
39-
}
40-
} else {
41-
time.Sleep(5 * time.Minute)
42-
}
13+
// loadGeolocationData loads the geolocation files and populates the maps
14+
func loadGeolocationData() {
15+
// Get the geolocation directory from environment variable or use default
16+
workdir := os.Getenv("GEOLOCATION_DIR")
17+
if workdir == "" {
18+
workdir = "/workdir/geolocation"
19+
}
20+
21+
// Define the geolocation files to be loaded
22+
var files = []string{
23+
"asn-blocks-v4.csv",
24+
"asn-blocks-v6.csv",
25+
"blocks-v4.csv",
26+
"blocks-v6.csv",
27+
"locations-en.csv",
28+
}
4329

30+
// Use deferring to ensure the mutex is always unlocked, even if there's an error
31+
func() {
4432
mu.Lock()
33+
defer mu.Unlock()
34+
35+
// Create new maps to avoid partial updates if there's an error
36+
newAsnBlocks := make(map[string][]*asnBlock)
37+
newCityBlocks := make(map[string][]*cityBlock)
38+
newCityLocations := make(map[int64]*cityLocation)
39+
40+
// Process each file with retry logic
41+
maxRetries := 3
42+
for _, filename := range files {
43+
filePath := workdir + "/" + filename
44+
var csv [][]string
45+
var err error
46+
47+
// Retry logic for reading the file
48+
for retry := 0; retry < maxRetries; retry++ {
49+
csv, err = utils.ReadCSV(filePath)
50+
if err == nil {
51+
break
52+
}
4553

46-
asnBlocks = make(map[string][]*asnBlock)
47-
cityBlocks = make(map[string][]*cityBlock)
48-
cityLocations = make(map[int64]*cityLocation)
54+
_ = catcher.Error("could not read geolocation file, retrying", err, map[string]any{
55+
"file": filePath,
56+
"retry": retry + 1,
57+
"maxRetries": maxRetries,
58+
})
59+
60+
if retry < maxRetries-1 {
61+
// Exponential backoff
62+
time.Sleep(time.Duration(1<<uint(retry)) * time.Second)
63+
}
64+
}
4965

50-
for file := range files {
51-
csv, err := utils.ReadCSV(file)
5266
if err != nil {
53-
_ = catcher.Error("could not read geolocation file", err, nil)
67+
_ = catcher.Error("all retries failed when reading geolocation file", err, map[string]any{
68+
"file": filePath,
69+
})
5470
continue
5571
}
5672

57-
switch file {
58-
case workdir.FileJoin("asn-blocks-v4.csv"):
59-
populateASNBlocks(csv)
60-
case workdir.FileJoin("asn-blocks-v6.csv"):
61-
populateASNBlocks(csv)
62-
case workdir.FileJoin("blocks-v4.csv"):
63-
populateCityBlocks(csv)
64-
case workdir.FileJoin("blocks-v6.csv"):
65-
populateCityBlocks(csv)
66-
case workdir.FileJoin("locations-en.csv"):
67-
populateCityLocations(csv)
73+
switch filename {
74+
case "asn-blocks-v4.csv":
75+
populateASNBlocksMap(csv, newAsnBlocks)
76+
case "asn-blocks-v6.csv":
77+
populateASNBlocksMap(csv, newAsnBlocks)
78+
case "blocks-v4.csv":
79+
populateCityBlocksMap(csv, newCityBlocks)
80+
case "blocks-v6.csv":
81+
populateCityBlocksMap(csv, newCityBlocks)
82+
case "locations-en.csv":
83+
populateCityLocationsMap(csv, newCityLocations)
6884
}
6985
}
7086

71-
mu.Unlock()
72-
73-
if first {
74-
first = false
87+
// Only update the global maps if all files were processed successfully
88+
if len(newAsnBlocks) > 0 && len(newCityBlocks) > 0 && len(newCityLocations) > 0 {
89+
asnBlocks = newAsnBlocks
90+
cityBlocks = newCityBlocks
91+
cityLocations = newCityLocations
7592
}
76-
77-
time.Sleep(168 * time.Hour)
78-
}
93+
}()
7994
}
8095

81-
func populateASNBlocks(csv [][]string) {
96+
// populateASNBlocksMap populates the provided ASN blocks map with data from the CSV
97+
func populateASNBlocksMap(csv [][]string, blocks map[string][]*asnBlock) {
8298
for key, line := range csv {
8399
if key == 0 {
84100
continue
@@ -118,11 +134,12 @@ func populateASNBlocks(csv [][]string) {
118134

119135
start := getStart(n.IP.String())
120136

121-
asnBlocks[start] = append(asnBlocks[start], t)
137+
blocks[start] = append(blocks[start], t)
122138
}
123139
}
124140

125-
func populateCityBlocks(csv [][]string) {
141+
// populateCityBlocksMap populates the provided city blocks map with data from the CSV
142+
func populateCityBlocksMap(csv [][]string, blocks map[string][]*cityBlock) {
126143
for key, line := range csv {
127144
if key == 0 {
128145
continue
@@ -198,11 +215,12 @@ func populateCityBlocks(csv [][]string) {
198215

199216
start := getStart(n.IP.String())
200217

201-
cityBlocks[start] = append(cityBlocks[start], t)
218+
blocks[start] = append(blocks[start], t)
202219
}
203220
}
204221

205-
func populateCityLocations(csv [][]string) {
222+
// populateCityLocationsMap populates the provided city locations map with data from the CSV
223+
func populateCityLocationsMap(csv [][]string, locations map[int64]*cityLocation) {
206224
for key, line := range csv {
207225
if key == 0 {
208226
continue
@@ -223,6 +241,6 @@ func populateCityLocations(csv [][]string) {
223241
cityName: line[10],
224242
}
225243

226-
cityLocations[geonameID] = t
244+
locations[geonameID] = t
227245
}
228246
}

plugins/geolocation/main.go

Lines changed: 87 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"net"
66
"os"
7+
"time"
78

89
"github.com/threatwinds/go-sdk/catcher"
910
"github.com/threatwinds/go-sdk/plugins"
@@ -19,34 +20,104 @@ type parsingServer struct {
1920
}
2021

2122
func main() {
22-
filePath, err := utils.MkdirJoin(plugins.WorkDir, "sockets")
23-
if err != nil {
24-
_ = catcher.Error("cannot create directory", err, nil)
25-
os.Exit(1)
23+
// Retry logic for creating socket directory
24+
maxRetries := 3
25+
retryDelay := 2 * time.Second
26+
27+
var filePath utils.Folder
28+
var err error
29+
30+
for retry := 0; retry < maxRetries; retry++ {
31+
filePath, err = utils.MkdirJoin(plugins.WorkDir, "sockets")
32+
if err == nil {
33+
break
34+
}
35+
36+
_ = catcher.Error("cannot create directory, retrying", err, map[string]any{
37+
"retry": retry + 1,
38+
"maxRetries": maxRetries,
39+
})
40+
41+
if retry < maxRetries-1 {
42+
time.Sleep(retryDelay)
43+
// Increase delay for next retry
44+
retryDelay *= 2
45+
} else {
46+
// If all retries failed, log the error and return
47+
_ = catcher.Error("all retries failed when creating directory", err, nil)
48+
return
49+
}
2650
}
51+
2752
socketPath := filePath.FileJoin("com.utmstack.geolocation_parsing.sock")
2853
_ = os.Remove(socketPath)
2954

30-
unixAddress, err := net.ResolveUnixAddr("unix", socketPath)
31-
if err != nil {
32-
_ = catcher.Error("cannot resolve unix address", err, nil)
33-
os.Exit(1)
55+
// Retry logic for resolving unix address
56+
retryDelay = 2 * time.Second
57+
var unixAddress *net.UnixAddr
58+
59+
for retry := 0; retry < maxRetries; retry++ {
60+
unixAddress, err = net.ResolveUnixAddr("unix", socketPath)
61+
if err == nil {
62+
break
63+
}
64+
65+
_ = catcher.Error("cannot resolve unix address, retrying", err, map[string]any{
66+
"retry": retry + 1,
67+
"maxRetries": maxRetries,
68+
})
69+
70+
if retry < maxRetries-1 {
71+
time.Sleep(retryDelay)
72+
// Increase delay for next retry
73+
retryDelay *= 2
74+
} else {
75+
// If all retries failed, log the error and return
76+
_ = catcher.Error("all retries failed when resolving unix address", err, nil)
77+
return
78+
}
3479
}
3580

36-
listener, err := net.ListenUnix("unix", unixAddress)
37-
if err != nil {
38-
_ = catcher.Error("cannot listen to unix socket", err, nil)
39-
os.Exit(1)
81+
// Retry logic for listening to unix socket
82+
retryDelay = 2 * time.Second
83+
var listener *net.UnixListener
84+
85+
for retry := 0; retry < maxRetries; retry++ {
86+
listener, err = net.ListenUnix("unix", unixAddress)
87+
if err == nil {
88+
break
89+
}
90+
91+
_ = catcher.Error("cannot listen to unix socket, retrying", err, map[string]any{
92+
"retry": retry + 1,
93+
"maxRetries": maxRetries,
94+
})
95+
96+
if retry < maxRetries-1 {
97+
time.Sleep(retryDelay)
98+
// Increase delay for next retry
99+
retryDelay *= 2
100+
} else {
101+
// If all retries failed, log the error and return
102+
_ = catcher.Error("all retries failed when listening to unix socket", err, nil)
103+
return
104+
}
40105
}
41106

42107
grpcServer := grpc.NewServer()
43108
plugins.RegisterParsingServer(grpcServer, &parsingServer{})
44109

45-
go update()
110+
go loadGeolocationData()
111+
112+
// Serve with error handling and retry logic
113+
for {
114+
err := grpcServer.Serve(listener)
115+
if err == nil {
116+
break
117+
}
46118

47-
if err := grpcServer.Serve(listener); err != nil {
48-
_ = catcher.Error("cannot serve grpc", err, nil)
49-
os.Exit(1)
119+
_ = catcher.Error("cannot serve grpc, retrying", err, nil)
120+
time.Sleep(5 * time.Second)
50121
}
51122
}
52123

0 commit comments

Comments
 (0)