0 | module IO.Async.Fiber
3 | import Data.SortedMap
4 | import System.Concurrency
5 | import IO.Async.Outcome
6 | import IO.Async.Token
-- Record describing where fiber work gets executed.
-- Only the `submit` field is visible in this listing; later code also reads
-- `ec.tokenGen` and `ec.limit`, so further fields are presumably declared on
-- elided lines (TODO confirm against the full file).
16 | record ExecutionContext where
-- Takes an IO action and returns an IO action; the interpreter uses it to hand
-- work (e.g. `resume child`) over to the context.
20 | submit : IO () -> IO ()
-- Hint that derives a `TokenGen` from the `ExecutionContext` found in
-- auto-implicit scope by projecting its `tokenGen` field.
23 | export %inline %hint
24 | ecToTokenGen : ExecutionContext => TokenGen
25 | ecToTokenGen @{ctx} = ctx.tokenGen
-- Public handle to a computation with error types `es` and result type `a`.
-- NOTE(review): `MkFiber` is applied to four arguments further down, so fields
-- beyond the two shown here are presumably on elided lines.
38 | record Fiber (es : List Type) (a : Type) where
-- Registers a callback for the fiber's outcome under the given token.
41 | observe : Token -> (Outcome es a -> IO ()) -> IO ()
-- Counterpart of `observe`, keyed by the same token.
42 | stopObserving : Token -> IO ()
-- Three-valued cancelability flag carried by `Sync`, `Asnc`, `Start` and `Bind`.
-- From its uses in this file: `uncancelable` sets `U`, `cancelable` turns `P`
-- into `C`, `strictCancelable` forces `C`, and `observeCancelation U` never
-- reports a cancelation.
52 | data Cancelability = U | P | C
-- Flags are combined with `<+>` in `set`; the instance body is elided here.
55 | Semigroup Cancelability where
-- Instance body is elided in this listing.
60 | Monoid Cancelability where
-- Instruction set of asynchronous computations with error types `es`.
-- Values of this type are interpreted by `run` further down in the file.
70 | data Async : (es : List Type) -> Type -> Type where
-- An already-known outcome.
71 | Term : Outcome es a -> Async es a
-- A synchronous IO action tagged with a cancelability flag.
73 | Sync : Cancelability -> IO (Result es a) -> Async es a
-- Starts the given computation as a new fiber and yields its handle.
75 | Start : Cancelability -> Async es a -> Async fs (Fiber es a)
-- Switches the running fiber to another execution context.
77 | Shift : ExecutionContext -> Async es ()
-- Yields the token of the running fiber.
79 | Self : Async es Token
-- Marks the running fiber as canceled.
81 | Cancel : Async es ()
-- Yields the execution context currently in use.
83 | GetEC : Async es ExecutionContext
-- Fragment of a constructor whose head is elided (presumably `Asnc`, which is
-- applied to a registration function of this shape elsewhere in the file).
-- The function receives a callback and may return a cancelation hook.
87 | -> ((Outcome es a -> IO ()) -> IO (Maybe $
Async [] ()))
-- Fragment of another constructor whose head is elided (presumably `Bind`):
-- a continuation from the outcome of one computation to the next.
93 | -> (Outcome es a -> Async fs b)
-- Lifts a plain value into `Async` as an already-finished, successful outcome.
101 | succeed : a -> Async es a
102 | succeed val = Term (Succeeded val)
-- Wraps an IO action producing a `Result es a` as an `Async` computation.
-- Only the signature is visible in this listing.
105 | sync : IO (Result es a) -> Async es a
-- Sequences a computation with a continuation on its successful result.
-- The lines between the signature and the clauses below are elided.
108 | bind : Async es a -> (a -> Async es b) -> Async es b
-- Errors and cancelation are re-wrapped as-is; the continuation is not
-- mentioned on these branches.
112 | Error x => Term (Error x)
113 | Canceled => Term Canceled
-- Mapping is `bind` followed by `succeed` of the transformed value.
116 | Functor (Async es) where
117 | map f aa = bind aa (succeed . f)
-- Only `<*>` is visible for this instance; `pure` is presumably on an elided line.
120 | Applicative (Async es) where
-- Runs `af` first, then maps the resulting function over `aa`.
122 | af <*> aa = bind af (<$> aa)
-- Instance body is elided in this listing.
125 | Monad (Async es) where
-- Plain IO is lifted by tagging its result with `Right` and passing it to `sync`.
129 | HasIO (Async es) where
130 | liftIO = sync . map Right
-- Replaces the cancelability flag of the outermost `Sync`, `Asnc`, `Start` or
-- `Bind` node with `U`, whatever the previous flag was.
-- NOTE(review): no catch-all clause is visible for the remaining constructors;
-- the sibling `strictCancelable` has one, so it is presumably elided here.
137 | uncancelable : Async fs a -> Async fs a
138 | uncancelable (Sync x y) = Sync U y
139 | uncancelable (Asnc x y) = Asnc U y
140 | uncancelable (Start x y) = Start U y
141 | uncancelable (Bind x y f) = Bind U y f
-- Upgrades a flag of `P` on the outermost `Sync`, `Bind`, `Asnc` or `Start`
-- node to `C`. Nodes flagged `U` or `C` are not matched by the visible clauses.
-- NOTE(review): a catch-all clause is presumably on an elided line — confirm.
145 | cancelable : Async fs a -> Async fs a
146 | cancelable (Sync P y) = Sync C y
147 | cancelable (Bind P y f) = Bind C y f
148 | cancelable (Asnc P y) = Asnc C y
149 | cancelable (Start P y) = Start C y
-- Forces the cancelability flag of the outermost `Sync`, `Asnc`, `Start` or
-- `Bind` node to `C`, regardless of its previous value. Every other
-- constructor is returned untouched.
153 | strictCancelable : Async fs a -> Async fs a
154 | strictCancelable (Bind _ act cont) = Bind C act cont
155 | strictCancelable (Start _ act) = Start C act
156 | strictCancelable (Asnc _ reg) = Asnc C reg
157 | strictCancelable (Sync _ io) = Sync C io
158 | strictCancelable other = other
-- Signature only; the definition is elided in this listing.
161 | canceled : Async es ()
-- Yields a `Token`; signature only, the definition is elided in this listing.
170 | self : Async es Token
-- Builds an asynchronous computation (flag `P`) from a registration function.
-- The registration receives the completion callback and returns a cancelation
-- hook, which is wrapped in `Just` for the `Asnc` constructor.
174 | cancelableAsync : ((Outcome es a -> IO ()) -> IO (Async [] ())) -> Async es a
175 | cancelableAsync f = Asnc P (\cb => map Just (f cb))
-- Builds an asynchronous computation (flag `P`) from a registration function
-- that offers no cancelation hook: the hook slot is always `Nothing`.
178 | async : ((Outcome es a -> IO ()) -> IO ()) -> Async es a
179 | async f = Asnc P (\cb => map (const Nothing) (f cb))
-- Waits for a fiber and yields its `Outcome` as a successful result.
-- The lines binding `f` and `t` are elided in this listing.
182 | join : Fiber es a -> Async fs (Outcome es a)
-- Registers a callback under token `t` that reports the observed outcome
-- wrapped in `Succeeded`; the returned cancelation hook removes the
-- registration again via `stopObserving`.
185 | cancelableAsync $
\cb =>
186 | f.observe t (cb . Succeeded) $> liftIO (f.stopObserving t)
-- Waits for a fiber and then adopts its outcome (success, error or
-- cancelation) as the outcome of the calling computation.
189 | joinResult : Fiber es a -> Async es a
190 | joinResult fbr = join fbr >>= \out => Term out
-- Cancels a fiber; only the last line of the body is visible in this listing.
193 | cancel : Fiber es a -> Async fs ()
-- Joins fiber `f`, discards the outcome, and passes the result through
-- `cancelable`.
197 | cancelable (ignore (join f))
-- Starts the given computation as a new fiber (flag `P`) and yields its handle.
207 | start : Async es a -> Async fs (Fiber es a)
208 | start = Start P
-- Starts the given computation as a new fiber and throws the handle away.
215 | background : Async es a -> Async fs ()
216 | background as = ignore (start as)
-- Finishes immediately with the given error sum as an `Error` outcome.
223 | fail : HSum es -> Async es a
224 | fail err = Term (Error err)
-- Injects a single error value into the error sum and fails with it.
227 | throw : Has e es => e -> Async es a
228 | throw err = fail (inject err)
-- Turns an `Either` into `Async`: `Right` succeeds, `Left` is thrown.
233 | injectEither : Has e es => Either e a -> Async es a
234 | injectEither (Right val) = pure val
235 | injectEither (Left err) = throw err
-- Lifts an IO action that may fail with `e`: a `Left` result is injected into
-- the error sum before the action is handed to `sync`.
240 | injectIO : Has e es => IO (Either e a) -> Async es a
241 | injectIO io = sync (map (mapFst inject) io)
-- Replaces the error channel `es` of a computation with `fs` by running the
-- given handler on errors. The head of the body and the `Error` branch are
-- elided in this listing.
244 | handleErrors : (HSum es -> Async fs a) -> Async es a -> Async fs a
-- Successful results and cancelation pass through unchanged.
247 | Succeeded x => Term $
Succeeded x
249 | Canceled => Term Canceled
-- Translates every error of a computation with `f` and fails with the result.
252 | mapErrors : (HSum es -> HSum fs) -> Async es a -> Async fs a
253 | mapErrors f = handleErrors (\err => fail (f err))
-- Embeds a computation with an empty error list into any error list. The
-- error handler is vacuous because `HSum []` has no values.
256 | weakenErrors : Async [] a -> Async fs a
257 | weakenErrors as = mapErrors (\case x impossible) as
-- Silently discards all errors of a unit computation by replacing them with
-- a successful `()`.
260 | dropErrs : Async es () -> Async [] ()
261 | dropErrs as = handleErrors (\_ => pure ()) as
-- Erased type alias: a handler for errors of type `x` that produces an `a`
-- in a computation which cannot itself fail.
264 | 0 Handler : Type -> Type -> Type
265 | Handler a x = x -> Async [] a
-- Handles every possible error with one handler per error type, leaving a
-- computation with an empty error list.
268 | handle : All (Handler a) es -> Async es a -> Async [] a
269 | handle hs = handleErrors (\err => collapse' (hzipWith id hs err))
-- Moves the errors of a computation into its result: success becomes `Right`,
-- any error becomes `Left`.
272 | liftErrors : Async es a -> Async fs (Result es a)
273 | liftErrors as = handleErrors (pure . Left) (map Right as)
-- Single-error variant of `liftErrors`: the sole error is projected out of the
-- sum and returned as `Left`.
276 | liftError : Async [e] a -> Async fs (Either e a)
277 | liftError as = handleErrors (\err => pure (Left (project1 err))) (map Right as)
-- Runs `as`, then runs the finalizer `f` on its outcome, and finally re-emits
-- that same outcome. Both binds are flagged `U` and the finalizer is passed
-- through `uncancelable`.
280 | guaranteeCase : Async es a -> (Outcome es a -> Async [] ()) -> Async es a
281 | guaranteeCase as f =
282 |   Bind U as (\out => Bind U (uncancelable (f out)) (\_ => Term out))
-- Runs `x` after `as` only if the outcome of `as` is `Canceled`; for every
-- other outcome nothing extra is run.
285 | onCancel : Async es a -> Async [] () -> Async es a
286 | onCancel as x = guaranteeCase as $
\case Canceled => x;
_ => pure ()
-- Like `onCancel`, but built on an outcome match whose branches are elided in
-- this listing. NOTE(review): judging by the name, cleanup runs on errors and
-- cancelation — confirm against the full file.
293 | onAbort : Async es a -> (cleanup : Async [] ()) -> Async es a
295 | guaranteeCase as $
\case
-- Runs the cleanup `v` after `aa` whatever the outcome of `aa` was.
305 | finally : Async es a -> (cleanup : Async [] ()) -> Async es a
306 | finally aa v = guaranteeCase aa (const v)
-- Runs a computation and ignores its outcome, always continuing with `()`.
309 | forget : Async es a -> Async [] ()
310 | forget as = Bind P as (const $ pure ())
-- Runs a computation, passes its outcome to the IO callback `cb`, and then
-- forgets the outcome.
313 | consume : Async es a -> (Outcome es a -> IO ()) -> Async [] ()
314 | consume as cb = forget (guaranteeCase as (\out => liftIO (cb out)))
-- Acquire/use/release combinator. The start of the signature and the line
-- binding `res` are elided in this listing.
319 | -> (a -> Async es b)
-- The release action receives the resource paired with the outcome of `use`.
320 | -> ((a,Outcome es b) -> Async [] ())
322 | bracketCase acquire use release =
-- `release` is attached to `use res` via `guaranteeCase`.
325 | guaranteeCase (use res) (\o => release (res,o))
-- Variant of `bracketCase` whose release action only looks at the resource
-- and ignores the outcome of `use`.
328 | bracket : Async es a -> (a -> Async es b) -> (a -> Async [] ()) -> Async es b
329 | bracket acquire use release =
330 |   bracketCase acquire use (\pair => release (fst pair))
-- Races already-started fibers. The head of the body, where `fs` and `t` are
-- bound, is elided in this listing.
345 | raceF : List (Async es (Fiber es a)) -> Async es a
-- Runs all fiber-producing computations first.
348 | fibs <- sequence fs
-- Registers the same callback on every fiber under token `t`; the cancelation
-- hook unregisters that token from all of them via `stop`.
349 | cancelableAsync (\cb => for_ fibs (\f => f.observe t cb) $> stop t fibs)
-- Unregisters token `t` from every fiber in the list.
352 | stop : Token -> List (Fiber es a) -> Async [] ()
353 | stop t fibers = liftIO (for_ fibers (\fbr => fbr.stopObserving t))
-- Starts every computation as its own fiber and races the fibers with `raceF`.
357 | race : (xs : List $ Async es a) -> Async es a
358 | race xs = raceF (map start xs)
-- Builds, for each position of a heterogeneous list, the function that
-- injects a value of that position's type into `HSum ts`.
-- The clause for the empty list is elided in this listing.
360 | injections : All f ts -> All (\t => (v : t) -> HSum ts) ts
-- The head position uses `Here`; the injections for the tail are each
-- post-composed with `There`.
362 | injections (x :: xs) = Here :: mapProperty (There .) (injections xs)
-- Races computations of different result types. Each result is first injected
-- into `HSum ts` so that all computations share one type, then the
-- heterogeneous list is flattened and passed to `race`.
367 | raceAny : All (Async es) ts -> Async es (HSum ts)
368 | raceAny xs = race (forget (hzipWith map (injections xs) xs))
-- Combines a heterogeneous list of outcomes into one outcome of a
-- heterogeneous list of results.
370 | collectOutcomes : All (Outcome es) ts -> Outcome es (HList ts)
371 | collectOutcomes [] = Succeeded []
-- A success is prepended to whatever the remaining outcomes combine to.
372 | collectOutcomes (Succeeded r :: t) = (r::) <$> collectOutcomes t
-- The first error wins; the remaining outcomes are not inspected.
373 | collectOutcomes (Error x :: t) = Error x
-- For a cancelation the remaining outcomes are inspected first; the branches
-- of this `case` are elided in this listing.
374 | collectOutcomes (Canceled :: t) =
375 | case collectOutcomes t of
-- Collects the results of several fiber-producing computations. The head of
-- the body, where `fs` is bound, is elided in this listing.
382 | parF : All (Async es . Fiber es) ts -> Async es (HList ts)
-- First obtain all fibers, then join each of them in order with `joinResult`.
384 | fibers <- hsequence fs
385 | hsequence $
mapProperty joinResult fibers
-- Starts every computation as its own fiber and collects all results via `parF`.
390 | par : All (Async es) ts -> Async es (HList ts)
391 | par xs = parF (mapProperty start xs)
-- Forward declaration: runs a computation on the execution context in scope
-- and passes the final outcome to the callback. The implementation is at the
-- end of the file.
394 | runAsyncWith : ExecutionContext => Async es a -> (Outcome es a -> IO ()) -> IO ()
-- Runs a computation on the execution context in scope and ignores its outcome.
396 | export covering %inline
397 | runAsync : ExecutionContext => Async es a -> IO ()
398 | runAsync as = runAsyncWith as (const $ pure ())
-- Stack of pending continuations used by the interpreter `run`; the indices
-- track how error and result types change from bottom to top.
405 | data Stack : (es,fs : List Type) -> (a,b : Type) -> Type where
-- The empty stack changes neither the error list nor the result type.
406 | Nil : Stack es es a a
-- Fragment of the cons constructor (its head is elided): every entry pairs a
-- cancelability flag with a continuation on an outcome.
408 | (Cancelability, Outcome es a -> Async fs b)
-- Life-cycle states of a fiber as stored in `FiberImpl.state`.
413 | data FiberState : List Type -> Type -> Type where
-- Not yet started; holds the computation to run.
416 | Init : Async es a -> FiberState es a
-- Set by `resume` right before `run` is invoked.
419 | Running : FiberState es a
-- Set by an async callback that fires while the fiber is still `Running`;
-- `suspend` reacts to it by resuming right away.
424 | ResultReady : FiberState es a
-- Finished with the given outcome; written by `finalize`.
427 | Done : Outcome es a -> FiberState es a
-- Fragment of a constructor whose head is elided (used elsewhere in the file
-- as `Suspended ref cncl stack`): the result slot and the optional
-- cancelation hook of a pending asynchronous step.
432 | IORef (Maybe $
Outcome es a)
433 | -> (onCancel : Maybe $
Async [] ())
-- Forward declaration so that `FiberImpl` can refer to fibers of any type.
438 | data AnyFiber : Type
-- Mutable runtime representation of a fiber. Other code also accesses
-- `.mutex` and `.token`, so those fields are presumably on elided lines.
440 | record FiberImpl (es : List Type) (a : Type) where
-- Current execution context; overwritten when a `Shift` is interpreted.
442 | ec : IORef ExecutionContext
-- Fiber that started this one, if any.
444 | parent : Maybe AnyFiber
-- Observers waiting for the outcome, keyed by token.
446 | callbacks : IORef (SortedMap Token (Outcome es a -> IO ()))
-- Fibers started by this one, keyed by their token.
447 | children : IORef (SortedMap Token AnyFiber)
-- Set to True once cancelation has been requested.
448 | canceled : IORef Bool
449 | state : IORef (FiberState es a)
-- Existential wrapper hiding the error and result types of a `FiberImpl`.
451 | data AnyFiber : Type where
452 | AF : FiberImpl es a -> AnyFiber
-- Runs an IO action between acquiring and releasing the fiber's mutex.
-- The lines running the action and returning its result are elided here.
454 | withLock : FiberImpl es a -> IO b -> IO b
455 | withLock fbr f = do
456 | mutexAcquire fbr.mutex
458 | mutexRelease fbr.mutex
-- Registers fiber `y` as a child of the given parent, if there is one.
461 | addChild : Maybe AnyFiber -> FiberImpl fs b -> IO ()
462 | addChild Nothing _ = pure ()
-- NOTE(review): the line between this clause head and the body is elided;
-- it presumably takes the parent's lock — confirm.
463 | addChild (Just $
AF q) y =
-- A parent that is already canceled marks the child as canceled instead of
-- registering it; otherwise the child is stored under its own token.
465 | readIORef q.canceled >>= \case
466 | True => writeIORef y.canceled True
467 | False => modifyIORef q.children (insert y.token (AF y))
-- Removes the child registered under `tk` from the fiber's children map while
-- holding the fiber's lock.
469 | removeChild : FiberImpl es a -> Token -> IO ()
470 | removeChild fbr tk = withLock fbr $ modifyIORef fbr.children (delete tk)
-- Allocates a fresh `FiberImpl`. The start of the signature and most of the
-- body are elided in this listing.
474 | -> (parent : Maybe AnyFiber)
475 | -> (as : Async es a)
476 | -> IO (FiberImpl es a)
477 | newFiber ec p as = do
-- The state reference starts out as `Init` holding the computation to run.
486 | (newIORef $
Init as)
-- Removes the callback registered under `tk` from the fiber's callback map
-- while holding the fiber's lock.
491 | stopObservingImpl : FiberImpl es a -> Token -> IO ()
492 | stopObservingImpl fbr tk = withLock fbr (modifyIORef fbr.callbacks $ delete tk)
-- Registers an outcome callback on a fiber. The start of the signature and
-- the final line of the body are elided in this listing.
497 | -> (Outcome es a -> IO ())
499 | observeImpl fbr tk cb = do
-- Under the lock only decide what to do; the chosen action is bound to `run`
-- (presumably executed after the lock is released — the line is elided).
500 | run <- withLock fbr $
501 | readIORef fbr.state >>= \case
-- Already finished: the callback is to be invoked with the stored outcome.
502 | Done o => pure (cb o)
-- Otherwise store the callback under `tk`; nothing is left to run.
503 | _ => modifyIORef fbr.callbacks (insert tk cb) $> pure ()
507 | {auto ec : ExecutionContext}
-- Starts or continues a fiber, depending on its current state. Several lines
-- of this definition (clause head, some branch heads, the final execution of
-- `run`) are elided in this listing.
517 | covering resume : FiberImpl es a -> IO ()
-- Under the lock only an action is selected and bound to `run`.
522 | run <- withLock fbr $
do
523 | readIORef fbr.state >>= \case
-- Suspended on an asynchronous step: switch to `Running` and check why the
-- fiber was woken up.
524 | Suspended ref cncl s => do
526 | writeIORef fbr.state Running
527 | ec <- readIORef fbr.ec
528 | readIORef fbr.canceled >>= \case
-- Not canceled: look at the result slot of the asynchronous step.
532 | False => readIORef ref >>= \case
-- Result available: continue interpreting with it on the saved stack.
534 | Just o => pure (run @{ec} ec.limit fbr (Term o) s)
-- No result yet: go back to the suspended state and do nothing.
536 | Nothing => writeIORef fbr.state (Suspended ref cncl s) $> pure ()
-- Canceled: run the cancelation hook, if any, before continuing with
-- `Term Canceled`.
541 | True => case cncl of
-- The hook `c` is run under flag `U` and its outcome is discarded.
543 | let f := Bind U c $
\_ => Term Canceled
544 | in pure (run @{ec} ec.limit fbr f s)
545 | Nothing => pure (run @{ec} ec.limit fbr (Term Canceled) s)
-- Branch whose head is elided (presumably the `Init as` case): start
-- interpreting `as` with an empty stack.
548 | writeIORef fbr.state Running
549 | ec <- readIORef fbr.ec
550 | pure (run @{ec} ec.limit fbr as [])
-- Any other state: nothing to do.
553 | _ => pure (pure ())
-- Parks a fiber that is waiting for an asynchronous result. The start of the
-- signature and the final line of the body are elided in this listing.
558 | -> IORef (Maybe $
Outcome es a)
559 | -> Maybe (Async [] ())
562 | suspend fbr ref cncl stck = do
-- Under the lock only an action is selected and bound to `run`.
563 | run <- withLock fbr $
do
564 | readIORef fbr.state >>= \case
-- The callback already fired while the fiber was running: store the suspended
-- state and resume immediately.
565 | ResultReady => writeIORef fbr.state (Suspended ref cncl stck) $> resume fbr
-- Still waiting: store the suspended state and do nothing.
566 | Running => writeIORef fbr.state (Suspended ref cncl stck) $> pure ()
567 | _ => pure (pure ())
-- Requests cancelation of a fiber. The final line of the body (presumably
-- executing `run`) is elided in this listing.
570 | covering cancelImpl : FiberImpl es a -> IO ()
571 | cancelImpl fbr = do
572 | run <- withLock fbr $
do
573 | readIORef fbr.canceled >>= \case
-- Already canceled: nothing to do.
574 | True => pure (pure ())
-- First request: set the flag and select `resume` so the fiber can react.
575 | False => writeIORef fbr.canceled True $> resume fbr
-- Marks a fiber as finished with outcome `o` and notifies everyone involved.
-- Some lines of the body are elided in this listing.
581 | covering finalize : FiberImpl es a -> Outcome es a -> IO ()
582 | finalize fbr o = do
583 | run <- withLock fbr $
do
585 | writeIORef fbr.state (Done o)
-- Take the registered callbacks and leave an empty map behind.
588 | cbs <- readIORef fbr.callbacks
589 | writeIORef fbr.callbacks empty
-- Same for the children.
592 | chl <- readIORef fbr.children
593 | writeIORef fbr.children empty
-- Unregister from the parent, deliver the outcome to all observers, and
-- cancel all remaining children.
597 | for_ fbr.parent (\(AF x) => removeChild x fbr.token)
598 | for_ cbs (\cb => cb o)
599 | for_ chl (\(AF x) => cancelImpl x)
-- Combines the given flag `x` with the flag already stored on the outermost
-- `Asnc`, `Sync`, `Bind` or `Start` node, using the `Semigroup` instance of
-- `Cancelability`.
-- NOTE(review): no catch-all clause is visible for the remaining
-- constructors; it is presumably on an elided line.
603 | set : Cancelability -> Async es a -> Async es a
604 | set x (Asnc y z) = Asnc (x <+> y) z
605 | set x (Sync y z) = Sync (x <+> y) z
606 | set x (Bind y z f) = Bind (x <+> y) z f
607 | set x (Start y z) = Start (x <+> y) z
-- Tells whether a step with the given flag should see a pending cancelation.
-- Steps flagged `U` never do; for all other flags the fiber's `canceled`
-- reference is read under the lock.
610 | observeCancelation : Cancelability -> FiberImpl es a -> IO Bool
611 | observeCancelation U _ = pure False
612 | observeCancelation _ fbr = withLock fbr $ readIORef fbr.canceled
-- Public `Fiber` view of a `FiberImpl`: its token plus the observe,
-- stop-observing and cancel operations partially applied to this fiber.
614 | covering (.fiber) : FiberImpl es a -> Fiber es a
615 | f.fiber = MkFiber f.token (observeImpl f) (stopObservingImpl f) (cancelImpl f)
-- Interpreter loop for `Async`. Its signature is elided in this listing.
-- The first argument is a fuel counter that is decremented on every step.
-- Fuel exhausted: resubmit the remaining work to the execution context with
-- fresh fuel (`ec.limit`).
617 | run 0 fbr act stck = ec.submit (run ec.limit fbr act stck)
-- Bind: push flag and continuation on the stack and run the inner computation
-- with the flag combined in.
619 | run (S k) fbr (Bind c x f) stck = run k fbr (set c x) ((c,f)::stck)
-- Outcome with an empty stack: the fiber is done.
621 | run (S k) fbr (Term o) [] = finalize fbr o
-- Outcome with a pending continuation: skip the continuation with `Canceled`
-- if cancelation is observed under its flag, otherwise apply it.
623 | run (S k) fbr (Term o) ((c,f)::t) = do
624 | False <- observeCancelation c fbr | True => run k fbr (Term Canceled) t
625 | run k fbr (set c $
f o) t
-- Synchronous step; the line binding `r` is elided (presumably it runs `io`).
627 | run (S k) fbr (Sync c io) stck = do
628 | False <- observeCancelation c fbr | True => run k fbr (Term Canceled) stck
630 | run k fbr (Term $
toOutcome r) stck
-- Context switch: remember the new context and continue there.
632 | run (S k) fbr (Shift ec2) stck = do
633 | writeIORef fbr.ec ec2 >>
634 | ec2.submit (run @{ec2} k fbr (pure ()) stck)
-- Self-cancelation: set the flag under the lock and continue with `Canceled`.
636 | run (S k) fbr Cancel stck = do
637 | withLock fbr (writeIORef fbr.canceled True) >>
638 | run k fbr (Term Canceled) stck
640 | run (S k) fbr GetEC stck = run k fbr (pure ec) stck
642 | run (S k) fbr Self stck = run k fbr (pure fbr.token) stck
-- Start a child fiber with this fiber as parent, schedule it on the current
-- context, and continue with its public handle.
644 | run (S k) fbr (Start c as) stck = do
645 | False <- observeCancelation c fbr | True => run k fbr (Term Canceled) stck
646 | child <- newFiber ec (Just $
AF fbr) as
647 | ec.submit (resume child)
648 | run k fbr (Term $
Succeeded child.fiber) stck
-- Asynchronous step: register a callback, then suspend until it fires.
-- Some lines of the callback are elided in this listing.
650 | run (S k) fbr (Asnc c reg) stck = do
651 | False <- observeCancelation c fbr | True => run k fbr (Term Canceled) stck
-- Slot that will receive the result of the asynchronous step.
652 | ref <- newIORef Nothing
653 | cnl <- reg $
\o => do
654 | run <- withLock fbr $
do
-- Only the first invocation of the callback counts; later ones are ignored.
656 | Nothing <- readIORef ref | _ => pure (pure ())
658 | writeIORef ref (Just o)
-- A canceled fiber is not woken up by the result.
665 | readIORef fbr.canceled >>= \case
666 | True => pure (pure ())
672 | readIORef fbr.state >>= \case
-- The fiber has not suspended yet: just flag that the result is ready.
674 | Running => writeIORef fbr.state ResultReady $> pure()
-- Otherwise wake the fiber up.
676 | _ => pure (resume fbr)
678 | suspend fbr ref cnl stck
-- Implementation of `runAsyncWith`: creates a root fiber (no parent),
-- registers `cb` as observer, and schedules the fiber on the context.
-- The line binding the token `tk` is elided in this listing.
680 | runAsyncWith @{ec} as cb = do
681 | fib <- newFiber ec Nothing as
683 | observeImpl fib tk cb
684 | ec.submit (resume fib)