@@ -70,10 +70,14 @@ module Streamly.Internal.Data.Array
7070 , compactEndByByte_
7171 , compactEndByLn_
7272
73- , foldBreakChunks
73+ -- * Parsing Stream of Arrays
74+ , foldBreakChunks -- Uses Stream, bad perf on break
7475 , foldChunks
75- , foldBreakChunksK
76- , parseBreakChunksK
76+ , foldBreakChunksK -- XXX rename to foldBreak
77+ , parseBreakChunksK -- XXX uses Parser, should use parseBreak instead
78+ , parseBreak
79+ , parse
80+ , adapt
7781
7882 -- * Serialization
7983 , encodeAs
@@ -124,8 +128,10 @@ import Streamly.Internal.Data.MutByteArray.Type (PinnedState(..), MutByteArray)
124128import Streamly.Internal.Data.Serialize.Type (Serialize )
125129import Streamly.Internal.Data.Fold.Type (Fold (.. ))
126130import Streamly.Internal.Data.Parser (Parser (.. ), Initial (.. ), ParseError (.. ))
131+ import Streamly.Internal.Data.ParserK.Type
132+ (ParserK , ParseResult (.. ), Input (.. ), Step (.. ))
127133import Streamly.Internal.Data.Stream (Stream (.. ))
128- import Streamly.Internal.Data.StreamK (StreamK )
134+ import Streamly.Internal.Data.StreamK.Type (StreamK )
129135import Streamly.Internal.Data.SVar.Type (adaptState , defState )
130136import Streamly.Internal.Data.Tuple.Strict (Tuple' (.. ))
131137import Streamly.Internal.Data.Unfold.Type (Unfold (.. ))
@@ -137,10 +143,11 @@ import qualified Streamly.Internal.Data.MutByteArray.Type as MBA
137143import qualified Streamly.Internal.Data.MutArray as MA
138144import qualified Streamly.Internal.Data.RingArray as RB
139145import qualified Streamly.Internal.Data.Parser as Parser
140- -- import qualified Streamly.Internal.Data.ParserK as ParserK
146+ import qualified Streamly.Internal.Data.Parser.Type as ParserD
147+ import qualified Streamly.Internal.Data.ParserK.Type as ParserK
141148import qualified Streamly.Internal.Data.Stream as D
142149import qualified Streamly.Internal.Data.Stream as Stream
143- import qualified Streamly.Internal.Data.StreamK as StreamK
150+ import qualified Streamly.Internal.Data.StreamK.Type as StreamK
144151import qualified Streamly.Internal.Data.Unfold as Unfold
145152import qualified Prelude
146153
@@ -991,3 +998,269 @@ parseBreakChunksK (Parser pstep initial extract) stream = do
991998 let n = Prelude. length backBuf
992999 arr0 = fromListN n (Prelude. reverse backBuf)
9931000 return (Left (ParseError err), StreamK. fromPure arr0)
1001+
-- The backtracking buffer holds arrays in most-recent-first order. We take a
-- total of n elements from the buffer and push them back onto the stream.
-- Note: when an array has to be split, the part pushed back is its trailing
-- portion (the most recently seen elements).
{-# INLINE backTrack #-}
backTrack :: forall m a. Unbox a =>
       Int
    -> [Array a]
    -> StreamK m (Array a)
    -> (StreamK m (Array a), [Array a])
backTrack = go

    where

    go _ [] stream = (stream, [])
    go n xs stream | n <= 0 = (stream, xs)
    go n (x:xs) stream
        | n > len = go (n - len) xs (StreamK.cons x stream)
        | n == len = (StreamK.cons x stream, xs)
        | otherwise =
            -- Split x: return only its last n elements to the stream, the
            -- earlier slice stays in the buffer. Both slices share the
            -- underlying MutByteArray, no copying is done.
            let !(Array contents start end) = x
                !start1 = end - (n * SIZE_OF(a))
                taken = Array contents start1 end
                kept = Array contents start start1
             in (StreamK.cons taken stream, kept:xs)

        where

        len = length x
1028+
-- | Run a 'ParserK' over a 'StreamK' of Arrays and return the parse result and
-- the remaining Stream.
--
-- The driver keeps @backBuf@, a most-recent-first list of arrays already fed
-- to the parser, so that a negative position returned by the parser (a
-- backtrack request) can be satisfied by pushing buffered arrays back onto
-- the stream via 'backTrack'.
{-# INLINE_NORMAL parseBreak #-}
parseBreak
    :: (Monad m, Unbox a)
    => ParserK (Array a) m b
    -> StreamK m (Array a)
    -> m (Either ParseError b, StreamK m (Array a))
parseBreak parser input = do
    let parserk = ParserK.runParser parser ParserK.parserDone 0 0
     in go [] parserk input

    where

    -- Drive the parser when the input stream has ended: feed it
    -- 'ParserK.None'. The parser may still backtrack into backBuf (negative
    -- n) before it finishes.
    {-# INLINE goStop #-}
    goStop backBuf parserk = do
        pRes <- parserk ParserK.None
        case pRes of
            -- If we stop in an alternative, it will try calling the next
            -- parser, the next parser may call initial returning Partial and
            -- then immediately we have to call extract on it.
            ParserK.Partial 0 cont1 ->
                go [] cont1 StreamK.nil
            ParserK.Partial n cont1 -> do
                -- Negative n requests a backtrack; it must not exceed the
                -- buffered length. Partial drops the old buffer: the parser
                -- commits to not backtracking before this point again.
                let n1 = negate n
                assertM(n1 >= 0 && n1 <= sum (Prelude.map length backBuf))
                let (s1, backBuf1) = backTrack n1 backBuf StreamK.nil
                 in go backBuf1 cont1 s1
            ParserK.Continue 0 cont1 ->
                go backBuf cont1 StreamK.nil
            ParserK.Continue n cont1 -> do
                let n1 = negate n
                assertM(n1 >= 0 && n1 <= sum (Prelude.map length backBuf))
                let (s1, backBuf1) = backTrack n1 backBuf StreamK.nil
                 in go backBuf1 cont1 s1
            ParserK.Done 0 b ->
                return (Right b, StreamK.nil)
            ParserK.Done n b -> do
                -- n (negative) elements were left unconsumed; push them back
                -- to form the leftover stream returned to the caller.
                let n1 = negate n
                assertM(n1 >= 0 && n1 <= sum (Prelude.map length backBuf))
                let (s1, _) = backTrack n1 backBuf StreamK.nil
                 in return (Right b, s1)
            ParserK.Error _ err -> do
                -- On error, restore the entire buffered input as the
                -- leftover stream (maxBound takes everything).
                let (s1, _) = backTrack maxBound backBuf StreamK.nil
                return (Left (ParseError err), s1)

    seekErr n len =
        error $ "parseBreak: Partial: forward seek not implemented n = "
            ++ show n ++ " len = " ++ show len

    -- Feed one array to the parser. The position n returned by the parser is
    -- relative to the start of the current array: n == len means exactly this
    -- array was consumed, 0 <= n < len means resume within this array (the
    -- continuation tracks the offset, so the same chunk is fed again), n < 0
    -- means backtrack into backBuf, and n > len (a forward seek) is not
    -- supported.
    yieldk backBuf parserk arr stream = do
        pRes <- parserk (ParserK.Chunk arr)
        let len = length arr
        case pRes of
            ParserK.Partial n cont1 ->
                case compare n len of
                    -- Partial: the backtrack buffer can be dropped.
                    EQ -> go [] cont1 stream
                    LT -> do
                        if n >= 0
                        then yieldk [] cont1 arr stream
                        else do
                            let n1 = negate n
                                bufLen = sum (Prelude.map length backBuf)
                                s = StreamK.cons arr stream
                            assertM(n1 >= 0 && n1 <= bufLen)
                            let (s1, _) = backTrack n1 backBuf s
                            go [] cont1 s1
                    GT -> seekErr n len
            ParserK.Continue n cont1 ->
                case compare n len of
                    -- Continue: retain the array for a possible later
                    -- backtrack.
                    EQ -> go (arr:backBuf) cont1 stream
                    LT -> do
                        if n >= 0
                        then yieldk backBuf cont1 arr stream
                        else do
                            let n1 = negate n
                                bufLen = sum (Prelude.map length backBuf)
                                s = StreamK.cons arr stream
                            assertM(n1 >= 0 && n1 <= bufLen)
                            let (s1, backBuf1) = backTrack n1 backBuf s
                            go backBuf1 cont1 s1
                    GT -> seekErr n len
            ParserK.Done n b -> do
                -- n elements of arr were consumed; push back the unconsumed
                -- remainder (plus any unused buffered input) as the leftover
                -- stream.
                let n1 = len - n
                assertM(n1 <= sum (Prelude.map length (arr:backBuf)))
                let (s1, _) = backTrack n1 (arr:backBuf) stream
                 in return (Right b, s1)
            ParserK.Error _ err -> do
                let (s1, _) = backTrack maxBound (arr:backBuf) stream
                return (Left (ParseError err), s1)

    -- Fold the stream: one array at a time through yieldk, end-of-stream
    -- through goStop.
    go backBuf parserk stream = do
        let stop = goStop backBuf parserk
            single a = yieldk backBuf parserk a StreamK.nil
         in StreamK.foldStream
                defState (yieldk backBuf parserk) single stop stream
1125+
-- | Like 'parseBreak' but discards the unconsumed remainder of the stream,
-- returning only the parse result.
{-# INLINE parse #-}
parse :: (Monad m, Unbox a) =>
    ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b)
parse parser stream = fst <$> parseBreak parser stream
1130+
1131+ -------------------------------------------------------------------------------
1132+ -- Convert ParserD to ParserK
1133+ -------------------------------------------------------------------------------
1134+
-- Wrap a direct-style parser's (step, initial, extract) into the CPS 'Step'
-- form used by 'ParserK' over a stream of 'Array' input.
--
-- Offset convention (element units, relative to the current chunk start):
-- a non-negative offset means resume scanning at that element of the chunk;
-- a negative offset means we are still owed backtracked input and must wait
-- for it to be re-supplied (see parseContChunk's offset < 0 branch).
-- @usedCount@ is the running count of elements consumed so far, threaded to
-- the continuation as @count + n - offset@ at every exit point.
{-# INLINE adaptCWith #-}
adaptCWith
    :: forall m a s b r. (Monad m, Unbox a)
    => (s -> a -> m (ParserD.Step s b))      -- ^ direct parser step
    -> m (ParserD.Initial s b)               -- ^ direct parser initial
    -> (s -> m (ParserD.Step s b))           -- ^ direct parser extract
    -> (ParseResult b -> Int -> Input (Array a) -> m (Step (Array a) m r))
                                             -- ^ CPS continuation
    -> Int                                   -- ^ offset into current input
    -> Int                                   -- ^ elements used so far
    -> Input (Array a)
    -> m (Step (Array a) m r)
adaptCWith pstep initial extract cont !offset0 !usedCount !input = do
    res <- initial
    case res of
        ParserD.IPartial pst -> do
            case input of
                Chunk arr -> parseContChunk usedCount offset0 pst arr
                None -> parseContNothing usedCount pst
        -- Parser finished (or failed) without consuming any input.
        ParserD.IDone b -> cont (Success offset0 b) usedCount input
        ParserD.IError err -> cont (Failure offset0 err) usedCount input

    where

    -- XXX We can maintain an absolute position instead of relative that will
    -- help in reporting of error location in the stream.
    {-# NOINLINE parseContChunk #-}
    parseContChunk !count !offset !state arr@(Array contents start end) = do
        if offset >= 0
        then go SPEC (start + offset * SIZE_OF(a)) state
        -- Negative offset: backtracked input has not been re-supplied yet;
        -- ask the driver to continue from that (negative) position.
        else return $ Continue offset (parseCont count state)

        where

        -- Parser is done at element position n of this chunk.
        {-# INLINE onDone #-}
        onDone n b =
            assert (n <= length arr)
                (cont (Success n b) (count + n - offset) (Chunk arr))

        -- Suspend with position n: either a backtrack (n < 0) or the chunk
        -- was fully consumed (n >= length arr); never a mid-chunk suspend.
        {-# INLINE callParseCont #-}
        callParseCont constr n pst1 =
            assert (n < 0 || n >= length arr)
                (return $ constr n (parseCont (count + n - offset) pst1))

        {-# INLINE onPartial #-}
        onPartial = callParseCont Partial

        {-# INLINE onContinue #-}
        onContinue = callParseCont Continue

        {-# INLINE onError #-}
        onError n err =
            cont (Failure n err) (count + n - offset) (Chunk arr)

        -- Backtrack to byte offset offset1: if it is still inside this
        -- chunk, loop there directly; otherwise suspend with the negative
        -- element position so the driver re-supplies earlier input.
        {-# INLINE onBack #-}
        onBack offset1 elemSize constr pst = do
            let pos = offset1 - start
             in if pos >= 0
                then go SPEC offset1 pst
                else constr (pos `div` elemSize) pst

        -- Note: div may be expensive but the alternative is to maintain an element
        -- offset in addition to a byte offset or just the element offset and use
        -- multiplication to get the byte offset every time, both these options
        -- turned out to be more expensive than using div.
        go !_ !cur !pst | cur >= end =
            -- Chunk exhausted: suspend at the end-of-chunk element position.
            onContinue ((end - start) `div` SIZE_OF(a)) pst
        go !_ !cur !pst = do
            -- unsafeInlineIO is used to peek from the immutable array
            -- contents inside this pure-looking loop.
            let !x = unsafeInlineIO $ peekAt cur contents
            pRes <- pstep pst x
            let elemSize = SIZE_OF(a)
                next = INDEX_NEXT(cur,a)
                back n = next - n * elemSize
                curOff = (cur - start) `div` elemSize
                nextOff = (next - start) `div` elemSize
            -- The "n" here is stream position index wrt the array start, and
            -- not the backtrack count as returned by byte stream parsers.
            case pRes of
                -- 0 and 1 are the common fast paths (no/one element
                -- backtrack); the general case computes the byte offset.
                ParserD.Done 0 b ->
                    onDone nextOff b
                ParserD.Done 1 b ->
                    onDone curOff b
                ParserD.Done n b ->
                    onDone ((back n - start) `div` elemSize) b
                ParserD.Partial 0 pst1 ->
                    go SPEC next pst1
                ParserD.Partial 1 pst1 ->
                    go SPEC cur pst1
                ParserD.Partial n pst1 ->
                    onBack (back n) elemSize onPartial pst1
                ParserD.Continue 0 pst1 ->
                    go SPEC next pst1
                ParserD.Continue 1 pst1 ->
                    go SPEC cur pst1
                ParserD.Continue n pst1 ->
                    onBack (back n) elemSize onContinue pst1
                ParserD.Error err ->
                    onError curOff err

    -- End of input: run extract to finalize the parser state.
    {-# NOINLINE parseContNothing #-}
    parseContNothing !count !pst = do
        r <- extract pst
        case r of
            -- IMPORTANT: the n here is from the byte stream parser, that means
            -- it is the backtrack element count and not the stream position
            -- index into the current input array.
            ParserD.Done n b ->
                assert (n >= 0)
                    (cont (Success (- n) b) (count - n) None)
            ParserD.Continue n pst1 ->
                assert (n >= 0)
                    (return $ Continue (- n) (parseCont (count - n) pst1))
            ParserD.Error err ->
                -- XXX It is called only when there is no input arr. So using 0
                -- as the position is correct?
                cont (Failure 0 err) count None
            -- extract must not return Partial by the Parser contract.
            ParserD.Partial _ _ -> error "Bug: adaptCWith Partial unreachable"

    -- XXX Maybe we can use two separate continuations instead of using
    -- Just/Nothing cases here. That may help in avoiding the parseContJust
    -- function call.
    {-# INLINE parseCont #-}
    parseCont !cnt !pst (Chunk arr) = parseContChunk cnt 0 pst arr
    parseCont !cnt !pst None = parseContNothing cnt pst
1258+
-- | Convert a direct style 'Parser' to a CPS style 'ParserK' operating on a
-- stream of arrays.
--
-- /Pre-release/
--
{-# INLINE_LATE adapt #-}
adapt :: (Monad m, Unbox a) => ParserD.Parser a m b -> ParserK (Array a) m b
adapt (ParserD.Parser step initial extract) =
    ParserK.MkParser (adaptCWith step initial extract)
0 commit comments