9 | module IO.Async.Loop.Posix
11 | import public Data.Nat
12 | import public IO.Async
13 | import public IO.Async.Loop
14 | import public IO.Async.Loop.PollH
15 | import public IO.Async.Loop.SignalH
16 | import public IO.Async.Loop.TimerH
18 | import Control.Monad.Elin
20 | import Data.Array.Core as AC
21 | import Data.Array.Mutable
22 | import Data.Linear.Traverse1
26 | import IO.Async.Internal.Ref
27 | import IO.Async.Loop.Poller
28 | import IO.Async.Loop.Queue
29 | import IO.Async.Loop.SignalST
30 | import IO.Async.Loop.TimerST
31 | import IO.Async.Signal
34 | import System.Concurrency
35 | import System.Posix.File.Prim
36 | import System.Posix.Poll.Prim
37 | import System.Posix.Limits
-- Fields of the per-worker state. They are read further down as `s.queue`,
-- `s.workers`, `s.stealers` and `s.signals` on a value `s : Poll`.
-- Mutable reference to this worker's own queue of tasks.
65 | queue : IORef (Queue Task)
-- Immutable array of `size` worker states; indexed when stealing tasks.
68 | workers : IArray size Poll
-- Mutable counter that `next` adjusts with `dec`/`inc` around work stealing.
76 | stealers : IORef Nat
-- Signal handler state, passed to `checkSignals` and `await`.
82 | signals : Sighandler
-- `Task` is an alias for `FbrState Poll`; these are the values stored in the
-- worker queues and executed by `runTask`.
90 | Task = FbrState Poll
-- Builds the state of one worker. The worker array and the stealer counter
-- are passed in (`workSTs` hands the same values to every worker); all other
-- resources are created here.
97 | -> (workers : IArray n Poll)
98 | -> (stealers : IORef Nat)
100 | workST me poll workers stealers =
-- Fresh per-worker resources: an `alive` flag starting at `True`, a timer
-- state, a signal handler, an empty task queue, a mutex and a condition
-- variable.
102 | let alive # t := ref1 True t
103 | tim # t := TimerST.timer t
104 | sigh # t := sighandler t
105 | que # t := ref1 (queueOf Task) t
106 | lock # t := ioToF1 makeMutex t
107 | cond # t := ioToF1 makeCondition t
-- Bundle everything with the `W` constructor.
108 | in W n me alive que workers poll stealers tim sigh lock cond # t
-- Releases whatever a worker holds. There is nothing to free at the moment,
-- so the worker argument is ignored and the token is handed back unchanged
-- together with unit.
110 | release : Poll -> IO1 ()
111 | release _ tok = () # tok
-- Tracing helper used throughout this module; the message is passed lazily.
-- NOTE(review): the implementation is not visible here - confirm whether the
-- message is printed or discarded.
115 | debug : Lazy String -> IO1 ()
-- Hands a task to a worker: the task is enqueued on that worker's queue and,
-- unless `enq` reports `False`, the worker is woken through its condition
-- variable.
119 | submit : Poll -> Task -> IO1 ()
121 | let _ # t := debug "submitting a fiber to \{show st.me}" t
-- When `enq` returns `False` the submission ends here without signalling.
-- NOTE(review): the exact meaning of the `Bool` is defined by `enq` - confirm.
122 | True # t := enq st.queue p t | False # t => () # t
123 | _ # t := debug "waking up \{show st.me}" t
-- Signal the condition while holding the worker's mutex; `loop` waits on the
-- same `cond`/`lock` pair.
124 | _ # t := ioToF1 (mutexAcquire st.lock) t
125 | _ # t := ioToF1 (conditionSignal st.cond) t
126 | in ioToF1 (mutexRelease st.lock) t
-- `EventLoop` implementation for the `Poll` worker state.
128 | export %inline covering
129 | EventLoop Poll where
-- Steps from one index of `Fin n` to a neighbouring one.
-- NOTE(review): the clause for `FZ` is not visible here; presumably it wraps
-- around to the last index - confirm.
137 | nextFin : {n : _} -> Fin n -> Fin n
-- A successor index `FS x` maps to its predecessor `x`, weakened back into
-- `Fin n`.
139 | nextFin (FS x) = weaken x
-- Computes how long an idle worker sleeps. The result is logged with a "us"
-- suffix and passed to `conditionWaitTimeout` in `loop`.
--
-- An input of `0` yields the maximum of 2_000. Any other input is divided by
-- 1000 (presumably nanoseconds to microseconds - confirm against
-- `runDueTimers`) and capped at 2_000.
--
-- The cap is applied on `Integer` before narrowing to `Int`, so an input that
-- does not fit into `Int` can no longer wrap around during the cast and
-- produce a negative timeout.
141 | sleepDuration : Integer -> Int
142 | sleepDuration 0 = 2_000
143 | sleepDuration n = cast (min (n `div` 1000) 2_000)
-- The definitions that follow refer to the worker state `s` without binding
-- it themselves; it is supplied by this `parameters` declaration.
145 | parameters (s : Poll)
-- Tries to take a task from another worker's queue. Starts at worker index
-- `x`; the `Nat` argument bounds the number of remaining attempts, and after
-- every miss the search moves on to `nextFin x`.
148 | stealTasks : Fin s.size -> Nat -> IO1 (Maybe Task)
150 | let _ # t := debug "\{show s.me} could not steal anything" t
152 | stealTasks x (S k) t =
-- Look at the queue of the worker stored at index `x`.
153 | case steal (queue $
at s.workers x) t of
-- Nothing to steal there: try the next worker with one attempt fewer.
154 | Nothing # t => stealTasks (nextFin x) k t
156 | let _ # t := debug "\{show s.me} stole a fiber from \{show x} " t
-- Picks the next task for this worker: first from its own queue, otherwise
-- by trying to steal from the other workers.
166 | next : IO1 (Maybe Task)
168 | case deq s.queue t of
-- Own queue is empty: the result of `dec s.stealers` selects between the
-- "enough stealers are active" and the "start stealing" branch below.
169 | Nothing # t => case dec s.stealers t of
171 | let _ # t := debug "\{show s.me} enough stealers are active" t
174 | let _ # t := debug "\{show s.me} start stealing" t
-- Begin at `nextFin s.me` and allow `pred s.size` attempts.
175 | tsk # t := stealTasks (nextFin s.me) (pred s.size) t
-- Increment the stealer counter again once the attempt is over.
176 | _ # t := Queue.inc s.stealers t
179 | let _ # t := debug "\{show s.me} still got work to do" t
-- Runs one fiber state on this worker via `runFbr`.
182 | runTask : FbrState Poll -> IO1 ()
184 | case runFbr s fst t of
-- `Cont` carries a follow-up state, which is put back on this worker's own
-- queue.
185 | Cont fst2 # t => let _ # t := enq s.queue fst2 t in () # t
-- Main loop of a worker. It keeps running until `s.alive` reads `False`.
-- The `Nat` argument is a countdown (matched as `cpoll`) that is restarted
-- at `POLL_ITER`.
193 | loop : Nat -> IO1 ()
196 | case read1 s.alive t of
-- The worker has been stopped (`stop` writes `False` to the flag).
198 | False # t => () # t
201 | True # t => case cpoll of
-- Branch that checks for signals and restarts the countdown.
-- NOTE(review): the pattern selecting this branch is not visible here.
206 | let _ # t := checkSignals s.signals t
207 | in loop POLL_ITER t
-- Run the timers that are due; `r` later determines the sleep duration.
213 | let r # t := runDueTimers s.timer t
-- A task is available: run it and go on with the counter `k`.
215 | Just tsk # t => let _ # t := runTask tsk t in loop k t
-- No task: check signals, then take the lock before looking at the queue
-- once more (presumably so that a wake-up from `submit` is not missed -
-- confirm).
217 | let _ # t := checkSignals s.signals t
218 | _ # t := ioToF1 (mutexAcquire s.lock) t
219 | in case deqAndSleep s.queue t of
-- A task showed up after all: release the lock and run it.
221 | let _ # t := ioToF1 (mutexRelease s.lock) t
222 | _ # t := runTask tsk t
223 | in loop POLL_ITER t
-- Still nothing: wait on the condition variable for at most `d`, then
-- release the lock and start over.
225 | let d := sleepDuration r
226 | _ # t := debug "\{show s.me} sleeping for \{show d} us" t
227 | _ # t := ioToF1 (conditionWaitTimeout s.cond s.lock d) t
228 | _ # t := ioToF1 (mutexRelease s.lock) t
229 | in loop POLL_ITER t
-- Operations expressed in terms of the worker state `s`.
-- The worker's index `s.me` is cast to obtain the thread identifier.
237 | threadId s = cast s.me
-- File polling is delegated to the worker's poller.
238 | primPoll s = s.poller.pollFile
-- Timed waits are scheduled on the worker's timer.
242 | primWait s dur f = schedule s.timer dur f
-- Signal waits are registered with the worker's signal handler; the callback
-- receives the result wrapped in `Right`.
246 | primOnSignals s sigs f = await s.signals sigs (f . Right)
-- A pool of workers together with the threads that run them.
258 | record ThreadPool where
-- IDs of the forked worker threads; `app` joins them with `threadWait`.
261 | ids : Vect size ThreadID
-- One more worker state than there are entries in `ids`: `mkThreadPool`
-- forks threads for the tail only, and `app` runs the head worker itself.
263 | workers : Vect (S size) Poll
-- Shuts the pool down by writing `False` to the `alive` flag of every worker
-- in the pool.
265 | stop : ThreadPool -> IO ()
266 | stop pool = runIO (traverse1_ (\worker => write1 worker.alive False) pool.workers)
-- Creates the worker states and stores them in the mutable array `mps`.
-- The last explicit argument counts down: for `S k` a worker with index `k`
-- is built and written to slot `k`, then the function recurses with `k`.
273 | -> (stealers : IORef Nat)
275 | -> {auto 0 lte : LTE k n}
276 | -> IO (Vect n Poll)
-- Countdown finished: return the contents of the immutable array `ips`.
277 | workSTs poll mps ips stealers 0 = pure (toVect ips)
278 | workSTs poll mps ips stealers (S k) = do
-- Every worker receives the same poller, worker array and stealer counter.
279 | w <- workST (natToFinLT k) poll ips stealers
280 | runIO $
setNat mps k w
281 | workSTs poll mps ips stealers k
-- Drives the poller for as long as the given `alive` flag reads `True`.
284 | pollLoop : (alive : Ref World Bool) -> Poller -> IO1 ()
-- Flag no longer `True`: release the poller and stop.
286 | let True # t := read1 ref t | _ # t => p.release t
-- Otherwise call `pollWait` with a 10 ms argument and go around again.
287 | _ # t := p.pollWait 10.ms t
288 | in pollLoop ref p t
-- Creates a thread pool with `S k` workers.
293 | (n : Subset Nat IsSucc)
294 | -> (mkPoll : IO1 Poller)
296 | mkThreadPool (Element (S k) _) mkPoll = do
-- Allocate the worker array without initialising it and freeze it right away;
-- `workSTs` fills the slots through `ps` afterwards.
297 | ps <- unsafeMArray {a = Poll} (S k)
298 | is <- runIO (unsafeFreeze ps)
301 | ws <- workSTs pl ps is ss (S k)
-- Every worker except the head gets its own thread running `loop`.
302 | ts <- traverse (\x => fork (runIO $
loop x POLL_ITER)) (tail ws)
-- A further thread runs `pollLoop`, watching the head worker's `alive` flag.
303 | pi <- fork (runIO $
pollLoop (head ws).alive pl)
304 | pure $
TP k ts pi ws
-- Runs an `Elin` computation that may fail with `Errno` as a plain `IO`
-- action and discards whatever `runElinIO` returns.
306 | toIO : Elin World [Errno] () -> IO ()
307 | toIO act = ignore (runElinIO act)
-- Runs an asynchronous program on a freshly created thread pool and returns
-- once all pool threads have been joined.
318 | (n : Subset Nat IsSucc)
319 | -> (sigs : List Signal)
320 | -> (mkPoll : IO1 Poller)
321 | -> (prog : Async Poll [] ())
323 | app n sigs mkPoll prog = do
-- Block the given signals via `sigprocmask`.
324 | toIO $
sigprocmask SIG_BLOCK sigs
-- Put standard input into non-blocking mode.
325 | runIO (dieOnErr $
addFlags Stdin O_NONBLOCK)
326 | tp <- mkThreadPool n mkPoll
-- Start `prog` on the head worker; its completion callback stops the pool.
327 | runAsyncWith (head tp.workers) prog (\_ => stop tp)
-- The head worker's loop runs on the calling thread.
328 | runIO (loop (head tp.workers) POLL_ITER)
-- Join the worker threads, release every worker, then join the poll thread.
329 | traverse_ (\x => threadWait x) tp.ids
330 | traverse_ (\w => runIO (release w)) tp.workers
331 | threadWait tp.pollid
-- Determines the number of threads from the environment variable
-- `IDRIS2_ASYNC_THREADS`.
337 | asyncThreads : IO (Subset Nat IsSucc)
339 | s <- getEnv "IDRIS2_ASYNC_THREADS"
340 | pure $
case cast {to = Nat} <$> s of
-- A value that casts to a positive `Nat` is used as is.
341 | Just (S k) => Element (S k) %search
-- Unset, or a value that casts to zero: fall back to 2.
342 | _ => Element 2 %search
360 | : {default [SIGINT] sigs : List Signal}
362 | => Async Poll [] ()
364 | simpleApp {sigs} prog = do
366 | app n sigs posixPoller cprog
369 | cprog : Async Poll [] ()
373 | , dropErrs {es = [Errno]} $
onSignal SIGINT (pure ())