feat: job scheduler; runs one job at any given time
This commit is contained in:
parent
21a283d138
commit
718e57a1df
2 changed files with 71 additions and 1 deletions
|
@ -78,7 +78,7 @@ async fn main() -> std::io::Result<()> {
|
||||||
async fn serve(settings: Settings, ctx: AppCtx) -> std::io::Result<()> {
|
async fn serve(settings: Settings, ctx: AppCtx) -> std::io::Result<()> {
|
||||||
let ip = settings.server.get_ip();
|
let ip = settings.server.get_ip();
|
||||||
let workers = settings.server.workers.unwrap_or_else(num_cpus::get);
|
let workers = settings.server.workers.unwrap_or_else(num_cpus::get);
|
||||||
let scheduler = runner::Scheduler::spawn(ctx.clone()).await;
|
let scheduler = runner::scheduler::Scheduler::spawn(ctx.clone()).await;
|
||||||
|
|
||||||
info!("Starting server on: http://{}", ip);
|
info!("Starting server on: http://{}", ip);
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
|
|
70
src/runner/scheduler.rs
Normal file
70
src/runner/scheduler.rs
Normal file
|
@ -0,0 +1,70 @@
|
||||||
|
// SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use tokio::sync::oneshot::error::TryRecvError;
|
||||||
|
use tokio::{
|
||||||
|
sync::oneshot::{self, Receiver, Sender},
|
||||||
|
task::JoinHandle,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::AppCtx;
|
||||||
|
|
||||||
|
pub struct Scheduler {
|
||||||
|
ctx: AppCtx,
|
||||||
|
rx: Receiver<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SchedulerControler {
|
||||||
|
tx: Sender<()>,
|
||||||
|
handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SchedulerControler {
|
||||||
|
pub async fn stop(self) {
|
||||||
|
self.tx.send(()).unwrap();
|
||||||
|
self.handle.await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Scheduler {
|
||||||
|
pub async fn spawn(ctx: AppCtx) -> SchedulerControler {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let mut sc = Scheduler { ctx, rx };
|
||||||
|
|
||||||
|
let x = async move {
|
||||||
|
loop {
|
||||||
|
if let Err(TryRecvError::Empty) = sc.rx.try_recv() {
|
||||||
|
let task = sc.ctx.db.get_next_job_to_run().await;
|
||||||
|
if task.is_err() {
|
||||||
|
tokio::time::sleep(Duration::new(5, 0)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let task = task.unwrap();
|
||||||
|
tracing::info!("Scheduling job for commit {}", task.commit_hash);
|
||||||
|
sc.ctx
|
||||||
|
.db
|
||||||
|
.mark_job_scheduled(&task.commit_hash)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
super::run(sc.ctx.clone(), &task.commit_hash).await;
|
||||||
|
sc.ctx
|
||||||
|
.db
|
||||||
|
.mark_job_finished(&task.commit_hash)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
} else {
|
||||||
|
tracing::info!("Terminating scheduler");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
SchedulerControler {
|
||||||
|
handle: tokio::spawn(x),
|
||||||
|
tx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue