0 | module Postgres.DB
  1 |
  2 | import public Data.Vect.Quantifiers
  3 | import public Postgres.Data.ResultStatus
  4 | import public Postgres.Result
  5 | import public Postgres.Query
  6 |
  7 | import Postgres.DB.Core
  8 | import Postgres.Data.Conn
  9 | import Postgres.Data.ConnectionStatus
 10 | import Postgres.Data.PostgresType
 11 | import Postgres.Data.PostgresValue
 12 | import Postgres.Data.PostgresTable
 13 | import Postgres.Exec
 14 | import Postgres.LoadTypes
 15 | import Postgres.Notification
 16 | import JSON.Parser
 17 |
 18 | import public Control.TransitionIndexed
 19 |
 20 | --
 21 | -- Safe connection
 22 | --
 23 |
 24 | ||| Open a connection to a Postgres database,
 25 | ||| perform some actions with it, and then close
 26 | ||| the connection.
 27 | |||
 28 | ||| If the database connection is established successfully,
 29 | ||| the `onOpen` function will be executed. If the
 30 | ||| connection cannot be established, `onError` will be
 31 | ||| executed.
 32 | |||
 33 | ||| You do _not_ call `pgOpen` or `pgClose` directly
 34 | ||| when using this function.
 35 | export
 36 | withConn : HasIO io => (pgUrl: String) -> (onOpen: Conn -> io b) -> (onError: ConnectionStatus -> io b) -> io b
 37 | withConn pgUrl onOpen onError 
 38 |   = do conn <- pgOpen pgUrl
 39 |        out <- case (pgStatus conn) of
 40 |                 OK => onOpen conn
 41 |                 x => onError x
 42 |        pgClose conn
 43 |        pure out
 44 |
 45 | --
 46 | -- Database abstraction
 47 | --
 48 |
 49 | public export
 50 | data DBState = Open | Closed
 51 |
 52 | public export 
 53 | data OpenResult = OK | Failed String
 54 |
 55 | public export
 56 | OpenResultState : OpenResult -> DBState
 57 | OpenResultState = \case OK          => Open
 58 |                         (Failed _)  => Closed
 59 |
 60 | export
 61 | data Connection : Type where
 62 |   MkConnection : Conn -> TypeDictionary -> Connection
 63 |
 64 | getConn : Connection -> Conn
 65 | getConn (MkConnection conn _) = conn
 66 |
 67 | getTypes : Connection -> TypeDictionary
 68 | getTypes (MkConnection _ types) = types
 69 |
 70 | export
 71 | data Database : (ty : Type) -> (s1 : DBState) -> (s2Fn : (ty -> DBState)) -> Type where
 72 |   DBOpen  : (url : String) -> Database OpenResult Closed OpenResultState
 73 |   DBClose : Database () Open (const Closed)
 74 |
 75 |   Exec    : (fn : Connection -> IO a) -> Database a Open (const Open)
 76 |
 77 |   DIO      : IO a -> Database a s1 (const s1)
 78 |
 79 |   GetTypes : Database TypeDictionary Open (const Open)
 80 |
 81 |   Pure    : (x : a) -> Database a (stateFn x) stateFn
 82 |   Bind    : (db : Database a s1 s2Fn) -> (f : (x : a) -> Database b (s2Fn x) s3Fn) -> Database b s1 s3Fn
 83 |
 84 | %name Database db, db1, db2
 85 |
 86 | export
 87 | TransitionIndexedPointed DBState Database where
 88 |   pure = Pure
 89 |
 90 | export
 91 | TransitionIndexedMonad DBState Database where
 92 |   bind = Bind
 93 |
 94 | export
 95 | liftIO' : IO a -> Database a s1 (const s1)
 96 | liftIO' = DIO
 97 |
 98 | data ConnectionState : DBState -> Type where
 99 |   CConnected : (conn : Connection) -> ConnectionState Open
100 |   CDisconnected : ConnectionState Closed
101 |
102 | openResult : HasIO io => Conn -> io OpenResult
103 | openResult conn = case (pgStatus conn) of
104 |                        OK => pure OK
105 |                        x  => Failed <$> pgErrorMessage conn
106 |
107 | runDatabase' : HasIO io => ConnectionState s1 -> Database a s1 s2Fn -> io (x : a ** ConnectionState (s2Fn x))
108 | runDatabase' CDisconnected (DBOpen url) = do conn <- pgOpen url
109 |                                              status <- openResult conn
110 |                                              case status of
111 |                                                   OK => do Right types <- pgLoadTypes conn
112 |                                                              | Left err => pure (Failed err ** CDisconnected)
113 |                                                            Prelude.pure (OK ** CConnected (MkConnection conn types))
114 |                                                   Failed err => pure (Failed err ** CDisconnected)
115 | runDatabase' (CConnected conn) DBClose = do pgClose (getConn conn)
116 |                                             pure $ (() ** CDisconnected)
117 | runDatabase' (CConnected conn) (Exec fn) = liftIO $ do res <- fn conn
118 |                                                        pure (res ** CConnected conn)
119 | runDatabase' (CConnected conn) GetTypes = pure $ (getTypes conn ** CConnected conn)
120 | runDatabase' cs (Pure y) = pure (y ** cs)
121 | runDatabase' cs (Bind db f) = do (res ** cs'<- runDatabase' cs db
122 |                                  runDatabase' cs' (f res)
123 | runDatabase' cs (DIO io) = do v <- liftIO io
124 |                               pure (v ** cs)
125 |
126 | export
127 | evalDatabase : HasIO io => Database a Closed (const Closed) -> io a
128 | evalDatabase db = pure $ fst !(runDatabase' CDisconnected db)
129 |
130 | export
131 | initDatabase : Database () Closed (const Closed)
132 | initDatabase = pure ()
133 |
134 | export
135 | openDatabase : (url : String) -> Database OpenResult Closed OpenResultState
136 | openDatabase = DBOpen
137 |
138 | export
139 | closeDatabase : Database () Open (const Closed)
140 | closeDatabase = DBClose
141 |
142 | export
143 | exec : (Connection -> IO a) -> Database a Open (const Open)
144 | exec = Exec
145 |
146 | ||| You can execute arbitrary things against the lower-level
147 | ||| Conn, including closing the database connection prematurely.
148 | export
149 | unsafeExec : (Conn -> IO a) -> Database a Open (const Open)
150 | unsafeExec f = Exec (\(MkConnection conn empty) => f conn)
151 |
152 | ||| Take a function that operates on a Conn
153 | ||| (currency of the underlying lower level
154 | ||| Postgres stuff) and turn it into a function
155 | ||| that operates on a Connection.
156 | pgExec : (Conn -> IO a) -> Connection -> IO a
157 | pgExec f = f . getConn 
158 |
159 | ||| Perform some operation on an open database without closing it.
160 | ||| A database connection will be created beforehand and then
161 | ||| properly disposed of afterward.
162 | export
163 | withDB : HasIO io => (url : String) -> Database a Open (const Open) -> io $ Either String a
164 | withDB url dbOps = evalDatabase dbCommands
165 |   where
166 |     dbCommands : Database (Either String a) Closed (const Closed)
167 |     dbCommands = do initDatabase
168 |                     OK <- openDatabase url
169 |                       | Failed err => pure $ Left err
170 |                     out <- dbOps
171 |                     closeDatabase
172 |                     pure $ Right out
173 |
174 | ||| Dump the Postgres type dictionary for debugging purposes.
175 | export
176 | debugDumpTypes : Database () Open (const Open)
177 | debugDumpTypes = do types <- GetTypes
178 |                     liftIO' $ putStrLn $ show types
179 |                     pure ()
180 |
181 | --
182 | -- Postgres Commands
183 | --
184 |
185 | ||| Query the database interpreting all columns as strings.
186 | export
187 | stringQuery : (header : Bool) -> (query : String) -> Connection -> IO (Either String (StringResultset header))
188 | stringQuery header query (MkConnection conn types) = pgStringResultsQuery header query conn
189 |
190 | ||| Query the database expecting a JSON result is returned.
191 | export 
192 | jsonQuery : (query : String) -> Connection -> IO (Maybe JSON)
193 | jsonQuery = pgExec . pgJSONResultQuery
194 |
195 | ||| Query the database expecting the given array of types in each
196 | ||| row of the result returned.
197 | export
198 | expectedQuery : {cols : Nat} 
199 |              -> (expected : Vect cols Type) 
200 |              -> (query : String) 
201 |              -> {auto castable : (All Castable expected)} 
202 |              -> Connection 
203 |              -> IO (Either String (rowCount ** Vect rowCount (HVect expected)))
204 | expectedQuery expected query (MkConnection conn types) = pgResultQuery expected query conn
205 |
206 | ||| Query the given table in the database mapping each row to the given Idris type.
207 | ||| @param table The table to query against.
208 | ||| @param cols A Vect of tuples where the first element is a column name to select and
209 | |||             the second element is an Idris type to cast the column value to.
210 | ||| @param conn A database connection.
211 | export
212 | tableQuery : PostgresTable t =>
213 |              {n : _}
214 |           -> (table : t)
215 |           -> (cols : Vect n (ColumnIdentifier, Type))
216 |           -> HasMappings IdrCast table cols =>
217 |              (conn : Connection)
218 |           -> IO (Either String (rowCount ** Vect rowCount (HVect (Builtin.snd <$> cols))))
219 | tableQuery table cols @{mappings} conn with (select table cols @{mappings})
220 |   tableQuery table cols @{mappings} conn | query =
221 |     expectedQuery (snd <$> cols) query conn @{allCastable table}
222 |
223 | namespace StringColumns
224 |   ||| Query the given table in the database mapping each row to the given Idris type.
225 |   ||| @param table The table to query against.
226 |   ||| @param cols A Vect of tuples where the first element is a column name to select and
227 |   |||             the second element is an Idris type to cast the column value to.
228 |   ||| @param conn A database connection.
229 |   export
230 |   tableQuery' : PostgresTable t =>
231 |                {n : _}
232 |             -> (table : t)
233 |             -> (cols : Vect n (String, Type))
234 |             -> HasMappings IdrCast table (mapFst (MkColumnId (aliasOrName table)) <$> cols) =>
235 |                (conn : Connection)
236 |             -> IO (Either String (rowCount ** Vect rowCount (HVect (Builtin.snd <$> (mapFst (MkColumnId (aliasOrName table)) <$> cols)))))
237 |   tableQuery' table cols conn = tableQuery table (mapFst (MkColumnId (aliasOrName table)) <$> cols) conn
238 |
239 | ||| Perform the given command and instead of parsing the response
240 | ||| just report the result status. This is useful when you don't
241 | ||| care if/what the response looks like, just whether the command
242 | ||| worked
243 | export
244 | perform : (command : String) -> Connection -> IO ResultStatus
245 | perform cmd = pgExec (\c => withExecResult c cmd (pure . pgResultStatus))
246 |
247 | ||| Insert the given values into the given table.
248 | ||| @param table The table to insert into.
249 | ||| @param cols A Vect of column names to insert into (does
250 | |||             not need to be every column in the table but
251 | |||             there is not currently protection against omitting
252 | |||             a column with no default value).
253 | ||| @param values The values to insert.
254 | ||| @param conn A database connection.
255 | export
256 | tableInsert : {n : _}
257 |            -> (table : PersistedTable)
258 |            -> (cols : Vect n ColumnIdentifier)
259 |            -> {colTypes : Vect n Type}
260 |            -> (values : HVect colTypes)
261 |            -> HasLooseMappings PGCast table (zip cols colTypes) =>
262 |               (conn : Connection)
263 |            -> IO ResultStatus
264 | tableInsert table cols values conn with (insert table cols values)
265 |   tableInsert table cols values conn | query =
266 |     perform query conn
267 |
268 | ||| Start listening for notifications on the given channel.
269 | export
270 | listen : (channel : String) -> Connection -> IO ResultStatus
271 | listen = pgExec . pgListen
272 |
273 | ||| Gets the next notification _of those sitting around locally_.
274 | ||| Returns `Nothing` if there are no notifications.
275 | |||
276 | ||| See `libpq` documentation on `PQnotifies` for details on the
277 | ||| distinction between retrieving notifications from the server and
278 | ||| getting the next notification that has already been retrieved.
279 | |||
280 | ||| NOTE: This function _does_ consume input to make sure no notification
281 | |||  sent by the server but not processed by the client yet gets
282 | |||  missed.
283 | export
284 | nextNotification : Connection -> IO (Maybe Notification)
285 | nextNotification = pgExec pgNextNotification
286 |
287 | ||| Produce a potentially infinite stream of notifications.
288 | ||| Unlike `nextNotificaiton`, this will wait for the server
289 | ||| to deliver a notification.
290 | |||
291 | ||| This _is_ an infinite sequence with a blocking wait for the
292 | ||| next notification so it is of somewhat limited utility
293 | ||| compared to checking for new notifications at a natural point
294 | ||| in your programs existing logic loop unless your entire loop
295 | ||| is dictated by notification arrival anyway.
296 | covering
297 | export
298 | notificationStream : Connection -> Stream (IO Notification)
299 | notificationStream = pgNotificationStream . getConn
300 |
301 |