Skip to content

Commit 71fe555

Browse files
authored
Merge pull request #26 from DudeWhoCode/version/0.1.2
Version/0.1.2
2 parents 265df95 + 639ddd5 commit 71fe555

16 files changed

Lines changed: 554 additions & 38 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,6 @@ crashlytics-build.properties
6060
fabric.properties
6161
*.zip
6262
*.txt
63+
64+
# python
65+
env/

.travis.yml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
dist: trusty
2+
3+
sudo: required
4+
5+
services:
6+
- redis-server
7+
8+
language: go
9+
10+
os:
11+
- linux
12+
13+
go:
14+
- 1.8
15+
16+
python:
17+
- 2.7
18+
19+
env:
20+
- KULAY_CONF=$TRAVIS_BUILD_DIR/testdata/kulay.toml BOTO_CONFIG=/dev/null
21+
22+
install:
23+
- pip install moto
24+
- pip install awscli
25+
- pip install flask
26+
27+
before_install:
28+
- go get -u github.com/kardianos/govendor
29+
- govendor sync
30+
31+
before_script:
32+
- sh testdata/prepare.sh
33+
34+
script:
35+
- sh test.sh
36+
37+
after_success:
38+
- bash <(curl -s https://codecov.io/bash)
39+
40+

README.md

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
# Kulay
2+
[![Build Status](https://travis-ci.org/DudeWhoCode/kulay.svg?branch=master)](https://travis-ci.org/DudeWhoCode/kulay)
3+
[![codecov](https://codecov.io/gh/DudeWhoCode/kulay/branch/master/graph/badge.svg)](https://codecov.io/gh/DudeWhoCode/kulay)
4+
[![Go Report Card](https://goreportcard.com/badge/github.com/dudewhocode/kulay)](https://goreportcard.com/report/github.com/dudewhocode/kulay)
25
[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/DudeWhoCode/kulay/master/LICENSE)
36
[![GitHub issues](https://img.shields.io/github/issues/DudeWhoCode/kulay.svg)](https://github.com/DudeWhoCode/kulay/issues)
4-
[![Go Report Card](https://goreportcard.com/badge/github.com/dudewhocode/kulay)](https://goreportcard.com/report/github.com/dudewhocode/kulay)
57

68
An high speed message passing system between various queues and services.
79

@@ -65,11 +67,72 @@ Each sub section can be any of your favourite name strictly preceeded by the sec
6567
```
6668
Here the logbuffer is a custom name you give to your subsection, it can be production_buffer or prodqueue or which ever makes more sense to you. The `host`, `port`, `password`, `database` are the options we need to initilize the redis client and `queue` is the key name of the list in redis which will be used as queue.
6769

70+
Currently kulay supports passing messages between :
71+
1. Redis queue
72+
2. Redis pubsub
73+
3. AWS SQS
74+
4. Jsonl file read/write
75+
76+
### Config structures for supported services :
77+
#### Redis pubsub
78+
```
79+
[redispubsub]
80+
[redispubsub.yourCustomName]
81+
host = "localhost"
82+
port = "6379"
83+
password = "topsecret"
84+
database = 0
85+
channel = "mychannel"
86+
```
87+
host - Your redis hostname or IP address
88+
port - Port in which redis runs, default is 6379
89+
password - Password for your redis server, leave it as "" for no password
90+
channel - The pubsub channel which you need to send or receive messages
91+
92+
93+
### Redis queue
94+
```
95+
[redisq]
96+
[redisq.yourCustomName]
97+
host = "localhost"
98+
port = "6379"
99+
password = "topsecret"
100+
database = 0
101+
queue = "mychannel"
102+
```
103+
host - Your redis hostname or IP address
104+
port - Port in which redis runs, default is 6379
105+
password - Password for your redis server, leave it as "" for no password
106+
queue - The queue to which you will send or receive messages
107+
108+
### SQS
109+
```
110+
[sqs]
111+
[sqs.test_singapore]
112+
queue_url = "https://sqs.ap-southeast-1.amazonaws.com/12345678/test_queue"
113+
region = "ap-southeast-1"
114+
delete_msg = true
115+
```
116+
queue_url - URL of the queue found in AWS console
117+
region - The region where given queue was created
118+
delete_msg - Delete flag, should be true if you want to delete the message from sqs after reading
119+
120+
### Jsonl
121+
```
122+
jsonl]
123+
[jsonl.local_backup]
124+
path = "/tmp/backup.jsonl"
125+
rotate = true
126+
batch = 1000
127+
```
128+
path - Location where the files has to be created
129+
rotate - If rotate flag is enabled, kulay will create a new file if the lines crosses the count in `batch` field
130+
batch - The line count for a single file if rotate=true
131+
68132
# Built with
69133
* [Cobra](https://github.com/spf13/cobra) - command line framework
70134
* [Viper](https://github.com/spf13/viper) - configuration handler
71135
* [Logrus](https://github.com/sirupsen/logrus) - logging
72136

73137
# Versioning
74-
This project uses [SemVer](http://semver.org/) for versioning. For the versions available, see the [tags on this repository](https://github.com/DudeWhoCode/kulay/tags)
75-
138+
This project uses [SemVer](http://semver.org/) for versioning. For the versions available, see the [tags on this repository](https://github.com/DudeWhoCode/kulay/tags)

backend/fileio/jsonl.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,59 @@ package fileio
22

33
import (
44
"bufio"
5+
"fmt"
56
. "github.com/DudeWhoCode/kulay/logger"
67
"os"
8+
"path/filepath"
9+
"strings"
10+
"time"
711
)
812

9-
func Put(fpath string, rec <-chan string) {
10-
f, err := os.Create(fpath)
13+
type rotateFile struct {
14+
file string
15+
ext string
16+
count int
17+
}
18+
19+
func initRotate(fpath string) *rotateFile {
20+
count := 0
21+
ext := filepath.Ext(fpath)
22+
file := strings.TrimSuffix(fpath, ext)
23+
return &rotateFile{
24+
file,
25+
ext,
26+
count,
27+
}
28+
}
29+
30+
func (f *rotateFile) newFile() (file string) {
31+
now := time.Now()
32+
nanos := now.UnixNano()
33+
file = fmt.Sprintf("%s-%d-%d%s", f.file, nanos, f.count, f.ext)
34+
f.count++
35+
return
36+
}
37+
38+
func Put(fpath string, batch int, rec <-chan string, rotate bool) {
39+
r := initRotate(fpath)
40+
chanCnt := 0
41+
newFileName := r.newFile()
42+
f, err := os.Create(newFileName)
43+
fmt.Println("created : ", newFileName)
1144
if err != nil {
12-
Log.Error("Unable to open file for writing jsonl")
45+
Log.Error("Unable to open file for writing jsonl\n", err)
1346
}
1447
for msg := range rec {
48+
if rotate == true && chanCnt != 0 && chanCnt%batch == 0 {
49+
f.Close()
50+
newFileName := r.newFile()
51+
f, err = os.Create(newFileName)
52+
if err != nil {
53+
Log.Error("Unable to open file for writing jsonl")
54+
}
55+
}
1556
f.Write([]byte(msg + "\n"))
57+
chanCnt++
1658
}
1759
defer f.Close()
1860
}

backend/fileio/jsonl_test.go

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,20 @@ import (
44
"bufio"
55
"encoding/json"
66
"os"
7+
"path/filepath"
8+
"strings"
79
"testing"
810
)
911

1012
func TestPut(t *testing.T) {
11-
fpath := "/tmp/test_put.jsonl"
13+
path := "/tmp/jsonltest/"
14+
os.RemoveAll(path)
15+
os.MkdirAll(path, 0777)
16+
fpath := path + "test_put.jsonl"
1217
pipe := make(chan string)
18+
batch := 5
19+
rotate := false
20+
testCnt := 10
1321
type test struct {
1422
Name string `json:"name"`
1523
Desc string `json:"desc"`
@@ -23,10 +31,16 @@ func TestPut(t *testing.T) {
2331
135,
2432
}
2533
testStr, _ := json.Marshal(testData)
26-
go Put(fpath, pipe)
27-
for i := 1; i <= 10; i++ {
34+
go Put(fpath, batch, pipe, rotate)
35+
for i := 1; i <= testCnt; i++ {
2836
pipe <- string(testStr)
2937
}
38+
fileList := []string{}
39+
filepath.Walk(path, func(path string, f os.FileInfo, err error) error {
40+
fileList = append(fileList, path)
41+
return nil
42+
})
43+
fpath = fileList[1]
3044
file, err := os.Open(fpath)
3145
if err != nil {
3246
t.Errorf("Expected no errors in reading file, got %v", err)
@@ -44,6 +58,58 @@ func TestPut(t *testing.T) {
4458
}
4559
}
4660

61+
func TestPutRotate(t *testing.T) {
62+
path := "/tmp/jsonltest/"
63+
os.RemoveAll(path)
64+
os.MkdirAll(path, 0777)
65+
fpath := path + "test_put.jsonl"
66+
pipe := make(chan string)
67+
batch := 5
68+
rotate := true
69+
testCnt := 20
70+
type test struct {
71+
Name string `json:"name"`
72+
Desc string `json:"desc"`
73+
Url string `json:"url"`
74+
Stars int `json:"stars"`
75+
}
76+
testData := &test{
77+
"kulay",
78+
"High speed message routing between services",
79+
"https://github.com/kulay",
80+
135,
81+
}
82+
testStr, _ := json.Marshal(testData)
83+
go Put(fpath, batch, pipe, rotate)
84+
for i := 1; i <= testCnt; i++ {
85+
pipe <- string(testStr)
86+
}
87+
fileList := []string{}
88+
filepath.Walk(path, func(path string, f os.FileInfo, err error) error {
89+
fileList = append(fileList, path)
90+
return nil
91+
})
92+
for _, fpath := range fileList {
93+
if !strings.HasSuffix(fpath, "jsonl") {
94+
continue
95+
}
96+
file, err := os.Open(fpath)
97+
if err != nil {
98+
t.Errorf("Expected no errors in reading file, got %v", err)
99+
}
100+
scanner := bufio.NewScanner(file)
101+
for scanner.Scan() {
102+
if err := json.Unmarshal(scanner.Bytes(), &test{}); err != nil {
103+
t.Errorf("Expected no errors in unmarshalling jsonline, got %v", err)
104+
}
105+
}
106+
if err := scanner.Err(); err != nil {
107+
t.Errorf("Expected no errors in scanning file, got %v", err)
108+
}
109+
file.Close()
110+
}
111+
}
112+
47113
func TestGet(t *testing.T) {
48114
fpath := "/tmp/test_get.jsonl"
49115
testCnt := 10

backend/redispubsub/redispubsub.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package redispubsub
2+
3+
import (
4+
"github.com/DudeWhoCode/kulay/backend"
5+
. "github.com/DudeWhoCode/kulay/logger"
6+
)
7+
8+
func Get(host string, port string, pass string, db int, channel string, rec chan<- string) {
9+
client := backend.NewRedisSession(host, port, pass, db)
10+
pubsub := client.Subscribe(channel)
11+
for {
12+
msg, err := pubsub.ReceiveMessage()
13+
if err != nil {
14+
Log.Error("Unable to receive message from redis pubsub channel\n", err)
15+
}
16+
if msg.Payload == "$^KILL^$" {
17+
break
18+
}
19+
rec <- msg.Payload
20+
}
21+
}
22+
23+
func Put(host string, port string, pass string, db int, channel string, snd <-chan string) {
24+
client := backend.NewRedisSession(host, port, pass, db)
25+
for msg := range snd {
26+
if err := client.Publish(channel, msg).Err(); err != nil {
27+
Log.Error("Unable to publish message tp redis pubsub channel\n", err)
28+
}
29+
}
30+
}

0 commit comments

Comments
 (0)