Skip to content

Commit 861fa60

Browse files
committed
Migrate from correlation service to direct Logstash connection in office365 integration.
1 parent ca8ab95 commit 861fa60

3 files changed

Lines changed: 59 additions & 14 deletions

File tree

office365/configuration/const.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ const (
1414
endPointStartSubscription = "/activity/feed/subscriptions/start"
1515
endPointContent = "/activity/feed/subscriptions/content"
1616
BASEURL = "https://manage.office.com/api/v1.0/"
17-
CORRELATIONURL = "http://correlation:8080/v1/newlog"
17+
LogstashEndpoint = "http://%s:%s"
18+
UTMLogSeparator = "<utm-log-separator>"
1819
)
1920

2021
func GetInternalKey() string {
@@ -36,3 +37,11 @@ func GetStartSubscriptionLink(tenant string) string {
3637
func GetContentLink(tenant string) string {
3738
return fmt.Sprintf("%s%s%s", BASEURL, tenant, endPointContent)
3839
}
40+
41+
func GetLogstashHost() string {
42+
return utils.Getenv("UTM_LOGSTASH_HOST")
43+
}
44+
45+
func GetLogstashPort() string {
46+
return utils.Getenv("UTM_LOGSTASH_PORT")
47+
}

office365/processor/processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,9 @@ func (o *OfficeProcessor) GetLogs(startTime string, endTime string, group types.
146146
if len(details) > 0 {
147147
logsCounter += len(details)
148148
cleanLogs := ETLProcess(details, group)
149-
err = SendToCorrelation(cleanLogs)
149+
err := SendToLogstash(cleanLogs)
150150
if err != nil {
151-
utils.Logger.ErrorF("error sending logs to correlation: %v", err) // Debug
151+
utils.Logger.ErrorF("error sending logs to logstash: %v", err) // Debug
152152
continue
153153
}
154154
}

office365/processor/sendData.go

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,71 @@
11
package processor
22

33
import (
4+
"bytes"
5+
"crypto/tls"
46
"encoding/json"
7+
"fmt"
58
"net/http"
9+
"strings"
10+
"time"
611

12+
"github.com/threatwinds/logger"
713
"github.com/utmstack/UTMStack/office365/configuration"
814
"github.com/utmstack/UTMStack/office365/utils"
915
)
1016

11-
func SendToCorrelation(data []TransformedLog) error {
12-
utils.Logger.Info("uploading %d logs...", len(data))
17+
var transport = &http.Transport{
18+
MaxIdleConns: 100,
19+
IdleConnTimeout: 2 * time.Second,
20+
ResponseHeaderTimeout: 2 * time.Second,
21+
ForceAttemptHTTP2: true,
22+
TLSClientConfig: &tls.Config{
23+
InsecureSkipVerify: true,
24+
},
25+
}
26+
27+
var client = &http.Client{Transport: transport, Timeout: 2 * time.Second}
1328

29+
func SendToLogstash(data []TransformedLog) *logger.Error {
30+
var logStrings []string
1431
for _, log := range data {
1532
body, err := json.Marshal(log)
1633
if err != nil {
1734
utils.Logger.ErrorF("error encoding log to JSON: %v", err)
1835
continue
1936
}
37+
logStrings = append(logStrings, string(body))
38+
}
2039

21-
_, status, e := utils.DoReq[map[string]interface{}](configuration.CORRELATIONURL, body, http.MethodPost, map[string]string{})
22-
if e != nil {
23-
utils.Logger.ErrorF("error sending log to correlation engine: %v", e)
24-
continue
25-
} else if status != http.StatusOK && status != http.StatusCreated {
26-
utils.Logger.ErrorF("error sending log to correlation engine: status code %d", status)
27-
continue
40+
if len(logStrings) == 0 {
41+
return nil
42+
}
43+
44+
var logs string
45+
for _, str := range logStrings {
46+
logs += str + configuration.UTMLogSeparator
47+
}
48+
49+
url := fmt.Sprintf(configuration.LogstashEndpoint, configuration.GetLogstashHost(), configuration.GetLogstashPort())
50+
51+
req, err := http.NewRequest("POST", url, bytes.NewBufferString(logs))
52+
if err != nil {
53+
return utils.Logger.ErrorF("error creating request: %v", err.Error())
54+
}
55+
56+
resp, err := client.Do(req)
57+
if err != nil {
58+
if !strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") {
59+
utils.Logger.ErrorF("error sending logs with error: %v", err.Error())
2860
}
61+
return utils.Logger.ErrorF("error sending logs: %v", err.Error())
62+
}
63+
defer resp.Body.Close()
2964

30-
utils.Logger.Info("log successfully sent to correlation engine")
65+
if resp.StatusCode != http.StatusOK {
66+
return utils.Logger.ErrorF("error sending logs with http code %d", resp.StatusCode)
3167
}
3268

33-
utils.Logger.Info("all logs were sent to correlation")
69+
utils.Logger.Info("successfully sent %d logs to Logstash", len(logStrings))
3470
return nil
3571
}

0 commit comments

Comments
 (0)