0 | ||| Log-Structured Merge RRB Vector (LSMRRBVector)
  1 | module Data.LSMRRBVector
  2 |
  3 | import public Data.LSMRRBVector.Internal
  4 | import public Data.LSMRRBVector.Types
  5 | import public Data.RRBVector
  6 |
  7 | import Control.Monad.Elin
  8 | import Control.Monad.MCancel
  9 | import Control.Monad.ST
 10 | import Data.Array
 11 | import Data.Array.Core
 12 | import Data.Array.Index
 13 | import Data.Array.Indexed
 14 | import Data.Bits
 15 | import Data.Linear.Ref1
 16 | import Data.List
 17 | import Data.List1
 18 | import Data.Maybe
 19 | import Data.RRBVector
 20 | import Data.SortedMap
 21 | import Data.SnocList
 22 | import Data.Vect
 23 | import Data.Zippable
 24 | import IO.Async
 25 | import IO.Async.Core
 26 | import IO.Async.Loop.Poller
 27 | import IO.Async.Loop.Posix
 28 | import IO.Async.Posix
 29 | import IO.Async.Service
 30 | import Syntax.T1 as T1
 31 | import System.Concurrency
 32 | import System.Posix.Timer
 33 | import System.Posix.Timer.Prim
 34 |
 35 | %hide Control.Monad.Elin.Elin.(.run)
 36 | %hide Control.Monad.Elin.Elin.run
 37 | %hide Prelude.null
 38 | %hide Prelude.Ops.infixr.(<|)
 39 | %hide Prelude.Ops.infixl.(|>)
 40 |
 41 | %default total
 42 |
 43 | --------------------------------------------------------------------------------
 44 | --          Mutation Operations
 45 | --------------------------------------------------------------------------------
 46 |
 47 | ||| Appends a value onto the logical end of the vector.
 48 | |||
 49 | ||| Effect:
 50 | ||| - Adds an Append operation to the thread-local buffer.
 51 | |||
 52 | export
 53 | append :  LSMRRBVector World a
 54 |        -> RebuildService Poll
 55 |        -> ThreadId
 56 |        -> a
 57 |        -> Async Poll [Errno] ()
 58 | append lsmrrbvector svc tid x = do
 59 |   shouldtrigger <- liftIO (enqueueOperation lsmrrbvector.buffers lsmrrbvector.combinedsnapshotstate tid (Append x))
 60 |   scheduleIfNeeded lsmrrbvector svc shouldtrigger
 61 |   
 62 | ||| Prepends a value onto the logical beginning of the vector.
 63 | |||
 64 | ||| Effect:
 65 | ||| - Adds a Prepend operation to the thread-local buffer.
 66 | |||
 67 | export
 68 | prepend :  LSMRRBVector World a
 69 |         -> RebuildService Poll
 70 |         -> ThreadId
 71 |         -> a
 72 |         -> Async Poll [Errno] ()
 73 | prepend lsmrrbvector svc tid x = do
 74 |   shouldtrigger <- liftIO (enqueueOperation lsmrrbvector.buffers lsmrrbvector.combinedsnapshotstate tid (Prepend x))
 75 |   scheduleIfNeeded lsmrrbvector svc shouldtrigger
 76 |
 77 | ||| Inserts a value at a specified logical index.
 78 | |||
 79 | ||| Effect:
 80 | ||| - Adds an Insert operation to the thread-local buffer.
 81 | |||
 82 | export
 83 | insert :  LSMRRBVector World a
 84 |        -> RebuildService Poll
 85 |        -> ThreadId
 86 |        -> Nat
 87 |        -> a
 88 |        -> Async Poll [Errno] ()
 89 | insert lsmrrbvector svc tid i x = do
 90 |   shouldtrigger <- liftIO (enqueueOperation lsmrrbvector.buffers lsmrrbvector.combinedsnapshotstate tid (Insert i x))
 91 |   scheduleIfNeeded lsmrrbvector svc shouldtrigger
 92 |
 93 | ||| Removes a value at a specified logical index.
 94 | |||
 95 | ||| Effect:
 96 | ||| - Adds a Delete operation to the thread-local buffer.
 97 | |||
 98 | export
 99 | delete :  LSMRRBVector World a
100 |        -> RebuildService Poll
101 |        -> ThreadId
102 |        -> Nat
103 |        -> Async Poll [Errno] ()
104 | delete lsmrrbvector svc tid i = do
105 |   shouldtrigger <- liftIO (enqueueOperation lsmrrbvector.buffers lsmrrbvector.combinedsnapshotstate tid (Delete i))
106 |   scheduleIfNeeded lsmrrbvector svc shouldtrigger
107 |
108 | ||| Replaces a value at a specified logical index.
109 | |||
110 | ||| Effect:
111 | ||| - Adds an Update operation to the thread-local buffer.
112 | |||
113 | export
114 | update :  LSMRRBVector World a
115 |        -> RebuildService Poll
116 |        -> ThreadId
117 |        -> Nat
118 |        -> a
119 |        -> Async Poll [Errno] ()
120 | update lsmrrbvector svc tid i x = do
121 |   shouldtrigger <- liftIO (enqueueOperation lsmrrbvector.buffers lsmrrbvector.combinedsnapshotstate tid (Update i x))
122 |   scheduleIfNeeded lsmrrbvector svc shouldtrigger
123 |
124 | --------------------------------------------------------------------------------
125 | --          Read Operations
126 | --------------------------------------------------------------------------------
127 |
128 | ||| Converts the current published snapshot into a list.
129 | |||
130 | ||| Behavior:
131 | ||| - Reads the current immutable snapshot.
132 | ||| - Converts the snapshot contents into a List.
133 | |||
134 | ||| Properties:
135 | ||| - Observes a consistent snapshot.
136 | ||| - Does not block writers or rebuild activity.
137 | ||| - Reader participation is cleaned up automatically.
138 | |||
139 | ||| Notes:
140 | ||| - Concurrent writes published after acquisition are not visible.
141 | |||
142 | ||| Complexity:
143 | ||| - Snapshot acquisition: O(1)
144 | ||| - Conversion: O(n)
145 | |||
146 | export
147 | toList :  LSMRRBVector World a
148 |        -> ThreadId
149 |        -> IO (List a)
150 | toList lsmrrbvector tid =
151 |   readSnapshotWithGeneration lsmrrbvector tid (\(_, v) => Data.RRBVector.toList v)
152 |
153 | ||| Returns the number of elements in the current published snapshot.
154 | |||
155 | ||| Behavior:
156 | ||| - Reads the current immutable snapshot.
157 | ||| - Returns its logical length.
158 | |||
159 | ||| Properties:
160 | ||| - Observes a consistent snapshot.
161 | ||| - Does not block writers or rebuild activity.
162 | ||| - Reader participation is cleaned up automatically.
163 | |||
164 | ||| Notes:
165 | ||| - Concurrent writes published after acquisition are not visible.
166 | |||
167 | ||| Complexity:
168 | ||| - O(1)
169 | |||
170 | export
171 | length :  LSMRRBVector World a
172 |        -> ThreadId
173 |        -> IO Nat
174 | length lsmrrbvector tid =
175 |   readSnapshotWithGeneration lsmrrbvector tid (\(_, v) => Data.RRBVector.length v)
176 |
177 | ||| Returns the element at a given index.
178 | |||
179 | ||| Behavior:
180 | ||| - Reads the current immutable snapshot.
181 | ||| - Retrieves the element at the specified index.
182 | |||
183 | ||| Properties:
184 | ||| - Observes a consistent snapshot.
185 | ||| - Does not block writers or rebuild activity.
186 | ||| - Reader participation is cleaned up automatically.
187 | |||
188 | ||| Notes:
189 | ||| - Out-of-bounds behavior matches RRBVector.index.
190 | ||| - Concurrent writes published after acquisition are not visible.
191 | |||
192 | ||| Complexity:
193 | ||| - O(log n)
194 | |||
195 | export
196 | index :  LSMRRBVector World a
197 |       -> ThreadId
198 |       -> Nat
199 |       -> IO a
200 | index lsmrrbvector tid i =
201 |   readSnapshotWithGeneration lsmrrbvector tid (\(_, v) => Data.RRBVector.index i v)
202 |
203 | ||| Looks up an element by index.
204 | |||
205 | ||| Behavior:
206 | ||| - Reads the current immutable snapshot.
207 | ||| - Returns Nothing if the index is out of bounds.
208 | |||
209 | ||| Properties:
210 | ||| - Observes a consistent snapshot.
211 | ||| - Does not block writers or rebuild activity.
212 | ||| - Reader participation is cleaned up automatically.
213 | |||
214 | ||| Notes:
215 | ||| - Concurrent writes published after acquisition are not visible.
216 | |||
217 | ||| Complexity:
218 | ||| - O(log n)
219 | |||
220 | export
221 | lookup :  LSMRRBVector World a
222 |        -> ThreadId
223 |        -> Nat
224 |        -> IO (Maybe a)
225 | lookup lsmrrbvector tid i =
226 |   readSnapshotWithGeneration lsmrrbvector tid (\(_, v) => Data.RRBVector.lookup i v)
227 |
228 | ||| Tests whether the current published snapshot is empty.
229 | |||
230 | ||| Behavior:
231 | ||| - Reads the current immutable snapshot.
232 | ||| - Returns True when no elements exist.
233 | |||
234 | ||| Properties:
235 | ||| - Observes a consistent snapshot.
236 | ||| - Does not block writers or rebuild activity.
237 | ||| - Reader participation is cleaned up automatically.
238 | |||
239 | ||| Notes:
240 | ||| - Concurrent writes published after acquisition are not visible.
241 | |||
242 | ||| Complexity:
243 | ||| - O(1)
244 | |||
245 | export
246 | null :  LSMRRBVector World a
247 |      -> ThreadId
248 |      -> IO Bool
249 | null lsmrrbvector tid =
250 |   readSnapshotWithGeneration lsmrrbvector tid (\(_, v) => Data.RRBVector.null v)
251 |
252 | --------------------------------------------------------------------------------
253 | --          Default Config
254 | --------------------------------------------------------------------------------
255 |
256 | ||| Default log-structured merge vector configuration.
257 | |||
258 | ||| Current defaults favor balanced throughput and latency.
259 | |||
260 | export
261 | defaultconfig : LSMRRBVectorConfig
262 | defaultconfig = MkLSMRRBVectorConfig 64
263 |
264 | --------------------------------------------------------------------------------
265 | --          Creating Log-Structured Merge RRB-Vectors
266 | --------------------------------------------------------------------------------
267 |
268 | ||| Run an empty log-structured merge vector using a user-provided configuration.
269 | |||
270 | ||| Parameters:
271 | ||| - initialbatchwindow: Starting adaptive batching target.
272 | |||
273 | ||| Notes:
274 | ||| - Smaller values rebuild more aggressively.
275 | ||| - Larger values favor write throughput.
276 | |||
277 | export covering
278 | runEmptyWith :  Ord (Entry a)
279 |              => LSMRRBVectorConfig
280 |              -> List (LSMRRBVector World a -> RebuildService Poll -> RebuildServiceState -> Async Poll [Errno] ())
281 |              -> List (LSMRRBVector World a -> Async Poll [Errno] ())
282 |              -> IO ()
283 | runEmptyWith config rebuilderactions lsmrrbvectoractions = do
284 |   buffers                 <- newref Data.SortedMap.empty
285 |   combinedsnapshotstate   <- newref (MkCombinedSnapshotState (MkSnapshotState Z Empty) [] Data.SortedMap.empty 0 False config.initialbatchwindow)
286 |   rebuildscheduled        <- newref False
287 |   let lsmrrbvector        = MkLSMRRBVector buffers combinedsnapshotstate rebuildscheduled
288 |   let rebuilderservice    = rebuilderService lsmrrbvector initialRebuildServiceState rebuilderactions
289 |   let lsmrrbvectorservice = lsmrrbvectorService lsmrrbvector lsmrrbvectoractions
290 |   n                       <- asyncThreads
291 |   app n [SIGINT] posixPoller $ handle handlers (rebuilderAndLSMRRBVectorService rebuilderservice lsmrrbvectorservice)
292 |   where
293 |     handlers : All (Handler () Poll) [Errno]
294 |     handlers = [\x => stderrLn "Error: \{errorText x} (\{errorName x})"]
295 |
296 | ||| Runs an empty log-structured merge vector tuned for high sustained write throughput.
297 | |||
298 | ||| Configuration:
299 | ||| - Initial adaptive batch window: 512
300 | |||
301 | ||| Behavior:
302 | ||| - Favors larger rebuild batches.
303 | ||| - Reduces rebuild frequency under heavy write load.
304 | ||| - May increase visibility latency for newly written values.
305 | |||
306 | ||| Notes:
307 | ||| - Intended for write-heavy workloads.
308 | |||
309 | export covering
310 | runFastWritesEmpty :  Ord (Entry a)
311 |                    => List (LSMRRBVector World a -> RebuildService Poll -> RebuildServiceState -> Async Poll [Errno] ())
312 |                    -> List (LSMRRBVector World a -> Async Poll [Errno] ())
313 |                    -> IO ()
314 | runFastWritesEmpty rebuilderactions lsmrrbvectoractions =
315 |   runEmptyWith (MkLSMRRBVectorConfig 512) rebuilderactions lsmrrbvectoractions
316 |
317 | ||| Runs an empty log-structured merge vector tuned for low publication latency.
318 | |||
319 | ||| Configuration:
320 | ||| - Initial adaptive batch window: 16
321 | |||
322 | ||| Behavior:
323 | ||| - Favors frequent rebuild cycles.
324 | ||| - Reduces time between writes and publication.
325 | ||| - May increase rebuild overhead under heavy load.
326 | |||
327 | ||| Notes:
328 | ||| - Intended for latency-sensitive workloads.
329 | |||
330 | export covering
331 | runLowLatencyEmpty :  Ord (Entry a)
332 |                    => List (LSMRRBVector World a -> RebuildService Poll -> RebuildServiceState -> Async Poll [Errno] ())
333 |                    -> List (LSMRRBVector World a -> Async Poll [Errno] ())
334 |                    -> IO ()
335 | runLowLatencyEmpty rebuilderactions lsmrrbvectoractions =
336 |   runEmptyWith (MkLSMRRBVectorConfig 16) rebuilderactions lsmrrbvectoractions
337 |
338 | ||| Runs an empty log-structured merge vector.
339 | |||
340 | export covering
341 | runEmpty :  Ord (Entry a)
342 |          => List (LSMRRBVector World a -> RebuildService Poll -> RebuildServiceState -> Async Poll [Errno] ())
343 |          -> List (LSMRRBVector World a -> Async Poll [Errno] ())
344 |          -> IO ()
345 | runEmpty rebuilderactions lsmrrbvectoractions = 
346 |   runEmptyWith defaultconfig rebuilderactions lsmrrbvectoractions
347 |