@@ -71,80 +71,107 @@ Scan through the offending chunk until a newline delimiter is found.
7171 2. After discarding, attempt to fill a buffer with as much data as possible
7272 from the underlying chunks.
7373 */
74+
75+ var chunkIndex = 0 ;
7476
7577 if ( _discardingHead != null )
7678 {
79+ // We're discarding an oversize payload
7780 var discardingRentedArray = ArrayPool < byte > . Shared . Rent ( maxSize ) ;
7881
7982 // NOTE: We don't use `maxSize` here, because we're discarding these bytes
8083 // so it doesn't matter what size the target array is
8184 var discardingBatchBuffer = discardingRentedArray . AsSpan ( ) ;
8285
83- while ( _discardingHead != null )
86+ while ( _discardingHead != null && chunkIndex < _sortedChunks . Count )
8487 {
85- var chunk = _sortedChunks [ 0 ] ;
88+ var chunk = _sortedChunks [ chunkIndex ] ;
8689
87- // If the chunk has changed (it may have been deleted externally)
90+ // If the first chunk has changed (it may have been deleted externally)
8891 // then stop discarding
8992 if ( chunk . Name . Id != _discardingHead . Value . ChunkId )
9093 {
9194 _discardingHead = null ;
9295
93- ArrayPool < byte > . Shared . Return ( discardingRentedArray ) ;
9496 break ;
9597 }
9698
97- var chunkHead = Extents ( chunk ) ;
99+ // Try read to the end of the chunk
100+ //
101+ // If reading the chunk length fails then advance over it
102+ if ( ! chunk . File . TryGetLength ( out var length ) )
103+ {
104+ chunkIndex += 1 ;
105+
106+ continue ;
107+ }
108+
109+ var chunkHead = new BufferReaderChunkExtents ( Math . Min ( length . Value , _discardingHead . Value . Offset ) , length . Value ) ;
98110
99111 // Attempt to fill the buffer with data from the underlying chunk
112+ //
113+ // If reading from the chunk fails then advance over it
100114 if ( ! TryFillChunk ( chunk ,
101- chunkHead with { CommitHead = _discardingHead . Value . Offset } ,
115+ chunkHead ,
102116 discardingBatchBuffer ,
103- out var fill ) )
117+ out var filled ) )
104118 {
105- // If attempting to read from the chunk fails then remove it and carry on
106- // This is also done below in the regular read-loop if reading fails
107- _sortedChunks . RemoveAt ( 0 ) ;
108- _discardingHead = null ;
119+ chunkIndex += 1 ;
109120
110- ArrayPool < byte > . Shared . Return ( discardingRentedArray ) ;
111- break ;
121+ continue ;
112122 }
113123
114124 // Scan forwards for the next newline
115- var firstNewlineIndex = discardingBatchBuffer [ ..fill . Value ] . IndexOf ( ( byte ) '\n ' ) ;
116-
117- // If a newline was found then advance the reader to it and stop discarding
118- if ( firstNewlineIndex >= 0 ) fill = firstNewlineIndex + 1 ;
125+ var firstNewlineIndex = discardingBatchBuffer [ ..filled . Value ] . IndexOf ( ( byte ) '\n ' ) ;
126+ if ( firstNewlineIndex >= 0 ) filled = firstNewlineIndex + 1 ;
119127
120128 _discardingHead = _discardingHead . Value with
121129 {
122- Offset = _discardingHead . Value . Offset + fill . Value
130+ Offset = _discardingHead . Value . Offset + filled . Value
123131 } ;
124132 _readHead = _discardingHead . Value ;
125133
126- var isChunkFinished = _discardingHead . Value . Offset == chunkHead . WriteHead ;
127-
128- // If the chunk is finished or a newline is found then stop discarding
129- if ( firstNewlineIndex >= 0 || ( isChunkFinished && _sortedChunks . Count > 1 ) )
134+ // If a newline was found then advance the reader to it and stop discarding
135+ if ( firstNewlineIndex >= 0 )
130136 {
131137 _discardingHead = null ;
132138
133- ArrayPool < byte > . Shared . Return ( discardingRentedArray ) ;
134139 break ;
135140 }
141+
142+ var isChunkFinished = chunkHead . CommitHead + filled == chunkHead . WriteHead ;
143+
144+ // If we've discarded to the end of the chunk then update our state from the disk and return
145+ //
146+ // The next time we attempt to fill a chunk we'll resume from this point.
147+ if ( isChunkFinished )
148+ {
149+ // If there's no way new data can arrive to complete this event then advance over it.
150+ // If the chunk is the last one then it's considered actively writable, and so we
151+ // presume we're seeing a torn write here.
152+ //
153+ // A future sync from the files on disk will delete it.
154+ if ( _sortedChunks . Count > 1 )
155+ {
156+ _discardingHead = null ;
136157
137- // If there's more data in the chunk to read then loop back through
138- if ( ! isChunkFinished ) continue ;
139-
140- // If the chunk is finished but a newline wasn't found then refresh
141- // our set of chunks and loop back through
142- ReadChunks ( ) ;
158+ break ;
159+ }
160+
161+ // There's only a single chunk, update our state from the disk in case the writer
162+ // has moved on to another chunk and return. We may end up coming back later and
163+ // reading more to discard.
164+ ReadChunks ( ) ;
143165
144- ArrayPool < byte > . Shared . Return ( discardingRentedArray ) ;
145- batch = null ;
146- return false ;
166+ ArrayPool < byte > . Shared . Return ( discardingRentedArray ) ;
167+ batch = null ;
168+ return false ;
169+ }
147170 }
171+
172+ ReadChunks ( ) ;
173+
174+ ArrayPool < byte > . Shared . Return ( discardingRentedArray ) ;
148175 }
149176
150177 // Fill a buffer with newline-delimited values
@@ -154,45 +181,69 @@ from the underlying chunks.
154181 var batchLength = 0 ;
155182
156183 BufferPosition ? batchHead = null ;
157- var chunkIndex = 0 ;
158184
159185 // Try fill the buffer with as much data as possible
160186 // by walking over all chunks
161187 while ( chunkIndex < _sortedChunks . Count )
162188 {
163189 var chunk = _sortedChunks [ chunkIndex ] ;
164- var chunkHead = Extents ( chunk ) ;
190+
191+ BufferReaderChunkExtents chunkHead ;
192+ if ( chunk . Name . Id == _readHead . ChunkId )
193+ {
194+ // The chunk is the one we're currently reading; resume from where we left off
195+ // If the file was truncated externally then we'll treat it as complete
196+ chunkHead = chunk . File . TryGetLength ( out var length )
197+ ? new BufferReaderChunkExtents ( Math . Min ( _readHead . Offset , length . Value ) , length . Value )
198+ : new BufferReaderChunkExtents ( _readHead . Offset , _readHead . Offset ) ;
199+ }
200+ else
201+ {
202+ // The chunk is not the one we've been reading; start from the beginning
203+ chunk . File . TryGetLength ( out var length ) ;
204+ chunkHead = new BufferReaderChunkExtents ( 0 , length ?? 0 ) ;
205+ }
165206
166- if ( ! TryFillChunk ( chunk , chunkHead , batchBuffer [ batchLength ..] , out var fill ) )
207+ if ( ! TryFillChunk ( chunk , chunkHead , batchBuffer [ batchLength ..] , out var filled ) )
167208 {
168- // If we can't read from this chunk anymore then remove it and continue
169- _sortedChunks . RemoveAt ( chunkIndex ) ;
209+ // If we can't read from this chunk anymore then step over it
210+ chunkIndex += 1 ;
170211 continue ;
171212 }
172213
173- var isBufferFull = batchLength + fill == maxSize ;
174- var isChunkFinished = fill == chunkHead . WriteHead ;
214+ var isBufferFull = batchLength + filled == maxSize ;
215+ var isChunkFinished = chunkHead . CommitHead + filled == chunkHead . WriteHead ;
175216
176- // If either the buffer has been filled or we've reached the end of a chunk
177- // then scan to the last newline
217+ // If either the buffer has been filled or we've reached the end of the chunk
218+ // then scan backwards to the last newline delimiter
178219 if ( isBufferFull || isChunkFinished )
179220 {
180- // If the chunk is finished then we expect this to immediately find a trailing newline
221+ // If the chunk is valid and finished then we expect this to immediately find a trailing newline
181222 // NOTE: `Span.LastIndexOf` and similar methods are vectorized
182- var lastNewlineIndex = batchBuffer [ batchLength ..( batchLength + fill . Value ) ] . LastIndexOf ( ( byte ) '\n ' ) ;
223+ var lastNewlineIndex = batchBuffer [ batchLength ..( batchLength + filled . Value ) ] . LastIndexOf ( ( byte ) '\n ' ) ;
183224 if ( lastNewlineIndex == - 1 )
184225 {
185- // If this isn't the last chunk then discard the trailing data and move on
226+ // The data we wrote didn't contain any newline delimiters
227+
228+ // If there's no way new data can arrive to complete this event then advance over it.
229+ // If the chunk is the last one then it's considered actively writable, and so we
230+ // presume we're seeing a torn write here.
231+ //
232+ // A subsequent attempt to fill will overwrite the incomplete data in it, and
233+ // a future sync from the files on disk will delete it
186234 if ( isChunkFinished && chunkIndex < _sortedChunks . Count )
187235 {
188236 chunkIndex += 1 ;
189237 continue ;
190238 }
191239
192- // If this is the first chunk then we've hit an oversize payload
240+ // If we're looking at the first chunk then start discarding
241+ //
242+ // We'll hit this point if we happen to start from an oversized payload, or if our last attempt
243+ // to fill a batch advanced up to an oversize event
193244 if ( chunkIndex == 0 )
194245 {
195- _discardingHead = new BufferPosition ( chunk . Name . Id , chunkHead . CommitHead + fill . Value ) ;
246+ _discardingHead = new BufferPosition ( chunk . Name . Id , chunkHead . CommitHead + filled . Value ) ;
196247
197248 // Ensures we don't attempt to yield the data we've read
198249 batchHead = null ;
@@ -202,11 +253,12 @@ from the underlying chunks.
202253 break ;
203254 }
204255
205- fill = lastNewlineIndex + 1 ;
256+ // Only consider the read data up to the last newline
257+ filled = lastNewlineIndex + 1 ;
206258 }
207259
208- batchLength += fill . Value ;
209- batchHead = new BufferPosition ( chunk . Name . Id , chunkHead . CommitHead + fill . Value ) ;
260+ batchLength += filled . Value ;
261+ batchHead = new BufferPosition ( chunk . Name . Id , chunkHead . CommitHead + filled . Value ) ;
210262
211263 chunkIndex += 1 ;
212264 }
@@ -245,6 +297,7 @@ public void AdvanceTo(BufferPosition newReaderHead)
245297 // The remainder of the chunk is being skipped
246298 if ( chunk . Name . Id < newReaderHead . ChunkId )
247299 {
300+ chunk . Dispose ( ) ;
248301 _storeDirectory . TryDelete ( chunk . Name . ToString ( ) ) ;
249302 }
250303 else
@@ -261,17 +314,12 @@ public void AdvanceTo(BufferPosition newReaderHead)
261314 _sortedChunks . RemoveRange ( 0 , removeLength ) ;
262315 }
263316
264- BufferReaderChunkExtents Extents ( BufferReaderChunk chunk )
265- {
266- if ( chunk . Name . Id == _readHead . ChunkId )
267- return chunk . Chunk . TryGetLength ( out var writeHead )
268- ? new BufferReaderChunkExtents ( Math . Min ( _readHead . Offset , writeHead . Value ) , writeHead . Value )
269- : new BufferReaderChunkExtents ( _readHead . Offset , _readHead . Offset ) ;
270-
271- chunk . Chunk . TryGetLength ( out var length ) ;
272- return new BufferReaderChunkExtents ( 0 , length ?? 0 ) ;
273- }
274-
317+ /// <summary>
318+ /// Read the current state of the store from files on disk.
319+ /// </summary>
320+ /// <remarks>
321+ /// This method will delete any files it finds before the current read head.
322+ /// </remarks>
275323 void ReadChunks ( )
276324 {
277325 List < BufferReaderChunk > chunks = [ ] ;
0 commit comments