0 | module IO.Async.BQueue
2 | import Data.Linear.Ref1
3 | import Data.Linear.Unique
10 | record Taker a where
13 | callback : a -> IO1 ()
17 | v1 == v2 = v1.token == v2.token
19 | record Offerer a where
26 | Eq (Offerer a) where
27 | v1 == v2 = v1.token == v2.token
34 | takers : Queue (Taker a)
35 | offerers : Queue (Offerer a)
38 | takeV : Taker a -> a -> IO1 (IO1 ())
39 | takeV (T _ cb) v t = let _ # t := cb v t in unit1 # t
42 | takeO : Offerer a -> Taker a -> a -> IO1 (IO1 ())
43 | takeO (O _ _ cb) tk v t = let _ # t := cb t in takeV tk v t
46 | offer : Offerer a -> IO1 (IO1 ())
47 | offer (O _ _ cb) t = let _ # t := cb t in unit1 # t
54 | record BQueue a where
60 | bqueue : Lift1 World f => Nat -> f (BQueue a)
61 | bqueue cap = BQ <$> newref (S cap empty empty empty)
66 | bqueueOf : Lift1 World f => (0 a : Type) -> Nat -> f (BQueue a)
70 | untake : IORef (ST a) -> Taker a -> IO1 ()
71 | untake r t = casmod1 r $
{takers $= filter (/= t)}
74 | unoffer : IORef (ST a) -> Offerer a -> IO1 ()
75 | unoffer r o = casmod1 r $
{offerers $= filter (/= o)}
77 | deq : IORef (ST a) -> Taker a -> IO1 (IO1 ())
78 | deq r tk t = let act # t := casupdate1 r adj t in act t
80 | adj : ST a -> (ST a, IO1 (IO1 ()))
81 | adj (S cap q ts os) =
83 | Just (n,q2) => case dequeue os of
84 | Nothing => (S (S cap) q2 ts os, takeV tk n)
85 | Just (o,os2) => (S cap (enqueue q2 o.value) ts os2, takeO o tk n)
86 | Nothing => case dequeue os of
87 | Nothing => (S cap q (enqueue ts tk) os, pure $
untake r tk)
88 | Just (o,os2) => (S cap q ts os2, takeO o tk o.value)
90 | enq : IORef (ST a) -> Offerer a -> IO1 (IO1 ())
91 | enq r o t = let act # t := casupdate1 r adj t in act t
93 | adj : ST a -> (ST a, IO1 (IO1 ()))
94 | adj (S cap q ts os) =
96 | Just (t,ts2) => (S cap q ts2 os, takeO o t o.value)
97 | Nothing => case cap of
98 | 0 => (S 0 q ts (enqueue os o), pure $
unoffer r o)
99 | S k => (S k (enqueue q o.value) ts os, offer o)
104 | enqueue : BQueue a -> a -> Async e es ()
105 | enqueue (BQ ref) v =
106 | primAsync $
\cb,t =>
107 | let tok # t := token1 t
108 | in enq ref (O tok v $
cb (Right ())) t
113 | dequeue : BQueue a -> Async e es a
115 | primAsync $
\cb,t =>
116 | let tok # t := token1 t
117 | in deq ref (T tok $
cb . Right) t