Skip to content

Commit 261727d

Browse files
authored
Merge pull request #78 from appbaseio/feat/index-alias-map
feat: index alias mapping
2 parents 7fb1f15 + eec29b8 commit 261727d

7 files changed

Lines changed: 194 additions & 8 deletions

File tree

middleware/classify/indices.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package classify
22

33
import (
44
"net/http"
5+
"regexp"
6+
"strings"
57

68
"github.com/appbaseio/arc/middleware"
79
"github.com/appbaseio/arc/model/index"
@@ -16,7 +18,50 @@ func Indices() middleware.Middleware {
1618
func indices(h http.HandlerFunc) http.HandlerFunc {
1719
return func(w http.ResponseWriter, req *http.Request) {
1820
indices := util.IndicesFromRequest(req)
21+
currentCache := GetIndexAliasCache()
1922

23+
for _, index := range indices {
24+
// '*' in case of all indices put alias in context
25+
if index == "*" {
26+
for cachedItem := range currentCache {
27+
alias := GetIndexAlias(cachedItem)
28+
if alias != "" {
29+
indices = append(indices, alias)
30+
}
31+
}
32+
break
33+
} else if strings.Contains(index, "*") {
34+
// in case of regex check if string contains '*' in naming pattern, if contains and doesn't have '.*' and replace '*' with '.*' because golang regex can match in that pattern. Next match regex patters with existing index names in cache and add those alias to context.
35+
regex := index
36+
37+
if !strings.Contains(index, ".*") {
38+
regex = strings.Replace(regex, "*", ".*", -1)
39+
}
40+
r, _ := regexp.Compile(regex)
41+
cachedIndices := []string{}
42+
43+
for cachedItem := range currentCache {
44+
cachedIndices = append(cachedIndices, cachedItem)
45+
}
46+
for _, val := range cachedIndices {
47+
if r.MatchString(val) {
48+
alias := GetIndexAlias(val)
49+
if alias != "" {
50+
indices = append(indices, alias)
51+
}
52+
break
53+
}
54+
}
55+
56+
} else {
57+
// get alias for index and put in context
58+
alias := GetIndexAlias(index)
59+
if alias != "" {
60+
indices = append(indices, alias)
61+
}
62+
break
63+
}
64+
}
2065
ctx := index.NewContext(req.Context(), indices)
2166
req = req.WithContext(ctx)
2267

middleware/classify/util.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package classify
2+
3+
// IndexAliasCache cache to store index alias map
4+
var IndexAliasCache = make(map[string]string)
5+
6+
// GetIndexAliasCache get whole cache
7+
func GetIndexAliasCache() map[string]string {
8+
return IndexAliasCache
9+
}
10+
11+
// GetIndexAlias get alias for specific index
12+
func GetIndexAlias(index string) string {
13+
alias, ok := IndexAliasCache[index]
14+
15+
if !ok {
16+
return ""
17+
}
18+
return alias
19+
}
20+
21+
// SetIndexAlias set alias for specific index
22+
func SetIndexAlias(index, alias string) {
23+
IndexAliasCache[index] = alias
24+
}

plugins/reindexer/dao.go

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"regexp"
78

89
log "github.com/sirupsen/logrus"
910

11+
"github.com/appbaseio/arc/middleware/classify"
1012
"github.com/appbaseio/arc/util"
1113
es7 "github.com/olivere/elastic/v7"
1214
)
@@ -114,11 +116,18 @@ func reindex(ctx context.Context, sourceIndex string, config *reindexConfig, wai
114116

115117
if destinationIndex == "" {
116118
// Fetch all the aliases of old index
117-
aliases, err := aliasesOf(ctx, sourceIndex)
119+
alias, err := aliasesOf(ctx, sourceIndex)
120+
121+
var aliases = []string{}
118122
if err != nil {
119123
return nil, fmt.Errorf(`error fetching aliases of index "%s": %v`, sourceIndex, err)
120124
}
121-
aliases = append(aliases, sourceIndex)
125+
126+
if alias == "" {
127+
aliases = append(aliases, sourceIndex)
128+
} else {
129+
aliases = append(aliases, alias)
130+
}
122131

123132
// Delete old index
124133
err = deleteIndex(ctx, sourceIndex)
@@ -210,22 +219,28 @@ func settingsOf(ctx context.Context, indexName string) (map[string]interface{},
210219
return settings, nil
211220
}
212221

213-
func aliasesOf(ctx context.Context, indexName string) ([]string, error) {
222+
func aliasesOf(ctx context.Context, indexName string) (string, error) {
214223
response, err := util.GetClient7().CatAliases().
215224
Pretty(true).
216225
Do(ctx)
217226
if err != nil {
218-
return nil, err
227+
return "", err
219228
}
220229

221-
var aliases []string
230+
var alias = ""
231+
232+
// set alias for original index name only.
233+
regex := ".*reindexed_[0-9]+"
234+
r, _ := regexp.Compile(regex)
235+
222236
for _, row := range response {
223-
if row.Index == indexName {
224-
aliases = append(aliases, row.Alias)
237+
// r.MatchString(indexName) this condition is added to handle existing alias which are created incorrectly
238+
if row.Index == indexName && r.MatchString(indexName) {
239+
alias = row.Alias
225240
}
226241
}
227242

228-
return aliases, nil
243+
return alias, nil
229244
}
230245

231246
func createIndex(ctx context.Context, indexName string, body map[string]interface{}) error {
@@ -276,6 +291,8 @@ func setAlias(ctx context.Context, indexName string, aliases ...string) error {
276291
return fmt.Errorf(`unable to set aliases "%v" for index "%s"`, aliases, indexName)
277292
}
278293

294+
// We only have one alias per index.
295+
classify.SetIndexAlias(indexName, aliases[0])
279296
return nil
280297
}
281298

@@ -288,3 +305,52 @@ func getIndicesByAlias(ctx context.Context, alias string) ([]string, error) {
288305
}
289306
return response.IndicesByAlias(alias), nil
290307
}
308+
309+
func getAliasedIndices(ctx context.Context) ([]AliasedIndices, error) {
310+
var indicesList []AliasedIndices
311+
indices, err := util.GetClient7().CatIndices().
312+
Do(ctx)
313+
if err != nil {
314+
return indicesList, err
315+
}
316+
317+
aliases, err := util.GetClient7().CatAliases().
318+
Pretty(true).
319+
Do(ctx)
320+
if err != nil {
321+
return indicesList, err
322+
}
323+
324+
for _, index := range indices {
325+
var indexStruct = AliasedIndices{
326+
Health: index.Health,
327+
Status: index.Status,
328+
Index: index.Index,
329+
UUID: index.UUID,
330+
Pri: index.Pri,
331+
Rep: index.Rep,
332+
DocsCount: index.DocsCount,
333+
DocsDeleted: index.DocsDeleted,
334+
StoreSize: index.StoreSize,
335+
PriStoreSize: index.PriStoreSize,
336+
}
337+
var alias string
338+
regex := ".*reindexed_[0-9]+"
339+
r, _ := regexp.Compile(regex)
340+
341+
for _, row := range aliases {
342+
if row.Index == index.Index && r.MatchString(index.Index) {
343+
alias = row.Alias
344+
break
345+
}
346+
}
347+
if err == nil && alias != "" {
348+
indexStruct.Alias = alias
349+
}
350+
351+
indicesList = append(indicesList, indexStruct)
352+
353+
}
354+
355+
return indicesList, nil
356+
}

plugins/reindexer/handlers.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,19 @@ func (rx *reindexer) reindexSrcToDest() http.HandlerFunc {
6060
}
6161
}
6262

63+
func (rx *reindexer) aliasedIndices() http.HandlerFunc {
64+
return func(w http.ResponseWriter, req *http.Request) {
65+
res, err := getAliasedIndices(req.Context())
66+
if err != nil {
67+
util.WriteBackError(w, "Unable to get aliased indices.\n"+err.Error(), http.StatusInternalServerError)
68+
return
69+
}
70+
71+
response, err := json.Marshal(res)
72+
errorHandler(nil, w, response)
73+
}
74+
}
75+
6376
func errorHandler(err error, w http.ResponseWriter, response []byte) {
6477
if err != nil {
6578
log.Errorln(logTag, ":", err)

plugins/reindexer/reindexer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func (rx *reindexer) Name() string {
3232
}
3333

3434
func (rx *reindexer) InitFunc() error {
35+
InitIndexAliasCache()
3536
return nil
3637
}
3738

plugins/reindexer/routes.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ func (rx *reindexer) routes() []plugins.Route {
2323
HandlerFunc: middleware(rx.reindex()),
2424
Description: "Reindexes a single index with the given mappings, settings and types.",
2525
},
26+
{
27+
Name: "AliasedIndexes",
28+
Methods: []string{http.MethodGet},
29+
Path: "/_aliasedindices",
30+
HandlerFunc: middleware(rx.aliasedIndices()),
31+
Description: "Get map of indices and aliases",
32+
},
2633
}
2734
return routes
2835
}

plugins/reindexer/util.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,31 @@
11
package reindexer
22

33
import (
4+
"context"
45
"fmt"
56
"regexp"
67
"strconv"
78
"strings"
89

10+
"github.com/appbaseio/arc/middleware/classify"
911
log "github.com/sirupsen/logrus"
1012
)
1113

14+
// AliasedIndices struct
15+
type AliasedIndices struct {
16+
Alias string `json:"alias"`
17+
Health string `json:"health"`
18+
Status string `json:"status"`
19+
Index string `json:"index"`
20+
UUID string `json:"uuid"`
21+
Pri int `json:"pri"`
22+
Rep int `json:"rep"`
23+
DocsCount int `json:"docs.count"`
24+
DocsDeleted int `json:"docs.deleted"`
25+
StoreSize string `json:"store.size"`
26+
PriStoreSize string `json:"pri.store.size"`
27+
}
28+
1229
// reindexedName calculates from the name the number of times an index has been
1330
// reindexed to generate the successive name for the index. For example: for an
1431
// index named "twitter", the funtion returns "twitter_reindexed_1", and for an
@@ -38,3 +55,16 @@ func reindexedName(indexName string) (string, error) {
3855

3956
return indexName, nil
4057
}
58+
59+
// InitIndexAliasCache to set cache on arc initialization
60+
func InitIndexAliasCache() {
61+
ctx := context.Background()
62+
aliasedIndexes, _ := getAliasedIndices(ctx)
63+
64+
for _, aliasIndex := range aliasedIndexes {
65+
if aliasIndex.Alias != "" {
66+
classify.SetIndexAlias(aliasIndex.Index, aliasIndex.Alias)
67+
}
68+
}
69+
log.Println(logTag, "=> Alias Index Cache", classify.GetIndexAliasCache())
70+
}

0 commit comments

Comments
 (0)