Skip to content

Commit ca8ab95

Browse files
committed
Migrate from correlation service to direct Logstash connection in aws integration.
1 parent 5a290f2 commit ca8ab95

3 files changed

Lines changed: 58 additions & 10 deletions

File tree

aws/configuration/const.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package configuration
33
import "github.com/utmstack/UTMStack/aws/utils"
44

55
const (
6-
CORRELATIONURL = "http://correlation:8080/v1/newlog"
76
URL_CHECK_CONNECTION = "https://sts.amazonaws.com"
7+
LogstashEndpoint = "http://%s:%s"
8+
UTMLogSeparator = "<utm-log-separator>"
89
)
910

1011
func GetInternalKey() string {
@@ -14,3 +15,11 @@ func GetInternalKey() string {
1415
func GetPanelServiceName() string {
1516
return utils.Getenv("PANEL_SERV_NAME")
1617
}
18+
19+
func GetLogstashHost() string {
20+
return utils.Getenv("UTM_LOGSTASH_HOST")
21+
}
22+
23+
func GetLogstashPort() string {
24+
return utils.Getenv("UTM_LOGSTASH_PORT")
25+
}

aws/processor/pull.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func PullLogs(startTime time.Time, endTime time.Time, group types.ModuleGroup) *
1818
return err
1919
}
2020

21-
err = SendToCorrelation(logs)
21+
err = SendToLogstash(logs)
2222
if err != nil {
2323
return err
2424
}

aws/processor/sendData.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,71 @@
11
package processor
22

33
import (
4+
"bytes"
5+
"crypto/tls"
46
"encoding/json"
7+
"fmt"
58
"net/http"
9+
"strings"
10+
"time"
611

712
"github.com/threatwinds/logger"
813
"github.com/utmstack/UTMStack/aws/configuration"
914
"github.com/utmstack/UTMStack/aws/utils"
1015
)
1116

12-
func SendToCorrelation(data []TransformedLog) *logger.Error {
17+
var transport = &http.Transport{
18+
MaxIdleConns: 100,
19+
IdleConnTimeout: 2 * time.Second,
20+
ResponseHeaderTimeout: 2 * time.Second,
21+
ForceAttemptHTTP2: true,
22+
TLSClientConfig: &tls.Config{
23+
InsecureSkipVerify: true,
24+
},
25+
}
26+
27+
var client = &http.Client{Transport: transport, Timeout: 2 * time.Second}
28+
29+
func SendToLogstash(data []TransformedLog) *logger.Error {
30+
var logStrings []string
1331
for _, log := range data {
1432
body, err := json.Marshal(log)
1533
if err != nil {
1634
utils.Logger.ErrorF("error encoding log to JSON: %v", err)
1735
continue
1836
}
37+
logStrings = append(logStrings, string(body))
38+
}
1939

20-
_, status, e := utils.DoReq[map[string]interface{}](configuration.CORRELATIONURL, body, http.MethodPost, map[string]string{})
21-
if e != nil {
22-
utils.Logger.ErrorF("error sending log to correlation engine: %v", e)
23-
continue
24-
} else if status != http.StatusOK && status != http.StatusCreated {
25-
utils.Logger.ErrorF("error sending log to correlation engine: status %v", status)
26-
continue
40+
if len(logStrings) == 0 {
41+
return nil
42+
}
43+
44+
var logs string
45+
for _, str := range logStrings {
46+
logs += str + configuration.UTMLogSeparator
47+
}
48+
49+
url := fmt.Sprintf(configuration.LogstashEndpoint, configuration.GetLogstashHost(), configuration.GetLogstashPort())
50+
51+
req, err := http.NewRequest("POST", url, bytes.NewBufferString(logs))
52+
if err != nil {
53+
return utils.Logger.ErrorF("error creating request: %v", err.Error())
54+
}
55+
56+
resp, err := client.Do(req)
57+
if err != nil {
58+
if !strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") {
59+
utils.Logger.ErrorF("error sending logs with error: %v", err.Error())
2760
}
61+
return utils.Logger.ErrorF("error sending logs: %v", err.Error())
62+
}
63+
defer resp.Body.Close()
2864

65+
if resp.StatusCode != http.StatusOK {
66+
return utils.Logger.ErrorF("error sending logs with http code %d", resp.StatusCode)
2967
}
3068

69+
utils.Logger.Info("successfully sent %d logs to Logstash", len(logStrings))
3170
return nil
3271
}

0 commit comments

Comments
 (0)