0 | module IO.Async.Channel
2 | import Data.Linear.Ref1
3 | import Data.Linear.Unique
5 | import Derive.Prelude
10 | %language ElabReflection
-- Result reported to the callback of a sender (see `Offerer.callback`).
--
-- The constructors are listed in their original order, so the derived
-- `Show`, `Eq` and `Ord` implementations see the same declaration.
data SendRes : Type where
  Sent          : SendRes
  SentAndClosed : SendRes
  Closed        : SendRes

%runElab derive "SendRes" [Show,Eq,Ord]
-- Converts the channel's `opn` flag into the result reported to a sender
-- whose value has been accepted (see `takeO`).
--
-- Only the `False` clause is part of this excerpt: a channel that is no
-- longer open yields `SentAndClosed`.
17 | sendRes : (opn : Bool) -> SendRes
19 | sendRes False = SentAndClosed
-- A pending send operation.
--
-- Judging by the uses of its constructor `O` and of the projections
-- `.token` and `.value` elsewhere in this file, an offerer bundles a token,
-- the value to be sent, and the callback below. The remaining field
-- declarations are not part of this excerpt.
21 | record Offerer a where
-- Invoked with the outcome of the send operation.
25 | callback : SendRes -> IO1 ()
-- Offerers are compared by their tokens only; the value and the callback
-- do not take part in the comparison.
Eq (Offerer a) where
  (==) (O tok1 _ _) (O tok2 _ _) = tok1 == tok2
-- Callback of a receiver.
--
-- Within this module it is applied either to `Just` a value taken from the
-- channel or to `Nothing` when the channel is (being) closed.
0 Taker : Type -> Type
Taker a = (val : Maybe a) -> IO1 ()
-- Callback of the receiver currently waiting for a value, if there is one.
39 | taker : Maybe (Taker a)
-- Senders whose values have not been accepted yet.
40 | offerers : Queue (Offerer a)
-- Passes `mv` to the receiver callback `tk` and yields `unit1` as the
-- resulting action.
takeV : Taker a -> Maybe a -> IO1 (IO1 ())
takeV tk mv t =
  case tk mv t of
    _ # t1 => unit1 # t1
-- Completes a hand-over involving a waiting sender.
--
-- First the sender's callback is informed (with the result derived from the
-- `opn` flag via `sendRes`), afterwards `mv` is passed on to the receiver
-- callback `tk` by means of `takeV`.
takeO : Bool -> Offerer a -> Taker a -> Maybe a -> IO1 (IO1 ())
takeO opn (O _ _ notify) tk mv t =
  case notify (sendRes opn) t of
    _ # t1 => takeV tk mv t1
-- Informs the sender's callback about the result `res` and yields `unit1`
-- as the resulting action.
offer : SendRes -> Offerer a -> IO1 (IO1 ())
offer res (O _ _ notify) t =
  case notify res t of
    _ # t1 => unit1 # t1
-- Removes the registered receiver callback from the channel state by
-- resetting the `taker` field to `Nothing`.
unrec : IORef (ST a) -> IO1 ()
unrec ref = casmod1 ref (\st => {taker := Nothing} st)
-- Removes the offerer `o` from the queue of waiting senders. Offerers are
-- compared with `/=`, that is, by means of their `Eq` implementation.
unoffer : IORef (ST a) -> Offerer a -> IO1 ()
unoffer ref o =
  casmod1 ref (\st => {offerers $= filter (\other => other /= o)} st)
-- Registers the receiver callback `cb` at the channel state stored in `r`.
--
-- The state is updated with `adj` through `casupdate1`, which also hands
-- back an action `act`; that action is run right away and produces the
-- `IO1 ()` returned to the caller.
-- NOTE(review): the returned `IO1 ()` is presumably the cancellation hook
-- expected by `primAsync` - confirm.
63 | rec : IORef (ST a) -> (Maybe a -> IO1 ()) -> IO1 (IO1 ())
64 | rec r cb t = let act # t := casupdate1 r adj t in act t
-- Computes the new state together with the action to run for it.
-- NOTE(review): the `where` line and the outer `case` scrutinee are not part
-- of this excerpt; the outer match is presumably on `dequeue q`.
66 | adj : ST a -> (ST a, IO1 (IO1 ()))
67 | adj st@(S cap q ts os opn) =
-- A value `n` is available and is passed to `cb`. Without a waiting sender
-- the capacity grows by one (`S cap`). Otherwise the sender's value is
-- appended to the remaining queue `q2` and the sender is informed via `takeO`.
69 | Just (n,q2) => case dequeue os of
70 | Nothing => (S (S cap) q2 ts os opn, takeV cb $
Just n)
72 | (S cap (enqueue q2 o.value) ts os2 opn, takeO opn o cb $
Just n)
-- No value is available.
73 | Nothing => case dequeue os of
-- No sender is waiting either: while `opn` is `True`, `cb` is stored as the
-- taker and `unrec r` is handed back; otherwise the state is left as is and
-- `cb` is invoked with `Nothing`.
74 | Nothing => case opn of
75 | True => (S cap q (Just cb) os opn, pure $
unrec r)
76 | False => (st, takeV cb Nothing)
-- A sender is waiting: its value goes directly to `cb`, and the sender is
-- removed from the offerers (`os2`) and informed via `takeO`.
78 | (S cap q ts os2 opn, takeO opn o cb $
Just o.value)
-- Registers the offerer `o` (a pending send) at the channel state in `r`.
--
-- The state is updated with `adj` through `casupdate1`, which also hands
-- back an action `act`; that action is run right away and produces the
-- `IO1 ()` returned to the caller.
-- NOTE(review): the returned `IO1 ()` is presumably the cancellation hook
-- expected by `primAsync` - confirm.
80 | snd : IORef (ST a) -> Offerer a -> IO1 (IO1 ())
81 | snd r o t = let act # t := casupdate1 r adj t in act t
-- Computes the new state together with the action to run for it.
-- NOTE(review): the `where` line and two `case` scrutinees are not part of
-- this excerpt; they presumably match on `opn` and on the taker `ts`.
83 | adj : ST a -> (ST a, IO1 (IO1 ()))
84 | adj st@(S cap q ts os opn) =
-- State unchanged; the sender is informed with `Closed`.
86 | False => (st, offer Closed o)
-- A receiver callback is present: it is removed from the state and gets the
-- value directly; the sender is informed via `takeO`.
88 | Just cb => (S cap q Nothing os opn, takeO opn o cb $
Just o.value)
-- No receiver callback: the remaining capacity decides.
89 | Nothing => case cap of
-- No capacity left: the offerer is enqueued and `unoffer r o` is handed back.
90 | 0 => (S 0 q ts (enqueue os o) opn, pure $
unoffer r o)
-- Capacity left: it shrinks from `S k` to `k`, the value is enqueued and the
-- sender is informed with `Sent`.
91 | S k => (S k (enqueue q o.value) ts os opn, offer Sent o)
-- Closes the channel whose state is stored in `r`.
--
-- The state is updated with `adj` through `casupdate1`; the action returned
-- alongside the new state is run right away.
93 | cls : IORef (ST a) -> IO1 ()
94 | cls r t = let act # t := casupdate1 r adj t in act t
-- Computes the new state together with the action to run for it.
-- NOTE(review): the `where` line and two `case` scrutinees are not part of
-- this excerpt; they presumably match on `opn` and on the taker `ts`.
96 | adj : ST a -> (ST a, IO1 ())
97 | adj st@(S cap q ts os opn) =
-- State unchanged, nothing to run.
99 | False => (st, unit1)
-- A receiver callback is present: the state is reset to an empty one with
-- the open flag set to `False`, and the callback is invoked with `Nothing`.
101 | Just cb => (S 0 empty Nothing empty False, cb Nothing)
-- No receiver callback: only the open flag is set to `False`.
102 | Nothing => (S cap q ts os False, unit1)
-- A channel transporting values of type `a`.
--
-- The body of the record is not part of this excerpt. Elsewhere in this file
-- its constructor `C` is applied to the mutable reference holding the
-- channel's internal state.
114 | record Channel a where
-- Creates a new channel with the given capacity.
--
-- The initial state has an empty value queue, no registered receiver
-- callback, no waiting senders, and the open flag set to `True`.
channel : Lift1 World f => Nat -> f (Channel a)
channel cap = map C (newref (S cap empty Nothing empty True))
-- Like `channel`, but takes the (erased) element type as an explicit
-- argument.
channelOf : Lift1 World f => (0 a : Type) -> Nat -> f (Channel a)
channelOf _ cap = channel cap
-- Sends a value through a channel; the computation results in a `SendRes`.
--
-- A token is obtained from `token1` and stored in the `Offerer` passed to
-- `Channel.snd`. Offerers are compared by token, which is what `unoffer`
-- relies on to remove this particular offer again. The offerer's callback
-- wraps the `SendRes` in `Right` before passing it to `cb`.
--
-- NOTE(review): the head of the defining clause (binding `ref` and `v`) is
-- not part of this excerpt.
137 | send : Channel a -> a -> Async e es SendRes
139 | primAsync $
\cb,t =>
140 | let tok # t := token1 t
141 | in Channel.snd ref (O tok v $
cb . Right) t
-- Receives a value from a channel.
--
-- The callback registered via `Channel.rec` wraps whatever it is given
-- (`Just` a value, or `Nothing`) in `Right` before passing it to the
-- callback supplied by `primAsync`.
receive : Channel a -> Async e es (Maybe a)
receive (C ref) =
  primAsync (\cb => Channel.rec ref (\res => cb (Right res)))
-- Closes a channel by running `cls` on its state reference, lifted into
-- `Async` with `lift1`.
close : Channel a -> Async e es ()
close (C st) = lift1 (cls st)