-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathupdate.go
More file actions
131 lines (102 loc) · 2.52 KB
/
update.go
File metadata and controls
131 lines (102 loc) · 2.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package main
import (
"fmt"
"strconv"
"time"
pgsql "go-db-update/pkg/configs/db/postgres"
goroutine_util "go-db-update/pkg/utils/goroutine"
util "go-db-update/pkg/utils/util"
"database/sql"
)
// Batch describes one slice of work handed to an update worker: how many rows
// to touch (limit) and how many rows to skip before starting (offset).
type Batch struct {
	limit, offset int
}
// generateChannel returns a receive-only channel that yields `iteration`
// batches, each with the given limit and with offsets 0, limit, 2*limit, ...
// The channel is unbuffered and is closed once the last batch has been sent.
// A non-positive iteration yields a channel that is closed without any send.
func generateChannel(limit int, iteration int) <-chan Batch {
	batches := make(chan Batch)

	produce := func() {
		defer close(batches)
		// offset advances by limit on every step, i.e. offset == n*limit.
		for n, offset := 0, 0; n < iteration; n, offset = n+1, offset+limit {
			batches <- Batch{limit: limit, offset: offset}
		}
	}
	go produce()

	return batches
}
// getData returns the number of rows currently in table_updates.
//
// If the query or the scan fails, the error is printed and 0 is returned, so
// a failed count is treated by callers the same way as an empty table.
func getData(db *sql.DB) int {
	const sqlStatement = "SELECT COUNT(1) AS total FROM table_updates"

	var dataCount int
	// QueryRow reports a failed query through Scan and releases the result
	// set itself. This avoids dereferencing a nil *sql.Rows when the query
	// fails and avoids leaving rows (and their connection) open.
	if err := db.QueryRow(sqlStatement).Scan(&dataCount); err != nil {
		fmt.Println("error counting table_updates rows ==>", err)
		return 0
	}
	return dataCount
}
// updateDB starts a single worker goroutine that consumes batches from `in`
// and, for each one, sets the timestamp column of the rows selected by that
// batch's limit/offset window (ordered by uuid) to `timestamp`.
//
// After every batch, whether the UPDATE succeeded or not, the worker sends
// "finish offset <offset>" on the returned channel; failures are only printed.
// The returned channel is unbuffered and is closed once `in` is drained.
func updateDB(timestamp time.Time, in <-chan Batch, db *sql.DB) <-chan string {
	const sqlStatement = "UPDATE table_updates SET timestamp=$1 WHERE uuid IN (SELECT uuid FROM table_updates ORDER BY uuid LIMIT $2 OFFSET $3)"

	done := make(chan string)

	worker := func() {
		for batch := range in {
			if _, err := db.Exec(sqlStatement, timestamp, batch.limit, batch.offset); err != nil {
				fmt.Println("error update ==>", err)
			}
			done <- "finish offset " + strconv.Itoa(batch.offset)
		}
		close(done)
	}
	go worker()

	return done
}
// method1 splits the update into `iteration` batches of `limit` rows and
// starts `iteration` updateDB workers that all pull from the same batch
// channel. The workers' outputs are combined with goroutine_util.Merge
// (defined outside this file), and exactly `iteration` results are received
// and printed.
func method1(timestamp time.Time, limit int, iteration int, db *sql.DB) {
	batches := generateChannel(limit, iteration)

	var workers []<-chan string
	for n := iteration; n > 0; n-- {
		workers = append(workers, updateDB(timestamp, batches, db))
	}

	results := goroutine_util.Merge(workers)
	for n := iteration; n > 0; n-- {
		fmt.Println("Result: ", <-results)
	}
}
// method2 issues the whole update as one batch covering limit*iteration rows
// from offset 0. It still starts `iteration` updateDB workers on that
// single-batch channel, combines their outputs with goroutine_util.Merge
// (defined outside this file), and receives `iteration` values from the
// merged channel, discarding them.
func method2(timestamp time.Time, limit int, iteration int, db *sql.DB) {
	single := generateChannel(limit*iteration, 1)

	var workers []<-chan string
	for n := iteration; n > 0; n-- {
		workers = append(workers, updateDB(timestamp, single, db))
	}

	merged := goroutine_util.Merge(workers)
	for n := iteration; n > 0; n-- {
		<-merged
	}
}
// main loads the environment, opens the database connection, counts the rows
// of table_updates once, and then loops forever: every minute it runs method1
// over the whole table with the current time and prints the elapsed time,
// followed by a call to util.PrintMemUsage.
//
// Any panic is recovered: the program waits one second, prints the panic
// value and calls main again to start over.
func main() {
	util.LoadENV()

	defer func() {
		if err := recover(); err != nil {
			time.Sleep(time.Second)
			fmt.Println("Exception: ", err)
			main()
		}
	}()

	db, err := pgsql.PGConnect()
	if err != nil {
		// Send the failure through the recover/restart path above with a
		// meaningful message instead of continuing with an unusable handle.
		panic(fmt.Errorf("connecting to postgres: %w", err))
	}
	// Registered right after connecting: the loop below never returns, so a
	// defer placed after it can never be reached. Deferred calls run
	// last-in-first-out, so on a panic the connection is closed before the
	// recover handler restarts main and opens a new one.
	defer pgsql.PGClose(db)

	totalData := getData(db)

	const limit = 1000
	for {
		timestamp := time.Now()
		// Round up so the final partial batch (totalData % limit rows) is
		// updated too; plain integer division would skip it, and would skip
		// everything when the table has fewer than limit rows.
		iteration := (totalData + limit - 1) / limit

		start := time.Now()
		method1(timestamp, limit, iteration, db)
		// method2(timestamp, limit, iteration, db)
		elapsed := time.Since(start)

		fmt.Println("update time elapsed ==>", elapsed)
		util.PrintMemUsage()
		time.Sleep(time.Minute)
	}
}