0 | module IO.Async.Posix
  1 |
  2 | import public IO.Async
  3 | import public IO.Async.Loop.PollH
  4 | import public IO.Async.Loop.TimerH
  5 | import public System.Posix.File
  6 | import public System.Posix.Poll.Types
  7 | import public System.Posix.Time
  8 |
  9 | import Data.C.Ptr
 10 | import System.Posix.Dir
 11 |
 12 | %default total
 13 |
 14 | parameters {auto has : Has Errno es}
 15 |            {auto ep  : PollH e}
 16 |            {auto fd  : FileDesc f}
 17 |            (fd       : f)
 18 |
 19 |   ||| Polls the file descriptor for the given events without blocking
 20 |   ||| an operating system thread.
 21 |   |||
 22 |   ||| If the file descriptor does not support polling, for instance,
 23 |   ||| because it is a regular file, this will immediately return
 24 |   ||| `ev`.
 25 |   export
 26 |   poll : (ev : PollEvent) -> Async e es PollEvent
 27 |   poll ev = do
 28 |     st <- env
 29 |     primAsync $ \cb => primPoll st (cast fd) ev False $ \case
 30 |       Right x => cb (Right x)
 31 |       Left  x => cb (Left $ inject x)
 32 |
 33 |   ||| Runs a computation after polling a file descriptor.
 34 |   |||
 35 |   ||| This allows us to read from or write to a file descriptor
 36 |   ||| without blocking an operating system thread.
 37 |   export
 38 |   onEvent : PollEvent -> Async e es a -> Async e es a
 39 |   onEvent ev act = do
 40 |     evt <- poll ev
 41 |     case hasEvent evt ev of
 42 |       True  => act
 43 |       False => throw EINVAL
 44 |
 45 |   ||| Reads from a file descriptor without blocking.
 46 |   |||
 47 |   ||| If the descriptor corresponds to a regular file, this will just
 48 |   ||| read up to the given amount of bytes from the file. If the descriptor
 49 |   ||| corresponds to a socket or FIFO (pipe), the `O_NONBLOCK` flag of
 50 |   ||| the descriptor *must* have been set (via `addFlags` for instance).
 51 |   |||
 52 |   ||| This will then first try to read from the descriptor without
 53 |   ||| polling, and if this fails with `EAGAIN`, proper file polling is used.
 54 |   export
 55 |   readnb : (0 r : Type) -> FromBuf r => Bits32 -> Async e es r
 56 |   readnb r n =
 57 |     attempt (read {es = [Errno]} fd r n) >>= \case
 58 |       Left (Here x) =>
 59 |         if x == EAGAIN || x == EWOULDBLOCK
 60 |            then onEvent POLLIN (read fd r n)
 61 |            else throw x
 62 |       Right res => pure res
 63 |
 64 |   ||| Like `readnb` but reads data into a pre-allocated buffer.
 65 |   |||
 66 |   ||| If the descriptor corresponds to a regular file, this will just
 67 |   ||| read up to the given amount of bytes from the file. If the descriptor
 68 |   ||| corresponds to a socket or FIFO (pipe), the `O_NONBLOCK` flag of
 69 |   ||| the descriptor *must* have been set (via `addFlags` for instance).
 70 |   export
 71 |   readRawNb : Buf -> Async e es EMBuffer
 72 |   readRawNb buf =
 73 |     attempt (readRaw {es = [Errno]} fd buf) >>= \case
 74 |       Left (Here x) =>
 75 |         if x == EAGAIN || x == EWOULDBLOCK
 76 |            then onEvent POLLIN (readRaw fd buf)
 77 |            else throw x
 78 |       Right res => pure res
 79 |
 80 |   ||| Like `readnb` but reads data into a pre-allocated C-pointer and
 81 |   ||| converts it from there.
 82 |   |||
 83 |   ||| This is useful for re-using a (reasonably large) buffer when
 84 |   ||| streaming lots of comparably small chunks of data. Instead of
 85 |   ||| allocating a new - potentially much too large - buffer with every
 86 |   ||| read (as is the case with `readnb`), we allocate a buffer once
 87 |   ||| and copy only the bytes we actually read into an immutable
 88 |   ||| data type such as a string or byte vector.
 89 |   export
 90 |   readPtrNB : (0 r : Type) -> FromPtr r => CPtr -> Async e es r
 91 |   readPtrNB r p =
 92 |     attempt (readPtr {es = [Errno]} fd r p) >>= \case
 93 |       Left (Here x) =>
 94 |         if x == EAGAIN || x == EWOULDBLOCK
 95 |            then onEvent POLLIN (readPtr fd r p)
 96 |            else throw x
 97 |       Right res => pure res
 98 |
 99 |
100 |   ||| Writes to a file descriptor without blocking.
101 |   |||
102 |   ||| If the descriptor corresponds to a regular file, this will just
103 |   ||| write up to the given amount of bytes to the file. If the descriptor
104 |   ||| corresponds to a socket or FIFO (pipe), the `O_NONBLOCK` flag of
105 |   ||| the descriptor *must* have been set (via `addFlags` for instance).
106 |   |||
107 |   ||| This will then first try to write to the descriptor without
108 |   ||| polling, and if this fails with `EAGAIN`, proper file polling is used.
109 |   export
110 |   writenb : ToBuf r => r -> Async e es Bits32
111 |   writenb v =
112 |     attempt (write {es = [Errno]} fd v) >>= \case
113 |       Left (Here x) =>
114 |         if x == EAGAIN || x == EWOULDBLOCK
115 |            then onEvent POLLOUT (write fd v)
116 |            else throw x
117 |       Right res => pure res
118 |
119 |
120 |   ||| Iteratively writes a value to a file descriptor making sure
121 |   ||| that the whole value is written. Use this, if a single call to
122 |   ||| `write` might not write the complete data (for instance, when
123 |   ||| writing to a pipe or socket).
124 |   |||
125 |   |||
126 |   export
127 |   fwritenb : ToBuf r => r -> Async e es ()
128 |   fwritenb v =
129 |     case (unsafeToBuf v) of
130 |       Left  (CP sz p) => goPtr p sz
131 |       Right bs        => go bs
132 |
133 |     where
134 |       goPtr : AnyPtr -> Bits32 -> Async e es ()
135 |       goPtr p 0  = pure ()
136 |       goPtr p sz = do
137 |         m <- writenb (CP sz p)
138 |         goPtr (prim__inc_ptr p m 1) (assert_smaller sz $ sz - m)
139 |
140 |       go : ByteString -> Async e es ()
141 |       go (BS 0 _) = pure ()
142 |       go bs       = do
143 |         m <- writenb bs
144 |         go (assert_smaller bs $ drop (cast m) bs)
145 |
146 |   ||| Continously reads and transforms data from a file
147 |   ||| descriptor without blocking.
148 |   export covering
149 |   stream :
150 |        (0 r : Type)
151 |     -> {auto frp : FromBuf r}
152 |     -> Bits32
153 |     -> (r -> Async e es ())
154 |     -> Async e es ()
155 |   stream r buf act =
156 |     onEvent POLLIN (read fd Buf buf) >>= \case
157 |       B 0 _ => pure ()
158 |       b     => do
159 |         v <- runIO (fromBuf b)
160 |         act v
161 |         stream r buf act
162 |
163 |   ||| Continously reads and transforms data from a file
164 |   ||| descriptor without blocking by loading data into a
165 |   ||| preallocated pointer.
166 |   |||
167 |   ||| For very large files, this can be faster than `stream` if
168 |   ||| the data in question can be transformed in place without allocating
169 |   ||| additional memory. This also allows us to use a very large buffer
170 |   ||| even in case we often only read small amounts of data.
171 |   export covering
172 |   streamp :
173 |        (0 r : Type)
174 |     -> {auto frp : FromPtr r}
175 |     -> CPtr
176 |     -> (r -> Async e es ())
177 |     -> Async e es ()
178 |   streamp r cp act =
179 |     onEvent POLLIN (readPtr fd CPtr cp) >>= \case
180 |       CP 0 _ => pure ()
181 |       cp2    => do
182 |         v <- runIO (fromPtr cp2)
183 |         act v
184 |         streamp r cp act
185 |
186 | ||| Wait until the given UTC time, which is computed from the
187 | ||| current time.
188 | export
189 | sleepTill : TimerH e => (Tm -> Clock UTC) -> Async e es ()
190 | sleepTill fun = do
191 |   now <- liftIO (clockTime UTC)
192 |   sleep (timeDifference (fun $ fromUTC now) now)
193 |
194 | ||| Computes local time of the next full minute.
195 | export
196 | nextMinute : Tm -> Clock UTC
197 | nextMinute x = addDuration (toUTC ({sec := 0} x)) 60.s
198 |
199 | ||| Computes local time of the next full hour.
200 | export
201 | nextHour : Tm -> Clock UTC
202 | nextHour tm = addDuration (toUTC ({sec := 0, min := 0} tm)) 1.h
203 |
204 | ||| Computes local time of 00:00:00 of the next day.
205 | export
206 | nextDay : Tm -> Clock UTC
207 | nextDay tm = addDuration (toUTC ({sec := 0, min := 0, hour := 0} tm)) 1.d
208 |
209 | ||| Computes local time of 00:00:00 on January 1 of the next day.
210 | export
211 | nextYear : Tm -> Clock UTC
212 | nextYear tm = toUTC $ {year := tm.year + 1} blank
213 |
214 | ||| Computes local time of 00:00:00 of the first day of the next month.
215 | export
216 | nextMonth : Tm -> Clock UTC
217 | nextMonth tm =
218 |   if tm.mon == 11
219 |      then nextYear tm
220 |      else toUTC $ {mon := tm.mon + 1, year := tm.year} blank
221 |
222 | ||| Computes local time of 00:00:00 of the next Sunday
223 | export
224 | nextWeek : Tm -> Clock UTC
225 | nextWeek tm =
226 |   addDuration (toUTC ({sec := 0, min := 0, hour := 0} tm)) (cast $ 7 - tm.wday).d
227 |