/* * Copyright (C) 2023 Aravinth Manivannan * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ use url::Url; use uuid::Uuid; use crate::api::v1::get_random; use crate::db::{ JobState, JOB_STATES, JOB_STATE_CREATE, JOB_STATE_FINISH, JOB_STATE_RUNNING, }; use crate::errors::*; use crate::mcaptcha::PerformanceAnalytics; use crate::Data; use sqlx::types::time::OffsetDateTime; fn now_unix_time_stamp() -> OffsetDateTime { OffsetDateTime::now_utc() } impl Data { /// Check if an mCaptcha instance is registered on the database pub async fn mcaptcha_url_exists(&self, url: &str) -> ServiceResult { let res = sqlx::query!( "SELECT EXISTS (SELECT 1 from survey_mcaptcha_hostname WHERE url = $1)", url ) .fetch_one(&self.db) .await?; let mut resp = false; if let Some(x) = res.exists { if x { resp = true; } } Ok(resp) } /// Register an mCaptcha instance pub async fn mcaptcha_register_instance(&self, url: &str) -> ServiceResult { let secret = get_random(32); sqlx::query!( "INSERT INTO survey_mcaptcha_hostname (url, secret) VALUES ($1, $2)", url, &secret, ) .execute(&self.db) .await?; Ok(secret) } /// Update the secret of an mCaptcha instance pub async fn mcaptcha_update_secret(&self, url: &str) -> ServiceResult { let secret = get_random(32); sqlx::query!( "UPDATE survey_mcaptcha_hostname set secret = $1 WHERE url = $2", &secret, url ) .execute(&self.db) .await?; Ok(secret) } /// Authenticate an mCaptcha instance and return its URL pub async fn mcaptcha_authenticate(&self, secret: &str) -> ServiceResult<()> { let res = sqlx::query!( "SELECT EXISTS ( SELECT url FROM survey_mcaptcha_hostname WHERE secret = $1 )", secret ) .fetch_one(&self.db) .await?; if !matches!(res.exists, Some(true)) { return Err(ServiceError::WrongPassword); } Ok(()) } /// Delete mCaptcha instance from database pub async fn mcaptcha_delete_mcaptcha_instance( &self, url: &str, secret: &str, ) -> ServiceResult<()> { sqlx::query!( "DELETE FROM survey_mcaptcha_hostname WHERE secret = $1 AND url =$2", secret, url ) .execute(&self.db) .await?; Ok(()) } /// Delete mCaptcha campaign from database pub async fn mcaptcha_delete_mcaptcha_campaign( &self, campaign_id: &Uuid, secret: &str, ) -> ServiceResult<()> { let campaign_str = campaign_id.to_string(); sqlx::query!( "DELETE FROM survey_mcaptcha_campaign WHERE campaign_id = $1 AND url_id = ( SELECT ID FROM survey_mcaptcha_hostname WHERE secret = $2 )", &campaign_str, secret ) .execute(&self.db) .await?; Ok(()) } /// Check if an mCaptcha instance campaign is registered on DB pub async fn mcaptcha_campaign_is_registered( &self, campaign_id: &Uuid, secret: &str, ) -> ServiceResult { let campaign_str = campaign_id.to_string(); let res = sqlx::query!( "SELECT EXISTS ( SELECT ID FROM survey_mcaptcha_campaign WHERE campaign_id = $1 AND url_id = ( SELECT ID FROM survey_mcaptcha_hostname WHERE secret = $2 ) )", &campaign_str, secret ) .fetch_one(&self.db) .await?; let mut resp = false; if let Some(x) = res.exists { if x { resp = true; } } Ok(resp) } /// Register an mCaptcha instance campaign on DB pub async fn mcaptcha_register_campaign( &self, campaign_id: &Uuid, secret: &str, ) -> ServiceResult<()> { let campaign_str = campaign_id.to_string(); let public_id = Uuid::new_v4(); sqlx::query!( "INSERT INTO survey_mcaptcha_campaign (campaign_id, public_id, url_id) VALUES ($1, $2, (SELECT ID FROM survey_mcaptcha_hostname WHERE secret = $3));", &campaign_str, &public_id.to_string(), secret, ) .execute(&self.db) .await?; Ok(()) } /// Register an mCaptcha instance campaign on DB pub async fn mcaptcha_get_campaign_public_id( &self, campaign_id: &Uuid, secret: &str, ) -> ServiceResult { let campaign_str = campaign_id.to_string(); struct S { public_id: String, } let res = sqlx::query_as!( S, "SELECT public_id FROM survey_mcaptcha_campaign WHERE campaign_id = $1 AND url_id = (SELECT ID FROM survey_mcaptcha_hostname WHERE secret = $2);", &campaign_str, secret, ) .fetch_one(&self.db) .await?; Ok(Uuid::parse_str(&res.public_id).unwrap()) } /// Get an mCaptcha instance campaign checkpoint pub async fn mcaptcha_get_checkpoint( &self, campaign_id: &Uuid, ) -> ServiceResult { let campaign_str = campaign_id.to_string(); struct CheckPoint { synced_till: i32, } let checkpoint = sqlx::query_as!( CheckPoint, "SELECT synced_till FROM survey_mcaptcha_campaign WHERE campaign_id = $1;", &campaign_str, ) .fetch_one(&self.db) .await?; let checkpoint = checkpoint.synced_till as usize; Ok(checkpoint) } /// Set an mCaptcha instance campaign checkpoint pub async fn mcaptcha_set_checkpoint( &self, campaign_id: &Uuid, checkpoint: usize, ) -> ServiceResult<()> { let campaign_str = campaign_id.to_string(); sqlx::query!( "UPDATE survey_mcaptcha_campaign SET synced_till = $1 WHERE campaign_id = $2; ", checkpoint as i32, &campaign_str, ) .execute(&self.db) .await?; Ok(()) } /// Store mCaptcha instance campaign analytics pub async fn mcaptcha_insert_analytics( &self, campaign_id: &Uuid, r: &PerformanceAnalytics, ) -> ServiceResult<()> { let campaign_str = campaign_id.to_string(); sqlx::query!( "INSERT INTO survey_mcaptcha_analytics ( campaign_id, time, difficulty_factor, worker_type ) VALUES (( SELECT ID FROM survey_mcaptcha_campaign WHERE campaign_id = $1 ), $2, $3, $4 );", &campaign_str, r.time as i32, r.difficulty_factor as i32, &r.worker_type, ) .execute(&self.db) .await?; Ok(()) } /// fetch PoW analytics pub async fn mcaptcha_analytics_fetch( &self, public_id: &Uuid, limit: usize, offset: usize, ) -> ServiceResult> { let public_id_str = public_id.to_string(); struct P { id: i32, time: i32, difficulty_factor: i32, worker_type: String, } impl From

for PerformanceAnalytics { fn from(v: P) -> Self { Self { time: v.time as u32, difficulty_factor: v.difficulty_factor as u32, worker_type: v.worker_type, id: v.id as usize, } } } let mut c = sqlx::query_as!( P, "SELECT id, time, difficulty_factor, worker_type FROM survey_mcaptcha_analytics WHERE campaign_id = ( SELECT ID FROM survey_mcaptcha_campaign WHERE public_id = $1 ) ORDER BY ID OFFSET $2 LIMIT $3 ", &public_id_str, offset as i32, limit as i32 ) .fetch_all(&self.db) .await?; let mut res = Vec::with_capacity(c.len()); for i in c.drain(0..) { res.push(i.into()) } Ok(res) } pub async fn get_next_job_to_run(&self) -> ServiceResult> { let res = match sqlx::query_as!( InnerSchedulerJob, "SELECT survey_mcaptcha_campaign.campaign_id, survey_mcaptcha_upload_jobs.public_id, survey_mcaptcha_hostname.url FROM survey_mcaptcha_campaign INNER JOIN survey_mcaptcha_upload_jobs ON survey_mcaptcha_upload_jobs.campaign_id = survey_mcaptcha_campaign.ID INNER JOIN survey_mcaptcha_hostname ON survey_mcaptcha_hostname.ID = survey_mcaptcha_campaign.url_id WHERE survey_mcaptcha_upload_jobs.job_state = ( SELECT ID FROM survey_mcaptcha_upload_job_states WHERE name = $1 ) AND survey_mcaptcha_upload_jobs.finished_at is NULL AND survey_mcaptcha_upload_jobs.scheduled_at is NULL ORDER BY created_at ASC;", &JOB_STATE_CREATE.name ) .fetch_one(&self.db) .await { Ok(res) => Ok(Some(res.into())), Err(sqlx::Error::RowNotFound) => Ok(None), Err(e) => Err(e), }?; Ok(res) } pub async fn add_job(&self, campaign_id: &Uuid) -> ServiceResult { let now = now_unix_time_stamp(); if let Some(unfinished_job) = self.get_unfinished_job_for_campaign(campaign_id).await? { return Ok(unfinished_job.public_job_id); } let public_id = Uuid::new_v4(); let public_id_str = public_id.to_string(); let campaign_str = campaign_id.to_string(); sqlx::query!( "INSERT INTO survey_mcaptcha_upload_jobs (campaign_id, job_state, created_at, public_id) VALUES ( (SELECT ID FROM survey_mcaptcha_campaign WHERE campaign_id = $1), (SELECT ID FROM survey_mcaptcha_upload_job_states WHERE name = $2), $3, $4)", &campaign_str, &JOB_STATE_CREATE.name, now, public_id_str ) .execute(&self.db) .await?; Ok(public_id) } pub async fn get_unfinished_job_for_campaign( &self, campaign_id: &Uuid, ) -> ServiceResult> { let res = match sqlx::query_as!( InnerJob, " SELECT survey_mcaptcha_upload_jobs.ID, survey_mcaptcha_upload_jobs.public_id, survey_mcaptcha_campaign.campaign_id, survey_mcaptcha_campaign.public_id as campaign_public_id, survey_mcaptcha_upload_job_states.name, survey_mcaptcha_upload_jobs.created_at, survey_mcaptcha_upload_jobs.scheduled_at, survey_mcaptcha_upload_jobs.finished_at FROM survey_mcaptcha_upload_jobs INNER JOIN survey_mcaptcha_upload_job_states ON survey_mcaptcha_upload_job_states.ID = survey_mcaptcha_upload_jobs.job_state INNER JOIN survey_mcaptcha_campaign ON survey_mcaptcha_campaign.ID = survey_mcaptcha_upload_jobs.campaign_id WHERE survey_mcaptcha_campaign.campaign_id = $1 AND survey_mcaptcha_upload_job_states.name = $2;", &campaign_id.to_string(), &JOB_STATE_CREATE.name ) .fetch_one(&self.db) .await { Ok(res) => Ok(Some(res.into())), Err(sqlx::Error::RowNotFound) => Ok(None), Err(e) => Err(e), }?; Ok(res) } pub async fn get_job(&self, public_id: &uuid::Uuid) -> ServiceResult> { let res = match sqlx::query_as!( InnerJob, " SELECT survey_mcaptcha_upload_jobs.ID, survey_mcaptcha_upload_jobs.public_id, survey_mcaptcha_campaign.campaign_id, survey_mcaptcha_campaign.public_id as campaign_public_id, survey_mcaptcha_upload_job_states.name, survey_mcaptcha_upload_jobs.created_at, survey_mcaptcha_upload_jobs.scheduled_at, survey_mcaptcha_upload_jobs.finished_at FROM survey_mcaptcha_upload_jobs INNER JOIN survey_mcaptcha_upload_job_states ON survey_mcaptcha_upload_job_states.ID = survey_mcaptcha_upload_jobs.job_state INNER JOIN survey_mcaptcha_campaign ON survey_mcaptcha_campaign.ID = survey_mcaptcha_upload_jobs.campaign_id WHERE survey_mcaptcha_upload_jobs.public_id = $1", &public_id.to_string() ) .fetch_one(&self.db) .await { Ok(res) => Ok(Some(res.into())), Err(sqlx::Error::RowNotFound) => Ok(None), Err(e) => Err(e), }?; Ok(res) } pub async fn get_all_jobs_of_state( &self, state: &JobState, ) -> ServiceResult> { let mut res = sqlx::query_as!( InnerJob, " SELECT survey_mcaptcha_upload_jobs.ID, survey_mcaptcha_upload_jobs.public_id, survey_mcaptcha_campaign.campaign_id, survey_mcaptcha_campaign.public_id as campaign_public_id, survey_mcaptcha_upload_job_states.name, survey_mcaptcha_upload_jobs.created_at, survey_mcaptcha_upload_jobs.scheduled_at, survey_mcaptcha_upload_jobs.finished_at FROM survey_mcaptcha_upload_jobs INNER JOIN survey_mcaptcha_upload_job_states ON survey_mcaptcha_upload_job_states.ID = survey_mcaptcha_upload_jobs.job_state INNER JOIN survey_mcaptcha_campaign ON survey_mcaptcha_campaign.ID = survey_mcaptcha_upload_jobs.campaign_id WHERE survey_mcaptcha_upload_job_states.name = $1;", &state.name ) .fetch_all(&self.db) .await?; let res = res.drain(0..).map(|r| r.into()).collect(); Ok(res) } pub async fn mark_job_scheduled(&self, job: &SchedulerJob) -> ServiceResult<()> { let now = now_unix_time_stamp(); sqlx::query!( " UPDATE survey_mcaptcha_upload_jobs SET job_state = (SELECT ID FROM survey_mcaptcha_upload_job_states WHERE name = $1), scheduled_at = $2 WHERE public_id = $3;", &JOB_STATE_RUNNING.name, now, &job.public_job_id.to_string(), ) .execute(&self.db) .await ?; Ok(()) } pub async fn mark_job_finished(&self, job: &SchedulerJob) -> ServiceResult<()> { let now = now_unix_time_stamp(); sqlx::query!( " UPDATE survey_mcaptcha_upload_jobs SET job_state = (SELECT ID FROM survey_mcaptcha_upload_job_states WHERE name = $1), finished_at = $2 WHERE public_id = $3;", &JOB_STATE_FINISH.name, now, &job.public_job_id.to_string(), ) .execute(&self.db) .await ?; Ok(()) } } #[derive(Clone, Debug, PartialEq, Eq)] pub struct SchedulerJob { pub campaign_id: Uuid, pub public_job_id: Uuid, pub url: Url, } #[derive(Clone, Debug, PartialEq, Eq)] struct InnerSchedulerJob { campaign_id: String, public_id: String, url: String, } impl From for SchedulerJob { fn from(j: InnerSchedulerJob) -> Self { SchedulerJob { campaign_id: Uuid::parse_str(&j.campaign_id).unwrap(), public_job_id: Uuid::parse_str(&j.public_id).unwrap(), url: Url::parse(&j.url).unwrap(), } } } #[derive(Clone, Debug, PartialEq, Eq)] pub struct Job { pub state: JobState, pub campaign_id: Uuid, pub campaign_public_id: Uuid, pub public_job_id: Uuid, pub id: u32, pub created_at: OffsetDateTime, pub scheduled_at: Option, pub finished_at: Option, } struct InnerJob { name: String, campaign_id: String, public_id: String, campaign_public_id: String, id: i32, created_at: OffsetDateTime, scheduled_at: Option, finished_at: Option, } impl From for Job { fn from(j: InnerJob) -> Self { Job { state: (JOB_STATES) .iter() .find(|d| d.name == j.name) .unwrap() .to_owned() .to_owned(), id: j.id as u32, created_at: j.created_at, scheduled_at: j.scheduled_at, finished_at: j.finished_at, campaign_id: Uuid::parse_str(&j.campaign_id).unwrap(), campaign_public_id: Uuid::parse_str(&j.campaign_public_id).unwrap(), public_job_id: Uuid::parse_str(&j.public_id).unwrap(), } } } #[cfg(test)] mod tests { use crate::{mcaptcha::PerformanceAnalytics, tests::*}; use super::*; use url::Url; #[actix_rt::test] async fn test_db_mcaptcha_works() { let url = Url::parse("http://test_add_campaign.example").unwrap(); let data = get_test_data().await; let url_str = url.to_string(); if data.mcaptcha_url_exists(&url_str).await.unwrap() { let secret = data.mcaptcha_update_secret(&url_str).await.unwrap(); data.mcaptcha_delete_mcaptcha_instance(&url_str, &secret) .await .unwrap(); } assert!(!data.mcaptcha_url_exists(&url_str).await.unwrap()); let secret = data.mcaptcha_register_instance(&url_str).await.unwrap(); assert!(data.mcaptcha_url_exists(&url_str).await.unwrap()); let secret2 = data.mcaptcha_update_secret(&url_str).await.unwrap(); assert_ne!(secret2, secret); let secret = secret2; assert!(data.mcaptcha_authenticate(&secret).await.is_ok()); assert_eq!( data.mcaptcha_authenticate("foo").await.err(), Some(ServiceError::WrongPassword) ); let uuid = Uuid::new_v4(); if data .mcaptcha_campaign_is_registered(&uuid, &secret) .await .unwrap() { data.mcaptcha_delete_mcaptcha_campaign(&uuid, &secret) .await .unwrap(); } assert!(!data .mcaptcha_campaign_is_registered(&uuid, &secret) .await .unwrap()); data.mcaptcha_register_campaign(&uuid, &secret) .await .unwrap(); assert!(data .mcaptcha_campaign_is_registered(&uuid, &secret) .await .unwrap()); assert_eq!(data.mcaptcha_get_checkpoint(&uuid).await.unwrap(), 0); data.mcaptcha_set_checkpoint(&uuid, 1).await.unwrap(); assert_eq!(data.mcaptcha_get_checkpoint(&uuid).await.unwrap(), 1); let analytics = PerformanceAnalytics { id: 1, time: 1, difficulty_factor: 1, worker_type: "foo".to_string(), }; data.mcaptcha_insert_analytics(&uuid, &analytics) .await .unwrap(); let public_id = data .mcaptcha_get_campaign_public_id(&uuid, &secret) .await .unwrap(); let db_analytics = data .mcaptcha_analytics_fetch(&public_id, 50, 0) .await .unwrap(); assert_eq!(db_analytics.len(), 1); assert_eq!(db_analytics[0].time, analytics.time); assert_eq!( db_analytics[0].difficulty_factor, analytics.difficulty_factor ); assert_eq!(db_analytics[0].worker_type, analytics.worker_type); assert_eq!( data.mcaptcha_analytics_fetch(&public_id, 50, 1) .await .unwrap(), vec![] ); // job related stuff let job1_public_id = data.add_job(&uuid).await.unwrap(); let job = data.get_job(&job1_public_id).await.unwrap().unwrap(); assert_eq!(public_id, job.campaign_public_id); assert_eq!( data.get_unfinished_job_for_campaign(&uuid) .await .unwrap() .unwrap(), job ); let job2_public_id = data.add_job(&uuid).await.unwrap(); let job2 = data.get_job(&job2_public_id).await.unwrap().unwrap(); assert_eq!(job2, job); let scheduler_job = data.get_next_job_to_run().await.unwrap().unwrap(); assert_eq!(scheduler_job.url, url); assert_eq!( data.get_next_job_to_run() .await .unwrap() .unwrap() .public_job_id, job.public_job_id ); assert!(job.created_at < now_unix_time_stamp()); assert!(job.scheduled_at.is_none()); assert!(job.finished_at.is_none()); assert_eq!( data.get_all_jobs_of_state(&JOB_STATE_CREATE).await.unwrap(), vec![job.clone()] ); data.mark_job_scheduled(&scheduler_job).await.unwrap(); assert!(data.get_next_job_to_run().await.unwrap().is_none(),); let job = data.get_job(&job.public_job_id).await.unwrap().unwrap(); assert!(job.scheduled_at.is_some()); assert_eq!( data.get_all_jobs_of_state(&JOB_STATE_RUNNING) .await .unwrap(), vec![job.clone()] ); data.mark_job_finished(&scheduler_job).await.unwrap(); let job = data.get_job(&job.public_job_id).await.unwrap().unwrap(); assert!(job.finished_at.is_some()); assert_eq!( data.get_all_jobs_of_state(&JOB_STATE_FINISH).await.unwrap(), vec![job.clone()] ); let job2_public_id = data.add_job(&uuid).await.unwrap(); let job2 = data.get_job(&job2_public_id).await.unwrap().unwrap(); assert_ne!(job2.public_job_id, job.public_job_id); assert_eq!( data.get_next_job_to_run() .await .unwrap() .unwrap() .public_job_id, job2.public_job_id ); assert_eq!(public_id, job2.campaign_public_id); } }