@@ -22,79 +22,110 @@ const (
2222
2323var ErrUnsupportedFileType = errors .New ("unsupported file type" )
2424
25+ type Option func (* Chunk )
26+
27+ // WithSize sets the chunk size
28+ func WithSize (size int ) Option {
29+ return func (args * Chunk ) {
30+ args .size = size
31+ }
32+ }
33+
34+ // WithMaxPeekSize sets the max size of look-ahead bytes
35+ func WithMaxPeekSize (maxPeekSize int ) Option {
36+ return func (args * Chunk ) {
37+ args .maxPeekSize = maxPeekSize
38+ }
39+ }
40+
41+ // WithSmallFileThreshold sets the threshold for small files
42+ func WithSmallFileThreshold (smallFileThreshold int64 ) Option {
43+ return func (args * Chunk ) {
44+ args .smallFileThreshold = smallFileThreshold
45+ }
46+ }
47+
48+ // Chunk holds two pools and sizing parameters needed for reading chunks of data with look-ahead
2549type Chunk struct {
26- BufPool * sync.Pool
27- PeekBufPool * sync.Pool
28- Size int
29- MaxPeekSize int
30- SmallFileThreshold int64
50+ bufPool * sync.Pool // *bytes.Buffer with cap Size + MaxPeekSize
51+ peekedBufPool * sync.Pool // *[]byte slices of length Size + MaxPeekSize
52+ size int // base chunk size
53+ maxPeekSize int // max size of look-ahead bytes
54+ smallFileThreshold int64 // files smaller than this skip chunking
3155}
3256
3357type IChunk interface {
34- GetBuf () (* []byte , bool )
35- PutBuf (buf * []byte )
36- GetPeekBuf (buf []byte ) (* bytes.Buffer , bool )
37- PutPeekBuf (buf * bytes.Buffer )
3858 GetSize () int
3959 GetMaxPeekSize () int
4060 GetFileThreshold () int64
4161 ReadChunk (reader * bufio.Reader , totalLines int ) (string , error )
4262}
4363
44- func NewChunk () * Chunk {
45- return NewChunkWithSize (defaultSize , defaultMaxPeekSize , defaultFileThreshold )
46- }
47-
48- func NewChunkWithSize (size , maxPeekSize , smallFileThreshold int ) * Chunk {
49- return & Chunk {
50- BufPool : & sync.Pool {
51- New : func () interface {} {
52- b := make ([]byte , size )
53- return & b
54- },
64+ func New (opts ... Option ) * Chunk {
65+ // set default options
66+ c := & Chunk {
67+ size : defaultSize ,
68+ maxPeekSize : defaultMaxPeekSize ,
69+ smallFileThreshold : defaultFileThreshold ,
70+ }
71+ // apply overrides
72+ for _ , opt := range opts {
73+ opt (c )
74+ }
75+ c .bufPool = & sync.Pool {
76+ New : func () interface {} {
77+ // pre-allocate dynamic-size buffer for reading chunks (up to chunk size + peek size)
78+ return bytes .NewBuffer (make ([]byte , 0 , c .size + c .maxPeekSize ))
5579 },
56- PeekBufPool : & sync.Pool {
57- New : func () interface {} {
58- // pre-allocate enough capacity for initial chunk + peek
59- return bytes .NewBuffer (make ([]byte , 0 , size + maxPeekSize ))
60- },
80+ }
81+ c .peekedBufPool = & sync.Pool {
82+ New : func () interface {} {
83+ // pre-allocate fixed-size block for loading chunks
84+ b := make ([]byte , c .size + c .maxPeekSize )
85+ return & b
6186 },
62- Size : size ,
63- MaxPeekSize : maxPeekSize ,
64- SmallFileThreshold : int64 (smallFileThreshold ),
6587 }
88+ return c
6689}
6790
68- func (c * Chunk ) GetBuf () (* []byte , bool ) {
69- buf , ok := c .BufPool .Get ().(* []byte )
70- return buf , ok
91+ // GetBuf returns a bytes.Buffer from the pool, seeded with the data
92+ func (c * Chunk ) GetBuf (data []byte ) (* bytes.Buffer , bool ) {
93+ window , ok := c .bufPool .Get ().(* bytes.Buffer )
94+ if ! ok {
95+ return nil , false
96+ }
97+ window .Reset ()
98+ window .Write (data ) // seed the buffer with the data
99+ return window , ok
71100}
72101
73- func (c * Chunk ) PutBuf (buf * []byte ) {
74- c .BufPool .Put (buf )
102+ // PutBuf returns the bytes.Buffer to the pool
103+ func (c * Chunk ) PutBuf (window * bytes.Buffer ) {
104+ window .Reset ()
105+ c .bufPool .Put (window )
75106}
76107
77- func (c * Chunk ) GetPeekBuf (buf []byte ) (* bytes.Buffer , bool ) {
78- peekBuf , ok := c .PeekBufPool .Get ().(* bytes.Buffer )
79- peekBuf .Reset ()
80- peekBuf .Write (buf ) // seed with buf
81- return peekBuf , ok
108+ // GetPeekedBuf returns a fixed-size []byte from the pool
109+ func (c * Chunk ) GetPeekedBuf () (* []byte , bool ) {
110+ b , ok := c .peekedBufPool .Get ().(* []byte )
111+ return b , ok
82112}
83113
84- func (c * Chunk ) PutPeekBuf (buf * bytes.Buffer ) {
85- c .PeekBufPool .Put (buf )
114+ // PutPeekedBuf returns the fixed-size []byte to the pool
115+ func (c * Chunk ) PutPeekedBuf (b * []byte ) {
116+ c .peekedBufPool .Put (b )
86117}
87118
88119func (c * Chunk ) GetSize () int {
89- return c .Size
120+ return c .size
90121}
91122
92123func (c * Chunk ) GetMaxPeekSize () int {
93- return c .MaxPeekSize
124+ return c .maxPeekSize
94125}
95126
96127func (c * Chunk ) GetFileThreshold () int64 {
97- return c .SmallFileThreshold
128+ return c .smallFileThreshold
98129}
99130
100131// ReadChunk reads the next chunk of data from file
0 commit comments