2 | import Control.Monad.Resource
4 | import Data.Array.Mutable
5 | import Data.Linear.Deferred
6 | import Data.Linear.Unique
9 | import IO.Async.Internal.Ref
10 | import IO.Async.Loop.Sync
11 | import IO.Async.Loop.TimerH
12 | import IO.Async.Semaphore
22 | 0 Handler : Type -> Type -> Type -> Type
23 | Handler a e x = x -> Async e [] a
28 | injectIO : Has x es => IO (Either x a) -> Async e es a
29 | injectIO = sync . map (mapFst inject)
32 | embed : (onCancel : Lazy (Async e es a)) -> Outcome es a -> Async e es a
33 | embed _ (Succeeded res) = succeed res
34 | embed _ (Error err) = fail err
35 | embed oc Canceled = oc
44 | join : Fiber es a -> Async e fs (Outcome es a)
47 | primAsync $
\cb => f.observe_ me $
cb . Right
51 | wait : Fiber es a -> Async e fs ()
52 | wait = ignore . join
59 | cancel : (target : Fiber es a) -> Async e fs ()
60 | cancel f = uncancelable $
\_ => runIO f.cancel_ >> ignore (join f)
63 | Resource (Async e) (Fiber es a) where
72 | primAsync_ : ((Result es a -> IO1 ()) -> IO1 ()) -> Async e es a
74 | primAsync $
\cb,t =>
81 | never : Async e es a
82 | never = primAsync_ $
\cb => unit1
86 | awaitOnce : Once World a -> Async e es a
87 | awaitOnce o = primAsync $
\cb => observeOnce1 o (cb . Right)
91 | await : Deferred World a -> Async e es a
94 | primAsync $
\cb => observeDeferredAs1 d me (cb . Right)
99 | -> Async e gs (Either (Outcome es a, Fiber fs b) (Fiber es a, Outcome fs b))
100 | listenPair f1 f2 = do
102 | primAsync $
\cb,t =>
103 | let c1 # t := f1.observe_ me (\o => cb (Right $
Left (o, f2))) t
104 | c2 # t := f2.observe_ me (\o => cb (Right $
Right (f1, o))) t
105 | in (\t => let _ # t := c1 t in c2 t) # t
116 | -> Async e gs (Either (Outcome es a, Fiber fs b) (Fiber es a, Outcome fs b))
118 | uncancelable $
\poll => Prelude.do
121 | poll (listenPair f1 f2)
129 | joinWith : Fiber es a -> (onCancel: Lazy (Async e es a)) -> Async e es a
130 | joinWith f c = join f >>= embed c
135 | joinWithNeutral : Monoid a => Fiber es a -> Async e es a
136 | joinWithNeutral f = joinWith f (pure neutral)
139 | cancelable : (act : Async e es a) -> (fin : Async e [] ()) -> Async e es (Maybe a)
140 | cancelable act fin =
141 | uncancelable $
\poll => do
143 | out <- onCancel (poll $
join fiber) (fin >> cancel fiber)
144 | embed (poll $
canceled $> Nothing) (map Just out)
158 | raceOutcome : Async e es a -> Async e fs b -> Async e gs (Either (Outcome es a) (Outcome fs b))
159 | raceOutcome fa fb =
160 | uncancelable $
\poll => poll (racePair fa fb) >>= \case
161 | Left (oc,f) => cancel f $> Left oc
162 | Right (f,oc) => cancel f $> Right oc
194 | race2 fa fb ac bc dflt =
195 | uncancelable $
\poll => poll (racePair fa fb) >>= \case
196 | Left (oc,f) => case oc of
197 | Succeeded res => cancel f $> ac res
198 | Error err => cancel f >> fail err
199 | Canceled => cancel f >> join f >>= \case
200 | Succeeded res => pure $
bc res
201 | Error err => fail err
202 | Canceled => pure dflt
203 | Right (f,oc) => case oc of
204 | Succeeded res => cancel f $> bc res
205 | Error err => cancel f >> fail err
206 | Canceled => cancel f >> join f >>= \case
207 | Succeeded res => pure $
ac res
208 | Error err => fail err
209 | Canceled => pure dflt
213 | hrace : All (Async e es) ts -> Async e es (Maybe $
HSum ts)
214 | hrace [] = pure Nothing
215 | hrace [x] = map (Just . Here) x
216 | hrace (x :: y) = race2 x (hrace y) (Just . Here) (map There) Nothing
222 | race : (dflt : Lazy a) -> List (Async e es a) -> Async e es a
223 | race dflt [] = pure dflt
225 | race dflt (x::xs) = race2 x (race dflt xs) id id dflt
230 | race_ : List (Async e es ()) -> Async e es ()
246 | bothOutcome : Async e es a -> Async e fs b -> Async e gs (Outcome es a, Outcome fs b)
247 | bothOutcome fa fb =
248 | uncancelable $
\poll => poll (racePair fa fb) >>= \case
249 | Left (oc, f) => (oc,) <$> onCancel (poll $
join f) (cancel f)
250 | Right (f, oc) => (,oc) <$> onCancel (poll $
join f) (cancel f)
280 | both : Async e es a -> Async e es b -> Async e es (Maybe (a,b))
282 | uncancelable $
\poll => poll (racePair fa fb) >>= \case
283 | Left (oc, f) => case oc of
284 | Succeeded x => onCancel (poll $
join f) (cancel f) >>= \case
285 | Succeeded y => pure $
Just (x,y)
286 | Error err => fail err
287 | Canceled => pure Nothing
288 | Error err => cancel f >> fail err
289 | Canceled => cancel f >> pure Nothing
290 | Right (f, oc) => case oc of
291 | Succeeded y => onCancel (poll $
join f) (cancel f) >>= \case
292 | Succeeded x => pure $
Just (x,y)
293 | Error err => fail err
294 | Canceled => pure Nothing
295 | Error err => cancel f >> fail err
296 | Canceled => cancel f >> pure Nothing
301 | par : All (Async e es) ts -> Async e es (Maybe $
HList ts)
302 | par [] = pure (Just [])
303 | par [x] = map (\v => Just [v]) x
305 | flip map (both h $
par t) $
\case
306 | Just (h2,Just t2) => Just (h2::t2)
311 | -> SnocList (Fiber es a)
312 | -> IOArray n (Outcome es a)
315 | -> {auto 0 lte : LTE k n}
316 | -> List (Async e es a)
317 | -> Async e es (List $
Fiber es a)
318 | parstart sx arr sem (S k) (x::xs) = do
319 | fib <- start $
guaranteeCase x $
\case
320 | Canceled => releaseN sem n
321 | o => runIO (setNat arr k o) >> release sem
322 | parstart (sx:<fib) arr sem k xs
323 | parstart sx arr sem _ _ = pure (sx <>> [])
327 | -> IOArray n (Outcome es a)
329 | -> {auto 0 lte : LTE k n}
331 | -> IO1 (Outcome es $
List a)
332 | collect sx arr 0 b t =
333 | if b then Succeeded (sx <>> []) # t else Canceled # t
334 | collect sx arr (S k) b t =
335 | case getNat arr k t of
336 | Succeeded v # t => collect (sx:<v) arr k b t
337 | Error x # t => Error x # t
338 | Canceled # t => collect sx arr k False t
340 | marr : Lift1 World f => (n : Nat) -> f (
k ** IOArray k (Outcome es a))
342 | arr <- marray n Canceled
351 | parseq : List (Async e es a) -> Async e es (Maybe $
List a)
353 | uncancelable $
\poll => do
354 | (
n ** arr)
<- marr (length xs)
356 | fs <- parstart [<] arr sem n xs
357 | flip guarantee (traverse_ cancel fs) $
poll $
do
359 | runIO (collect [<] arr n True) >>= \case
360 | Succeeded vs => pure (Just vs)
362 | Canceled => pure Nothing
368 | parTraverse : (a -> Async e es b) -> List a -> Async e es (Maybe $
List b)
369 | parTraverse f = parseq . map f
377 | lazy : Lazy a -> Async e es a
378 | lazy v = primAsync_ $
\cb => cb (Right v)
380 | parameters {auto tim : TimerH e}
384 | sleep : (dur : Clock Duration) -> Async e es ()
387 | primAsync $
\cb => primWait ev dur $
cb (Right ())
391 | waitTill : Clock Monotonic -> Async e es ()
393 | now <- liftIO (clockTime Monotonic)
394 | sleep (timeDifference cl now)
398 | delay : (dur : Clock Duration) -> Async e es a -> Async e es a
399 | delay dur act = sleep dur >> act
403 | (.ns) : Nat -> Clock Duration
404 | n.ns = fromNano (cast n)
408 | (.us) : Nat -> Clock Duration
409 | n.us = (n * 1_000).ns
413 | (.s) : Nat -> Clock Duration
414 | n.s = (n * 1_000_000).us
418 | (.ms) : Nat -> Clock Duration
419 | n.ms = (n * 1000).us
423 | (.min) : Nat -> Clock Duration
428 | (.h) : Nat -> Clock Duration
433 | (.d) : Nat -> Clock Duration
438 | delta : HasIO io => io () -> io (Clock Duration)
440 | c1 <- liftIO $
clockTime Monotonic
442 | c2 <- liftIO $
clockTime Monotonic
443 | pure (timeDifference c2 c1)
450 | syncApp : Async SyncST [] () -> IO ()
461 | traverseList : (a -> Async e es b) -> List a -> Async e es (List b)
462 | traverseList f = go [<]
464 | go : SnocList b -> List a -> Async e es (List b)
465 | go sx [] = pure (sx <>> [])
466 | go sx (x :: xs) = f x >>= \v => go (sx:<v) xs