0 | module Postgres.Notification
  1 |
  2 | import System.FFI
  3 | import Postgres.FFI.Utility
  4 | import Postgres.Data.Conn
  5 | import Postgres.Data.ResultStatus
  6 | import Postgres.DB.Core
  7 | import Postgres.DB.Wait
  8 | import Postgres.Result
  9 | import Postgres.Exec
 10 |
 11 | ||| Internal type representative of the libpq struct of the same
 12 | ||| name.
 13 | PGnotify : Type
 14 | PGnotify = Struct "PGnotify" [("relname", (Ptr String)), ("be_pid", Int), ("extra", (Ptr String))]
 15 |
 16 | public export
 17 | record Notification where
 18 |   constructor MkNotification
 19 |   channel : String
 20 |   payload : String
 21 |
 22 | notificationChannel : PGnotify -> String
 23 | notificationChannel n = prim__string_value $ getField n "relname"
 24 |
 25 | notificationPayload : PGnotify -> String
 26 | notificationPayload n = prim__string_value $ getField n "extra"
 27 |
 28 | notification : PGnotify -> Notification
 29 | notification n = MkNotification (notificationChannel n) (notificationPayload n)
 30 |
 31 | %foreign libpq "PQnotifies"
 32 | prim__dbGetNextNotification : Ptr PGconn -> PrimIO (Ptr PGnotify)
 33 |
 34 | ||| Takes a PGnotify struct pointer to a PGnotify struct.
 35 | ||| IMPORTANT: Be sure you have checked that the struct pointer
 36 | |||    is non-null before calling this.
 37 | |||
 38 | ||| NOTE: Unless you need to hold onto the PGnotify struct for longer,
 39 | |||    call the notificationStruct function instead which will take care
 40 | |||    of freeing memory for you.
 41 | %foreign cHelper "notify_struct"
 42 | prim__dbNotifyStruct : Ptr PGnotify -> PGnotify
 43 |
 44 | %foreign libpq "PQfreemem"
 45 | prim__dbFreeNotifyStruct : Ptr PGnotify -> PrimIO ()
 46 |
 47 | ||| Takes a PGnotify struct pointer to a PGnotify struct.
 48 | ||| IMPORTANT: Be sure you have checked that the struct pointer
 49 | |||    is non-null before calling this.
 50 | notificationStruct : HasIO io => Ptr PGnotify -> io PGnotify
 51 | notificationStruct ptr = let res = prim__dbNotifyStruct ptr in
 52 |                              do primIO $ prim__dbFreeNotifyStruct ptr
 53 |                                 pure res
 54 |
 55 | %foreign cHelper "is_null"
 56 | prim__isNullNotifyStruct : Ptr PGnotify -> Int
 57 |
 58 | isNullNotification : Ptr PGnotify -> Bool
 59 | isNullNotification ptr = intToBool $ prim__isNullNotifyStruct ptr 
 60 |
 61 | --
 62 | -- Listen
 63 | --
 64 |
 65 | ||| Start listening for notifications on the given channel.
 66 | export
 67 | pgListen : (channel: String) -> Conn -> IO ResultStatus
 68 | pgListen channel conn = withExecResult conn ("LISTEN " ++ channel) (\r => pure $ pgResultStatus r)
 69 |
 70 | --
 71 | -- Retrieve
 72 | --
 73 |
 74 | ||| Gets the next notification _of those sitting around locally_.
 75 | ||| Returns `Nothing` if there are no notifications.
 76 | |||
 77 | ||| See `libpq` documentation on `PQnotifies` for details on the
 78 | ||| distinction between retrieving notifications from the server and
 79 | ||| getting the next notification that has already been retrieved.
 80 | |||
 81 | ||| NOTE: This function _does_ consume input to make sure no notification
 82 | |||  sent by the server but not processed by the client yet gets
 83 | |||  missed.
 84 | export
 85 | pgNextNotification : Conn -> IO (Maybe Notification)
 86 | pgNextNotification (MkConn conn) = do True <- pgConsumeInput (MkConn conn)
 87 |                                         | False => pure Nothing
 88 |                                       notify <- primIO $ prim__dbGetNextNotification conn
 89 |                                       if isNullNotification notify
 90 |                                          then pure $ Nothing
 91 |                                          else do derefNotify <- notificationStruct notify
 92 |                                                  pure $ Just (notification derefNotify)
 93 |
 94 | --
 95 | -- Loop Retrieval
 96 | --
 97 |
 98 | ||| First waits for activity from server
 99 | ||| then checks if there is a new notification
100 | ||| then either returns a notification or cycles
101 | cycle : Conn -> IO Notification
102 | cycle conn = do True <- pgWait conn
103 |                   | False => cycle conn
104 |                 Nothing <- pgNextNotification conn
105 |                  | Just n => pure n
106 |                 cycle conn
107 |
108 | ||| Produce a potentially infinite stream of notifications.
109 | ||| Unlike `pgNextNotificaiton`, this will wait for the server
110 | ||| to deliver a notification.
111 | |||
112 | ||| This _is_ an infinite sequence with a blocking wait for the
113 | ||| next notification so it is of somewhat limited utility
114 | ||| compared to checking for new notifications at a natural point
115 | ||| in your programs existing logic loop unless your entire loop
116 | ||| is dictated by notification arrival anyway.
117 | export
118 | partial
119 | pgNotificationStream : Conn -> Stream (IO Notification)
120 | pgNotificationStream conn = next :: (pgNotificationStream conn) where
121 |   next : IO Notification
122 |   next = do Nothing <- pgNextNotification conn
123 |              | Just n => pure n
124 |             cycle conn 
125 |
126 |