0 | module Network.HTTP.Pool.ConnectionPool
  1 |
  2 | import Network.HTTP.Error
  3 | import Network.HTTP.Scheduler
  4 | import Network.HTTP.Protocol
  5 | import Network.HTTP.Message
  6 | import Network.HTTP.Header
  7 | import Network.HTTP.URL
  8 | import Network.HTTP.Pool.Worker
  9 | import Network.HTTP.Pool.Common
 10 | import Network.TLS
 11 | import Network.TLS.Signature
 12 | import Network.TLS.Verify
 13 | import Network.Socket
 14 | import Data.IORef
 15 | import Data.Bits
 16 | import Data.List
 17 | import Data.List1
 18 | import System
 19 | import System.Random
 20 | import System.Future
 21 | import System.Concurrency
 22 | import Utils.Queue
 23 |
 24 | mutual
 25 |   record Pool (e : Type) where
 26 |     constructor MkPool
 27 |     workers : IORef (List (Worker e))
 28 |     ||| Increase everytime there is a new worker
 29 |     counter : IORef Bits32
 30 |     scheduled : Queue (Event e)
 31 |     last_called : IORef Integer
 32 |
 33 |   export
 34 |   record PoolManager (e : Type) where
 35 |     constructor MkPoolManager
 36 |     pools : IORef (List (Hostname, Pool e))
 37 |     max_per_site_connections : Nat
 38 |     max_total_connections : Nat
 39 |     certificate_checker : (String -> CertificateCheck IO)
 40 |
 41 |   record Worker (e : Type) where
 42 |     constructor MkWorker
 43 |     idle : IORef Bool
 44 |     ||| Unique id of the pool for identification
 45 |     uuid : Bits64
 46 |     protocol : Protocol
 47 |     ||| Kept in case we need to close externally
 48 |     socket : Socket
 49 |     parent : Pool e
 50 |
 51 | Show (Worker e) where
 52 |   show worker = "Worker: \{show worker.protocol} \{show worker.uuid}"
 53 |
 54 | export
 55 | new_pool_manager : HasIO io => Nat -> Nat -> (String -> CertificateCheck IO) -> io (PoolManager e)
 56 | new_pool_manager max_per_site_connections max_total_connections cert_check = liftIO $ do
 57 |   ref <- newIORef []
 58 |   pure (MkPoolManager ref max_per_site_connections max_total_connections cert_check)
 59 |
 60 | pool_new_worker_id : Pool e -> IO Bits64
 61 | pool_new_worker_id pool = do
 62 |   r <- the (IO Int32) randomIO
 63 |   modifyIORef pool.counter (+ 1)
 64 |   c <- readIORef pool.counter
 65 |   pure $ (shiftL (cast r) 32) .|. cast c
 66 |
 67 | find_or_create_pool : RawHttpMessage -> PoolManager e -> IO (Either (HttpError e) (Hostname, Pool e))
 68 | find_or_create_pool message manager = do
 69 |   pools <- readIORef manager.pools
 70 |   case lookup_header message.headers Host of
 71 |     Just host =>
 72 |       case lookup host pools of
 73 |         Just pool =>
 74 |           pure (Right (host, pool))
 75 |         Nothing => do
 76 |           pool <- pure $ MkPool !(newIORef []) !(newIORef 0) !(mk_queue) !(time >>= newIORef)
 77 |           modifyIORef manager.pools ((host, pool) ::)
 78 |           pure (Right (host, pool))
 79 |     Nothing => pure (Left UnknownHost)
 80 |
 81 | single_pool_active_connections : Protocol -> Pool e -> IO Nat
 82 | single_pool_active_connections protocol pool = length . filter (\x => protocol == x.protocol) <$> readIORef pool.workers
 83 |
 84 | export
 85 | total_active_connections : Protocol -> PoolManager e -> IO Nat
 86 | total_active_connections protocol manager = do
 87 |   pools <- readIORef manager.pools
 88 |   let pools = map snd pools
 89 |   foldlM (\a,b => (a+) <$> single_pool_active_connections protocol b) 0 pools
 90 |
 91 | close_worker : Worker e -> IO ()
 92 | close_worker worker = do
 93 |   -- putStrLn "closing \{show worker}"
 94 |   close worker.socket
 95 |   let f = \w => w.uuid /= worker.uuid
 96 |   modifyIORef worker.parent.workers (filter f)
 97 |
 98 | close_pool : {e : _} -> Condition -> Pool e -> IO ()
 99 | close_pool cond pool = do
100 |   let queue = pool.scheduled
101 |   remaining <- mapMaybe (\case Request x => Just x_ => Nothing) <$> recv_all queue
102 |   -- feed all awaiting requests with errors
103 |   traverse_ (flip channelPut (Left ConnectionClosed) . response) remaining
104 |
105 |   -- close all sockets
106 |   workers <- readIORef pool.workers
107 |   traverse_ close_worker workers
108 |
109 |   -- broadcast kill event
110 |   broadcast queue (Kill $ Just cond)
111 |
112 | wait_for_worker_close : {e : _} -> Mutex -> Condition -> List (Pool e) -> IO ()
113 | wait_for_worker_close mutex cond pools = do
114 |   conditionWaitTimeout cond mutex 1000000
115 |   workers <- traverse (\p => readIORef p.workers) pools
116 |   -- putStrLn "waiting for pool close: \{show workers}"
117 |   if null (join workers) then pure () else wait_for_worker_close mutex cond pools
118 |
119 | has_idle_worker : Pool e -> IO Bool
120 | has_idle_worker pool = readIORef pool.workers >>= loop where
121 |   loop : List (Worker e) -> IO Bool
122 |   loop [] = pure False
123 |   loop (x :: xs) = readIORef x.idle >>= (\b => if b then pure True else loop xs)
124 |
125 | pools_last_called : PoolManager e -> IO (List (Integer, Pool e))
126 | pools_last_called manager = do
127 |   pools <- readIORef manager.pools
128 |   let pools = map snd pools
129 |   flip traverse pools $ \pool => do
130 |     t <- readIORef pool.last_called
131 |     pure (t, pool)
132 |
133 | spawn_worker : {e : _} -> Queue (Event e) -> (HttpError e -> IO ()) -> (String -> CertificateCheck IO) -> Protocol -> Hostname -> Pool e -> IO ()
134 | spawn_worker fetcher throw cert_check protocol hostname pool = do
135 |   worker_id <- pool_new_worker_id pool
136 |   Right sock <- socket AF_INET Stream 0
137 |   | Left err => throw $ SocketError "error when creating socket: \{show err}"
138 |   idle_ref <- newIORef True
139 |   let worker = MkWorker idle_ref worker_id protocol sock pool
140 |   modifyIORef pool.workers (worker ::)
141 |   let hostname_str = hostname_string hostname
142 |   let port = case hostname.port of Just p => pNothing => protocol_port_number protocol
143 |   0 <- connect sock (Hostname hostname.domain) (cast port)
144 |   | err => throw $ SocketError "unable to connect to \{hostname_str}: \{show err}"
145 |   let closer = close_worker worker
146 |   worker_handle sock idle_ref closer fetcher throw cert_check protocol hostname_str
147 |
148 | min_by : (ty -> ty -> Ordering) -> List1 ty -> ty
149 | min_by compare (x ::: xs) = loop x xs where
150 |   loop : ty -> List ty -> ty
151 |   loop x [] = x
152 |   loop x (y :: ys) =
153 |     case compare x y of
154 |       LT => loop x ys
155 |       _  => loop y ys
156 |
157 | export
158 | {e : _} -> Scheduler e IO (PoolManager e) where
159 |   schedule_request manager protocol request = do
160 |     let throw = \err => channelPut request.response (Left err)
161 |     Right (host, pool) <- find_or_create_pool request.raw_http_message manager
162 |     | Left err => throw err
163 |
164 |     -- update last called
165 |     time >>= writeIORef pool.last_called
166 |
167 |     signal pool.scheduled $ Request request
168 |
169 |     {-
170 |     1. if total connection is maxed and no local connection
171 |       send kill to the last called pool
172 |       spawn new thread
173 |     2. else if local connection is not maxed and no idle
174 |       spawn new thread
175 |     -}
176 |
177 |     local <- single_pool_active_connections protocol pool
178 |     all <- total_active_connections protocol manager
179 |     has_idle <- has_idle_worker pool
180 |
181 |     let first_condition = (all >= manager.max_per_site_connections) && (local == Z)
182 |     let second_condition = not first_condition && ((local < manager.max_per_site_connections) && not has_idle)
183 |
184 |     when {f=IO} first_condition $ do
185 |       pools_to_kill <- pools_last_called manager
186 |       let Just pools_to_kill = fromList pools_to_kill
187 |       | Nothing => pure () -- why is pool empty
188 |
189 |       let (_, pool_to_kill) = min_by (\a,b => compare (fst a) (fst b)) pools_to_kill
190 |       signal pool_to_kill.scheduled (Kill Nothing)
191 |       _ <- forkIO $ spawn_worker pool.scheduled throw manager.certificate_checker protocol host pool
192 |       pure ()
193 |
194 |     when {f=IO} second_condition $ do
195 |       _ <- forkIO $ spawn_worker pool.scheduled throw manager.certificate_checker protocol host pool
196 |       pure ()
197 |
198 |   evict_all manager = do
199 |     pools <- readIORef manager.pools
200 |     condition <- makeCondition
201 |     mutex <- makeMutex
202 |     writeIORef manager.pools []
203 |     let pools = map snd pools
204 |     mutexAcquire mutex
205 |     traverse_ (close_pool condition) pools
206 |     wait_for_worker_close mutex condition pools
207 |     mutexRelease mutex
208 |