0 | module Control.Monad.Coop
2 | import public System.Time
6 | import Data.SortedMap
7 | import Data.SortedSet
8 | import public Data.Zippable
10 | import public Control.Applicative.Concurrent
12 | import Control.Monad.Coop.Sync
13 | import public Control.Monad.Spawn
14 | import Control.Monad.State
15 | import Control.Monad.State.Tuple
16 | import public Control.Monad.Trans
17 | import public Control.MonadRec
25 | data SyncKind = Join | Race
28 | data Coop : (m : Type -> Type) -> (a : Type) -> Type where
29 | Point : m a -> Coop m a
30 | Sequential : Coop m a -> (a -> Coop m b) -> Coop m b
31 | Interleaved : Coop m a -> Coop m b -> Coop m (a, b)
32 | Racing : Coop m a -> Coop m a -> Coop m a
33 | RaceFence : (prevRaceSync : Maybe $
Sync Race) -> Coop m Unit
34 | DelayedTill : Time -> Coop m Unit
35 | Spawn : Coop m Unit -> Coop m Unit
43 | Timed m => Timed (Coop m) where
44 | currentTime = Point currentTime
47 | Applicative m => Functor (Coop m) where
48 | map f (Point a) = Point (map f a)
49 | map f (Sequential a b) = Sequential a $
\ar => map f $
b ar
50 | map f x@(Interleaved _ _) = Sequential x $
Point . pure . f
51 | map f x@(Racing _ _) = Sequential x $
Point . pure . f
52 | map f x@(RaceFence _) = Sequential x $
Point . pure . f
53 | map f x@(DelayedTill _) = Sequential x $
Point . pure . f
54 | map f x@(Spawn _) = Sequential x $
Point . pure . f
58 | Applicative m => Applicative (Coop m) where
60 | l <*> r = Sequential l (<$> r)
67 | race : Applicative m => Coop m a -> Coop m b -> Coop m $
Either a b
68 | race l r = Racing (l <&> Left) (r <&> Right)
71 | Applicative m => Alternative (Coop m) where
75 | l <|> r = l `Racing` r
78 | Applicative m => Monad (Coop m) where
85 | Applicative m => MonadRec (Coop m) where
86 | tailRecM x (Access acc) st next =
87 | next x st `Sequential` \case
88 | Cont seed2 prf vst => tailRecM seed2 (acc seed2 prf) vst next
89 | Done vres => Point $
pure vres
92 | Applicative m => Zippable (Coop m) where
94 | zipWith f = map (uncurry f) .: Interleaved
96 | zip3 a b c = a `Interleaved` (b `Interleaved` c)
97 | zipWith3 f a b c = zip3 a b c <&> \(x, y, z) => f x y z
99 | unzipWith f ab = (fst . f <$> ab, snd . f <$> ab)
100 | unzipWith3 f abc = (fst . f <$> abc, fst . snd . f <$> abc, snd . snd . f <$> abc)
102 | [Conc] Applicative m => Applicative (Coop m) where
103 | pure = Point . pure
104 | (<*>) = zipWith apply
107 | Applicative m => ConcurrentApplicative (Coop m) where
111 | Timed m => Applicative m => CanSleep (Coop m) where
112 | sleepTill = DelayedTill
115 | Applicative m => CanSpawn (Coop m) where
135 | HasIO (Coop IO) where
139 | MonadTrans Coop where
143 | forever : Monad m => m a -> m b
144 | forever x = ignore x >> forever x
152 | data LeftOrRight = Left | Right
154 | record Event (m : Type -> Type) where
157 | coop : Coop m actionRetTy
161 | joinSync : Maybe (Sync Join, LeftOrRight)
162 | raceSync : Maybe $
Sync Race
166 | 0 Events : (Type -> Type) -> Type
167 | Events = SortedMap Time . Queue1 . Event
169 | insertTimed : Event m -> Events m -> Events m
170 | insertTimed ev evs = insert ev.time (maybe (singleton ev) (add ev) (lookup ev.time evs)) evs
173 | singleEvent : Event m -> Events m
174 | singleEvent ev = singleton ev.time $
singleton ev
176 | addEvents : MonadState (Events m) n => Event m -> List (Event m -> Event m) -> n Unit
177 | addEvents ev = modify . foldl (\acc, modF => acc . insertTimed (modF ev)) id
180 | addEvent : MonadState (Events m) n => Event m -> (Event m -> Event m) -> n Unit
181 | addEvent ev modF = modify $
insertTimed $
modF ev
184 | addEvent2 : MonadState (Events m) n => Event m -> (Event m -> Event m) -> (Event m -> Event m) -> n Unit
185 | addEvent2 ev modF1 modF2 = modify $
insertTimed (modF1 ev) . insertTimed (modF2 ev)
187 | earliestEvent : Events m -> Maybe (Event m, Lazy (Events m))
188 | earliestEvent evs = leftMost evs <&> \(t, tEvs) =>
189 | let (currEv, restTEvs) = remove tEvs in
190 | (currEv,) $
maybe (delete t evs) (\r => insert t r evs) restTEvs
192 | filterEvents : (Event m -> Bool) -> Events m -> Events m
193 | filterEvents f = fromList . mapMaybe (\(t, evs) => (t,) <$> filter f evs) . kvList
197 | record Postponed (m : Type -> Type) where
198 | constructor Postpone
199 | postCoop : (contLTy, contRTy) -> Coop m contRetTy
200 | postJoinSync : Maybe (Sync Join, LeftOrRight)
203 | completedHalf : Maybe completedHalfTy
205 | 0 JoinSyncs : (Type -> Type) -> Type
206 | JoinSyncs = SortedMap (Sync Join) . Postponed
212 | RaceSyncs = SortedMap (Sync Race) $
List $
Sync Race
214 | transitiveLookup : Foldable f => Ord a => SortedMap a (f a) -> a -> SortedSet a
215 | transitiveLookup mp x = let x1 = singleton x in go x1 x1 where
216 | go : (curr : SortedSet a) -> (new : SortedSet a) -> SortedSet a
217 | go curr new = if null new then curr else do
218 | let allNexts = fromList $
Prelude.toList new >>= maybe [] toList . lookup' mp
219 | let nextNew = allNexts `difference` curr
220 | assert_total $
go (curr `union` nextNew) nextNew
225 | runEvent : Monad m => MonadTrans t => Monad (t m) =>
226 | MonadState (Events m) (t m) =>
227 | MonadState (JoinSyncs m) (t m) =>
228 | MonadState RaceSyncs (t m) =>
229 | Event m -> t m Unit
230 | runEvent ev = case ev.coop of
231 | Point x => lift x >>= awakePostponed
232 | Sequential lhs f => case lhs of
233 | Point x => lift x >>= \r => addEvent ev {coop := f r}
234 | Sequential x g => addEvent ev {coop := Sequential x $
g >=> f}
235 | DelayedTill d => addEvent ev {time := d, coop := f ()}
236 | Spawn s => addEvent2 ev {coop := s, joinSync := Nothing} {coop := f ()}
237 | Interleaved l r => do uniqueSync <- newUniqueSync <$> get
238 | modify $
insert uniqueSync $
Postpone f ev.joinSync $
Nothing {ty=Unit}
240 | {coop := l, joinSync := Just (uniqueSync, Left )}
241 | {coop := r, joinSync := Just (uniqueSync, Right)}
242 | RaceFence prevS => finishRaces *> addEvent ev {coop := f (), raceSync := prevS}
243 | Racing Empty r => addEvent ev {coop := r >>= f}
244 | Racing l Empty => addEvent ev {coop := l >>= f}
245 | Racing l r => do uniqueSync <- newUniqueSync <$> get
246 | modify $
insert uniqueSync []
247 | whenJust ev.raceSync $
\parent => modify $
merge $
singleton parent [uniqueSync]
249 | {coop := l >>= (RaceFence ev.raceSync *>) . f, raceSync := Just uniqueSync}
250 | {coop := r >>= (RaceFence ev.raceSync *>) . f, raceSync := Just uniqueSync}
252 | nonSeqNonPoint => addEvent ev {coop := nonSeqNonPoint >>= pure}
256 | awakePostponed : forall a. a -> t m Unit
257 | awakePostponed myHalf =
258 | whenJust ev.joinSync $
\(sy, iAmLOrR) => do
260 | whenJust (SortedMap.lookup sy syncs) $
\pp =>
261 | case pp.completedHalf of
262 | Just theirHalf => do
263 | let awakenCoop = pp.postCoop $
case iAmLOrR of
264 | Left => believe_me (myHalf, theirHalf)
265 | Right => believe_me (theirHalf, myHalf)
266 | addEvent ev {coop := awakenCoop, joinSync := pp.postJoinSync}
267 | put $
delete sy syncs
269 | put $
insert sy ({completedHalf := Just myHalf} pp) syncs
271 | finishRaces : t m Unit
272 | finishRaces = whenJust ev.raceSync $
\currRaceSync => do
274 | let syncsToRemove = transitiveLookup raceSyncs currRaceSync
275 | modify $
filterEvents $
maybe True (not . contains' syncsToRemove) . raceSync
276 | put $
foldl SortedMap.delete' raceSyncs syncsToRemove
279 | runCoop : MonadRec m => CanSleep m => Coop m Unit -> m Unit
281 | let initEvents = singleEvent $
Ev !currentTime co Nothing Nothing
282 | initJoinSyncs : JoinSyncs m = empty
283 | initRaceSyncs : RaceSyncs = empty
284 | evalStateT (initEvents, initJoinSyncs, initRaceSyncs) runLeftEvents where
286 | covering WellFounded () Equal where wellFounded = wellFounded
288 | runLeftEvents : MonadTrans t => MonadRec (t m) =>
289 | MonadState (Events m) (t m) =>
290 | MonadState (JoinSyncs m) (t m) =>
291 | MonadState RaceSyncs (t m) =>
293 | runLeftEvents = trWellFounded () () $
\(), () => do
294 | case earliestEvent !get of
295 | Nothing => pure $
Done ()
296 | Just (currEv, restEvs) => do
297 | currTime <- lift currentTime
298 | if currTime >= currEv.time
299 | then put restEvs *> runEvent ({time := currTime} currEv)
300 | else lift $
sleepTill currEv.time
301 | pure $
Cont () Refl ()