0 | ||| This module provides combinators for running streams concurrently,
  1 | ||| merging the output they produce nondeterministically, or interrupting
  2 | ||| a stream after a timeout.
  3 | |||
  4 | ||| Unlike the combinators in `FS.Stream`, we need a concurrent runtime
  5 | ||| to use the combinators provided here, which means that they run in
  6 | ||| the `Async` monad.
  7 | module FS.Concurrent
  8 |
  9 | import Data.Linear.Deferred
 10 | import Data.Linear.Ref1
 11 | import Data.List
 12 | import Data.Maybe
 13 | import Data.Nat
 14 |
 15 | import FS.Concurrent.Signal
 16 | import FS.Concurrent.Util
 17 | import FS.Pull
 18 | import FS.Resource
 19 | import FS.Scope
 20 |
 21 | import IO.Async.BQueue
 22 | import IO.Async.Channel
 23 | import IO.Async.Logging
 24 | import IO.Async.Loop.TimerH
 25 | import IO.Async.Semaphore
 26 |
 27 | import public IO.Async
 28 |
 29 | %default total
 30 |
 31 | ||| Convenience alias for `Pull . Async`
 32 | public export
 33 | 0 AsyncPull : Type -> Type -> List Type -> Type -> Type
 34 | AsyncPull e = Pull (Async e)
 35 |
 36 | ||| Convenience alias for `Stream . Async`
 37 | public export
 38 | 0 AsyncStream : Type -> List Type -> Type -> Type
 39 | AsyncStream e = Stream (Async e)
 40 |
 41 | ||| An empty stream that terminates after the given delay.
 42 | export %inline
 43 | sleep : TimerH e => Clock Duration -> AsyncStream e es o
 44 | sleep = exec . sleep
 45 |
 46 | ||| An empty stream that terminates at the given clock time.
 47 | export %inline
 48 | waitTill : TimerH e => Clock Monotonic -> AsyncStream e es o
 49 | waitTill = exec . waitTill
 50 |
 51 | ||| Emits the given value after a delay of the given duration.
 52 | export %inline
 53 | delayed : TimerH e => Clock Duration -> o -> AsyncStream e es o
 54 | delayed dur v = sleep dur >> emit v
 55 |
 56 | ||| Emits the given value after at the given clock time
 57 | export %inline
 58 | atClock : TimerH e => Clock Monotonic -> o -> AsyncStream e es o
 59 | atClock dur v = waitTill dur >> emit v
 60 |
 61 | ||| Infinitely emits the given value at regular intervals.
 62 | export %inline
 63 | timed : TimerH e => Clock Duration -> o -> AsyncStream e es o
 64 | timed dur v = do
 65 |   now <- liftIO (clockTime Monotonic)
 66 |   go (addDuration now dur)
 67 |   where
 68 |     go : Clock Monotonic -> AsyncStream e es o
 69 |     go cl = assert_total $ atClock cl v >> go (addDuration cl dur)
 70 |
 71 | ||| Emits the values from the given stream, each with a delay of the
 72 | ||| given duration.
 73 | |||
 74 | ||| The first value will be emitted `after` the given delay.
 75 | |||
 76 | ||| Note: If the given stream emits some values more slowly than specified
 77 | |||       by the delays, irregular emission of several values at a time
 78 | |||       might be observed.
 79 | export
 80 | every : TimerH e => Clock Duration -> AsyncStream e es a -> AsyncStream e es a
 81 | every x = zipRight (timed x ())
 82 |
 83 | ||| Like `every` but the first value will be emitted as soon as the
 84 | ||| original stream emits it.
 85 | export
 86 | every0 : TimerH e => Clock Duration -> AsyncStream e es a -> AsyncStream e es a
 87 | every0 x = zipRight (cons () $ timed x ())
 88 |
 89 | --------------------------------------------------------------------------------
 90 | -- Streams from Concurrency Primitives
 91 | --------------------------------------------------------------------------------
 92 |
 93 | ||| Converts a bounded queue of values into an infinite stream
 94 | ||| of values.
 95 | export %inline
 96 | dequeue : BQueue o -> AsyncStream e es o
 97 | dequeue = repeat . eval . dequeue
 98 |
 99 | export %inline
100 | Discrete BQueue where
101 |   discrete = dequeue
102 |
103 | ||| Converts a channel of chunks into an infinite stream of values.
104 | export %inline
105 | receive : Channel o -> AsyncStream e es o
106 | receive = unfoldEvalMaybe . receive
107 |
108 | export %inline
109 | Discrete Channel where
110 |   discrete = receive
111 |
112 | --------------------------------------------------------------------------------
113 | -- Interrupting Streams
114 | --------------------------------------------------------------------------------
115 |
116 | ||| Runs the given stream until the given duration expires.
117 | export
118 | timeout :
119 |      {auto th : TimerH e}
120 |   -> Clock Duration
121 |   -> AsyncStream e es o
122 |   -> AsyncStream e es o
123 | timeout dur str = do
124 |   def <- deferredOf ()
125 |   _   <- acquire (start {es = []} $ sleep dur >> putDeferred def ()) cancel
126 |   interruptOnAny def str
127 |
128 | --------------------------------------------------------------------------------
129 | -- Merging Streams
130 | --------------------------------------------------------------------------------
131 |
132 | parameters (chnl : Channel o)
133 |            (done : Deferred World ())
134 |            (res  : Deferred World (Result es ()))
135 |            (sema : IORef Nat)
136 |
137 |   -- Handles the outcome of running one of the input streams.
138 |   -- in case the stream terminated with an error, `res` is immediately
139 |   -- set, which causes the output stream to be interrupted and refire
140 |   -- the error. Otherwise, the counter in `sema` is atomically reduced
141 |   -- bye one and the channel closed if the counter arrives at 0.
142 |   out : Outcome es () -> Async e [] ()
143 |   out (Error err) = putDeferred res (Left err)
144 |   out _           = do
145 |     0 <- update sema (\x => let y := pred x in (y,y)) | _ => pure ()
146 |     close chnl
147 |
148 |   -- Starts running one of the input streams `s` in the background, returning
149 |   -- the corresponding fiber. Running `s` is interrupted if
150 |   -- the output stream is exhausted and `done` is completed.
151 |   -- The running input stream writes all chunks of output to the channel.
152 |   child : AsyncStream e es o -> Async e es (Fiber [] ())
153 |   child s = foreach (ignore . send chnl) s |> parrunCase done out
154 |
155 |   -- Starts running all input streams in parallel, and reads chunks of
156 |   -- output from the bounded queue `que`.
157 |   merged : List (AsyncStream e es o) -> AsyncStream e es o
158 |   merged ss =
159 |     bracket
160 |       (traverse child ss)
161 |       (\fs => putDeferred done () >> traverse_ wait fs)
162 |       (\_  => interruptOn res (receive chnl))
163 |
164 | ||| Runs the given streams in parallel and nondeterministically
165 | ||| (but chunkc-wise) interleaves their output.
166 | |||
167 | ||| The resulting stream will emit chunks as long as one of the input
168 | ||| streams is still alive, or until one of the input streams terminates
169 | ||| with an error, in which case the output stream will terminate with
170 | ||| the same error.
171 | export
172 | merge : List (AsyncStream e es o) -> AsyncStream e es o
173 | merge []  = neutral
174 | merge [s] = s
175 | merge ss  = Prelude.do
176 |   -- A bounded queue where the running streams will write their output
177 |   -- to. There will be no buffering: evaluating the streams will block
178 |   -- until then next chunk of ouptut has been requested by the consumer.
179 |   chnl <- channelOf o 0
180 |
181 |   -- Signals the exhaustion of the output stream, which will cause all
182 |   -- input streams to be interrupted.
183 |   done <- deferredOf {s = World} ()
184 |
185 |   -- Signals the termination of the input streams. This will be set as
186 |   -- soon as one input stream throws an error, or after all input
187 |   -- streams terminated successfully.
188 |   res  <- deferredOf {s = World} (Result es ())
189 |
190 |   -- Semaphore-like counter keeping track of the number of input streams
191 |   -- that are still running.
192 |   sema <- newref (length ss)
193 |
194 |   merged chnl done res sema ss
195 |
196 | ||| Runs the given streams in parallel and nondeterministically interleaves
197 | ||| their output.
198 | |||
199 | ||| This will terminate as soon as the first string is exhausted.
200 | export
201 | mergeHaltL : (s1,s2 : AsyncStream e es o) -> AsyncStream e es o
202 | mergeHaltL s1 s2 =
203 |   takeWhileJust $ merge [endWithNothing s1, mapOutput Just s2]
204 |
205 | ||| Runs the given streams in parallel and nondeterministically interleaves
206 | ||| their output.
207 | |||
208 | ||| This will terminate as soon as either stream is exhausted.
209 | export
210 | mergeHaltBoth : (s1,s2 : AsyncStream  e es o) -> AsyncStream e es o
211 | mergeHaltBoth s1 s2 =
212 |   takeWhileJust $ merge [endWithNothing s1, endWithNothing s2]
213 |
214 | ||| Runs the second stream until the first emits a value
215 | export
216 | haltOn : AsyncStream  e es o -> AsyncStream e es p -> AsyncStream e es p
217 | haltOn s1 s2 =
218 |   takeWhileJust $ merge [mapOutput (const Nothing) s1, mapOutput Just s2]
219 |
220 | --------------------------------------------------------------------------------
221 | -- Parallel Joining of Streams
222 | --------------------------------------------------------------------------------
223 |
224 | upd : Maybe (Async e [] ()) -> (Maybe (Async e [] ()), Bool)
225 | upd Nothing  = (Just $ pure (), True)
226 | upd (Just x) = (Just x, False)
227 |
228 | -- `parJoin` implementation
229 | parameters (done      : Deferred World (Result es ()))
230 |            (available : Semaphore)
231 |            (running   : SignalRef Nat)
232 |            (output    : Channel o)
233 |            (leaseref  : Ref World (Maybe $ Async e [] ()))
234 |
235 |
236 |   doLease : Scope (Async e) -> Async e es ()
237 |   doLease sc = do
238 |     True <- update leaseref upd | False => pure ()
239 |     cl   <- weakenErrors (lease sc)
240 |     writeref leaseref (Just cl)
241 |
242 |   doUnlease : Async e [] ()
243 |   doUnlease = readref leaseref >>= sequence_
244 |
245 |   -- Every time an inner or the outer stream terminates, the number
246 |   -- of running fibers is reduced by one. If this reaches zero, no
247 |   -- more streams (including the outer stream!) is running, so the
248 |   -- channel can be closed. The result stream will terminated as soon
249 |   -- as the closed channel is empty.
250 |   %inline
251 |   decRunning : Async e [] ()
252 |   decRunning =
253 |     updateAndGet running pred >>= \case
254 |       0 => close output
255 |       _ => pure ()
256 |
257 |   -- Runs an inner stream on its own fiber until it terminates gracefully
258 |   -- or fails with an error. In case of an error, the `done` flag is set
259 |   -- immediately to hold the error and stop all other running streams.
260 |   covering
261 |   inner : AsyncStream e es o -> Async e es ()
262 |   inner s =
263 |     uncancelable $ \poll => do
264 |       poll (acquire available) -- wait for a fiber to become available
265 |       modify running S         -- increase the number of running fibers
266 |       poll $ ignore $ parrunCase
267 |         done
268 |         (\o => putErr done o >> decRunning >> release available)
269 |         (foreach (ignore . send output) s)
270 |
271 |   -- Runs the outer stream on its own fiber until it terminates gracefully
272 |   -- or fails with an error.
273 |   -- The stream is interrupted as soon as the `done` flag is set.
274 |   outer : AsyncStream e es (AsyncStream e es o) -> Async e es (Fiber [] ())
275 |   outer ss =
276 |     parrunCase
277 |       done
278 |       (\o => putErr done o >> decRunning >> until running (== 0) >> doUnlease) $
279 |         flatMap ss $ \v => do
280 |           sc <- scope
281 |           exec (doLease sc >> inner v)
282 |
283 | ||| Nondeterministically merges a stream of streams (`outer`) in to a single stream,
284 | ||| opening at most `maxOpen` streams at any point in time.
285 | |||
286 | ||| The outer stream is evaluated and each resulting inner stream is run concurrently,
287 | ||| up to `maxOpen` stream. Once this limit is reached, evaluation of the outer stream
288 | ||| is paused until one of the inner streams finishes evaluating.
289 | |||
290 | ||| When the outer stream stops gracefully, all inner streams continue to run,
291 | ||| resulting in a stream that will stop when all inner streams finish
292 | ||| their evaluation.
293 | |||
294 | ||| Finalizers on each inner stream are run at the end of the inner stream,
295 | ||| concurrently with other stream computations.
296 | |||
297 | ||| Finalizers on the outer stream are run after all inner streams have been pulled
298 | ||| from the outer stream but not before all inner streams terminate
299 | ||| -- hence finalizers on the outer stream will run
300 | ||| AFTER the LAST finalizer on the very last inner stream.
301 | |||
302 | ||| Finalizers on the returned stream are run after the outer stream has finished
303 | ||| and all open inner streams have finished.
304 | export
305 | parJoin :
306 |      (maxOpen    : Nat)
307 |   -> {auto 0 prf : IsSucc maxOpen}
308 |   -> (outer      : AsyncStream e es (AsyncStream e es o))
309 |   -> AsyncStream e es o
310 | parJoin 1       out = flatMap out id
311 | parJoin maxOpen out = do
312 |   leaseref  <- newref {a = Maybe (Async e [] ())} Nothing
313 |
314 |   -- Signals exhaustion of the output stream (for instance, due
315 |   -- to a `take n`). It will interrupt evaluation of the
316 |   -- input stream and all child streams.
317 |   done      <- deferredOf (Result es ())
318 |
319 |   -- Concurrent slots available. Child streams will wait on this
320 |   -- before being started.
321 |   available <- semaphore maxOpen
322 |
323 |   running   <- signal 1
324 |
325 |   -- The input channel used for the result stream. It will be
326 |   -- closed when the last child was exhausted.
327 |   output    <- channelOf o 0
328 |
329 |   fbr       <- exec $ outer done available running output leaseref out
330 |
331 |   -- The resulting stream should cleanup resources when it is done.
332 |   -- It should also finalize `done`.
333 |   interruptOn done $
334 |     finally (putDeferred done (Right ()) >> wait fbr) (receive output)
335 |
336 | ||| Convenience alias for `P.mapOutput inner outer |> parJoin maxOpen`
337 | export %inline
338 | parBind :
339 |      (maxOpen    : Nat)
340 |   -> {auto 0 prf : IsSucc maxOpen}
341 |   -> (inner      : o -> AsyncStream e es p)
342 |   -> (outer      : AsyncStream e es o)
343 |   -> AsyncStream e es p
344 | parBind mo i o = mapOutput i o |> parJoin mo
345 |
346 | export
347 | broadcast :
348 |      AsyncStream e es o
349 |   -> (pipes : List (o -> AsyncStream e es p))
350 |   -> {auto 0 prf : NonEmpty pipes}
351 |   -> AsyncStream e es p
352 | broadcast s ps@(_::qs) =
353 |   parJoin (S $ length ps) (flatMap s $ \v => emits $ map (v) ps)
354 |
355 | ||| Uses `parJoin` to map the given function over each emitted output
356 | ||| in parallel.
357 | export
358 | parMapE :
359 |      (maxOpen    : Nat)
360 |   -> {auto 0 prf : IsSucc maxOpen}
361 |   -> (fun        : o -> Result es p)
362 |   -> (outer      : AsyncStream e es o)
363 |   -> AsyncStream e es p
364 | parMapE maxOpen fun = parJoin maxOpen . Pull.mapOutput run
365 |   where
366 |     run : o -> AsyncStream e es p
367 |     run o = pure o >>= either fail emit . fun
368 |
369 | ||| Like `parMapE`, but injects the error first.
370 | export
371 | parMapI :
372 |      {auto has   : Has x es}
373 |   -> (maxOpen    : Nat)
374 |   -> {auto 0 prf : IsSucc maxOpen}
375 |   -> (fun        : o -> Either x p)
376 |   -> (outer      : AsyncStream e es o)
377 |   -> AsyncStream e es p
378 | parMapI maxOpen fun = parMapE maxOpen (mapFst inject . fun)
379 |
380 | ||| Like `parMapE`, but for a function that cannot fail.
381 | export
382 | parMap :
383 |      (maxOpen    : Nat)
384 |   -> {auto 0 prf : IsSucc maxOpen}
385 |   -> (fun        : o -> p)
386 |   -> (outer      : AsyncStream e es o)
387 |   -> AsyncStream e es p
388 | parMap maxOpen fun = parMapE maxOpen (Right . fun)
389 |
390 | export
391 | foreachPar :
392 |      (maxOpen    : Nat)
393 |   -> {auto 0 prf : IsSucc maxOpen}
394 |   -> (sink       : o -> Async e [] ())
395 |   -> (outer      : AsyncPull e o es r)
396 |   -> AsyncPull e q es r
397 | foreachPar maxOpen sink outer = do
398 |   -- Concurrent slots available. Child streams will wait on this
399 |   -- before being started.
400 |   available <- semaphore maxOpen
401 |
402 |   finally
403 |     (acquireN available maxOpen)
404 |     (foreach (run available) outer)
405 |
406 |   where
407 |     run : Semaphore -> o -> Async e es ()
408 |     run available v = do
409 |       acquire available
410 |       ignore $ start (guarantee (sink v) (release available))
411 |
412 | --------------------------------------------------------------------------------
413 | -- Switch Map
414 | --------------------------------------------------------------------------------
415 |
416 | parameters (ps    : AsyncStream e es p)
417 |            (guard : Semaphore)
418 |
419 |     switchInner : Deferred World () -> AsyncStream e es p
420 |     switchInner halt =
421 |       interruptOnAny halt $
422 |         bracket
423 |           (acquire guard)
424 |           (const $ release guard) $ \_ => ps
425 |
426 |     switchHalted : IORef (Maybe $ Deferred World ()) -> Async e es (AsyncStream e es p)
427 |     switchHalted ref = do
428 |       halt <- deferredOf ()
429 |       prev <- update ref (Just halt,)
430 |       for_ prev $ \p => putDeferred p ()
431 |       pure $ switchInner halt
432 |
433 | ||| Like `flatMap` but interrupts the inner stream when
434 | ||| new elements arrive in the outer stream.
435 | |||
436 | ||| Finializers of each inner stream are guaranteed to run
437 | ||| before the next inner stream starts.
438 | ||| When the outer stream stops gracefully, the currently running
439 | ||| inner stream will continue to run.
440 | |||
441 | ||| When an inner stream terminates/interrupts, nothing
442 | ||| happens until the next element arrives
443 | ||| in the outer stream.
444 | |||
445 | ||| When either the inner or outer stream fails, the entire
446 | ||| stream fails and the finalizer of the
447 | ||| inner stream runs before the outer one.
448 | |||
449 | export
450 | switchMap :
451 |      (o -> AsyncStream e es p)
452 |   -> AsyncStream e es o
453 |   -> AsyncStream e es p
454 | switchMap f os = do
455 |   guard <- semaphore 1
456 |   ref   <- newref Nothing
457 |   parJoin 2 (evalMap (\v => switchHalted (f v) guard ref) os)
458 |
459 | --------------------------------------------------------------------------------
460 | -- Hold
461 | --------------------------------------------------------------------------------
462 |
463 | public export
464 | record Hold e es o where
465 |   constructor H
466 |   release : Async e [] ()
467 |   stream  : AsyncStream e es o
468 |
469 | export %inline
470 | Resource (Async e) (Hold e es o) where
471 |   cleanup = release
472 |
473 | ||| Converts a discrete stream of values into a continuous one that will
474 | ||| emit the last value emitted by the original stream on every pull starting
475 | ||| with the given initial value.
476 | |||
477 | ||| The original stream is immediately started and
478 | ||| processed in the background.
479 | |||
480 | ||| This should be used in combination with a call to `bracket` or
481 | ||| `resource`, so that the stream running in the background is
482 | ||| properly terminated and its resources released
483 | ||| once the resulting stream is exhausted.
484 | |||
485 | ||| ```idris example
486 | ||| signalOn :
487 | |||     o
488 | |||  -> AsyncStream e es ()
489 | |||  -> AsyncStream e es o
490 | |||  -> AsyncStream e es o
491 | ||| signalOn ini tick sig =
492 | |||   resource (hold ini sig) (zipRight tick . stream)
493 | ||| ```
494 | export
495 | hold :
496 |      (ini : o)
497 |   -> AsyncStream e es o
498 |   -> Async e fs (Hold e es o)
499 | hold ini os = do
500 |   -- Signals the exhaustion of the output stream, which will cause the
501 |   -- input streams to be interrupted.
502 |   done <- deferredOf {s = World} ()
503 |
504 |   -- Signals the termination of the input streams. This will be set as
505 |   -- soon as the input stream throws an error or after the input
506 |   -- stream terminated successfully.
507 |   res  <- deferredOf {s = World} (Result es ())
508 |
509 |   ref <- newref ini
510 |
511 |   fbr <- foreach (writeref ref) os |> parrunCase done (putErr res)
512 |   pure $
513 |     H
514 |       (putDeferred done () >> wait fbr)
515 |       (interruptOn res (repeat (eval $ readref ref)))
516 |
517 | ||| Like `hold` but the resulting stream will not emit a value
518 | ||| until after the original stream first emitted a value.
519 | export
520 | hold1 : AsyncStream e es o -> Async e fs (Hold e es o)
521 | hold1 = map {stream $= catMaybes} . hold Nothing . mapOutput Just
522 |
523 | ||| Runs the second stream in the background, emitting its latest
524 | ||| output whenever the first stream emits.
525 | export
526 | signalOn : o -> AsyncStream e es () -> AsyncStream e es o -> AsyncStream e es o
527 | signalOn ini tick sig = resource (hold ini sig) (zipRight tick . stream)
528 |
529 | ||| Like `signalOn` but only starts emitting values *after* the
530 | ||| second stream emitted its first value.
531 | export
532 | signalOn1 : AsyncStream e es () -> AsyncStream e es o -> AsyncStream e es o
533 | signalOn1 tick sig = resource (hold1 sig) (zipRight tick . stream)
534 |
535 | --------------------------------------------------------------------------------
536 | -- Logging Utils
537 | --------------------------------------------------------------------------------
538 |
539 | parameters {0 es     : List Type}
540 |            {auto lgs : All (Loggable e) es}
541 |
542 |   export
543 |   logExec : (dflt : t) -> Async e es t -> Pull (Async e) o [] t
544 |   logExec dflt = exec . unerr dflt
545 |
546 |   export %inline
547 |   tryExec : Async e es t -> Pull (Async e) o [] (Maybe t)
548 |   tryExec = logExec Nothing . map Just
549 |
550 |   export
551 |   tryPull : (dflt : r) -> Pull (Async e) o es r -> Pull (Async e) o [] r
552 |   tryPull dflt = handle (mapProperty (\_,v => exec (logLoggable v $> dflt)) lgs)
553 |
554 |   export %inline
555 |   tryStream : Stream (Async e) es o -> Stream (Async e) [] o
556 |   tryStream = tryPull ()
557 |
558 | --------------------------------------------------------------------------------
559 | -- Read-only Signals
560 | --------------------------------------------------------------------------------
561 |
562 | ||| An observe-only wrapper around a `SignalRef`.
563 | |||
564 | ||| Use this if you still want to observe a mutable value by means of
565 | ||| `discrete` or `continuous` but you want to prevent it to be further used
566 | ||| as a data sink.
567 | export
568 | record Signal a where
569 |   constructor MkSignal
570 |   signow  : IO1 a
571 |   sigdisc : {0 e : Type} -> {0 es : List Type} -> Stream (Async e) es a
572 |
573 | export
574 | sig : SignalRef a -> Signal a
575 | sig s = MkSignal (current1 s) (discrete s)
576 |
577 | export %hint %inline
578 | ref2sig : SignalRef a => Signal a
579 | ref2sig = sig %search
580 |
581 | export %inline
582 | Reference Signal where
583 |   current1 = signow
584 |
585 | export %inline
586 | Discrete Signal where
587 |   discrete s = s.sigdisc
588 |
589 | export
590 | Functor Signal where
591 |   map f (MkSignal n d) =
592 |     MkSignal
593 |       (\t => let v # t := n t in f v # t)
594 |       (mapOutput f d)
595 |
596 | export
597 | Applicative Signal where
598 |   pure v = MkSignal (v #) (emit v)
599 |   MkSignal nf df <*> MkSignal nv dv =
600 |     MkSignal
601 |       (\t => let f # t := nf tv # t := nv tin f v # t) $
602 |       merge
603 |         [ evalMap (\f => f <$> lift1 nv) df
604 |         , evalMap (\v => (v) <$> lift1 nf) dv
605 |         ]
606 |
607 | export
608 | Zippable Signal where
609 |   unzipWith f s = (map (fst . f) s, map (snd . f) s)
610 |   zipWith f x y = f <$> x <*> y
611 |
612 | ||| Generalization of `observeSig`: Acts on the output of a pull by combining
613 | ||| with the values from a heterogeneous list of signals.
614 | export
615 | hobserveSig :
616 |      {0 f      : List Type -> Type -> Type}
617 |   -> {0 es,ts  : List Type}
618 |   -> {auto lio : LIO (f es)}
619 |   -> All Signal ts
620 |   -> HZipFun (o::ts) (f es ())
621 |   -> Pull f o es r
622 |   -> Pull f o es r
623 | hobserveSig sigs fun =
624 |   observe $ \vo => Prelude.do
625 |     vs <- hsequence $ mapProperty current sigs
626 |     happly fun (vo::vs)
627 |
628 | ||| Like `hobserveSig` but drains the stream in the process.
629 | export
630 | hforeachSig :
631 |      {0 f      : List Type -> Type -> Type}
632 |   -> {0 es,ts  : List Type}
633 |   -> {auto lio : LIO (f es)}
634 |   -> All Signal ts
635 |   -> HZipFun (o::ts) (f es ())
636 |   -> Pull f o es r
637 |   -> Pull f q es r
638 | hforeachSig sigs fun =
639 |   foreach $ \vo => Prelude.do
640 |     vs <- hsequence $ mapProperty current sigs
641 |     happly fun (vo::vs)
642 |