Skip to content

Commit 23efc1c

Browse files
authored
Merge pull request #107 from appbaseio/feat/date_filter_logs
feat: add date range params in logs endpoint
2 parents e687786 + 622358f commit 23efc1c

5 files changed

Lines changed: 137 additions & 41 deletions

File tree

plugins/logs/dao.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"regexp"
88
"sort"
9-
"strconv"
109
"strings"
1110

1211
log "github.com/sirupsen/logrus"
@@ -95,20 +94,21 @@ func (es *elasticsearch) indexRecord(ctx context.Context, rec record) {
9594
}
9695
}
9796

98-
func (es *elasticsearch) getRawLogs(ctx context.Context, from, size, filter string, indices ...string) ([]byte, error) {
99-
offset, err := strconv.Atoi(from)
100-
if err != nil {
101-
return nil, fmt.Errorf(`invalid value "%v" for query param "from"`, from)
102-
}
103-
s, err := strconv.Atoi(size)
104-
if err != nil {
105-
return nil, fmt.Errorf(`invalid value "%v" for query param "size"`, size)
106-
}
97+
type logsFilter struct {
98+
Offset int
99+
StartDate string
100+
EndDate string
101+
Size int
102+
Filter string
103+
Indices []string
104+
}
105+
106+
func (es *elasticsearch) getRawLogs(ctx context.Context, logsFilter logsFilter) ([]byte, error) {
107107
switch util.GetVersion() {
108108
case 6:
109-
return es.getRawLogsES6(ctx, from, s, filter, offset, indices...)
109+
return es.getRawLogsES6(ctx, logsFilter)
110110
default:
111-
return es.getRawLogsES7(ctx, from, s, filter, offset, indices...)
111+
return es.getRawLogsES7(ctx, logsFilter)
112112
}
113113
}
114114

plugins/logs/dao_es6.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,35 @@ import (
1010
es6 "gopkg.in/olivere/elastic.v6"
1111
)
1212

13-
func (es *elasticsearch) getRawLogsES6(ctx context.Context, from string, size int, filter string, offset int, indices ...string) ([]byte, error) {
14-
query := es6.NewBoolQuery()
13+
func (es *elasticsearch) getRawLogsES6(ctx context.Context, logsFilter logsFilter) ([]byte, error) {
14+
duration := es6.NewRangeQuery("timestamp").
15+
From(logsFilter.StartDate).
16+
To(logsFilter.EndDate)
17+
18+
query := es6.NewBoolQuery().Filter(duration)
1519
// apply category filter
16-
if filter == "search" {
20+
if logsFilter.Filter == "search" {
1721
filters := es6.NewTermQuery("category.keyword", "search")
1822
query.Filter(filters)
19-
} else if filter == "delete" {
23+
} else if logsFilter.Filter == "delete" {
2024
filters := es6.NewMatchQuery("request.method.keyword", "DELETE")
2125
query.Filter(filters)
22-
} else if filter == "success" {
26+
} else if logsFilter.Filter == "success" {
2327
filters := es6.NewRangeQuery("response.code").Gte(200).Lte(299)
2428
query.Filter(filters)
25-
} else if filter == "error" {
29+
} else if logsFilter.Filter == "error" {
2630
filters := es6.NewRangeQuery("response.code").Gte(400)
2731
query.Filter(filters)
2832
} else {
2933
query.Filter(es6.NewMatchAllQuery())
3034
}
3135
// apply index filtering logic
32-
util.GetIndexFilterQueryEs6(query, indices...)
36+
util.GetIndexFilterQueryEs6(query, logsFilter.Filter)
3337

3438
response, err := util.GetClient6().Search(es.indexName).
3539
Query(query).
36-
From(offset).
37-
Size(size).
40+
From(logsFilter.Offset).
41+
Size(logsFilter.Size).
3842
SortWithInfo(es6.SortInfo{Field: "timestamp", UnmappedType: "date", Ascending: false}).
3943
Do(ctx)
4044
if err != nil {
@@ -50,17 +54,17 @@ func (es *elasticsearch) getRawLogsES6(ctx context.Context, from string, size in
5054
}
5155
rawIndices, ok := source["indices"]
5256
if !ok {
53-
log.Println(logTag, ": unable to find ", indices, " in log record")
57+
log.Println(logTag, ": unable to find ", logsFilter.Indices, " in log record")
5458
}
5559
logIndices, err := util.ToStringSlice(rawIndices)
5660
if err != nil {
5761
log.Errorln(logTag, ":", err)
5862
continue
5963
}
6064

61-
if len(indices) == 0 {
65+
if len(logsFilter.Indices) == 0 {
6266
hits = append(hits, hit.Source)
63-
} else if util.IsSubset(indices, logIndices) {
67+
} else if util.IsSubset(logsFilter.Indices, logIndices) {
6468
hits = append(hits, hit.Source)
6569
}
6670
}

plugins/logs/dao_es7.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,32 +8,36 @@ import (
88
es7 "github.com/olivere/elastic/v7"
99
)
1010

11-
func (es *elasticsearch) getRawLogsES7(ctx context.Context, from string, size int, filter string, offset int, indices ...string) ([]byte, error) {
12-
query := es7.NewBoolQuery()
11+
func (es *elasticsearch) getRawLogsES7(ctx context.Context, logsFilter logsFilter) ([]byte, error) {
12+
duration := es7.NewRangeQuery("timestamp").
13+
From(logsFilter.StartDate).
14+
To(logsFilter.EndDate)
15+
16+
query := es7.NewBoolQuery().Filter(duration)
1317
// apply category filter
14-
if filter == "search" {
18+
if logsFilter.Filter == "search" {
1519
filters := es7.NewTermQuery("category.keyword", "search")
1620
query.Filter(filters)
17-
} else if filter == "delete" {
21+
} else if logsFilter.Filter == "delete" {
1822
filters := es7.NewMatchQuery("request.method.keyword", "DELETE")
1923
query.Filter(filters)
20-
} else if filter == "success" {
24+
} else if logsFilter.Filter == "success" {
2125
filters := es7.NewRangeQuery("response.code").Gte(200).Lte(299)
2226
query.Filter(filters)
23-
} else if filter == "error" {
27+
} else if logsFilter.Filter == "error" {
2428
filters := es7.NewRangeQuery("response.code").Gte(400)
2529
query.Filter(filters)
2630
} else {
2731
query.Filter(es7.NewMatchAllQuery())
2832
}
2933

3034
// apply index filtering logic
31-
util.GetIndexFilterQueryEs7(query, indices...)
35+
util.GetIndexFilterQueryEs7(query, logsFilter.Indices...)
3236

3337
response, err := util.GetClient7().Search(es.indexName).
3438
Query(query).
35-
From(offset).
36-
Size(size).
39+
From(logsFilter.Offset).
40+
Size(logsFilter.Size).
3741
SortWithInfo(es7.SortInfo{Field: "timestamp", UnmappedType: "date", Ascending: false}).
3842
Do(ctx)
3943
if err != nil {

plugins/logs/handlers.go

Lines changed: 95 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,117 @@
11
package logs
22

33
import (
4+
"fmt"
45
"net/http"
6+
"net/url"
7+
"strconv"
8+
"time"
59

610
log "github.com/sirupsen/logrus"
711

812
"github.com/appbaseio/arc/util"
913
)
1014

15+
const (
16+
defaultResponseSize = 100
17+
defaultTimeFormat = "2006/01/02"
18+
)
19+
20+
// NormalizedQueryParams represents the normalized query parameters
21+
type NormalizedQueryParams struct {
22+
StartDate string
23+
EndDate string
24+
Size int
25+
}
26+
27+
// previousMonthRange returns one month's duration starting from the current instant 30 days ago.
28+
func previousMonthRange() (from, to string) {
29+
now := time.Now()
30+
from = now.AddDate(0, 0, -30).Format(time.RFC3339)
31+
to = now.Format(time.RFC3339)
32+
return
33+
}
34+
35+
// rangeQueryParams returns the common query params that every analytics endpoint expects,
36+
// - "start_date": start of the duration in consideration
37+
// - "end_date" : end of the duration in consideration
38+
// - "size": no. of response entries
39+
func rangeQueryParams(values url.Values) NormalizedQueryParams {
40+
from, to := previousMonthRange()
41+
size := 100
42+
43+
value := values.Get("start_date")
44+
if value != "" {
45+
t, err := time.Parse(defaultTimeFormat, value)
46+
if err != nil {
47+
log.Errorln(logTag, `: unsupported "start_date" value provided, defaulting to previous month:`, err)
48+
} else {
49+
from = t.Format(time.RFC3339)
50+
}
51+
}
52+
53+
value = values.Get("end_date")
54+
if value != "" {
55+
t, err := time.Parse(defaultTimeFormat, value)
56+
if err != nil {
57+
log.Errorln(logTag, `: unsupported "end_date" value provided, defaulting to current time:`, err)
58+
} else {
59+
// Use end of the day for to range
60+
year, month, day := t.Date()
61+
to = time.Date(year, month, day, 23, 59, 59, 0, t.Location()).Format(time.RFC3339)
62+
}
63+
}
64+
65+
respSize := values.Get("size")
66+
if respSize != "" {
67+
value, err := strconv.Atoi(respSize)
68+
if err != nil {
69+
value = defaultResponseSize
70+
log.Errorln(logTag, `: invalid "size" value provided, defaulting to 100:`, err)
71+
}
72+
if value > 1000 {
73+
value = defaultResponseSize
74+
log.Println(logTag, `: "size" limit exceeded (> 1000), default to 100`)
75+
}
76+
size = value
77+
}
78+
79+
return NormalizedQueryParams{
80+
StartDate: from,
81+
EndDate: to,
82+
Size: size,
83+
}
84+
}
85+
1186
func (l *Logs) getLogs() http.HandlerFunc {
1287
return func(w http.ResponseWriter, req *http.Request) {
1388
indices := util.IndicesFromRequest(req)
1489

15-
from := req.URL.Query().Get("from")
16-
if from == "" {
17-
from = "0"
90+
offset := req.URL.Query().Get("from")
91+
if offset == "" {
92+
offset = "0"
1893
}
19-
size := req.URL.Query().Get("size")
20-
if size == "" {
21-
size = "100"
94+
95+
parsedOffset, err := strconv.Atoi(offset)
96+
if err != nil {
97+
errMsg := fmt.Errorf(`invalid value "%v" for query param "from"`, offset)
98+
log.Errorln(logTag, ": ", errMsg)
99+
util.WriteBackError(w, err.Error(), http.StatusInternalServerError)
100+
return
22101
}
23102

103+
rangeParams := rangeQueryParams(req.URL.Query())
104+
24105
filter := req.URL.Query().Get("filter")
25106

26-
raw, err := l.es.getRawLogs(req.Context(), from, size, filter, indices...)
107+
raw, err := l.es.getRawLogs(req.Context(), logsFilter{
108+
Offset: parsedOffset,
109+
StartDate: rangeParams.StartDate,
110+
EndDate: rangeParams.EndDate,
111+
Size: rangeParams.Size,
112+
Filter: filter,
113+
Indices: indices,
114+
})
27115
if err != nil {
28116
log.Errorln(logTag, ": error fetching logs :", err)
29117
util.WriteBackError(w, err.Error(), http.StatusInternalServerError)

plugins/logs/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package logs
33
import "context"
44

55
type logsService interface {
6-
getRawLogs(ctx context.Context, from, size, filter string, indices ...string) ([]byte, error)
6+
getRawLogs(ctx context.Context, logsFilter logsFilter) ([]byte, error)
77
indexRecord(ctx context.Context, r record)
88
rolloverIndexJob(alias string)
99
}

0 commit comments

Comments
 (0)