2 | import Data.Seq.Unsized
5 | import System.Concurrency
7 | data QueueEvent a = Msg a | Chan (Channel a)
10 | data Queue a = Q Mutex (IORef (Seq (QueueEvent a)))
13 | mk_queue : HasIO io => io (Queue a)
14 | mk_queue = liftIO $
pure $
Q !makeMutex !(newIORef empty)
18 | recv : HasIO io => Queue a -> io a
19 | recv (Q mutex ref) = liftIO $
do
21 | queue <- readIORef ref
23 | Just (Msg message, rest) => do
29 | writeIORef ref (snoc queue (Chan chan))
35 | recv' : HasIO io => Queue a -> io (Maybe a)
36 | recv' (Q mutex ref) = liftIO $
do
38 | queue <- readIORef ref
40 | Just (Msg message, rest) => do
50 | recv_all : HasIO io => Queue a -> io (List a)
51 | recv_all (Q mutex ref) = liftIO $
do
53 | queue <- readIORef ref
54 | let (msgs, others) = spanBy (\case Msg msg => Just msg;
_ => Nothing) (toList queue)
55 | writeIORef ref (fromList others)
61 | signal : HasIO io => Queue a -> a -> io ()
62 | signal (Q mutex ref) msg = liftIO $
do
64 | queue <- readIORef ref
66 | Just (Chan chan, rest) => do
71 | writeIORef ref (snoc queue (Msg msg))
76 | broadcast : HasIO io => Queue a -> a -> io ()
77 | broadcast (Q mutex ref) msg = liftIO $
do
79 | queue <- readIORef ref
80 | writeIORef ref empty
82 | let channels = mapMaybe (\case Chan chan => Just chan;
_ => Nothing) (toList queue)
83 | traverse_ (flip channelPut msg) channels