0 | module IO.Async.Channel
  1 |
  2 | import Data.Linear.Ref1
  3 | import Data.Linear.Unique
  4 | import Data.Queue
  5 | import Derive.Prelude
  6 | import IO.Async
  7 | import Syntax.T1
  8 |
  9 | %default total
 10 | %language ElabReflection
 11 |
 12 | public export
 13 | data SendRes = Sent | SentAndClosed | Closed
 14 |
 15 | %runElab derive "SendRes" [Show,Eq,Ord]
 16 |
 17 | sendRes : (opn : Bool) -> SendRes
 18 | sendRes True  = Sent
 19 | sendRes False = SentAndClosed
 20 |
 21 | record Offerer a where
 22 |   constructor O
 23 |   token    : IOToken
 24 |   value    : a
 25 |   callback : SendRes -> IO1 ()
 26 |
 27 | %inline
 28 | Eq (Offerer a) where
 29 |   v1 == v2 = v1.token == v2.token
 30 |
 31 | 0 Taker : Type -> Type
 32 | Taker a = Maybe a -> IO1 ()
 33 |
 34 | -- internal state of the channel
 35 | record ST a where
 36 |   constructor S
 37 |   capacity : Nat
 38 |   queue    : Queue a
 39 |   taker    : Maybe (Taker a)
 40 |   offerers : Queue (Offerer a)
 41 |   open_    : Bool
 42 |
 43 | %inline
 44 | takeV : Taker a -> Maybe a -> IO1 (IO1 ())
 45 | takeV cb v t = let _ # t := cb v t in unit1 # t
 46 |
 47 | %inline
 48 | takeO : Bool -> Offerer a -> Taker a -> Maybe a -> IO1 (IO1 ())
 49 | takeO b (O _ _ cb) tk v t = let _ # t := cb (sendRes b) t in takeV tk v t
 50 |
 51 | %inline
 52 | offer : SendRes -> Offerer a -> IO1 (IO1 ())
 53 | offer sr (O _ _ cb) t = let _ # t := cb sr t in unit1 # t
 54 |
 55 | %inline
 56 | unrec : IORef (ST a) -> IO1 ()
 57 | unrec r = casmod1 r $ {taker := Nothing}
 58 |
 59 | %inline
 60 | unoffer : IORef (ST a) -> Offerer a -> IO1 ()
 61 | unoffer r o = casmod1 r $ {offerers $= filter (/= o)}
 62 |
 63 | rec : IORef (ST a) -> (Maybe a -> IO1 ()) -> IO1 (IO1 ())
 64 | rec r cb t = let act # t := casupdate1 r adj t in act t
 65 |   where
 66 |     adj : ST a -> (ST a, IO1 (IO1 ()))
 67 |     adj st@(S cap q ts os opn) =
 68 |       case dequeue q of
 69 |         Just (n,q2) => case dequeue os of
 70 |           Nothing => (S (S cap) q2 ts os opn, takeV cb $ Just n)
 71 |           Just (o, os2) =>
 72 |             (S cap (enqueue q2 o.value) ts os2 opn, takeO opn o cb $ Just n)
 73 |         Nothing => case dequeue os of
 74 |           Nothing => case opn of
 75 |             True  => (S cap q (Just cb) os opn, pure $ unrec r)
 76 |             False => (st, takeV cb Nothing)
 77 |           Just (o, os2) =>
 78 |             (S cap q ts os2 opn, takeO opn o cb $ Just o.value)
 79 |
 80 | snd : IORef (ST a) -> Offerer a -> IO1 (IO1 ())
 81 | snd r o t = let act # t := casupdate1 r adj t in act t
 82 |   where
 83 |     adj : ST a -> (ST a, IO1 (IO1 ()))
 84 |     adj st@(S cap q ts os opn) =
 85 |      case opn of
 86 |        False => (st, offer Closed o)
 87 |        True  => case ts of
 88 |          Just cb => (S cap q Nothing os opn, takeO opn o cb $ Just o.value)
 89 |          Nothing => case cap of
 90 |            0 => (S 0 q ts (enqueue os o) opn, pure $ unoffer r o)
 91 |            S k => (S k (enqueue q o.value) ts os opn, offer Sent o)
 92 |
 93 | cls : IORef (ST a) -> IO1 ()
 94 | cls r t = let act # t := casupdate1 r adj t in act t
 95 |   where
 96 |     adj : ST a -> (ST a, IO1 ())
 97 |     adj st@(S cap q ts os opn) =
 98 |       case opn of
 99 |         False => (st, unit1)
100 |         True  => case ts of
101 |           Just cb => (S 0 empty Nothing empty False, cb Nothing)
102 |           Nothing => (S cap q ts os False, unit1)
103 |
104 | ||| A concurrent, bounded channel holding values of type `a`.
105 | |||
106 | ||| This is an important primitive for implementing single
107 | ||| consumer, multiple producer services.
108 | |||
109 | ||| Note: Unlike with `IO.Async.BQueue`, which can have multiple
110 | |||       consumers, this will only accpet a single consumer,
111 | |||       silently overwriting an old consumer in case a new one
112 | |||       calls.
113 | export
114 | record Channel a where
115 |   constructor C
116 |   ref : IORef (ST a)
117 |
118 | ||| Creates a new bounded queue of the given capacity.
119 | export %inline
120 | channel : Lift1 World f => Nat -> f (Channel a)
121 | channel cap = C <$> newref (S cap empty empty empty True)
122 |
123 | ||| Utility alias for `channel` taking the type of stored values
124 | ||| as an explicit argument.
125 | export %inline
126 | channelOf : Lift1 World f => (0 a : Type) -> Nat -> f (Channel a)
127 | channelOf _ = channel
128 |
129 | ||| Sends a value through a channel potentially blocking the
130 | ||| calling fiber until there is some capacity.
131 | |||
132 | ||| This returns
133 | |||   * `Sent` if the data was received and the channel is still open after sending
134 | |||   * `SentAndClosed` if the data was received and the channel is now closed
135 | |||   * `Closed` if the data could not be sent since the channel is closed.
136 | export
137 | send : Channel a -> a -> Async e es SendRes
138 | send (C ref) v =
139 |   primAsync $ \cb,t =>
140 |    let tok # t := token1 t
141 |     in Channel.snd ref (O tok v $ cb . Right) t
142 |
143 | ||| Extracts the next value from a channel potentially blocking
144 | ||| the calling fiber until such a value is available.
145 | |||
146 | ||| This returns `Nothing` if the channel has been closed and
147 | ||| no pending values are left.
148 | export
149 | receive : Channel a -> Async e es (Maybe a)
150 | receive (C ref) = primAsync $ \cb => Channel.rec ref (cb . Right)
151 |
152 | ||| Gracefully closes the channel: No more data can be sent
153 | ||| (`send` will return immedately with `Closed` from now on),
154 | ||| put pending data can still be received.
155 | export
156 | close : Channel a -> Async e es ()
157 | close (C ref) = lift1 $ cls ref
158 |