Skip to content

Commit 20fb8f4

Browse files
authored
Merge pull request #18 from DudeWhoCode/version/0.1.1
Version/0.1.1
2 parents eadfb66 + 00572af commit 20fb8f4

165 files changed

Lines changed: 27400 additions & 10110 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,5 @@ com_crashlytics_export_strings.xml
5858
crashlytics.properties
5959
crashlytics-build.properties
6060
fabric.properties
61+
*.zip
62+
*.txt

backend/fileio/fileWriter.go

Lines changed: 0 additions & 17 deletions
This file was deleted.

backend/fileio/jsonl.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package fileio
2+
3+
import (
4+
"bufio"
5+
. "github.com/DudeWhoCode/kulay/logger"
6+
"os"
7+
)
8+
9+
func Put(fpath string, rec <-chan string) {
10+
f, err := os.Create(fpath)
11+
if err != nil {
12+
Log.Error("Unable to open file for writing jsonl")
13+
}
14+
for msg := range rec {
15+
f.Write([]byte(msg + "\n"))
16+
}
17+
defer f.Close()
18+
}
19+
20+
func Get(fpath string, snd chan<- string) {
21+
f, err := os.Open(fpath)
22+
Log.Info("file openend")
23+
if err != nil {
24+
Log.Error("Unable to open file for reading jsonl")
25+
}
26+
scanner := bufio.NewScanner(f)
27+
Log.Info("new scanner initiaited")
28+
for scanner.Scan() {
29+
snd <- string(scanner.Bytes())
30+
Log.Info("sending file content to channel")
31+
}
32+
if err := scanner.Err(); err != nil {
33+
Log.Fatal("Error while scanning the file\n", err)
34+
}
35+
defer f.Close()
36+
}
Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func TestPut(t *testing.T) {
11-
fpath := "/tmp/logs_bkp.jsonl"
11+
fpath := "/tmp/test_put.jsonl"
1212
pipe := make(chan string)
1313
type test struct {
1414
Name string `json:"name"`
@@ -43,3 +43,35 @@ func TestPut(t *testing.T) {
4343
t.Errorf("Expected no errors in scanning file, got %v", err)
4444
}
4545
}
46+
47+
func TestGet(t *testing.T) {
48+
fpath := "/tmp/test_get.jsonl"
49+
testCnt := 10
50+
pipe := make(chan string, testCnt)
51+
type test struct {
52+
Name string `json:"name"`
53+
Desc string `json:"desc"`
54+
Url string `json:"url"`
55+
Stars int `json:"stars"`
56+
}
57+
testData := &test{
58+
"kulay",
59+
"High speed message routing between services",
60+
"https://github.com/kulay",
61+
135,
62+
}
63+
testMsg, _ := json.Marshal(testData)
64+
testMsg = append(testMsg, "\n"...)
65+
toWrite, err := os.Create(fpath)
66+
if err != nil {
67+
t.Fatal("Unable to open file for writing jsonl")
68+
}
69+
for i := 1; i <= testCnt; i++ {
70+
toWrite.Write(testMsg)
71+
}
72+
toWrite.Close()
73+
Get(fpath, pipe)
74+
if len(pipe) != testCnt {
75+
t.Errorf("Expected message count is %v, got %v", testCnt, len(pipe))
76+
}
77+
}

backend/redisq/redisq.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package redisq
2+
3+
import (
4+
"github.com/DudeWhoCode/kulay/backend"
5+
. "github.com/DudeWhoCode/kulay/logger"
6+
"time"
7+
)
8+
9+
func Put(host string, port string, pass string, db int, queue string, rec <-chan string) {
10+
client := backend.NewRedisSession(host, port, pass, db)
11+
for msg := range rec {
12+
if err := client.RPush(queue, msg).Err(); err != nil {
13+
Log.Warn(err)
14+
}
15+
}
16+
}
17+
18+
func Get(host string, port string, pass string, db int, queue string, rec chan<- string) {
19+
client := backend.NewRedisSession(host, port, pass, db)
20+
// use `client.BLPop(0, "queue")` for infinite waiting time
21+
for {
22+
result, err := client.BLPop(1*time.Second, queue).Result()
23+
if err != nil {
24+
Log.Warn(err)
25+
}
26+
if result == nil {
27+
Log.Info("Received all messages from redis queue")
28+
break
29+
}
30+
rec <- result[1]
31+
}
32+
}

backend/redisq/redisq_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package redisq
2+
3+
import (
4+
"encoding/json"
5+
"github.com/DudeWhoCode/kulay/backend"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestPut(t *testing.T) {
11+
host := "localhost"
12+
port := "6379"
13+
pass := ""
14+
db := 0
15+
queue := "testput"
16+
testCnt := 5
17+
client := backend.NewRedisSession(host, port, pass, db)
18+
client.Del(queue)
19+
pipe := make(chan string, testCnt)
20+
type test struct {
21+
Name string `json:"name"`
22+
Desc string `json:"desc"`
23+
Url string `json:"url"`
24+
Stars int `json:"stars"`
25+
}
26+
testData := &test{
27+
"kulay",
28+
"High speed message routing between services",
29+
"https://github.com/kulay",
30+
135,
31+
}
32+
testStr, _ := json.Marshal(testData)
33+
for i := 1; i <= testCnt; i++ {
34+
pipe <- string(testStr)
35+
}
36+
go Put(host, port, pass, db, queue, pipe)
37+
// Wait until Put populates redis queue
38+
// TODO : Implement timeout for channels
39+
time.Sleep(1 * time.Second)
40+
queueLen, err := client.LLen(queue).Result()
41+
if err != nil {
42+
t.Errorf("Expected no error, got %s", err)
43+
}
44+
if int(queueLen) != testCnt {
45+
t.Errorf("Expected message count in redis queue is %v, got %v", testCnt, queueLen)
46+
}
47+
48+
}
49+
50+
func TestGet(t *testing.T) {
51+
host := "localhost"
52+
port := "6379"
53+
pass := ""
54+
db := 0
55+
queue := "testget"
56+
testCnt := 5
57+
client := backend.NewRedisSession(host, port, pass, db)
58+
client.Del(queue)
59+
pipe := make(chan string, testCnt)
60+
type test struct {
61+
Name string `json:"name"`
62+
Desc string `json:"desc"`
63+
Url string `json:"url"`
64+
Stars int `json:"stars"`
65+
}
66+
testData := &test{
67+
"kulay",
68+
"High speed message routing between services",
69+
"https://github.com/kulay",
70+
135,
71+
}
72+
testStr, _ := json.Marshal(testData)
73+
for i := 1; i <= testCnt; i++ {
74+
if err := client.RPush(queue, testStr).Err(); err != nil {
75+
t.Fatalf("Expected no error while pushing messages, got %s", err)
76+
}
77+
}
78+
Get(host, port, pass, db, queue, pipe)
79+
if len(pipe) != testCnt {
80+
t.Errorf("Expected message count in channel is %v, got %v", testCnt, len(pipe))
81+
}
82+
83+
}

backend/sessions.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
package backend
22

33
import (
4+
"github.com/aws/aws-sdk-go/aws"
45
"github.com/aws/aws-sdk-go/aws/session"
6+
"github.com/go-redis/redis"
7+
"os"
58
)
69

7-
var sess *session.Session
8-
9-
func NewAwsSession() *session.Session {
10-
sess = session.Must(session.NewSessionWithOptions(session.Options{
11-
SharedConfigState: session.SharedConfigEnable,
10+
func NewAwsSession(region string) *session.Session {
11+
os.Setenv("AWS_SDK_LOAD_CONFIG", "true")
12+
sess := session.Must(session.NewSession(&aws.Config{
13+
Region: aws.String(region),
1214
}))
1315
return sess
1416
}
17+
18+
func NewRedisSession(host string, port string, pass string, db int) *redis.Client {
19+
addr := host + ":" + port
20+
return redis.NewClient(&redis.Options{
21+
Addr: addr,
22+
Password: pass, // "" for no password
23+
DB: db, // default DB : 0
24+
})
25+
}

backend/sessions_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import (
66
)
77

88
func TestNewAwsSession(t *testing.T) {
9-
sess := NewAwsSession()
9+
region := "us-east-1"
10+
sess := NewAwsSession(region)
1011
if sessType := reflect.TypeOf(sess).String(); sessType == "*session.Session" {
1112
t.Log("Received expected session type")
1213
} else {
@@ -18,3 +19,15 @@ func TestNewAwsSession(t *testing.T) {
1819
}
1920
}()
2021
}
22+
23+
func TestNewRedisSession(t *testing.T) {
24+
host := "localhost"
25+
port := "6379"
26+
pass := ""
27+
db := 0
28+
client := NewRedisSession(host, port, pass, db)
29+
if pong, err := client.Ping().Result(); pong != "PONG" {
30+
t.Errorf("Expected PONG, got %v", pong)
31+
t.Errorf("Expected no errors, got %s", err)
32+
}
33+
}

backend/sqs/sqs.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
package sqsapp
22

33
import (
4-
"github.com/aws/aws-sdk-go/service/sqs"
5-
"github.com/aws/aws-sdk-go/aws"
6-
. "github.com/DudeWhoCode/kulay/logger"
74
"github.com/DudeWhoCode/kulay/backend"
5+
. "github.com/DudeWhoCode/kulay/logger"
6+
"github.com/aws/aws-sdk-go/aws"
7+
"github.com/aws/aws-sdk-go/service/sqs"
88
)
99

10-
var svc *sqs.SQS
11-
12-
func Put(qURL string, rec <-chan string) {
13-
sess := backend.NewAwsSession()
14-
svc = sqs.New(sess)
10+
func Put(qURL string, region string, rec <-chan string) {
11+
sess := backend.NewAwsSession(region)
12+
svc := sqs.New(sess)
1513
for msg := range rec {
1614
result, err := svc.SendMessage(&sqs.SendMessageInput{
1715
DelaySeconds: aws.Int64(10),
@@ -26,10 +24,9 @@ func Put(qURL string, rec <-chan string) {
2624
}
2725
}
2826

29-
30-
func Get(qURL string, snd chan<- string, del bool) {
31-
sess := backend.NewAwsSession()
32-
svc = sqs.New(sess)
27+
func Get(qURL string, region string, del bool, snd chan<- string) {
28+
sess := backend.NewAwsSession(region)
29+
svc := sqs.New(sess)
3330
for {
3431
result, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
3532
AttributeNames: []*string{
@@ -71,5 +68,3 @@ func Get(qURL string, snd chan<- string, del bool) {
7168
}
7269

7370
}
74-
75-

0 commit comments

Comments
 (0)