5 | module Utils.Streaming
7 | import Control.Monad.Trans
-- A pair type whose single constructor `:>` holds a value of type `a`
-- together with a lazily evaluated (`Lazy`) value of type `b`.
14 | data Of : (a : Type) -> (b : Type) -> Type where
15 | (:>) : a -> Lazy b -> Of a b
-- NOTE(review): the header these three clauses belong to is not visible in
-- this excerpt; the method names suggest a Bifunctor implementation for `Of`
-- -- confirm against the full file.
-- `mapFst` applies `f` to the left component and keeps the right one as is.
19 | mapFst f (a :> b) = f a :> b
-- `mapSnd` applies `f` to the right component and keeps the left one as is.
20 | mapSnd f (a :> b) = a :> f b
-- `bimap` applies `f` to the left component and `g` to the right component.
21 | bimap f g (a :> b) = f a :> g b
-- Functor implementation for `Of a` (the first type argument is fixed).
-- NOTE(review): the `map` clause is not visible in this excerpt, so which
-- component it transforms cannot be confirmed from here.
24 | Functor (Of a) where
-- A stream indexed by a layer type constructor `f`, an effect type
-- constructor `m` and a final result type `r`.
--   `Step`   - one `f`-layer whose contents are the rest of the stream (`Inf`).
--   `Effect` - an `m`-action that produces the rest of the stream (`Inf`).
--   `Return` - the end of the stream, carrying the result `r`.
--   `Build`  - a stream given as a function: for any result type `b`, it takes
--              a handler for the final result, a handler for `m`-effects and a
--              handler for `f`-layers (in that order) and produces a `b`.
29 | data Stream : (f : Type -> Type) -> (m : Type -> Type) -> (r : Type) -> Type where
30 | Step : Inf (f (Stream f m r)) -> Stream f m r
31 | Effect : Inf (m (Stream f m r)) -> Stream f m r
32 | Return : r -> Stream f m r
33 | Build : (forall b. (r -> b) -> (m b -> b) -> (f b -> b) -> b) -> Stream f m r
-- Turns one `f`-layer containing a stream into a stream.
-- NOTE(review): only the signature is visible in this excerpt; presumably
-- implemented with the `Step` constructor -- confirm.
37 | wrap : f (Stream f m r) -> Stream f m r
-- Turns an `m`-action that produces a stream into a stream.
-- NOTE(review): only the signature is visible in this excerpt; presumably
-- implemented with the `Effect` constructor -- confirm.
42 | effect : m (Stream f m r) -> Stream f m r
-- Turns the function form of a stream (the payload of `Build`) into explicit
-- constructors by supplying `Return`, `Effect` and `Step` as its handlers.
-- The handlers are written as lambdas so that the arguments of the `Inf`
-- constructors are delayed at the call site.
build : (forall b. (r -> b) -> (m b -> b) -> (f b -> b) -> b) -> Stream f m r
build phi = phi Return (\action => Effect action) (\layer => Step layer)
-- Collapses a stream into a single value of type `b`.
--   first handler  : consumes the final result (`Return`)
--   second handler : consumes an `m`-action whose contents are already folded
--   third handler  : consumes an `f`-layer whose contents are already folded
-- A `Build` stream is folded by passing the three handlers straight to it.
fold : (Functor f, Monad m) => (r -> b) -> (m b -> b) -> (f b -> b) -> Stream f m r -> b
fold done eff layer (Return result) = done result
fold done eff layer (Effect action) = eff (map (fold done eff layer) action)
fold done eff layer (Step inner) = layer (map (fold done eff layer) inner)
fold done eff layer (Build g) = g done eff layer
-- Same as `fold`, with the handlers given in the opposite order:
-- layer handler first, then effect handler, then result handler.
destroy : (Functor f, Monad m) => (f a -> a) -> (m a -> a) -> (r -> a) -> Stream f m r -> a
destroy onStep onEffect onReturn stream = fold onReturn onEffect onStep stream
-- Builds a stream from a seed. The step function `f` maps a seed to an
-- `m`-action yielding either a final result (`Left`) or an `f`-layer of
-- follow-up seeds (`Right`). The whole computation is wrapped in `Effect`.
62 | unfold : (Functor f, Monad m) => (a -> m (Either r (f a))) -> a -> Stream f m r
63 | unfold f a = Effect $
do
-- NOTE(review): the bind that introduces `a'` is not visible in this excerpt;
-- presumably it runs `f a` and matches on `Right a'` -- confirm.
-- A `Left r` outcome ends the stream with `Return r`.
65 | | Left r => pure (Return r)
-- Otherwise emit one `Step` whose inner seeds are unfolded recursively.
66 | pure (Step (unfold f <$> a'))
-- Runs the stream up to its first layer.
-- Yields `Left r` when the stream finishes with result `r`, or `Right layer`
-- where `layer` is the first `f`-layer containing the remaining stream.
-- Inside a layer, each already-inspected tail (an `m (Either ...)`) is turned
-- back into a stream: `Left` becomes `Return`, `Right` is re-wrapped with
-- `wrap`, and the surrounding action is re-embedded with `effect`.
inspect : (Functor f, Monad m) => Stream f m r -> m (Either r (f (Stream f m r)))
inspect stream = fold (pure . Left) join (pure . Right . map (effect {f} {m} . map (either Return wrap))) stream
-- Changes the effect type of a stream from `m` to `n` by applying the given
-- transformation to every `m`-action; results and `f`-layers are rebuilt
-- unchanged with `Return` and `Step`.
hoist : (Functor f, Monad m) => (forall a. m a -> n a) -> Stream f m r -> Stream f n r
hoist nat = fold Return (\action => Effect (nat action)) (\layer => Step layer)
-- Functor over the final result of a stream: `map` produces a `Build` stream
-- that folds the original one, applying the function to the result just
-- before it reaches the result handler.
(Functor f, Monad m) => Functor (Stream f m) where
  map g stream = Build (\done, eff, layer => fold (\result => done (g result)) eff layer stream)
-- Applicative implementation for `Stream f m`, available whenever `f` is a
-- Functor and `m` is a Monad.
-- NOTE(review): the method definitions are not visible in this excerpt.
83 | (Functor f, Monad m) => Applicative (Stream f m) where
-- Monad over the final result of a stream: binding folds the first stream and,
-- where it would return `x`, continues by folding the stream `cont x` with the
-- same handlers. `assert_total` is kept on `Build` as in the original.
(Functor f, Monad m) => Monad (Stream f m) where
  stream >>= cont = assert_total Build
    (\done, eff, layer =>
      fold (\x => fold done eff layer (cont x)) eff layer stream)
-- Lifts an action of the underlying monad into a stream that runs the action
-- and finishes immediately with its result.
MonadTrans (Stream f) where
  lift action = Effect (Return <$> action)
-- IO actions are first lifted into the underlying monad `m` and from there
-- into the stream with `lift`.
(HasIO m, Monad (Stream f m)) => HasIO (Stream f m) where
  liftIO action = lift $ liftIO action
-- A stream that emits exactly one element and then returns unit.
yield : Monad m => a -> Stream (Of a) m ()
yield item = Step (item :> Return ())
-- Executes a stream whose layers are themselves `m`-actions, performing every
-- effect and every layer in order and returning the final result.
-- A `Build` stream is first converted to constructors with `build`.
run : Monad m => Stream m m r -> m r
run (Return result) = pure result
run (Effect action) = do
  rest <- action
  run rest
run (Step layer) = do
  rest <- layer
  run rest
run (Build g) = run (build g)
-- Runs the stream and collects every emitted element into a list, paired with
-- the stream's final result.
toList : Monad m => Stream (Of a) m r -> m (List a, r)
toList = fold (\result => pure ([], result)) join (\(x :> rest) => map (mapFst (x ::)) rest)
-- Runs the stream and collects every emitted element into a list, discarding
-- the stream's final result.
toList_ : Monad m => Stream (Of a) m r -> m (List a)
toList_ = fold (\_ => pure []) join (\(x :> rest) => map (x ::) rest)
-- Builds a pure stream that emits the elements of the list in order and then
-- finishes with the given result.
fromList : r -> List a -> Stream (Of a) m r
fromList result [] = Return result
fromList result (x :: xs) = Step (x :> fromList result xs)
-- Like `fromList`, with unit as the final result.
fromList_ : List a -> Stream (Of a) m ()
fromList_ items = fromList () items
-- Prepends a single element to the front of a stream.
cons : a -> Stream (Of a) m r -> Stream (Of a) m r
cons item rest = Step (item :> rest)
-- Pulls the first element off a stream.
-- Yields `Left r` if the stream is finished with result `r`, otherwise
-- `Right (x, rest)` with the first element and the remaining stream.
next : Monad m => Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r))
next stream = do
  Right (x :> rest) <- inspect stream
    | Left result => pure (Left result)
  pure (Right (x, rest))
-- Replaces the layer type of a stream: every `f`-layer is converted to a
-- `g`-layer with the given transformation before it reaches the layer handler.
mapf : (Functor f, Monad m) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
mapf nat stream = Build (\done, eff, layer => fold done eff (\fx => layer (nat fx)) stream)
-- Effectful variant of `mapf`: each `f`-layer is converted by an `m`-action
-- into a `g`-layer; the action's result is passed to the layer handler and
-- the action itself to the effect handler.
mapfM : (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r
mapfM nat stream = Build (\done, eff, layer => fold done eff (\fx => eff (map layer (nat fx))) stream)
-- Applies a pure function to every element of a stream.
maps : Monad m => (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r
maps fn stream = mapf (\layer => mapFst fn layer) stream
-- Applies an effectful function to every element of a stream, replacing each
-- element with the action's result while keeping the rest of the layer.
mapsM : Monad m => (a -> m b) -> Stream (Of a) m r -> Stream (Of b) m r
mapsM action stream = mapfM (\(x :> rest) => map (\y => y :> rest) (action x)) stream
-- Runs the stream, performing the given action on every element (its result
-- is ignored) and returning the stream's final result.
fors : Monad m => (a -> m x) -> Stream (Of a) m r -> m r
fors action stream = fold pure join (\(x :> rest) => ignore (action x) >> rest) stream
-- Pulls up to `n` elements from the front of `stream`.
--
-- Returns the collected elements in stream order, paired with either
--   * `Left r`     - the stream ended with result `r` before `n` elements
--                    could be read, or
--   * `Right rest` - `n` elements were read; `rest` is the unconsumed
--                    remainder of the stream.
-- With `n = 0` nothing is pulled and the stream is handed back untouched.
takeStream : Monad m => Nat -> Stream (Of a) m r -> m (List a, Either r (Stream (Of a) m r))
takeStream n stream = loop [] n stream where
  -- `acc` holds the elements collected so far in REVERSE order, so adding an
  -- element is a constant-time cons rather than a linear-time `snoc` (which
  -- made the whole loop quadratic in `n`). The list is reversed exactly once,
  -- at whichever exit the loop takes.
  loop : List a -> Nat -> Stream (Of a) m r -> m (List a, Either r (Stream (Of a) m r))
  loop acc Z s = pure (reverse acc, Right s)
  loop acc (S k) s = do
    Right (item, rest) <- next s
      | Left r => pure (reverse acc, Left r)
    loop (item :: acc) k rest
-- Regroups the elements of `stream` into lists of `n` elements each, keeping
-- the original final result. The last chunk may hold fewer than `n` elements.
--
-- `n` must be non-zero (enforced by the erased `NonZero n` proof).
--
-- No empty chunk is ever emitted: when the source ends exactly on a chunk
-- boundary (or is empty to begin with), `takeStream` reports `([], Left r)`
-- and the result `r` is returned without yielding anything. Previously that
-- case produced a spurious trailing `[]` chunk.
chunksOf : Monad m => (n : Nat) -> {auto 0 ok : NonZero n} -> Stream (Of a) m r -> Stream (Of (List a)) m r
chunksOf n stream = do
  (chunk, Right rest) <- lift $ takeStream n stream
    -- Source exhausted with nothing left over: finish without a chunk.
    | ([], Left r) => pure r
    -- Source exhausted mid-chunk: emit the partial chunk, then finish.
    | (chunk, Left r) => yield chunk >> pure r
  yield chunk *> chunksOf n rest
-- Runs the stream to completion for its effects only, discarding every
-- element and the final result.
consume : Monad m => Stream (Of a) m r -> m ()
consume stream = fold (\_ => pure ()) join (\(_ :> rest) => rest) stream