9 | import Data.Linear.Deferred
10 | import Data.Linear.Ref1
15 | import FS.Concurrent.Signal
16 | import FS.Concurrent.Util
21 | import IO.Async.BQueue
22 | import IO.Async.Channel
23 | import IO.Async.Logging
24 | import IO.Async.Loop.TimerH
25 | import IO.Async.Semaphore
27 | import public IO.Async
-- Type alias: a `Pull` whose effect type is fixed to `Async e`.
-- The remaining three type arguments are passed on to `Pull` unchanged.
33 | 0 AsyncPull : Type -> Type -> List Type -> Type -> Type
34 | AsyncPull e = Pull (Async e)
-- Type alias: a `Stream` whose effect type is fixed to `Async e`.
-- The remaining two type arguments are passed on to `Stream` unchanged.
38 | 0 AsyncStream : Type -> List Type -> Type -> Type
39 | AsyncStream e = Stream (Async e)
||| Wraps the effect-level `sleep` action for the given duration
||| in a stream by means of `exec`.
sleep : TimerH e => Clock Duration -> AsyncStream e es o
sleep dur = exec (sleep dur)
||| Wraps the effect-level `waitTill` action for the given monotonic
||| clock time in a stream by means of `exec`.
waitTill : TimerH e => Clock Monotonic -> AsyncStream e es o
waitTill cl = exec (waitTill cl)
||| Runs the stream-level `sleep` for `dur` and afterwards emits `v`.
delayed : TimerH e => Clock Duration -> o -> AsyncStream e es o
delayed dur v = do
  sleep dur
  emit v
||| Runs the stream-level `waitTill` for the given monotonic clock time
||| and afterwards emits `v`.
atClock : TimerH e => Clock Monotonic -> o -> AsyncStream e es o
atClock cl v = do
  waitTill cl
  emit v
-- NOTE(review): original lines 64 and 67 are not visible in this excerpt
-- (presumably the `timed dur v = do` clause head and a `where`) — confirm
-- against the full file.
63 | timed : TimerH e => Clock Duration -> o -> AsyncStream e es o
-- Read the current monotonic clock time and start the loop at `now + dur`.
65 | now <- liftIO (clockTime Monotonic)
66 | go (addDuration now dur)
-- `go cl` emits `v` at clock time `cl` (via `atClock`) and then recurses with
-- the next target `cl + dur`. Each target is derived from the previous target,
-- not from a fresh clock reading.
68 | go : Clock Monotonic -> AsyncStream e es o
-- `assert_total`: the recursion has no terminating case.
69 | go cl = assert_total $
atClock cl v >> go (addDuration cl dur)
||| Combines the given stream with `timed x ()` using `zipRight`,
||| so the stream is paired with a unit-valued timer stream of period `x`.
every : TimerH e => Clock Duration -> AsyncStream e es a -> AsyncStream e es a
every x str = zipRight (timed x ()) str
||| Like `every`, but the timer stream is prefixed (via `cons`) with one
||| additional unit value before `timed x ()` starts.
every0 : TimerH e => Clock Duration -> AsyncStream e es a -> AsyncStream e es a
every0 x str = zipRight (cons () (timed x ())) str
||| Repeatedly evaluates the queue-level `dequeue` action on the given
||| `BQueue`, yielding its results as a stream.
dequeue : BQueue o -> AsyncStream e es o
dequeue queue = repeat (eval (dequeue queue))
-- `Discrete` instance for `BQueue`.
-- NOTE(review): the method definitions are not visible in this excerpt.
100 | Discrete BQueue where
||| Builds a stream from a channel by unfolding the channel-level
||| `receive` action with `unfoldEvalMaybe`.
receive : Channel o -> AsyncStream e es o
receive chnl = unfoldEvalMaybe (receive chnl)
-- `Discrete` instance for `Channel`.
-- NOTE(review): the method definitions are not visible in this excerpt.
109 | Discrete Channel where
-- NOTE(review): the start of this signature (function name and first explicit
-- argument) is not visible in this excerpt. The clause below shows that the
-- function is `timeout` and that it takes `dur` and `str`.
119 | {auto th : TimerH e}
121 | -> AsyncStream e es o
122 | -> AsyncStream e es o
-- Runs `str` under `interruptOnAny`, with `def` as the interruption signal.
123 | timeout dur str = do
124 | def <- deferredOf ()
-- Start a background fiber that sleeps for `dur` and then completes `def`.
-- The fiber is registered through `acquire` with `cancel` as its release
-- action, presumably so the timer is canceled when the scope ends — TODO confirm.
125 | _ <- acquire (start {es = []} $
sleep dur >> putDeferred def ()) cancel
126 | interruptOnAny def str
-- Shared state for merging several streams:
--   `chnl` : channel the child fibers send their output to
--   `done` : passed to `parrunCase` for every child; completed during cleanup
--   `res`  : receives an error result; the merged output is interrupted on it
-- NOTE(review): `sema` is used below but its parameter declaration (original
-- line 135) is not visible in this excerpt.
132 | parameters (chnl : Channel o)
133 | (done : Deferred World ())
134 | (res : Deferred World (Result es ()))
-- Outcome handler for a child fiber. An error is stored in `res` as `Left err`.
142 | out : Outcome es () -> Async e [] ()
143 | out (Error err) = putDeferred res (Left err)
-- Decrement `sema` and continue only when the decremented value is 0; in every
-- other case nothing more is done. NOTE(review): the clause head and the action
-- taken at 0 are on lines not visible here.
145 | 0 <- update sema (\x => let y := pred x in (y,y)) | _ => pure ()
-- Runs `s` in its own fiber via `parrunCase done out`, sending every output to
-- `chnl` (the result of `send` is ignored).
152 | child : AsyncStream e es o -> Async e es (Fiber [] ())
153 | child s = foreach (ignore . send chnl) s |> parrunCase done out
-- NOTE(review): the clause head and the combinator receiving the three
-- arguments below are not visible. The arguments are: (1) start one child per
-- stream, (2) complete `done` and wait for all child fibers, (3) read from
-- `chnl`, interrupted on `res`.
157 | merged : List (AsyncStream e es o) -> AsyncStream e es o
160 | (traverse child ss)
161 | (\fs => putDeferred done () >> traverse_ wait fs)
162 | (\_ => interruptOn res (receive chnl))
-- Sets up the shared state for merging the streams `ss` and delegates to
-- `merged`.
-- NOTE(review): original lines 173-174 are not visible; they may hold further
-- clauses (e.g. for the empty or singleton list) — confirm against the full file.
172 | merge : List (AsyncStream e es o) -> AsyncStream e es o
175 | merge ss = Prelude.do
-- Channel for the children's output; the second argument `0` is presumably the
-- channel capacity — TODO confirm.
179 | chnl <- channelOf o 0
183 | done <- deferredOf {s = World} ()
188 | res <- deferredOf {s = World} (Result es ())
-- Counter initialised with the number of streams to merge.
192 | sema <- newref (length ss)
194 | merged chnl done res sema ss
-- Merges `s1` (passed through `endWithNothing`) with `s2` (outputs wrapped in
-- `Just`) and applies `takeWhileJust` to the merged output.
-- Presumably the result therefore ends once `s1` is exhausted — TODO confirm.
-- NOTE(review): the clause head (original line 202) is not visible here.
201 | mergeHaltL : (s1,s2 : AsyncStream e es o) -> AsyncStream e es o
203 | takeWhileJust $
merge [endWithNothing s1, mapOutput Just s2]
||| Merges `s1` and `s2`, each passed through `endWithNothing`, and applies
||| `takeWhileJust` to the merged output.
mergeHaltBoth : (s1,s2 : AsyncStream e es o) -> AsyncStream e es o
mergeHaltBoth s1 s2 =
  takeWhileJust (merge [endWithNothing s1, endWithNothing s2])
-- Merges `s1`, with every output replaced by `Nothing`, with `s2`, whose
-- outputs are wrapped in `Just`, and applies `takeWhileJust` to the result.
-- Presumably the first output of `s1` therefore ends the stream — TODO confirm.
-- NOTE(review): the clause head (original line 217) is not visible here.
216 | haltOn : AsyncStream e es o -> AsyncStream e es p -> AsyncStream e es p
218 | takeWhileJust $
merge [mapOutput (const Nothing) s1, mapOutput Just s2]
||| State transition for the lease reference.
|||
||| An empty reference is filled with a no-op action and `True` is returned.
||| A reference that already holds an action is left unchanged and `False`
||| is returned.
upd : Maybe (Async e [] ()) -> (Maybe (Async e [] ()), Bool)
upd = \case
  Nothing  => (Just (pure ()), True)
  Just act => (Just act, False)
-- Shared state used by the helpers below:
--   `done`      : receives the overall result (written via `putErr`)
--   `available` : semaphore acquired per inner stream and released when it ends
--   `running`   : counter decremented by `decRunning`; `outer` waits for 0
--   `output`    : channel all inner streams send their output to
--   `leaseref`  : holds the stored lease-release action, if any
229 | parameters (done : Deferred World (Result es ()))
230 | (available : Semaphore)
231 | (running : SignalRef Nat)
232 | (output : Channel o)
233 | (leaseref : Ref World (Maybe $
Async e [] ()))
-- NOTE(review): the clause head (original line 237) is not visible here.
236 | doLease : Scope (Async e) -> Async e es ()
-- `upd` yields `True` only if `leaseref` was still empty, so the lease below
-- is taken at most once. Otherwise nothing is done.
238 | True <- update leaseref upd | False => pure ()
239 | cl <- weakenErrors (lease sc)
-- Store the action returned by `lease` so `doUnlease` can run it later.
240 | writeref leaseref (Just cl)
-- Runs the action stored in `leaseref`, if there is one.
242 | doUnlease : Async e [] ()
243 | doUnlease = readref leaseref >>= sequence_
-- Decrements `running` and branches on the new value.
-- NOTE(review): the branches are on lines not visible in this excerpt.
251 | decRunning : Async e [] ()
253 | updateAndGet running pred >>= \case
-- Runs one inner stream `s`. NOTE(review): clause head not visible.
261 | inner : AsyncStream e es o -> Async e es ()
-- The whole action is uncancelable; only the parts wrapped in `poll` are
-- given the `poll` treatment (acquiring the semaphore and starting the fiber).
263 | uncancelable $
\poll => do
264 | poll (acquire available)
266 | poll $
ignore $
parrunCase
-- Outcome handler: record an error in `done`, decrement `running`, and release
-- the permit acquired above.
268 | (\o => putErr done o >> decRunning >> release available)
-- The work itself: send every output of `s` to `output`.
269 | (foreach (ignore . send output) s)
-- Runs the outer stream `ss` in its own fiber. NOTE(review): the clause head
-- and the call receiving the handler below are not visible in this excerpt.
274 | outer : AsyncStream e es (AsyncStream e es o) -> Async e es (Fiber [] ())
-- Outcome handler: record an error in `done`, decrement `running`, wait until
-- `running` has reached 0, then run the stored lease release.
278 | (\o => putErr done o >> decRunning >> until running (== 0) >> doUnlease) $
279 | flatMap ss $
\v => do
-- NOTE(review): `sc` is bound on a line not visible here (original line 280).
281 | exec (doLease sc >> inner v)
-- NOTE(review): the start of this signature (function name and `maxOpen`
-- argument) is not visible in this excerpt.
307 | -> {auto 0 prf : IsSucc maxOpen}
308 | -> (outer : AsyncStream e es (AsyncStream e es o))
309 | -> AsyncStream e es o
-- With `maxOpen = 1`, this is just `flatMap` with the identity function.
310 | parJoin 1 out = flatMap out id
311 | parJoin maxOpen out = do
-- Initially empty slot for the lease-release action.
312 | leaseref <- newref {a = Maybe (Async e [] ())} Nothing
317 | done <- deferredOf (Result es ())
-- Semaphore created with `maxOpen`.
321 | available <- semaphore maxOpen
-- Counter starting at 1; presumably the initial 1 accounts for the outer
-- stream itself — TODO confirm.
323 | running <- signal 1
327 | output <- channelOf o 0
-- Start the fiber that runs the outer stream.
329 | fbr <- exec $
outer done available running output leaseref out
-- Emit what arrives on `output`; at the end complete `done` with `Right ()`
-- and wait for the outer fiber.
334 | finally (putDeferred done (Right ()) >> wait fbr) (receive output)
-- NOTE(review): the start of this signature (function name and `maxOpen`
-- argument) is not visible in this excerpt.
340 | -> {auto 0 prf : IsSucc maxOpen}
341 | -> (inner : o -> AsyncStream e es p)
342 | -> (outer : AsyncStream e es o)
343 | -> AsyncStream e es p
-- Maps every output of the outer stream to an inner stream with `i` and joins
-- the resulting stream of streams with `parJoin mo`.
344 | parBind mo i o = mapOutput i o |> parJoin mo
-- NOTE(review): the start of this signature (function name and first argument)
-- is not visible in this excerpt.
349 | -> (pipes : List (o -> AsyncStream e es p))
350 | -> {auto 0 prf : NonEmpty pipes}
351 | -> AsyncStream e es p
-- For every value `v` of `s`, emits one inner stream per pipe (the pipe applied
-- to `v`) and joins them with `parJoin`, using `length ps + 1` as `maxOpen`.
-- `qs` is bound by the pattern but not used in the visible code.
352 | broadcast s ps@(_::qs) =
353 | parJoin (S $
length ps) (flatMap s $
\v => emits $
map ($
v) ps)
-- NOTE(review): the start of this signature and the `where` keyword preceding
-- `run` are not visible in this excerpt.
360 | -> {auto 0 prf : IsSucc maxOpen}
361 | -> (fun : o -> Result es p)
362 | -> (outer : AsyncStream e es o)
363 | -> AsyncStream e es p
-- Turns every output into a single-step inner stream with `run` and joins
-- these with `parJoin maxOpen`.
364 | parMapE maxOpen fun = parJoin maxOpen . Pull.mapOutput run
-- Applies `fun` to `o`: a `Left` fails the stream, a `Right` is emitted.
-- `fun` is applied inside the continuation of `>>=` rather than directly,
-- presumably so it is evaluated when the inner stream runs and not when the
-- stream value is built — TODO confirm before simplifying.
366 | run : o -> AsyncStream e es p
367 | run o = pure o >>= either fail emit . fun
-- NOTE(review): the function name and the `maxOpen` argument of this signature
-- are on lines not visible in this excerpt.
372 | {auto has : Has x es}
374 | -> {auto 0 prf : IsSucc maxOpen}
375 | -> (fun : o -> Either x p)
376 | -> (outer : AsyncStream e es o)
377 | -> AsyncStream e es p
-- Delegates to `parMapE`, after mapping the `Left` side of `fun`'s result
-- with `inject`.
378 | parMapI maxOpen fun = parMapE maxOpen (mapFst inject . fun)
-- NOTE(review): the function name, the `maxOpen` argument and the `fun`
-- argument of this signature are on lines not visible in this excerpt.
384 | -> {auto 0 prf : IsSucc maxOpen}
386 | -> (outer : AsyncStream e es o)
387 | -> AsyncStream e es p
-- Delegates to `parMapE`, wrapping the result of `fun` in `Right`.
388 | parMap maxOpen fun = parMapE maxOpen (Right . fun)
-- NOTE(review): the start of this signature (function name and `maxOpen`
-- argument) is not visible in this excerpt.
393 | -> {auto 0 prf : IsSucc maxOpen}
394 | -> (sink : o -> Async e [] ())
395 | -> (outer : AsyncPull e o es r)
396 | -> AsyncPull e q es r
397 | foreachPar maxOpen sink outer = do
-- Semaphore created with `maxOpen`.
400 | available <- semaphore maxOpen
-- NOTE(review): the combinator receiving the two arguments below is not
-- visible. `acquireN available maxOpen` presumably waits until every started
-- sink has released its permit — TODO confirm.
403 | (acquireN available maxOpen)
404 | (foreach (run available) outer)
-- Handles one output value `v`.
407 | run : Semaphore -> o -> Async e es ()
408 | run available v = do
-- NOTE(review): original line 409 is not visible; presumably it acquires a
-- permit from `available`.
-- Start `sink v` in a new fiber; `guarantee` attaches `release available` to
-- it. The fiber handle is discarded.
410 | ignore $
start (guarantee (sink v) (release available))
-- Helpers for `switchMap`, parameterised over one inner stream `ps` and the
-- semaphore `guard`.
416 | parameters (ps : AsyncStream e es p)
417 | (guard : Semaphore)
-- Runs `ps` so that it is interrupted by `halt`. `guard` is released through
-- the `const $ release guard` cleanup argument.
-- NOTE(review): the clause head and the acquire part of this definition are on
-- lines not visible in this excerpt.
419 | switchInner : Deferred World () -> AsyncStream e es p
421 | interruptOnAny halt $
424 | (const $
release guard) $
\_ => ps
-- Registers a fresh `halt` signal for `ps` in `ref` and completes the signal
-- that was stored there before, which interrupts the stream that was created
-- with it. Returns `ps` wrapped by `switchInner`.
426 | switchHalted : IORef (Maybe $
Deferred World ()) -> Async e es (AsyncStream e es p)
427 | switchHalted ref = do
428 | halt <- deferredOf ()
-- Store the new signal; `prev` is the one stored previously, if any.
429 | prev <- update ref (Just halt,)
430 | for_ prev $
\p => putDeferred p ()
431 | pure $
switchInner halt
-- NOTE(review): the line carrying the function name of this signature is not
-- visible in this excerpt.
451 | (o -> AsyncStream e es p)
452 | -> AsyncStream e es o
453 | -> AsyncStream e es p
454 | switchMap f os = do
-- Semaphore created with 1, shared by all inner streams.
455 | guard <- semaphore 1
-- Holds the halt signal of the most recently registered inner stream.
456 | ref <- newref Nothing
-- Each output `v` of `os` is turned into the inner stream `f v` via
-- `switchHalted`; the resulting streams are joined with `parJoin 2`.
457 | parJoin 2 (evalMap (\v => switchHalted (f v) guard ref) os)
-- A stream paired with an action of type `Async e [] ()` named `release`.
-- NOTE(review): original line 465 (presumably the constructor declaration) is
-- not visible in this excerpt.
464 | record Hold e es o where
466 | release : Async e [] ()
467 | stream : AsyncStream e es o
-- `Resource` instance for `Hold` in the `Async e` effect.
-- NOTE(review): the method definitions are not visible in this excerpt.
470 | Resource (Async e) (Hold e es o) where
-- NOTE(review): the start of this signature, the clause head and the creation
-- of `ref` are on lines not visible in this excerpt. `hold1` calls this
-- function as `hold Nothing`, so it takes an initial value before the stream.
497 | -> AsyncStream e es o
498 | -> Async e fs (Hold e es o)
502 | done <- deferredOf {s = World} ()
507 | res <- deferredOf {s = World} (Result es ())
-- Background fiber: writes every output of `os` into `ref`; an error outcome
-- is recorded in `res` via `putErr`.
511 | fbr <- foreach (writeref ref) os |> parrunCase done (putErr res)
-- The next two arguments presumably fill the `release` and `stream` fields of
-- `Hold` — TODO confirm. The first completes `done` and waits for the fiber;
-- the second repeatedly reads `ref`, interrupted on `res`.
514 | (putDeferred done () >> wait fbr)
515 | (interruptOn res (repeat (eval $
readref ref)))
||| Variant of `hold` without an initial value.
|||
||| The outputs of `os` are wrapped in `Just`, `hold` is started with
||| `Nothing`, and the `stream` field of the result is passed through
||| `catMaybes`.
hold1 : AsyncStream e es o -> Async e fs (Hold e es o)
hold1 os = map {stream $= catMaybes} (hold Nothing (mapOutput Just os))
||| Acquires `hold ini sig` as a resource and combines `tick` with the
||| held stream using `zipRight`.
signalOn : o -> AsyncStream e es () -> AsyncStream e es o -> AsyncStream e es o
signalOn ini tick sig =
  resource (hold ini sig) (\held => zipRight tick (stream held))
||| Like `signalOn`, but based on `hold1`, so no initial value is required.
signalOn1 : AsyncStream e es () -> AsyncStream e es o -> AsyncStream e es o
signalOn1 tick sig =
  resource (hold1 sig) (\held => zipRight tick (stream held))
-- Utilities that discharge the error list `es` of an action or pull.
-- They require a `Loggable e` instance for every type in `es`.
parameters {0 es : List Type}
           {auto lgs : All (Loggable e) es}

  ||| Runs `act` through `unerr dflt` and lifts the result into a pull
  ||| with an empty error list.
  logExec : (dflt : t) -> Async e es t -> Pull (Async e) o [] t
  logExec dflt act = exec (unerr dflt act)

  ||| Like `logExec`, with the result wrapped in `Just` and `Nothing`
  ||| as the default.
  tryExec : Async e es t -> Pull (Async e) o [] (Maybe t)
  tryExec act = logExec Nothing (map Just act)

  ||| Handles every error type in `es` by logging the error with
  ||| `logLoggable` and returning `dflt`.
  tryPull : (dflt : r) -> Pull (Async e) o es r -> Pull (Async e) o [] r
  tryPull dflt pl =
    handle (mapProperty (\_,err => exec (logLoggable err $> dflt)) lgs) pl

  ||| `tryPull` specialised to streams (result type `()`).
  tryStream : Stream (Async e) es o -> Stream (Async e) [] o
  tryStream str = tryPull () str
-- A signal value built by `MkSignal`.
-- NOTE(review): original line 570 is not visible in this excerpt. The
-- constructor is used with two arguments elsewhere in this file, so one more
-- field is presumably declared there.
568 | record Signal a where
569 | constructor MkSignal
-- Stream of values of type `a`, polymorphic in both the effect parameter `e`
-- and the error list `es`.
571 | sigdisc : {0 e : Type} -> {0 es : List Type} -> Stream (Async e) es a
||| Converts a `SignalRef` to a `Signal`, using `current1` for the first
||| component and `discrete` for the stream component.
sig : SignalRef a -> Signal a
sig ref = MkSignal (current1 ref) (discrete ref)
||| Proof-search hint: derives a `Signal a` from a `SignalRef a` that is
||| available as an auto-implicit argument.
export %hint %inline
ref2sig : SignalRef a => Signal a
ref2sig @{ref} = sig ref
-- `Reference` instance for `Signal`.
-- NOTE(review): the method definitions are not visible in this excerpt.
582 | Reference Signal where
||| `Discrete` instance for `Signal`: the discrete stream is the
||| signal's `sigdisc` field.
Discrete Signal where
  discrete sg = sg.sigdisc
-- `Functor` instance for `Signal`.
-- NOTE(review): the `MkSignal` application and its second argument are on
-- lines not visible in this excerpt.
590 | Functor Signal where
591 | map f (MkSignal n d) =
-- First component: run `n` on the token `t` and apply `f` to the value it
-- returns, passing the resulting token on.
593 | (\t => let v # t := n t in f v # t)
-- `Applicative` instance for `Signal`.
597 | Applicative Signal where
-- A constant signal: the first component always yields `v`, and the stream
-- emits `v`.
598 | pure v = MkSignal (v #) (emit v)
-- NOTE(review): the `MkSignal` application and the combinator applied to the
-- list below are on lines not visible in this excerpt.
599 | MkSignal nf df <*> MkSignal nv dv =
-- First component: read the function, then the value, and apply one to the
-- other.
601 | (\t => let f # t := nf t;
v # t := nv t;
in f v # t) $
-- Two streams: each output of `df` is applied to the value read from `nv`,
-- and each output of `dv` is passed to the function read from `nf`.
603 | [ evalMap (\f => f <$> lift1 nv) df
604 | , evalMap (\v => ($
v) <$> lift1 nf) dv
-- `Zippable` instance for `Signal`, defined through its `Functor` and
-- `Applicative` instances.
-- NOTE(review): further methods may follow on lines not visible here.
608 | Zippable Signal where
-- Maps over `s` twice, keeping the first and second component of `f`'s result
-- respectively.
609 | unzipWith f s = (map (fst . f) s, map (snd . f) s)
-- Applies `f` to both signals applicatively.
610 | zipWith f x y = f <$> x <*> y
-- NOTE(review): the function name, the `sigs` argument and the result type of
-- this signature are on lines not visible in this excerpt.
616 | {0 f : List Type -> Type -> Type}
617 | -> {0 es,ts : List Type}
618 | -> {auto lio : LIO (f es)}
620 | -> HZipFun (o::ts) (f es ())
623 | hobserveSig sigs fun =
-- Passes a handler for each value `vo` to `observe`.
624 | observe $
\vo => Prelude.do
-- Read the current value of every signal in `sigs`.
625 | vs <- hsequence $
mapProperty current sigs
-- Apply `fun` to `vo` followed by the values just read.
626 | happly fun (vo::vs)
-- NOTE(review): the function name, the `sigs` argument and the result type of
-- this signature are on lines not visible in this excerpt.
631 | {0 f : List Type -> Type -> Type}
632 | -> {0 es,ts : List Type}
633 | -> {auto lio : LIO (f es)}
635 | -> HZipFun (o::ts) (f es ())
638 | hforeachSig sigs fun =
-- Same handler as in `hobserveSig`, but passed to `foreach` instead of
-- `observe`.
639 | foreach $
\vo => Prelude.do
-- Read the current value of every signal in `sigs`.
640 | vs <- hsequence $
mapProperty current sigs
-- Apply `fun` to `vo` followed by the values just read.
641 | happly fun (vo::vs)