Skip to content

Commit 98a63c5

Browse files
feat(feeds): upload progress bar (#197)
* feat(feeds): upload progress bar * feat(feeds): minor improvements to the upload progress bar
1 parent da565dd commit 98a63c5

2 files changed

Lines changed: 153 additions & 3 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ require (
4444
github.com/stretchr/testify v1.9.0
4545
github.com/subosito/gotenv v1.2.0 // indirect
4646
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
47-
golang.org/x/term v0.0.0-20220411215600-e5f449aeb171 // indirect
47+
golang.org/x/term v0.0.0-20220411215600-e5f449aeb171
4848
golang.org/x/text v0.3.8 // indirect
4949
gopkg.in/ini.v1 v1.66.4 // indirect
5050
gopkg.in/yaml.v2 v2.4.0 // indirect

pkg/utils/upload.go

Lines changed: 152 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,19 @@ package utils
22

33
import (
44
"context"
5+
"fmt"
6+
"io"
57
"net/http"
68
"os"
9+
"path/filepath"
10+
"sync/atomic"
11+
"time"
12+
13+
"golang.org/x/term"
714
)
815

916
// UploadToS3 uploads a local file to the given S3 presigned URL via HTTP PUT.
17+
// When stderr is a TTY, a progress bar is rendered so large uploads are not opaque.
1018
func UploadToS3(ctx context.Context, filename, url string) error {
1119
data, err := os.Open(filename)
1220
if err != nil {
@@ -19,18 +27,160 @@ func UploadToS3(ctx context.Context, filename, url string) error {
1927
return err
2028
}
2129

22-
req, err := http.NewRequestWithContext(ctx, "PUT", url, data)
30+
size := stat.Size()
31+
body := io.Reader(data)
32+
var pr *progressReader
33+
if term.IsTerminal(int(os.Stderr.Fd())) {
34+
pr = newProgressReader(data, size, filepath.Base(filename), os.Stderr)
35+
body = pr
36+
}
37+
38+
req, err := http.NewRequestWithContext(ctx, "PUT", url, body)
2339
if err != nil {
2440
return err
2541
}
2642
req.Header.Set("Content-Type", "application/json")
27-
req.ContentLength = stat.Size()
43+
req.ContentLength = size
2844

2945
resp, err := http.DefaultClient.Do(req)
3046
if err != nil {
47+
if pr != nil {
48+
pr.abort()
49+
}
3150
return err
3251
}
3352
defer resp.Body.Close()
3453

54+
if pr != nil {
55+
pr.finish()
56+
}
57+
58+
if resp.StatusCode >= 300 {
59+
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2048))
60+
return fmt.Errorf("upload failed: %s: %s", resp.Status, string(body))
61+
}
3562
return nil
3663
}
64+
65+
// progressReader wraps an io.Reader and renders a single-line progress bar
66+
// (carriage-return overwritten) to the given writer. Updates are throttled.
67+
type progressReader struct {
68+
r io.Reader
69+
total int64
70+
read int64
71+
name string
72+
out io.Writer
73+
start time.Time
74+
lastPrint time.Time
75+
barWidth int
76+
done int32
77+
}
78+
79+
func newProgressReader(r io.Reader, total int64, name string, out io.Writer) *progressReader {
80+
return &progressReader{
81+
r: r,
82+
total: total,
83+
name: name,
84+
out: out,
85+
start: time.Now(),
86+
barWidth: 30,
87+
}
88+
}
89+
90+
func (p *progressReader) Read(b []byte) (int, error) {
91+
n, err := p.r.Read(b)
92+
if n > 0 {
93+
atomic.AddInt64(&p.read, int64(n))
94+
p.maybeRender(false)
95+
}
96+
return n, err
97+
}
98+
99+
func (p *progressReader) maybeRender(force bool) {
100+
now := time.Now()
101+
if !force && now.Sub(p.lastPrint) < 100*time.Millisecond {
102+
return
103+
}
104+
p.lastPrint = now
105+
p.render()
106+
}
107+
108+
func (p *progressReader) render() {
109+
read := atomic.LoadInt64(&p.read)
110+
elapsed := time.Since(p.start).Seconds()
111+
var rate float64
112+
if elapsed > 0 {
113+
rate = float64(read) / elapsed
114+
}
115+
116+
var pct float64
117+
var bar string
118+
var eta string
119+
if p.total > 0 {
120+
shown := min(read, p.total)
121+
pct = float64(shown) / float64(p.total) * 100
122+
filled := min(int(float64(p.barWidth)*float64(shown)/float64(p.total)), p.barWidth)
123+
b := make([]byte, p.barWidth)
124+
for i := 0; i < p.barWidth; i++ {
125+
if i < filled {
126+
b[i] = '='
127+
} else {
128+
b[i] = ' '
129+
}
130+
}
131+
bar = string(b)
132+
if rate > 0 && shown < p.total {
133+
remaining := time.Duration(float64(p.total-shown)/rate) * time.Second
134+
eta = " ETA " + formatDuration(remaining)
135+
}
136+
fmt.Fprintf(p.out, "\rUploading %s [%s] %5.1f%% %s/%s %s/s%s\033[K",
137+
p.name, bar, pct, humanBytes(shown), humanBytes(p.total), humanBytes(int64(rate)), eta)
138+
} else {
139+
fmt.Fprintf(p.out, "\rUploading %s %s %s/s\033[K", p.name, humanBytes(read), humanBytes(int64(rate)))
140+
}
141+
}
142+
143+
func (p *progressReader) finish() {
144+
if !atomic.CompareAndSwapInt32(&p.done, 0, 1) {
145+
return
146+
}
147+
p.render()
148+
fmt.Fprintln(p.out)
149+
}
150+
151+
func (p *progressReader) abort() {
152+
if !atomic.CompareAndSwapInt32(&p.done, 0, 1) {
153+
return
154+
}
155+
fmt.Fprintln(p.out)
156+
}
157+
158+
func humanBytes(n int64) string {
159+
const unit = 1024
160+
if n < unit {
161+
return fmt.Sprintf("%d B", n)
162+
}
163+
div, exp := int64(unit), 0
164+
for n/div >= unit {
165+
div *= unit
166+
exp++
167+
}
168+
return fmt.Sprintf("%.1f %cB", float64(n)/float64(div), "KMGTPE"[exp])
169+
}
170+
171+
func formatDuration(d time.Duration) string {
172+
if d < time.Second {
173+
return "<1s"
174+
}
175+
d = d.Round(time.Second)
176+
h := int(d / time.Hour)
177+
m := int((d % time.Hour) / time.Minute)
178+
s := int((d % time.Minute) / time.Second)
179+
if h > 0 {
180+
return fmt.Sprintf("%dh%02dm%02ds", h, m, s)
181+
}
182+
if m > 0 {
183+
return fmt.Sprintf("%dm%02ds", m, s)
184+
}
185+
return fmt.Sprintf("%ds", s)
186+
}

0 commit comments

Comments
 (0)