Skip to content

Commit 80883b6

Browse files
committed
fixup! image/copy: Fix missing progress reporting for chunked layers
Signed-off-by: Marek Simek <msimek@redhat.com>
1 parent fdf8af4 commit 80883b6

8 files changed

Lines changed: 239 additions & 135 deletions

File tree

image/copy/blob.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo,
2020
getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer,
2121
isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool,
22+
reporter *progressReporter,
2223
) (types.BlobInfo, error) {
2324
// The copying happens through a pipeline of connected io.Readers;
2425
// that pipeline is built by updating stream.
@@ -84,16 +85,9 @@ func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Read
8485
return types.BlobInfo{}, err
8586
}
8687

87-
// === Report progress using the ic.c.options.Progress channel, if required.
88-
if ic.c.options.Progress != nil && ic.c.options.ProgressInterval > 0 {
89-
progressReader := newProgressReader(
90-
stream.reader,
91-
ic.c.options.Progress,
92-
ic.c.options.ProgressInterval,
93-
srcInfo,
94-
)
95-
defer progressReader.reportDone()
96-
stream.reader = progressReader
88+
// === Wrap stream with progress reporting if a reporter was provided.
89+
if reporter != nil {
90+
stream.reader = newProgressReader(stream.reader, reporter)
9791
}
9892

9993
// === Finally, send the layer stream to dest.

image/copy/blob_chunk_accessor_proxy.go

Lines changed: 0 additions & 50 deletions
This file was deleted.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package copy
22

33
import (
4+
"context"
45
"fmt"
56
"io"
7+
"math"
8+
"time"
69

710
"github.com/vbauerster/mpb/v8"
811
"github.com/vbauerster/mpb/v8/decor"
12+
"go.podman.io/image/v5/internal/private"
913
"go.podman.io/image/v5/types"
1014
)
1115

@@ -140,3 +144,38 @@ func (bar *progressBar) mark100PercentComplete() {
140144
bar.SetTotal(-1, true) // total < 0 = set it to bar.Current(), report it; and mark the bar as complete.
141145
}
142146
}
147+
148+
// blobChunkAccessorProxy wraps a BlobChunkAccessor to update a *progressBar
149+
// and optionally *progressReporter (if non-nil) with the number of received bytes.
150+
type blobChunkAccessorProxy struct {
151+
wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor
152+
bar *progressBar // A progress bar updated with the number of bytes read so far
153+
reporter *progressReporter // A progress reporter updated with the number of bytes read so far
154+
}
155+
156+
// GetBlobAt returns a sequential channel of readers that contain data for the requested
157+
// blob chunks, and a channel that might get a single error value.
158+
// The specified chunks must be not overlapping and sorted by their offset.
159+
// The readers must be fully consumed, in the order they are returned, before blocking
160+
// to read the next chunk.
161+
// If the Length for the last chunk is set to math.MaxUint64, then it
162+
// fully fetches the remaining data from the offset to the end of the blob.
163+
func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
164+
start := time.Now()
165+
rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks)
166+
if err == nil {
167+
total := int64(0)
168+
for _, c := range chunks {
169+
// do not update the progress bar if there is a chunk with unknown length.
170+
if c.Length == math.MaxUint64 {
171+
return rc, errs, err
172+
}
173+
total += int64(c.Length)
174+
}
175+
if s.reporter != nil {
176+
s.reporter.reportRead(uint64(total))
177+
}
178+
s.bar.EwmaIncrInt64(total, time.Since(start))
179+
}
180+
return rc, errs, err
181+
}

image/copy/progress_bars_test.go

Lines changed: 0 additions & 24 deletions
This file was deleted.

image/copy/progress_channel.go

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,47 @@ type progressReporter struct {
1515
artifact types.BlobInfo // The blob metadata which is currently being progressed
1616
lastUpdate time.Time // The last time a progress channel event was sent
1717
offset uint64 // The currently downloaded size in bytes
18-
offsetUpdate uint64 // The number of bytes downloaded within the last update interval
18+
offsetUpdate uint64 // The number of bytes downloaded since lastUpdate
1919
}
2020

21-
// reportNewArtifact fires types.ProgressEventNewArtifact to its progress channel.
22-
func (r *progressReporter) reportNewArtifact() {
23-
r.channel <- types.ProgressProperties{
21+
// newProgressReporter creates a new progress reporter
22+
// and immediately reports a new artifact event.
23+
func newProgressReporter(
24+
channel chan<- types.ProgressProperties,
25+
interval time.Duration,
26+
artifact types.BlobInfo,
27+
) *progressReporter {
28+
channel <- types.ProgressProperties{
2429
Event: types.ProgressEventNewArtifact,
25-
Artifact: r.artifact,
30+
Artifact: artifact,
31+
}
32+
return &progressReporter{
33+
channel: channel,
34+
interval: interval,
35+
artifact: artifact,
36+
lastUpdate: time.Now(),
37+
}
38+
}
39+
40+
// reset resets the reporters progress
41+
// and reports its zeroed state.
42+
// It's meant to be used on error when
43+
// the processing has to be re-started
44+
// (e.g. ErrFallbackToOrdinaryLayerDownload).
45+
func (r *progressReporter) reset() {
46+
r.offset = 0
47+
r.offsetUpdate = 0
48+
49+
r.channel <- types.ProgressProperties{
50+
Event: types.ProgressEventRead,
51+
Artifact: r.artifact,
52+
Offset: r.offset,
53+
OffsetUpdate: r.offsetUpdate,
2654
}
2755
r.lastUpdate = time.Now()
2856
}
2957

30-
// reportRead fires the types.ProgressEventRead event with `bytesRead`
31-
// to its progress channel.
58+
// reportRead reports progress with the number of `bytesRead`.
3259
func (r *progressReporter) reportRead(bytesRead uint64) {
3360
r.offset += bytesRead
3461
r.offsetUpdate += bytesRead
@@ -44,7 +71,7 @@ func (r *progressReporter) reportRead(bytesRead uint64) {
4471
}
4572
}
4673

47-
// reportDone fires the ProgressEventDone to its progress channel.
74+
// reportDone reports completion.
4875
func (r *progressReporter) reportDone() {
4976
r.channel <- types.ProgressProperties{
5077
Event: types.ProgressEventDone,
@@ -54,34 +81,23 @@ func (r *progressReporter) reportDone() {
5481
}
5582
}
5683

57-
// progressReader is an io.Reader that reports its progress to
58-
// an underlying *progressReporter.
84+
// progressReader extends a wrapped io.Reader
85+
// with additional reporting of its progress.
5986
type progressReader struct {
6087
source io.Reader
6188
*progressReporter
6289
}
6390

64-
// newProgressReader creates a new progress reader for
65-
// `source`: The source when internally reading bytes
66-
// `channel`: The reporter channel to which the progress will be sent
67-
// `interval`: The update interval to indicate how often the progress should update
68-
// `artifact`: The blob metadata which is currently being progressed.
91+
// newProgressReader creates a new progress reader that wraps source
92+
// and reports progress through the given reporter.
6993
func newProgressReader(
7094
source io.Reader,
71-
channel chan<- types.ProgressProperties,
72-
interval time.Duration,
73-
artifact types.BlobInfo,
95+
reporter *progressReporter,
7496
) *progressReader {
75-
r := &progressReader{
76-
source: source,
77-
progressReporter: &progressReporter{
78-
channel: channel,
79-
interval: interval,
80-
artifact: artifact,
81-
},
97+
return &progressReader{
98+
source: source,
99+
progressReporter: reporter,
82100
}
83-
r.reportNewArtifact()
84-
return r
85101
}
86102

87103
// Read continuously reads bytes into the progress reader and reports the

image/copy/progress_channel_test.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,17 @@ func newSUT(
1818
) *progressReader {
1919
artifact := types.BlobInfo{Size: 10}
2020

21-
go func() {
22-
res := <-channel
23-
assert.Equal(t, res.Event, types.ProgressEventNewArtifact)
24-
assert.Equal(t, res.Artifact, artifact)
25-
}()
26-
res := newProgressReader(reader, channel, duration, artifact)
21+
reporter := newProgressReporter(channel, duration, artifact)
22+
res := <-channel
23+
assert.Equal(t, res.Event, types.ProgressEventNewArtifact)
24+
assert.Equal(t, res.Artifact, artifact)
2725

28-
return res
26+
return newProgressReader(reader, reporter)
2927
}
3028

3129
func TestNewProgressReader(t *testing.T) {
3230
// Given
33-
channel := make(chan types.ProgressProperties)
31+
channel := make(chan types.ProgressProperties, 1)
3432
sut := newSUT(t, nil, time.Second, channel)
3533
assert.NotNil(t, sut)
3634

@@ -44,7 +42,7 @@ func TestNewProgressReader(t *testing.T) {
4442

4543
func TestReadWithoutEvent(t *testing.T) {
4644
// Given
47-
channel := make(chan types.ProgressProperties)
45+
channel := make(chan types.ProgressProperties, 1)
4846
reader := bytes.NewReader([]byte{0, 1, 2})
4947
sut := newSUT(t, reader, time.Second, channel)
5048
assert.NotNil(t, sut)
@@ -60,7 +58,7 @@ func TestReadWithoutEvent(t *testing.T) {
6058

6159
func TestReadWithEvent(t *testing.T) {
6260
// Given
63-
channel := make(chan types.ProgressProperties)
61+
channel := make(chan types.ProgressProperties, 1)
6462
reader := bytes.NewReader([]byte{0, 1, 2, 3, 4, 5, 6})
6563
sut := newSUT(t, reader, time.Nanosecond, channel)
6664
assert.NotNil(t, sut)

0 commit comments

Comments
 (0)