0 | module System.UV.Stream
  1 |
  2 | import Control.Monad.Trans
  3 | import Data.Buffer.Indexed
  4 | import Data.ByteString
  5 |
  6 | import IO.Async.Event
  7 |
  8 | import System.UV.Loop
  9 | import System.UV.Pointer
 10 | import System.UV.Raw.Handle
 11 | import System.UV.Raw.Stream
 12 |
 13 | %default total
 14 |
 15 | export %inline
 16 | Resource AllocCB where
 17 |   release = freeAllocCB
 18 |
 19 | public export
 20 | data ReadRes : (a : Type) -> Type where
 21 |   Done : ReadRes a
 22 |   Data : (val : a) -> ReadRes a
 23 |   Err  : UVError -> ReadRes a
 24 |
 25 | export
 26 | Functor ReadRes where
 27 |   map _ Done       = Done
 28 |   map f (Data val) = Data (f val)
 29 |   map _ (Err err)  = Err err
 30 |
 31 | export
 32 | Applicative ReadRes where
 33 |   pure = Data
 34 |
 35 |   Data f  <*> Data a  = pure (f a)
 36 |   Done    <*> _       = Done
 37 |   Err err <*> _       = Err err
 38 |   _       <*> Done    = Done
 39 |   _       <*> Err err = Err err
 40 |
 41 | export
 42 | Monad ReadRes where
 43 |   Done    >>= _ = Done
 44 |   Data a  >>= f = f a
 45 |   Err err >>= _ = Err err
 46 |
 47 | public export
 48 | data ReadResT : (m : Type -> Type) -> (a : Type) -> Type where
 49 |   MkReadResT : m (ReadRes a) -> ReadResT m a
 50 |
 51 | export %inline
 52 | runReadResT : ReadResT m a -> m (ReadRes a)
 53 | runReadResT (MkReadResT x) = x
 54 |
 55 | export
 56 | Functor m => Functor (ReadResT m) where
 57 |   map f (MkReadResT x) = MkReadResT $ map f <$> x
 58 |
 59 | export
 60 | Applicative m => Applicative (ReadResT m) where
 61 |   pure = MkReadResT . pure . pure
 62 |   MkReadResT f <*> MkReadResT x = MkReadResT [| f <*> x |]
 63 |
 64 | export
 65 | Monad m => Monad (ReadResT m) where
 66 |   MkReadResT x >>= f = MkReadResT $ do
 67 |     Data x' <- x | Err err => pure (Err err)
 68 |                  | Done => pure Done
 69 |     runReadResT $ f x'
 70 |
 71 | export
 72 | MonadTrans ReadResT where
 73 |   lift = MkReadResT . map pure
 74 |
 75 | export
 76 | HasIO m => HasIO (ReadResT m) where
 77 |   liftIO act = MkReadResT $ liftIO act >>= pure . pure
 78 |
 79 | toMsg : Int32 -> Ptr Buf -> IO (ReadRes ByteString)
 80 | toMsg n buf =
 81 |   case uvRes {es = [UVError]} n $> n of
 82 |     Left (Here EOF) => pure Done
 83 |     Left (Here err) => pure (Err err)
 84 |     Right n         => Data <$> bufToByteString buf (cast n)
 85 |
 86 | export
 87 | (cc : CloseCB) => Resource (Ptr Stream) where
 88 |   release h = uv_close h cc
 89 |
 90 | export
 91 | shutdownStream : UVLoop => (0 pc : PCast t Stream) => Ptr t -> Async [] ()
 92 | shutdownStream x =
 93 |   let s := castPtr @{pc} x
 94 |    in uv_read_stop s >> ignore (uv_shutdown s $ \_,_ => release s)
 95 |
 96 | parameters {auto l : UVLoop}
 97 |            {auto has : Has UVError es}
 98 |
 99 |   export
100 |   read :
101 |        AllocCB
102 |     -> Ptr t
103 |     -> {auto 0 cstt : PCast t Stream}
104 |     -> (Buffer (ReadRes ByteString) -> Async es a)
105 |     -> Async es a
106 |   read {a} ac h run = finally act (uv_read_stop h)
107 |     where
108 |       act : Async es a
109 |       act = do
110 |         st <- newEvent
111 |         uv $ uv_read_start h ac (\_,n,buf => toMsg n buf >>= buffer st)
112 |         run st
113 |
114 |   export
115 |   write : Ptr t -> (0 _ : PCast t Stream) => ByteString -> Async es ()
116 |   write str b =
117 |     use1 (fromByteString b) $ \cs => uvAsync $ \cb =>
118 |       uv_write str cs (cast b.size) (\_,_ => cb $ Succeeded ())
119 |
120 |   export
121 |   listen :
122 |        Ptr t
123 |     -> {auto 0 cst : PCast t Stream}
124 |     -> (Buffer (Either UVError $ Ptr Stream) -> Async es a)
125 |     -> Async es a
126 |   listen {a} {cst} server run = do
127 |     q <- newEvent
128 |     uv $ uv_listen server 128 $ \p,res =>
129 |       buffer q $ if res < 0 then Left $ fromCode res else Right p
130 |     run q
131 |