All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
305 lines
8.3 KiB
Rust
305 lines
8.3 KiB
Rust
/*
|
|
* Copyright (C) 2022 Aravinth Manivannan <realaravinth@batsense.net>
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License as
|
|
* published by the Free Software Foundation, either version 3 of the
|
|
* License, or (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU Affero General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
use std::str::FromStr;
|
|
|
|
use libforms::FormSubmissionResp;
|
|
use sqlx::postgres::PgPoolOptions;
|
|
use sqlx::types::time::OffsetDateTime;
|
|
//use sqlx::types::Json;
|
|
use sqlx::ConnectOptions;
|
|
use sqlx::PgPool;
|
|
|
|
use crate::errors::*;
|
|
|
|
/// Connect to databse
|
|
pub enum ConnectionOptions {
|
|
/// fresh connection
|
|
Fresh(Fresh),
|
|
/// existing connection
|
|
Existing(Conn),
|
|
}
|
|
|
|
/// Use an existing database pool
|
|
pub struct Conn(pub PgPool);
|
|
|
|
pub struct Fresh {
|
|
pub pool_options: PgPoolOptions,
|
|
pub disable_logging: bool,
|
|
pub url: String,
|
|
}
|
|
|
|
impl ConnectionOptions {
|
|
async fn connect(self) -> ServiceResult<Database> {
|
|
let pool = match self {
|
|
Self::Fresh(fresh) => {
|
|
let mut connect_options =
|
|
sqlx::postgres::PgConnectOptions::from_str(&fresh.url).unwrap();
|
|
if fresh.disable_logging {
|
|
connect_options.disable_statement_logging();
|
|
}
|
|
sqlx::postgres::PgConnectOptions::from_str(&fresh.url)
|
|
.unwrap()
|
|
.disable_statement_logging();
|
|
fresh
|
|
.pool_options
|
|
.connect_with(connect_options)
|
|
.await
|
|
.unwrap()
|
|
//.map_err(|e| DBError::DBError(Box::new(e)))?
|
|
}
|
|
|
|
Self::Existing(conn) => conn.0,
|
|
};
|
|
Ok(Database { pool })
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct Database {
|
|
pub pool: PgPool,
|
|
}
|
|
|
|
impl Database {
|
|
pub async fn migrate(&self) -> ServiceResult<()> {
|
|
sqlx::migrate!("./migrations/")
|
|
.run(&self.pool)
|
|
.await
|
|
.unwrap();
|
|
//.map_err(|e| DBError::DBError(Box::new(e)))?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn ping(&self) -> bool {
|
|
use sqlx::Connection;
|
|
|
|
if let Ok(mut con) = self.pool.acquire().await {
|
|
con.ping().await.is_ok()
|
|
} else {
|
|
false
|
|
}
|
|
}
|
|
|
|
/// register a new website
|
|
pub async fn add_site(&self, host: &str) -> ServiceResult<()> {
|
|
sqlx::query!(
|
|
"INSERT INTO forms_websites (hostname) VALUES ($1) ON CONFLICT DO NOTHING;",
|
|
&host,
|
|
)
|
|
.execute(&self.pool)
|
|
.await
|
|
.unwrap();
|
|
// res.map_err(map_register_err)?;
|
|
Ok(())
|
|
}
|
|
|
|
/// delete a new website
|
|
pub async fn delete_site(&self, host: &str) -> ServiceResult<()> {
|
|
sqlx::query!("DELETE FROM forms_websites WHERE hostname = ($1)", &host,)
|
|
.execute(&self.pool)
|
|
.await
|
|
.unwrap();
|
|
// res.map_err(map_register_err)?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Get forms belonging to a host
|
|
pub async fn list_forms(&self, host: &str) -> ServiceResult<Vec<String>> {
|
|
struct S {
|
|
website_path: String,
|
|
}
|
|
let mut forms = sqlx::query_as!(
|
|
S,
|
|
"SELECT DISTINCT
|
|
website_path
|
|
FROM
|
|
forms_submissions
|
|
WHERE
|
|
website_id = (SELECT ID FROM forms_websites WHERE hostname = $1)",
|
|
host
|
|
)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.unwrap();
|
|
let mut resp = Vec::with_capacity(forms.len());
|
|
for f in forms.drain(0..) {
|
|
resp.push(f.website_path);
|
|
}
|
|
Ok(resp)
|
|
}
|
|
|
|
pub async fn get_form_submissions(
|
|
&self,
|
|
page: usize,
|
|
host: &str,
|
|
path: &str,
|
|
) -> ServiceResult<Vec<FormSubmission>> {
|
|
let res = sqlx::query_as!(
|
|
FormSubmission,
|
|
"
|
|
SELECT
|
|
value, time, ID
|
|
FROM
|
|
forms_submissions
|
|
WHERE
|
|
website_id = (SELECT ID from forms_websites WHERE hostname = $1)
|
|
AND
|
|
website_path = $2
|
|
LIMIT 50 OFFSET $3
|
|
",
|
|
host,
|
|
path,
|
|
page as i32
|
|
)
|
|
.fetch_all(&self.pool)
|
|
.await
|
|
.unwrap();
|
|
Ok(res)
|
|
}
|
|
|
|
pub async fn add_form_submission(
|
|
&self,
|
|
data: &serde_json::Value,
|
|
host: &str,
|
|
path: &str,
|
|
) -> ServiceResult<()> {
|
|
let now = now_unix_time_stamp();
|
|
self.add_site(host).await?;
|
|
|
|
//let data = Json(data);
|
|
|
|
sqlx::query!(
|
|
"
|
|
INSERT INTO forms_submissions (website_path, value, time, website_id)
|
|
VALUES ($1, $2, $3, (SELECT ID from forms_websites WHERE hostname = $4));
|
|
",
|
|
path,
|
|
data,
|
|
now,
|
|
host,
|
|
)
|
|
.execute(&self.pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// delete form submission
|
|
pub async fn delete_submission(&self, id: i32, host: &str, path: &str) -> ServiceResult<()> {
|
|
sqlx::query!(
|
|
"DELETE
|
|
FROM
|
|
forms_submissions
|
|
WHERE
|
|
ID = $1
|
|
AND
|
|
website_path = $2
|
|
AND
|
|
website_id = (SELECT ID FROM forms_websites WHERE hostname = $3);
|
|
",
|
|
id,
|
|
path,
|
|
host
|
|
)
|
|
.execute(&self.pool)
|
|
.await
|
|
.unwrap();
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct FormSubmission {
|
|
pub id: i32,
|
|
pub value: Option<serde_json::Value>,
|
|
pub time: OffsetDateTime,
|
|
}
|
|
|
|
impl FormSubmission {
|
|
pub fn to_resp(self) -> FormSubmissionResp {
|
|
FormSubmissionResp {
|
|
value: self.value,
|
|
time: self.time.unix_timestamp(),
|
|
id: self.id as usize,
|
|
}
|
|
}
|
|
}
|
|
|
|
fn now_unix_time_stamp() -> OffsetDateTime {
|
|
OffsetDateTime::now_utc()
|
|
}
|
|
|
|
pub async fn get_db(settings: &crate::settings::Settings) -> Database {
|
|
let pool_options = PgPoolOptions::new().max_connections(settings.database.pool);
|
|
ConnectionOptions::Fresh(Fresh {
|
|
pool_options,
|
|
url: settings.database.url.clone(),
|
|
disable_logging: !settings.debug,
|
|
})
|
|
.connect()
|
|
.await
|
|
.unwrap()
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::settings::Settings;
|
|
|
|
#[actix_rt::test]
|
|
async fn db_works() {
|
|
let settings = Settings::new().unwrap();
|
|
let pool_options = PgPoolOptions::new().max_connections(1);
|
|
let db = ConnectionOptions::Fresh(Fresh {
|
|
pool_options,
|
|
url: settings.database.url.clone(),
|
|
disable_logging: !settings.debug,
|
|
})
|
|
.connect()
|
|
.await
|
|
.unwrap();
|
|
assert!(db.ping().await);
|
|
|
|
let urls = ["example.com", "example.org", "example.net"];
|
|
for url in urls.iter() {
|
|
db.delete_site(url).await.unwrap();
|
|
// ensuring delete doesn't fail when record doesn't exist
|
|
db.delete_site(url).await.unwrap();
|
|
println!("using {url}");
|
|
db.add_site(url).await.unwrap();
|
|
// ensuring add_site doesn't fail when record exists
|
|
db.add_site(url).await.unwrap();
|
|
|
|
let path = "/foo";
|
|
db.add_form_submission(&serde_json::Value::default(), url, path)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(db.list_forms(url).await.unwrap().len(), 1);
|
|
|
|
let subs = db.get_form_submissions(0, url, path).await.unwrap();
|
|
assert_eq!(
|
|
&serde_json::Value::default(),
|
|
subs[0].value.as_ref().unwrap()
|
|
);
|
|
|
|
db.delete_submission(subs[0].id, url, path).await.unwrap();
|
|
let subs = db.get_form_submissions(0, url, path).await.unwrap();
|
|
assert!(subs.is_empty());
|
|
}
|
|
}
|
|
}
|