0 | ||| Log-Structured Merge RRB Vector Internals
   1 | module Data.LSMRRBVector.Internal
   2 |
   3 | import public Data.LSMRRBVector.Types
   4 | import Data.RRBVector
   5 |
   6 | import Control.Monad.Elin
   7 | import Control.Monad.MCancel
   8 | import Control.Monad.ST
   9 | import Data.Array
  10 | import Data.Array.Core
  11 | import Data.Array.Index
  12 | import Data.Array.Indexed
  13 | import Data.Bits
  14 | import Data.Linear.Ref1
  15 | import Data.List
  16 | import Data.List1
  17 | import Data.Maybe
  18 | import Data.RRBVector
  19 | import Data.SortedMap
  20 | import Data.SnocList
  21 | import Data.Vect
  22 | import Data.Zippable
  23 | import IO.Async
  24 | import IO.Async.Core
  25 | import IO.Async.Loop.Poller
  26 | import IO.Async.Loop.Posix
  27 | import IO.Async.Posix
  28 | import IO.Async.Service
  29 | import IO.Async.Util
  30 | import Syntax.T1 as T1
  31 | import System.Concurrency
  32 | import System.Posix.Timer
  33 | import System.Posix.Timer.Prim
  34 |
  35 | %hide Control.Monad.Elin.Elin.(.run)
  36 | %hide Control.Monad.Elin.Elin.run
  37 | %hide Prelude.null
  38 | %hide Prelude.Ops.infixr.(<|)
  39 | %hide Prelude.Ops.infixl.(|>)
  40 |
  41 | %default total
  42 |
  43 | --------------------------------------------------------------------------------
  44 | --          Buffer Utilities
  45 | --------------------------------------------------------------------------------
  46 |
  47 | ||| Empty mutation buffer.
  48 | |||
  49 | export
  50 | emptyBuffer : Buffer a
  51 | emptyBuffer =
  52 |   MkBuffer [<] 0
  53 |
  54 | --------------------------------------------------------------------------------
  55 | --          Write Buffer Utilities
  56 | --------------------------------------------------------------------------------
  57 |
  58 | ||| Empty buffer state.
  59 | |||
  60 | export
  61 | emptyWriteBuffers : WriteBuffers a
  62 | emptyWriteBuffers =
  63 |   MkWriteBuffers emptyBuffer
  64 |
  65 | --------------------------------------------------------------------------------
  66 | --          Thread Context Utilities
  67 | --------------------------------------------------------------------------------
  68 |
  69 | ||| Determines whether a buffer contains pending entries.
  70 | |||
  71 | ||| Returns:
  72 | ||| - True when no buffered operations exist.
  73 | ||| - False otherwise.
  74 | |||
  75 | ||| Properties:
  76 | ||| - O(1).
  77 | |||
  78 | export
  79 | bufferEmpty :  Buffer a
  80 |             -> Bool
  81 | bufferEmpty b =
  82 |   b.length == 0
  83 |
  84 | ||| Determines whether a thread context contains buffered work.
  85 | |||
  86 | ||| Returns:
  87 | ||| - True when all thread-local mutation state is empty.
  88 | ||| - False otherwise.
  89 | |||
  90 | ||| Properties:
  91 | ||| - O(1).
  92 | |||
  93 | ||| Notes:
  94 | ||| - Sequence numbers are ignored.
  95 | ||| - Historical sequence advancement does not imply pending work.
  96 | |||
  97 | export
  98 | threadContextEmpty :  ThreadContext a
  99 |                    -> Bool
 100 | threadContextEmpty ctx =
 101 |   bufferEmpty ctx.buffers.active
 102 |
 103 | --------------------------------------------------------------------------------
 104 | --          Metrics Utilities
 105 | --------------------------------------------------------------------------------
 106 |
 107 | ||| Empty rebuild metrics.
 108 | |||
 109 | ||| Properties:
 110 | ||| - No rebuilds observed.
 111 | ||| - Average batch size is effectively zero.
 112 | |||
 113 | export
 114 | initialRebuildMetrics : RebuildMetrics
 115 | initialRebuildMetrics =
 116 |   MkRebuildMetrics
 117 |     0
 118 |     0
 119 |     0
 120 |
 121 | --------------------------------------------------------------------------------
 122 | --          Rebuild Service Utilities
 123 | --------------------------------------------------------------------------------
 124 |
 125 | ||| Initial rebuild service state.
 126 | |||
 127 | ||| Properties:
 128 | ||| - Service begins idle.
 129 | ||| - No rebuild failures recorded.
 130 | |||
 131 | export
 132 | initialRebuildServiceState : RebuildServiceState
 133 | initialRebuildServiceState =
 134 |   MkRebuildServiceState
 135 |     Sleeping
 136 |     initialRebuildMetrics
 137 |
 138 | --------------------------------------------------------------------------------
 139 | --          Metrics
 140 | --------------------------------------------------------------------------------
 141 |
 142 | ||| Updates rebuild metrics after a successful rebuild cycle.
 143 | |||
 144 | ||| Parameters:
 145 | ||| - batchsize: Number of entries processed in this cycle.
 146 | |||
 147 | ||| Properties:
 148 | ||| - O(1).
 149 | ||| - Pure deterministic update.
 150 | |||
 151 | export
 152 | updateMetrics :  RebuildMetrics
 153 |               -> Nat
 154 |               -> RebuildMetrics
 155 | updateMetrics m batchsize =
 156 |   { lastbatchsize  := batchsize
 157 |   , totalbatchsize $= (`plus` batchsize)
 158 |   , rebuildcount   $= S
 159 |   } m
 160 |
 161 | ||| Computes average rebuild batch size.
 162 | |||
 163 | ||| Returns:
 164 | ||| - 0 when no rebuilds have occurred.
 165 | |||
 166 | export
 167 | averageBatchSize :  RebuildMetrics
 168 |                  -> Nat
 169 | averageBatchSize m =
 170 |   case m.rebuildcount of
 171 |     Z =>
 172 |       0
 173 |     S _ =>
 174 |       m.totalbatchsize `div` m.rebuildcount
 175 |
 176 | --------------------------------------------------------------------------------
 177 | --          Registering Threads
 178 | --------------------------------------------------------------------------------
 179 |
 180 | ||| Registers a thread if necessary and returns its thread context.
 181 | |||
 182 | ||| Behavior:
 183 | ||| - Existing registrations are reused.
 184 | ||| - Missing registrations allocate fresh thread state.
 185 | |||
 186 | ||| Properties:
 187 | ||| - One ThreadContext per ThreadId.
 188 | ||| - Preserves existing mutation state.
 189 | |||
 190 | export
 191 | registerThread :  Ref World (SortedMap ThreadId (ThreadContext a))
 192 |                -> ThreadId
 193 |                -> IO (ThreadContext a)
 194 | registerThread regref tid =
 195 |   update regref (\m =>
 196 |                   case lookup tid m of
 197 |                     Just ctx =>
 198 |                       (m, ctx)
 199 |                     Nothing  =>
 200 |                       let ctx = MkThreadContext
 201 |                                   tid
 202 |                                   0
 203 |                                   emptyWriteBuffers
 204 |                         in (insert tid ctx m, ctx)
 205 |                 )
 206 |
 207 | --------------------------------------------------------------------------------
 208 | --          Generation Utilities
 209 | --------------------------------------------------------------------------------
 210 |
 211 | ||| Announces that a thread has entered a snapshot read section.
 212 | |||
 213 | ||| Behavior:
 214 | ||| - Registers the generation currently being read.
 215 | ||| - Replaces any previous generation announcement.
 216 | |||
 217 | ||| Properties:
 218 | ||| - O(log n).
 219 | ||| - One active generation per thread.
 220 | ||| - Used by reclamation safety checks.
 221 | |||
 222 | export
 223 | enterGeneration :  Ref World (CombinedSnapshotState a)
 224 |                 -> ThreadId
 225 |                 -> IO (SnapshotState a)
 226 | enterGeneration combinedsnapshotstate tid =
 227 |   update combinedsnapshotstate (\s =>
 228 |                                  ( { readerstate $= insert tid (MkReaderState s.currentsnapshot.generation)
 229 |                                    } s
 230 |                                  , s.currentsnapshot
 231 |                                  )
 232 |                                )
 233 |
 234 | ||| Announces that a thread has completed a snapshot read section.
 235 | |||
 236 | ||| Behavior:
 237 | ||| - Removes thread participation state.
 238 | ||| - Indicates thread no longer references a snapshot.
 239 | |||
 240 | ||| Properties:
 241 | ||| - O(log n).
 242 | ||| - Enables reclamation progress.
 243 | |||
 244 | export
 245 | leaveGeneration :  Ref World (CombinedSnapshotState a)
 246 |                 -> ThreadId
 247 |                 -> IO ()
 248 | leaveGeneration combinedsnapshotstate tid =
 249 |   mod combinedsnapshotstate (\s =>
 250 |                               { readerstate $= delete tid
 251 |                               } s
 252 |                             )
 253 |
 254 | ||| Finds oldest active generation.
 255 | |||
 256 | ||| Returns:
 257 | ||| - Nothing when no readers exist.
 258 | ||| - Just generation otherwise.
 259 | |||
 260 | ||| Properties:
 261 | ||| - O(n).
 262 | |||
 263 | export
 264 | minimumGeneration :  SortedMap ThreadId ReaderState
 265 |                   -> Maybe Generation
 266 | minimumGeneration rs =
 267 |     case map generation (values rs) of
 268 |       []    =>
 269 |         Nothing
 270 |       x::xs =>
 271 |         Just (foldl min x xs)
 272 |
 273 | --------------------------------------------------------------------------------
 274 | --          Mutation Utilities
 275 | --------------------------------------------------------------------------------
 276 |
 277 | ||| Appends an Entry onto the end of a mutation buffer.
 278 | |||
 279 | ||| Properties:
 280 | ||| - O(1).
 281 | ||| - Preserves insertion ordering.
 282 | |||
 283 | export
 284 | appendEntry :  Buffer a
 285 |             -> Entry a
 286 |             -> Buffer a
 287 | appendEntry (MkBuffer es n) e =
 288 |   MkBuffer (es :< e) (S n)
 289 |
 290 | ||| Appends a deferred mutation into the active write buffer.
 291 | |||
 292 | ||| Properties:
 293 | ||| - Frozen buffer remains unchanged.
 294 | ||| - O(1) amortized.
 295 | |||
 296 | export
 297 | writeOperation :  WriteBuffers a
 298 |                -> Entry a
 299 |                -> WriteBuffers a
 300 | writeOperation (MkWriteBuffers active) e =
 301 |   MkWriteBuffers
 302 |     (appendEntry active e)
 303 |
 304 | ||| Converts an operation into an Entry and appends it into the owning thread's active mutation buffer.
 305 | |||
 306 | ||| Steps:
 307 | ||| - Acquire current timestamp.
 308 | ||| - Register thread if necessary.
 309 | ||| - Allocate Entry.
 310 | ||| - Increment sequence counter.
 311 | ||| - Append into active write buffer.
 312 | ||| - Increment global write pressure.
 313 | ||| - Mark rebuild work as pending.
 314 | |||
 315 | ||| Properties:
 316 | ||| - O(log n) registry update.
 317 | ||| - O(1) buffer append.
 318 | ||| - Deterministic replay ordering.
 319 | ||| - Adaptive batching pressure is globally visible atomically.
 320 | |||
 321 | export
 322 | enqueueOperation :  Ref World (SortedMap ThreadId (ThreadContext a))
 323 |                  -> Ref World (CombinedSnapshotState a)
 324 |                  -> ThreadId
 325 |                  -> Operation a
 326 |                  -> IO Bool
 327 | enqueueOperation regref snapshotref tid op = do
 328 |   now <- runElinIO grabTime
 329 |   case now of
 330 |     Left err   =>
 331 |       assert_total $ idris_crash "Data.LSMRRBVector.enqueueOperation: \{show err}"
 332 |     Right now' => do
 333 |       ctx <- registerThread regref tid
 334 |       mod regref (\m =>
 335 |                    let entry = MkEntry
 336 |                                  op
 337 |                                  now'
 338 |                                  tid
 339 |                                  ctx.sequence
 340 |                        ctx'  = { sequence := S ctx.sequence
 341 |                                , buffers  := writeOperation ctx.buffers entry
 342 |                                } ctx
 343 |                      in insert tid ctx' m
 344 |                  )
 345 |       update snapshotref (\s =>
 346 |                            let pressure' = S s.writepressure
 347 |                                pending'  = pressure' >= s.batchwindow
 348 |                                s'        = { writepressure := pressure'
 349 |                                            , rebuildpending := pending'
 350 |                                            } s
 351 |                              in (s', pending')
 352 |                          )
 353 |   where
 354 |     grabTime : Elin World [Errno] (IClock CLOCK_REALTIME)
 355 |     grabTime = getTime CLOCK_REALTIME
 356 |
 357 | --------------------------------------------------------------------------------
 358 | --          Rebuild Trigger
 359 | --------------------------------------------------------------------------------
 360 |
 361 | ||| Sends a rebuild notification to the background rebuilder.
 362 | |||
 363 | ||| Behavior:
 364 | ||| - Requests background progression toward snapshot publication.
 365 | ||| - Multiple requests may be coalesced.
 366 | ||| - Triggering occurs only after adaptive batching thresholds indicate sufficient accumulated write pressure.
 367 | |||
 368 | ||| Notes:
 369 | ||| - Does not guarantee immediate rebuild execution.
 370 | ||| - Does not guarantee publication.
 371 | |||
 372 | export
 373 | triggerRebuild :  LSMRRBVector World a
 374 |                -> RebuildService Poll
 375 |                -> Async Poll [Errno] ()
 376 | triggerRebuild lsmrrbvector svc = do
 377 |   shouldsend <- update lsmrrbvector.rebuildscheduled (\scheduled =>
 378 |                                                        case scheduled of
 379 |                                                          True  =>
 380 |                                                            (True, False)
 381 |                                                          False =>
 382 |                                                            (True, True)
 383 |                                                      )
 384 |   case shouldsend of
 385 |     True  => do
 386 |       run svc Trigger
 387 |     False =>
 388 |       pure ()
 389 |
 390 | --------------------------------------------------------------------------------
 391 | --          Scheduling Helper
 392 | --------------------------------------------------------------------------------
 393 |
 394 | ||| Schedules rebuild work if adaptive batching has reached its target.
 395 | |||
 396 | ||| Behavior:
 397 | ||| - Checks whether the current write accumulation has crossed the adaptive batch threshold.
 398 | ||| - Coalesces multiple concurrent scheduling attempts.
 399 | |||
 400 | ||| Properties:
 401 | ||| - O(1).
 402 | ||| - Avoids duplicate rebuild requests.
 403 | |||
 404 | export
 405 | scheduleIfNeeded :  LSMRRBVector World a
 406 |                  -> RebuildService Poll
 407 |                  -> Bool
 408 |                  -> Async Poll [Errno] ()
 409 | scheduleIfNeeded lsmrrbvector svc shouldtrigger =
 410 |   case shouldtrigger of
 411 |     True =>
 412 |       triggerRebuild lsmrrbvector svc
 413 |     False =>
 414 |       pure ()
 415 |
 416 | --------------------------------------------------------------------------------
 417 | --          Buffer Rotation
 418 | --------------------------------------------------------------------------------
 419 |
 420 | ||| Extract active buffer ownership for rebuilding.
 421 | |||
 422 | ||| Returns:
 423 | ||| - Updated thread context with empty active buffer
 424 | ||| - Extracted buffer now owned by rebuilder
 425 | |||
 426 | export
 427 | rotateBuffers :  ThreadContext a
 428 |               -> (ThreadContext a, Buffer a)
 429 | rotateBuffers ctx =
 430 |   let active = ctx.buffers.active
 431 |       ctx'   = { buffers :=
 432 |                    MkWriteBuffers
 433 |                      emptyBuffer
 434 |                } ctx
 435 |     in (ctx', active)
 436 |
 437 | --------------------------------------------------------------------------------
 438 | --          Registry Rotation
 439 | --------------------------------------------------------------------------------
 440 |
 441 | ||| Atomically extracts active buffers from all registered threads.
 442 | |||
 443 | ||| Behavior:
 444 | ||| - Replaces active buffers with empty buffers.
 445 | ||| - Transfers ownership of previous active buffers.
 446 | ||| - Removes thread registrations whose post-rotation state contains no pending work.
 447 | |||
 448 | ||| Lifecycle:
 449 | ||| - Rotated thread contexts whose active buffer becomes empty are removed.
 450 | ||| - Because only active buffers contribute to emptiness, rebuild currently removes all rotated thread registrations.
 451 | ||| - Explicit thread unregistration is unnecessary.
 452 | |||
 453 | ||| Properties:
 454 | ||| - Extracted entries appear exactly once.
 455 | ||| - Prevents unbounded registry growth.
 456 | ||| - O(number of registered threads).
 457 | |||
 458 | export
 459 | rotateAllBuffers :  Ref World (SortedMap ThreadId (ThreadContext a))
 460 |                  -> IO (List (Buffer a))
 461 | rotateAllBuffers regref = do
 462 |   update regref (\m =>
 463 |                   let rotated : SortedMap ThreadId (ThreadContext a, Buffer a)
 464 |                       rotated = map rotateBuffers m
 465 |                       extracted : List (Buffer a)
 466 |                       extracted = map snd (values rotated)
 467 |                       survivors : SortedMap ThreadId (ThreadContext a)
 468 |                       survivors = foldl (\acc, (tid, (ctx, _)) =>
 469 |                                           case threadContextEmpty ctx of
 470 |                                             True =>
 471 |                                               acc
 472 |                                             False =>
 473 |                                               insert tid ctx acc
 474 |                                         )
 475 |                                         Data.SortedMap.empty
 476 |                                         (Data.SortedMap.toList rotated)
 477 |                     in (survivors, extracted)
 478 |                 )
 479 |
 480 | --------------------------------------------------------------------------------
 481 | --          Entry Collection
 482 | --------------------------------------------------------------------------------
 483 |
 484 | ||| Converts a buffer into a list of contained entries.
 485 | |||
 486 | ||| Behavior:
 487 | ||| - Preserves insertion order.
 488 | ||| - Extracts buffered mutation events for rebuild processing.
 489 | |||
 490 | ||| Properties:
 491 | ||| - O(n).
 492 | ||| - Does not modify buffer ownership.
 493 | ||| - Pure projection operation.
 494 | |||
 495 | ||| Notes:
 496 | ||| - Intended for rebuild entry collection.
 497 | |||
 498 | export
 499 | bufferEntries :  Buffer a
 500 |               -> List (Entry a)
 501 | bufferEntries (MkBuffer es _) =
 502 |   cast es
 503 |
 504 | ||| Collects entries from multiple extracted buffers.
 505 | |||
 506 | ||| Behavior:
 507 | ||| - Traverses all buffers.
 508 | ||| - Concatenates their entries into a single list.
 509 | |||
 510 | ||| Properties:
 511 | ||| - O(total entries).
 512 | ||| - Preserves per-buffer ordering.
 513 | ||| - Does not perform global ordering.
 514 | |||
 515 | ||| Notes:
 516 | ||| - Intended as a preprocessing step before sorting.
 517 | |||
 518 | export
 519 | collectEntries :  List (Buffer a)
 520 |                -> List (Entry a)
 521 | collectEntries =
 522 |   concatMap bufferEntries
 523 |
 524 | ||| Produces a deterministic global ordering of buffered entries.
 525 | |||
 526 | ||| Ordering:
 527 | ||| - timestamp
 528 | ||| - thread id
 529 | ||| - sequence number
 530 | |||
 531 | ||| Properties:
 532 | ||| - O(n log n).
 533 | ||| - Deterministic across rebuild cycles.
 534 | |||
 535 | ||| Notes:
 536 | ||| - Required before replay to ensure stable behavior under concurrent writes.
 537 | |||
 538 | export
 539 | sortEntries :  Ord (Entry a)
 540 |             => List (Entry a)
 541 |             -> List (Entry a)
 542 | sortEntries =
 543 |   sort
 544 |
 545 | --------------------------------------------------------------------------------
 546 | --          Replay
 547 | --------------------------------------------------------------------------------
 548 |
 549 | ||| Applies a single deferred mutation to an RRBVector snapshot.
 550 | |||
 551 | ||| Behavior:
 552 | ||| - Executes the logical operation represented by Operation.
 553 | ||| - Produces a new immutable vector.
 554 | ||| - Does not mutate the supplied vector.
 555 | |||
 556 | ||| Variants:
 557 | ||| - Append  -> append value to end
 558 | ||| - Prepend -> prepend value to beginning
 559 | ||| - Insert  -> insert value at index
 560 | ||| - Delete  -> remove value at index
 561 | ||| - Update  -> replace value at index
 562 | |||
 563 | ||| Properties:
 564 | ||| - Pure.
 565 | ||| - Deterministic.
 566 | ||| - Preserves RRBVector immutability.
 567 | |||
 568 | ||| Notes:
 569 | ||| - Bounds behavior is inherited from the underlying RRBVector operation.
 570 | |||
 571 | export
 572 | applyOperation :  Operation a
 573 |                -> RRBVector a
 574 |                -> RRBVector a
 575 | applyOperation (Append x)   v =
 576 |   v |> x
 577 | applyOperation (Prepend x)  v =
 578 |   x <| v
 579 | applyOperation (Insert i x) v =
 580 |   insertAt i x v
 581 | applyOperation (Delete i)   v =
 582 |   deleteAt i v
 583 | applyOperation (Update i x) v =
 584 |   update i x v
 585 |
 586 | ||| Replays a sequence of buffered mutations onto an immutable snapshot.
 587 | |||
 588 | ||| Behavior:
 589 | ||| - Traverses entries in order.
 590 | ||| - Applies each contained operation to the accumulating vector.
 591 | ||| - Produces a rebuilt snapshot reflecting all replayed mutations.
 592 | |||
 593 | ||| Ordering:
 594 | ||| - Replay order is exactly the order of the supplied entry list.
 595 | ||| - Deterministic replay therefore depends on prior sorting.
 596 | |||
 597 | ||| Properties:
 598 | ||| - Pure.
 599 | ||| - O(number of entries × operation cost).
 600 | ||| - Preserves snapshot immutability.
 601 | |||
 602 | ||| Notes:
 603 | ||| - Typically executed after collectEntries and sortEntries.
 604 | ||| - Does not validate entry ordering.
 605 | |||
 606 | export
 607 | replayEntries :  List (Entry a)
 608 |               -> RRBVector a
 609 |               -> RRBVector a
 610 | replayEntries es v =
 611 |   foldl (\acc, e => applyOperation e.operation acc) v es
 612 |
 613 | --------------------------------------------------------------------------------
 614 | --          Reading
 615 | --------------------------------------------------------------------------------
 616 |
 617 | ||| Reads the current immutable snapshot together with its generation.
 618 | |||
 619 | ||| Behavior:
 620 | ||| - Atomically captures the current SnapshotState.
 621 | ||| - Registers the thread as an active reader of that generation.
 622 | ||| - Passes (generation, snapshot tree) to the user function.
 623 | ||| - Ensures reader registration is cleaned up after evaluation.
 624 | |||
 625 | ||| Key property:
 626 | ||| - The generation and tree are consistent and taken from the same CAS snapshot.
 627 | |||
 628 | ||| This enables:
 629 | ||| - Precise visibility reasoning.
 630 | ||| - Safe interaction with reclamation.
 631 | ||| - Deterministic debugging of snapshot lag.
 632 | |||
 633 | ||| Complexity:
 634 | ||| - O(log n) for reader registration/removal.
 635 | ||| - O(1) snapshot access.
 636 | |||
 637 | export
 638 | readSnapshotWithGeneration :  LSMRRBVector World a
 639 |                            -> ThreadId
 640 |                            -> ((Generation, RRBVector a) -> b)
 641 |                            -> IO b
 642 | readSnapshotWithGeneration rrbvector tid f = do
 643 |   res <- runElinIO readSnapshotWithGeneration'
 644 |   case res of
 645 |     Right res' =>
 646 |       pure res'
 647 |     Left err   =>
 648 |       assert_total $ idris_crash "Data.LSMRRBVector.readSnapshotWithGeneration: \{show err}"
 649 |   where
 650 |     acquire : F1 World (SnapshotState a)
 651 |     acquire = ioToF1 (enterGeneration rrbvector.combinedsnapshotstate tid)
 652 |     use :  SnapshotState a
 653 |         -> F1 World b
 654 |     use snapshot t =
 655 |       let gen := snapshot.generation
 656 |         in f (gen, snapshot.tree) # t
 657 |     release :  SnapshotState a
 658 |             -> F1' World
 659 |     release _ = ioToF1 (leaveGeneration rrbvector.combinedsnapshotstate tid)
 660 |     readSnapshotWithGeneration' : Elin World [Errno] b
 661 |     readSnapshotWithGeneration' =
 662 |       bracket (runIO acquire)
 663 |               (\snapshot => runIO (use snapshot))
 664 |               (\snapshot => runIO (release snapshot))
 665 |
 666 | --------------------------------------------------------------------------------
 667 | --          Metrics Queries
 668 | --------------------------------------------------------------------------------
 669 |
 670 | ||| Returns current rebuild metrics.
 671 | |||
 672 | ||| Properties:
 673 | ||| - O(1)
 674 | ||| - Snapshot of current service state
 675 | |||
 676 | export
 677 | rebuildMetrics :  RebuildServiceState
 678 |                -> RebuildMetrics
 679 | rebuildMetrics st =
 680 |   st.rebuildmetrics
 681 |
 682 | ||| Returns average rebuild batch size.
 683 | |||
 684 | ||| Properties:
 685 | ||| - O(1)
 686 | |||
 687 | export
 688 | averageRebuildBatchSize :  RebuildServiceState
 689 |                         -> Nat
 690 | averageRebuildBatchSize st =
 691 |   averageBatchSize st.rebuildmetrics
 692 |
 693 | --------------------------------------------------------------------------------
 694 | --          Adaptive Batching
 695 | --------------------------------------------------------------------------------
 696 |
 697 | ||| Adjusts adaptive batching window according to observed write pressure.
 698 | |||
 699 | ||| Rules:
 700 | ||| - Pressure above the current window expands the window.
 701 | ||| - Pressure below half the current window shrinks the window.
 702 | ||| - Window never shrinks below 1.
 703 | |||
 704 | ||| Properties:
 705 | ||| - Pure deterministic policy function.
 706 | ||| - Does not affect correctness.
 707 | ||| - Only influences rebuild batching behavior.
 708 | |||
 709 | export
 710 | adjustBatchWindow :  Nat
 711 |                   -> Nat
 712 |                   -> Nat
 713 | adjustBatchWindow pressure window =
 714 |   case pressure > window of
 715 |     True  =>
 716 |       window * 2
 717 |     False =>
 718 |       case pressure < (window `div` 2) of
 719 |         True  =>
 720 |           max 1 (window `div` 2)
 721 |         False =>
 722 |           window
 723 |
 724 | --------------------------------------------------------------------------------
 725 | --          Publication
 726 | --------------------------------------------------------------------------------
 727 |
 728 | ||| Atomically publishes a rebuilt snapshot and advances adaptive batching state.
 729 | |||
 730 | ||| Steps:
 731 | ||| - Publish rebuilt immutable tree.
 732 | ||| - Increment snapshot generation.
 733 | ||| - Retire previous snapshot.
 734 | ||| - Reset accumulated write pressure.
 735 | ||| - Clear rebuild pending state.
 736 | ||| - Adaptively adjust batching window.
 737 | |||
 738 | ||| Properties:
 739 | ||| - Snapshot publication is atomic.
 740 | ||| - Readers always observe a consistent snapshot/generation pair.
 741 | ||| - Adaptive batching state transitions are globally visible atomically.
 742 | ||| - Previous snapshots become eligible for reclamation.
 743 | |||
 744 | ||| Notes:
 745 | ||| - Adaptive batching decisions are based on write pressure observed since the previous successful publication.
 746 | |||
 747 | export
 748 | publishSnapshot :  Ref World (CombinedSnapshotState a)
 749 |                 -> List (Entry a)
 750 |                 -> IO Generation
 751 | publishSnapshot combinedsnapshotstateref entries =
 752 |   update combinedsnapshotstateref (\(MkCombinedSnapshotState snapshot retired readers writepressure rebuildpending batchwindow) =>
 753 |                                     let rebuilt    = replayEntries entries snapshot.tree
 754 |                                         newgen     = S snapshot.generation
 755 |                                         snapshot'  = MkSnapshotState newgen rebuilt
 756 |                                         retired'   = MkRetiredSnapshot snapshot.generation snapshot.tree :: retired
 757 |                                         nextwindow = adjustBatchWindow writepressure batchwindow
 758 |                                       in ( MkCombinedSnapshotState snapshot' retired' readers 0 False nextwindow
 759 |                                          , newgen
 760 |                                          )
 761 |                                   )
 762 |
 763 | --------------------------------------------------------------------------------
 764 | --          Reclamation Utilities
 765 | --------------------------------------------------------------------------------
 766 |
 767 | ||| Computes the newest retired generation that may safely be reclaimed.
 768 | |||
 769 | ||| Behavior:
 770 | ||| - Determines the oldest snapshot generation currently referenced by active readers.
 771 | ||| - Finds the highest retired generation strictly older than that reader boundary.
 772 | ||| - Returns Nothing when no reclaimable generation exists.
 773 | |||
 774 | ||| Returns:
 775 | ||| - Nothing: No readers exist, or no retired snapshots can be reclaimed.
 776 | ||| - Just g: Every retired snapshot with generation <= g may safely be reclaimed.
 777 | |||
 778 | ||| Safety rules:
 779 | ||| - Readers observing generation G may still require snapshot G.
 780 | ||| - Readers may also require all newer generations.
 781 | ||| - Only snapshots strictly older than the oldest active reader generation are reclaimable.
 782 | |||
 783 | ||| Example:
 784 | |||
 785 | ||| Retired              <-> [1,2,3,4,5]
 786 | ||| Active readers       <-> [4,7]
 787 | ||| Oldest active reader <-> 4
 788 | ||| Safe reclamation     <-> [1,2,3]
 789 | ||| Result               <-> Just 3
 790 | |||
 791 | ||| Properties:
 792 | ||| - O(number of retired snapshots + number of readers).
 793 | ||| - Never reclaims a snapshot visible to any active reader.
 794 | ||| - Computes a maximal safe reclamation boundary.
 795 | |||
 796 | export
 797 | reclamationCutoff :
 798 |      List (RetiredSnapshot a)
 799 |   -> SortedMap ThreadId ReaderState
 800 |   -> Maybe Generation
 801 | reclamationCutoff retired readers =
 802 |   case minimumGeneration readers of
 803 |     Nothing =>
 804 |       Nothing
 805 |     Just oldest =>
 806 |       case map generation ( filter
 807 |                              (\snap =>
 808 |                                snap.generation < oldest)
 809 |                             retired
 810 |                           ) of
 811 |
 812 |         []      =>
 813 |           Nothing
 814 |         x :: xs =>
 815 |           Just (foldl max x xs)
 816 |
 817 | --------------------------------------------------------------------------------
 818 | --          Reclamation
 819 | --------------------------------------------------------------------------------
 820 |
 821 | ||| Reclaims retired snapshots no longer visible to active readers.
 822 | |||
 823 | ||| Rules:
 824 | ||| - No readers: Reclaim everything.
 825 | ||| - Readers exist: retain snapshots at or newer than the oldest active reader boundary.
 826 | |||
 827 | ||| Properties:
 828 | ||| - Safe generation-based reclamation.
 829 | ||| - Keeps only snapshots potentially observable by readers.
 830 | ||| - O(number of retired snapshots + number of readers).
 831 | |||
 832 | export
 833 | reclaimSnapshots :  Ref World (CombinedSnapshotState a)
 834 |                  -> IO ()
 835 | reclaimSnapshots combinedsnapshotstate =
 836 |   mod combinedsnapshotstate (\(MkCombinedSnapshotState snapshot retired readers writepressure rebuildpending batchwindow) =>
 837 |                               let survivors = case reclamationCutoff retired readers of
 838 |                                                 Nothing     =>
 839 |                                                   case minimumGeneration readers of
 840 |                                                     Nothing =>
 841 |                                                       []
 842 |                                                     Just _  =>
 843 |                                                       retired
 844 |                                                 Just cutoff =>
 845 |                                                   filter (\snap =>
 846 |                                                            snap.generation > cutoff
 847 |                                                          ) retired
 848 |                                 in MkCombinedSnapshotState snapshot survivors readers writepressure rebuildpending batchwindow
 849 |                             )
 850 |
 851 | --------------------------------------------------------------------------------
 852 | --          Single Rebuild Cycle
 853 | --------------------------------------------------------------------------------
 854 |
 855 | ||| Executes one rebuild pass.
 856 | |||
 857 | ||| Steps:
 858 | ||| - Rotate all active buffers.
 859 | ||| - Collect extracted entries.
 860 | ||| - Sort entries deterministically.
 861 | ||| - Publish rebuilt snapshot.
 862 | ||| - Reclaim retired snapshots.
 863 | |||
 864 | ||| Returns:
 865 | ||| - Newly published generation when entries were processed.
 866 | ||| - 0 only when no entries were available and publication did not occur.
 867 | ||| - Whether any entries were processed.
 868 | |||
 869 | ||| Notes:
 870 | ||| - Performs exactly one ownership-transfer cycle.
 871 | ||| - Does not guarantee complete draining.
 872 | |||
 873 | export covering
 874 | rebuildOnce :  Ord (Entry a)
 875 |             => Ref World (SortedMap Int (ThreadContext a))
 876 |             -> Ref World (CombinedSnapshotState a)
 877 |             -> RebuildServiceState
 878 |             -> Async Poll [Errno] (RebuildServiceState, Generation, Bool)
 879 | rebuildOnce buffers combinedsnapshotstate st = do
 880 |   -- RotatingBuffers
 881 |   let st1        : RebuildServiceState
 882 |       st1        = { rebuildphase := RotatingBuffers } st
 883 |   extracted      <- liftIO (rotateAllBuffers buffers)
 884 |   -- CollectingEntries
 885 |   let st2        : RebuildServiceState
 886 |       st2        = { rebuildphase := CollectingEntries } st1
 887 |       entries    = collectEntries extracted
 888 |       batchsize  = length entries
 889 |   case isNil entries of
 890 |     True  => do
 891 |       let st' = { rebuildphase := Sleeping } st2
 892 |       pure (st', 0, False)
 893 |     False => do
 894 |       -- SortingEntries
 895 |       let st3 : RebuildServiceState
 896 |           st3 = { rebuildphase := SortingEntries
 897 |                 } st2
 898 |           sorted = sortEntries entries
 899 |       -- PublishingSnapshot
 900 |       let st4    : RebuildServiceState
 901 |           st4    = { rebuildphase := PublishingSnapshot } st3
 902 |       generation <- liftIO (publishSnapshot combinedsnapshotstate sorted)
 903 |       liftIO (reclaimSnapshots combinedsnapshotstate)
 904 |       let st5    : RebuildServiceState
 905 |           st5    = { rebuildphase := Sleeping
 906 |                    , rebuildmetrics := updateMetrics st4.rebuildmetrics batchsize
 907 |                    } st4
 908 |       pure (st5, generation, True)
 909 |
 910 | --------------------------------------------------------------------------------
 911 | --          Flush Until Empty
 912 | --------------------------------------------------------------------------------
 913 |
 914 | ||| Repeatedly performs rebuild cycles until a rotation extracts no work.
 915 | |||
 916 | ||| Behavior:
 917 | ||| - Executes rebuild cycles in sequence.
 918 | ||| - Each cycle atomically rotates ownership of active buffers.
 919 | ||| - Extracted entries are rebuilt into a published snapshot.
 920 | ||| - Terminates once a rotation produces no extracted entries.
 921 | |||
 922 | ||| Visibility guarantees:
 923 | ||| - Flush establishes a quiescent visibility boundary at buffer rotation.
 924 | ||| - All writes already present in rotated buffers are incorporated before completion.
 925 | ||| - Writes arriving concurrently may appear either before or after completion depending on timing.
 926 | ||| - Flush does not stop writers or establish a global synchronization barrier.
 927 | |||
 928 | ||| Concurrency properties:
 929 | ||| - Writers continue appending during rebuild execution.
 930 | ||| - Multiple rebuild cycles may be required if writes continue arriving.
 931 | ||| - Progress remains lock-free for writers.
 932 | |||
 933 | ||| Returns:
 934 | ||| - Final rebuild state.
 935 | ||| - Most recently published generation.
 936 | |||
 937 | ||| Notes:
 938 | ||| - Completion means no buffered work was visible during the final rotation.
 939 | ||| - This is weaker than "all writes before return".
 940 | ||| - Stronger linearizable flush semantics would require an explicit epoch or barrier mechanism.
 941 | ||| - Currently unused by rebuild request processing.
 942 | ||| - May serve as the implementation basis for future flush semantics.
 943 | |||
 944 | export covering
 945 | flushUntilEmpty :  Ord (Entry a)
 946 |                 => LSMRRBVector World a
 947 |                 -> RebuildServiceState
 948 |                 -> Async Poll [Errno] (RebuildServiceState, Generation)
 949 | flushUntilEmpty lsmrrbvector st =
 950 |   let loop :  RebuildServiceState
 951 |            -> Generation
 952 |            -> Async Poll [Errno] (RebuildServiceState, Generation)
 953 |       loop st lastgen = do
 954 |         (st', gen, hadentries) <- rebuildOnce lsmrrbvector.buffers lsmrrbvector.combinedsnapshotstate st
 955 |         case hadentries of
 956 |           True  =>
 957 |             loop st' gen
 958 |           False =>
 959 |             pure (st', gen)
 960 |     in loop st 0
 961 |
 962 | --------------------------------------------------------------------------------
 963 | --          Rebuilder Service
 964 | --------------------------------------------------------------------------------
 965 |
 966 | ||| Processes a rebuild request issued by the LSM write system.
 967 | |||
 968 | ||| This service is responsible for advancing the immutable snapshot from the accumulated thread-local mutation buffers.
 969 | |||
 970 | ||| Two modes of operation exist:
 971 | |||
 972 | ||| Trigger:
 973 | ||| - Performs exactly one rebuildOnce invocation.
 974 | ||| - May publish at most one snapshot.
 975 | ||| - Additional buffered work remains for future rebuild requests.
 976 | |||
 977 | ||| Flush:
 978 | ||| - Repeatedly performs rebuild cycles until all buffered writes observed at invocation time are incorporated.
 979 | ||| - Guarantees that all writes visible in rotated buffers during draining are reflected in the returned generation.
 980 | ||| - Concurrent writes may be incorporated either before or after completion.
 981 | ||| - May perform multiple rotations and publications internally.
 982 | |||
 983 | ||| Concurrency guarantees:
 984 | ||| - Writers may continue appending during rebuild.
 985 | ||| - Flush only guarantees completeness relative to a quiescent cut of buffer rotation visibility.
 986 | |||
 987 | ||| Return values:
 988 | ||| - Trigger returns unit acknowledgement.
 989 | ||| - Flush returns the final snapshot generation after draining.
 990 | |||
 991 | ||| State transitions:
 992 | ||| Sleeping → RotatingBuffers → CollectingEntries → SortingEntries → PublishingSnapshot → Sleeping
 993 | |||
 994 | ||| Notes:
 995 | ||| - Empty rebuild cycles do not advance generation.
 996 | ||| - Flush drains until a cycle produces no entries.
 997 | ||| - Trigger is a bounded operation, Flush is unbounded (but finite under quiescent assumptions).
 998 | |||
 999 | export covering
1000 | handleRebuildRequest :  Ord (Entry a)
1001 |                      => LSMRRBVector World a
1002 |                      -> RebuildServiceState
1003 |                      -> (req : RebuildRequest)
1004 |                      -> Async Poll [Errno] ()
1005 | handleRebuildRequest lsmrrbvector st Trigger = do
1006 |   (_, _, _) <- rebuildOnce lsmrrbvector.buffers lsmrrbvector.combinedsnapshotstate st
1007 |   liftIO (mod lsmrrbvector.rebuildscheduled (const False))
1008 |   let st' : RebuildServiceState
1009 |       st' = { rebuildphase := Sleeping
1010 |             } st
1011 |   pure ()
1012 |
1013 | --------------------------------------------------------------------------------
1014 | --          Rebuild Service
1015 | --------------------------------------------------------------------------------
1016 |
1017 | ||| Constructs and launches user-supplied rebuild (writer) actions concurrently.
1018 | |||
1019 | ||| Behavior:
1020 | ||| - Creates a RebuildService endpoint.
1021 | ||| - Routes rebuild requests into handleRebuildRequest.
1022 | ||| - Starts all supplied rebuild-service actions concurrently.
1023 | |||
1024 | ||| Concurrency:
1025 | ||| - Actions execute in parallel.
1026 | ||| - All actions share the same LSMRRBVector instance.
1027 | ||| - All actions share the same rebuild request endpoint.
1028 | |||
1029 | ||| Properties:
1030 | ||| - Service composition utility.
1031 | ||| - Does not itself perform rebuild work.
1032 | ||| - Rebuild execution occurs only when actions issue requests.
1033 | |||
1034 | ||| Notes:
1035 | ||| - Used to launch rebuild workers supporting background maintenance tasks.
1036 | |||
1037 | export covering
1038 | rebuilderService :  Ord (Entry a)
1039 |                  => LSMRRBVector World a
1040 |                  -> RebuildServiceState
1041 |                  -> List (LSMRRBVector World a -> RebuildService Poll -> RebuildServiceState -> Async Poll [Errno] ())
1042 |                  -> Async Poll [Errno] ()
1043 | rebuilderService lsmrrbvector st actions = do
1044 |   rebuilderservice <-
1045 |     rebuilder
1046 |       (\req => handleRebuildRequest lsmrrbvector st req)
1047 |   ignore $
1048 |     parTraverse (\action => action lsmrrbvector rebuilderservice st) actions
1049 |
1050 | --------------------------------------------------------------------------------
1051 | --          LSMRRBVector Service
1052 | --------------------------------------------------------------------------------
1053 |
1054 | ||| Constructs and launches user-supplied reader actions concurrently.
1055 | |||
1056 | ||| Behavior:
1057 | ||| - Executes all supplied actions in parallel.
1058 | ||| - Provides each action access to the shared LSMRRBVector instance.
1059 | |||
1060 | ||| Concurrency:
1061 | ||| - Actions may perform reads.
1062 | ||| - Execution order is not specified.
1063 | ||| - Actions run independently.
1064 | |||
1065 | ||| Properties:
1066 | ||| - Service composition utility.
1067 | ||| - Does not perform vector operations itself.
1068 | |||
1069 | ||| Notes:
1070 | ||| - Used to launch application readers.
1071 | |||
1072 | export covering
1073 | lsmrrbvectorService :  LSMRRBVector World a
1074 |                     -> List (LSMRRBVector World a -> Async Poll [Errno] ())
1075 |                     -> Async Poll [Errno] ()
1076 | lsmrrbvectorService lsmrrbvector actions =
1077 |   ignore $
1078 |     parTraverse (\action => action lsmrrbvector) actions
1079 |
1080 | --------------------------------------------------------------------------------
1081 | --          Rebuild And LSMRRBVector Service
1082 | --------------------------------------------------------------------------------
1083 |
1084 | ||| Runs rebuild infrastructure and reader workloads together.
1085 | |||
1086 | ||| Behavior:
1087 | ||| - Executes the rebuilderService (writers) and lsmrrbvectorService (readers) concurrently.
1088 | ||| - Allows writers and readers to operate while rebuild work proceeds in the background.
1089 | |||
1090 | ||| Concurrency:
1091 | ||| - Neither service blocks the other.
1092 | ||| - Rebuild publication may occur concurrently with reads and writes.
1093 | |||
1094 | ||| Properties:
1095 | ||| - Top-level composition utility.
1096 | ||| - Establishes the complete LSMRRBVector runtime.
1097 | |||
1098 | ||| Notes:
1099 | ||| - Intended as the final composition point used by runEmptyWith.
1100 | |||
1101 | export covering
1102 | rebuilderAndLSMRRBVectorService :  Async Poll [Errno] ()
1103 |                                 -> Async Poll [Errno] ()
1104 |                                 -> Async Poll [Errno] ()
1105 | rebuilderAndLSMRRBVectorService rebuilderservice lsmrrbvectorservice =
1106 |   ignore $
1107 |     par [ -- writers
1108 |          rebuilderservice
1109 |         , -- readers
1110 |           lsmrrbvectorservice
1111 |         ]
1112 |