0 | module IO.Async.Util
  1 |
  2 | import Control.Monad.Resource
  3 | import Data.Array
  4 | import Data.Array.Mutable
  5 | import Data.Linear.Deferred
  6 | import Data.Linear.Unique
  7 | import Data.Maybe
  8 | import IO.Async.Core
  9 | import IO.Async.Internal.Ref
 10 | import IO.Async.Loop.Sync
 11 | import IO.Async.Loop.TimerH
 12 | import IO.Async.Semaphore
 13 | import System.Clock
 14 |
 15 | %default total
 16 |
 17 | --------------------------------------------------------------------------------
 18 | -- Error handling
 19 | --------------------------------------------------------------------------------
 20 |
 21 | public export
 22 | 0 Handler : Type -> Type -> Type -> Type
 23 | Handler a e x = x -> Async e [] a
 24 |
 25 | ||| Inject an `IO (Either e a)` computation into an `Async` monad dealing
 26 | ||| with several possible errors.
 27 | export
 28 | injectIO : Has x es => IO (Either x a) -> Async e es a
 29 | injectIO = sync . map (mapFst inject)
 30 |
 31 | export
 32 | embed : (onCancel : Lazy (Async e es a)) ->  Outcome es a -> Async e es a
 33 | embed _  (Succeeded res) = succeed res
 34 | embed _  (Error err)     = fail err
 35 | embed oc Canceled        = oc
 36 |
 37 | --------------------------------------------------------------------------------
 38 | -- Handling fibers
 39 | --------------------------------------------------------------------------------
 40 |
 41 | ||| Semantically blocks the current fiber until the given fiber
 42 | ||| produces and outcome, and returns the outcome produced.
 43 | export %inline
 44 | join : Fiber es a -> Async e fs (Outcome es a)
 45 | join f = do
 46 |   me <- self
 47 |   primAsync $ \cb => f.observe_ me $ cb . Right
 48 |
 49 | ||| Awaits the termination of a fiber ignoring its outcome.
 50 | export %inline
 51 | wait : Fiber es a -> Async e fs ()
 52 | wait = ignore . join
 53 |
 54 | ||| Cancels the given fiber.
 55 | |||
 56 | ||| This will semantically block the current fiber, until the target has
 57 | ||| completed.
 58 | export
 59 | cancel : (target : Fiber es a) -> Async e fs ()
 60 | cancel f = uncancelable $ \_ => runIO f.cancel_ >> ignore (join f)
 61 |
 62 | export %inline
 63 | Resource (Async e) (Fiber es a) where
 64 |   cleanup = wait
 65 |
 66 | --------------------------------------------------------------------------------
 67 | -- Spawning Fibers
 68 | --------------------------------------------------------------------------------
 69 |
 70 | ||| Like `primAsync` but does not provide a hook for canceling.
 71 | export
 72 | primAsync_ : ((Result es a -> IO1 ()) -> IO1 ()) -> Async e es a
 73 | primAsync_ f =
 74 |   primAsync $ \cb,t =>
 75 |     let _ # t := f cb t
 76 |      in unit1 # t
 77 |
 78 | ||| A (cancelable) asynchronous computation that will never produce a
 79 | ||| result
 80 | export
 81 | never : Async e es a
 82 | never = primAsync_ $ \cb => unit1
 83 |
 84 | ||| Awaits the completion of a `Once a`.
 85 | export %inline
 86 | awaitOnce : Once World a -> Async e es a
 87 | awaitOnce o = primAsync $ \cb => observeOnce1 o (cb . Right)
 88 |
 89 | ||| Awaits the completion of a `Deferred a`.
 90 | export %inline
 91 | await : Deferred World a -> Async e es a
 92 | await d = do
 93 |   me <- self
 94 |   primAsync $ \cb => observeDeferredAs1 d me (cb . Right)
 95 |
 96 | listenPair :
 97 |      Fiber es a
 98 |   -> Fiber fs b
 99 |   -> Async e gs (Either (Outcome es a, Fiber fs b) (Fiber es a, Outcome fs b))
100 | listenPair f1 f2 = do
101 |   me <- self
102 |   primAsync $ \cb,t =>
103 |     let c1 # t := f1.observe_ me (\o => cb (Right $ Left  (o, f2))) t
104 |         c2 # t := f2.observe_ me (\o => cb (Right $ Right (f1, o))) t
105 |      in (\t => let _ # t := c1 t in c2 t) # t
106 |
107 | ||| A low-level primitive for racing the evaluation of two fibers that returns the [[Outcome]]
108 | ||| of the winner and the [[Fiber]] of the loser. The winner of the race is considered to be
109 | ||| the first fiber that completes with an outcome.
110 | ||| 
111 | ||| `racePair` is a cancelation-unsafe function; it is recommended to use the safer variants.
112 | export
113 | racePair :
114 |      Async e es a
115 |   -> Async e fs b
116 |   -> Async e gs (Either (Outcome es a, Fiber fs b) (Fiber es a, Outcome fs b))
117 | racePair x y =
118 |   uncancelable $ \poll => Prelude.do
119 |     f1 <- start x
120 |     f2 <- start y
121 |     poll (listenPair f1 f2)
122 |
123 | ||| Awaits the completion of the bound fiber and returns its result once it completes.
124 | ||| 
125 | ||| If the fiber completes with [[Outcome.Succeeded]], the successful value is returned. If the
126 | ||| fiber completes with [[Outcome.Errored]], the error is raised. If the fiber completes with
127 | ||| [[Outcome.Canceled]], `onCancel` is run.
128 | export
129 | joinWith : Fiber es a -> (onCancel: Lazy (Async e es a)) ->  Async e es a
130 | joinWith f c = join f >>= embed c
131 |
132 | ||| Like `joinWith`, returning the `neutral` value of the `Monoid` in case of
133 | ||| cancelation.
134 | export
135 | joinWithNeutral : Monoid a => Fiber es a -> Async e es a
136 | joinWithNeutral f = joinWith f (pure neutral)
137 |
138 | export
139 | cancelable : (act : Async e es a) -> (fin : Async e [] ()) -> Async e es (Maybe a)
140 | cancelable act fin =
141 |   uncancelable $ \poll => do
142 |     fiber <- start act
143 |     out   <- onCancel (poll $ join fiber) (fin >> cancel fiber)
144 |     embed (poll $ canceled $> Nothing) (map Just out)
145 |
146 | ||| Races the evaluation of two fibers that returns the [[Outcome]] of the winner. The winner
147 | ||| of the race is considered to be the first fiber that completes with an outcome. The loser
148 | ||| of the race is canceled before returning.
149 | ||| 
150 | ||| @param fa
151 | |||   the effect for the first racing fiber
152 | ||| @param fb
153 | |||   the effect for the second racing fiber
154 | ||| 
155 | ||| @see
156 | |||   [[race]] for a simpler variant that returns the successful outcome.
157 | export
158 | raceOutcome : Async e es a -> Async e fs b -> Async e gs (Either (Outcome es a) (Outcome fs b))
159 | raceOutcome fa fb =
160 |   uncancelable $ \poll => poll (racePair fa fb) >>= \case
161 |     Left  (oc,f) => cancel f $> Left oc
162 |     Right (f,oc) => cancel f $> Right oc
163 |
164 | ||| Races the evaluation of several fibers, returning the result
165 | ||| of the winnner. The other fibers are canceled as soon as one of the
166 | ||| fibers produced an outcome.
167 | ||| case of cancelation.
168 | ||| 
169 | ||| The semantics of [[race]] are described by the following rules:
170 | ||| 
171 | |||   1. If the winner completes with [[Outcome.Succeeded]], the race returns the successful
172 | |||      value. The loser is canceled before returning.
173 | |||   2. If the winner completes with [[Outcome.Errored]], the race raises the error.
174 | |||      The loser is canceled before returning.
175 | |||   3. If the winner completes with [[Outcome.Canceled]], the race cancels
176 | |||      the loser and returns its result, fires an error, or returns `Nothing`
177 | |||      its outcome is `Canceled`.
178 | ||| 
179 | ||| @param fa
180 | |||   the effect for the first racing fiber
181 | ||| @param fb
182 | |||   the effect for the second racing fiber
183 | ||| 
184 | ||| @see
185 | |||   [[raceOutcome]] for a variant that returns the outcome of the winner.
186 | export
187 | race2 :
188 |      Async e es a
189 |   -> Async e es b
190 |   -> (a -> c)
191 |   -> (b -> c)
192 |   -> Lazy c
193 |   -> Async e es c
194 | race2 fa fb ac bc dflt =
195 |   uncancelable $ \poll => poll (racePair fa fb) >>= \case
196 |     Left  (oc,f) => case oc of
197 |       Succeeded res => cancel f $> ac res
198 |       Error err     => cancel f >> fail err
199 |       Canceled      => cancel f >> join f >>= \case
200 |         Succeeded res => pure $ bc res
201 |         Error err     => fail err
202 |         Canceled      => pure dflt
203 |     Right (f,oc) => case oc of
204 |       Succeeded res => cancel f $> bc res
205 |       Error err     => cancel f >> fail err
206 |       Canceled      => cancel f >> join f >>= \case
207 |         Succeeded res => pure $ ac res
208 |         Error err     => fail err
209 |         Canceled      => pure dflt
210 |
211 | ||| This generalizes `race2` to an arbitrary heterogeneous list.
212 | export
213 | hrace : All (Async e es) ts -> Async e es (Maybe $ HSum ts)
214 | hrace []       = pure Nothing
215 | hrace [x]      = map (Just . Here) x
216 | hrace (x :: y) = race2 x (hrace y) (Just . Here) (map There) Nothing
217 |
218 | ||| A more efficient, monomorphic version of `hrace` with slightly
219 | ||| different semantics: The winner decides the outcome of the are
220 | ||| even if it has been cancele.
221 | export
222 | race : (dflt : Lazy a) -> List (Async e es a) -> Async e es a
223 | race dflt []      = pure dflt
224 | race dflt [x]     = x
225 | race dflt (x::xs) = race2 x (race dflt xs) id id dflt
226 |
227 | ||| Runs several non-productive fibers in parallel, terminating
228 | ||| as soon as the first one completes.
229 | export %inline
230 | race_ : List (Async e es ()) -> Async e es ()
231 | race_ = race ()
232 |
233 | ||| Races the evaluation of two fibers and returns the [[Outcome]] of both. If the race is
234 | ||| canceled before one or both participants complete, then then whichever ones are incomplete
235 | ||| are canceled.
236 | ||| 
237 | ||| @param fa
238 | |||   the effect for the first racing fiber
239 | ||| @param fb
240 | |||   the effect for the second racing fiber
241 | ||| 
242 | ||| @see
243 | |||   [[both]] for a simpler variant that returns the results of both fibers.
244 | ||| 
245 | export
246 | bothOutcome : Async e es a -> Async e fs b -> Async e gs (Outcome es a, Outcome fs b)
247 | bothOutcome fa fb =
248 |   uncancelable $ \poll => poll (racePair fa fb) >>= \case
249 |     Left  (oc, f) => (oc,) <$> onCancel (poll $ join f) (cancel f)
250 |     Right (f, oc) => (,oc) <$> onCancel (poll $ join f) (cancel f)
251 |
252 | ||| Races the evaluation of two fibers and returns the result of both.
253 | ||| 
254 | ||| The following rules describe the semantics of [[both]]:
255 | ||| 
256 | |||   1. If the winner completes with [[Outcome.Succeeded]], the race waits for the loser to
257 | |||      complete.
258 | |||   2. If the winner completes with [[Outcome.Errored]], the race raises the
259 | |||      error. The loser is canceled.
260 | |||   3. If the winner completes with [[Outcome.Canceled]],
261 | |||      the loser and the race are canceled as well.
262 | |||   4. If the loser completes with
263 | |||      [[Outcome.Succeeded]], the race returns the successful value of both fibers.
264 | |||   5. If the
265 | |||      loser completes with [[Outcome.Errored]], the race returns the error.
266 | |||   6. If the loser
267 | |||      completes with [[Outcome.Canceled]], the race is canceled.
268 | |||   7. If the race is canceled
269 | |||      before one or both participants complete, then whichever ones are incomplete are
270 | |||      canceled.
271 | ||| 
272 | ||| @param fa
273 | |||   the effect for the first racing fiber
274 | ||| @param fb
275 | |||   the effect for the second racing fiber
276 | ||| 
277 | ||| @see
278 | |||   [[bothOutcome]] for a variant that returns the [[Outcome]] of both fibers.
279 | export
280 | both : Async e es a -> Async e es b -> Async e es (Maybe (a,b))
281 | both fa fb =
282 |   uncancelable $ \poll => poll (racePair fa fb) >>= \case
283 |     Left  (oc, f) => case oc of
284 |       Succeeded x => onCancel (poll $ join f) (cancel f) >>= \case
285 |         Succeeded y => pure $ Just (x,y)
286 |         Error err   => fail err
287 |         Canceled    => pure Nothing
288 |       Error err     => cancel f >> fail err
289 |       Canceled      => cancel f >> pure Nothing
290 |     Right (f, oc) => case oc of
291 |       Succeeded y => onCancel (poll $ join f) (cancel f) >>= \case
292 |         Succeeded x => pure $ Just (x,y)
293 |         Error err   => fail err
294 |         Canceled    => pure Nothing
295 |       Error err     => cancel f >> fail err
296 |       Canceled      => cancel f >> pure Nothing
297 |
298 | ||| Runs the given heterogeneous list of asynchronous computations
299 | ||| in parallel, collecting the results again in a heterogeneous list.
300 | export
301 | par : All (Async e es) ts -> Async e es (Maybe $ HList ts)
302 | par []     = pure (Just [])
303 | par [x]    = map (\v => Just [v]) x
304 | par (h::t) =
305 |   flip map (both h $ par t) $ \case
306 |     Just (h2,Just t2) => Just (h2::t2)
307 |     _                 => Nothing
308 |
309 | parstart :
310 |      {n : _}
311 |   -> SnocList (Fiber es a)
312 |   -> IOArray n (Outcome es a) 
313 |   -> Semaphore
314 |   -> (k : Nat)
315 |   -> {auto 0 lte : LTE k n}
316 |   -> List (Async e es a)
317 |   -> Async e es (List $ Fiber es a)
318 | parstart sx arr sem (S k) (x::xs) = do
319 |   fib <- start $ guaranteeCase x $ \case
320 |     Canceled => releaseN sem n
321 |     o        => runIO (setNat arr k o) >> release sem 
322 |   parstart (sx:<fib) arr sem k xs
323 | parstart sx arr sem _ _ = pure (sx <>> [])
324 |
325 | collect :
326 |      SnocList a
327 |   -> IOArray n (Outcome es a)
328 |   -> (k : Nat)
329 |   -> {auto 0 lte : LTE k n}
330 |   -> Bool
331 |   -> IO1 (Outcome es $ List a)
332 | collect sx arr 0     b t =
333 |   if b then Succeeded (sx <>> []) # t else Canceled # t
334 | collect sx arr (S k) b t =
335 |   case getNat arr k t of
336 |     Succeeded v # t => collect (sx:<v) arr k b t
337 |     Error x # t     => Error x # t
338 |     Canceled # t    => collect sx arr k False t
339 |
340 | marr : Lift1 World f => (n : Nat) -> f (k ** IOArray k (Outcome es a))
341 | marr n = do
342 |   arr <- marray n Canceled
343 |   pure (n ** arr)
344 |
345 | ||| Runs the given list of computations in parallel.
346 | |||
347 | ||| This fails with an error, as soon as the first computation
348 | ||| fails, and it returns `Nothing` as soon as the first computation
349 | ||| is canceled.
350 | export
351 | parseq : List (Async e es a) -> Async e es (Maybe $ List a)
352 | parseq xs =
353 |   uncancelable $ \poll => do
354 |     (n ** arr<- marr (length xs)
355 |     sem        <- semaphore 0
356 |     fs         <- parstart [<] arr sem n xs
357 |     flip guarantee (traverse_ cancel fs) $ poll $ do
358 |       acquireN sem n
359 |       runIO (collect [<] arr n True) >>= \case
360 |         Succeeded vs => pure (Just vs)
361 |         Error  x     => fail x
362 |         Canceled     => pure Nothing
363 |
364 | ||| Traverses a list of values effectfully in parallel.
365 | |||
366 | ||| This returns `Nothing` if one of the fibers was canceled.
367 | export %inline
368 | parTraverse : (a -> Async e es b) -> List a -> Async e es (Maybe $ List b)
369 | parTraverse f = parseq . map f
370 |
371 | --------------------------------------------------------------------------------
372 | -- Sleeping and Timed Execution
373 | --------------------------------------------------------------------------------
374 |
375 | ||| Wraps a lazy value in an `Async`.
376 | export
377 | lazy : Lazy a -> Async e es a
378 | lazy v = primAsync_ $ \cb => cb (Right v)
379 |
380 | parameters {auto tim : TimerH e}
381 |
382 |   ||| Delay a computation by the given number of nanoseconds.
383 |   export
384 |   sleep : (dur : Clock Duration) -> Async e es ()
385 |   sleep dur = do
386 |     ev <- env
387 |     primAsync $ \cb => primWait ev dur $ cb (Right ())
388 |
389 |   ||| Delay a computation by the given number of nanoseconds.
390 |   export
391 |   waitTill : Clock Monotonic -> Async e es ()
392 |   waitTill cl = do
393 |     now <- liftIO (clockTime Monotonic)
394 |     sleep (timeDifference cl now)
395 |
396 |   ||| Delay a computation by the given number of nanoseconds.
397 |   export
398 |   delay : (dur : Clock Duration) -> Async e es a -> Async e es a
399 |   delay dur act = sleep dur >> act
400 |
401 | ||| Converts a number to nanoseconds
402 | export %inline
403 | (.ns) : Nat -> Clock Duration
404 | n.ns = fromNano (cast n)
405 |
406 | ||| Converts a number of microseconds to nanoseconds
407 | export %inline
408 | (.us) : Nat -> Clock Duration
409 | n.us = (n * 1_000).ns
410 |
411 | ||| Converts a number of seconds to nanoseconds
412 | export %inline
413 | (.s) : Nat -> Clock Duration
414 | n.s = (n * 1_000_000).us
415 |
416 | ||| Converts a number of milliseconds to nanoseconds
417 | export %inline
418 | (.ms) : Nat -> Clock Duration
419 | n.ms = (n * 1000).us
420 |
421 | ||| Converts a number of minutes to nanoseconds
422 | export %inline
423 | (.min) : Nat -> Clock Duration
424 | n.min = (n * 60).s
425 |
426 | ||| Converts a number of hours to nanoseconds
427 | export %inline
428 | (.h) : Nat -> Clock Duration
429 | n.h = (n * 60).min
430 |
431 | ||| Converts a number of days to nanoseconds
432 | export %inline
433 | (.d) : Nat -> Clock Duration
434 | n.d = (n * 24).h
435 |
436 | ||| Runs an IO action, returning the time delta it took to run.
437 | export %inline
438 | delta : HasIO io => io () -> io (Clock Duration)
439 | delta act = do
440 |   c1 <- liftIO $ clockTime Monotonic
441 |   act
442 |   c2 <- liftIO $ clockTime Monotonic
443 |   pure (timeDifference c2 c1)
444 |
445 | --------------------------------------------------------------------------------
446 | -- Running `Async`
447 | --------------------------------------------------------------------------------
448 |
449 | export covering
450 | syncApp : Async SyncST [] () -> IO ()
451 | syncApp as = do
452 |   el  <- sync
453 |   runAsync el as
454 |
455 | --------------------------------------------------------------------------------
456 | -- Stack-safety
457 | --------------------------------------------------------------------------------
458 |
459 | ||| A stack-safe alternative to `traverse`, specialized for `List`.
460 | export
461 | traverseList : (a -> Async e es b) -> List a -> Async e es (List b)
462 | traverseList f = go [<]
463 |   where
464 |     go : SnocList b -> List a -> Async e es (List b)
465 |     go sx []        = pure (sx <>> [])
466 |     go sx (x :: xs) = f x >>= \v => go (sx:<v) xs
467 |
468 |