0 | ||| Resource Pool
   1 | module Data.Pool
   2 |
   3 | import public Data.Pool.Internal
   4 |
   5 | import Control.Monad.Elin
   6 | import Control.Monad.MCancel
   7 | import Data.Array
   8 | import Data.Array.Mutable
   9 | import Data.Linear.Ref1
  10 | import Data.Linear.Traverse1
  11 | import Data.List
  12 | import Data.SortedSet
  13 | import System.Concurrency
  14 | import System.Info
  15 | import System.Posix.Timer
  16 | import System.Posix.Timer.Prim
  17 | import Syntax.T1
  18 |
  19 | %language ElabReflection
  20 |
  21 | %default total
  22 |
  23 | --------------------------------------------------------------------------------
  24 | --          Utilities
  25 | --------------------------------------------------------------------------------
  26 |
  27 | ||| Cancellation check.
  28 | isCancelled :  Nat
  29 |             -> Queue Nat
  30 |             -> Bool
  31 | isCancelled _ QEnd         =
  32 |   False
  33 | isCancelled x (QNode y ys) =
  34 |   case x == y of
  35 |     True  =>
  36 |       True
  37 |     False =>
  38 |       isCancelled x ys
  39 |
  40 | ||| Append a value into the `Queue a`.
  41 | appendQ :  Queue a
  42 |         -> a
  43 |         -> Queue a
  44 | appendQ QEnd         x =
  45 |   QNode x QEnd
  46 | appendQ (QNode y ys) x =
  47 |   QNode y (appendQ ys x)
  48 |
  49 | ||| Revers a `Queue a`.
  50 | reverseQ :  Queue a
  51 |          -> Queue a
  52 | reverseQ q =
  53 |   go q QEnd
  54 |   where
  55 |     go :  Queue a
  56 |        -> Queue a
  57 |        -> Queue a
  58 |     go QEnd         acc =
  59 |       acc
  60 |     go (QNode x xs) acc =
  61 |       go xs (QNode x acc)
  62 |
  63 | ||| Append two `Queue a`.
  64 | appendAll :  Queue a
  65 |           -> Queue a
  66 |           -> Queue a
  67 | appendAll QEnd         ys =
  68 |   ys
  69 | appendAll (QNode x xs) ys =
  70 |   QNode x (appendAll xs ys)
  71 |
  72 | ||| Normalize two `Queue Waiter`s.
  73 | normalize :  Queue (Waiter a)
  74 |           -> Queue (Waiter a)
  75 |           -> Queue (Waiter a)
  76 | normalize QEnd q2 =
  77 |   q2
  78 | normalize q1   q2 =
  79 |   appendAll q1 q2
  80 |
  81 | ||| Dequeue first live waiter.
  82 | dequeueLive :  Queue (Waiter a)
  83 |             -> SortedSet Nat
  84 |             -> (Maybe (Waiter a), Queue (Waiter a), SortedSet Nat)
  85 | dequeueLive QEnd                              cancelled =
  86 |   (Nothing, QEnd, cancelled)
  87 | dequeueLive (QNode w@(MkWaiter id wake) rest) cancelled =
  88 |   case contains id cancelled of
  89 |     True  =>
  90 |       -- cancelled waiter
  91 |       -- consume tombstone and continue
  92 |       dequeueLive rest (delete id cancelled)
  93 |     False =>
  94 |       -- live waiter
  95 |       (Just w, rest, cancelled)
  96 |
  97 | ||| Stripe-level dequeue.
  98 | dequeueStripe :  Stripe a
  99 |               -> (Maybe (Waiter a), Stripe a)
 100 | dequeueStripe (MkStripe available cache queue queuer nextid cancelled) =
 101 |   let fullq                  = normalize queue queuer
 102 |       (mw, rest, cancelled') = dequeueLive fullq cancelled
 103 |     in case mw of
 104 |          Nothing =>
 105 |            (Nothing, MkStripe available cache QEnd QEnd nextid cancelled')
 106 |          Just w  =>
 107 |            (Just w, MkStripe available cache rest QEnd nextid cancelled')
 108 |
 109 | ||| Check to see if entry is stale.
 110 | isStale :  Clock Duration
 111 |         -> IClock CLOCK_MONOTONIC
 112 |         -> Entry a
 113 |         -> Bool
 114 | isStale ttl now (MkEntry _ lastused) =
 115 |   timeDifference now lastused > ttl
 116 |
 117 | ||| Execute `Stripe a` effects after CAS commit.
 118 | |||
 119 | ||| This is the only place IO is performed for Stripe transitions.
 120 | |||
 121 | ||| Guarantees:
 122 | ||| - Effects are executed exactly once (only after successful CAS).
 123 | ||| - Ordering is preserved.
 124 | ||| - No effects are run on CAS retry.
 125 | |||
 126 | export
 127 | runEffects :  Stripe1 World a
 128 |            -> List (StripeEffect a)
 129 |            -> F1' World
 130 | runEffects mstripe effects t =
 131 |   traverse1_ (runEffect mstripe) effects t
 132 |   where
 133 |     grabTime : Elin World [Errno] (IClock CLOCK_MONOTONIC)
 134 |     grabTime = getTime CLOCK_MONOTONIC
 135 |     runEffect :  Stripe1 World a
 136 |               -> StripeEffect a
 137 |               -> F1' World
 138 |     runEffect _                     None                      t =
 139 |       () # t
 140 |     runEffect _                     (Wake ch val)             t =
 141 |       ioToF1 (channelPut ch val) t
 142 |     runEffect _                     (WakeMany pairs)          t =
 143 |       traverse1_ (\(ch,val) => ioToF1 (channelPut ch val))
 144 |                  pairs
 145 |                  t
 146 |     runEffect _                     (FreeMany free xs)        t =
 147 |       traverse1_ (\x => ioToF1 (free x)) xs t
 148 |     runEffect (MkStripe1 striperef) (InsertWithTimestamp val) t =
 149 |       let now # t := ioToF1 (runElinIO grabTime) t
 150 |         in case now of
 151 |              Left err   =>
 152 |                (assert_total $ idris_crash "Data.Pool.runEffects.runEffect: \{show err}") # t
 153 |              Right now' =>
 154 |                let entry := MkEntry val now'
 155 |                  in casupdate1 striperef (\(MkStripe available cache queue queuer nextid cancelled) =>
 156 |                                             (MkStripe available (entry :: cache) queue queuer nextid cancelled, ())
 157 |                                          ) t
 158 |
 159 | ||| Atomically apply a `Stripe a` transition and execute its effects.
 160 | |||
 161 | ||| This is the central concurrency primitive of the `Stripe a` model.
 162 | |||
 163 | ||| Behavior:
 164 | ||| - Applies a pure state transition (`Stripe -> StripeStep`) under CAS.
 165 | ||| - Retries automatically on contention using `casupdate1`.
 166 | ||| - Extracts effects only from the successful committed transition.
 167 | ||| - Executes effects exactly once after CAS succeeds.
 168 | |||
 169 | ||| Guarantees:
 170 | ||| - Linearizability, such that the `Stripe a` transition appears atomic.
 171 | ||| - No duplicated effects (retries do not leak effects).
 172 | ||| - No IO occurs during CAS evaluation.
 173 | ||| - Effects are executed strictly after commit.
 174 | |||
 175 | ||| Design Notes:
 176 | ||| - `stepfn` must be pure (no IO, no external mutation).
 177 | ||| - All side effects must be encoded in `StripeEffect a`.
 178 | ||| - This function is the only place where `Stripe a` transitions are committed.
 179 | |||
 180 | export
 181 | casWithEffects :  Stripe1 World a
 182 |                -> (Stripe a -> StripeStep a)
 183 |                -> F1' World
 184 | casWithEffects (MkStripe1 striperef) stepfn t =
 185 |   let effects # t := casupdate1 striperef (\stripe =>
 186 |                                             let (MkStripeStep stripe' stripeeffects) = stepfn stripe
 187 |                                               in (stripe', stripeeffects)
 188 |                                           ) t
 189 |     in runEffects (MkStripe1 striperef) effects t
 190 |
 191 | --------------------------------------------------------------------------------
 192 | --          Configuration
 193 | --------------------------------------------------------------------------------
 194 |
 195 | ||| Set the number of stripes in the `PoolConfig a`.
 196 | export
 197 | setNumStripes :  (pc : PoolConfig a)
 198 |               -> (n ** (LTE 1 n, LTE n (fst (poolmaxresources pc))))
 199 |               -> PoolConfig a
 200 | setNumStripes (MkPoolConfig create free cachettl (maxres ** prfmaxres_ pclabel) numstripes =
 201 |   MkPoolConfig create
 202 |                free
 203 |                cachettl
 204 |                (maxres ** prfmaxres)
 205 |                numstripes
 206 |                pclabel
 207 |
 208 | ||| Assign a label to the `PoolConfig a`.
 209 | export
 210 | setPoolLabel :  String
 211 |              -> PoolConfig a
 212 |              -> PoolConfig a
 213 | setPoolLabel label pc =
 214 |   { poolconfiglabel := label } pc
 215 |
 216 | --------------------------------------------------------------------------------
 217 | --          Resource Management
 218 | --------------------------------------------------------------------------------
 219 |
 220 | ||| Create a new striped resource pool.
 221 | |||
 222 | ||| Behavior:
 223 | ||| - Allocates exactly `mstripes` independent `Stripe`s.
 224 | ||| - Distributes the total capacity (`poolmaxresources`) across stripes as evenly as possible:
 225 | |||  - Each stripe receives either `base` or `base + 1` capacity.
 226 | |||  - The first `rest` stripes receive the extra unit.
 227 | ||| - Initializes each stripe with:
 228 | |||  - `available = assigned capacity`
 229 | |||  - empty cache
 230 | |||  - empty waiter queues
 231 | |||  - fresh waiter id supply
 232 | ||| - Constructs a `LocalPool1` for each stripe and stores them in a mutable array.
 233 | |||
 234 | ||| Resource Distribution:
 235 | ||| - Let:
 236 | |||  - `base = div maxres mstripes`
 237 | |||  - `rest = mod maxres mstripes`
 238 | ||| - Then:
 239 | |||  - Total capacity is preserved: sum(stripes) = maxres
 240 | |||  - Load is balanced with minimal skew (difference ≤ 1).
 241 | |||
 242 | ||| Concurrency Model:
 243 | ||| - Each stripe is independent and owns its own:
 244 | |||   - resource cache
 245 | |||   - waiter queues
 246 | |||   - capacity accounting
 247 | ||| - Threads interact with exactly one stripe at a time (via `getLocalPool`).
 248 | ||| - This minimizes contention and improves scalability.
 249 | |||
 250 | ||| Cleanup Model:
 251 | ||| - No global collector thread is created.
 252 | ||| - Resource cleanup is performed opportunistically via `cleanStripeIfNeeded`.
 253 | ||| - This ensures:
 254 | |||   - No background threads.
 255 | |||   - Cleanup proportional to usage.
 256 | |||   - Deterministic behavior (no GC reliance).
 257 | |||
 258 | ||| Guarantees:
 259 | ||| - Total capacity never exceeds `poolmaxresources`.
 260 | ||| - Each stripe starts empty but with full creation capacity.
 261 | ||| - No IO occurs during stripe initialization except allocation of refs.
 262 | ||| - Array is fully initialized before being returned.
 263 | |||
 264 | ||| Failure Conditions:
 265 | ||| - Crashes if:
 266 | |||   - An impossible index is encountered during initialization (should be unreachable).
 267 | |||   - A `Nat -> Fin` conversion fails (indicates internal inconsistency).
 268 | |||
 269 | ||| Notes:
 270 | ||| - `numstripes` is explicit, avoiding runtime dependency on capabilities.
 271 | ||| - The caller is responsible for eventual cleanup via `destroyAllResources`.
 272 | ||| - This function performs no resource creation; resources are created lazily on demand.
 273 | |||
 274 | ||| Invariants Established:
 275 | ||| - Each `LocalPool1` corresponds to exactly one stripe.
 276 | ||| - Stripe state is valid and consistent for CAS-based transitions.
 277 | ||| - Waiter queues and cancellation queues start empty.
 278 | |||
 279 | export
 280 | newPool :  (numstripes : Nat)
 281 |         -> PoolConfig a
 282 |         -> F1 World (Pool1 World numstripes a)
 283 | newPool numstripes pc@(MkPoolConfig create free cachettl (maxres ** prfmaxres_ pclabel) t =
 284 |   let striperesources     := let base = div maxres numstripes
 285 |                                  rest = mod maxres numstripes
 286 |                                in zip (range Z numstripes)
 287 |                                       (distribute base rest numstripes)
 288 |       pools           # t := unsafeMArray1 numstripes t
 289 |       ()              # t := saturateLocalPools 0 numstripes striperesources pools t
 290 |     in MkPool1 pc pools # t 
 291 |   where
 292 |     range :  Nat
 293 |           -> Nat
 294 |           -> List Nat
 295 |     range start Z     =
 296 |       []
 297 |     range start (S k) =
 298 |       start :: range (S start) k
 299 |     distribute :  Nat
 300 |                -> Nat
 301 |                -> Nat
 302 |                -> List Nat
 303 |     distribute base rest Z      =
 304 |       []
 305 |     distribute base Z     (S k) =
 306 |       base :: distribute base Z k
 307 |     distribute base (S r) (S k) =
 308 |       (S base) :: distribute base r k
 309 |     saturateLocalPools :  (o, x : Nat)
 310 |                        -> {auto v : Ix x numstripes}
 311 |                        -> {auto 0 prf : LTE o $ ixToNat v}
 312 |                        -> (resources : List (Nat, Nat))
 313 |                        -> (arr : MArray World numstripes (LocalPool1 World a))
 314 |                        -> F1' World
 315 |     saturateLocalPools o Z     _         _   t =
 316 |       () # t
 317 |     saturateLocalPools o (S j) resources arr t =
 318 |       case lookup j resources of
 319 |         Nothing       =>
 320 |           (assert_total $ idris_crash "Data.Pool.newPool: impossible index") # t
 321 |         Just resource =>
 322 |           let striperef  # t := ref1 ( MkStripe resource
 323 |                                                 []
 324 |                                                 QEnd
 325 |                                                 QEnd
 326 |                                                 0
 327 |                                                 empty
 328 |                                      ) t
 329 |               striperef1     := MkStripe1 striperef
 330 |               localpool      := MkLocalPool1 j striperef1
 331 |             in case tryNatToFin j of
 332 |                  Nothing =>
 333 |                    (assert_total $ idris_crash "Data.Pool.newPool.saturatePools: couldn't convert Nat to Fin") # t
 334 |                  Just j' =>
 335 |                    let () # t := set arr j' localpool t
 336 |                      in saturateLocalPools o j resources arr t
 337 |
 338 | ||| Select a `LocalPool1 World a` for the current thread.
 339 | |||
 340 | ||| This function deterministically maps the calling thread to one of the
 341 | ||| available stripes using a modulo-based hashing scheme.
 342 | |||
 343 | ||| Behavior:
 344 | ||| - Computes a stripe index `sid`:
 345 | |||  - If `n == 1`, always selects index `0` (fast path).
 346 | |||  - Otherwise:
 347 | |||   - Retrieves the current thread id (`getThreadId`).
 348 | |||   - Maps it into `[0, n)` via modulo arithmetic.
 349 | ||| - Converts the resulting index into a `Fin n`.
 350 | ||| - Returns the corresponding `LocalPool1` from the array.
 351 | |||
 352 | ||| Thread-to-Stripe Mapping:
 353 | ||| - Mapping is stable for a given thread id.
 354 | ||| - Different threads are distributed across stripes.
 355 | ||| - Collisions are possible but minimized under uniform thread ids.
 356 | |||
 357 | ||| Concurrency Implications:
 358 | ||| - Each thread interacts primarily with a single stripe.
 359 | ||| - Reduces contention compared to a single global pool.
 360 | ||| - Enables scalable parallel access under CAS-based updates.
 361 | |||
 362 | ||| Arithmetic Details:
 363 | ||| - Uses a custom `remInt` implementation to ensure:
 364 | |||  - Correct behavior for negative thread ids (if any).
 365 | |||  - Avoidance of undefined behavior from division/modulo edge cases.
 366 | ||| - Conversion pipeline:
 367 | |||  - Int (thread id)
 368 | |||   - modulo n
 369 | |||   - Nat
 370 | |||   - Fin n
 371 | |||
 372 | ||| Guarantees:
 373 | ||| - Always returns a valid `LocalPool1` when invariants hold.
 374 | ||| - No mutation of pool state occurs.
 375 | ||| - No blocking or waiting.
 376 | |||
 377 | ||| Failure Conditions:
 378 | ||| - Crashes if:
 379 | |||   - Conversion from `Nat` to `Fin n` fails (should be impossible if modulo is correct).
 380 | |||   - Division by zero is attempted (guarded by invariant `n >= 1`).
 381 | |||
 382 | ||| Performance:
 383 | ||| - O(1) selection.
 384 | ||| - Minimal overhead in the `n == 1` case (no IO, no modulo).
 385 | ||| - Single IO call (`getThreadId`) in the general case.
 386 | |||
 387 | ||| Design Notes:
 388 | ||| - This function is intentionally simple and deterministic.
 389 | ||| - It avoids randomness or hashing to keep behavior predictable.
 390 | ||| - Stripe selection is orthogonal to resource availability:
 391 | |||  - Load balancing is achieved probabilistically via thread distribution.
 392 | |||
 393 | ||| Invariants:
 394 | ||| - `n >= 1` (guaranteed by `PoolConfig` construction).
 395 | ||| - `pools` contains exactly `n` initialized entries.
 396 | ||| - Each index in `[0, n)` maps to a valid `LocalPool1`.
 397 | |||
 398 | ||| Relationship to the system:
 399 | ||| - This is the entry point for all pool operations:
 400 | |||  - `takeResource`
 401 | |||  - `tryTakeResource`
 402 | |||  - `putResource`
 403 | ||| - It determines which stripe's CAS state machine is used.
 404 | |||
 405 | private
 406 | getLocalPool :  {n : Nat}
 407 |              -> MArray World n (LocalPool1 World a)
 408 |              -> F1 World (LocalPool1 World a)
 409 | getLocalPool pools t =
 410 |   case n == 1 of
 411 |     True  =>
 412 |       let sid  := 0
 413 |           sid' := remInt sid (cast {to=Int} n)
 414 |         in case tryNatToFin (cast {to=Nat} sid') of
 415 |              Nothing    =>
 416 |                (assert_total $ idris_crash "Data.Pool.getLocalPool: couldn't convert Nat to Fin") # t
 417 |              Just sid'' =>
 418 |                get pools sid'' t
 419 |     False =>
 420 |       let sid # t := ioToF1 getThreadId t
 421 |           sid'    := remInt sid (cast {to=Int} n)
 422 |         in case tryNatToFin (cast {to=Nat} sid') of
 423 |              Nothing    =>
 424 |                (assert_total $ idris_crash "Data.Pool.getLocalPool: couldn't convert Nat to Fin") # t
 425 |              Just sid'' =>
 426 |                get pools sid'' t
 427 |   where
 428 |     signumInt :  Int
 429 |               -> Int
 430 |     signumInt x =
 431 |       case x > 0 of
 432 |         True  =>
 433 |           1
 434 |         False =>
 435 |           case x < 0 of
 436 |             True  =>
 437 |               -1
 438 |             False =>
 439 |               0      
 440 |     quotInt :  Int
 441 |             -> Int
 442 |             -> Int
 443 |     quotInt x y =
 444 |       let q = x `div` y
 445 |           r = x `mod` y
 446 |         in case (r /= 0) && (signumInt x /= signumInt y) of
 447 |              True  =>
 448 |                q + 1
 449 |              False =>
 450 |                q
 451 |     remInt :  Int
 452 |            -> Int
 453 |            -> Int
 454 |     remInt x y =
 455 |       case y == 0 of
 456 |         True  =>
 457 |           assert_total $ idris_crash "division by zero"
 458 |         False =>
 459 |           x - (quotInt x y) * y
 460 |
 461 | ||| Deliver a value to a `Stripe a` state.
 462 | |||
 463 | ||| This function:
 464 | ||| - Updates Stripe state
 465 | ||| - Emits wake effects
 466 | |||
 467 | ||| Invariants:
 468 | ||| - Each wake corresponds to a committed state transition.
 469 | ||| - Queue ordering is preserved.
 470 | ||| - No side effects occur during evaluation.
 471 | |||
 472 | export
 473 | signal :  Stripe a
 474 |        -> WakeResult a
 475 |        -> StripeStep a
 476 | signal stripe@(MkStripe available cache queue queuer nextid cancelled) result =
 477 |   let (mw, MkStripe available' cache' queue' queuer' nextid' cancelled') = dequeueStripe stripe
 478 |     in case mw of
 479 |          -- no waiting thread
 480 |          Nothing =>
 481 |            case result of
 482 |              Deliver val            =>
 483 |                MkStripeStep (MkStripe (S available') cache' queue' queuer' nextid' cancelled')
 484 |                             [InsertWithTimestamp val]
 485 |              Create                 =>
 486 |                MkStripeStep (MkStripe available' cache' queue' queuer' nextid' cancelled')
 487 |                             [None]
 488 |              Cancelled              =>
 489 |                MkStripeStep (MkStripe available' cache' queue' queuer' nextid' cancelled')
 490 |                             [None]
 491 |          Just (MkWaiter _ wake) =>
 492 |            MkStripeStep (MkStripe available' cache' queue' queuer' nextid' cancelled')
 493 |                         [Wake wake result]
 494 |
 495 | ||| Block until a resource is delivered to this waiter.
 496 | |||
 497 | ||| Behavior:
 498 | ||| - Waits on the provided `Channel (Maybe a)` for a wakeup signal.
 499 | ||| - Returns:
 500 | |||  - `Just a` if a resource is successfully delivered.
 501 | |||  - `Nothing` if the waiter is cancelled or destroyed.
 502 | |||
 503 | ||| Cancellation:
 504 | ||| - If the waiting thread is aborted, the `cleanup` handler is invoked.
 505 | ||| - This atomically marks the waiter as cancelled by inserting its `wid` into the Stripe's `cancelled` queue.
 506 | ||| - Cancellation is lazy, cancelled waiters are skipped during dequeue.
 507 | |||
 508 | ||| Guarantees:
 509 | ||| - No busy waiting, the thread blocks on a channel.
 510 | ||| - No lost wakeups, every successful `signal` results in exactly one `channelPut` to a live waiter.
 511 | ||| - Safe under races:
 512 | |||  - If cancellation happens before wake, waiter is skipped later.
 513 | |||  -  If wake happens before cancellation, value is delivered.
 514 | ||| - Exactly-once semantics:
 515 | |||  - Each waiter receives at most one wakeup.
 516 | |||  - Each wakeup corresponds to a committed Stripe transition.
 517 | |||
 518 | ||| Design Notes:
 519 | ||| - This function performs no direct Stripe mutation except in `cleanup`.
 520 | ||| - All coordination with producers happens via `signal` + `StripeEffect`.
 521 | ||| - The `Channel (Maybe a)` encodes both success (`Just`) and termination (`Nothing`).
 522 | |||
 523 | ||| Invariants:
 524 | ||| - `wid` must be the same identifier used when enqueuing the waiter.
 525 | ||| - The channel must be single-consumer and used exactly once.
 526 | ||| - Stripe state remains the single source of truth for cancellation.
 527 | |||
 528 | export
 529 | waitForResource :  Stripe1 World a
 530 |                 -> Nat               -- waiter id
 531 |                 -> Channel (WakeResult a) -- wake channel
 532 |                 -> F1 World (WakeResult a)
 533 | waitForResource mstripe wid wake t =
 534 |   let res # t := ioToF1 (runElinIO (waitForResource' mstripe wid wake)) t
 535 |     in case res of
 536 |          Right res' =>
 537 |            res' # t
 538 |          Left err   =>
 539 |            (assert_total $ idris_crash "Data.Pool.waitForResource: \{show err}") # t
 540 |   where
 541 |     cleanup :   Stripe1 World a
 542 |              -> Nat
 543 |              -> F1' World
 544 |     cleanup (MkStripe1 mstripe) wid t =
 545 |       casupdate1 mstripe (\(MkStripe available cache queue queuer nextid cancelled) =>
 546 |                            (MkStripe available cache queue queuer nextid (insert wid cancelled), ())
 547 |                          ) t
 548 |     waitForResource'' :  Channel (WakeResult a)
 549 |                       -> F1 World (WakeResult a)
 550 |     waitForResource'' wake t =
 551 |       ioToF1 (channelGet wake) t
 552 |     waitForResource' :  MCancel (Elin World)
 553 |                      => Stripe1 World a
 554 |                      -> Nat
 555 |                      -> Channel (WakeResult a)
 556 |                      -> Elin World [Errno] (WakeResult a)
 557 |     waitForResource' mstripe wid wake =
 558 |       onAbort (runIO (waitForResource'' wake)) (runIO (cleanup mstripe wid))
 559 |
 560 | ||| Destroy a resource instead of returning it to the `Pool1 World a`.
 561 | |||
 562 | ||| Behavior:
 563 | ||| - If a waiter exists, they are woken with `Nothing`.
 564 | ||| - Otherwise, no state change occurs (resource is discarded).
 565 | |||
 566 | ||| Guarantees:
 567 | ||| - Waiters are not left blocked indefinitely.
 568 | ||| - No resource is reinserted into the cache.
 569 | |||
 570 | export
 571 | destroyResource :  Stripe1 World a
 572 |                 -> F1' World
 573 | destroyResource (MkStripe1 striperef) t =
 574 |   let res # t := ioToF1 (runElinIO (destroy (MkStripe1 striperef))) t
 575 |     in case res of
 576 |          Right _  =>
 577 |            () # t
 578 |          Left err =>
 579 |            (assert_total $ idris_crash "Data.Pool.destroyResource: \{show err}") # t
 580 |   where
 581 |     destroy' :  Stripe1 World a
 582 |              -> F1' World
 583 |     destroy' (MkStripe1 striperef) t =
 584 |       casWithEffects (MkStripe1 striperef) (\stripe => signal stripe Create) t
 585 |     destroy :  MCancel (Elin World)
 586 |             => Stripe1 World a
 587 |             -> Elin World [Errno] ()
 588 |     destroy (MkStripe1 striperef) =
 589 |       uncancelable $ \_ =>
 590 |         runIO (destroy' (MkStripe1 striperef))
 591 |
 592 | ||| Free resource entries in the stripe that satisfy a predicate.
 593 | |||
 594 | ||| Behavior:
 595 | ||| - Removes stale entries from cache atomically.
 596 | ||| - Emits a batched free effect.
 597 | ||| - Ensures no resource is freed twice or leaked.
 598 | |||
 599 | ||| Guarantees:
 600 | ||| - Removal is atomic with respect to Stripe.
 601 | ||| - Freeing happens after commit.
 602 | ||| - Safe under contention and retries.
 603 | |||
 604 | private
 605 | cleanStripe :  (Entry a -> Bool)
 606 |             -> (a -> IO ())
 607 |             -> Stripe1 World a
 608 |             -> F1' World
 609 | cleanStripe isstale free (MkStripe1 striperef) t =
 610 |   let res # t := ioToF1 (runElinIO (cleanStripe' (MkStripe1 striperef))) t
 611 |     in case res of
 612 |          Right _  =>
 613 |            () # t
 614 |          Left err =>
 615 |            (assert_total $ idris_crash "Data.Pool.cleanStripe: \{show err}") # t
 616 |   where
 617 |     step :  Stripe a
 618 |          -> StripeStep a
 619 |     step (MkStripe available cache queue queuer nextid cancelled) =
 620 |       let (stale, fresh) = partition isstale cache
 621 |           freedvals      = map (\(MkEntry v _) => v) stale
 622 |         in MkStripeStep
 623 |              (MkStripe available fresh queue queuer nextid cancelled)
 624 |              ( case freedvals of
 625 |                  [] =>
 626 |                    [None]
 627 |                  xs =>
 628 |                    [FreeMany free xs]
 629 |              )
 630 |     cleanStripe'' :  Stripe1 World a
 631 |                   -> F1' World
 632 |     cleanStripe'' (MkStripe1 striperef) t =
 633 |       casWithEffects (MkStripe1 striperef) step t
 634 |     cleanStripe' :  MCancel (Elin World)
 635 |                  => Stripe1 World a
 636 |                  -> Elin World [Errno] ()
 637 |     cleanStripe' (MkStripe1 striperef) =
 638 |       uncancelable $ \_ =>
 639 |         runIO (cleanStripe'' (MkStripe1 striperef))
 640 |
 641 | ||| Opportunistically clean stale resources from a `Stripe1 World a`.
 642 | |||
 643 | ||| This function performs stripe-local garbage collection of cached resources
 644 | ||| based on a time-to-live (TTL) policy. It replaces the need for a global
 645 | ||| collector thread by tying cleanup to normal pool activity.
 646 | |||
 647 | ||| Behavior:
 648 | ||| - Reads the current monotonic time.
 649 | ||| - Constructs a staleness predicate using the provided TTL.
 650 | ||| - Invokes `cleanStripe` to:
 651 | |||   - Remove stale entries from the cache.
 652 | |||   - Emit `FreeMany` effects for the removed resources.
 653 | ||| - Effects are executed only after the CAS commit inside `cleanStripe`.
 654 | |||
 655 | ||| Staleness:
 656 | ||| - A resource is considered stale if:
 657 | |||  - now - lastUsed > ttl
 658 | ||| - Time is measured using `CLOCK_MONOTONIC`, ensuring:
 659 | |||  - No sensitivity to wall-clock changes.
 660 | |||  - Stable elapsed-time semantics.
 661 | |||
 662 | ||| Concurrency Model:
 663 | ||| - Cleanup is performed via `cleanStripe`, which uses CAS:
 664 | |||  - Stripe state updates are atomic.
 665 | |||  - Effects are executed exactly once after commit.
 666 | ||| - Safe under contention:
 667 | |||  - Multiple threads may attempt cleanup concurrently.
 668 | |||  - Only one successful CAS applies each transition.
 669 | |||  - No resource is freed more than once.
 670 | |||
 671 | ||| Execution Model:
 672 | ||| - This function performs IO (time retrieval) outside CAS.
 673 | ||| - The actual mutation and freeing are deferred via `StripeEffect`.
 674 | ||| - No IO occurs during CAS evaluation.
 675 | |||
 676 | ||| Usage:
 677 | ||| - Intended to be called opportunistically during:
 678 | |||  - `takeResource`
 679 | |||  - `putResource`
 680 | |||  - `tryTakeResource`
 681 | ||| - Provides amortized cleanup without background threads.
 682 | |||
 683 | ||| Guarantees:
 684 | ||| - Stale resources are eventually freed under continued usage.
 685 | ||| - No interference with active resources or waiters.
 686 | ||| - No blocking or waiting is introduced.
 687 | |||
 688 | ||| Tradeoffs:
 689 | ||| - Cleanup is activity-driven rather than time-driven.
 690 | ||| - Idle stripes may retain stale resources longer.
 691 | ||| - In exchange:
 692 | |||  - No global thread.
 693 | |||  - Lower runtime overhead.
 694 | |||  - Fully local behavior.
 695 | |||
 696 | ||| Failure Handling:
 697 | ||| - Crashes if time retrieval fails (consistent with module error policy).
 698 | |||
 699 | ||| Invariants:
 700 | ||| - Only cached resources are considered for cleanup.
 701 | ||| - Each freed resource is removed exactly once.
 702 | ||| - Stripe structure remains consistent after cleanup.
 703 | |||
 704 | private
 705 | cleanStripeIfNeeded :  (ttl : Clock Duration)
 706 |                     -> (free : a -> IO ())
 707 |                     -> Stripe1 World a
 708 |                     -> F1' World
 709 | cleanStripeIfNeeded ttl free (MkStripe1 striperef) t =
 710 |   let now # t := ioToF1 (runElinIO grabTime) t
 711 |     in case now of
 712 |          Left err   =>
 713 |            (assert_total $ idris_crash "Data.Pool.cleanStripeIfNeeded: \{show err}") # t
 714 |          Right now' =>
 715 |            cleanStripe (isStale ttl now') free (MkStripe1 striperef) t
 716 |   where    
 717 |     grabTime : Elin World [Errno] (IClock CLOCK_MONOTONIC)
 718 |     grabTime = getTime CLOCK_MONOTONIC
 719 |
 720 | ||| Return a resource to the `Pool1 World n a`.
 721 | |||
 722 | ||| Behavior:
 723 | ||| - If a waiter exists, the resource is delivered directly.
 724 | ||| - Otherwise, it is inserted into the cache with a timestamp.
 725 | |||
 726 | ||| Guarantees:
 727 | ||| - No resource is lost.
 728 | ||| - Wakeups are ordered and deterministic.
 729 | |||
 730 | export
 731 | putResource :  Pool1 World n a
 732 |             -> Stripe1 World a
 733 |             -> a
 734 |             -> F1' World
 735 | putResource (MkPool1 (MkPoolConfig _ free ttl _ _ _) _) (MkStripe1 striperef) val t =
 736 |   let () # t := cleanStripeIfNeeded ttl free (MkStripe1 striperef) t
 737 |     in casWithEffects (MkStripe1 striperef) (\stripe => signal stripe (Deliver val)) t
 738 |
 739 | ||| Destroy all resources in all stripes in the `Pool1 World n a`.
 740 | |||
 741 | ||| Behavior:
 742 | ||| - Removes all cached resources from every stripe.
 743 | ||| - Frees them via the provided `freeresource` function.
 744 | ||| - Leaves wait queues untouched.
 745 | |||
 746 | ||| Guarantees:
 747 | ||| - Each resource is freed exactly once.
 748 | ||| - No IO occurs during Stripe state transitions.
 749 | ||| - Safe under contention (uses CAS + effect model).
 750 | |||
 751 | ||| Notes:
 752 | ||| - This only affects cached (idle) resources.
 753 | ||| - Resources currently checked out are NOT affected.
 754 | |||
 755 | export
 756 | destroyAllResources :  {n : Nat}
 757 |                     -> Pool1 World n a
 758 |                     -> MArray World n (LocalPool1 World a)
 759 |                     -> F1' World
 760 | destroyAllResources (MkPool1 (MkPoolConfig _ freeresource _ _ _ _) _) localpools t =
 761 |   go 0 n localpools t
 762 |   where
 763 |     go :  (o, x : Nat)
 764 |        -> {auto v : Ix x n}
 765 |        -> {auto 0 prf : LTE o $ ixToNat v}
 766 |        -> (arr : MArray World n (LocalPool1 World a))
 767 |        -> F1' World
 768 |     go o Z     _   t =
 769 |       () # t
 770 |     go o (S j) arr t =
 771 |       let MkLocalPool1 _ stripe1 # t := getIx arr j t
 772 |           ()                     # t := cleanStripe (const True) freeresource stripe1 t
 773 |         in go o j arr t
 774 |
 775 | ||| Restore one unit of available capacity in the `Stripe a`.
 776 | |||
 777 | ||| Behavior:
 778 | ||| - Increments `available` by 1.
 779 | ||| - Does not modify cache or queue.
 780 | ||| - Emits no effects.
 781 | |||
 782 | ||| Used when resource creation fails after capacity was reserved.
 783 | |||
 784 | ||| Guarantees:
 785 | ||| - Atomic under CAS.
 786 | ||| - No IO performed.
 787 | ||| - Safe under contention.
 788 | |||
 789 | export
 790 | restoreSize :  Stripe1 World a
 791 |             -> F1' World
 792 | restoreSize (MkStripe1 striperef) t =
 793 |   casWithEffects (MkStripe1 striperef) step t
 794 |   where
 795 |     step :  Stripe a
 796 |          -> StripeStep a
 797 |     step (MkStripe available cache queue queuer nextid cancelled) =
 798 |       MkStripeStep
 799 |         (MkStripe (S available) cache queue queuer nextid cancelled)
 800 |         [None]
 801 |
 802 | ||| Acquire a resource from the `Pool1 World n a`.
 803 | |||
 804 | ||| Behavior:
 805 | ||| - Attempts to take a resource from the local stripe.
 806 | ||| - Uses a single CAS step to atomically choose between:
 807 | |||  - Reusing a cached resource.
 808 | |||  - Reserving capacity for new resource creation.
 809 | |||  - Enqueuing as a waiter when fully exhausted.
 810 | |||
 811 | ||| Fast Path (Cache Reuse):
 812 | ||| - If a cached resource exists:
 813 | |||  - Remove it from the cache.
 814 | |||  - Return it immediately.
 815 | ||| - `available` is not modified.
 816 | ||| - The resource already exists and therefore does not consume creation capacity.
 817 | |||
 818 | ||| Creation Path:
 819 | ||| - If the cache is empty but `available > 0`:
 820 | |||  - Atomically decrement `available`.
 821 | |||  - Reserve one unit of creation capacity.
 822 | |||  - Create a fresh resource outside the CAS section.
 823 | |||
 824 | ||| Wait Path:
 825 | ||| - If:
 826 | |||  - cache is empty.
 827 | |||  - and `available == 0`.
 828 | ||| - Then:
 829 | |||  - enqueue a `Waiter`.
 830 | |||  - block on `waitForResource`.
 831 | ||| - The waiter is eventually:
 832 | |||  - woken with `Just a` when a resource is returned.
 833 | |||  - or `Nothing` when capacity is restored.
 834 | |||
 835 | ||| Capacity Semantics:
 836 | ||| - `available` represents remaining creation budget.
 837 | ||| - It is decremented ONLY when creating a brand-new resource.
 838 | ||| - It is restored when:
 839 | |||  - resource creation aborts.
 840 | |||  - resources are destroyed.
 841 | |||
 842 | ||| Concurrency Guarantees:
 843 | ||| - Decision logic is atomic via CAS.
 844 | ||| - No IO occurs during CAS evaluation.
 845 | ||| - Effects execute exactly once after successful commit.
 846 | ||| - Waiters are served FIFO (excluding cancelled waiters).
 847 | ||| - No lost wakeups.
 848 | |||
 849 | ||| Invariants:
 850 | ||| - Total live resources never exceeds stripe capacity.
 851 | ||| - Cached resources are timestamped before insertion.
 852 | ||| - Waiters exist only inside Stripe state.
 853 | |||
 854 | export
 855 | takeResource :  {n : Nat}
 856 |              -> Pool1 World n a
 857 |              -> F1 World (a, LocalPool1 World a)
 858 | takeResource pool@(MkPool1 poolconfig@(MkPoolConfig _ free ttl _ _ _) localpools) t =
 859 |   let lp@(MkLocalPool1 _ stripe1@(MkStripe1 striperef)) # t := getLocalPool localpools t
 860 |       -- clean stripe if needed
 861 |       ()                                                # t := cleanStripeIfNeeded ttl free (MkStripe1 striperef) t
 862 |       -- pre-allocate channel for slow path
 863 |       wake                                              # t := ioToF1 makeChannel t
 864 |       res                                                   : (List (StripeEffect a), Either a (Either () (Nat, Channel (WakeResult a))))
 865 |       res@(effects, res')                               # t :=
 866 |         casupdate1 striperef (\(MkStripe available cache queue queuer nextid cancelled) =>
 867 |                                 case cache of
 868 |                                   -- fast path
 869 |                                   MkEntry v _ :: rest =>
 870 |                                     let none : List (StripeEffect a)
 871 |                                         none    = [None]
 872 |                                         stripe' = MkStripe available
 873 |                                                            rest
 874 |                                                            queue
 875 |                                                            queuer
 876 |                                                            nextid
 877 |                                                            cancelled
 878 |                                         result : Either a (Either () (Nat, Channel (WakeResult a)))
 879 |                                         result = Left v
 880 |                                       in ( stripe'
 881 |                                          , (none, result)
 882 |                                          )
 883 |                                   -- slow path
 884 |                                   []                  =>
 885 |                                     case available == 0 of
 886 |                                       True  =>
 887 |                                         -- enqueue waiter
 888 |                                         let none : List (StripeEffect a)
 889 |                                             none    = [None]
 890 |                                             wid     = nextid
 891 |                                             waiter : Waiter a
 892 |                                             waiter  = MkWaiter wid wake
 893 |                                             stripe' = MkStripe available
 894 |                                                                cache
 895 |                                                                queue
 896 |                                                                (appendQ queuer waiter)
 897 |                                                                (S nextid)
 898 |                                                                cancelled
 899 |                                             result : Either a (Either () (Nat, Channel (WakeResult a)))
 900 |                                             result = Right (Right (wid, wake))
 901 |                                           in ( stripe'
 902 |                                              , (none, result)
 903 |                                              )
 904 |                                       False =>
 905 |                                         -- resource creation slot
 906 |                                         let none : List (StripeEffect a)
 907 |                                             none    = [None]
 908 |                                             stripe' = MkStripe (minus available 1)
 909 |                                                                cache
 910 |                                                                queue
 911 |                                                                queuer
 912 |                                                                nextid
 913 |                                                                cancelled
 914 |                                             result : Either a (Either () (Nat, Channel (WakeResult a)))
 915 |                                             result = Right (Left ())
 916 |                                           in ( stripe'
 917 |                                              , (none, result)
 918 |                                              )
 919 |                                         
 920 |                              ) t
 921 |       -- Run effects after commit
 922 |       ()                                                # t := runEffects stripe1 effects t
 923 |     in case res' of
 924 |          -- fast path
 925 |          Left v                    =>
 926 |            (v, lp) # t
 927 |          -- create immediately
 928 |          Right (Left ())           =>
 929 |            let res # t := ioToF1 (runElinIO (createWithCleanup poolconfig stripe1)) t
 930 |              in case res of
 931 |                   Right v =>
 932 |                     (v, lp) # t
 933 |                   Left err =>
 934 |                     (assert_total $ idris_crash "Data.Pool.takeResource: \{show err}") # t           
 935 |          Right (Right (wid, wake)) =>
 936 |            let wakeresult # t := waitForResource stripe1 wid wake t
 937 |              in case wakeresult of
 938 |                   -- woken with resource
 939 |                   Deliver v =>
 940 |                     (v, lp) # t
 941 |                   -- need to create
 942 |                   Create    =>
 943 |                     let res # t := ioToF1 (runElinIO (createWithCleanup poolconfig stripe1)) t
 944 |                       in case res of
 945 |                            Right v =>
 946 |                              (v, lp) # t
 947 |                            Left err =>
 948 |                              (assert_total $ idris_crash "Data.Pool.takeResource: \{show err}") # t
 949 |                   Cancelled =>
 950 |                     (assert_total $ idris_crash "Data.Pool.takeResource: impossible") # t
 951 |   where
 952 |     createWithCleanup :  PoolConfig a
 953 |                       -> Stripe1 World a
 954 |                       -> Elin World [Errno] a
 955 |     createWithCleanup (MkPoolConfig createResource _ _ _ _ _) stripe =
 956 |       onAbort (liftIO createResource) (runIO (restoreSize stripe))
 957 |
 958 | ||| Safely acquire and use a resource from the pool.
 959 | |||
 960 | ||| This is the primary high-level interface for working with `Pool1`.
 961 | ||| It ensures that resources are correctly returned or destroyed,
 962 | ||| even in the presence of exceptions or cancellation.
 963 | |||
 964 | ||| Behavior:
 965 | ||| - Acquires a resource using `takeResource`.
 966 | ||| - Executes the user action `f` with that resource.
 967 | ||| - On normal completion:
 968 | |||  - The resource is returned to the pool via `putResource`.
 969 | ||| - On exception or cancellation:
 970 | |||  - The resource is destroyed via `destroyResource`.
 971 | |||
 972 | ||| Concurrency & Masking:
 973 | ||| - The entire operation runs inside `uncancelable`, ensuring:
 974 | |||  - Resource acquisition and release cannot be interrupted.
 975 | ||| - The user action `f` is executed via `poll`, meaning:
 976 | |||  - It *can* be interrupted or cancelled.
 977 | ||| - If cancellation occurs during `f`, the cleanup handler runs.
 978 | |||
 979 | ||| Cleanup Guarantees:
 980 | ||| - Exactly one of the following happens:
 981 | |||  - `putResource` (success path)
 982 | |||  - `destroyResource` (failure or cancellation path)
 983 | ||| - No resource is leaked or returned twice.
 984 | ||| - Waiters are properly woken via Stripe effects.
 985 | |||
 986 | ||| Failure Handling:
 987 | ||| - Exceptions from `f` are propagated.
 988 | ||| - Exceptions during acquisition or cleanup cause a crash (consistent with the rest of the module’s error handling).
 989 | |||
 990 | ||| Returns:
 991 | ||| - The result of applying `f` to the acquired resource.
 992 | |||
 993 | ||| Invariants:
 994 | ||| - Resources are never duplicated or lost.
 995 | ||| - Pool state remains consistent under concurrency.
 996 | ||| - All Stripe effects are executed after CAS commit.
 997 | |||
 998 | export
 999 | withResource :  {n : Nat}
1000 |              -> Pool1 World n a
1001 |              -> (a -> IO r)
1002 |              -> F1 World r
1003 | withResource pool f t =
1004 |   let res # t := ioToF1 (runElinIO (withResource' pool f)) t
1005 |     in case res of
1006 |          Right res' =>
1007 |            res' # t
1008 |          Left err   =>
1009 |            (assert_total $ idris_crash "Data.Pool.withResource: \{show err}") # t
1010 |   where
1011 |     withResource' :  {n : Nat}
1012 |                   -> MCancel (Elin World)
1013 |                   => Pool1 World n a
1014 |                   -> (a -> IO r)
1015 |                   -> Elin World [Errno] r
1016 |     withResource' pool f =
1017 |       uncancelable $ \poll => do
1018 |         (res, MkLocalPool1 _ (MkStripe1 striperef)) <- runIO (takeResource pool)
1019 |         res' <- onAbort (poll $ liftIO $ f res) (runIO (destroyResource (MkStripe1 striperef)))
1020 |         runIO (putResource pool (MkStripe1 striperef) res)
1021 |         pure res'
1022 |
1023 | ||| Attempt to take a resource without blocking.
1024 | |||
1025 | ||| Behavior:
1026 | ||| - Reads the local stripe and checks availability.
1027 | ||| - If no resources are available:
1028 | |||   - Returns `Nothing` immediately.
1029 | |||   - Does NOT enqueue a waiter.
1030 | |||   - Does NOT create a resource.
1031 | |||
1032 | ||| - If a resource is available:
1033 | |||   - Removes it atomically via CAS.
1034 | |||   - Returns `Just (resource, LocalPool1)`.
1035 | |||
1036 | ||| Guarantees:
1037 | ||| - Non-blocking: never waits on a channel.
1038 | ||| - No side effects inside CAS.
1039 | ||| - No waiter allocation.
1040 | ||| - Safe under contention via CAS retry.
1041 | |||
1042 | export
1043 | tryTakeResource :  {n : Nat}
1044 |                 -> Pool1 World n a
1045 |                 -> F1 World (Maybe (a, LocalPool1 World a))
1046 | tryTakeResource pool@(MkPool1 _ localpools) t =
1047 |   let res # t := ioToF1 (runElinIO (tryTakeResource' pool)) t
1048 |     in case res of
1049 |          Right res'  =>
1050 |            res' # t
1051 |          Left err    =>
1052 |            (assert_total $ idris_crash "Data.Pool.tryTakeResource: \{show err}") # t
1053 |   where
1054 |     tryTakeResource'' :  {n : Nat}
1055 |                       -> MCancel (Elin World)
1056 |                       => Pool1 World n a
1057 |                       -> F1 World (Maybe (a, LocalPool1 World a))
1058 |     tryTakeResource'' pool@(MkPool1 (MkPoolConfig _ free ttl _ _ _) _) t =
1059 |       let lp@(MkLocalPool1 _ stripe1@(MkStripe1 striperef)) # t := getLocalPool localpools t
1060 |           -- clean stripe if needed
1061 |           ()                                                # t := cleanStripeIfNeeded ttl free (MkStripe1 striperef) t
1062 |           -- attempt fast-path only
1063 |           res                                               # t :=
1064 |             casupdate1 striperef (\(MkStripe available cache queue queuer nextid cancelled) =>
1065 |                                     case (available == 0, cache) of
1066 |                                       -- no capacity, do nothing
1067 |                                       (True, _)                    =>
1068 |                                         ( MkStripe available cache queue queuer nextid cancelled
1069 |                                         , Nothing
1070 |                                         )
1071 |                                       -- cache hit, consume
1072 |                                       (False, MkEntry v _ :: rest) =>
1073 |                                         ( MkStripe available
1074 |                                                    rest
1075 |                                                    queue
1076 |                                                    queuer
1077 |                                                    nextid
1078 |                                                    cancelled
1079 |                                         , Just v
1080 |                                         )
1081 |                                       -- available > 0, but cache empty
1082 |                                       (False, [])                  =>
1083 |                                         ( MkStripe available cache queue queuer nextid cancelled
1084 |                                         , Nothing
1085 |                                         )
1086 |                                  ) t
1087 |         in case res of
1088 |              Nothing =>
1089 |                Nothing # t
1090 |              Just v  =>
1091 |                Just (v, lp) # t
1092 |     tryTakeResource' :  {n : Nat}
1093 |                      -> MCancel (Elin World)
1094 |                      => Pool1 World n a
1095 |                      -> Elin World [Errno] (Maybe (a, LocalPool1 World a))
1096 |     tryTakeResource' pool =
1097 |       uncancelable $ \_ =>
1098 |         runIO (tryTakeResource'' pool)
1099 |
1100 | ||| Attempt to acquire and use a resource from the pool without blocking.
1101 | |||
1102 | ||| Behavior:
1103 | ||| - Tries to take a resource immediately using `tryTakeResource`.
1104 | ||| - If no resource is available:
1105 | |||  - Returns `Nothing` without blocking or creating a resource.
1106 | ||| - If a resource is available:
1107 | |||  - Executes the provided function `f` with the resource.
1108 | |||  - Returns `Just result` on success.
1109 | |||
1110 | ||| Resource Handling:
1111 | ||| - The acquired resource is always returned to the pool via `putResource` after successful execution of `f`.
1112 | ||| - If an exception or cancellation occurs during `f`:
1113 | |||  - The resource is destroyed using `destroyResource` instead of being returned.
1114 | |||
1115 | ||| Cancellation Semantics:
1116 | ||| - The outer operation is `uncancelable`, ensuring:
1117 | |||  - No resource is leaked between acquisition and release.
1118 | ||| - The user function `f` is executed under `poll`, meaning:
1119 | |||  - It remains cancelable.
1120 | ||| - If cancellation occurs during `f`:
1121 | |||  - The resource is safely discarded.
1122 | |||  - The pool remains in a consistent state.
1123 | |||
1124 | ||| Concurrency Guarantees:
1125 | ||| - Does not block waiting for a resource.
1126 | ||| - Does not enqueue a waiter.
1127 | ||| - All Stripe transitions (`putResource`, `destroyResource`) are performed via `casWithEffects`, ensuring:
1128 | |||  - Atomic state updates.
1129 | |||  - No duplicated side effects.
1130 | |||  - Deterministic wake behavior.
1131 | |||
1132 | ||| Failure Handling:
1133 | ||| - Any exception from `f` is propagated.
1134 | ||| - Internal pool errors result in a crash with diagnostic information.
1135 | |||
1136 | ||| Returns:
1137 | ||| - `Nothing` if no resource was immediately available.
1138 | ||| - `Just r` if a resource was acquired and `f` completed successfully.
1139 | |||
1140 | ||| Notes:
1141 | ||| - This function is the non-blocking counterpart to `withResource`.
1142 | ||| - It is useful when callers prefer to fallback rather than wait.
1143 | |||
1144 | export
1145 | tryWithResource :  {n : Nat}
1146 |                 -> Pool1 World n a
1147 |                 -> (a -> IO r)
1148 |                 -> F1 World (Maybe r)
1149 | tryWithResource pool f t =
1150 |   let res # t := ioToF1 (runElinIO (tryWithResource' pool f)) t
1151 |     in case res of
1152 |          Right res' =>
1153 |            res' # t
1154 |          Left err   =>
1155 |            (assert_total $ idris_crash "Data.Pool.tryWithResource: \{show err}") # t
1156 |   where
1157 |     tryWithResource' :  {n : Nat}
1158 |                      -> MCancel (Elin World)
1159 |                      => Pool1 World n a
1160 |                      -> (a -> IO r)
1161 |                      -> Elin World [Errno] (Maybe r)
1162 |     tryWithResource' pool f =
1163 |       uncancelable $ \poll => do
1164 |         res <- runIO (tryTakeResource pool)
1165 |         case res of
1166 |           Nothing                                           =>
1167 |             pure Nothing
1168 |           Just (res', MkLocalPool1 _ (MkStripe1 striperef)) => do
1169 |             res'' <- onAbort (poll $ liftIO $ f res') (runIO (destroyResource (MkStripe1 striperef)))
1170 |             runIO (putResource pool (MkStripe1 striperef) res')
1171 |             pure $ Just res''
1172 |