0 | ||| A pool of a fixed number of worker threads, each operating on its own
  1 | ||| work queue.
  2 | |||
  3 | ||| Idle threads will try and take work packages from other threads
  4 | ||| (work-stealing).
  5 | |||
  6 | ||| Each thread is responsible for scheduling and running an arbitrary number
  7 | ||| of `Fiber`s, as well as taking care of registered timers, signal handlers,
  8 | ||| and file polling.
  9 | module IO.Async.Loop.Posix
 10 |
 11 | import public Data.Nat
 12 | import public IO.Async
 13 | import public IO.Async.Loop
 14 | import public IO.Async.Loop.PollH
 15 | import public IO.Async.Loop.SignalH
 16 | import public IO.Async.Loop.TimerH
 17 |
 18 | import Control.Monad.Elin
 19 |
 20 | import Data.Array.Core as AC
 21 | import Data.Array.Mutable
 22 | import Data.Linear.Traverse1
 23 | import Data.List
 24 | import Data.Vect
 25 |
 26 | import IO.Async.Internal.Ref
 27 | import IO.Async.Loop.Poller
 28 | import IO.Async.Loop.Queue
 29 | import IO.Async.Loop.SignalST
 30 | import IO.Async.Loop.TimerST
 31 | import IO.Async.Signal
 32 |
 33 | import System
 34 | import System.Concurrency
 35 | import System.Posix.File.Prim
 36 | import System.Posix.Poll.Prim
 37 | import System.Posix.Limits
 38 |
 39 | %default total
 40 |
 41 | --------------------------------------------------------------------------------
 42 | -- Poll
 43 | --------------------------------------------------------------------------------
 44 |
 45 | public export
 46 | 0 Task : Type
 47 |
 48 | POLL_ITER : Nat
 49 | POLL_ITER = 16
 50 |
 51 | -- State of a physical worker thread in a thread-pool.
 52 | export
 53 | record Poll where
 54 |   constructor W
 55 |   ||| Number of worker threads in the pool
 56 |   size     : Nat
 57 |
 58 |   ||| Index of the worker thread corresponding to this state
 59 |   me       : Fin size
 60 |
 61 |   ||| Reference indicating whether the pool is still alive
 62 |   alive    : IORef Bool
 63 |
 64 |   ||| Work queue of this worker
 65 |   queue    : IORef (Queue Task)
 66 |
 67 |   ||| Work queues of all worker threads
 68 |   workers  : IArray size Poll
 69 |
 70 |   ||| The state used for polling file descriptors
 71 |   poller   : Poller
 72 |
 73 |   ||| Remaining number of stealers. To reduce contention,
 74 |   ||| not all idle workers will be allowed to steal work
 75 |   ||| at the same time
 76 |   stealers : IORef Nat
 77 |
 78 |   ||| State for schedule actions
 79 |   timer : Timer
 80 |
 81 |   ||| State for the signal handler
 82 |   signals : Sighandler
 83 |
 84 |   ||| Mutex used for sleeping
 85 |   lock    : Mutex
 86 |
 87 |   ||| Condition used for sleeping
 88 |   cond    : Condition
 89 |
 90 | Task = FbrState Poll
 91 |
 92 | -- initialize the state of a worker thread.
 93 | workST :
 94 |      {n : Nat}
 95 |   -> Fin n
 96 |   -> (poll     : Poller)
 97 |   -> (workers  : IArray n Poll)
 98 |   -> (stealers : IORef Nat)
 99 |   -> IO Poll
100 | workST me poll workers stealers =
101 |   runIO $ \t =>
102 |     let alive # t := ref1 True t
103 |         tim   # t := TimerST.timer t
104 |         sigh  # t := sighandler t
105 |         que   # t := ref1 (queueOf Task) t
106 |         lock  # t := ioToF1 makeMutex t
107 |         cond  # t := ioToF1 makeCondition t
108 |      in W n me alive que workers poll stealers tim sigh lock cond # t
109 |
110 | release : Poll -> IO1 ()
111 | release p t = () # t
112 |   -- let _ # t := ffi (destroyCond p.cond) t
113 |   --  in ffi (destroyMutex p.lock) t
114 |
115 | debug : Lazy String -> IO1 ()
116 | debug s t = () # t
117 | -- debug s = ioToF1 (putStrLn s)
118 |
119 | submit : Poll -> Task -> IO1 ()
120 | submit st p t =
121 |   let _    # t := debug "submitting a fiber to \{show st.me}" t
122 |       True # t := enq st.queue p t | False # t => () # t
123 |       _    # t := debug "waking up \{show st.me}" t
124 |       _    # t := ioToF1 (mutexAcquire st.lock) t
125 |       _    # t := ioToF1 (conditionSignal st.cond) t
126 |    in ioToF1 (mutexRelease st.lock) t
127 |
128 | export %inline covering
129 | EventLoop Poll where
130 |   spawn = submit
131 |   limit = 1024
132 |
133 | --------------------------------------------------------------------------------
134 | -- Work Loop
135 | --------------------------------------------------------------------------------
136 |
137 | nextFin : {n : _} -> Fin n -> Fin n
138 | nextFin FZ     = last
139 | nextFin (FS x) = weaken x
140 |
141 | sleepDuration : Integer -> Int -- Clock Duration
142 | sleepDuration 0 = 2_000
143 | sleepDuration n = (min (cast $ n `div` 1000) 2_000) -- at most two milli seconds
144 |
145 | parameters (s : Poll)
146 |
147 |   -- tries to steal a task from another worker
148 |   stealTasks : Fin s.size -> Nat -> IO1 (Maybe Task)
149 |   stealTasks x 0     t =
150 |    let _ # t := debug "\{show s.me} could not steal anything" t
151 |     in Nothing # t
152 |   stealTasks x (S k) t =
153 |     case steal (queue $ at s.workers x) t of
154 |       Nothing # t => stealTasks (nextFin x) k t
155 |       Just h  # t =>
156 |        let _ # t := debug "\{show s.me} stole a fiber from \{show x} " t
157 |         in Just h # t
158 |
159 |   -- Looks for the next task to run. If possible, this will be the
160 |   -- last ceded task of this work loop, unless our queue is non-empty,
161 |   -- in which case the ceded task has to be appended to the queue.
162 |   --
163 |   -- If there is no task, we go stealing from other work loops unless
164 |   -- too many stealers are already active.
165 |   %inline
166 |   next : IO1 (Maybe Task)
167 |   next t =
168 |     case deq s.queue t of
169 |       Nothing # t => case dec s.stealers t of
170 |         False # t =>
171 |          let _ # t := debug "\{show s.me} enough stealers are active" t
172 |           in Nothing # t
173 |         True  # t =>
174 |          let _ # t := debug "\{show s.me} start stealing" t 
175 |              tsk # t := stealTasks (nextFin s.me) (pred s.size) t
176 |              _   # t := Queue.inc s.stealers t
177 |           in tsk # t
178 |       tsk # t =>
179 |        let _   # t := debug "\{show s.me} still got work to do" t
180 |         in tsk # t
181 |
182 |   runTask : FbrState Poll -> IO1 ()
183 |   runTask fst t =
184 |     case runFbr s fst t of
185 |       Cont fst2 # t => let _ # t := enq s.queue fst2 t in () # t
186 |       Done      # t => () # t
187 |
188 |   -- Main worker loop. If `cpoll` is at zero, this indicates that we should
189 |   -- poll at this iteration. Otherwise we look for the next task to run.
190 |   -- If there is none, we go to sleep (that is, we `poll` with a timeout
191 |   -- of 1 ms).
192 |   covering
193 |   loop : Nat -> IO1 ()
194 |   loop cpoll t =
195 |     -- first we check if the system is still alive and running
196 |     case read1 s.alive t of
197 |       -- Evaluation has ended. Time to shut down.
198 |       False # t => () # t
199 |
200 |       -- Still alive, so let's go.
201 |       True  # t => case cpoll of
202 |
203 |         -- If it's time for polling, we check our signal handlers
204 |         -- and continue.
205 |         0   =>
206 |          let _ # t := checkSignals s.signals t
207 |           in loop POLL_ITER t
208 |
209 |         -- No time for polling. Check timers and get the next task to run -
210 |         -- either by taking the head of our own queue or by stealing from
211 |         -- another queue.
212 |         S k =>
213 |          let r # t := runDueTimers s.timer t
214 |           in case next t of
215 |                Just tsk # t => let _ # t := runTask tsk t in loop k t
216 |                Nothing  # t =>
217 |                 let _ # t := checkSignals s.signals t
218 |                     _ # t := ioToF1 (mutexAcquire s.lock) t
219 |                  in case deqAndSleep s.queue t of
220 |                       Just tsk # t =>
221 |                         let _ # t := ioToF1 (mutexRelease s.lock) t
222 |                             _ # t := runTask tsk t
223 |                          in loop POLL_ITER t 
224 |                       Nothing  # t =>
225 |                        let d     := sleepDuration r
226 |                            _ # t := debug "\{show s.me} sleeping for \{show d} us" t
227 |                            _ # t := ioToF1 (conditionWaitTimeout s.cond s.lock  d) t
228 |                            _ # t := ioToF1 (mutexRelease s.lock) t
229 |                         in loop POLL_ITER t
230 |
231 | --------------------------------------------------------------------------------
232 | -- Interfaces
233 | --------------------------------------------------------------------------------
234 |
235 | export %inline
236 | PollH Poll where
237 |   threadId s = cast s.me
238 |   primPoll s = s.poller.pollFile
239 |
240 | export %inline
241 | TimerH Poll where
242 |   primWait s dur f = schedule s.timer dur f
243 |
244 | export %inline
245 | SignalH Poll where
246 |   primOnSignals s sigs f = await s.signals sigs (f . Right)
247 |
248 | --------------------------------------------------------------------------------
249 | -- ThreadPool
250 | --------------------------------------------------------------------------------
251 |
252 | ||| A fixed-size pool of `n` physical worker threads.
253 | |||
254 | ||| Tasks are submited to the worker threads in round-robin
255 | ||| fashion: A new task is submitted to the next worker in line,
256 | ||| restarting at the beginning when reaching the last worker.
257 | export
258 | record ThreadPool where
259 |   constructor TP
260 |   size    : Nat
261 |   ids     : Vect size ThreadID
262 |   pollid  : ThreadID
263 |   workers : Vect (S size) Poll
264 |
265 | stop : ThreadPool -> IO ()
266 | stop tp = runIO $ traverse1_ (\w => write1 w.alive False) tp.workers
267 |
268 | workSTs :
269 |      {n : _}
270 |   -> (poll : Poller)
271 |   -> IOArray n Poll 
272 |   -> IArray n Poll 
273 |   -> (stealers : IORef Nat)
274 |   -> (k : Nat)
275 |   -> {auto 0 lte : LTE k n}
276 |   -> IO (Vect n Poll)
277 | workSTs poll mps ips stealers 0     = pure (toVect ips)
278 | workSTs poll mps ips stealers (S k) = do
279 |   w  <- workST (natToFinLT k) poll ips stealers
280 |   runIO $ setNat mps k w
281 |   workSTs poll mps ips stealers k
282 |
283 | covering
284 | pollLoop : (alive : Ref World Bool) -> Poller -> IO1 ()
285 | pollLoop ref p t =
286 |   let True # t := read1 ref t | _ # t => p.release t
287 |       _    # t := p.pollWait 10.ms t
288 |    in pollLoop ref p t
289 |
290 | ||| Create a new thread pool of `n` worker threads and additional
291 | covering
292 | mkThreadPool :
293 |      (n : Subset Nat IsSucc)
294 |   -> (mkPoll : IO1 Poller)
295 |   -> IO ThreadPool
296 | mkThreadPool (Element (S k) _) mkPoll = do
297 |   ps <- unsafeMArray {a = Poll} (S k)
298 |   is <- runIO (unsafeFreeze ps)
299 |   ss <- newref (S Z)
300 |   pl <- runIO mkPoll
301 |   ws <- workSTs pl ps is ss (S k)
302 |   ts <- traverse (\x => fork (runIO $ loop x POLL_ITER)) (tail ws)
303 |   pi <- fork (runIO $ pollLoop (head ws).alive pl)
304 |   pure $ TP k ts pi ws
305 |
306 | toIO : Elin World [Errno] () -> IO ()
307 | toIO = ignore . runElinIO
308 |
309 | ||| Starts an epoll-based event loop and runs the given async
310 | ||| program to completion.
311 | |||
312 | ||| `n`    : Number of threads to use
313 | ||| `sigs` : The signals to block while running the program.
314 | |||          These are typically the ones dealt with as part of `prog`
315 | ||| `prog` : The program to run
316 | export covering
317 | app :
318 |      (n      : Subset Nat IsSucc)
319 |   -> (sigs   : List Signal)
320 |   -> (mkPoll : IO1 Poller)
321 |   -> (prog   : Async Poll [] ())
322 |   -> IO ()
323 | app n sigs mkPoll prog = do
324 |   toIO $ sigprocmask SIG_BLOCK sigs
325 |   runIO (dieOnErr $ addFlags Stdin O_NONBLOCK)
326 |   tp <- mkThreadPool n mkPoll
327 |   runAsyncWith (head tp.workers) prog (\_ => stop tp)
328 |   runIO (loop (head tp.workers) POLL_ITER)
329 |   traverse_ (\x => threadWait x) tp.ids
330 |   traverse_ (\w => runIO (release w)) tp.workers
331 |   threadWait tp.pollid
332 |   usleep 100
333 |
334 | ||| Reads environment variable `IDRIS2_ASYNC_THREADS` and returns
335 | ||| the number of threads to use. Default: 2.
336 | export
337 | asyncThreads : IO (Subset Nat IsSucc)
338 | asyncThreads = do
339 |   s <- getEnv "IDRIS2_ASYNC_THREADS"
340 |   pure $ case cast {to = Nat} <$> s of
341 |     Just (S k) => Element (S k) %search
342 |     _          => Element 2 %search
343 |
344 | ||| Simplified version of `app`.
345 | |||
346 | ||| This uses the posix-compatible `poll` call for polling files. For
347 | ||| a faster poller - especially when polling hundreds or thousands of
348 | ||| file descriptors at a time - consider using `IO.Async.Loop.Epoll.epollApp`.
349 | |||
350 | ||| We use environment variable `IDRIS2_ASYNC_THREADS` to determine the
351 | ||| number of threads to use (default: 2) and cancel the running program
352 | ||| on receiving `SIGINT`.
353 | |||
354 | ||| By default, only `SIGINT` is masked, to handle other signals
355 | ||| within your program, give `{sigs = [...]}` as the first
356 | ||| argument. One of the signals must be SIGINT, which is enforced by
357 | ||| the `Has SIGINT sigs` constraint.
358 | export covering
359 | simpleApp
360 |   : {default [SIGINT] sigs : List Signal}
361 |   -> Has SIGINT sigs
362 |   => Async Poll [] ()
363 |   -> IO ()
364 | simpleApp {sigs} prog = do
365 |   n <- asyncThreads
366 |   app n sigs posixPoller cprog
367 |
368 |   where
369 |     cprog : Async Poll [] ()
370 |     cprog =
371 |       race_
372 |         [ prog
373 |         , dropErrs {es = [Errno]} $ onSignal SIGINT (pure ())
374 |         ]
375 |