feat: schedule and record job states. Create job states during migration

This commit is contained in:
Aravinth Manivannan 2023-10-19 11:21:14 +05:30
parent b5b83b955a
commit 6b93524027
Signed by: realaravinth
GPG key ID: F8F50389936984FF
7 changed files with 167 additions and 2 deletions

View file

@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO survey_mcaptcha_upload_job_states \n (name) VALUES ($1) ON CONFLICT (name) DO NOTHING;",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Varchar"
]
},
"nullable": []
},
"hash": "11ff04344412d1a2e5fdb1ab654fe4e90c2ba897bb4889426031ffacc2ae06e4"
}

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT EXISTS (SELECT 1 from survey_mcaptcha_upload_job_states WHERE name = $1)",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "exists",
"type_info": "Bool"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
null
]
},
"hash": "2d18e0fad79c6df26465f82eca20cdfca35a710f34a54ac115d23435762a3038"
}

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT\n campaign_id\n FROM\n survey_mcaptcha_campaign\n WHERE ID = (\n SELECT\n campaign_id\n FROM\n survey_mcaptcha_upload_jobs\n WHERE\n job_state = (SELECT ID FROM survey_mcaptcha_upload_job_states WHERE name = $1)\n AND\n finished_at is NULL\n AND\n scheduled_at is NULL\n ORDER BY created_at ASC\n );",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "campaign_id",
"type_info": "Varchar"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
},
"hash": "4237df28c12e17ca68a1e1b33ae80ce5a4a8dff6d0795f277fb18b7b40dc69ef"
}

View file

@ -20,3 +20,19 @@ CREATE TABLE IF NOT EXISTS survey_mcaptcha_analytics (
worker_type VARCHAR(100) NOT NULL, worker_type VARCHAR(100) NOT NULL,
ID SERIAL PRIMARY KEY NOT NULL ID SERIAL PRIMARY KEY NOT NULL
); );
CREATE TABLE IF NOT EXISTS survey_mcaptcha_upload_job_states (
name VARCHAR(20) NOT NULL UNIQUE,
ID SERIAL PRIMARY KEY NOT NULL
);
CREATE TABLE IF NOT EXISTS survey_mcaptcha_upload_jobs (
campaign_id INTEGER references survey_mcaptcha_campaign(ID) ON DELETE CASCADE,
public_id varchar(100) NOT NULL UNIQUE,
created_at timestamptz NOT NULL DEFAULT now(),
scheduled_at timestamptz DEFAULT NULL,
finished_at timestamptz DEFAULT NULL,
job_state INTEGER references survey_mcaptcha_upload_job_states(ID) ON DELETE CASCADE,
ID SERIAL PRIMARY KEY NOT NULL
);

87
src/db.rs Normal file
View file

@ -0,0 +1,87 @@
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
#[derive(Deserialize, Serialize, Clone, Debug, Eq, PartialEq)]
pub struct JobState {
pub name: String,
}
impl JobState {
pub fn new(name: String) -> Self {
Self { name }
}
}
lazy_static! {
pub static ref JOB_STATE_CREATE: JobState = JobState::new("job.state.create".into());
pub static ref JOB_STATE_FINISH: JobState = JobState::new("job.state.finish".into());
pub static ref JOB_STATE_RUNNING: JobState =
JobState::new("job.state.running".into());
pub static ref JOB_STATES: [&'static JobState; 3] =
[&*JOB_STATE_CREATE, &*JOB_STATE_FINISH, &*JOB_STATE_RUNNING];
}
async fn job_state_exists(
db: &PgPool,
job_state: &JobState,
) -> sqlx::error::Result<bool> {
let res = sqlx::query!(
"SELECT EXISTS (SELECT 1 from survey_mcaptcha_upload_job_states WHERE name = $1)",
job_state.name,
)
.fetch_one(db)
.await?;
let mut resp = false;
if let Some(x) = res.exists {
resp = x;
}
Ok(resp)
}
async fn create_job_states(db: &PgPool) -> sqlx::error::Result<()> {
for j in &*JOB_STATES {
if !job_state_exists(db, j).await? {
sqlx::query!(
"INSERT INTO survey_mcaptcha_upload_job_states
(name) VALUES ($1) ON CONFLICT (name) DO NOTHING;",
j.name
)
.execute(db)
.await?;
}
}
Ok(())
}
pub async fn migrate_db(db: &PgPool) -> sqlx::error::Result<()> {
sqlx::migrate!("./migrations/").run(db).await?;
create_job_states(db).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[actix_rt::test]
async fn test_mcaptcha_job_states_exist() {
// can't use crate::tests::get_test_data because this module is used by
// ./src/tests-migrate.rs too, which doesn't load tests module
let settings = crate::settings::Settings::new().unwrap();
let db = sqlx::postgres::PgPoolOptions::new()
.max_connections(2)
.connect(&settings.database.url)
.await
.expect("Unable to form database pool");
migrate_db(&db).await.unwrap();
for e in (*JOB_STATES).iter() {
println!("checking job state {}", e.name);
assert!(job_state_exists(&db, e).await.unwrap());
}
}
}

View file

@ -30,6 +30,7 @@ use log::info;
mod api; mod api;
mod archive; mod archive;
mod data; mod data;
mod db;
mod errors; mod errors;
mod mcaptcha; mod mcaptcha;
mod pages; mod pages;
@ -91,7 +92,7 @@ async fn main() -> std::io::Result<()> {
Box::new(mcaptcha::MCaptchaClientReqwest::default()); Box::new(mcaptcha::MCaptchaClientReqwest::default());
let data = Data::new(settings.clone(), mcaptcha).await; let data = Data::new(settings.clone(), mcaptcha).await;
sqlx::migrate!("./migrations/").run(&data.db).await.unwrap(); db::migrate_db(&data.db).await.unwrap();
let data = actix_web::web::Data::new(data); let data = actix_web::web::Data::new(data);
let arch = archive::Archiver::new(&data.settings); let arch = archive::Archiver::new(&data.settings);

View file

@ -18,6 +18,7 @@ use std::env;
use sqlx::postgres::PgPoolOptions; use sqlx::postgres::PgPoolOptions;
mod db;
mod settings; mod settings;
pub use settings::Settings; pub use settings::Settings;
@ -40,7 +41,9 @@ async fn main() {
} }
} }
sqlx::migrate!("./migrations/").run(&db).await.unwrap(); db::migrate_db(&db).await.unwrap();
// sqlx::migrate!("./migrations/").run(&db).await.unwrap();
} }
fn build() { fn build() {