0 | module IO.Async.Core
  1 |
  2 | import Data.Linear.Unique
  3 | import public Control.Monad.MCancel
  4 | import public Data.Linear.ELift1
  5 |
  6 | %default total
  7 |
  8 | public export
  9 | 0 IOToken : Type
 10 | IOToken = Token World
 11 |
 12 | public export
 13 | 0 Callback : List Type -> Type -> Type
 14 | Callback es a = Outcome es a -> IO1 ()
 15 |
 16 | ||| A fiber is a series of computations that will be run
 17 | ||| in strict sequence and will eventually terminate
 18 | ||| with a result of type `Outcome es a`: It will either
 19 | ||| produce a result of type `a`, an error of type `HSum es`,
 20 | ||| or - in case it was canceled early - end with `Canceled`.
 21 | |||
 22 | ||| From the outside, a fiber can be observed by installing 
 23 | ||| a callback, which will be invoked as soon as the fiber has
 24 | ||| terminated with a result.
 25 | |||
 26 | ||| In addition, a running fiber can be canceled, so that it
 27 | ||| aborts all computations at soon as possible. Canceling a fiber
 28 | ||| that has already completed is a no-op.
 29 | public export
 30 | record Fiber (es : List Type) (a : Type) where
 31 |   constructor MkFiber
 32 |   cancel_  : IO1 ()
 33 |   observe_ : IOToken -> Callback es a -> IO1 (IO1 ())
 34 |
 35 | ||| Sum-type describing the computation running on a fiber.
 36 | |||
 37 | ||| `Async` can be thought of a powerful alternative to `IO`
 38 | ||| that comes with error handling, capabilities for
 39 | ||| spawning additional `Async` computations (each running
 40 | ||| on its of `Fiber`), plus the ability for being canceled
 41 | ||| (from the inside or from other fibers).
 42 | |||
 43 | ||| In order to *run* an `Async` computation, we an
 44 | ||| `IO.Async.Loop.EventLoop`, which takes care of running
 45 | ||| many fibers (each of which might spawn additional fibers)
 46 | ||| concurrently. Some event loops will even offer true parallelism,
 47 | ||| distributing the fibers to be run across several operating
 48 | ||| system threads.
 49 | public export
 50 | data Async : (e : Type) -> (es : List Type) -> Type -> Type where
 51 |   ||| Implements bind (`>>=`)
 52 |   Bind   : Async e es a -> (a -> Async e es b) -> Async e es b
 53 |
 54 |   ||| A pure result
 55 |   Val    : a -> Async e es a
 56 |
 57 |   ||| An error
 58 |   Err    : HSum es -> Async e es a
 59 |
 60 |   ||| A wrapped synchronous/blocking IO action
 61 |   Sync   : IO (Result es a) -> Async e es a
 62 |
 63 |   ||| Cancels the curret fiber
 64 |   Cancel : Async e es ()
 65 |
 66 |   ||| Run the given cancel hook when cancelation is observed for `act`
 67 |   OnCncl : (act : Async e es a) -> (hook : Async e [] ()) -> Async e es a
 68 |
 69 |   ||| Masks a fiber as uncanceble
 70 |   ||| This takes a function argument which will get the running fiber's
 71 |   ||| identifier token plus cancelation id in order to unmask certain
 72 |   ||| inner regions, where cancellation can again be observed.
 73 |   ||| See also `Poll`.
 74 |   UC     : (IOToken -> Nat -> Async e es a) -> Async e es a
 75 |
 76 |   ||| Error handling: In case an error occured, it is wrapped in
 77 |   ||| a `Left`, while a successful result is wrapped in a `Right`.
 78 |   ||| Note, that we do not handle the `Canceled` case: Cancellation
 79 |   ||| cannot be undone. In can be temporarily masked using `UC`, but
 80 |   ||| after that, it will be observed as soon as possible.
 81 |   Attempt : Async e es a -> Async e fs (Result es a)
 82 |
 83 |   ||| Returns the context currently handling this fiber, giving us access
 84 |   ||| to functionality specific to the running event loop.
 85 |   Env  : Async e es e
 86 |
 87 |   ||| Returns the current fiber's unique identifier
 88 |   Self : Async e es IOToken
 89 |
 90 |   ||| Cedes control to the execution context
 91 |   ||| Fibers are auto-ceded after a predefined number of evaluation steps
 92 |   ||| to make sure other fibers get a chance to run even when the event loop
 93 |   ||| is single-threaded. In addition, a fiber can make room for other
 94 |   ||| fibers by invoking `cede` at strategic points.
 95 |   Cede : Async e es ()
 96 |
 97 |   ||| Runs the given computation concurrently to this one returning a
 98 |   ||| `Fiber` representing the runnign computation.
 99 |   Start : Async e es a -> Async e fs (Fiber es a)
100 |
101 |   ||| The asynchronous primitive. This allows us to register callbacks
102 |   ||| and await their invocation, thus blocking the current fiber until
103 |   ||| a result is ready. Being able to treat this as a regular
104 |   ||| IO-like computation is one of the main reasons why `Async` is such
105 |   ||| a powerful abstraction.
106 |   ||| 
107 |   ||| The `IO1 ()` returned after installing a callback will be treated
108 |   ||| as a cancellation hook: It will be invoked if the current
109 |   ||| computation is canceled and cancellation can currently be observed.
110 |   ||| 
111 |   ||| We use this data constructor whenever we'd like to wait for a
112 |   ||| result to be ready at a later time such as a timer, input from
113 |   ||| a pipe or socket, or data available from standard input.
114 |   Asnc   : ((Result es a -> IO1 ()) -> IO1 (IO1 ())) -> Async e es a
115 |
116 |   ||| Temporarily undo a layer of uncancelability. See also `UC`.
117 |   APoll  : IOToken -> Nat -> Async e es a -> Async e es a
118 |
119 | --------------------------------------------------------------------------------
120 | -- Primitives
121 | --------------------------------------------------------------------------------
122 |
123 | ||| Lifts a pure `Result` into `Async`.
124 | export %inline
125 | terminal : Result es a -> Async e es a
126 | terminal = either Err Val
127 |
128 | ||| Lifts an effectful `Result` into `Async`.
129 | export %inline
130 | sync : IO (Result es a) -> Async e es a
131 | sync = Sync
132 |
133 | ||| Asynchronous FFI: Wraps a callback handler into `Async`.
134 | |||
135 | ||| The `IO1 ()` action returned after registering the callback will
136 | ||| be used for cancelation and cleanup.
137 | export %inline
138 | primAsync : ((Result es a -> IO1 ()) -> IO1 (IO1 ())) -> Async e es a
139 | primAsync = Asnc
140 |
141 | ||| Starts a new fiber, running it concurrently to the current one
142 | export %inline
143 | start : Async e es a -> Async e fs (Fiber es a)
144 | start = Start
145 |
146 | ||| Cedes control back to the execution context.
147 | export %inline
148 | cede : Async e es ()
149 | cede = Cede
150 |
151 | ||| Returns the environment provided by the event loop this
152 | ||| fiber is currently running in.
153 | export %inline
154 | env : Async e es e
155 | env = Env
156 |
157 | ||| Returns this fiber's unique identifier.
158 | export %inline
159 | self : Async e es IOToken
160 | self = Self
161 |
162 | --------------------------------------------------------------------------------
163 | -- Interfaces
164 | --------------------------------------------------------------------------------
165 |
166 | export %inline
167 | MErr (Async e) where
168 |   fail          = Err
169 |   attempt       = Attempt
170 |   bind          = Bind
171 |   succeed       = Val
172 |   mapImpl f v   = Bind v (Val . f)
173 |   appImpl ff fv = Bind ff (`mapImpl` fv)
174 |
175 | export %inline
176 | HasIO (Async e es) where
177 |   liftIO = sync . map Right
178 |
179 | export %inline
180 | MCancel (Async e) where
181 |   onCancel = OnCncl
182 |   canceled = Cancel
183 |   uncancelable f = UC $ \t,n => f (APoll t n)
184 |
185 | export %inline
186 | ELift1 World (Async e) where
187 |   elift1 act = sync $ runIO $ \t => toResult (act t)
188 |