feat: get_next_job_to_run

DESCRIPTION
    Next job is the oldest job with  NULL for `scheduled_at` and
    `finished_at` and `JOB_STATE_CREATE` for state.
This commit is contained in:
Aravinth Manivannan 2023-09-27 22:02:07 +05:30
parent 88af240a0c
commit 7ab31a6074
Signed by: realaravinth
GPG Key ID: F8F50389936984FF
1 changed files with 44 additions and 2 deletions

View File

@ -267,6 +267,28 @@ impl Database {
Ok(())
}
pub async fn get_next_job_to_run(&self) -> ServiceResult<SchedulerJob> {
let res = sqlx::query_as!(
SchedulerJob,
"SELECT
auth, commit_hash
FROM
ftest_jobs
WHERE
job_state = (SELECT ID FROM ftest_job_states WHERE name = $1)
AND
finished_at is NULL
AND
scheduled_at is NULL
ORDER BY created_at ASC;",
&JOB_STATE_CREATE.name
)
.fetch_one(&self.pool)
.await
.map_err(map_register_err)?;
Ok(res)
}
}
fn now_unix_time_stamp() -> OffsetDateTime {
@ -294,7 +316,13 @@ pub fn map_row_not_found_err(e: sqlx::Error, row_not_found: ServiceError) -> Ser
}
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SchedulerJob {
pub commit_hash: String,
pub auth: String,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Job {
pub state: JobState,
pub commit_hash: String,
@ -374,6 +402,7 @@ mod tests {
assert!(db.ping().await);
const COMMIT_HASH: &str = "pasdfasdfasdfadf";
const COMMIT_HASH2: &str = "pasdfasdfasdfadf22";
db.migrate().await.unwrap();
@ -383,15 +412,24 @@ mod tests {
}
let _ = db.delete_job(COMMIT_HASH).await;
let _ = db.delete_job(COMMIT_HASH2).await;
let auth = db.add_job(COMMIT_HASH).await.unwrap();
let job = db.get_job(COMMIT_HASH).await.unwrap();
db.add_job(COMMIT_HASH2).await.unwrap();
let job2 = db.get_job(COMMIT_HASH2).await.unwrap();
assert_eq!(
db.get_next_job_to_run().await.unwrap().commit_hash,
job.commit_hash
);
assert!(job.created_at < now_unix_time_stamp());
assert!(job.scheduled_at.is_none());
assert!(job.finished_at.is_none());
assert_eq!(
db.get_all_jobs_of_state(&*JOB_STATE_CREATE).await.unwrap(),
vec![job]
vec![job, job2.clone()]
);
db.mark_job_scheduled(&auth).await.unwrap();
@ -409,5 +447,9 @@ mod tests {
db.get_all_jobs_of_state(&*JOB_STATE_FINISH).await.unwrap(),
vec![job]
);
assert_eq!(
db.get_next_job_to_run().await.unwrap().commit_hash,
job2.commit_hash
);
}
}