0 | module Network.HTTP.Scheduler
 1 |
 2 | import Network.HTTP.Message
 3 | import Network.HTTP.Error
 4 | import Network.HTTP.Protocol
 5 | import Network.HTTP.Header
 6 | import Utils.Streaming
 7 | import System.Concurrency
 8 | import Data.List1
 9 | import Data.Compress.Interface
10 | import Data.Compress.GZip
11 | import Data.Compress.ZLib
12 |
13 | public export
14 | record ScheduleResponse (e : Type) (m : Type -> Type) where
15 |   constructor MkScheduleResponse
16 |   raw_http_response : HttpResponse
17 |   content : Channel (Either (HttpError e) (Maybe (List Bits8)))
18 |
19 | public export
20 | record ScheduleRequest (e : Type) (m : Type -> Type) where
21 |   constructor MkScheduleRequest
22 |   raw_http_message : RawHttpMessage
23 |   content : Stream (Of Bits8) m (Either e ())
24 |   response : Channel (Either (HttpError e) (ScheduleResponse e m))
25 |
26 | public export
27 | interface Scheduler (e : Type) (m : Type -> Type) (0 a : Type) where
28 |   ||| Schedule a HTTP request
29 |   schedule_request : a -> Protocol -> ScheduleRequest e m -> m ()
30 |   ||| Evict all HTTP connections, returning scheduler to a clean state (and closing all resources)
31 |   evict_all : a -> m ()
32 |
33 | channel_to_stream : (HasIO m, Decompressor c) => c -> Channel (Either (HttpError e) (Maybe (List Bits8))) ->
34 |                     Stream (Of Bits8) m (Either (HttpError e) ())
35 | channel_to_stream decomp channel = do
36 |   Right (Just content) <- liftIO $ channelGet channel
37 |   | Right Nothing =>
38 |       case done decomp of
39 |         Right [] => pure (Right ())
40 |         Right xs => pure (Left $ DecompressionError "\{show $ length xs} leftover bytes in decompression buffer")
41 |         Left err => pure (Left $ DecompressionError err)
42 |   | Left err => pure (Left err)
43 |   let Right (content, decomp) = feed decomp content
44 |   | Left err => pure (Left $ DecompressionError err)
45 |   fromList_ content *> channel_to_stream decomp channel
46 |
47 | decompressor : List ContentEncodingScheme -> DPair Type Decompressor
48 | decompressor [ GZip ] = MkDPair GZipState %search
49 | decompressor [ Deflate ] = MkDPair ZLibState %search
50 | decompressor _ = MkDPair IdentityState %search
51 |
52 | to_list : Maybe (List1 a) -> List a
53 | to_list Nothing = []
54 | to_list (Just (x ::: xs)) = x :: xs
55 |
56 | public export
57 | start_request : {m, e : _} -> (HasIO m, Scheduler e m scheduler) =>
58 |           scheduler ->
59 |           Protocol ->
60 |           RawHttpMessage ->
61 |           Stream (Of Bits8) m (Either e ()) ->
62 |           m (Either (HttpError e) (HttpResponse, Stream (Of Bits8) m (Either (HttpError e) ())))
63 | start_request scheduler protocol msg content = do
64 |   mvar <- makeChannel
65 |   schedule_request scheduler protocol $ MkScheduleRequest msg content mvar
66 |   Right response <- channelGet mvar
67 |   | Left err => pure $ Left err
68 |   let (encoding ** wit= decompressor $ to_list $ lookup_header response.raw_http_response.headers ContentEncoding
69 |   pure $ Right (response.raw_http_response, channel_to_stream (init @{wit}) response.content)
70 |