0 | module Control.Monad.Coop
  1 |
  2 | import public System.Time
  3 |
  4 | import Data.List
  5 | import Data.Queue1
  6 | import Data.SortedMap
  7 | import Data.SortedSet
  8 | import public Data.Zippable
  9 |
 10 | import public Control.Applicative.Concurrent
 11 |
 12 | import Control.Monad.Coop.Sync
 13 | import public Control.Monad.Spawn
 14 | import Control.Monad.State
 15 | import Control.Monad.State.Tuple
 16 | import public Control.Monad.Trans
 17 | import public Control.MonadRec
 18 |
 19 | %default total
 20 |
 21 | ------------
 22 | --- Data ---
 23 | ------------
 24 |
 25 | data SyncKind = Join | Race
 26 |
 27 | export
 28 | data Coop : (m : Type -> Type) -> (a : Type) -> Type where
 29 |   Point       : m a -> Coop m a
 30 |   Sequential  : Coop m a -> (a -> Coop m b) -> Coop m b
 31 |   Interleaved : Coop m a -> Coop m b -> Coop m (a, b)
 32 |   Racing      : Coop m a -> Coop m a -> Coop m a
 33 |   RaceFence   : (prevRaceSync : Maybe $ Sync Race) -> Coop m Unit
 34 |   DelayedTill : Time -> Coop m Unit
 35 |   Spawn       : Coop m Unit -> Coop m Unit
 36 |   Empty       : Coop m a
 37 |
 38 | -----------------------
 39 | --- Implementations ---
 40 | -----------------------
 41 |
 42 | export
 43 | Timed m => Timed (Coop m) where
 44 |   currentTime = Point currentTime
 45 |
 46 | export
 47 | Applicative m => Functor (Coop m) where
 48 |   map f (Point a)           = Point (map f a)
 49 |   map f (Sequential a b)    = Sequential a $ \ar => map f $ b ar
 50 |   map f x@(Interleaved _ _) = Sequential x $ Point . pure . f
 51 |   map f x@(Racing _ _)      = Sequential x $ Point . pure . f
 52 |   map f x@(RaceFence _)     = Sequential x $ Point . pure . f
 53 |   map f x@(DelayedTill _)   = Sequential x $ Point . pure . f
 54 |   map f x@(Spawn _)         = Sequential x $ Point . pure . f
 55 |   map _ Empty               = Empty
 56 |
 57 | export
 58 | Applicative m => Applicative (Coop m) where
 59 |   pure    = Point . pure
 60 |   l <*> r = Sequential l (<$> r)
 61 |   -- This could be `(<*>) = Interleaved <&> uncurry apply`, but it must be consistent with `(>>=)` definition.
 62 |   -- Consider code `doSmth *> sleepFor 100 *> doMore` comparing to `(doSmth `zip` sleepFor 100) *> doMore`.
 63 |   -- Having parallel semantics for the `Applicative`'s `<*>`, those two examples above will mean the same, which seems to be unexpected.
 64 |   -- We have a special name instance `Concurrent` below for that case.
 65 |
 66 | export
 67 | race : Applicative m => Coop m a -> Coop m b -> Coop m $ Either a b
 68 | race l r = Racing (l <&> Left) (r <&> Right)
 69 |
 70 | export
 71 | Applicative m => Alternative (Coop m) where
 72 |   -- `empty` computation is like an infinite computation (i.e. no computation goes *after* it and its result cannot be analysed),
 73 |   -- but in contrast, if it is the only what is left during the execution, computation simply finishes.
 74 |   empty = Empty
 75 |   l <|> r = l `Racing` r
 76 |
 77 | export
 78 | Applicative m => Monad (Coop m) where
 79 |   (>>=) = Sequential
 80 |
 81 | -- This implementation is like a `NonTailRec` from `MonadRec`,
 82 | -- but this is actually safe, since `>>=` returns immediately,
 83 | -- because the whole `Coop` data structure is lazy on binding.
 84 | export
 85 | Applicative m => MonadRec (Coop m) where
 86 |   tailRecM x (Access acc) st next =
 87 |     next x st `Sequential` \case
 88 |       Cont seed2 prf vst => tailRecM seed2 (acc seed2 prf) vst next
 89 |       Done vres          => Point $ pure vres
 90 |
 91 | export
 92 | Applicative m => Zippable (Coop m) where
 93 |   zip = Interleaved
 94 |   zipWith f = map (uncurry f) .: Interleaved
 95 |
 96 |   zip3 a b c = a `Interleaved` (b `Interleaved` c)
 97 |   zipWith3 f a b c = zip3 a b c <&> \(x, y, z) => f x y z
 98 |
 99 |   unzipWith f ab = (fst . f <$> ab, snd . f <$> ab)
100 |   unzipWith3 f abc = (fst . f <$> abc, fst . snd . f <$> abc, snd . snd . f <$> abc)
101 |
102 | [Conc] Applicative m => Applicative (Coop m) where
103 |   pure  = Point . pure
104 |   (<*>) = zipWith apply
105 |
106 | export
107 | Applicative m => ConcurrentApplicative (Coop m) where
108 |   Concurrent = Conc
109 |
110 | export
111 | Timed m => Applicative m => CanSleep (Coop m) where
112 |   sleepTill = DelayedTill
113 |
114 | export
115 | Applicative m => CanSpawn (Coop m) where
116 |   -- Runs the given computation in parallel with the monadic continuation.
117 |   -- In contrast with `zip`, the continuations executes immediately, without waiting to the end of spawned computation.
118 |   -- Spawned computation will continue to work (if it needs) even if continuation has ended.
119 |   -- For example, running the following code
120 |   --
121 |   -- ```idris
122 |   -- x : HasIO m => Coop m Nat
123 |   -- x = do
124 |   --   spawn $ do
125 |   --     sleepFor 4.seconds
126 |   --     putStrLn "spawned"
127 |   --   putStrLn "main"
128 |   --   pure 1
129 |   -- ```
130 |   --
131 |   -- will result in returning `1` as the computation result **and** printing "spawned" in four seconds after funning the whole computation `x`.
132 |   spawn = Spawn
133 |
134 | export
135 | HasIO (Coop IO) where
136 |   liftIO = Point
137 |
138 | export
139 | MonadTrans Coop where
140 |   lift = Point
141 |
142 | export covering
143 | forever : Monad m => m a -> m b
144 | forever x = ignore x >> forever x
145 |
146 | -------------------
147 | --- Interpreter ---
148 | -------------------
149 |
150 | --- Data types describing discrete events ---
151 |
152 | data LeftOrRight = Left | Right
153 |
154 | record Event (m : Type -> Type) where
155 |   constructor Ev
156 |   time : Time
157 |   coop : Coop m actionRetTy
158 |   -- Two present postponed events with the same sync are meant to be blocking each other.
159 |   -- Postponed event needs to be sheduled only when all events with its sync are over.
160 |   -- `Sync` type is a comparable type and is a workaround of uncomparability of `Coop`.
161 |   joinSync : Maybe (Sync Join, LeftOrRight)
162 |   raceSync : Maybe $ Sync Race
163 |
164 | --- List of events ---
165 |
166 | 0 Events : (Type -> Type) -> Type
167 | Events = SortedMap Time . Queue1 . Event
168 |
169 | insertTimed : Event m -> Events m -> Events m
170 | insertTimed ev evs = insert ev.time (maybe (singleton ev) (add ev) (lookup ev.time evs)) evs
171 |
172 | -- Must be equivalent to `insertTimed ev empty`
173 | singleEvent : Event m -> Events m
174 | singleEvent ev = singleton ev.time $ singleton ev
175 |
176 | addEvents : MonadState (Events m) n => Event m -> List (Event m -> Event m) -> n Unit
177 | addEvents ev = modify . foldl (\acc, modF => acc . insertTimed (modF ev)) id
178 |
179 | -- Psrticular case for `addEvents ev [modF]`
180 | addEvent : MonadState (Events m) n => Event m -> (Event m -> Event m) -> n Unit
181 | addEvent ev modF = modify $ insertTimed $ modF ev
182 |
183 | -- Psrticular case for `addEvents ev [modF1, modF2]`
184 | addEvent2 : MonadState (Events m) n => Event m -> (Event m -> Event m) -> (Event m -> Event m) -> n Unit
185 | addEvent2 ev modF1 modF2 = modify $ insertTimed (modF1 ev) . insertTimed (modF2 ev)
186 |
187 | earliestEvent : Events m -> Maybe (Event m, Lazy (Events m))
188 | earliestEvent evs = leftMost evs <&> \(t, tEvs) =>
189 |   let (currEv, restTEvs) = remove tEvs in
190 |   (currEv,) $ maybe (delete t evs) (\r => insert t r evs) restTEvs
191 |
192 | filterEvents : (Event m -> Bool) -> Events m -> Events m
193 | filterEvents f = fromList . mapMaybe (\(t, evs) => (t,) <$> filter f evs) . kvList
194 |
195 | --- Join synchronisation stuff ---
196 |
197 | record Postponed (m : Type -> Type) where
198 |   constructor Postpone
199 |   postCoop : (contLTy, contRTy) -> Coop m contRetTy
200 |   postJoinSync : Maybe (Sync Join, LeftOrRight)
201 |   -- This postponed continuation is waiting for two executions.
202 |   -- When one of them is completed, the result should be present in this field.
203 |   completedHalf : Maybe completedHalfTy
204 |
205 | 0 JoinSyncs : (Type -> Type) -> Type
206 | JoinSyncs = SortedMap (Sync Join) . Postponed
207 |
208 | --- Race synchronisation stuff ---
209 |
210 | -- Map from one race sync to all child syncs (i.e. those which are cancelled when a series with the parent sync finished)
211 | 0 RaceSyncs : Type
212 | RaceSyncs = SortedMap (Sync Race) $ List $ Sync Race
213 |
214 | transitiveLookup : Foldable f => Ord a => SortedMap a (f a) -> a -> SortedSet a
215 | transitiveLookup mp x = let x1 = singleton x in go x1 x1 where
216 |   go : (curr : SortedSet a) -> (new : SortedSet a) -> SortedSet a
217 |   go curr new = if null new then curr else do
218 |     let allNexts = fromList $ Prelude.toList new >>= maybe [] toList . lookup' mp
219 |     let nextNew = allNexts `difference` curr
220 |     assert_total $ go (curr `union` nextNew) nextNew -- first argument is growing and has maximum bound (all `a` in the `mp`)
221 |
222 | --- The run loop ---
223 |
224 | %inline
225 | runEvent : Monad m => MonadTrans t => Monad (t m) =>
226 |            MonadState (Events m) (t m) =>
227 |            MonadState (JoinSyncs m) (t m) =>
228 |            MonadState RaceSyncs (t m) =>
229 |            Event m -> t m Unit
230 | runEvent ev = case ev.coop of
231 |   Point x          => lift x >>= awakePostponed
232 |   Sequential lhs f => case lhs of
233 |     Point x         => lift x >>= \r => addEvent ev {coop := f r}
234 |     Sequential x g  => addEvent ev {coop := Sequential x $ g >=> f}
235 |     DelayedTill d   => addEvent ev {time := d, coop := f ()}
236 |     Spawn s         => addEvent2 ev {coop := s, joinSync := Nothing} {coop := f ()}
237 |     Interleaved l r => do uniqueSync <- newUniqueSync <$> get
238 |                           modify $ insert uniqueSync $ Postpone f ev.joinSync $ Nothing {ty=Unit}
239 |                           addEvent2 ev
240 |                             {coop := l, joinSync := Just (uniqueSync, Left )}
241 |                             {coop := r, joinSync := Just (uniqueSync, Right)}
242 |     RaceFence prevS => finishRaces *> addEvent ev {coop := f (), raceSync := prevS}
243 |     Racing Empty r  => addEvent ev {coop := r >>= f}
244 |     Racing l Empty  => addEvent ev {coop := l >>= f}
245 |     Racing l r      => do uniqueSync <- newUniqueSync <$> get
246 |                           modify $ insert uniqueSync [] -- to prevent generation of the same sync
247 |                           whenJust ev.raceSync $ \parent => modify $ merge $ singleton parent [uniqueSync]
248 |                           addEvent2 ev
249 |                             {coop := l >>= (RaceFence ev.raceSync *>) . f, raceSync := Just uniqueSync}
250 |                             {coop := r >>= (RaceFence ev.raceSync *>) . f, raceSync := Just uniqueSync}
251 |     Empty           => pure ()
252 |   nonSeqNonPoint   => addEvent ev {coop := nonSeqNonPoint >>= pure}       -- manage as `Sequential _ Point`
253 |
254 |   where
255 |
256 |     awakePostponed : forall a. a -> t m Unit
257 |     awakePostponed myHalf =
258 |       whenJust ev.joinSync $ \(sy, iAmLOrR) => do
259 |         syncs <- get
260 |         whenJust (SortedMap.lookup sy syncs) $ \pp =>
261 |           case pp.completedHalf of
262 |             Just theirHalf => do
263 |               let awakenCoop = pp.postCoop $ case iAmLOrR of
264 |                     Left  => believe_me (myHalf, theirHalf)
265 |                     Right => believe_me (theirHalf, myHalf)
266 |               addEvent ev {coop := awakenCoop, joinSync := pp.postJoinSync}
267 |               put $ delete sy syncs
268 |             Nothing =>
269 |               put $ insert sy ({completedHalf := Just myHalf} pp) syncs
270 |
271 |     finishRaces : t m Unit
272 |     finishRaces = whenJust ev.raceSync $ \currRaceSync => do
273 |       raceSyncs <- get
274 |       let syncsToRemove = transitiveLookup raceSyncs currRaceSync
275 |       modify $ filterEvents $ maybe True (not . contains' syncsToRemove) . raceSync
276 |       put $ foldl SortedMap.delete' raceSyncs syncsToRemove
277 |
278 | export covering
279 | runCoop : MonadRec m => CanSleep m => Coop m Unit -> m Unit
280 | runCoop co = do
281 |   let initEvents = singleEvent $ Ev !currentTime co Nothing Nothing
282 |       initJoinSyncs : JoinSyncs m = empty
283 |       initRaceSyncs : RaceSyncs = empty
284 |   evalStateT (initEvents, initJoinSyncs, initRaceSyncs) runLeftEvents where
285 |
286 |   covering WellFounded () Equal where wellFounded = wellFounded
287 |
288 |   runLeftEvents : MonadTrans t => MonadRec (t m) =>
289 |                   MonadState (Events m) (t m) =>
290 |                   MonadState (JoinSyncs m) (t m) =>
291 |                   MonadState RaceSyncs (t m) =>
292 |                   t m Unit
293 |   runLeftEvents = trWellFounded () () $ \(), () => do
294 |     case earliestEvent !get of
295 |       Nothing => pure $ Done ()
296 |       Just (currEv, restEvs) => do
297 |         currTime <- lift currentTime
298 |         if currTime >= currEv.time
299 |           then put restEvs *> runEvent ({time := currTime} currEv)
300 |           else lift $ sleepTill currEv.time -- TODO to support and perform permanent tasks
301 |         pure $ Cont () Refl ()
302 |