Skip to content

Commit e67f898

Browse files
committed
image/copy: Fix missing progress reporting for chunked layers
The `PutBlobPartial` code path only updated a progress bar, but it did not report its progress to the `copy.Options.Progress` channel for chunked layers. Add missing reporting and refactor parts shared with `progressReader`. Fixes: #469 Signed-off-by: Marek Simek <msimek@redhat.com>
1 parent e782cc7 commit e67f898

5 files changed

Lines changed: 133 additions & 87 deletions

File tree

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package copy
2+
3+
import (
4+
"context"
5+
"io"
6+
"math"
7+
"time"
8+
9+
"go.podman.io/image/v5/internal/private"
10+
"go.podman.io/image/v5/types"
11+
)
12+
13+
// blobChunkAccessorProxy wraps a BlobChunkAccessor to update a *progressBar
14+
// and optionally *progressReporter (if non-nil) with the number of received bytes.
15+
type blobChunkAccessorProxy struct {
16+
wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor
17+
bar *progressBar // A progress bar updated with the number of bytes read so far
18+
reporter *progressReporter // A progress reporter updated with the number of bytes read so far
19+
}
20+
21+
// GetBlobAt returns a sequential channel of readers that contain data for the requested
22+
// blob chunks, and a channel that might get a single error value.
23+
// The specified chunks must be not overlapping and sorted by their offset.
24+
// The readers must be fully consumed, in the order they are returned, before blocking
25+
// to read the next chunk.
26+
// If the Length for the last chunk is set to math.MaxUint64, then it
27+
// fully fetches the remaining data from the offset to the end of the blob.
28+
//
29+
// blobChunkAccessorProxy.GetBlobAt also updates a *progressBar
30+
// and *progressReporter (if non-nil) with the number of bytes read.
31+
func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
32+
start := time.Now()
33+
rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks)
34+
if err == nil {
35+
total := int64(0)
36+
for _, c := range chunks {
37+
// do not update the progress bar if there is a chunk with unknown length.
38+
if c.Length == math.MaxUint64 {
39+
return rc, errs, err
40+
}
41+
total += int64(c.Length)
42+
}
43+
// Report read bytes if possible.
44+
if s.reporter != nil {
45+
s.reporter.reportRead(uint64(total))
46+
}
47+
s.bar.EwmaIncrInt64(total, time.Since(start))
48+
}
49+
return rc, errs, err
50+
}

image/copy/progress_bars.go

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
package copy
22

33
import (
4-
"context"
54
"fmt"
65
"io"
7-
"math"
8-
"time"
96

107
"github.com/vbauerster/mpb/v8"
118
"github.com/vbauerster/mpb/v8/decor"
12-
"go.podman.io/image/v5/internal/private"
139
"go.podman.io/image/v5/types"
1410
)
1511

@@ -144,34 +140,3 @@ func (bar *progressBar) mark100PercentComplete() {
144140
bar.SetTotal(-1, true) // total < 0 = set it to bar.Current(), report it; and mark the bar as complete.
145141
}
146142
}
147-
148-
// blobChunkAccessorProxy wraps a BlobChunkAccessor and updates a *progressBar
149-
// with the number of received bytes.
150-
type blobChunkAccessorProxy struct {
151-
wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor
152-
bar *progressBar // A progress bar updated with the number of bytes read so far
153-
}
154-
155-
// GetBlobAt returns a sequential channel of readers that contain data for the requested
156-
// blob chunks, and a channel that might get a single error value.
157-
// The specified chunks must be not overlapping and sorted by their offset.
158-
// The readers must be fully consumed, in the order they are returned, before blocking
159-
// to read the next chunk.
160-
// If the Length for the last chunk is set to math.MaxUint64, then it
161-
// fully fetches the remaining data from the offset to the end of the blob.
162-
func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
163-
start := time.Now()
164-
rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks)
165-
if err == nil {
166-
total := int64(0)
167-
for _, c := range chunks {
168-
// do not update the progress bar if there is a chunk with unknown length.
169-
if c.Length == math.MaxUint64 {
170-
return rc, errs, err
171-
}
172-
total += int64(c.Length)
173-
}
174-
s.bar.EwmaIncrInt64(total, time.Since(start))
175-
}
176-
return rc, errs, err
177-
}

image/copy/progress_channel.go

Lines changed: 64 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -7,48 +7,45 @@ import (
77
"go.podman.io/image/v5/types"
88
)
99

10-
// progressReader is a reader that reports its progress to a types.ProgressProperties channel on an interval.
11-
type progressReader struct {
12-
source io.Reader
13-
channel chan<- types.ProgressProperties
14-
interval time.Duration
15-
artifact types.BlobInfo
16-
lastUpdate time.Time
17-
offset uint64
18-
offsetUpdate uint64
10+
// progressReporter facilitates progress reporting through its
11+
// underlying types.ProgressProperties channel on an interval.
12+
type progressReporter struct {
13+
channel chan<- types.ProgressProperties // The reporter channel to which the progress will be sent
14+
interval time.Duration // The update interval to indicate how often the progress should update
15+
artifact types.BlobInfo // The blob metadata which is currently being progressed
16+
lastUpdate time.Time // The last time a progress channel event was sent
17+
offset uint64 // The currently downloaded size in bytes
18+
offsetUpdate uint64 // The number of bytes downloaded within the last update interval
1919
}
2020

21-
// newProgressReader creates a new progress reader for:
22-
// `source`: The source when internally reading bytes
23-
// `channel`: The reporter channel to which the progress will be sent
24-
// `interval`: The update interval to indicate how often the progress should update
25-
// `artifact`: The blob metadata which is currently being progressed
26-
func newProgressReader(
27-
source io.Reader,
28-
channel chan<- types.ProgressProperties,
29-
interval time.Duration,
30-
artifact types.BlobInfo,
31-
) *progressReader {
32-
// The progress reader constructor informs the progress channel
33-
// that a new artifact will be read
34-
channel <- types.ProgressProperties{
21+
// reportNewArtifact fires types.ProgressEventNewArtifact to its progress channel.
22+
func (r *progressReporter) reportNewArtifact() {
23+
r.channel <- types.ProgressProperties{
3524
Event: types.ProgressEventNewArtifact,
36-
Artifact: artifact,
25+
Artifact: r.artifact,
3726
}
38-
return &progressReader{
39-
source: source,
40-
channel: channel,
41-
interval: interval,
42-
artifact: artifact,
43-
lastUpdate: time.Now(),
44-
offset: 0,
45-
offsetUpdate: 0,
27+
r.lastUpdate = time.Now()
28+
}
29+
30+
// reportRead fires the types.ProgressEventRead event with `bytesRead`
31+
// to its progress channel.
32+
func (r *progressReporter) reportRead(bytesRead uint64) {
33+
r.offset += bytesRead
34+
r.offsetUpdate += bytesRead
35+
if time.Since(r.lastUpdate) > r.interval {
36+
r.channel <- types.ProgressProperties{
37+
Event: types.ProgressEventRead,
38+
Artifact: r.artifact,
39+
Offset: r.offset,
40+
OffsetUpdate: r.offsetUpdate,
41+
}
42+
r.lastUpdate = time.Now()
43+
r.offsetUpdate = 0
4644
}
4745
}
4846

49-
// reportDone indicates to the internal channel that the progress has been
50-
// finished
51-
func (r *progressReader) reportDone() {
47+
// reportDone fires the ProgressEventDone to its progress channel.
48+
func (r *progressReporter) reportDone() {
5249
r.channel <- types.ProgressProperties{
5350
Event: types.ProgressEventDone,
5451
Artifact: r.artifact,
@@ -57,23 +54,40 @@ func (r *progressReader) reportDone() {
5754
}
5855
}
5956

57+
// progressReader is an io.Reader that reports its progress to
58+
// an underlying *progressReporter.
59+
type progressReader struct {
60+
source io.Reader
61+
*progressReporter
62+
}
63+
64+
// newProgressReader creates a new progress reader for
65+
// `source`: The source when internally reading bytes
66+
// `channel`: The reporter channel to which the progress will be sent
67+
// `interval`: The update interval to indicate how often the progress should update
68+
// `artifact`: The blob metadata which is currently being progressed.
69+
func newProgressReader(
70+
source io.Reader,
71+
channel chan<- types.ProgressProperties,
72+
interval time.Duration,
73+
artifact types.BlobInfo,
74+
) *progressReader {
75+
r := &progressReader{
76+
source: source,
77+
progressReporter: &progressReporter{
78+
channel: channel,
79+
interval: interval,
80+
artifact: artifact,
81+
},
82+
}
83+
r.reportNewArtifact()
84+
return r
85+
}
86+
6087
// Read continuously reads bytes into the progress reader and reports the
61-
// status via the internal channel
88+
// status via the internal channel.
6289
func (r *progressReader) Read(p []byte) (int, error) {
6390
n, err := r.source.Read(p)
64-
r.offset += uint64(n)
65-
r.offsetUpdate += uint64(n)
66-
67-
// Fire the progress reader in the provided interval
68-
if time.Since(r.lastUpdate) > r.interval {
69-
r.channel <- types.ProgressProperties{
70-
Event: types.ProgressEventRead,
71-
Artifact: r.artifact,
72-
Offset: r.offset,
73-
OffsetUpdate: r.offsetUpdate,
74-
}
75-
r.lastUpdate = time.Now()
76-
r.offsetUpdate = 0
77-
}
91+
r.reportRead(uint64(n))
7892
return n, err
7993
}

image/copy/single.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,16 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
804804
wrapped: ic.c.rawSource,
805805
bar: bar,
806806
}
807+
// Setup progress reporting and report a new artifact event
808+
// if the channel available and a non-zero interval set.
809+
if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 {
810+
proxy.reporter = &progressReporter{
811+
channel: ic.c.options.Progress,
812+
interval: ic.c.options.ProgressInterval,
813+
artifact: srcInfo,
814+
}
815+
proxy.reporter.reportNewArtifact()
816+
}
807817
uploadedBlob, err := ic.c.dest.PutBlobPartial(ctx, &proxy, srcInfo, private.PutBlobPartialOptions{
808818
Cache: ic.c.blobInfoCache,
809819
EmptyLayer: emptyLayer,
@@ -817,6 +827,12 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
817827
}
818828
bar.mark100PercentComplete()
819829
hideProgressBar = false
830+
831+
// Report completion for an artifact if possible.
832+
if proxy.reporter != nil {
833+
proxy.reporter.reportDone()
834+
}
835+
820836
logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest)
821837
return true, updatedBlobInfoFromUpload(srcInfo, uploadedBlob), nil
822838
}

image/types/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,8 @@ type SystemContext struct {
707707
type ProgressEvent uint
708708

709709
const (
710-
// ProgressEventNewArtifact will be fired on progress reader setup
710+
// ProgressEventNewArtifact will be fired when starting processing a new
711+
// artifact
711712
ProgressEventNewArtifact ProgressEvent = iota
712713

713714
// ProgressEventRead indicates that the artifact download is currently in
@@ -719,7 +720,7 @@ const (
719720
ProgressEventDone
720721

721722
// ProgressEventSkipped is fired when the artifact has been skipped because
722-
// its already available at the destination
723+
// it's already available at the destination
723724
ProgressEventSkipped
724725
)
725726

0 commit comments

Comments
 (0)