Skip to content

Commit b648e63

Browse files
committed
refactor(storage): consolidate IO reader wrappers
Unify the scattered io.ReadCloser wrapper types into a single io_wrappers.go file with interface assertions as a TOC: offsetReader, instrumentedReader, cancelReader, sectionReader.
1 parent f2c360d commit b648e63

7 files changed

Lines changed: 187 additions & 153 deletions

File tree

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"io"
6+
"os"
7+
8+
"go.opentelemetry.io/otel/trace"
9+
10+
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
11+
)
12+
13+
// io.ReadCloser / io.Reader wrappers used in the storage read path.
14+
var (
15+
_ io.Reader = (*offsetReader)(nil) // ReaderAt → sequential Reader adapter
16+
_ io.ReadCloser = (*instrumentedReader)(nil) // OTEL timer or span on Close
17+
_ io.ReadCloser = (*cancelReader)(nil) // context cancel on Close
18+
_ io.ReadCloser = (*sectionReader)(nil) // os.File SectionReader for range reads
19+
)
20+
21+
// offsetReader adapts an io.ReaderAt into a sequential io.Reader
22+
// starting at the given offset.
23+
type offsetReader struct {
24+
wrapped io.ReaderAt
25+
offset int64
26+
}
27+
28+
func (r *offsetReader) Read(p []byte) (n int, err error) {
29+
n, err = r.wrapped.ReadAt(p, r.offset)
30+
r.offset += int64(n)
31+
32+
return
33+
}
34+
35+
func newOffsetReader(reader io.ReaderAt, offset int64) *offsetReader {
36+
return &offsetReader{reader, offset}
37+
}
38+
39+
// instrumentedReader wraps an io.ReadCloser with OTEL instrumentation.
40+
// Counts bytes read, tracks read errors, and on Close records the timer
41+
// as Success/Failure and/or ends the span. Construct via newTimedReader
42+
// or newSpanReader; exactly one of timer/span is set per call site.
43+
type instrumentedReader struct {
44+
inner io.ReadCloser
45+
timer *telemetry.Stopwatch
46+
span trace.Span
47+
ctx context.Context //nolint:containedctx // needed to record timer/span on Close
48+
bytes int64
49+
readErr error
50+
}
51+
52+
func newTimedReader(inner io.ReadCloser, timer *telemetry.Stopwatch, ctx context.Context) *instrumentedReader {
53+
return &instrumentedReader{inner: inner, timer: timer, ctx: ctx}
54+
}
55+
56+
func newSpanReader(inner io.ReadCloser, span trace.Span, ctx context.Context) *instrumentedReader {
57+
return &instrumentedReader{inner: inner, span: span, ctx: ctx}
58+
}
59+
60+
func (r *instrumentedReader) Read(p []byte) (int, error) {
61+
n, err := r.inner.Read(p)
62+
r.bytes += int64(n)
63+
64+
if err != nil && err != io.EOF {
65+
r.readErr = err
66+
}
67+
68+
return n, err
69+
}
70+
71+
func (r *instrumentedReader) Close() error {
72+
closeErr := r.inner.Close()
73+
74+
if r.timer != nil {
75+
if r.readErr != nil || closeErr != nil {
76+
r.timer.Failure(r.ctx, r.bytes)
77+
} else {
78+
r.timer.Success(r.ctx, r.bytes)
79+
}
80+
}
81+
82+
if r.span != nil {
83+
if closeErr != nil {
84+
recordError(r.span, closeErr)
85+
} else if r.readErr != nil {
86+
recordError(r.span, r.readErr)
87+
}
88+
89+
r.span.End()
90+
}
91+
92+
return closeErr
93+
}
94+
95+
// cancelReader calls a CancelFunc on Close, ensuring the context used
96+
// to create the reader is cleaned up.
97+
type cancelReader struct {
98+
io.ReadCloser
99+
100+
cancel context.CancelFunc
101+
}
102+
103+
func (r *cancelReader) Close() error {
104+
defer r.cancel()
105+
106+
return r.ReadCloser.Close()
107+
}
108+
109+
// sectionReader exposes a bounded section of an os.File as a ReadCloser,
110+
// closing the underlying file on Close.
111+
type sectionReader struct {
112+
*io.SectionReader
113+
114+
file *os.File
115+
}
116+
117+
func newSectionReader(f *os.File, off, length int64) *sectionReader {
118+
return &sectionReader{
119+
SectionReader: io.NewSectionReader(f, off, length),
120+
file: f,
121+
}
122+
}
123+
124+
func (r *sectionReader) Close() error {
125+
return r.file.Close()
126+
}

packages/shared/pkg/storage/offset_reader.go

Lines changed: 0 additions & 23 deletions
This file was deleted.

packages/shared/pkg/storage/storage_cache_seekable.go

Lines changed: 26 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -83,27 +83,30 @@ func (c *cachedSeekable) OpenRangeReader(ctx context.Context, off int64, length
8383
attribute.Bool("compressed", compressed),
8484
))
8585

86-
if compressed {
87-
rc, err := c.openReaderCompressed(ctx, off, frameTable)
88-
if err != nil {
89-
recordError(span, err)
90-
span.End()
91-
92-
return nil, err
86+
var rc io.ReadCloser
87+
var err error
88+
switch {
89+
case compressed:
90+
rc, err = c.openReaderCompressed(ctx, off, frameTable)
91+
default:
92+
if err = c.validateReadParams(length, off); err == nil {
93+
rc, err = c.openReaderUncompressed(ctx, off, length)
9394
}
94-
95-
rc = withSpan(rc, span)
96-
97-
return rc, nil
9895
}
9996

100-
if err := c.validateReadParams(length, off); err != nil {
97+
if err != nil {
10198
recordError(span, err)
10299
span.End()
103100

104101
return nil, err
105102
}
106103

104+
return newSpanReader(rc, span, ctx), nil
105+
}
106+
107+
// openReaderUncompressed is the sequential NFS-then-remote path for
108+
// uncompressed chunks.
109+
func (c *cachedSeekable) openReaderUncompressed(ctx context.Context, off, length int64) (io.ReadCloser, error) {
107110
timer := cacheSlabReadTimerFactory.Begin(
108111
attribute.String(nfsCacheOperationAttr, nfsCacheOperationAttrReadAt),
109112
attribute.Bool("compressed", false),
@@ -116,10 +119,7 @@ func (c *cachedSeekable) OpenRangeReader(ctx context.Context, off int64, length
116119
recordCacheRead(ctx, true, length, cacheTypeSeekable, cacheOpOpenRangeReader)
117120
timer.Success(ctx, length)
118121

119-
rc := io.ReadCloser(&fsRangeReadCloser{Reader: io.NewSectionReader(fp, 0, length), file: fp})
120-
rc = withSpan(rc, span)
121-
122-
return rc, nil
122+
return newSectionReader(fp, 0, length), nil
123123
}
124124

125125
if !os.IsNotExist(err) {
@@ -130,50 +130,23 @@ func (c *cachedSeekable) OpenRangeReader(ctx context.Context, off int64, length
130130

131131
rc, err := c.inner.OpenRangeReader(ctx, off, length, nil)
132132
if err != nil {
133-
recordError(span, err)
134-
span.End()
135-
136133
return nil, fmt.Errorf("failed to open inner range reader: %w", err)
137134
}
138135

139136
recordCacheRead(ctx, false, length, cacheTypeSeekable, cacheOpOpenRangeReader)
140137

141138
if !skipCacheWriteback(ctx) {
142-
rc = newCacheWriteThroughReader(rc, c, ctx, off, length, chunkPath)
139+
rc = newCacheWritebackReader(rc, c, ctx, off, length, chunkPath)
143140
}
144141

145-
rc = withSpan(rc, span)
146-
147142
return rc, nil
148143
}
149144

150-
// withSpan wraps a reader with an OTEL span that ends on Close.
151-
func withSpan(rc io.ReadCloser, span trace.Span) io.ReadCloser {
152-
return &spanReadCloser{inner: rc, span: span}
153-
}
154-
155-
type spanReadCloser struct {
156-
inner io.ReadCloser
157-
span trace.Span
158-
}
159-
160-
func (r *spanReadCloser) Read(p []byte) (int, error) {
161-
return r.inner.Read(p)
162-
}
163-
164-
func (r *spanReadCloser) Close() error {
165-
err := r.inner.Close()
166-
recordError(r.span, err)
167-
r.span.End()
168-
169-
return err
170-
}
171-
172-
// newCacheWriteThroughReader wraps a reader, buffering all data read through it.
173-
// On Close, it asynchronously writes the buffered data to the NFS cache only
174-
// if the total bytes read match the expected length (to avoid caching truncated data).
175-
func newCacheWriteThroughReader(inner io.ReadCloser, cache *cachedSeekable, ctx context.Context, off, expectedLen int64, chunkPath string) io.ReadCloser {
176-
return &cacheWriteThroughReader{
145+
// writebackReader buffers all data read through it. On Close, it
146+
// asynchronously writes the buffered data to the NFS cache only if the
147+
// total bytes read match the expected length (to avoid caching truncated data).
148+
func newCacheWritebackReader(inner io.ReadCloser, cache *cachedSeekable, ctx context.Context, off, expectedLen int64, chunkPath string) io.ReadCloser {
149+
return &cacheWritebackReader{
177150
inner: inner,
178151
buf: bytes.NewBuffer(make([]byte, 0, expectedLen)),
179152
cache: cache,
@@ -184,7 +157,7 @@ func newCacheWriteThroughReader(inner io.ReadCloser, cache *cachedSeekable, ctx
184157
}
185158
}
186159

187-
type cacheWriteThroughReader struct {
160+
type cacheWritebackReader struct {
188161
inner io.ReadCloser
189162
buf *bytes.Buffer
190163
cache *cachedSeekable
@@ -194,7 +167,7 @@ type cacheWriteThroughReader struct {
194167
chunkPath string
195168
}
196169

197-
func (r *cacheWriteThroughReader) Read(p []byte) (int, error) {
170+
func (r *cacheWritebackReader) Read(p []byte) (int, error) {
198171
n, err := r.inner.Read(p)
199172
if n > 0 {
200173
r.buf.Write(p[:n])
@@ -203,12 +176,11 @@ func (r *cacheWriteThroughReader) Read(p []byte) (int, error) {
203176
return n, err
204177
}
205178

206-
func (r *cacheWriteThroughReader) Close() error {
179+
func (r *cacheWritebackReader) Close() error {
207180
closeErr := r.inner.Close()
208181

209182
// Only cache when the total bytes read match the expected length.
210-
// Unlike ReadAt where io.EOF can justify a short read (last chunk),
211-
// a streaming reader always ends with EOF regardless of whether the
183+
// A streaming reader always ends with EOF regardless of whether the
212184
// data was truncated, so the byte count is the only reliable check.
213185
if isCompleteRead(r.buf.Len(), int(r.expectedLen), nil) {
214186
data := make([]byte, r.buf.Len())

packages/shared/pkg/storage/storage_cache_seekable_compressed.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"go.opentelemetry.io/otel/attribute"
1111
)
1212

13+
var _ io.ReadCloser = (*decompressingCacheReader)(nil) // decompress on Read, cache compressed bytes on Close
14+
1315
// Precomputed OTEL attributes for compressed cache reads (avoids per-read allocation).
1416
var compressedCacheReadAttrs = []attribute.KeyValue{
1517
attribute.String(nfsCacheOperationAttr, nfsCacheOperationAttrReadAt),
@@ -69,8 +71,8 @@ func (c *cachedSeekable) openReaderCompressed(ctx context.Context, offsetU int64
6971
return rc, nil
7072
}
7173

72-
// newDecompressingCacheReader creates a reader that decompresses on Read and
73-
// writes the accumulated compressed bytes to the NFS cache on Close.
74+
// decompressingCacheReader decompresses on Read and writes the accumulated
75+
// compressed bytes to the NFS cache on Close.
7476
func newDecompressingCacheReader(
7577
raw io.ReadCloser,
7678
ct CompressionType,

0 commit comments

Comments
 (0)