forms/src/db.rs

306 lines
8.3 KiB
Rust

/*
* Copyright (C) 2022 Aravinth Manivannan <realaravinth@batsense.net>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::str::FromStr;
use libforms::FormSubmissionResp;
use sqlx::postgres::PgPoolOptions;
use sqlx::types::time::OffsetDateTime;
//use sqlx::types::Json;
use sqlx::ConnectOptions;
use sqlx::PgPool;
use crate::errors::*;
/// How to obtain the Postgres pool that backs a [`Database`].
pub enum ConnectionOptions {
/// Open a fresh pool using the parameters in [`Fresh`]
Fresh(Fresh),
/// Reuse an existing pool wrapped in [`Conn`]
Existing(Conn),
}
/// Wrapper around an existing database pool.
///
/// Passed via [`ConnectionOptions::Existing`]; the wrapped pool is used as-is.
pub struct Conn(pub PgPool);
/// Parameters for opening a brand-new connection pool
/// (see [`ConnectionOptions::Fresh`]).
pub struct Fresh {
/// Pool configuration used to open the connections
pub pool_options: PgPoolOptions,
/// When `true`, statement logging is disabled on the connection options
pub disable_logging: bool,
/// Database URL; parsed into Postgres connection options when connecting
pub url: String,
}
impl ConnectionOptions {
    /// Turn these options into a ready-to-use [`Database`].
    ///
    /// * `Fresh`: parses `url`, disables statement logging when
    ///   `disable_logging` is set, and opens a new pool with `pool_options`.
    /// * `Existing`: reuses the wrapped pool unchanged.
    ///
    /// # Panics
    ///
    /// Panics when the URL cannot be parsed or the pool cannot be created;
    /// these failures are not (yet) mapped into the returned `ServiceResult`.
    async fn connect(self) -> ServiceResult<Database> {
        let pool = match self {
            Self::Fresh(fresh) => {
                let mut connect_options = sqlx::postgres::PgConnectOptions::from_str(&fresh.url)
                    .expect("database URL must be a valid Postgres connection string");
                // Logging is only switched off on request; the options are
                // parsed exactly once and handed to the pool below.
                if fresh.disable_logging {
                    connect_options.disable_statement_logging();
                }
                fresh
                    .pool_options
                    .connect_with(connect_options)
                    .await
                    .expect("failed to create the Postgres connection pool")
            }
            Self::Existing(conn) => conn.0,
        };
        Ok(Database { pool })
    }
}
/// Handle to the application's Postgres database.
///
/// All query methods on this type run against `pool`.
#[derive(Clone)]
pub struct Database {
/// Connection pool used by every query
pub pool: PgPool,
}
impl Database {
/// Run the migrations found in `./migrations/` against the pool.
///
/// # Panics
///
/// Panics if running the migrations fails.
pub async fn migrate(&self) -> ServiceResult<()> {
    let migrator = sqlx::migrate!("./migrations/");
    migrator.run(&self.pool).await.unwrap();
    Ok(())
}
/// Check that the database is reachable.
///
/// Returns `true` only when a connection can be acquired from the pool
/// and that connection answers a ping.
pub async fn ping(&self) -> bool {
    use sqlx::Connection;
    match self.pool.acquire().await {
        Ok(mut conn) => conn.ping().await.is_ok(),
        Err(_) => false,
    }
}
/// Register a new website by inserting `host` into `forms_websites`.
///
/// The statement uses `ON CONFLICT DO NOTHING`, so a conflicting row is
/// skipped instead of being reported as an error.
///
/// # Panics
///
/// Panics if the query fails.
pub async fn add_site(&self, host: &str) -> ServiceResult<()> {
    let insert = sqlx::query!(
        "INSERT INTO forms_websites (hostname) VALUES ($1) ON CONFLICT DO NOTHING;",
        &host,
    );
    insert.execute(&self.pool).await.unwrap();
    Ok(())
}
/// Delete a registered website: removes the `forms_websites` row whose
/// hostname equals `host`.
///
/// # Panics
///
/// Panics if the query fails.
pub async fn delete_site(&self, host: &str) -> ServiceResult<()> {
    let delete = sqlx::query!("DELETE FROM forms_websites WHERE hostname = ($1)", &host,);
    delete.execute(&self.pool).await.unwrap();
    Ok(())
}
/// List the distinct form paths (`website_path`) that have submissions
/// recorded for `host`.
///
/// # Panics
///
/// Panics if the query fails.
pub async fn list_forms(&self, host: &str) -> ServiceResult<Vec<String>> {
    // Row shape produced by the query below.
    struct PathRow {
        website_path: String,
    }
    let rows = sqlx::query_as!(
        PathRow,
        "SELECT DISTINCT
website_path
FROM
forms_submissions
WHERE
website_id = (SELECT ID FROM forms_websites WHERE hostname = $1)",
        host
    )
    .fetch_all(&self.pool)
    .await
    .unwrap();
    Ok(rows.into_iter().map(|row| row.website_path).collect())
}
/// Fetch up to 50 submissions stored for the form at `path` on `host`.
///
/// `page` is bound directly to the SQL `OFFSET`, i.e. it is the number of
/// rows to skip. NOTE(review): it is not multiplied by the page size of 50;
/// confirm callers expect a row offset. The query has no `ORDER BY`, so row
/// order is whatever the database returns.
///
/// Values of `page` above `i32::MAX` are clamped to `i32::MAX` so the
/// offset can never wrap around to a negative or unrelated value.
///
/// # Panics
///
/// Panics if the query fails.
pub async fn get_form_submissions(
    &self,
    page: usize,
    host: &str,
    path: &str,
) -> ServiceResult<Vec<FormSubmission>> {
    // Clamp first so the `as i32` below is lossless. The cast itself must
    // stay inside the macro call: sqlx treats it as the bind-parameter type.
    let offset = page.min(i32::MAX as usize);
    let res = sqlx::query_as!(
        FormSubmission,
        "
SELECT
value, time, ID
FROM
forms_submissions
WHERE
website_id = (SELECT ID from forms_websites WHERE hostname = $1)
AND
website_path = $2
LIMIT 50 OFFSET $3
",
        host,
        path,
        offset as i32
    )
    .fetch_all(&self.pool)
    .await
    .unwrap();
    Ok(res)
}
/// Store a form submission (`data`) for the form at `path` on `host`.
///
/// The host is registered first via [`Database::add_site`], so the
/// `forms_websites` lookup inside the insert can resolve it. The stored
/// timestamp is taken before that registration runs.
///
/// # Panics
///
/// Panics if the insert fails.
pub async fn add_form_submission(
    &self,
    data: &serde_json::Value,
    host: &str,
    path: &str,
) -> ServiceResult<()> {
    let submitted_at = now_unix_time_stamp();
    self.add_site(host).await?;
    let insert = sqlx::query!(
        "
INSERT INTO forms_submissions (website_path, value, time, website_id)
VALUES ($1, $2, $3, (SELECT ID from forms_websites WHERE hostname = $4));
",
        path,
        data,
        submitted_at,
        host,
    );
    insert.execute(&self.pool).await.unwrap();
    Ok(())
}
/// Delete the form submission with the given `id`.
///
/// The row is removed only when it also matches `path` and belongs to
/// `host`.
///
/// # Panics
///
/// Panics if the query fails.
pub async fn delete_submission(&self, id: i32, host: &str, path: &str) -> ServiceResult<()> {
    let delete = sqlx::query!(
        "DELETE
FROM
forms_submissions
WHERE
ID = $1
AND
website_path = $2
AND
website_id = (SELECT ID FROM forms_websites WHERE hostname = $3);
",
        id,
        path,
        host
    );
    delete.execute(&self.pool).await.unwrap();
    Ok(())
}
}
/// One row read from `forms_submissions`.
#[derive(Clone, Debug)]
pub struct FormSubmission {
/// Value of the row's `ID` column
pub id: i32,
/// Submitted JSON payload (`value` column); optional
pub value: Option<serde_json::Value>,
/// Timestamp stored in the `time` column
pub time: OffsetDateTime,
}
impl FormSubmission {
    /// Convert this database row into a [`FormSubmissionResp`].
    ///
    /// `time` is reduced to its Unix timestamp and `id` is cast to `usize`;
    /// `value` is passed through unchanged.
    pub fn to_resp(self) -> FormSubmissionResp {
        let Self { id, value, time } = self;
        FormSubmissionResp {
            value,
            time: time.unix_timestamp(),
            id: id as usize,
        }
    }
}
/// Current time in UTC.
///
/// Despite the name, this returns an [`OffsetDateTime`], not an integer
/// Unix timestamp.
fn now_unix_time_stamp() -> OffsetDateTime {
OffsetDateTime::now_utc()
}
/// Build a [`Database`] from the application settings.
///
/// Opens a fresh pool capped at `settings.database.pool` connections for
/// `settings.database.url`; statement logging is disabled unless
/// `settings.debug` is set.
///
/// # Panics
///
/// Panics if the connection cannot be established.
pub async fn get_db(settings: &crate::settings::Settings) -> Database {
    let fresh = Fresh {
        pool_options: PgPoolOptions::new().max_connections(settings.database.pool),
        disable_logging: !settings.debug,
        url: settings.database.url.clone(),
    };
    let options = ConnectionOptions::Fresh(fresh);
    options.connect().await.unwrap()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::settings::Settings;

    /// Exercises site registration and the submission queries end to end
    /// against the database named in the settings.
    #[actix_rt::test]
    async fn db_works() {
        let settings = Settings::new().unwrap();
        let fresh = Fresh {
            pool_options: PgPoolOptions::new().max_connections(1),
            url: settings.database.url.clone(),
            disable_logging: !settings.debug,
        };
        let db = ConnectionOptions::Fresh(fresh).connect().await.unwrap();
        assert!(db.ping().await);

        let path = "/foo";
        let payload = serde_json::Value::default();
        for url in ["example.com", "example.org", "example.net"] {
            // deleting twice: the second call ensures delete doesn't fail
            // when the record doesn't exist
            db.delete_site(url).await.unwrap();
            db.delete_site(url).await.unwrap();
            println!("using {url}");
            // adding twice: the second call ensures add_site doesn't fail
            // when the record already exists
            db.add_site(url).await.unwrap();
            db.add_site(url).await.unwrap();

            db.add_form_submission(&payload, url, path).await.unwrap();
            assert_eq!(db.list_forms(url).await.unwrap().len(), 1);

            let stored = db.get_form_submissions(0, url, path).await.unwrap();
            assert_eq!(&payload, stored[0].value.as_ref().unwrap());

            db.delete_submission(stored[0].id, url, path).await.unwrap();
            let remaining = db.get_form_submissions(0, url, path).await.unwrap();
            assert!(remaining.is_empty());
        }
    }
}