0 | ||| General operations on `Pull`s
  1 | ||| and `Stream`s: Splits, scans, filters, and maps
  2 | |||
  3 | ||| It is suggested to import this qualified `import FS.Pull as P` or
  4 | ||| via the catch-all module `FS` and use qualified names such
  5 | ||| as `P.filter` for those functions that overlap with the ones
  6 | ||| from `FS.Chunk`.
  7 | module FS.Pull
  8 |
  9 | import Control.Monad.MCancel
 10 | import Control.Monad.Resource
 11 | import Data.Either
 12 | import Data.List
 13 | import Data.List.Quantifiers
 14 | import public FS.Core
 15 |
 16 | import Data.SnocList
 17 |
 18 | %default total
 19 | %hide Data.Linear.(.)
 20 |
 21 | --------------------------------------------------------------------------------
 22 | -- Creating Pulls
 23 | --------------------------------------------------------------------------------
 24 |
 25 | public export
 26 | data UnfoldRes : (r,s,o : Type) -> Type where
 27 |   More : (vals : o) -> (st : s) -> UnfoldRes r s o
 28 |   Done : (res : r) ->  UnfoldRes r s o
 29 |   Last : (res : r) ->  (vals : o) -> UnfoldRes r s o
 30 |
 31 | ||| Emits a list of chunks.
 32 | export %inline
 33 | emits : List o -> Stream f es o
 34 | emits []     = pure ()
 35 | emits (h::t) = cons h (emits t)
 36 |
 37 | ||| Emits a list of values as a single chunk.
 38 | export
 39 | emitList : List o -> Stream f es (List o)
 40 | emitList [] = pure ()
 41 | emitList vs = emit vs
 42 |
 43 | ||| Emits a single optional value.
 44 | export
 45 | emitMaybe : Maybe o -> Stream f es o
 46 | emitMaybe Nothing  = pure ()
 47 | emitMaybe (Just v) = emit v
 48 |
 49 | ||| Utility to emit a single list chunk from a snoc list.
 50 | export %inline
 51 | emitSnoc : SnocList o -> Maybe o -> Stream f es (List o)
 52 | emitSnoc sv m = emitList $ (sv <>> maybe [] pure m)
 53 |
 54 | ||| Emits a single chunk of output generated from an effectful
 55 | ||| computation.
 56 | export %inline
 57 | eval : f es o -> Stream f es o
 58 | eval act = exec act >>= emit
 59 |
 60 | ||| Like `unfold` but emits values in larger chunks.
 61 | |||
 62 | ||| This allows us to potentially emit a bunch of values right before
 63 | ||| we are done.
 64 | |||
 65 | ||| This overlaps with function `FS.Chunk.unfold`, so it is typically
 66 | ||| used qualified: `P.unfold`.
 67 | export
 68 | unfold : (init : s) -> (s -> UnfoldRes r s o) -> Pull f o es r
 69 | unfold init g =
 70 |   case g init of
 71 |     More vals st => cons vals (unfold st g)
 72 |     Done res      => pure res
 73 |     Last res vals => cons vals (pure res)
 74 |
 75 | ||| Like `unfold` but produces values via an effectful computation
 76 | ||| until a `Done` or `Last` is returned.
 77 | export
 78 | unfoldEval : (init : s) -> (s -> f es (UnfoldRes r s o)) -> Pull f o es r
 79 | unfoldEval cur act =
 80 |   assert_total $ exec (act cur) >>= \case
 81 |     More vals st => cons vals (unfoldEval st act)
 82 |     Done res      => pure res
 83 |     Last res vals => cons vals (pure res)
 84 |
 85 | ||| Produces values via an effectful computation
 86 | ||| until a `Nothing` is returned.
 87 | export
 88 | unfoldEvalMaybe : f es (Maybe o) -> Stream f es o
 89 | unfoldEvalMaybe act =
 90 |   assert_total $ exec act >>= \case
 91 |     Nothing => pure ()
 92 |     Just o  => cons o (unfoldEvalMaybe act)
 93 |
 94 | ||| Infinitely produces chunks of values of the given size
 95 | export
 96 | fill : o -> Pull f o es ()
 97 | fill v = cons v (fill v)
 98 |
 99 | ||| An infinite stream of values of type `o` where
100 | ||| the next value is built from the previous one by
101 | ||| applying the given function.
102 | export
103 | iterate : o -> (o -> o) -> Pull f o es ()
104 | iterate v f = unfold v (\x => More x $ f x)
105 |
106 | ||| Produces the given chunk of value `n` times.
107 | export
108 | replicate : Nat -> o -> Stream f es o
109 | replicate 0     _ = pure ()
110 | replicate (S k) v = cons v (replicate k v)
111 |
112 | ||| Infinitely cycles through the given `Pull`
113 | export
114 | repeat : Stream f es o -> Pull f o es ()
115 | repeat v = assert_total $ v >> repeat v
116 |
117 | ||| Prepends the given optional output to a pull.
118 | export %inline
119 | consMaybe : Maybe o -> Pull f o es r -> Pull f o es r
120 | consMaybe (Just v) p = cons v p
121 | consMaybe Nothing  p = p
122 |
123 | 0 cproof : NonEmpty (as ++ [a])
124 | cproof {as = []}   = %search
125 | cproof {as = _::_} = %search
126 |
127 | ||| Infinitely cycles through the values in the given
128 | ||| non-empty list.
129 | export
130 | cycle : (as : List a) -> (0 p : NonEmpty as) => Stream f es a
131 | cycle (v::vs) = assert_total $ emit v >> cycle (vs ++ [v]) @{cproof}
132 |
133 | --------------------------------------------------------------------------------
134 | -- Splitting Streams
135 | --------------------------------------------------------------------------------
136 |
137 | public export
138 | data BreakInstruction : Type where
139 |   ||| Take the first succeeding value as part of the emitted prefix
140 |   TakeHit : BreakInstruction
141 |
142 |   ||| Keep the succeeding value as part of the postfix.
143 |   PostHit : BreakInstruction
144 |
145 |   ||| Discard the succeeding value
146 |   DropHit : BreakInstruction
147 |
148 | ||| Like `uncons` but does not consume the chunk
149 | export
150 | peek : Pull f o es r -> Pull f q es (Either r (o, Pull f o es r))
151 | peek p =
152 |   uncons p <&> \case
153 |     Left v       => Left v
154 |     Right (vs,q) => Right (vs, cons vs q)
155 |
156 | ||| Like `peeks` but only checks if the pull has been exhausted or not.
157 | export %inline
158 | peekRes : Pull f o es r -> Pull f q es (Either r (Pull f o es r))
159 | peekRes = map (map snd) . peek
160 |
161 | ||| Empties a stream silently dropping all output.
162 | export
163 | drain : Pull f o es r -> Pull f q es r
164 | drain p =
165 |   assert_total $ uncons p >>= \case
166 |     Left x      => pure x
167 |     Right (_,p) => drain p
168 |
169 | ||| Emits the first `n` values of a `Pull`, returning the remainder.
170 | |||
171 | ||| In case the remaining pull is exhausted, with will wrap the
172 | ||| final result in a `Left`, otherwise, the (non-empty) remainder will be
173 | ||| wrapped in a right.
174 | export
175 | splitAt : Nat -> Pull f o es r -> Pull f o es (Pull f o es r)
176 | splitAt 0     p = pure p
177 | splitAt (S k) p =
178 |   assert_total $ uncons p >>= \case
179 |     Left v      => pure (pure v)
180 |     Right (v,q) => cons v (splitAt k q)
181 |
182 | ||| Emits only the first `n` values of a `Stream`.
183 | export %inline
184 | take : Nat -> Pull f o es r -> Stream f es o
185 | take n = ignore . newScope . splitAt n
186 |
187 | ||| Like `take` but limits the number of emitted values.
188 | |||
189 | ||| This fails with the given error in case the limit is exceeded.
190 | |||
191 | ||| Note: This tries to read past the end of a pull by invoking `peekRes`.
192 | |||       This is not suitable for limitting the input from a stream that
193 | |||       blocks once it is exhausted.
194 | export %inline
195 | limit : Has e es => Lazy e -> Nat -> Pull f o es r -> Pull f o es r
196 | limit err n p = do
197 |   q      <- splitAt n p
198 |   Left v <- peekRes q | Right _ => throw err
199 |   pure v
200 |
201 | ||| Discards the first `n` values of a `Stream`, returning the
202 | ||| remainder.
203 | export %inline
204 | drop : Nat -> Pull f o es r -> Pull f o es r
205 | drop k q = join $ drain (splitAt k q)
206 |
207 | ||| Only keeps the first element of the input.
208 | export %inline
209 | head : Pull f o es r -> Stream f es o
210 | head = take 1
211 |
212 | ||| Drops the first element of the input.
213 | export %inline
214 | tail : Pull f o es r -> Pull f o es r
215 | tail = drop 1
216 |
217 | ||| Result of splitting a value into two parts. This is used to
218 | ||| split up streams of data along logical boundaries.
219 | public export
220 | data BreakRes : (c : Type) -> Type where
221 |   ||| The value was broken and we got a (possibly empty) pre- and postfix.
222 |   Broken : (pre, post : Maybe c) -> BreakRes c
223 |   ||| The value could not be broken.
224 |   Keep   : c -> BreakRes c
225 |
226 | ||| Uses the given breaking function to break the pull into
227 | ||| a prefix of emitted chunks and a suffix that is returned as
228 | ||| the result.
229 | export
230 | breakPull :
231 |      (orElse : r -> Pull f o es r)
232 |   -> (o -> BreakRes o)
233 |   -> Pull f o es r
234 |   -> Pull f o es (Pull f o es r)
235 | breakPull orElse g p =
236 |   assert_total $ uncons p >>= \case
237 |     Left r      => pure (orElse r)
238 |     Right (v,q) => case g v of
239 |       Keep w         => cons w (breakPull orElse g q)
240 |       Broken pre pst => consMaybe pre (pure $ consMaybe pst q)
241 |
242 | ||| Breaks a pull as soon as the given predicate returns `True`.
243 | |||
244 | ||| The `BreakInstruction` dictates, if the value, for which the
245 | ||| predicate held, should be emitted as part of the prefix or the
246 | ||| suffix, or if it should be discarded.
247 | export
248 | breakFull :
249 |      (orElse : r -> Pull f o es r)
250 |   -> BreakInstruction
251 |   -> (o -> Bool)
252 |   -> Pull f o es r
253 |   -> Pull f o es (Pull f o es r)
254 | breakFull orElse bi pred =
255 |   breakPull orElse $ \v => case pred v of
256 |     False => Keep v
257 |     True  => case bi of
258 |       TakeHit => Broken (Just v) Nothing
259 |       PostHit => Broken Nothing  (Just v)
260 |       DropHit => Broken Nothing Nothing
261 |
262 | ||| Like `breakFull` but fails with the given execption if no
263 | ||| breaking point was found.
264 | export %inline
265 | forceBreakFull :
266 |      {auto has : Has e es}
267 |   -> (err : Lazy e)
268 |   -> BreakInstruction
269 |   -> (o -> Bool)
270 |   -> Pull f o es r
271 |   -> Pull f o es (Pull f o es r)
272 | forceBreakFull err = breakFull (const $ throw err)
273 |
274 | ||| Emits values until the given predicate returns `True`.
275 | |||
276 | ||| The `BreakInstruction` dictates, if the value, for which the
277 | ||| predicate held, should be emitted as part of the prefix or not.
278 | export
279 | takeUntil : BreakInstruction -> (o -> Bool) -> Pull f o es r -> Stream f es o
280 | takeUntil tf pred = ignore . newScope . breakFull pure tf pred
281 |
282 | ||| Emits values until the given predicate returns `False`,
283 | ||| returning the remainder of the `Pull`.
284 | export %inline
285 | takeWhile : (o -> Bool) -> Pull f o es r -> Stream f es o
286 | takeWhile pred = takeUntil DropHit (not . pred)
287 |
288 | ||| Like `takeWhile` but also includes the first failure.
289 | export %inline
290 | takeThrough : (o -> Bool) -> Pull f o es r -> Stream f es o
291 | takeThrough pred = takeUntil TakeHit (not . pred)
292 |
293 | ||| Emits values until the given pull emits a `Nothing`.
294 | export
295 | takeWhileJust : Pull f (Maybe o) es r -> Stream f es o
296 | takeWhileJust = newScope . go
297 |   where
298 |     go : Pull f (Maybe o) es r -> Stream f es o
299 |     go p =
300 |       assert_total $ uncons p >>= \case
301 |         Left _      => pure ()
302 |         Right (Just v,q)  => cons v (takeWhileJust q)
303 |         Right (Nothing,q) => pure ()
304 |
305 | ||| Discards values until the given predicate returns `True`.
306 | |||
307 | ||| The `BreakInstruction` dictates, if the value, for which the
308 | ||| predicate held, should be emitted as part of the remainder or not.
309 | export
310 | dropUntil : BreakInstruction -> (o -> Bool) -> Pull f o es r -> Pull f o es r
311 | dropUntil tf pred p = drain (breakFull pure tf pred p) >>= id
312 |
313 | ||| Drops values from a stream while the given predicate returns `True`,
314 | ||| returning the remainder of the stream.
315 | export %inline
316 | dropWhile : (o -> Bool) -> Pull f o es r -> Pull f o es r
317 | dropWhile pred = dropUntil PostHit (not . pred)
318 |
319 | ||| Like `dropWhile` but also drops the first value where
320 | ||| the predicate returns `False`.
321 | export %inline
322 | dropThrough : (o -> Bool) -> Pull f o es r -> Pull f o es r
323 | dropThrough pred = dropUntil DropHit (not . pred)
324 |
325 | --------------------------------------------------------------------------------
326 | -- Maps and Filters
327 | --------------------------------------------------------------------------------
328 |
329 | ||| Maps chunks of output via a partial function.
330 | export
331 | mapMaybe : (o -> Maybe p) -> Pull f o es r -> Pull f p es r
332 | mapMaybe f p =
333 |   assert_total $ uncons p >>= \case
334 |     Left x      => pure x
335 |     Right (v,q) => case f v of
336 |       Just w  => cons w (mapMaybe f q)
337 |       Nothing => mapMaybe f q
338 |
339 | ||| Emits a values wrapped in a `Just` from the initial stream.
340 | export %inline
341 | catMaybes : Pull f (Maybe o) es r -> Pull f o es r
342 | catMaybes = mapMaybe id
343 |
344 | ||| Emits the values, for which the given function returns a `Right`.
345 | export
346 | mapRight : (o -> Either e p) -> Pull f o es r -> Pull f p es r
347 | mapRight f = mapMaybe (getRight . f)
348 |
349 | ||| Emits the values wrapped `Rights`.
350 | export %inline
351 | catRights : Pull f (Either e o) es r -> Pull f o es r
352 | catRights = mapMaybe getRight
353 |
354 | ||| Emits the values, for which the given function returns a `Right`.
355 | export
356 | mapLeft : (o -> Either e p) -> Pull f o es r -> Pull f e es r
357 | mapLeft f = mapMaybe (getLeft . f)
358 |
359 | ||| Emits the values wrapped `Rights`.
360 | export %inline
361 | catLefts : Pull f (Either e o) es r -> Pull f e es r
362 | catLefts = mapMaybe getLeft
363 |
364 | ||| Chunk-wise maps the emit of a `Pull`
365 | export %inline
366 | mapOutput : (o -> p) -> Pull f o es r -> Pull f p es r
367 | mapOutput f = mapMaybe (Just . f)
368 |
369 | ||| Chunk-wise effectful mapping of the emit of a `Pull`
370 | export
371 | evalMap : (o -> f es p) -> Pull f o es r -> Pull f p es r
372 | evalMap f p =
373 |   assert_total $ uncons p >>= \case
374 |     Left x       => pure x
375 |     Right (vs,p) => do
376 |       ws <- exec (f vs)
377 |       cons ws (evalMap f p)
378 |
379 | ||| Chunk-wise effectful mapping of the emit of a `Pull`
380 | export
381 | evalMapMaybe : (o -> f es (Maybe p)) -> Pull f o es r -> Pull f p es r
382 | evalMapMaybe f p =
383 |   assert_total $ uncons p >>= \case
384 |     Left x       => pure x
385 |     Right (v,q) => do
386 |       Just w <- exec (f v) | Nothing => evalMapMaybe f q
387 |       cons w (evalMapMaybe f q)
388 |
389 | ||| Chunk-wise filters the emit of a `Pull` emitting only the
390 | ||| values that fulfill the given predicate
391 | export
392 | filter : (o -> Bool) -> Pull f o es r -> Pull f o es r
393 | filter pred p =
394 |   assert_total $ uncons p >>= \case
395 |     Left x      => pure x
396 |     Right (v,q) => case pred v of
397 |       True  => cons v (filter pred q)
398 |       False => filter pred q
399 |
400 | ||| Convenience alias for `filterC (not . pred)`.
401 | export %inline
402 | filterNot : (o -> Bool) -> Pull f o es r -> Pull f o es r
403 | filterNot pred = filter (not . pred)
404 |
405 | ||| Wraps the values emitted by this stream in a `Just` and
406 | ||| marks its end with a `Nothing`.
407 | export
408 | endWithNothing : Pull f o es r -> Pull f (Maybe o) es r
409 | endWithNothing s = mapOutput Just s >>= \res => cons Nothing (pure res)
410 |
411 | --------------------------------------------------------------------------------
412 | -- Folds
413 | --------------------------------------------------------------------------------
414 |
415 | ||| Returns the first output of this stream and pairs it with the
416 | ||| result.
417 | |||
418 | ||| The remainder of the pull is drained and run to completion.
419 | export
420 | pairLastOr : Lazy o -> Pull f o es r -> Pull f p es (o,r)
421 | pairLastOr dflt s =
422 |   assert_total $ uncons s >>= \case
423 |     Left res    => pure (dflt,res)
424 |     Right (v,q) => pairLastOr v q
425 |
426 | ||| Like `pairLastOr` but operates on a stream and therefore only returns
427 | ||| the last output.
428 | export %inline
429 | lastOr : Lazy o -> Stream f es o -> Pull f p es o
430 | lastOr dflt s = fst <$> pairLastOr dflt s
431 |
432 | ||| Like `firstOr` but fails with the given error in case no
433 | ||| value is emitted.
434 | export
435 | pairLastOrErr : Has e es => Lazy e -> Pull f o es r -> Pull f p es (o,r)
436 | pairLastOrErr err s =
437 |   uncons s >>= \case
438 |     Left res    => throw err
439 |     Right (v,q) => pairLastOr v q
440 |
441 | ||| Like `pairLastOrErr` but operates on a stream and therefore only returns
442 | ||| the last output.
443 | export %inline
444 | lastOrErr : Has e es => Lazy e -> Stream f es o -> Pull f p es o
445 | lastOrErr err s = fst <$> pairLastOrErr err s
446 |
447 | ||| Folds the emit of a pull using an initial value and binary operator.
448 | |||
449 | ||| The accumulated result is emitted as a single value.
450 | export
451 | fold : (p -> o -> p) -> p -> Pull f o es r -> Pull f p es r
452 | fold g v s =
453 |   assert_total $ uncons s >>= \case
454 |     Left res      => cons v (pure res)
455 |     Right (vs,s2) => fold g (g v vs) s2
456 |
457 | ||| Like `fold` but instead of emitting the result as a single
458 | ||| value of output, it is paired with the `Pull`s result.
459 | export
460 | foldPair : (p -> o -> p) -> p -> Pull f o es r -> Pull f q es (p,r)
461 | foldPair g v s =
462 |   assert_total $ uncons s >>= \case
463 |     Left res      => pure (v, res)
464 |     Right (vs,s2) => foldPair g (g v vs) s2
465 |
466 | ||| Like `foldPair` but with a function that can fail.
467 | export
468 | foldPairE :
469 |      {auto has : Has e es}
470 |   -> (p -> o -> Either e p)
471 |   -> p
472 |   -> Pull f o es r
473 |   -> Pull f q es (p,r)
474 | foldPairE g v s =
475 |   assert_total $ uncons s >>= \case
476 |     Left res      => pure (v, res)
477 |     Right (vs,s2) => case g v vs of
478 |       Right v2 => foldPairE g v2 s2
479 |       Left  x  => throw x
480 |
481 | ||| Convenience version of `foldPair` for working on streams.
482 | export %inline
483 | foldGet : (p -> o -> p) -> p -> Stream f es o -> Pull f q es p
484 | foldGet acc ini = map fst . foldPair acc ini
485 |
486 | ||| Like `foldC` but will not emit a value in case of an empty pull.
487 | export
488 | fold1 : (o -> o -> o) -> Pull f o es r -> Pull f o es r
489 | fold1 g s =
490 |   uncons s >>= \case
491 |     Left r      => pure r
492 |     Right (v,q) => fold g v q
493 |
494 | ||| Emits `True` if all emitted values of the given stream fulfill
495 | ||| the given predicate
496 | export
497 | all : (o -> Bool) -> Pull f o es r -> Stream f es Bool
498 | all pred p =
499 |   assert_total $ uncons p >>= \case
500 |     Left _ => emit True
501 |     Right (vs,q) => case pred vs of
502 |       True  => all pred q
503 |       False => emit False
504 |
505 | ||| Returns `True` if any of the emitted values of the given stream fulfills
506 | ||| the given predicate
507 | export
508 | any : (o -> Bool) -> Pull f o es r -> Stream f es Bool
509 | any pred p =
510 |   assert_total $ uncons p >>= \case
511 |     Left _ => emit False
512 |     Right (vs,q) => case pred vs of
513 |       False  => any pred q
514 |       True   => emit True
515 |
516 | ||| Sums up the emitted chunks.
517 | export %inline
518 | sum : Num o => Pull f o es r -> Pull f o es r
519 | sum = fold (+) 0
520 |
521 | ||| Counts the number of emitted chunks.
522 | export %inline
523 | count : Pull f o es r -> Pull f Nat es r
524 | count = fold (const . S) 0
525 |
526 | ||| Emits the largest output encountered.
527 | export %inline
528 | maximum : Ord o => Pull f o es r -> Pull f o es r
529 | maximum = fold1 max
530 |
531 | ||| Emits the smallest output encountered.
532 | export %inline
533 | minimum : Ord o => Pull f o es r -> Pull f o es r
534 | minimum = fold1 min
535 |
536 | ||| Emits the smallest output encountered.
537 | export %inline
538 | mappend : Semigroup o => Pull f o es r -> Pull f o es r
539 | mappend = fold1 (<+>)
540 |
541 | ||| Accumulates the emitted values over a monoid.
542 | |||
543 | ||| Note: This corresponds to a left fold, so it will
544 | |||       run in quadratic time for monoids that are
545 | |||       naturally accumulated from the right (such as List)
546 | export %inline
547 | foldMap : Monoid m => (o -> m) -> Pull f o es r -> Pull f m es r
548 | foldMap f = fold (\v,el => v <+> f el) neutral
549 |
550 | ||| Folds the emit of a pull using an initial value and binary operator.
551 | |||
552 | ||| The accumulated result is emitted as a single value.
553 | export
554 | evalFold : (p -> o -> f es p) -> p -> Pull f o es r -> Pull f p es r
555 | evalFold g v s =
556 |   assert_total $ uncons s >>= \case
557 |     Left res      => cons v (pure res)
558 |     Right (vs,s2) => exec (g v vs) >>= \v2 => evalFold g v2 s2
559 |
560 | ||| Like `fold` but instead of emitting the result as a single
561 | ||| value of output, it is paired with the `Pull`s result.
562 | export
563 | evalFoldPair : (p -> o -> f es p) -> p -> Pull f o es r -> Pull f q es (p,r)
564 | evalFoldPair g v s =
565 |   assert_total $ uncons s >>= \case
566 |     Left res      => pure (v, res)
567 |     Right (vs,s2) => exec (g v vs) >>= \v2 => evalFoldPair g v2 s2
568 |
569 | ||| Convenience version of `evalFoldPair` for working on streams.
570 | export %inline
571 | evalFoldGet : (p -> o -> f es p) -> p -> Stream f es o -> Pull f q es p
572 | evalFoldGet acc ini = map fst . evalFoldPair acc ini
573 |
574 | --------------------------------------------------------------------------------
575 | -- Scans
576 | --------------------------------------------------------------------------------
577 |
578 | ||| Runs a stateful computation across all chunks of data.
579 | export
580 | scan : s -> (s -> o -> (p,s)) -> Pull f o es r -> Pull f p es r
581 | scan s1 f p =
582 |   assert_total $ uncons p >>= \case
583 |     Left res    => pure res
584 |     Right (v,q) => let (w,s2) := f s1 v in cons w (scan s2 f q)
585 |
586 | ||| Emits values of the original pull, but emits consecutive
587 | ||| values for which `f` returns `True` only once.
588 | |||
589 | ||| Note: For this to work correctly, `f` must be an equivalence relation.
590 | |||       This function makes no guarantees about the order in which arguments
591 | |||       are passed to `f`, so `f` must be symmetric. It also makes
592 | |||       no guarantees about which emitted value is being kept in its
593 | |||       internal state, so `f` must be transitive.
594 | export
595 | distinctBy : (o -> o -> Bool) -> Pull f o es r -> Pull f o es r
596 | distinctBy f = catMaybes . scan Nothing next
597 |   where
598 |     next : Maybe o -> o -> (Maybe o, Maybe o)
599 |     next Nothing  v = (Just v, Just v)
600 |     next (Just x) v = if f x v then (Nothing,Just v) else (Just v, Just v)
601 |
602 | ||| Emits values of the original pull, but emits consecutive
603 | ||| identical values only once.
604 | export %inline
605 | distinct : Eq o => Pull f o es r -> Pull f o es r
606 | distinct = distinctBy (==)
607 |
608 | ||| General stateful conversion of a `Pull`s emit.
609 | |||
610 | ||| Aborts as soon as the given accumulator function returns `Nothing`
611 | export
612 | scanMaybe : s -> (s -> Maybe (o -> (p,s))) -> Pull f o es r -> Pull f p es s
613 | scanMaybe s1 f p =
614 |   assert_total $ case f s1 of
615 |     Nothing => pure s1
616 |     Just g  => uncons p >>= \case
617 |       Left _      => pure s1
618 |       Right (v,p) => let (w,s2) := g v in cons w (scanMaybe s2 f p)
619 |
620 | ||| Like `scan` but will possibly also emit the final state.
621 | export
622 | scanFull :
623 |      s
624 |   -> (s -> o -> (Maybe p,s))
625 |   -> (s -> Maybe p)
626 |   -> Pull f o es r
627 |   -> Pull f p es r
628 | scanFull s1 g last p = do
629 |   assert_total $ uncons p >>= \case
630 |     Left v      => consMaybe (last s1) (pure v)
631 |     Right (v,q) => let (w,s2) := g s1 v in consMaybe w (scanFull s2 g last q)
632 |
633 | ||| Zips the input with a running total, up to but
634 | ||| not including the current element.
635 | export
636 | zipWithScan : p -> (p -> o -> p) -> Pull f o es r -> Pull f (o,p) es r
637 | zipWithScan vp fun =
638 |   scan vp $ \vp1,vo => let vp2 := fun vp1 vo in ((vo, vp1),vp2)
639 |
640 | ||| Like `zipWithScan` but the running total is including the current element.
641 | export
642 | zipWithScan1 : p -> (p -> o -> p) -> Pull f o es r -> Pull f (o,p) es r
643 | zipWithScan1 vp fun =
644 |   scan vp $ \vp1,vo => let vp2 := fun vp1 vo in ((vo, vp2),vp2)
645 |
646 | ||| Pairs each element in the stream with its 0-based index.
647 | export %inline
648 | zipWithIndex : Pull f o es r -> Pull f (o,Nat) es r
649 | zipWithIndex = zipWithScan 0 (\n,_ => S n)
650 |
651 | ||| Pairs each element in the stream with its 1-based count.
652 | export %inline
653 | zipWithCount : Pull f o es r -> Pull f (o,Nat) es r
654 | zipWithCount = zipWithScan 1 (\n,_ => S n)
655 |
656 | ||| Accumulates the input value using the given function.
657 | |||
658 | ||| The emitted result will include the input up to but
659 | ||| not including the current value.
660 | export %inline
661 | scans : p -> (p -> o -> p) -> Pull f o es r -> Pull f p es r
662 | scans vp fun = scan vp $ \vp1,vo => let vp2 := fun vp1 vo in (vp1,vp2)
663 |
664 | ||| Like `scans` but the running total is including the current element.
665 | |||
666 | ||| Emits the initial value as its first output.
667 | export %inline
668 | scans1 : p -> (p -> o -> p) -> Pull f o es r -> Pull f p es r
669 | scans1 vp fun os =
670 |   cons vp $ scan vp (\vp1,vo => let vp2 := fun vp1 vo in (vp2,vp2)) os
671 |
672 | ||| Like `scans` but specialized to a pull of unary functions.
673 | export %inline
674 | scanFrom : o -> Pull f (o -> o) es r -> Pull f o es r
675 | scanFrom vo = scans vo (\x,f => f x)
676 |
677 | ||| Like `scans1` but specialized to a pull of unary functions.
678 | export %inline
679 | scanFrom1 : o -> Pull f (o -> o) es r -> Pull f o es r
680 | scanFrom1 vo = scans1 vo (\x,f => f x)
681 |
682 | ||| Emits the count of each element.
683 | export %inline
684 | runningCount : Pull f o es r -> Pull f Nat es r
685 | runningCount = scan 1 (\n,_ => (n, S n))
686 |
687 | ||| Like `scan` but with an effectful computation.
688 | export
689 | evalScan : s -> (s -> o -> f es (p,s)) -> Pull f o es r -> Pull f p es r
690 | evalScan s1 f p =
691 |   assert_total $ uncons p >>= \case
692 |     Left res    => pure res
693 |     Right (v,q) => do
694 |       (w,s2) <- exec $ f s1 v
695 |       cons w (evalScan s2 f q)
696 |
697 | parameters {0 es      : List Type}
698 |            {0 f       : List Type -> Type -> Type}
699 |            {auto func : Functor (f es)}
700 |   ||| Accumulates the input value using the given function.
701 |   |||
702 |   ||| The emitted result will include the input up to but
703 |   ||| not including the current value.
704 |   export %inline
705 |   evalScans : p -> (p -> o -> f es p) -> Pull f o es r -> Pull f p es r
706 |   evalScans vp fun =
707 |     evalScan vp $ \vp1,vo => (\vp2 => (vp1,vp2)) <$> fun vp1 vo
708 |
709 |   ||| Like `scans` but the running total is including the current element.
710 |   export %inline
711 |   evalScans1 : p -> (p -> o -> f es p) -> Pull f o es r -> Pull f p es r
712 |   evalScans1 vp fun =
713 |     evalScan vp $ \vp1,vo => (\vp2 => (vp2,vp2)) <$> fun vp1 vo
714 |
715 |   ||| Like `evalScans` but specialized to a pull of unary effectful functions.
716 |   export %inline
717 |   evalScanFrom : o -> Pull f (o -> f es o) es r -> Pull f o es r
718 |   evalScanFrom vo = evalScans vo (\x,f => f x)
719 |
720 |   ||| Like `evalScans1` but specialized to a pull of unary effectful functions.
721 |   export %inline
722 |   evalScanFrom1 : o -> Pull f (o -> f es o) es r -> Pull f o es r
723 |   evalScanFrom1 vo = evalScans1 vo (\x,f => f x)
724 |
725 | --------------------------------------------------------------------------------
726 | -- Observing and Draining Streams
727 | --------------------------------------------------------------------------------
728 |
729 | ||| Performs the given action on each emitted chunk of values, thus draining
730 | ||| the stream, consuming all its output.
731 | |||
732 | ||| For acting on output without actually draining the stream, see
733 | ||| `observe` and `observe1`.
734 | export
735 | foreach : (o -> f es ()) -> Pull f o es r -> Pull f q es r
736 | foreach f p =
737 |   assert_total $ uncons p >>= \case
738 |     Left x      => pure x
739 |     Right (v,p) => exec (f v) >> foreach f p
740 |
741 | ||| Utility alias for `foreach . const`.
742 | export %inline
743 | foreach' : f es () -> Pull f o es r -> Pull f q es r
744 | foreach' = foreach . const
745 |
746 | ||| Performs the given action on every chunk of values of the stream without
747 | ||| otherwise affecting the emitted values.
748 | export
749 | observe : (o -> f es ()) -> Pull f o es r -> Pull f o es r
750 | observe act p =
751 |   assert_total $ uncons p >>= \case
752 |     Left x      => pure x
753 |     Right (v,p) => exec (act v) >> cons v (observe act p)
754 |
755 | ||| Utility alias for `observe . const`.
756 | export %inline
757 | observe' : f es () -> Pull f o es r -> Pull f o es r
758 | observe' = observe . const
759 |
760 | ||| Converts every chunk of output to a new stream,
761 | ||| concatenating the resulting streams before emitting the final
762 | ||| result.
763 | export
764 | flatMap : Pull f o es r -> (o -> Stream f es p) -> Pull f p es r
765 | flatMap p g =
766 |   assert_total $ uncons p >>= \case
767 |     Left v       => pure v
768 |     Right (os,q) => g os >> flatMap q g
769 |
770 | ||| Flipped version of `flatMapC`
771 | export %inline
772 | bind : (o -> Pull f p es ()) -> Pull f o es r -> Pull f p es r
773 | bind = flip flatMap
774 |
775 | export
776 | attemptOutput : Pull f o es () -> Pull f (Result es o) fs ()
777 | attemptOutput p =
778 |   att (mapOutput Right p) >>= \case
779 |     Left x  => emit (Left x)
780 |     Right _ => pure ()
781 |
782 | --------------------------------------------------------------------------------
783 | -- Zipping Streams
784 | --------------------------------------------------------------------------------
785 |
786 | ||| This is an internal utility to run the remainder of a stream
787 | ||| in the correct scope.
788 | export
789 | stepLeg : Stream f es o -> Pull f p es (Maybe $ StepLeg f es o)
790 | stepLeg x =
791 |   uncons x >>= \case
792 |     Left _        => pure Nothing
793 |     Right (v,rem) => map (Just . SL v rem) scope
794 |
795 | ||| Runs a `StepLeg` in its scope, returning then next `StepLeg`.
796 | export
797 | step : StepLeg f es o -> Pull f p es (Maybe $ StepLeg f es o)
798 | step (SL _ p sc) = inScope sc (stepLeg p)
799 |
800 | ||| Type alias for a function that converts a stream plus
801 | ||| a prefix to a stream emitting different values.
802 | public export
803 | 0 ZipWithLeft :
804 |      (f : List Type -> Type -> Type)
805 |   -> List Type
806 |   -> (o,p : Type)
807 |   -> Type
808 | ZipWithLeft f es o p = o -> Stream f es o -> Stream f es p
809 |
810 |
811 | zipWith_ :
812 |      (this : Stream f es o1)
813 |   -> (that : Stream f es o2)
814 |   -> (k1   : ZipWithLeft f es o1 o3)
815 |   -> (k3   : Stream f es o2 -> Stream f es o3)
816 |   -> (fun  : o1 -> o2 -> o3)
817 |   -> Stream f es o3
818 | zipWith_ this that k1 k3 fun = do
819 |   Just l1 <- stepLeg this | Nothing => k3 that
820 |   Just l2 <- stepLeg that | Nothing => k1 l1.out l1.pull
821 |   go l1 l2
822 |
823 |   where
824 |     go : StepLeg f es o1 -> StepLeg f es o2 -> Stream f es o3
825 |     go l1 l2 = do
826 |       emit (fun l1.out l2.out)
827 |       Just l12 <- step l1 | Nothing => k3 l2.pull
828 |       Just l22 <- step l2 | Nothing => k1 l12.out l12.pull
829 |       assert_total $ go l12 l22
830 |
831 | ||| Determinsitically zips elements with the specified function, terminating
832 | ||| when the ends of both branches are reached naturally, padding the left
833 | ||| branch with `pad1` and padding the right branch with `pad2` as necessary.
834 | export
835 | zipAllWith :
836 |      (pad1 : o1)
837 |   -> (pad2 : o2)
838 |   -> (o1 -> o2 -> o3)
839 |   -> (this : Stream f es o1)
840 |   -> (that : Stream f es o2)
841 |   -> Stream f es o3
842 | zipAllWith pad1 pad2 fun this that =
843 |  let kL := mapOutput (`fun` pad2)
844 |      kR := mapOutput (fun pad1)
845 |      k1 := \x,y => emit (fun x pad2) >> kL y
846 |   in zipWith_ this that k1 kR fun
847 |
848 | ||| `zipAllWith` specialized to returning a pair of values
849 | export %inline
850 | zipAll :
851 |      (pad1 : o1)
852 |   -> (pad2 : o2)
853 |   -> (this : Stream f es o1)
854 |   -> (that : Stream f es o2)
855 |   -> Stream f es (o1,o2)
856 | zipAll pad1 pad2 = zipAllWith pad1 pad2 MkPair
857 |
858 | ||| Deterministically zips elements, terminating when the end
859 | ||| of either branch is reached naturally.
860 | export %inline
861 | zipWith :
862 |      (o1 -> o2 -> o3)
863 |   -> (this : Stream f es o1)
864 |   -> (that : Stream f es o2)
865 |   -> Stream f es o3
866 | zipWith fun this that =
867 |   zipWith_ this that (\_,_ => pure ()) (\_ => pure ()) fun
868 |
869 | ||| `zipAll` specialized to returning a pair of values
870 | export %inline
871 | zip : Stream f es o1 -> Stream f es o2 -> Stream f es (o1,o2)
872 | zip = zipWith MkPair
873 |
874 | export %inline
875 | zipRight : Stream f es o1 -> Stream f es o2 -> Stream f es o2
876 | zipRight = zipWith (\_ => id)
877 |
878 | export %inline
879 | zipLeft : Stream f es o1 -> Stream f es o2 -> Stream f es o1
880 | zipLeft = zipWith const
881 |
882 | ||| Zips the values in a heterogeneous list of streams.
883 | export
884 | hzip : All (Stream f es) ts -> Stream f es (HList ts)
885 | hzip []        = fill []
886 | hzip [s]       = mapOutput (::Nil) s
887 | hzip (s :: ss) = zipWith (::) s (hzip ss)
888 |
889 | public export
890 | 0 HZipFun : List Type -> Type -> Type
891 | HZipFun []        r = r
892 | HZipFun (t :: ts) r = t -> HZipFun ts r
893 |
894 | export
895 | happly : HZipFun ts r -> HList ts -> r
896 | happly fun []        = fun
897 | happly fun (v :: vs) = happly (fun v) vs
898 |
899 | ||| Zips the given streams applying the resulting heterogeneous
900 | ||| lists of values to the given function.
901 | export
902 | hzipWith : HZipFun ts r -> All (Stream f es) ts -> Stream f es r
903 | hzipWith fun = mapOutput (happly fun) . hzip
904 |
905 |