From 8afea6fc8136998e1a4523e5f248cef3093232e2 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Wed, 4 Oct 2023 02:14:47 +0530 Subject: [PATCH] feat & chore: define MinAppContext and FullAppContext SUMMARY ftest when run as a local dev tool will only run one job during its runtime. Database and settings are not required for this purpose. This commit defines MinAppContext for running in a single job execution mode and FullAppContext for running as a daemon --- src/api/v1/webhooks.rs | 16 ++-- src/ctx.rs | 147 ++++++++++++++++++++++++++++++++++--- src/main.rs | 26 ++++--- src/runner/init_scripts.rs | 26 ++++--- src/runner/mod.rs | 8 +- src/runner/scheduler.rs | 12 +-- src/runner/suite.rs | 50 ++++++------- src/runner/target.rs | 6 +- 8 files changed, 212 insertions(+), 79 deletions(-) diff --git a/src/api/v1/webhooks.rs b/src/api/v1/webhooks.rs index 05ac409..c941ebf 100644 --- a/src/api/v1/webhooks.rs +++ b/src/api/v1/webhooks.rs @@ -4,7 +4,8 @@ use crate::complaince::result::Result as CResult; use crate::errors::*; -use crate::AppCtx; +use crate::AppFullCtx; +use crate::AppMinCtx; use actix_web::{web, HttpResponse, Responder}; use serde::{Deserialize, Serialize}; @@ -35,17 +36,17 @@ pub struct TestEvent { async fn results( payload: web::Json, auth: web::Path, - ctx: AppCtx, + ctx: AppMinCtx, ) -> ServiceResult { let auth = auth.into_inner(); { - let r = ctx.results.read().unwrap(); + let r = ctx.results_().read().unwrap(); if r.get(&auth).is_none() { return Err(ServiceError::WrongPassword); } } let tx = { - let mut w = ctx.results.write().unwrap(); + let mut w = ctx.results_().write().unwrap(); w.remove(&auth) }; @@ -56,8 +57,11 @@ async fn results( } #[actix_web_codegen_const_routes::post(path = "ROUTES.init")] -async fn run_tests(payload: web::Json, ctx: AppCtx) -> ServiceResult { - ctx.db.add_job(&payload.commit).await?; +async fn run_tests( + payload: web::Json, + ctx: AppFullCtx, +) -> ServiceResult { + ctx.db().add_job(&payload.commit).await?; tracing::info!("Creating job for commit {}", payload.commit); Ok(HttpResponse::Created()) } diff --git a/src/ctx.rs b/src/ctx.rs index f4e2cb9..ad60ce4 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -1,13 +1,11 @@ -use std::collections::HashMap; - // Copyright (C) 2022 Aravinth Manivannan // SPDX-FileCopyrightText: 2023 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later +use std::collections::HashMap; use std::sync::{Arc, RwLock}; -use reqwest::Client; use tokio::sync::mpsc::Sender; use crate::db::*; @@ -17,26 +15,153 @@ use crate::settings::Settings; use super::complaince::result::Result as CResult; -pub type ArcCtx = Arc; +pub type ArcFullCtx = Arc; +pub type ArcMinCtx = Arc; + +pub trait FullAppContext: std::marker::Send + std::marker::Sync + CloneFullAppCtx { + fn settings(&self) -> &Settings; + fn db(&self) -> &Database; + fn docker(&self) -> Arc; + fn results(&self) -> &ResultStore; + fn port(&self) -> u32; +} + +pub trait MinAppContext: std::marker::Send + std::marker::Sync { + fn docker_(&self) -> Arc; + fn results_(&self) -> &ResultStore; + fn port_(&self) -> u32; +} + +impl MinAppContext for Arc { + fn docker_(&self) -> Arc { + self.docker() + } + fn results_(&self) -> &ResultStore { + self.results() + } + fn port_(&self) -> u32 { + self.port() + } +} #[derive(Clone)] pub struct Ctx { - pub settings: Settings, - pub db: Database, - pub client: Client, - pub results: Arc>>>, - pub docker: Arc, + settings: Settings, + db: Database, + results: ResultStore, + docker: Arc, } +impl FullAppContext for Ctx { + fn 
settings(&self) -> &Settings { + &self.settings + } + fn db(&self) -> &Database { + &self.db + } + fn docker(&self) -> Arc { + self.docker.clone() + } + fn results(&self) -> &ResultStore { + &self.results + } + fn port(&self) -> u32 { + self.settings.server.port + } +} + +impl MinAppContext for Ctx { + fn docker_(&self) -> Arc { + self.docker() + } + fn results_(&self) -> &ResultStore { + self.results() + } + fn port_(&self) -> u32 { + self.port() + } +} + +//impl MinAppContext for Arc { +// fn docker_(&self) -> Arc{ +// self.docker_() +// } +// fn results_(&self) -> &ResultStore { +// self.results_() +// } +// fn port_(&self) -> u32 { +// self.port_() +// } +//} +// +//impl FullAppContext for Arc { +// fn settings(&self) -> &Settings { +// self.settings() +// } +// fn db(&self) -> &Database { +// self.db() +// } +// +// fn docker(&self) -> Arc{ +// self.docker() +// } +// fn results(&self) -> &ResultStore { +// self.results() +// } +// fn port(&self) -> u32 { +// self.port() +// } +// +// +//} + +pub trait CloneFullAppCtx { + fn clone_f(&self) -> Box; +} + +impl CloneFullAppCtx for T +where + T: FullAppContext + Clone + 'static, +{ + fn clone_f(&self) -> Box { + Box::new(self.clone()) + } +} + +impl Clone for Box { + fn clone(&self) -> Self { + (**self).clone_f() + } +} + +//pub trait CloneMinAppContext { +// fn clone_fi(&self) -> Box; +//} +// +//impl CloneMinAppContext for T +//where +// T: CloneMinAppContext + Clone + 'static, +//{ +// fn clone_fi(&self) -> Box { +// Box::new(self.clone()) +// } +//} +// +//impl Clone for Box { +// fn clone(&self) -> Self { +// (**self).clone_fi() +// } +//} + +pub type ResultStore = Arc>>>; + impl Ctx { pub async fn new(settings: Settings) -> Self { let results = HashMap::default(); let results = Arc::new(RwLock::new(results)); - let client = Client::default(); let db = get_db(&settings).await; Self { settings, - client, db, results, docker: Arc::new(Docker::new()), diff --git a/src/main.rs b/src/main.rs index 2550159..0e87d69 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,7 +27,8 @@ pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); pub const PKG_DESCRIPTION: &str = env!("CARGO_PKG_DESCRIPTION"); pub const PKG_HOMEPAGE: &str = env!("CARGO_PKG_HOMEPAGE"); -pub type AppCtx = WebData; +pub type AppFullCtx = WebData; +pub type AppMinCtx = WebData; mod api; mod complaince; @@ -69,23 +70,25 @@ async fn main() -> std::io::Result<()> { let settings = Settings::new().unwrap(); settings.init(); - let ctx = AppCtx::new(Arc::new(Ctx::new(settings.clone()).await)); - ctx.db.migrate().await.unwrap(); + let inner_ctx = Arc::new(Ctx::new(settings.clone()).await); + let ctx = AppFullCtx::new(inner_ctx.clone()); + let ctx2 = AppMinCtx::new(inner_ctx); + ctx.db().migrate().await.unwrap(); - if ctx.settings.docker_proxy { - crate::runner::suite::SuiteRunnerState::run_proxy(&ctx) + if ctx.settings().docker_proxy { + crate::runner::suite::SuiteRunnerState::run_proxy(ctx.as_ref()); } - serve(ctx.clone()).await?; - if ctx.settings.docker_proxy { - crate::runner::suite::SuiteRunnerState::stop_proxy(&ctx); + serve(ctx.clone(), ctx2).await?; + if ctx.settings().docker_proxy { + crate::runner::suite::SuiteRunnerState::stop_proxy(ctx.as_ref()); } Ok(()) } -async fn serve(ctx: AppCtx) -> std::io::Result<()> { - let ip = ctx.settings.server.get_ip(); - let workers = ctx.settings.server.workers.unwrap_or_else(num_cpus::get); +async fn serve(ctx: AppFullCtx, ctx2: AppMinCtx) -> std::io::Result<()> { + let ip = ctx.settings().server.get_ip(); + let workers = 
ctx.settings().server.workers.unwrap_or_else(num_cpus::get); let scheduler = runner::scheduler::Scheduler::spawn(ctx.clone()).await; info!("Starting server on: http://{}", ip); @@ -94,6 +97,7 @@ async fn serve(ctx: AppCtx) -> std::io::Result<()> { .wrap(TracingLogger::default()) .wrap(actix_middleware::Compress::default()) .app_data(ctx.clone()) + .app_data(ctx2.clone()) .app_data(get_json_err()) .wrap( actix_middleware::DefaultHeaders::new() diff --git a/src/runner/init_scripts.rs b/src/runner/init_scripts.rs index da18674..b9bcc9d 100644 --- a/src/runner/init_scripts.rs +++ b/src/runner/init_scripts.rs @@ -4,13 +4,15 @@ use std::collections::HashMap; -use crate::complaince::target::Target; use crate::utils::get_random; -use crate::AppCtx; +use crate::{complaince::target::Target, ctx::MinAppContext}; use super::results::{ArchivableContainer, ArchivableInitResult}; -pub fn launch_init_containers(ctx: &AppCtx, target: &Target) -> Option> { +pub fn launch_init_containers( + ctx: &dyn MinAppContext, + target: &Target, +) -> Option> { if let Some(init_scripts) = target.init_scripts.as_ref() { let mut init_results = Vec::with_capacity(init_scripts.len()); for init in init_scripts.iter() { @@ -21,7 +23,7 @@ pub fn launch_init_containers(ctx: &AppCtx, target: &Target) -> Option Option = HashMap::new(); - env_vars.insert("FORGEJO_URL".into(), "http://forgejo:7000".into()); + env_vars.insert("FORGEJO_URL".into(), "http://forgejo".into()); env_vars.insert("FORGEJO_SSH_URL".into(), "http://forgejo:2222".into()); env_vars.insert("CI".into(), "true".into()); @@ -95,7 +97,7 @@ mod tests { suites: Vec::default(), }; - let init = launch_init_containers(&ctx, &target); + let init = launch_init_containers(ctx.as_ref().as_ref(), &target); assert!(init.is_some()); let init = init.unwrap(); assert_eq!(init.len(), 1); diff --git a/src/runner/mod.rs b/src/runner/mod.rs index 4cdd1d8..50850e6 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -8,7 +8,7 @@ use std::path::PathBuf; use crate::git::Git; use crate::runner::results::*; -use crate::AppCtx; +use crate::AppFullCtx; pub mod init_scripts; pub mod results; @@ -16,15 +16,15 @@ pub mod scheduler; pub mod suite; pub mod target; -pub async fn run(ctx: AppCtx, commit: &str) { - let base_dir = Path::new(&ctx.settings.repository.base_dir); +pub async fn run(ctx: AppFullCtx, commit: &str) { + let base_dir = Path::new(&ctx.settings().repository.base_dir); let control = base_dir.join("control"); Git::checkout_commit(commit, &control); for entry in crate::runner::target::get_targets(&control).iter() { let (suite_results, init_containers) = - crate::runner::target::run_target(&ctx, entry.into()).await; + crate::runner::target::run_target(ctx.as_ref(), entry.into()).await; let content = ArchivableResult { commit: commit.to_string(), suites: suite_results, diff --git a/src/runner/scheduler.rs b/src/runner/scheduler.rs index 39e834d..c57e488 100644 --- a/src/runner/scheduler.rs +++ b/src/runner/scheduler.rs @@ -10,10 +10,10 @@ use tokio::{ task::JoinHandle, }; -use crate::AppCtx; +use crate::AppFullCtx; pub struct Scheduler { - ctx: AppCtx, + ctx: AppFullCtx, rx: Receiver<()>, } @@ -30,14 +30,14 @@ impl SchedulerControler { } impl Scheduler { - pub async fn spawn(ctx: AppCtx) -> SchedulerControler { + pub async fn spawn(ctx: AppFullCtx) -> SchedulerControler { let (tx, rx) = oneshot::channel(); let mut sc = Scheduler { ctx, rx }; let x = async move { loop { if let Err(TryRecvError::Empty) = sc.rx.try_recv() { - let task = sc.ctx.db.get_next_job_to_run().await; + let 
task = sc.ctx.db().get_next_job_to_run().await; if task.is_err() { tokio::time::sleep(Duration::new(5, 0)).await; continue; @@ -45,13 +45,13 @@ impl Scheduler { let task = task.unwrap(); tracing::info!("Scheduling job for commit {}", task.commit_hash); sc.ctx - .db + .db() .mark_job_scheduled(&task.commit_hash) .await .unwrap(); super::run(sc.ctx.clone(), &task.commit_hash).await; sc.ctx - .db + .db() .mark_job_finished(&task.commit_hash) .await .unwrap(); diff --git a/src/runner/suite.rs b/src/runner/suite.rs index 353df6c..5132acd 100644 --- a/src/runner/suite.rs +++ b/src/runner/suite.rs @@ -11,6 +11,7 @@ use super::results::*; use crate::complaince::result::Result as CResult; use crate::complaince::suite::Suite; use crate::complaince::suite::Test; +use crate::ctx::MinAppContext; use crate::utils::get_random; pub struct SuiteRunnerState { @@ -31,13 +32,13 @@ impl SuiteRunnerState { pub async fn run( container_host: &Url, suite: &crate::complaince::suite::Suite, - ctx: &crate::ctx::ArcCtx, + ctx: &dyn MinAppContext, ) -> ArchivableSuiteResult { let state = Self::launch_suite(container_host, suite, ctx); state.collect_results(ctx).await } - pub fn run_proxy(ctx: &crate::ctx::ArcCtx) { + pub fn run_proxy(ctx: &dyn MinAppContext) { Self::stop_proxy(ctx); let mut default = ""; @@ -67,23 +68,23 @@ impl SuiteRunnerState { &format!("ftest_backend:{default}"), "forgeflux/ftest-nginx-proxy", ]; - let mut child = std::process::Command::new("docker") + let mut child = std::process::Command::new("docker_") .args(&args) .spawn() - .expect("unable to obtain Docker version"); + .expect("unable to obtain docker_ version"); child.wait().unwrap(); log::info!("Started {FTEST_NGINX_PROXY} nginx proxy"); } - pub fn stop_proxy(ctx: &crate::ctx::ArcCtx) { - ctx.docker.rm_container(FTEST_NGINX_PROXY, true); + pub fn stop_proxy(ctx: &dyn MinAppContext) { + ctx.docker_().rm_container(FTEST_NGINX_PROXY, true); log::info!("Stopped {FTEST_NGINX_PROXY} nginx proxy"); } fn launch_suite( container_host: &Url, suite: &crate::complaince::suite::Suite, - ctx: &crate::ctx::ArcCtx, + ctx: &dyn MinAppContext, ) -> Self { let mut tests = HashMap::with_capacity(suite.tests.len()); for test in suite.tests.iter() { @@ -91,10 +92,7 @@ impl SuiteRunnerState { println!("starting test {}", test.name); let mut env = HashMap::new(); env.insert("FTEST_AUTH".into(), auth.to_owned()); - env.insert( - "FTEST_HOST".into(), - format!("http://ftest:{}", ctx.settings.server.port), - ); + env.insert("FTEST_HOST".into(), format!("http://ftest:{}", ctx.port_())); env.insert("FTEST_TARGET_HOST".into(), container_host.to_string()); env.insert("FTEST_USER".into(), "alice".into()); if let Some(custom_vars) = test.env_vars.clone() { @@ -103,7 +101,7 @@ impl SuiteRunnerState { let name = format!("{}---{}--{}", suite.name, test.name, &auth[0..5]); tracing::info!("Starting {name} with env vars {:?}", &env); - ctx.docker.run_container( + ctx.docker_().run_container( &name, &test.container, true, @@ -114,7 +112,7 @@ impl SuiteRunnerState { let (tx, rx) = mpsc::channel(1); { - let mut w = ctx.results.write().unwrap(); + let mut w = ctx.results_().write().unwrap(); w.insert(auth.to_owned(), tx); } tests.insert( @@ -133,12 +131,12 @@ impl SuiteRunnerState { } } - async fn collect_results(mut self, ctx: &crate::ctx::ArcCtx) -> ArchivableSuiteResult { + async fn collect_results(mut self, ctx: &dyn MinAppContext) -> ArchivableSuiteResult { let mut tests = Vec::with_capacity(self.tests.len()); for (_auth, mut r) in self.tests.drain() { let result = 
r.rx.recv().await.unwrap(); - let log = ctx.docker.get_logs(&r.container_name); - ctx.docker.rm_container(&r.container_name, true); + let log = ctx.docker_().get_logs(&r.container_name); + ctx.docker_().rm_container(&r.container_name, true); let container = ArchivableContainer { name: r.container_name, logs: log, @@ -174,7 +172,7 @@ mod tests { use crate::complaince::result::Result as CResult; use crate::complaince::suite::Test; - use crate::{AppCtx, Ctx, Settings}; + use crate::{AppMinCtx, Ctx, Settings}; use std::sync::Arc; @@ -190,18 +188,18 @@ mod tests { // } // // #[derive(Clone)] - // struct TestDocker { + // struct Testdocker_ { // state: Arc>, // } // - // impl TestDocker { + // impl Testdocker_ { // pub fn new() -> Self { // Self { // state: Arc::new(RwLock::new(ContainerState::NoContainer)), // } // } // } - // impl DockerLike for TestDocker { + // impl docker_Like for Testdocker_ { // fn version(&self) -> String { // unimplemented!(); // } @@ -243,8 +241,8 @@ mod tests { let settings = Settings::new().unwrap(); let ctx = Ctx::new(settings.clone()).await; - // ctx.docker = Arc::new(TestDocker::new()); - let ctx = AppCtx::new(Arc::new(ctx)); + // ctx.docker_ = Arc::new(Testdocker_::new()); + let ctx = crate::AppFullCtx::new(Arc::new(ctx)); let mut dummy_test = Test { name: "suite_runner_works".into(), @@ -273,15 +271,15 @@ mod tests { let state = SuiteRunnerState::launch_suite( &Url::parse("http://suite_runner_works.service").unwrap(), &suite, - &ctx, + ctx.as_ref(), ); assert_eq!(state.tests.len(), 2); std::thread::sleep(std::time::Duration::new(13, 0)); for (k, v) in state.tests.iter() { - assert_eq!(ctx.docker.get_exit_status(&v.container_name), 0); + assert_eq!(ctx.docker_().get_exit_status(&v.container_name), 0); let tx = { - let r = ctx.results.read().unwrap(); + let r = ctx.results_().read().unwrap(); r.get(k).unwrap().to_owned() }; let tx_result = CResult { @@ -292,7 +290,7 @@ mod tests { tx.send(tx_result).await.unwrap(); } - let results = state.collect_results(&ctx).await; + let results = state.collect_results(ctx.as_ref()).await; assert_eq!(results.tests.len(), 2); for archivable_test in results.tests.iter() { diff --git a/src/runner/target.rs b/src/runner/target.rs index 2d1a7c3..32a70cf 100644 --- a/src/runner/target.rs +++ b/src/runner/target.rs @@ -7,22 +7,22 @@ use std::path::Path; use std::path::PathBuf; use crate::complaince::target::Target; +use crate::ctx::MinAppContext; use crate::git::Git; use crate::runner::results::*; -use crate::AppCtx; use crate::docker_compose::DockerCompose; const FTEST_TARGET_FILE: &str = "ftest.toml"; pub async fn run_target( - ctx: &AppCtx, + ctx: &dyn MinAppContext, target: PathBuf, ) -> ( Vec, Option>, ) { - let compose = DockerCompose::new(target.canonicalize().unwrap(), ctx.docker.clone()); + let compose = DockerCompose::new(target.canonicalize().unwrap(), ctx.docker_().clone()); compose.up(); let services = compose.services();
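
The split this patch introduces, reduced to a self-contained sketch: MinAppContext carries only what a single job execution needs (a Docker handle, the result store, the callback port), while FullAppContext adds Settings and the Database-backed job queue, so ftest run as a one-shot local dev tool never has to open a database connection. LocalCtx and the simplified Docker, Database, Settings and ResultStore types below are stand-ins invented for illustration, not definitions from this repository.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Simplified stand-ins; the real ftest types carry more state.
struct Docker;
struct Database;
struct Settings {
    port: u32,
}
type ResultStore = Arc<RwLock<HashMap<String, String>>>;

/// What one job execution needs: a Docker handle, somewhere to collect
/// results, and the port the results callback listens on.
trait MinAppContext: Send + Sync {
    fn docker_(&self) -> Arc<Docker>;
    fn results_(&self) -> &ResultStore;
    fn port_(&self) -> u32;
}

/// What the daemon additionally needs: configuration and the job queue.
/// (In the patch, the daemon's Ctx implements both traits.)
trait FullAppContext: Send + Sync {
    fn settings(&self) -> &Settings;
    fn db(&self) -> &Database;
    fn docker(&self) -> Arc<Docker>;
    fn results(&self) -> &ResultStore;
    fn port(&self) -> u32;
}

/// Context for ftest as a one-shot local dev tool: no Settings, no Database.
struct LocalCtx {
    docker: Arc<Docker>,
    results: ResultStore,
    port: u32,
}

impl MinAppContext for LocalCtx {
    fn docker_(&self) -> Arc<Docker> {
        self.docker.clone()
    }
    fn results_(&self) -> &ResultStore {
        &self.results
    }
    fn port_(&self) -> u32 {
        self.port
    }
}

/// Job execution is written against the minimal trait only, so it cannot
/// grow an accidental dependency on the database or the settings file.
fn run_single_job(ctx: &dyn MinAppContext) {
    let _docker = ctx.docker_();
    println!("running one job, results expected on port {}", ctx.port_());
}

fn main() {
    let local = LocalCtx {
        docker: Arc::new(Docker),
        results: Arc::new(RwLock::new(HashMap::new())),
        port: 8000,
    };
    run_single_job(&local);
}

Because job execution is written against &dyn MinAppContext, the daemon's full context and a bare local context like the one above are interchangeable at every call site this patch touches.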
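
The CloneFullAppCtx helper in src/ctx.rs is the standard object-safe-clone idiom for boxed trait objects. The same moving parts, reduced to a toy trait so they are easier to see (Greeter and English are invented names for illustration):

/// The trait we want to use as a cloneable boxed trait object.
trait Greeter: CloneGreeter {
    fn greet(&self) -> String;
}

/// Object-safe "clone yourself into a box" helper.
trait CloneGreeter {
    fn clone_box(&self) -> Box<dyn Greeter>;
}

/// Blanket impl: every concrete Greeter that is Clone + 'static
/// automatically knows how to box-clone itself.
impl<T> CloneGreeter for T
where
    T: Greeter + Clone + 'static,
{
    fn clone_box(&self) -> Box<dyn Greeter> {
        Box::new(self.clone())
    }
}

/// With the helper in place, the boxed trait object itself becomes Clone.
impl Clone for Box<dyn Greeter> {
    fn clone(&self) -> Self {
        (**self).clone_box()
    }
}

#[derive(Clone)]
struct English;

impl Greeter for English {
    fn greet(&self) -> String {
        "hello".into()
    }
}

fn main() {
    let a: Box<dyn Greeter> = Box::new(English);
    let b = a.clone(); // works because Box<dyn Greeter>: Clone
    assert_eq!(a.greet(), b.greet());
}

The blanket impl gives clone_box to every concrete, Clone + 'static implementor, and the manual Clone impl on the box forwards to it; this is the same mechanism the patch uses with clone_f for the boxed FullAppContext, and what the commented-out CloneMinAppContext block would repeat for the minimal trait.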
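
main.rs now registers two separate app_data values, one per context alias, and each handler extracts the one it declares (results takes AppMinCtx, run_tests takes AppFullCtx). A runnable actix-web sketch of that wiring follows; the concrete shapes chosen here for the AppFullCtx and AppMinCtx aliases (web::Data over Arc<dyn Trait>) are assumptions for the example, since the alias parameters are not spelled out above.

use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use std::sync::Arc;

// Illustrative one-method stand-ins for the two context traits.
trait MinAppContext: Send + Sync {
    fn port_(&self) -> u32;
}
trait FullAppContext: Send + Sync {
    fn port(&self) -> u32;
}

struct Ctx {
    port: u32,
}
impl MinAppContext for Ctx {
    fn port_(&self) -> u32 {
        self.port
    }
}
impl FullAppContext for Ctx {
    fn port(&self) -> u32 {
        self.port
    }
}

// Assumed shapes for the AppFullCtx / AppMinCtx aliases.
type AppFullCtx = web::Data<Arc<dyn FullAppContext>>;
type AppMinCtx = web::Data<Arc<dyn MinAppContext>>;

// Each handler asks for exactly the capability set it needs.
async fn results(ctx: AppMinCtx) -> impl Responder {
    HttpResponse::Ok().body(format!("callback port {}", ctx.port_()))
}

async fn run_tests(ctx: AppFullCtx) -> impl Responder {
    HttpResponse::Created().body(format!("daemon port {}", ctx.port()))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let inner = Arc::new(Ctx { port: 7000 });
    let full_ctx: Arc<dyn FullAppContext> = inner.clone();
    let min_ctx: Arc<dyn MinAppContext> = inner;
    let full = web::Data::new(full_ctx);
    let min = web::Data::new(min_ctx);

    HttpServer::new(move || {
        App::new()
            // Both contexts are registered; extraction is by type.
            .app_data(full.clone())
            .app_data(min.clone())
            .route("/api/v1/results", web::post().to(results))
            .route("/api/v1/init", web::post().to(run_tests))
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

Extraction is purely type-driven: actix-web hands each handler the registered Data<T> whose T matches the declared parameter, so both handlers coexist in one App as long as both contexts were registered.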
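
The results_() store is the rendezvous point between the suite runner and the results webhook: the runner registers a tokio mpsc Sender under the FTEST_AUTH token it injects into the test container, the webhook removes that sender and forwards the posted result, and collect_results awaits the matching Receiver. The sketch below assumes the store is Arc<RwLock<HashMap<String, Sender<CResult>>>> (the parameters of the ResultStore alias are inferred from the surrounding code) and uses a simplified CResult.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::sync::mpsc::{channel, Receiver, Sender};

// Simplified stand-in for crate::complaince::result::Result.
struct CResult {
    success: bool,
}

type ResultStore = Arc<RwLock<HashMap<String, Sender<CResult>>>>;

fn register_test(store: &ResultStore, auth: &str) -> Receiver<CResult> {
    // One bounded channel per test; the sender is parked in the shared
    // map until the test container reports back through the webhook.
    let (tx, rx) = channel(1);
    store.write().unwrap().insert(auth.to_owned(), tx);
    rx
}

async fn report_result(store: &ResultStore, auth: &str, result: CResult) {
    // Remove the sender keyed by the auth token the container used;
    // the lock guard is dropped at the end of this statement.
    let tx = store.write().unwrap().remove(auth);
    if let Some(tx) = tx {
        tx.send(result).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let store: ResultStore = Arc::new(RwLock::new(HashMap::new()));

    // Suite-runner side: register the test before starting its container.
    let mut rx = register_test(&store, "token-abc123");

    // Webhook side: normally triggered by the container's POST to /results.
    report_result(&store, "token-abc123", CResult { success: true }).await;

    // collect_results side: block until the test phones home.
    let result = rx.recv().await.unwrap();
    println!("test finished, success = {}", result.success);
}

Both helpers drop the RwLock guard before any .await, mirroring what webhooks.rs and suite.rs do; holding a std::sync lock across an await point would stall the executor thread.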