0 | module Network.HTTP.Pool.ConnectionPool
2 | import Network.HTTP.Error
3 | import Network.HTTP.Scheduler
4 | import Network.HTTP.Protocol
5 | import Network.HTTP.Message
6 | import Network.HTTP.Header
7 | import Network.HTTP.URL
8 | import Network.HTTP.Pool.Worker
9 | import Network.HTTP.Pool.Common
11 | import Network.TLS.Signature
12 | import Network.TLS.Verify
13 | import Network.Socket
19 | import System.Random
20 | import System.Future
21 | import System.Concurrency
-- Per-host connection pool state.
-- NOTE(review): the constructor line is not visible in this excerpt; elsewhere
-- in the file the record is built as `MkPool workers counter scheduled last_called`.
25 | record Pool (e : Type) where
-- Workers currently registered with this pool.
27 | workers : IORef (List (Worker e))
-- Incremented by pool_new_worker_id each time a worker id is generated.
29 | counter : IORef Bits32
-- Event queue for this pool; `Request` and `Kill` events are sent on it.
30 | scheduled : Queue (Event e)
-- Value of `time` at pool creation, overwritten on every schedule_request.
31 | last_called : IORef Integer
-- Top-level state shared by all pools: the list of known pools, the
-- configured connection limits, and the certificate checker that is handed
-- to every spawned worker.
record PoolManager (e : Type) where
  constructor MkPoolManager
  -- One entry per hostname that has been seen so far.
  pools : IORef (List (Hostname, Pool e))
  -- Limit consulted by schedule_request when deciding whether to spawn a worker.
  max_per_site_connections : Nat
  -- Configured overall limit.
  max_total_connections : Nat
  -- Given a String, yields the certificate check passed on to each worker.
  certificate_checker : String -> CertificateCheck IO
-- A single connection worker belonging to a pool.
-- NOTE(review): the field declarations are not visible in this excerpt. Other
-- code in this file builds it as `MkWorker idle_ref worker_id protocol sock pool`
-- and reads `.idle`, `.uuid`, `.protocol` and `.parent`.
41 | record Worker (e : Type) where
42 | constructor MkWorker
-- Textual form of a worker: the literal "Worker: " followed by its protocol
-- and its uuid, separated by a single space.
Show (Worker e) where
  show worker = "Worker: " ++ show worker.protocol ++ " " ++ show worker.uuid
-- Build a PoolManager from the per-site limit, the total limit and the
-- certificate checker, in that order.
55 | new_pool_manager : HasIO io => Nat -> Nat -> (String -> CertificateCheck IO) -> io (PoolManager e)
56 | new_pool_manager max_per_site_connections max_total_connections cert_check = liftIO $
do
-- NOTE(review): the statement that binds `ref` is not visible in this excerpt;
-- presumably it allocates the IORef holding the (initially empty) pool list.
58 | pure (MkPoolManager ref max_per_site_connections max_total_connections cert_check)
-- Produce a fresh 64-bit worker id for `pool`.
-- A random Int32 is shifted into the upper half of the id; the pool's
-- counter, read back after being incremented, is OR-ed into the result.
pool_new_worker_id : Pool e -> IO Bits64
pool_new_worker_id pool = do
  salt <- the (IO Int32) randomIO
  -- Bump the counter first, then read the bumped value back.
  modifyIORef pool.counter (+ 1)
  serial <- readIORef pool.counter
  let high : Bits64 = shiftL (cast salt) 32
  pure (high .|. cast serial)
-- Look up the pool for the host named in the message's Host header, creating
-- and registering a new pool when the manager has none for that host.
-- Returns Left UnknownHost when the header lookup yields Nothing.
-- NOTE(review): several case-alternative lines are not visible in this
-- excerpt; the comments below describe only the visible statements.
67 | find_or_create_pool : RawHttpMessage -> PoolManager e -> IO (Either (HttpError e) (Hostname, Pool e))
68 | find_or_create_pool message manager = do
69 | pools <- readIORef manager.pools
70 | case lookup_header message.headers Host of
72 | case lookup host pools of
-- Existing pool: hand it back unchanged.
74 | pure (Right (host, pool))
-- New pool: empty worker list, counter 0, a new queue, last_called = current time.
76 | pool <- pure $
MkPool !(newIORef []) !(newIORef 0) !(mk_queue) !(time >>= newIORef)
-- Register the new pool at the front of the manager's list.
77 | modifyIORef manager.pools ((host, pool) ::)
78 | pure (Right (host, pool))
79 | Nothing => pure (Left UnknownHost)
-- Count the workers registered in `pool` whose protocol equals `protocol`.
single_pool_active_connections : Protocol -> Pool e -> IO Nat
single_pool_active_connections protocol pool = do
  registered <- readIORef pool.workers
  let matching = filter (\w => protocol == w.protocol) registered
  pure (length matching)
-- Sum, over every pool known to the manager, the number of workers using
-- `protocol`. Pools are visited in list order.
total_active_connections : Protocol -> PoolManager e -> IO Nat
total_active_connections protocol manager =
  readIORef manager.pools >>= foldlM add_pool 0 . map snd
  where
    -- Add one pool's matching-worker count to the running total.
    add_pool : Nat -> Pool e -> IO Nat
    add_pool acc pool = map (acc +) (single_pool_active_connections protocol pool)
-- Tear down a worker and unregister it from its parent pool.
-- NOTE(review): two lines at the start of the body are not visible in this
-- excerpt (presumably the socket is closed there — confirm).
91 | close_worker : Worker e -> IO ()
92 | close_worker worker = do
-- Keep every worker whose uuid differs from this one's.
95 | let f = \w => w.uuid /= worker.uuid
96 | modifyIORef worker.parent.workers (filter f)
-- Shut a pool down: fail every request still queued, close every registered
-- worker, then broadcast a Kill event carrying `cond` on the pool's queue.
-- NOTE(review): line numbering has gaps here; only visible statements are described.
98 | close_pool : {e : _} -> Condition -> Pool e -> IO ()
99 | close_pool cond pool = do
100 | let queue = pool.scheduled
-- Drain the queue, keeping only the Request events.
101 | remaining <- mapMaybe (\case Request x => Just x;
_ => Nothing) <$> recv_all queue
-- Answer each drained request with Left ConnectionClosed on its response channel.
103 | traverse_ (flip channelPut (Left ConnectionClosed) . response) remaining
106 | workers <- readIORef pool.workers
107 | traverse_ close_worker workers
-- Kill is broadcast after the workers were closed and removed from the list.
110 | broadcast queue (Kill $
Just cond)
-- Block until none of the given pools has a registered worker left.
-- Each round waits on `cond` (with a timeout) and then re-reads every pool's
-- worker list; the function recurses while any list is non-empty.
-- NOTE(review): whether `mutex` is acquired before the wait is not visible here — confirm.
112 | wait_for_worker_close : {e : _} -> Mutex -> Condition -> List (Pool e) -> IO ()
113 | wait_for_worker_close mutex cond pools = do
-- Timeout is 1000000, presumably microseconds (i.e. one second) — TODO confirm.
114 | conditionWaitTimeout cond mutex 1000000
115 | workers <- traverse (\p => readIORef p.workers) pools
-- `join` flattens the per-pool lists so one `null` covers all pools.
117 | if null (join workers) then pure () else wait_for_worker_close mutex cond pools
-- True when at least one worker registered in `pool` has its idle flag set.
-- Workers are checked in list order and the scan stops at the first idle one.
has_idle_worker : Pool e -> IO Bool
has_idle_worker pool = do
  registered <- readIORef pool.workers
  any_idle registered
  where
    any_idle : List (Worker e) -> IO Bool
    any_idle [] = pure False
    any_idle (w :: rest) = do
      is_idle <- readIORef w.idle
      if is_idle then pure True else any_idle rest
-- Pair every pool known to the manager with its last_called value.
125 | pools_last_called : PoolManager e -> IO (List (Integer, Pool e))
126 | pools_last_called manager = do
127 | pools <- readIORef manager.pools
-- Drop the hostnames; only the pools themselves are needed.
128 | let pools = map snd pools
129 | flip traverse pools $
\pool => do
130 | t <- readIORef pool.last_called
-- NOTE(review): the final statement of this inner do-block is not visible in
-- this excerpt; given the return type it presumably yields (t, pool) — confirm.
-- Create a socket, register a new worker for it in `pool`, connect to
-- `hostname` and then run the worker's handler.
--
-- Arguments, in order:
--   fetcher    - event queue handed to worker_handle
--   throw      - callback used to report an HttpError
--   cert_check - certificate checker handed to worker_handle
--   protocol   - protocol of the new worker; also supplies the default port
--   hostname   - host to connect to; its explicit port, if any, wins
--   pool       - pool the worker is registered in
--
-- Failure handling:
--   * socket creation fails: the error is reported through `throw`; nothing
--     has been registered yet.
--   * connect fails: the worker has already been added to pool.workers, so it
--     is passed to close_worker before the error is reported. Without that
--     step the dead worker would stay in the list forever, still being
--     counted as an active connection and keeping the list non-empty.
spawn_worker : {e : _} -> Queue (Event e) -> (HttpError e -> IO ()) -> (String -> CertificateCheck IO) -> Protocol -> Hostname -> Pool e -> IO ()
spawn_worker fetcher throw cert_check protocol hostname pool = do
  worker_id <- pool_new_worker_id pool
  Right sock <- socket AF_INET Stream 0
  | Left err => throw $ SocketError "error when creating socket: \{show err}"
  idle_ref <- newIORef True
  let worker = MkWorker idle_ref worker_id protocol sock pool
  -- Register before connecting so the worker is counted immediately.
  modifyIORef pool.workers (worker ::)
  let hostname_str = hostname_string hostname
  -- An explicit port on the hostname overrides the protocol's default port.
  let port = case hostname.port of Just p => p; Nothing => protocol_port_number protocol
  let closer = close_worker worker
  0 <- connect sock (Hostname hostname.domain) (cast port)
  | err => closer >> throw (SocketError "unable to connect to \{hostname_str}: \{show err}")
  worker_handle sock idle_ref closer fetcher throw cert_check protocol hostname_str
-- Select one element of a non-empty list using the supplied ordering function.
-- NOTE(review): most of the `loop` body is not visible in this excerpt; from
-- the name it presumably returns the minimum under `compare` — confirm.
148 | min_by : (ty -> ty -> Ordering) -> List1 ty -> ty
-- The head seeds the loop, so no empty-list case is needed.
149 | min_by compare (x ::: xs) = loop x xs where
150 | loop : ty -> List ty -> ty
153 | case compare x y of
-- Scheduler implementation backed by a PoolManager.
-- NOTE(review): line numbering has many gaps in this excerpt; the comments
-- below describe only the statements that are visible.
158 | {e : _} -> Scheduler e IO (PoolManager e) where
-- schedule_request: enqueue `request` on the pool of its host and, depending
-- on current connection counts, spawn a worker to serve it.
159 | schedule_request manager protocol request = do
-- Errors are delivered to the caller through the request's response channel.
160 | let throw = \err => channelPut request.response (Left err)
161 | Right (host, pool) <- find_or_create_pool request.raw_http_message manager
162 | | Left err => throw err
-- Record the current time as this pool's last use.
165 | time >>= writeIORef pool.last_called
-- The request is queued before any worker-spawning decision is made.
167 | signal pool.scheduled $
Request request
177 | local <- single_pool_active_connections protocol pool
178 | all <- total_active_connections protocol manager
179 | has_idle <- has_idle_worker pool
-- NOTE(review): `all` (the count across every pool) is compared against
-- max_per_site_connections here, and max_total_connections is never read in
-- the visible code — this looks like it should be the total limit; confirm.
181 | let first_condition = (all >= manager.max_per_site_connections) && (local == Z)
-- Otherwise spawn only when this pool is under its limit and has no idle worker.
182 | let second_condition = not first_condition && ((local < manager.max_per_site_connections) && not has_idle)
184 | when {f=IO} first_condition $
do
185 | pools_to_kill <- pools_last_called manager
-- fromList yields Nothing for an empty list; in that case nothing is done.
186 | let Just pools_to_kill = fromList pools_to_kill
187 | | Nothing => pure ()
-- Pools are compared by their last_called value; the chosen one receives a
-- single Kill Nothing event.
189 | let (_, pool_to_kill) = min_by (\a,b => compare (fst a) (fst b)) pools_to_kill
190 | signal pool_to_kill.scheduled (Kill Nothing)
-- The worker is spawned on a separate thread; the fork result is discarded.
191 | _ <- forkIO $
spawn_worker pool.scheduled throw manager.certificate_checker protocol host pool
194 | when {f=IO} second_condition $
do
195 | _ <- forkIO $
spawn_worker pool.scheduled throw manager.certificate_checker protocol host pool
-- evict_all: empty the manager's pool list, close every pool that was in it
-- and wait until all of their workers are gone.
198 | evict_all manager = do
199 | pools <- readIORef manager.pools
200 | condition <- makeCondition
-- NOTE(review): the statement that binds `mutex` is not visible in this excerpt.
-- The manager's list is cleared before the pools themselves are closed.
202 | writeIORef manager.pools []
203 | let pools = map snd pools
205 | traverse_ (close_pool condition) pools
206 | wait_for_worker_close mutex condition pools