0 | module IO.Async.BQueue
  1 |
  2 | import Data.Linear.Ref1
  3 | import Data.Linear.Unique
  4 | import Data.Queue
  5 | import IO.Async
  6 | import Syntax.T1
  7 |
  8 | %default total
  9 |
 10 | record Taker a where
 11 |   constructor T
 12 |   token    : IOToken
 13 |   callback : a -> IO1 ()
 14 |
 15 | %inline
 16 | Eq (Taker a) where
 17 |   v1 == v2 = v1.token == v2.token
 18 |
 19 | record Offerer a where
 20 |   constructor O
 21 |   token    : IOToken
 22 |   value    : a
 23 |   callback : IO1 ()
 24 |
 25 | %inline
 26 | Eq (Offerer a) where
 27 |   v1 == v2 = v1.token == v2.token
 28 |
 29 | -- internal state of the queue
 30 | record ST a where
 31 |   constructor S
 32 |   capacity : Nat
 33 |   queue    : Queue a
 34 |   takers   : Queue (Taker a)
 35 |   offerers : Queue (Offerer a)
 36 |
 37 | %inline
 38 | takeV : Taker a -> a -> IO1 (IO1 ())
 39 | takeV (T _ cb) v t = let _ # t := cb v t in unit1 # t
 40 |
 41 | %inline
 42 | takeO : Offerer a -> Taker a -> a -> IO1 (IO1 ())
 43 | takeO (O _ _ cb) tk v t = let _ # t := cb t in takeV tk v t
 44 |
 45 | %inline
 46 | offer : Offerer a -> IO1 (IO1 ())
 47 | offer (O _ _ cb) t = let _ # t := cb t in unit1 # t
 48 |
 49 | ||| A concurrent, bounded queue holding values of type `a`.
 50 | |||
 51 | ||| This is an important primitive for implementing producer/consumer
 52 | ||| services.
 53 | export
 54 | record BQueue a where
 55 |   constructor BQ
 56 |   ref : IORef (ST a)
 57 |
 58 | ||| Creates a new bounded queue of the given capacity.
 59 | export %inline
 60 | bqueue : Lift1 World f => Nat -> f (BQueue a)
 61 | bqueue cap = BQ <$> newref (S cap empty empty empty)
 62 |
 63 | ||| Utility alias for `bqueue` taking the type of stored values
 64 | ||| as an explicit argument.
 65 | export %inline
 66 | bqueueOf : Lift1 World f => (0 a : Type) -> Nat -> f (BQueue a)
 67 | bqueueOf _ = bqueue
 68 |
 69 | %inline
 70 | untake : IORef (ST a) -> Taker a -> IO1 ()
 71 | untake r t = casmod1 r $ {takers $= filter (/= t)}
 72 |
 73 | %inline
 74 | unoffer : IORef (ST a) -> Offerer a -> IO1 ()
 75 | unoffer r o = casmod1 r $ {offerers $= filter (/= o)}
 76 |
 77 | deq : IORef (ST a) -> Taker a -> IO1 (IO1 ())
 78 | deq r tk t = let act # t := casupdate1 r adj t in act t
 79 |   where
 80 |     adj : ST a -> (ST a, IO1 (IO1 ()))
 81 |     adj (S cap q ts os) =
 82 |       case dequeue q of
 83 |         Just (n,q2) => case dequeue os of
 84 |           Nothing => (S (S cap) q2 ts os, takeV tk n)
 85 |           Just (o,os2) => (S cap (enqueue q2 o.value) ts os2, takeO o tk n)
 86 |         Nothing => case dequeue os of
 87 |           Nothing => (S cap q (enqueue ts tk) os, pure $ untake r tk)
 88 |           Just (o,os2) => (S cap q ts os2, takeO o tk o.value)
 89 |
 90 | enq : IORef (ST a) -> Offerer a -> IO1 (IO1 ())
 91 | enq r o t = let act # t := casupdate1 r adj t in act t
 92 |   where
 93 |     adj : ST a -> (ST a, IO1 (IO1 ()))
 94 |     adj (S cap q ts os) =
 95 |       case dequeue ts of
 96 |         Just (t,ts2) => (S cap q ts2 os, takeO o t o.value)
 97 |         Nothing => case cap of
 98 |           0   => (S 0 q ts (enqueue os o), pure $ unoffer r o)
 99 |           S k => (S k (enqueue q o.value) ts os, offer o)
100 |
101 | ||| Appends a value to a bounded queue potentially blocking the
102 | ||| calling fiber until there is some capacity.
103 | export
104 | enqueue : BQueue a -> a -> Async e es ()
105 | enqueue (BQ ref) v =
106 |   primAsync $ \cb,t =>
107 |    let tok # t := token1 t
108 |     in enq ref (O tok v $ cb (Right ())) t
109 |
110 | ||| Extracts the next value from a bounded queue potentially blocking
111 | ||| the calling fiber until such a value is available.
112 | export
113 | dequeue : BQueue a -> Async e es a
114 | dequeue (BQ ref) =
115 |   primAsync $ \cb,t =>
116 |    let tok # t := token1 t
117 |     in deq ref (T tok $ cb . Right) t
118 |