0 | module Utils.Queue
 1 |
 2 | import Data.Seq.Unsized
 3 | import Data.IORef
 4 | import Data.List
 5 | import System.Concurrency
 6 |
 7 | data QueueEvent a = Msg a | Chan (Channel a)
 8 |
 9 | export
10 | data Queue a = Q Mutex (IORef (Seq (QueueEvent a)))
11 |
12 | export
13 | mk_queue : HasIO io => io (Queue a)
14 | mk_queue = liftIO $ pure $ Q !makeMutex !(newIORef empty)
15 |
16 | ||| receive a message, if empty, block until there is one
17 | export
18 | recv : HasIO io => Queue a -> io a
19 | recv (Q mutex ref) = liftIO $ do
20 |   mutexAcquire mutex
21 |   queue <- readIORef ref
22 |   case viewl queue of
23 |     Just (Msg message, rest) => do
24 |       writeIORef ref rest
25 |       mutexRelease mutex
26 |       pure message
27 |     _ => do
28 |       chan <- makeChannel
29 |       writeIORef ref (snoc queue (Chan chan))
30 |       mutexRelease mutex
31 |       channelGet chan
32 |
33 | ||| receive a message, if empty, returns nothing
34 | export
35 | recv' : HasIO io => Queue a -> io (Maybe a)
36 | recv' (Q mutex ref) = liftIO $ do
37 |   mutexAcquire mutex
38 |   queue <- readIORef ref
39 |   case viewl queue of
40 |     Just (Msg message, rest) => do
41 |       writeIORef ref rest
42 |       mutexRelease mutex
43 |       pure $ Just message
44 |     _ => do
45 |       mutexRelease mutex
46 |       pure Nothing
47 |
48 | ||| receive all the messages waiting to be processed
49 | export
50 | recv_all : HasIO io => Queue a -> io (List a)
51 | recv_all (Q mutex ref) = liftIO $ do
52 |   mutexAcquire mutex
53 |   queue <- readIORef ref
54 |   let (msgs, others) = spanBy (\case Msg msg => Just msg_ => Nothing) (toList queue)
55 |   writeIORef ref (fromList others)
56 |   mutexRelease mutex
57 |   pure msgs
58 |
59 | ||| send a message to one of the receiver
60 | export
61 | signal : HasIO io => Queue a -> a -> io ()
62 | signal (Q mutex ref) msg = liftIO $ do
63 |   mutexAcquire mutex
64 |   queue <- readIORef ref
65 |   case viewl queue of
66 |     Just (Chan chan, rest) => do
67 |       writeIORef ref rest
68 |       mutexRelease mutex
69 |       channelPut chan msg
70 |     _ => do
71 |       writeIORef ref (snoc queue (Msg msg))
72 |       mutexRelease mutex
73 |
74 | ||| send a message to all the receivers
75 | export
76 | broadcast : HasIO io => Queue a -> a -> io ()
77 | broadcast (Q mutex ref) msg = liftIO $ do
78 |   mutexAcquire mutex
79 |   queue <- readIORef ref
80 |   writeIORef ref empty
81 |   mutexRelease mutex
82 |   let channels = mapMaybe (\case Chan chan => Just chan_ => Nothing) (toList queue)
83 |   traverse_ (flip channelPut msg) channels
84 |