From 6b5ca282431247f4b63e3c54518093e371a22795 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Wed, 27 Sep 2023 20:40:31 +0530 Subject: [PATCH] feat: add job types while running migrations --- migrations/20230927145504_ftest_jobs.sql | 14 ++++++ src/db.rs | 61 ++++++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 migrations/20230927145504_ftest_jobs.sql diff --git a/migrations/20230927145504_ftest_jobs.sql b/migrations/20230927145504_ftest_jobs.sql new file mode 100644 index 0000000..04462e0 --- /dev/null +++ b/migrations/20230927145504_ftest_jobs.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS ftest_job_states ( + name VARcHAR(20) NOT NULL UNIQUE, + ID SERIAL PRIMARY KEY NOT NULL +); + +CREATE TABLE IF NOT EXISTS ftest_jobs ( + commit_hash VARCHAR(40) NOT NULL UNIQUE, + job_state INTEGER references ftest_job_states(ID) ON DELETE CASCADE, + auth VARCHAR(32) NOT NULL UNIQUE, + created_at timestamptz NOT NULL DEFAULT now(), + scheduled_at timestamptz DEFAULT NULL, + finished_at timestamptz DEFAULT NULL, + ID SERIAL PRIMARY KEY NOT NULL +); diff --git a/src/db.rs b/src/db.rs index 5883f4a..26f2535 100644 --- a/src/db.rs +++ b/src/db.rs @@ -5,6 +5,8 @@ use std::str::FromStr; +use lazy_static::lazy_static; +use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPoolOptions; use sqlx::types::time::OffsetDateTime; use sqlx::ConnectOptions; @@ -59,12 +61,66 @@ pub struct Database { pub pool: PgPool, } +#[derive(Deserialize, Serialize, Clone, Debug, Eq, PartialEq)] +pub struct JobState { + pub name: String, +} + +impl JobState { + pub fn new(name: String) -> Self { + Self { name } + } +} + +lazy_static! { + pub static ref JOB_STATE_CREATE: JobState = JobState::new("job.state.create".into()); + pub static ref JOB_STATE_FINISH: JobState = JobState::new("job.state.finish".into()); + pub static ref JOB_STATE_RUNNING: JobState = JobState::new("job.state.running".into()); + pub static ref JOB_STATES: [&'static JobState; 3] = + [&*JOB_STATE_CREATE, &*JOB_STATE_FINISH, &*JOB_STATE_RUNNING]; +} + impl Database { pub async fn migrate(&self) -> ServiceResult<()> { sqlx::migrate!("./migrations/") .run(&self.pool) .await .unwrap(); + self.create_job_states().await?; + Ok(()) + } + + /// check if event type exists + async fn job_state_exists(&self, job_state: &JobState) -> ServiceResult { + let res = sqlx::query!( + "SELECT EXISTS (SELECT 1 from ftest_job_states WHERE name = $1)", + job_state.name, + ) + .fetch_one(&self.pool) + .await + .map_err(map_register_err)?; + + let mut resp = false; + if let Some(x) = res.exists { + resp = x; + } + + Ok(resp) + } + + async fn create_job_states(&self) -> ServiceResult<()> { + for j in &*JOB_STATES { + if !self.job_state_exists(j).await? { + sqlx::query!( + "INSERT INTO ftest_job_states + (name) VALUES ($1) ON CONFLICT (name) DO NOTHING;", + j.name + ) + .execute(&self.pool) + .await + .map_err(map_register_err)?; + } + } Ok(()) } @@ -152,5 +208,10 @@ mod tests { const PASSWORD: &str = "pasdfasdfasdfadf"; db.migrate().await.unwrap(); + + for e in (*JOB_STATES).iter() { + println!("checking job state {}", e.name); + assert!(db.job_state_exists(e).await.unwrap()); + } } }