From 718e57a1dfad0035cb18ce5ea03156b3b6273ab8 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Fri, 29 Sep 2023 19:34:21 +0530 Subject: [PATCH] feat: job scheduler; runs one job at any given time --- src/main.rs | 2 +- src/runner/scheduler.rs | 70 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 src/runner/scheduler.rs diff --git a/src/main.rs b/src/main.rs index f85567d..74bc3db 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,7 +78,7 @@ async fn main() -> std::io::Result<()> { async fn serve(settings: Settings, ctx: AppCtx) -> std::io::Result<()> { let ip = settings.server.get_ip(); let workers = settings.server.workers.unwrap_or_else(num_cpus::get); - let scheduler = runner::Scheduler::spawn(ctx.clone()).await; + let scheduler = runner::scheduler::Scheduler::spawn(ctx.clone()).await; info!("Starting server on: http://{}", ip); HttpServer::new(move || { diff --git a/src/runner/scheduler.rs b/src/runner/scheduler.rs new file mode 100644 index 0000000..39e834d --- /dev/null +++ b/src/runner/scheduler.rs @@ -0,0 +1,70 @@ +// SPDX-FileCopyrightText: 2023 Aravinth Manivannan +// +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::time::Duration; + +use tokio::sync::oneshot::error::TryRecvError; +use tokio::{ + sync::oneshot::{self, Receiver, Sender}, + task::JoinHandle, +}; + +use crate::AppCtx; + +pub struct Scheduler { + ctx: AppCtx, + rx: Receiver<()>, +} + +pub struct SchedulerControler { + tx: Sender<()>, + handle: JoinHandle<()>, +} + +impl SchedulerControler { + pub async fn stop(self) { + self.tx.send(()).unwrap(); + self.handle.await.unwrap(); + } +} + +impl Scheduler { + pub async fn spawn(ctx: AppCtx) -> SchedulerControler { + let (tx, rx) = oneshot::channel(); + let mut sc = Scheduler { ctx, rx }; + + let x = async move { + loop { + if let Err(TryRecvError::Empty) = sc.rx.try_recv() { + let task = sc.ctx.db.get_next_job_to_run().await; + if task.is_err() { + tokio::time::sleep(Duration::new(5, 0)).await; + continue; + } + let task = task.unwrap(); + tracing::info!("Scheduling job for commit {}", task.commit_hash); + sc.ctx + .db + .mark_job_scheduled(&task.commit_hash) + .await + .unwrap(); + super::run(sc.ctx.clone(), &task.commit_hash).await; + sc.ctx + .db + .mark_job_finished(&task.commit_hash) + .await + .unwrap(); + } else { + tracing::info!("Terminating scheduler"); + break; + } + } + }; + + SchedulerControler { + handle: tokio::spawn(x), + tx, + } + } +}