Skip to content

Commit 7080b75

Browse files
authored
feat: replace index name with alias name (#83)
1 parent 4b0fe7a commit 7080b75

5 files changed

Lines changed: 154 additions & 70 deletions

File tree

middleware/classify/util.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package classify
22

3-
// IndexAliasCache cache to store index alias map
3+
// IndexAliasCache cache to store index -> alias map
44
var IndexAliasCache = make(map[string]string)
55

6+
// AliasIndexCache cache to store alias -> index map
7+
var AliasIndexCache = make(map[string]string)
8+
69
// GetIndexAliasCache get whole cache
710
func GetIndexAliasCache() map[string]string {
811
return IndexAliasCache
@@ -22,3 +25,27 @@ func GetIndexAlias(index string) string {
2225
func SetIndexAlias(index, alias string) {
2326
IndexAliasCache[index] = alias
2427
}
28+
29+
// GetAliasIndex get index for specific alias
30+
func GetAliasIndex(alias string) string {
31+
index, ok := AliasIndexCache[alias]
32+
if !ok {
33+
return ""
34+
}
35+
return index
36+
}
37+
38+
// SetAliasIndex set index for specific alias
39+
func SetAliasIndex(alias, index string) {
40+
AliasIndexCache[alias] = index
41+
}
42+
43+
// SetAliasIndexCache set the whole cache
44+
func SetAliasIndexCache(data map[string]string) {
45+
AliasIndexCache = data
46+
}
47+
48+
// GetAliasIndexCache get the whole cache
49+
func GetAliasIndexCache() map[string]string {
50+
return AliasIndexCache
51+
}

plugins/elasticsearch/middleware.go

Lines changed: 97 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io/ioutil"
88
"net/http"
9+
"net/http/httptest"
910
"strings"
1011

1112
log "github.com/sirupsen/logrus"
@@ -17,6 +18,7 @@ import (
1718
"github.com/appbaseio/arc/middleware/validate"
1819
"github.com/appbaseio/arc/model/acl"
1920
"github.com/appbaseio/arc/model/category"
21+
"github.com/appbaseio/arc/model/index"
2022
"github.com/appbaseio/arc/model/op"
2123
"github.com/appbaseio/arc/model/permission"
2224
"github.com/appbaseio/arc/plugins/auth"
@@ -49,7 +51,7 @@ func list() []middleware.Middleware {
4951
validate.ACL(),
5052
validate.Operation(),
5153
validate.PermissionExpiry(),
52-
transformRequest,
54+
intercept,
5355
}
5456
}
5557

@@ -123,7 +125,7 @@ func classifyOp(h http.HandlerFunc) http.HandlerFunc {
123125
}
124126
}
125127

126-
func transformRequest(h http.HandlerFunc) http.HandlerFunc {
128+
func intercept(h http.HandlerFunc) http.HandlerFunc {
127129
return func(w http.ResponseWriter, req *http.Request) {
128130
ctx := req.Context()
129131
reqACL, err := acl.FromContext(ctx)
@@ -138,75 +140,104 @@ func transformRequest(h http.HandlerFunc) http.HandlerFunc {
138140
reqPermission, err := permission.FromContext(ctx)
139141
if err != nil {
140142
log.Errorln(logTag, ":", err)
141-
h(w, req)
142-
return
143-
}
144-
sources := make(map[string]interface{})
145-
var Includes, Excludes []string
146-
Includes = reqPermission.Includes
147-
Excludes = reqPermission.Excludes
148-
if len(Includes) > 0 {
149-
sources["includes"] = Includes
150-
}
151-
if len(Excludes) > 0 {
152-
sources["excludes"] = Excludes
153-
}
154-
_, isExcludesPresent := sources["excludes"]
155-
isEmpty := len(Includes) == 0 && len(Excludes) == 0
156-
isDefaultInclude := len(Includes) > 0 && Includes[0] == "*"
157-
shouldApplyFilters := !isEmpty && (!isDefaultInclude || isExcludesPresent)
158-
if shouldApplyFilters {
159-
if isMsearch {
160-
// Handle the _msearch requests
161-
body, err := ioutil.ReadAll(req.Body)
162-
if err != nil {
163-
log.Errorln(logTag, ":", err)
164-
util.WriteBackError(w, err.Error(), http.StatusInternalServerError)
165-
return
166-
}
167-
var reqBodyString = string(body)
168-
splitReq := strings.Split(reqBodyString, "\n")
169-
var modifiedBodyString string
170-
for index, element := range splitReq {
171-
if index%2 == 1 { // even lines
172-
var reqBody = make(map[string]interface{})
173-
err := json.Unmarshal([]byte(element), &reqBody)
174-
if err != nil {
175-
log.Errorln(logTag, ":", err)
176-
util.WriteBackError(w, err.Error(), http.StatusInternalServerError)
177-
return
178-
}
179-
reqBody["_source"] = sources
180-
raw, err := json.Marshal(reqBody)
181-
if err != nil {
182-
log.Errorln(logTag, ":", err)
183-
util.WriteBackError(w, err.Error(), http.StatusInternalServerError)
184-
return
143+
} else {
144+
sources := make(map[string]interface{})
145+
var Includes, Excludes []string
146+
Includes = reqPermission.Includes
147+
Excludes = reqPermission.Excludes
148+
if len(Includes) > 0 {
149+
sources["includes"] = Includes
150+
}
151+
if len(Excludes) > 0 {
152+
sources["excludes"] = Excludes
153+
}
154+
_, isExcludesPresent := sources["excludes"]
155+
isEmpty := len(Includes) == 0 && len(Excludes) == 0
156+
isDefaultInclude := len(Includes) > 0 && Includes[0] == "*"
157+
shouldApplyFilters := !isEmpty && (!isDefaultInclude || isExcludesPresent)
158+
if shouldApplyFilters {
159+
if isMsearch {
160+
// Handle the _msearch requests
161+
body, err := ioutil.ReadAll(req.Body)
162+
if err != nil {
163+
log.Errorln(logTag, ":", err)
164+
util.WriteBackError(w, err.Error(), http.StatusInternalServerError)
165+
return
166+
}
167+
var reqBodyString = string(body)
168+
splitReq := strings.Split(reqBodyString, "\n")
169+
var modifiedBodyString string
170+
for index, element := range splitReq {
171+
if index%2 == 1 { // even lines
172+
var reqBody = make(map[string]interface{})
173+
err := json.Unmarshal([]byte(element), &reqBody)
174+
if err != nil {
175+
log.Errorln(logTag, ":", err)
176+
util.WriteBackError(w, err.Error(), http.StatusInternalServerError)
177+
return
178+
}
179+
reqBody["_source"] = sources
180+
raw, err := json.Marshal(reqBody)
181+
if err != nil {
182+
log.Errorln(logTag, ":", err)
183+
util.WriteBackError(w, err.Error(), http.StatusInternalServerError)
184+
return
185+
}
186+
modifiedBodyString += string(raw)
187+
} else {
188+
modifiedBodyString += element
185189
}
186-
modifiedBodyString += string(raw)
187-
} else {
188-
modifiedBodyString += element
190+
modifiedBodyString += "\n"
189191
}
190-
modifiedBodyString += "\n"
191-
}
192-
modifiedBody := []byte(modifiedBodyString)
193-
req.Body = ioutil.NopCloser(bytes.NewReader(modifiedBody))
194-
} else {
195-
body, err := ioutil.ReadAll(req.Body)
196-
if err != nil {
197-
log.Errorln(logTag, ":", err)
198-
util.WriteBackError(w, err.Error(), http.StatusInternalServerError)
199-
return
192+
modifiedBody := []byte(modifiedBodyString)
193+
req.Body = ioutil.NopCloser(bytes.NewReader(modifiedBody))
194+
} else {
195+
body, err := ioutil.ReadAll(req.Body)
196+
if err != nil {
197+
log.Errorln(logTag, ":", err)
198+
util.WriteBackError(w, err.Error(), http.StatusInternalServerError)
199+
return
200+
}
201+
d := json.NewDecoder(ioutil.NopCloser(bytes.NewReader(body)))
202+
reqBody := make(map[string]interface{})
203+
d.Decode(&reqBody)
204+
reqBody["_source"] = sources
205+
modifiedBody, _ := json.Marshal(reqBody)
206+
req.Body = ioutil.NopCloser(bytes.NewReader(modifiedBody))
200207
}
201-
d := json.NewDecoder(ioutil.NopCloser(bytes.NewReader(body)))
202-
reqBody := make(map[string]interface{})
203-
d.Decode(&reqBody)
204-
reqBody["_source"] = sources
205-
modifiedBody, _ := json.Marshal(reqBody)
206-
req.Body = ioutil.NopCloser(bytes.NewReader(modifiedBody))
207208
}
208209
}
210+
209211
}
210-
h(w, req)
212+
213+
resp := httptest.NewRecorder()
214+
indices, err := index.FromContext(req.Context())
215+
h(resp, req)
216+
217+
// Copy the response to writer
218+
for k, v := range resp.Header() {
219+
w.Header()[k] = v
220+
}
221+
222+
result := resp.Result()
223+
body, err2 := ioutil.ReadAll(result.Body)
224+
if err2 != nil {
225+
log.Errorln(logTag, ":", err2)
226+
util.WriteBackError(w, "error reading response body", http.StatusInternalServerError)
227+
return
228+
}
229+
for _, index := range indices {
230+
alias := classify.GetIndexAlias(index)
231+
if alias != "" {
232+
body = bytes.Replace(body, []byte(index), []byte(alias), -1)
233+
continue
234+
}
235+
// if alias is present in url get index name from cache
236+
indexName := classify.GetAliasIndex(index)
237+
if indexName != "" {
238+
body = bytes.Replace(body, []byte(indexName), []byte(index), -1)
239+
}
240+
}
241+
util.WriteBackRaw(w, body, http.StatusOK)
211242
}
212243
}

plugins/reindexer/dao.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ func setAlias(ctx context.Context, indexName string, aliases ...string) error {
293293

294294
// We only have one alias per index.
295295
classify.SetIndexAlias(indexName, aliases[0])
296+
classify.SetAliasIndex(aliases[0], indexName)
296297
return nil
297298
}
298299

@@ -354,3 +355,19 @@ func getAliasedIndices(ctx context.Context) ([]AliasedIndices, error) {
354355

355356
return indicesList, nil
356357
}
358+
359+
func getAliasIndexMap(ctx context.Context) (map[string]string, error) {
360+
var res = make(map[string]string)
361+
aliases, err := util.GetClient7().CatAliases().
362+
Pretty(true).
363+
Do(ctx)
364+
if err != nil {
365+
return res, err
366+
}
367+
368+
for _, alias := range aliases {
369+
res[alias.Alias] = alias.Index
370+
}
371+
372+
return res, nil
373+
}

plugins/reindexer/reindexer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func (rx *reindexer) Name() string {
3333

3434
func (rx *reindexer) InitFunc() error {
3535
InitIndexAliasCache()
36+
InitAliasIndexCache()
3637
return nil
3738
}
3839

plugins/reindexer/util.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,20 @@ func reindexedName(indexName string) (string, error) {
5959
// InitIndexAliasCache to set cache on arc initialization
6060
func InitIndexAliasCache() {
6161
ctx := context.Background()
62-
aliasedIndexes, _ := getAliasedIndices(ctx)
62+
indexAlias, _ := getAliasedIndices(ctx)
6363

64-
for _, aliasIndex := range aliasedIndexes {
64+
for _, aliasIndex := range indexAlias {
6565
if aliasIndex.Alias != "" {
6666
classify.SetIndexAlias(aliasIndex.Index, aliasIndex.Alias)
6767
}
6868
}
69-
log.Println(logTag, "=> Alias Index Cache", classify.GetIndexAliasCache())
69+
log.Println(logTag, "=> Index Alias Cache", classify.GetIndexAliasCache())
70+
}
71+
72+
// InitAliasIndexCache to set alias -> index cache on initialization
73+
func InitAliasIndexCache() {
74+
ctx := context.Background()
75+
aliasIndexMap, _ := getAliasIndexMap(ctx)
76+
classify.SetAliasIndexCache(aliasIndexMap)
77+
log.Println(logTag, "=> Alias Index Cache", classify.GetAliasIndexCache())
7078
}

0 commit comments

Comments
 (0)