/* * Copyright (C) 2022 Aravinth Manivannan * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ use std::str::FromStr; use sqlx::postgres::PgPoolOptions; use sqlx::types::time::OffsetDateTime; //use sqlx::types::Json; use sqlx::ConnectOptions; use sqlx::PgPool; use crate::errors::*; /// Connect to databse pub enum ConnectionOptions { /// fresh connection Fresh(Fresh), /// existing connection Existing(Conn), } /// Use an existing database pool pub struct Conn(pub PgPool); pub struct Fresh { pub pool_options: PgPoolOptions, pub disable_logging: bool, pub url: String, } impl ConnectionOptions { async fn connect(self) -> ServiceResult { let pool = match self { Self::Fresh(fresh) => { let mut connect_options = sqlx::postgres::PgConnectOptions::from_str(&fresh.url).unwrap(); if fresh.disable_logging { connect_options.disable_statement_logging(); } sqlx::postgres::PgConnectOptions::from_str(&fresh.url) .unwrap() .disable_statement_logging(); fresh .pool_options .connect_with(connect_options) .await .unwrap() //.map_err(|e| DBError::DBError(Box::new(e)))? } Self::Existing(conn) => conn.0, }; Ok(Database { pool }) } } #[derive(Clone)] pub struct Database { pub pool: PgPool, } impl Database { pub async fn migrate(&self) -> ServiceResult<()> { sqlx::migrate!("./migrations/") .run(&self.pool) .await .unwrap(); //.map_err(|e| DBError::DBError(Box::new(e)))?; Ok(()) } pub async fn ping(&self) -> bool { use sqlx::Connection; if let Ok(mut con) = self.pool.acquire().await { con.ping().await.is_ok() } else { false } } /// register a new website pub async fn add_site(&self, host: &str) -> ServiceResult<()> { sqlx::query!( "INSERT INTO forms_websites (hostname) VALUES ($1) ON CONFLICT DO NOTHING;", &host, ) .execute(&self.pool) .await .unwrap(); // res.map_err(map_register_err)?; Ok(()) } /// delete a new website pub async fn delete_site(&self, host: &str) -> ServiceResult<()> { sqlx::query!("DELETE FROM forms_websites WHERE hostname = ($1)", &host,) .execute(&self.pool) .await .unwrap(); // res.map_err(map_register_err)?; Ok(()) } pub async fn get_form_submissions( &self, page: usize, host: &str, path: &str, ) -> ServiceResult> { let res = sqlx::query_as!( FormSubmission, " SELECT value, time, ID FROM forms_submissions WHERE website_id = (SELECT ID from forms_websites WHERE hostname = $1) AND website_path = $2 LIMIT 50 OFFSET $3 ", host, path, page as i32 ) .fetch_all(&self.pool) .await .unwrap(); Ok(res) } pub async fn add_form_submission( &self, data: &serde_json::Value, host: &str, path: &str, ) -> ServiceResult<()> { let now = now_unix_time_stamp(); self.add_site(host).await?; //let data = Json(data); sqlx::query!( " INSERT INTO forms_submissions (website_path, value, time, website_id) VALUES ($1, $2, $3, (SELECT ID from forms_websites WHERE hostname = $4)); ", path, data, now, host, ) .execute(&self.pool) .await .unwrap(); Ok(()) } } #[derive(Clone, Debug)] pub struct FormSubmission { pub id: i32, pub value: Option, pub time: OffsetDateTime, } fn now_unix_time_stamp() -> OffsetDateTime { OffsetDateTime::now_utc() } pub async fn get_db(settings: &crate::settings::Settings) -> Database { let pool_options = PgPoolOptions::new().max_connections(settings.database.pool); ConnectionOptions::Fresh(Fresh { pool_options, url: settings.database.url.clone(), disable_logging: !settings.debug, }) .connect() .await .unwrap() } #[cfg(test)] mod tests { use super::*; use crate::settings::Settings; #[actix_rt::test] async fn db_works() { let settings = Settings::new().unwrap(); let pool_options = PgPoolOptions::new().max_connections(1); let db = ConnectionOptions::Fresh(Fresh { pool_options, url: settings.database.url.clone(), disable_logging: !settings.debug, }) .connect() .await .unwrap(); assert!(db.ping().await); let urls = ["example.com", "example.com", "example.net"]; for url in urls.iter() { db.delete_site(url).await.unwrap(); // ensuring delete doesn't fail when record doesn't exist db.delete_site(url).await.unwrap(); println!("using {url}"); db.add_site(url).await.unwrap(); // ensuring add_site doesn't fail when record exists db.add_site(url).await.unwrap(); let path = "/foo"; db.add_form_submission(&serde_json::Value::default(), url, path) .await .unwrap(); let subs = db.get_form_submissions(0, url, path).await.unwrap(); assert_eq!( &serde_json::Value::default(), subs[0].value.as_ref().unwrap() ); } } }