0 | module IO.Async.Fiber
  1 |
  2 | import Data.IORef
  3 | import Data.SortedMap
  4 | import System.Concurrency
  5 | import IO.Async.Outcome
  6 | import IO.Async.Token
  7 |
  8 | %default total
  9 |
 10 | --------------------------------------------------------------------------------
 11 | -- Execution Context
 12 | --------------------------------------------------------------------------------
 13 |
 14 | ||| A context for submitting and running commands asynchronously.
 15 | public export
 16 | record ExecutionContext where
 17 |   [noHints]
 18 |   constructor EC
 19 |   tokenGen : TokenGen
 20 |   submit   : IO () -> IO ()
 21 |   limit    : Nat
 22 |
 23 | export %inline %hint
 24 | ecToTokenGen : ExecutionContext => TokenGen
 25 | ecToTokenGen @{ec} = ec.tokenGen
 26 |
 27 | --------------------------------------------------------------------------------
 28 | -- Fiber
 29 | --------------------------------------------------------------------------------
 30 |
 31 | ||| A fiber is a synchronous (sequential) computation producing
 32 | ||| an outcome of type `Outcome es a` eventually.
 33 | |||
 34 | ||| We can register a callback at a fiber to get informed about its
 35 | ||| termination, and we can externally interrupt a running fiber
 36 | ||| by canceling it.
 37 | public export
 38 | record Fiber (es : List Type) (a : Type) where
 39 |   constructor MkFiber
 40 |   token         : Token
 41 |   observe       : Token -> (Outcome es a -> IO ()) -> IO ()
 42 |   stopObserving : Token -> IO ()
 43 |   cancel        : IO ()
 44 |
 45 | --------------------------------------------------------------------------------
 46 | -- Cancelability
 47 | --------------------------------------------------------------------------------
 48 |
 49 | -- Marks a section of an asynchronous computation to be
 50 | -- cancelable (`C`), uncancelable (`U`), or taking its cancelability
 51 | -- from the parent scope `P` (which is typically the default)
 52 | data Cancelability = U | P | C
 53 |
 54 | %inline
 55 | Semigroup Cancelability where
 56 |   p <+> P = p
 57 |   _ <+> p = p
 58 |
 59 | %inline
 60 | Monoid Cancelability where
 61 |   neutral = P
 62 |
 63 | --------------------------------------------------------------------------------
 64 | -- Async
 65 | --------------------------------------------------------------------------------
 66 |
 67 | ||| `Async es a` is a monad for describing asynchronous computations
 68 | ||| producing a result of type `Outcome es a` eventually.
 69 | export
 70 | data Async : (es : List Type) -> Type -> Type where
 71 |   Term   : Outcome es a -> Async es a
 72 |
 73 |   Sync   : Cancelability -> IO (Result es a) -> Async es a
 74 |
 75 |   Start  : Cancelability -> Async es a -> Async fs (Fiber es a)
 76 |
 77 |   Shift  : ExecutionContext -> Async es ()
 78 |
 79 |   Self   : Async es Token
 80 |
 81 |   Cancel : Async es ()
 82 |
 83 |   GetEC  : Async es ExecutionContext
 84 |
 85 |   Asnc  :
 86 |        Cancelability
 87 |     -> ((Outcome es a -> IO ()) -> IO (Maybe $ Async [] ()))
 88 |     -> Async es a
 89 |
 90 |   Bind  :
 91 |        Cancelability
 92 |     -> Async es a
 93 |     -> (Outcome es a -> Async fs b)
 94 |     -> Async fs b
 95 |
 96 | --------------------------------------------------------------------------------
 97 | -- Interface Implementations
 98 | --------------------------------------------------------------------------------
 99 |
100 | export %inline
101 | succeed : a -> Async es a
102 | succeed = Term . Succeeded
103 |
104 | export %inline
105 | sync : IO (Result es a) -> Async es a
106 | sync = Sync P
107 |
108 | bind : Async es a -> (a -> Async es b) -> Async es b
109 | bind x f =
110 |   Bind P x $ \case
111 |     Succeeded v => f v
112 |     Error x     => Term (Error x)
113 |     Canceled    => Term Canceled
114 |
115 | export
116 | Functor (Async es) where
117 |   map f aa = bind aa (succeed . f)
118 |
119 | export %inline
120 | Applicative (Async es) where
121 |   pure      = succeed
122 |   af <*> aa = bind af (<$> aa)
123 |
124 | export %inline
125 | Monad (Async es) where
126 |   (>>=) = bind
127 |
128 | export
129 | HasIO (Async es) where
130 |   liftIO = sync . map Right
131 |
132 | --------------------------------------------------------------------------------
133 | -- Cancelation
134 | --------------------------------------------------------------------------------
135 |
136 | export
137 | uncancelable : Async fs a -> Async fs a
138 | uncancelable (Sync x y)   = Sync U y
139 | uncancelable (Asnc x y)   = Asnc U y
140 | uncancelable (Start x y)  = Start U y
141 | uncancelable (Bind x y f) = Bind U y f
142 | uncancelable v            = v
143 |
144 | export
145 | cancelable : Async fs a -> Async fs a
146 | cancelable (Sync P y)   = Sync C y
147 | cancelable (Bind P y f) = Bind C y f
148 | cancelable (Asnc P y)   = Asnc C y
149 | cancelable (Start P y)  = Start C y
150 | cancelable v            = v
151 |
152 | export
153 | strictCancelable : Async fs a -> Async fs a
154 | strictCancelable (Sync _ y)   = Sync C y
155 | strictCancelable (Asnc _ y)   = Asnc C y
156 | strictCancelable (Start _ y)  = Start C y
157 | strictCancelable (Bind _ y f) = Bind C y f
158 | strictCancelable v            = v
159 |
160 | export
161 | canceled : Async es ()
162 | canceled = Cancel
163 |
164 | --------------------------------------------------------------------------------
165 | -- Asynchronicity
166 | --------------------------------------------------------------------------------
167 |
168 | ||| Returns the unique token of the currently running fiber.
169 | export %inline
170 | self : Async es Token
171 | self = Self
172 |
173 | export %inline
174 | cancelableAsync : ((Outcome es a -> IO ()) -> IO (Async [] ())) -> Async es a
175 | cancelableAsync f = Asnc P (map Just . f)
176 |
177 | export %inline
178 | async : ((Outcome es a -> IO ()) -> IO ()) -> Async es a
179 | async f = Asnc P (\o => f o $> Nothing)
180 |
181 | export
182 | join : Fiber es a -> Async fs (Outcome es a)
183 | join f = do
184 |   t <- self
185 |   cancelableAsync $ \cb =>
186 |     f.observe t (cb . Succeeded) $> liftIO (f.stopObserving t)
187 |
188 | export
189 | joinResult : Fiber es a -> Async es a
190 | joinResult f = join f >>= Term
191 |
192 | export
193 | cancel : Fiber es a -> Async fs ()
194 | cancel f =
195 |   uncancelable $ do
196 |     liftIO $ f.cancel
197 |     cancelable (ignore (join f))
198 |
199 | ||| Runs an asynchronous computation in the background on a new fiber.
200 | |||
201 | ||| The resulting fiber can be canceled from the current fiber, and
202 | ||| we can semantically block the current fiber to wait for the background
203 | ||| computation to complete.
204 | |||
205 | ||| See also `cancel` and `join`.
206 | export %inline
207 | start : Async es a -> Async fs (Fiber es a)
208 | start as = Start P as
209 |
210 | ||| Asynchronously runs a computation on a new fiber.
211 | |||
212 | ||| While we can no longer observe the computation's result, it will still
213 | ||| be canceled if the current fiber terminates.
214 | export %inline
215 | background : Async es a -> Async fs ()
216 | background = ignore . start
217 |
218 | --------------------------------------------------------------------------------
219 | -- MonadError
220 | --------------------------------------------------------------------------------
221 |
222 | export %inline
223 | fail : HSum es -> Async es a
224 | fail = Term . Error
225 |
226 | export %inline
227 | throw : Has e es => e -> Async es a
228 | throw = fail . inject
229 |
230 | ||| Inject an `Either e a` computation into an `Async` monad dealing
231 | ||| with several possible errors.
232 | export
233 | injectEither : Has e es => Either e a -> Async es a
234 | injectEither (Left v)  = throw v
235 | injectEither (Right v) = pure v
236 |
237 | ||| Inject an `IO (Either e a)` computation into an `Async` monad dealing
238 | ||| with several possible errors.
239 | export
240 | injectIO : Has e es => IO (Either e a) -> Async es a
241 | injectIO = sync . map (mapFst inject)
242 |
243 | export
244 | handleErrors : (HSum es -> Async fs a) -> Async es a -> Async fs a
245 | handleErrors f x =
246 |   Bind U x $ \case
247 |     Succeeded x => Term $ Succeeded x
248 |     Error x     => f x
249 |     Canceled    => Term Canceled
250 |
251 | export %inline
252 | mapErrors : (HSum es -> HSum fs) -> Async es a -> Async fs a
253 | mapErrors f = handleErrors (fail . f)
254 |
255 | export %inline
256 | weakenErrors : Async [] a -> Async fs a
257 | weakenErrors = mapErrors $ \case x impossible
258 |
259 | export %inline
260 | dropErrs : Async es () -> Async [] ()
261 | dropErrs = handleErrors (const $ pure ())
262 |
263 | public export
264 | 0 Handler : Type -> Type -> Type
265 | Handler a x = x -> Async [] a
266 |
267 | export %inline
268 | handle : All (Handler a) es -> Async es a -> Async [] a
269 | handle hs = handleErrors (collapse' . hzipWith id hs)
270 |
271 | export %inline
272 | liftErrors : Async es a -> Async fs (Result es a)
273 | liftErrors = handleErrors (pure . Left) . map Right
274 |
275 | export %inline
276 | liftError : Async [e] a -> Async fs (Either e a)
277 | liftError = handleErrors (pure . Left . project1) . map Right
278 |
279 | export
280 | guaranteeCase : Async es a -> (Outcome es a -> Async [] ()) -> Async es a
281 | guaranteeCase as f =
282 |   Bind U as $ \o => Bind U (uncancelable $ f o) (\_ => Term o)
283 |
284 | export %inline
285 | onCancel : Async es a -> Async [] () -> Async es a
286 | onCancel as x = guaranteeCase as $ \case Canceled => x_ => pure ()
287 |
288 | ||| Guarantees to run the given cleanup hook in case a fiber
289 | ||| has been canceled or failed with an error.
290 | |||
291 | ||| See `guarantee` for additional information.
292 | export
293 | onAbort : Async es a -> (cleanup : Async [] ()) -> Async es a
294 | onAbort as h =
295 |   guaranteeCase as $ \case
296 |     Canceled => h
297 |     Error _  => h
298 |     _        => pure ()
299 |
300 | ||| Guarantees to run the given cleanup hook in case
301 | ||| the given computation finishes with an outcome.
302 | |||
303 | ||| See `guarantee` for additional information.
304 | export %inline
305 | finally : Async es a -> (cleanup : Async [] ()) -> Async es a
306 | finally aa v = guaranteeCase aa (\_ => v)
307 |
308 | export %inline
309 | forget : Async es a -> Async [] ()
310 | forget as = Bind P as (\_ => pure ())
311 |
312 | export
313 | consume : Async es a -> (Outcome es a -> IO ()) -> Async [] ()
314 | consume as cb = forget $ guaranteeCase as (liftIO . cb)
315 |
316 | export
317 | bracketCase :
318 |      Async es a
319 |   -> (a -> Async es b)
320 |   -> ((a,Outcome es b) -> Async [] ())
321 |   -> Async es b
322 | bracketCase acquire use release =
323 |   uncancelable $ do
324 |     res <- acquire
325 |     guaranteeCase (use res) (\o => release (res,o))
326 |
327 | export %inline
328 | bracket : Async es a -> (a -> Async es b) -> (a -> Async [] ()) -> Async es b
329 | bracket acquire use release =
330 |   bracketCase acquire use (release . fst)
331 |
332 | --------------------------------------------------------------------------------
333 | -- Concurrency
334 | --------------------------------------------------------------------------------
335 |
336 | ||| Semantically blocks the current fiber until one
337 | ||| of the given fibers has produced an outcome, in which
338 | ||| the others are canceled immediately.
339 | |||
340 | ||| This is useful if you for instance have several abort conditions
341 | ||| such as a timer and a signal from the operating system, and want
342 | ||| to stop your process as soon as the first of the conditions
343 | ||| occurs.
344 | export
345 | raceF : List (Async es (Fiber es a)) -> Async es a
346 | raceF fs = do
347 |   t    <- self
348 |   fibs <- sequence fs
349 |   cancelableAsync (\cb => for_ fibs (\f => f.observe t cb) $> stop t fibs)
350 |
351 |   where
352 |     stop : Token -> List (Fiber es a) -> Async [] ()
353 |     stop t fibers = liftIO $ for_ fibers $ \f => f.stopObserving t
354 |
355 | ||| Alias for `raceF . traverse start`.
356 | export %inline
357 | race : (xs : List $ Async es a) -> Async es a
358 | race = raceF . map start
359 |
360 | injections : All f ts -> All (\t => (v : t) -> HSum ts) ts
361 | injections []        = []
362 | injections (x :: xs) = Here :: mapProperty (There .) (injections xs)
363 |
364 | ||| Runs a heterogeneous list of asynchronous computations in parallel,
365 | ||| keeping only the one that finishes first.
366 | export %inline
367 | raceAny : All (Async es) ts -> Async es (HSum ts)
368 | raceAny xs = race . forget $ hzipWith map (injections xs) xs
369 |
370 | collectOutcomes : All (Outcome es) ts -> Outcome es (HList ts)
371 | collectOutcomes []                 = Succeeded []
372 | collectOutcomes (Succeeded r :: t) = (r::) <$> collectOutcomes t
373 | collectOutcomes (Error x     :: t) = Error x
374 | collectOutcomes (Canceled    :: t) =
375 |   case collectOutcomes t of
376 |     Error x => Error x
377 |     _       => Canceled
378 |
379 | ||| Accumulates the results of the given heterogeneous list of
380 | ||| fibers in a heterogeneous list.
381 | export
382 | parF : All (Async es . Fiber es) ts -> Async es (HList ts)
383 | parF fs = do
384 |   fibers <- hsequence fs
385 |   hsequence $ mapProperty joinResult fibers
386 |
387 | ||| Runs the given computations in parallel and collects the outcomes
388 | ||| in a heterogeneous list.
389 | export %inline
390 | par : All (Async es) ts -> Async es (HList ts)
391 | par = parF . mapProperty start
392 |
393 | export covering
394 | runAsyncWith : ExecutionContext => Async es a -> (Outcome es a -> IO ()) -> IO ()
395 |
396 | export covering %inline
397 | runAsync : ExecutionContext => Async es a -> IO ()
398 | runAsync as = runAsyncWith as (\_ => pure ())
399 |
400 | --------------------------------------------------------------------------------
401 | -- Implementation (Here be Dragons)
402 | --------------------------------------------------------------------------------
403 |
404 | -- Properly typed stack of nested `Bind`s plus their cancelability
405 | data Stack : (es,fs : List Type) -> (a,b : Type) -> Type where
406 |   Nil  : Stack es es a a
407 |   (::) :
408 |        (Cancelability, Outcome es a -> Async fs b)
409 |     -> Stack fs gs b c
410 |     -> Stack es gs a c
411 |
412 | -- Current stat of a fiber
413 | data FiberState : List Type -> Type -> Type where
414 |   -- The fiber has just been initialized with the asynchronous
415 |   -- computation it is about to run.
416 |   Init        : Async es a -> FiberState es a
417 |
418 |   -- The fiber is currently being run on its execution context
419 |   Running     : FiberState es a
420 |
421 |   -- The fiber is currently being run on its execution context,
422 |   -- and it has been informed that the result from an asynchronous
423 |   -- function call is ready
424 |   ResultReady : FiberState es a
425 |
426 |   -- The fiber produced an outcome and ist now finished.
427 |   Done        : Outcome es a -> FiberState es a
428 |
429 |   -- The fiber is awaiting the result from an asynchronous
430 |   -- computation, and is currently not being run.
431 |   Suspended   :
432 |        IORef (Maybe $ Outcome es a)
433 |     -> (onCancel : Maybe $ Async [] ())
434 |     -> Stack es fs a b
435 |     -> FiberState fs b
436 |
437 | -- An existential (non-parameterized) wrapper around a `FiberImpl es a`
438 | data AnyFiber : Type
439 |
440 | record FiberImpl (es : List Type) (a : Type) where
441 |   constructor FI
442 |   ec        : IORef ExecutionContext
443 |   mutex     : Mutex
444 |   parent    : Maybe AnyFiber
445 |   token     : Token
446 |   callbacks : IORef (SortedMap Token (Outcome es a -> IO ()))
447 |   children  : IORef (SortedMap Token AnyFiber)
448 |   canceled  : IORef Bool
449 |   state     : IORef (FiberState es a)
450 |
451 | data AnyFiber : Type where
452 |   AF : FiberImpl es a -> AnyFiber
453 |
454 | withLock : FiberImpl es a -> IO b -> IO b
455 | withLock fbr f = do
456 |   mutexAcquire fbr.mutex
457 |   res <- f
458 |   mutexRelease fbr.mutex
459 |   pure res
460 |
461 | addChild : Maybe AnyFiber -> FiberImpl fs b -> IO ()
462 | addChild Nothing       _ = pure ()
463 | addChild (Just $ AF q) y =
464 |   withLock q $
465 |     readIORef q.canceled >>= \case
466 |       True  => writeIORef y.canceled True
467 |       False => modifyIORef q.children (insert y.token (AF y))
468 |
469 | removeChild : FiberImpl es a -> Token -> IO ()
470 | removeChild fbr tk = withLock fbr (modifyIORef fbr.children $ delete tk)
471 |
472 | newFiber :
473 |      ExecutionContext
474 |   -> (parent : Maybe AnyFiber)
475 |   -> (as     : Async es a)
476 |   -> IO (FiberImpl es a)
477 | newFiber ec p as = do
478 |   fbr <- [| FI
479 |               (newIORef ec)
480 |               makeMutex
481 |               (pure p)
482 |               token
483 |               (newIORef empty)
484 |               (newIORef empty)
485 |               (newIORef False)
486 |               (newIORef $ Init as)
487 |          |]
488 |   addChild p fbr
489 |   pure fbr
490 |
491 | stopObservingImpl : FiberImpl es a -> Token -> IO ()
492 | stopObservingImpl fbr tk = withLock fbr $ modifyIORef fbr.callbacks (delete tk)
493 |
494 | observeImpl :
495 |      FiberImpl es a
496 |   -> Token
497 |   -> (Outcome es a -> IO ())
498 |   -> IO ()
499 | observeImpl fbr tk cb = do
500 |   run <- withLock fbr $
501 |     readIORef fbr.state >>= \case
502 |       Done o => pure (cb o)
503 |       _      => modifyIORef fbr.callbacks (insert tk cb) $> pure ()
504 |   run
505 |
506 | covering run :
507 |      {auto ec : ExecutionContext}
508 |   -> Nat
509 |   -> FiberImpl fs b
510 |   -> Async es a
511 |   -> Stack es fs a b
512 |   -> IO ()
513 |
514 | -- This function is invoked if
515 | --   a) The fiber was canceled
516 | --   b) The result of a callback is ready
517 | covering resume : FiberImpl es a -> IO ()
518 | resume fbr = do
519 |   -- This might be invoked from several threads, so we
520 |   -- adjust the state and assemble the action to run under
521 |   -- a lock. The action is run after the mutex was released.
522 |   run <- withLock fbr $ do
523 |     readIORef fbr.state >>= \case
524 |       Suspended ref cncl s => do
525 |         -- take over control and make sure no one else does
526 |         writeIORef fbr.state Running
527 |         ec <- readIORef fbr.ec
528 |         readIORef fbr.canceled >>= \case
529 |
530 |           -- we are still up and running, so the result in the
531 |           -- mutable reference should be ready
532 |           False => readIORef ref >>= \case
533 |             -- all is well. let's continue
534 |             Just o  => pure (run @{ec} ec.limit fbr (Term o) s)
535 |             -- WTF?? This should not happen, so should we crash?
536 |             Nothing => writeIORef fbr.state (Suspended ref cncl s) $> pure ()
537 |
538 |           -- we were canceled so run the cancel hook (if any)
539 |           -- otherwise, just continue and finish the uncancelable parts
540 |           -- IMPORTANT: We will no longer wait for the callback to finish!
541 |           True  => case cncl of
542 |             Just c  =>
543 |               let f := Bind U c $ \_ => Term Canceled
544 |                in pure (run @{ec} ec.limit fbr f s)
545 |             Nothing => pure (run @{ec} ec.limit fbr (Term Canceled) s)
546 |
547 |       Init as => do
548 |         writeIORef fbr.state Running
549 |         ec <- readIORef fbr.ec
550 |         pure (run @{ec} ec.limit fbr as [])
551 |
552 |       -- we are already running or done, so don't interfere
553 |       _  => pure (pure ())
554 |   run -- actually run the action we got
555 |
556 | covering suspend :
557 |      FiberImpl fs b
558 |   -> IORef (Maybe $ Outcome es a)
559 |   -> Maybe (Async [] ())
560 |   -> Stack es fs a b
561 |   -> IO ()
562 | suspend fbr ref cncl stck = do
563 |   run <- withLock fbr $ do
564 |     readIORef fbr.state >>= \case
565 |       ResultReady => writeIORef fbr.state (Suspended ref cncl stck) $> resume fbr
566 |       Running     => writeIORef fbr.state (Suspended ref cncl stck) $> pure ()
567 |       _           => pure (pure ())
568 |   run
569 |
570 | covering cancelImpl : FiberImpl es a -> IO ()
571 | cancelImpl fbr = do
572 |   run <- withLock fbr $ do -- make sure no one else adjusts the state
573 |     readIORef fbr.canceled >>= \case
574 |       True  => pure (pure ()) -- we have already been canceled, so that's being take care of
575 |       False => writeIORef fbr.canceled True $> resume fbr
576 |   run
577 |
578 | -- We have a result and the fiber can be finalized.
579 | -- This can only be called from a running fiber, so we don't have
580 | -- to check the state here.
581 | covering finalize : FiberImpl es a -> Outcome es a -> IO ()
582 | finalize fbr o = do
583 |   run <- withLock fbr $ do -- make sure no one else adjusts the state
584 |     -- We won the race, so we set the state to "Done" before anybody else does.
585 |     writeIORef fbr.state (Done o)
586 |
587 |     -- Read and empty the callbacks...
588 |     cbs <- readIORef fbr.callbacks
589 |     writeIORef fbr.callbacks empty
590 |
591 |     -- Read and empty the children...
592 |     chl <- readIORef fbr.children
593 |     writeIORef fbr.children empty
594 |
595 |     -- ...and invoke all callbacks and cancel all children
596 |     pure $ do
597 |       for_ fbr.parent (\(AF x) => removeChild x fbr.token)
598 |       for_ cbs (\cb => cb o)
599 |       for_ chl (\(AF x) => cancelImpl x)
600 |
601 |   run -- actually run the action we got
602 |
603 | set : Cancelability -> Async es a -> Async es a
604 | set x (Asnc y z)   = Asnc (x <+> y) z
605 | set x (Sync y z)   = Sync (x <+> y) z
606 | set x (Bind y z f) = Bind (x <+> y) z f
607 | set x (Start y z)  = Start (x <+> y) z
608 | set _ y            = y
609 |
610 | observeCancelation : Cancelability -> FiberImpl es a -> IO Bool
611 | observeCancelation U _ = pure False
612 | observeCancelation _ f = withLock f (readIORef f.canceled)
613 |
614 | covering (.fiber) : FiberImpl es a -> Fiber es a
615 | f.fiber = MkFiber f.token (observeImpl f) (stopObservingImpl f) (cancelImpl f)
616 |
617 | run 0 fbr act stck = ec.submit (run ec.limit fbr act stck)
618 |
619 | run (S k) fbr (Bind c x f) stck = run k fbr (set c x) ((c,f)::stck)
620 |
621 | run (S k) fbr (Term o) [] = finalize fbr o
622 |
623 | run (S k) fbr (Term o) ((c,f)::t) = do
624 |   False <- observeCancelation c fbr | True => run k fbr (Term Canceled) t
625 |   run k fbr (set c $ f o) t
626 |
627 | run (S k) fbr (Sync c io) stck = do
628 |   False <- observeCancelation c fbr | True => run k fbr (Term Canceled) stck
629 |   r     <- io
630 |   run k fbr (Term $ toOutcome r) stck
631 |
632 | run (S k) fbr (Shift ec2) stck = do
633 |   writeIORef fbr.ec ec2 >>
634 |   ec2.submit (run @{ec2} k fbr (pure ()) stck)
635 |
636 | run (S k) fbr Cancel stck = do
637 |   withLock fbr (writeIORef fbr.canceled True) >>
638 |   run k fbr (Term Canceled) stck
639 |
640 | run (S k) fbr GetEC stck = run k fbr (pure ec) stck
641 |
642 | run (S k) fbr Self stck = run k fbr (pure fbr.token) stck
643 |
644 | run (S k) fbr (Start c as) stck = do
645 |   False <- observeCancelation c fbr | True => run k fbr (Term Canceled) stck
646 |   child <- newFiber ec (Just $ AF fbr) as
647 |   ec.submit (resume child)
648 |   run k fbr (Term $ Succeeded child.fiber) stck
649 |
650 | run (S k) fbr (Asnc c reg) stck = do
651 |   False <- observeCancelation c fbr | True => run k fbr (Term Canceled) stck
652 |   ref <- newIORef Nothing
653 |   cnl <- reg $ \o => do
654 |     run <- withLock fbr $ do
655 |       -- test if we won the race and the value is yet unset
656 |       Nothing <- readIORef ref | _ => pure (pure ())
657 |       -- write the value and continue
658 |       writeIORef ref (Just o)
659 |
660 |       -- Check if the fiber has been canceled. If that's the case,
661 |       -- we are going to be left behind anyway, and we must abort.
662 |       -- (Because the fiber has been canceled, it's current state might
663 |       -- be "Running", and we must not mistake that for us winning the
664 |       -- concurrent race)
665 |       readIORef fbr.canceled >>= \case
666 |         True  => pure (pure ())
667 |         False =>
668 |           -- Check the current fiber state: If it is still at `Running`, we were
669 |           -- so quick (or synchronous) that the fiber had no time to get
670 |           -- suspended. In that case, the fiber will be suspended in a moment
671 |           -- and we inform it that the result is already here.
672 |           readIORef fbr.state >>= \case
673 |             -- We were quick and the fiber can continue immediately.
674 |             Running => writeIORef fbr.state ResultReady $> pure()
675 |             -- The fiber has already been suspended, so it can resume now.
676 |             _       => pure (resume fbr)
677 |     run
678 |   suspend fbr ref cnl stck
679 |
680 | runAsyncWith @{ec} as cb = do
681 |   fib <- newFiber ec Nothing as
682 |   tk  <- token
683 |   observeImpl fib tk cb
684 |   ec.submit (resume fib)
685 |