Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,23 @@

## Unreleased

* Breaking: `Scanl.filter`, `Scanl.filterM` and `Scanl.catMaybes` now emit no
output for a filtered-out (or `Nothing`) element instead of re-emitting the
previous output. When used for scanning (`Stream.scanl`/`Stream.postscanl`)
the resulting output stream is therefore shorter and may even be empty. The
corresponding `Fold` combinators (`Fold.filter`, `Fold.filterM`,
`Fold.catMaybes`) are unaffected, because a fold always runs to its `final`
step and yields a value.
Migration: if you relied on the previous behaviour of re-emitting the last
output for a filtered-out (or `Nothing`) element, hold the last value
explicitly downstream (e.g. using `Scanl.latest`) instead of relying on
`filter`, `filterM` or `catMaybes` to do so.
* Internal: Added a `Continue` constructor to `Step`
(`Streamly.Internal.Data.Fold.Step`), meaning "advance the scan state but emit
no output". The `Step` type is shared by `Fold` and `Scanl`; for a `Fold`
(driven to `final`) `Continue` behaves exactly like `Partial`. Code
pattern-matching on `Step` must handle the new `Continue` case: treat it like
`Partial`, except in code that specifically drives scan output, where
`Continue` must suppress the per-input output (much as a stream `Skip` step
yields no element).
* Fixed `Stream.postscanl` to omit the output of a scan that terminates without
consuming any input (e.g. `Scanl.take 0`).
* Breaking: In `FileSystem.Path` module the default for `eqPath` changed
Expand Down
4 changes: 4 additions & 0 deletions core/src/Streamly/Internal/Data/Array.hs
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ foldBreakChunks (Fold fstep initial _ final) stream@(Stream step state) = do
res <- initial
case res of
Fold.Partial fs -> go SPEC state fs
Fold.Continue fs -> go SPEC state fs
Fold.Done fb -> return $! (fb, stream)

where
Expand Down Expand Up @@ -723,6 +724,7 @@ foldBreakChunks (Fold fstep initial _ final) stream@(Stream step state) = do
let arr = Array contents next end
return $! (b, D.cons arr (D.Stream step st))
Fold.Partial fs1 -> goArray SPEC st fp next fs1
Fold.Continue fs1 -> goArray SPEC st fp next fs1

-- This may be more robust wrt fusion compared to unfoldMany?

Expand Down Expand Up @@ -759,6 +761,7 @@ foldBreak (Fold fstep initial _ final) stream = do
res <- initial
case res of
Fold.Partial fs -> go fs stream
Fold.Continue fs -> go fs stream
Fold.Done fb -> return (fb, stream)

where
Expand All @@ -784,6 +787,7 @@ foldBreak (Fold fstep initial _ final) stream = do
let arr = Array contents next end
return $! (b, StreamK.cons arr st)
Fold.Partial fs1 -> goArray fs1 st fp next
Fold.Continue fs1 -> goArray fs1 st fp next

RENAME(foldBreakChunksK,foldBreak)

Expand Down
116 changes: 116 additions & 0 deletions core/src/Streamly/Internal/Data/Fold/Combinators.hs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ pipe (Pipe consume produce pinitial) (Fold fstep finitial fextract ffinal) =
return
$ case acc1 of
Partial s -> Partial $ Tuple' cs1 s
Continue s -> Partial $ Tuple' cs1 s
Done b1 -> Done b1
-- XXX this case is recursive may cause fusion issues.
-- To remove recursion we will need a produce mode in folds which makes
Expand All @@ -470,6 +471,7 @@ pipe (Pipe consume produce pinitial) (Fold fstep finitial fextract ffinal) =
r <- produce ps1
case acc1 of
Partial s -> go s r
Continue s -> go s r
Done b1 -> return $ Done b1
go acc (Pipe.SkipC cs1) =
return $ Partial $ Tuple' cs1 acc
Expand Down Expand Up @@ -1390,6 +1392,21 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
(MutArray mba _ _ _) :: MutArray a <-
liftIO $ MA.emptyOf patLen
return $ Partial $ SplitOnSeqKR acc 0 mba
Continue acc
| patLen == 0 ->
-- XXX Should we match nothing or everything on empty
-- pattern?
-- Done <$> ffinal acc
return $ Partial $ SplitOnSeqEmpty acc
| patLen == 1 -> do
pat <- liftIO $ Array.unsafeGetIndexIO 0 patArr
return $ Partial $ SplitOnSeqSingle acc pat
| SIZE_OF(a) * patLen <= sizeOf (Proxy :: Proxy Word) ->
return $ Partial $ SplitOnSeqWord acc 0 0
| otherwise -> do
(MutArray mba _ _ _) :: MutArray a <-
liftIO $ MA.emptyOf patLen
return $ Partial $ SplitOnSeqKR acc 0 mba
Done b -> return $ Done b

-- Word pattern related
Expand Down Expand Up @@ -1423,13 +1440,17 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
res <- fstep s x
case res of
Partial s1 -> return $ Partial $ SplitOnSeqEmpty s1
Continue s1 -> return $ Partial $ SplitOnSeqEmpty s1
Done b -> return $ Done b
step (SplitOnSeqSingle s pat) x = do
res <- fstep s x
case res of
Partial s1
| pat /= x -> return $ Partial $ SplitOnSeqSingle s1 pat
| otherwise -> Done <$> ffinal s1
Continue s1
| pat /= x -> return $ Partial $ SplitOnSeqSingle s1 pat
| otherwise -> Done <$> ffinal s1
Done b -> return $ Done b
step (SplitOnSeqWord s idx wrd) x = do
res <- fstep s x
Expand All @@ -1442,6 +1463,13 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
else return $ Partial $ SplitOnSeqWordLoop s1 wrd1
| otherwise ->
return $ Partial $ SplitOnSeqWord s1 (idx + 1) wrd1
Continue s1
| idx == maxIndex -> do
if wrd1 .&. wordMask == wordPat
then Done <$> ffinal s1
else return $ Partial $ SplitOnSeqWordLoop s1 wrd1
| otherwise ->
return $ Partial $ SplitOnSeqWord s1 (idx + 1) wrd1
Done b -> return $ Done b
step (SplitOnSeqWordLoop s wrd) x = do
res <- fstep s x
Expand All @@ -1452,6 +1480,11 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
Done <$> ffinal s1
| otherwise ->
return $ Partial $ SplitOnSeqWordLoop s1 wrd1
Continue s1
| wrd1 .&. wordMask == wordPat ->
Done <$> ffinal s1
| otherwise ->
return $ Partial $ SplitOnSeqWordLoop s1 wrd1
Done b -> return $ Done b
step (SplitOnSeqKR s offset mba) x = do
res <- fstep s x
Expand All @@ -1471,6 +1504,21 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
else return $ Partial $ SplitOnSeqKRLoop s1 ringHash mba 0
else
return $ Partial $ SplitOnSeqKR s1 (offset + SIZE_OF(a)) mba
Continue s1 -> do
liftIO $ pokeAt offset mba x
if offset == maxOffset
then do
let arr :: Array a = Array
{ arrContents = mba
, arrStart = 0
, arrEnd = patBytes
}
let ringHash = Array.foldl' addCksum 0 arr
if ringHash == patHash && Array.byteEq arr patArr
then Done <$> ffinal s1
else return $ Partial $ SplitOnSeqKRLoop s1 ringHash mba 0
else
return $ Partial $ SplitOnSeqKR s1 (offset + SIZE_OF(a)) mba
Done b -> return $ Done b
step (SplitOnSeqKRLoop s cksum mba offset) x = do
res <- fstep s x
Expand All @@ -1491,6 +1539,22 @@ takeEndBySeq patArr (Fold fstep finitial fextract ffinal) =
if matches
then Done <$> ffinal s1
else return $ Partial $ SplitOnSeqKRLoop s1 ringHash mba rh1
Continue s1 -> do
let rb = RingArray
{ ringContents = mba
, ringSize = patBytes
, ringHead = offset
}
(rb1, old :: a) <- liftIO (RingArray.replace rb x)
let ringHash = deltaCksum cksum old x
let rh1 = ringHead rb1
matches <-
if ringHash == patHash
then liftIO $ RingArray.eqArray rb1 patArr
else return False
if matches
then Done <$> ffinal s1
else return $ Partial $ SplitOnSeqKRLoop s1 ringHash mba rh1
Done b -> return $ Done b

extractFunc fex state =
Expand Down Expand Up @@ -1557,6 +1621,22 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
(MutArray mba _ _ _) :: MutArray a <-
liftIO $ MA.emptyOf patLen
return $ Partial $ SplitOnSeqKR acc 0 mba
Continue acc
| patLen == 0 ->
-- XXX Should we match nothing or everything on empty
-- pattern?
-- Done <$> ffinal acc
return $ Partial $ SplitOnSeqEmpty acc
| patLen == 1 -> do
pat <- liftIO $ Array.unsafeGetIndexIO 0 patArr
return $ Partial $ SplitOnSeqSingle acc pat
-- XXX Need to add tests for this case
| SIZE_OF(a) * patLen <= sizeOf (Proxy :: Proxy Word) ->
return $ Partial $ SplitOnSeqWord acc 0 0
| otherwise -> do
(MutArray mba _ _ _) :: MutArray a <-
liftIO $ MA.emptyOf patLen
return $ Partial $ SplitOnSeqKR acc 0 mba
Done b -> return $ Done b

-- Word pattern related
Expand Down Expand Up @@ -1593,13 +1673,15 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
res <- fstep s x
case res of
Partial s1 -> return $ Partial $ SplitOnSeqEmpty s1
Continue s1 -> return $ Partial $ SplitOnSeqEmpty s1
Done b -> return $ Done b
step (SplitOnSeqSingle s pat) x = do
if pat /= x
then do
res <- fstep s x
case res of
Partial s1 -> return $ Partial $ SplitOnSeqSingle s1 pat
Continue s1 -> return $ Partial $ SplitOnSeqSingle s1 pat
Done b -> return $ Done b
else Done <$> ffinal s
step (SplitOnSeqWord s idx wrd) x = do
Expand All @@ -1621,6 +1703,11 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
Done <$> ffinal s1
| otherwise ->
return $ Partial $ SplitOnSeqWordLoop s1 wrd1
Continue s1
| wrd1 .&. wordMask == wordPat ->
Done <$> ffinal s1
| otherwise ->
return $ Partial $ SplitOnSeqWordLoop s1 wrd1
Done b -> return $ Done b
step (SplitOnSeqKR s offset mba) x = do
liftIO $ pokeAt offset mba x
Expand Down Expand Up @@ -1655,6 +1742,16 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
if matches
then Done <$> ffinal s1
else return $ Partial $ SplitOnSeqKRLoop s1 ringHash mba rh1
Continue s1 -> do
let ringHash = deltaCksum cksum old x
let rh1 = ringHead rb1
matches <-
if ringHash == patHash
then liftIO $ RingArray.eqArray rb1 patArr
else return False
if matches
then Done <$> ffinal s1
else return $ Partial $ SplitOnSeqKRLoop s1 ringHash mba rh1
Done b -> return $ Done b

-- XXX extract should return backtrack count as well. If the fold
Expand All @@ -1670,6 +1767,7 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
r <- fstep s (toEnum $ fromIntegral old)
case r of
Partial s1 -> consumeWord s1 (n - 1) wrd
Continue s1 -> consumeWord s1 (n - 1) wrd
Done b -> return b

let consumeArray s end mba offset =
Expand All @@ -1681,6 +1779,8 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
case r of
Partial s1 ->
consumeArray s1 end mba (offset + SIZE_OF(a))
Continue s1 ->
consumeArray s1 end mba (offset + SIZE_OF(a))
Done b -> return b

let consumeRing s orig mba offset = do
Expand All @@ -1698,6 +1798,11 @@ takeEndBySeq_ patArr (Fold fstep finitial fextract ffinal) =
in if rh == orig
then fex s1
else consumeRing s1 orig mba rh
Continue s1 ->
let rh = ringHead rb1
in if rh == orig
then fex s1
else consumeRing s1 orig mba rh
Done b -> return b

case state of
Expand Down Expand Up @@ -1795,6 +1900,9 @@ distributeScan getFolds = Scanl consume initial extract final
Partial fs -> do
r <- step fs a
run (Tuple' (Fold step (return r) extr fin : ys) zs) xs a
Continue fs -> do
r <- step fs a
run (Tuple' (Fold step (return r) extr fin : ys) zs) xs a
Done b -> do
run (Tuple' ys (b : zs)) xs a

Expand Down Expand Up @@ -2216,6 +2324,7 @@ unfoldEach (Unfold ustep inject) (Fold fstep initial extract final) =
fres <- fstep fs b
case fres of
Partial fs1 -> produce fs1 us1
Continue fs1 -> produce fs1 us1
-- XXX What to do with the remaining stream?
Done c -> return $ Done c
StreamD.Skip us1 -> produce fs us1
Expand Down Expand Up @@ -2321,13 +2430,16 @@ intersperseWithQuotes
case resL of
Partial sL ->
return $ Partial $ mkState sL
Continue sL ->
return $ Partial $ mkState sL
Done _ ->
errMsg "content" "succeed"

initial = do
res <- initialR
case res of
Partial sR -> initL (IntersperseQUnquoted sR)
Continue sR -> initL (IntersperseQUnquoted sR)
Done b -> return $ Done b

{-# INLINE collect #-}
Expand All @@ -2336,20 +2448,24 @@ intersperseWithQuotes
case res of
Partial s ->
initL (nextS s)
Continue s ->
initL (nextS s)
Done c -> return (Done c)

{-# INLINE process #-}
process a sL sR nextState = do
r <- stepL sL a
case r of
Partial s -> return $ Partial (nextState sR s)
Continue s -> return $ Partial (nextState sR s)
Done b -> collect nextState sR b

{-# INLINE processQuoted #-}
processQuoted a sL sR nextState = do
r <- stepL sL a
case r of
Partial s -> return $ Partial (nextState sR s)
Continue s -> return $ Partial (nextState sR s)
Done _ -> do
_ <- finalR sR
error "Collecting fold finished inside quote"
Expand Down
Loading
Loading