0 | ||| Resource Pool Internals
  1 | module Data.Pool.Internal
  2 |
  3 | import Data.Array.Core
  4 | import Data.Linear.Ref1
  5 | import Data.Nat
  6 | import Data.So
  7 | import Data.SortedSet
  8 | import System.Concurrency
  9 | import System.Posix.Timer
 10 | import System.Posix.Timer.Prim
 11 |
 12 | %language ElabReflection
 13 |
 14 | %default total
 15 |
 16 | ||| Configuration of a Pool.
 17 | |||
 18 | ||| Constraints:
 19 | ||| - poolmaxresources -> The smallest acceptable value is 1.
 20 | ||| - poolnumstripes -> The smallest acceptable value is 1, poolnumstripes must not be larger than poolmaxresources.
 21 | |||
 22 | public export
 23 | record PoolConfig a where
 24 |   constructor MkPoolConfig
 25 |   createresource   : IO a
 26 |   freeresource     : a -> IO ()
 27 |   poolcachettl     : Clock Duration
 28 |   poolmaxresources : (maxres ** LTE 1 maxres)
 29 |   poolnumstripes   : (n ** (LTE 1 n, LTE n (fst poolmaxresources)))
 30 |   poolconfiglabel  : String
 31 |
 32 | ||| A simple (persistent) FIFO queue.
 33 | |||
 34 | ||| This is used to maintain an ordered collection of waiting threads.
 35 | ||| Elements are appended at the tail and removed from the head.
 36 | |||
 37 | ||| Notes:
 38 | ||| - This representation has O(n) append.
 39 | ||| - Under contention (with CAS updates), appends may be retried, so this structure favors simplicity over performance.
 40 | ||| - It is typically used together with a secondary "reversed" queue to amortize costs (two-list queue pattern).
 41 | |||
 42 | public export
 43 | data Queue a
 44 |   = QNode a (Queue a)
 45 |   | QEnd
 46 |
 47 | ||| Result of waking a waiting thread.
 48 | |||
 49 | ||| This represents the outcome delivered to a blocked waiter through its wake channel.
 50 | |||
 51 | ||| Variants:
 52 | ||| - `Deliver a`
 53 | |||  - A resource was directly handed off to the waiter.
 54 | |||
 55 | ||| - `Create`
 56 | |||  - No reusable resource was available, but the waiter should proceed by creating a fresh resource using an already-reserved capacity slot.
 57 | |||
 58 | ||| - `Cancelled`
 59 | |||  - The waiter was cancelled before receiving a resource.
 60 | |||  - This is used to distinguish cancellation from normal wakeup semantics.
 61 | |||
 62 | ||| Design Notes:
 63 | ||| - This replaces the older `Maybe a` wake protocol, which overloaded `Nothing` to represent multiple meanings.
 64 | ||| - Explicit wake states improve clarity and correctness of the Stripe state machine.
 65 | |||
 66 | ||| Guarantees:
 67 | ||| - Each waiter receives at most one `WakeResult`.
 68 | ||| - Wake results correspond only to committed Stripe transitions.
 69 | ||| - No wake result is delivered more than once.
 70 | |||
 71 | ||| Invariants:
 72 | ||| - `Deliver a` carries ownership transfer of exactly one resource.
 73 | ||| - `Create` implies capacity has already been reserved.
 74 | ||| - `Cancelled` does not transfer ownership of a resource.
 75 | |||
 76 | public export
 77 | data WakeResult a
 78 |   = Deliver a
 79 |   | Create
 80 |   | Cancelled
 81 |
 82 | ||| A pure waiting token representing a blocked thread.
 83 | |||
 84 | ||| This contains no mutable state. All lifecycle tracking is handled
 85 | ||| by the Stripe during dequeue / cancellation.
 86 | |||
 87 | ||| Fields:
 88 | ||| - `id`   : unique identifier for cancellation tracking
 89 | ||| - `wake` : channel used to unblock the thread
 90 | |||
 91 | ||| Invariants:
 92 | ||| - Waiter is immutable
 93 | ||| - Wake is single-use
 94 | ||| - Cancellation is handled by Stripe (not locally)
 95 | |||
 96 | public export
 97 | data Waiter : (a : Type) -> Type where
 98 |   MkWaiter :  (id   : Nat)
 99 |            -> (wake : Channel (WakeResult a))
100 |            -> Waiter a
101 |
102 | ||| An existing resource currently sitting in a pool.
103 | |||
104 | public export
105 | data Entry : (a : Type) -> Type where
106 |   MkEntry :  (entry : a)
107 |           -> (lastused : IClock CLOCK_MONOTONIC)
108 |           -> Entry a
109 |
110 | ||| Stripe is the only concurrent state machine in the system.
111 | |||
112 | ||| It owns:
113 | ||| - Resource availability.
114 | ||| - Cached resources.
115 | ||| - All waiting threads.
116 | ||| - Cancellation tracking.
117 | |||
118 | ||| All mutations occur via CAS on an enclosing Ref, `Stripe1 s a`.
119 | |||
120 | ||| Fields:
121 | ||| - `available` : number of available resources
122 | ||| - `cache`     : reusable resources
123 | ||| - `queue`     : primary FIFO of waiters
124 | ||| - `queuer`    : secondary FIFO (amortized append)
125 | ||| - `nextId`    : fresh waiter id supply
126 | ||| - `cancelled` : sorted set of cancelled waiter ids
127 | |||
128 | ||| Invariants:
129 | ||| - Stripe is immutable between CAS updates.
130 | ||| - Queue ordering is authoritative.
131 | ||| - Cancelled waiters are lazily skipped.
132 | |||
133 | public export
134 | data Stripe : (a : Type) -> Type where
135 |   MkStripe :  (available : Nat)
136 |            -> (cache : List (Entry a))
137 |            -> (queue : Queue (Waiter a))
138 |            -> (queuer : Queue (Waiter a))
139 |            -> (nextid : Nat)
140 |            -> (cancelled : SortedSet Nat)
141 |            -> Stripe a
142 |
143 | ||| A linear mutable stripe.
144 | |||
145 | public export
146 | data Stripe1 : (s : Type) -> (a : Type) -> Type where
147 |   MkStripe1 :  Ref s (Stripe a)
148 |             -> Stripe1 s a
149 |
150 | ||| Effects emitted by a Stripe transition.
151 | |||
152 | ||| These are collected during CAS computation and executed
153 | ||| only after a successful CAS commit.
154 | |||
155 | ||| Guarantees:
156 | ||| - No IO inside CAS.
157 | ||| - No duplicated effects on retry.
158 | ||| - Deterministic state transitions.
159 | |||
160 | public export
161 | data StripeEffect a
162 |   = Wake (Channel (WakeResult a)) (WakeResult a)
163 |   | WakeMany (List (Channel (WakeResult a), WakeResult a))
164 |   | InsertWithTimestamp a
165 |   | FreeMany (a -> IO ()) (List a)
166 |   | None
167 |
168 | ||| Result of a Stripe state transition.
169 | |||
170 | ||| Represents:
171 | ||| - The new Stripe state (to be CAS'd).
172 | ||| - Effects to run AFTER successful commit.
173 | |||
174 | ||| Invariants:
175 | ||| - This must be treated as a one-shot value.
176 | ||| - If CAS fails, this MUST be discarded.
177 | ||| - Effects must NEVER be run unless CAS succeeds.
178 | |||
179 | public export
180 | record StripeStep a where
181 |   constructor MkStripeStep
182 |   stripe  : Stripe a
183 |   effects : List (StripeEffect a)
184 |
185 | ||| A single, local pool based on a linear mutable stripe.
186 | |||
187 | public export
188 | data LocalPool1 : (s : Type) -> (a : Type) -> Type where
189 |   MkLocalPool1 :  (stripeid : Nat)
190 |                -> (stripevar : Stripe1 s a)
191 |                -> LocalPool1 s a
192 |
193 | ||| Striped resource pool based on linear mutable references.
194 | |||
195 | public export
196 | data Pool1 : (s : Type) -> (n : Nat) -> (a : Type) -> Type where
197 |   MkPool1 :  (poolconfig : PoolConfig a)
198 |           -> (localpools : (MArray s n (LocalPool1 s a)))
199 |           -> Pool1 s n a
200 |