feat & chore: define MinAppContext and FullAppContext

SUMMARY
    When ftest runs as a local dev tool, it executes only one job during
    its lifetime, so the database and settings are not required.

    This commit defines MinAppContext for single-job execution mode and
    FullAppContext for running as a daemon.
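
    As a rough illustration of the split (a hypothetical helper, not part
    of the diff below): code that only drives containers can accept the
    minimal trait, so the same function compiles for both modes.

```rust
use std::sync::Arc;

// Hypothetical helper against the new traits. rm_container is invoked
// with the same (&str, bool) shape used elsewhere in this commit.
fn cleanup(ctx: &dyn MinAppContext, container: &str) {
    // Only the Docker handle is needed: no database, no settings.
    let docker: Arc<dyn DockerLike> = ctx.docker_();
    docker.rm_container(container, true);
}
```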
Aravinth Manivannan 2023-10-04 02:14:47 +05:30
parent 968c5d2b69
commit 8afea6fc81
Signed by: realaravinth
GPG key ID: F8F50389936984FF
8 changed files with 212 additions and 79 deletions

View file

@@ -4,7 +4,8 @@
use crate::complaince::result::Result as CResult;
use crate::errors::*;
use crate::AppCtx;
use crate::AppFullCtx;
use crate::AppMinCtx;
use actix_web::{web, HttpResponse, Responder};
use serde::{Deserialize, Serialize};
@@ -35,17 +36,17 @@ pub struct TestEvent {
async fn results(
payload: web::Json<CResult>,
auth: web::Path<String>,
ctx: AppCtx,
ctx: AppMinCtx,
) -> ServiceResult<impl Responder> {
let auth = auth.into_inner();
{
let r = ctx.results.read().unwrap();
let r = ctx.results_().read().unwrap();
if r.get(&auth).is_none() {
return Err(ServiceError::WrongPassword);
}
}
let tx = {
let mut w = ctx.results.write().unwrap();
let mut w = ctx.results_().write().unwrap();
w.remove(&auth)
};
@@ -56,8 +57,11 @@ async fn results(
}
#[actix_web_codegen_const_routes::post(path = "ROUTES.init")]
async fn run_tests(payload: web::Json<TestEvent>, ctx: AppCtx) -> ServiceResult<impl Responder> {
ctx.db.add_job(&payload.commit).await?;
async fn run_tests(
payload: web::Json<TestEvent>,
ctx: AppFullCtx,
) -> ServiceResult<impl Responder> {
ctx.db().add_job(&payload.commit).await?;
tracing::info!("Creating job for commit {}", payload.commit);
Ok(HttpResponse::Created())
}
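
In the first hunk above, results reaches the store through results_() and
uses two-phase locking: a shared read to reject unknown tokens cheaply,
then a short write lock to take the sender out. The same steps as a
hypothetical helper (ResultStore, CResult and Sender are the crate's own
types):

```rust
// Hypothetical helper mirroring the handler's two-phase locking.
fn take_sender(store: &ResultStore, auth: &str) -> Option<Sender<CResult>> {
    if store.read().unwrap().get(auth).is_none() {
        return None; // unknown token: bail out without blocking writers
    }
    store.write().unwrap().remove(auth)
}
```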

View file

@@ -1,13 +1,11 @@
use std::collections::HashMap;
// Copyright (C) 2022 Aravinth Manivannan <realaravinth@batsense.net>
// SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use reqwest::Client;
use tokio::sync::mpsc::Sender;
use crate::db::*;
@@ -17,26 +15,153 @@ use crate::settings::Settings;
use super::complaince::result::Result as CResult;
pub type ArcCtx = Arc<Ctx>;
pub type ArcFullCtx = Arc<dyn FullAppContext>;
pub type ArcMinCtx = Arc<dyn MinAppContext>;
pub trait FullAppContext: std::marker::Send + std::marker::Sync + CloneFullAppCtx {
fn settings(&self) -> &Settings;
fn db(&self) -> &Database;
fn docker(&self) -> Arc<dyn DockerLike>;
fn results(&self) -> &ResultStore;
fn port(&self) -> u32;
}
pub trait MinAppContext: std::marker::Send + std::marker::Sync {
fn docker_(&self) -> Arc<dyn DockerLike>;
fn results_(&self) -> &ResultStore;
fn port_(&self) -> u32;
}
impl MinAppContext for Arc<dyn FullAppContext> {
fn docker_(&self) -> Arc<dyn DockerLike> {
self.docker()
}
fn results_(&self) -> &ResultStore {
self.results()
}
fn port_(&self) -> u32 {
self.port()
}
}
#[derive(Clone)]
pub struct Ctx {
pub settings: Settings,
pub db: Database,
pub client: Client,
pub results: Arc<RwLock<HashMap<String, Sender<CResult>>>>,
pub docker: Arc<dyn DockerLike>,
settings: Settings,
db: Database,
results: ResultStore,
docker: Arc<dyn DockerLike>,
}
impl FullAppContext for Ctx {
fn settings(&self) -> &Settings {
&self.settings
}
fn db(&self) -> &Database {
&self.db
}
fn docker(&self) -> Arc<dyn DockerLike> {
self.docker.clone()
}
fn results(&self) -> &ResultStore {
&self.results
}
fn port(&self) -> u32 {
self.settings.server.port
}
}
impl MinAppContext for Ctx {
fn docker_(&self) -> Arc<dyn DockerLike> {
self.docker()
}
fn results_(&self) -> &ResultStore {
self.results()
}
fn port_(&self) -> u32 {
self.port()
}
}
//impl MinAppContext for Arc<dyn MinAppContext> {
// fn docker_(&self) -> Arc<dyn DockerLike>{
// self.docker_()
// }
// fn results_(&self) -> &ResultStore {
// self.results_()
// }
// fn port_(&self) -> u32 {
// self.port_()
// }
//}
//
//impl FullAppContext for Arc<dyn FullAppContext> {
// fn settings(&self) -> &Settings {
// self.settings()
// }
// fn db(&self) -> &Database {
// self.db()
// }
//
// fn docker(&self) -> Arc<dyn DockerLike>{
// self.docker()
// }
// fn results(&self) -> &ResultStore {
// self.results()
// }
// fn port(&self) -> u32 {
// self.port()
// }
//
//
//}
pub trait CloneFullAppCtx {
fn clone_f(&self) -> Box<dyn FullAppContext>;
}
impl<T> CloneFullAppCtx for T
where
T: FullAppContext + Clone + 'static,
{
fn clone_f(&self) -> Box<dyn FullAppContext> {
Box::new(self.clone())
}
}
impl Clone for Box<dyn FullAppContext> {
fn clone(&self) -> Self {
(**self).clone_f()
}
}
//pub trait CloneMinAppContext {
// fn clone_fi(&self) -> Box<dyn MinAppContext>;
//}
//
//impl<T> CloneMinAppContext for T
//where
// T: MinAppContext + Clone + 'static,
//{
// fn clone_fi(&self) -> Box<dyn MinAppContext> {
// Box::new(self.clone())
// }
//}
//
//impl Clone for Box<dyn MinAppContext> {
// fn clone(&self) -> Self {
// (**self).clone_fi()
// }
//}
pub type ResultStore = Arc<RwLock<HashMap<String, Sender<CResult>>>>;
impl Ctx {
pub async fn new(settings: Settings) -> Self {
let results = HashMap::default();
let results = Arc::new(RwLock::new(results));
let db = get_db(&settings).await;
Self {
settings,
db,
results,
docker: Arc::new(Docker::new()),
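
Two details in this file are worth a note. The blanket impl of
MinAppContext for Arc<dyn FullAppContext> is what lets a full context flow
into code that only asks for the minimal one (see run_proxy(ctx.as_ref())
in the next file). And CloneFullAppCtx is the usual object-safe clone
workaround: Clone itself is not object-safe, so cloning is routed through
a method returning a box. A hypothetical caller, assuming these traits:

```rust
// Hypothetical: cloning a boxed context through CloneFullAppCtx.
fn hand_to_worker(ctx: Box<dyn FullAppContext>) {
    let copy: Box<dyn FullAppContext> = ctx.clone(); // dispatches to clone_f
    std::thread::spawn(move || {
        let _ = copy.port(); // Send + Sync supertraits make the move legal
    });
}
```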

View file

@@ -27,7 +27,8 @@ pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
pub const PKG_DESCRIPTION: &str = env!("CARGO_PKG_DESCRIPTION");
pub const PKG_HOMEPAGE: &str = env!("CARGO_PKG_HOMEPAGE");
pub type AppCtx = WebData<ctx::ArcCtx>;
pub type AppFullCtx = WebData<ctx::ArcFullCtx>;
pub type AppMinCtx = WebData<ctx::ArcMinCtx>;
mod api;
mod complaince;
@@ -69,23 +70,25 @@ async fn main() -> std::io::Result<()> {
let settings = Settings::new().unwrap();
settings.init();
let ctx = AppCtx::new(Arc::new(Ctx::new(settings.clone()).await));
ctx.db.migrate().await.unwrap();
let inner_ctx = Arc::new(Ctx::new(settings.clone()).await);
let ctx = AppFullCtx::new(inner_ctx.clone());
let ctx2 = AppMinCtx::new(inner_ctx);
ctx.db().migrate().await.unwrap();
if ctx.settings.docker_proxy {
crate::runner::suite::SuiteRunnerState::run_proxy(&ctx)
if ctx.settings().docker_proxy {
crate::runner::suite::SuiteRunnerState::run_proxy(ctx.as_ref());
}
serve(ctx.clone()).await?;
if ctx.settings.docker_proxy {
crate::runner::suite::SuiteRunnerState::stop_proxy(&ctx);
serve(ctx.clone(), ctx2).await?;
if ctx.settings().docker_proxy {
crate::runner::suite::SuiteRunnerState::stop_proxy(ctx.as_ref());
}
Ok(())
}
async fn serve(ctx: AppCtx) -> std::io::Result<()> {
let ip = ctx.settings.server.get_ip();
let workers = ctx.settings.server.workers.unwrap_or_else(num_cpus::get);
async fn serve(ctx: AppFullCtx, ctx2: AppMinCtx) -> std::io::Result<()> {
let ip = ctx.settings().server.get_ip();
let workers = ctx.settings().server.workers.unwrap_or_else(num_cpus::get);
let scheduler = runner::scheduler::Scheduler::spawn(ctx.clone()).await;
info!("Starting server on: http://{}", ip);
@@ -94,6 +97,7 @@ async fn serve(ctx: AppCtx) -> std::io::Result<()> {
.wrap(TracingLogger::default())
.wrap(actix_middleware::Compress::default())
.app_data(ctx.clone())
.app_data(ctx2.clone())
.app_data(get_json_err())
.wrap(
actix_middleware::DefaultHeaders::new()
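
Both wrappers are registered because actix-web resolves extractors purely
by type: a handler declaring ctx: AppMinCtx is only satisfied by the
Data<Arc<dyn MinAppContext>> added above. A hypothetical route built
against the minimal alias alone:

```rust
// Hypothetical route: compiles against AppMinCtx only, so it could be
// served by a single-job binary that never constructs a database.
async fn port(ctx: AppMinCtx) -> impl Responder {
    HttpResponse::Ok().body(ctx.port_().to_string())
}
```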

View file

@@ -4,13 +4,15 @@
use std::collections::HashMap;
use crate::complaince::target::Target;
use crate::utils::get_random;
use crate::AppCtx;
use crate::{complaince::target::Target, ctx::MinAppContext};
use super::results::{ArchivableContainer, ArchivableInitResult};
pub fn launch_init_containers(ctx: &AppCtx, target: &Target) -> Option<Vec<ArchivableInitResult>> {
pub fn launch_init_containers(
ctx: &dyn MinAppContext,
target: &Target,
) -> Option<Vec<ArchivableInitResult>> {
if let Some(init_scripts) = target.init_scripts.as_ref() {
let mut init_results = Vec::with_capacity(init_scripts.len());
for init in init_scripts.iter() {
@@ -21,7 +23,7 @@ pub fn launch_init_containers(ctx: &AppCtx, target: &Target) -> Option<Vec<Archi
env.extend(custom_vars);
}
let name = format!("{}--{}", init.name, &auth[0..5]);
ctx.docker.run_container(
ctx.docker_().run_container(
&name,
&init.container,
false,
@@ -29,9 +31,9 @@ pub fn launch_init_containers(ctx: &AppCtx, target: &Target) -> Option<Vec<Archi
Some("ftest".to_string()),
true,
);
let logs = ctx.docker.get_logs(&name);
let exit_code = ctx.docker.get_exit_status(&name);
ctx.docker.rm_container(&name, true);
let logs = ctx.docker_().get_logs(&name);
let exit_code = ctx.docker_().get_exit_status(&name);
ctx.docker_().rm_container(&name, true);
// TODO:fail when success == false
let c = ArchivableInitResult {
success: exit_code == 0,
@@ -57,25 +59,25 @@ mod tests {
use super::*;
use crate::docker_compose::DockerCompose;
use crate::{AppCtx, Ctx, Settings};
use crate::{AppMinCtx, Ctx, Settings};
use crate::complaince::suite::Test;
#[actix_rt::test]
async fn launch_init_containers_works() {
let settings = Settings::new().unwrap();
let ctx = AppCtx::new(Arc::new(Ctx::new(settings.clone()).await));
let ctx = AppMinCtx::new(Arc::new(Ctx::new(settings.clone()).await));
// let base_dir = Path::new(&ctx.settings.repository.base_dir);
// let control = base_dir.join("control");
let compose = DockerCompose::new(
"../ftest-control/targets/forgejo".into(),
ctx.docker.clone(),
ctx.docker_().clone(),
);
compose.up();
let mut env_vars: HashMap<String, String> = HashMap::new();
env_vars.insert("FORGEJO_URL".into(), "http://forgejo:7000".into());
env_vars.insert("FORGEJO_URL".into(), "http://forgejo".into());
env_vars.insert("FORGEJO_SSH_URL".into(), "http://forgejo:2222".into());
env_vars.insert("CI".into(), "true".into());
@@ -95,7 +97,7 @@ mod tests {
suites: Vec::default(),
};
let init = launch_init_containers(&ctx, &target);
let init = launch_init_containers(ctx.as_ref().as_ref(), &target);
assert!(init.is_some());
let init = init.unwrap();
assert_eq!(init.len(), 1);
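
launch_init_containers only ever touches docker_(), which is why its
signature can drop to &dyn MinAppContext. Its per-container teardown, as a
hypothetical helper (the exit-status type is assumed to be an integer):

```rust
// Hypothetical helper: the collect-then-remove tail of the loop above.
fn collect_and_remove(ctx: &dyn MinAppContext, name: &str) -> (String, i32) {
    let logs = ctx.docker_().get_logs(name); // capture output first
    let exit_code = ctx.docker_().get_exit_status(name);
    ctx.docker_().rm_container(name, true); // force-remove last
    (logs, exit_code)
}
```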

View file

@@ -8,7 +8,7 @@ use std::path::PathBuf;
use crate::git::Git;
use crate::runner::results::*;
use crate::AppCtx;
use crate::AppFullCtx;
pub mod init_scripts;
pub mod results;
@@ -16,15 +16,15 @@ pub mod scheduler;
pub mod suite;
pub mod target;
pub async fn run(ctx: AppCtx, commit: &str) {
let base_dir = Path::new(&ctx.settings.repository.base_dir);
pub async fn run(ctx: AppFullCtx, commit: &str) {
let base_dir = Path::new(&ctx.settings().repository.base_dir);
let control = base_dir.join("control");
Git::checkout_commit(commit, &control);
for entry in crate::runner::target::get_targets(&control).iter() {
let (suite_results, init_containers) =
crate::runner::target::run_target(&ctx, entry.into()).await;
crate::runner::target::run_target(ctx.as_ref(), entry.into()).await;
let content = ArchivableResult {
commit: commit.to_string(),
suites: suite_results,

View file

@@ -10,10 +10,10 @@ use tokio::{
task::JoinHandle,
};
use crate::AppCtx;
use crate::AppFullCtx;
pub struct Scheduler {
ctx: AppCtx,
ctx: AppFullCtx,
rx: Receiver<()>,
}
@@ -30,14 +30,14 @@ impl SchedulerControler {
}
impl Scheduler {
pub async fn spawn(ctx: AppCtx) -> SchedulerControler {
pub async fn spawn(ctx: AppFullCtx) -> SchedulerControler {
let (tx, rx) = oneshot::channel();
let mut sc = Scheduler { ctx, rx };
let x = async move {
loop {
if let Err(TryRecvError::Empty) = sc.rx.try_recv() {
let task = sc.ctx.db.get_next_job_to_run().await;
let task = sc.ctx.db().get_next_job_to_run().await;
if task.is_err() {
tokio::time::sleep(Duration::new(5, 0)).await;
continue;
@@ -45,13 +45,13 @@ impl Scheduler {
let task = task.unwrap();
tracing::info!("Scheduling job for commit {}", task.commit_hash);
sc.ctx
.db
.db()
.mark_job_scheduled(&task.commit_hash)
.await
.unwrap();
super::run(sc.ctx.clone(), &task.commit_hash).await;
sc.ctx
.db
.db()
.mark_job_finished(&task.commit_hash)
.await
.unwrap();
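
The scheduler stays on AppFullCtx because job persistence needs db(),
which is exactly what MinAppContext omits. Its shutdown check is the usual
non-blocking oneshot poll; a self-contained sketch of that pattern (names
hypothetical):

```rust
use std::time::Duration;
use tokio::sync::oneshot::{self, error::TryRecvError};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = oneshot::channel::<()>();
    let worker = tokio::spawn(async move {
        loop {
            // Empty means no shutdown signal yet: keep polling for work.
            if let Err(TryRecvError::Empty) = rx.try_recv() {
                tokio::time::sleep(Duration::from_millis(50)).await;
            } else {
                break; // signalled, or sender dropped: exit the loop
            }
        }
    });
    tx.send(()).unwrap(); // ask the worker to stop
    worker.await.unwrap();
}
```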

View file

@@ -11,6 +11,7 @@ use super::results::*;
use crate::complaince::result::Result as CResult;
use crate::complaince::suite::Suite;
use crate::complaince::suite::Test;
use crate::ctx::MinAppContext;
use crate::utils::get_random;
pub struct SuiteRunnerState {
@@ -31,13 +32,13 @@ impl SuiteRunnerState {
pub async fn run(
container_host: &Url,
suite: &crate::complaince::suite::Suite,
ctx: &crate::ctx::ArcCtx,
ctx: &dyn MinAppContext,
) -> ArchivableSuiteResult {
let state = Self::launch_suite(container_host, suite, ctx);
state.collect_results(ctx).await
}
pub fn run_proxy(ctx: &crate::ctx::ArcCtx) {
pub fn run_proxy(ctx: &dyn MinAppContext) {
Self::stop_proxy(ctx);
let mut default = "";
@@ -67,23 +68,23 @@ impl SuiteRunnerState {
&format!("ftest_backend:{default}"),
"forgeflux/ftest-nginx-proxy",
];
let mut child = std::process::Command::new("docker")
.args(&args)
.spawn()
.expect("unable to spawn the Docker CLI");
child.wait().unwrap();
log::info!("Started {FTEST_NGINX_PROXY} nginx proxy");
}
pub fn stop_proxy(ctx: &crate::ctx::ArcCtx) {
ctx.docker.rm_container(FTEST_NGINX_PROXY, true);
pub fn stop_proxy(ctx: &dyn MinAppContext) {
ctx.docker_().rm_container(FTEST_NGINX_PROXY, true);
log::info!("Stopped {FTEST_NGINX_PROXY} nginx proxy");
}
fn launch_suite(
container_host: &Url,
suite: &crate::complaince::suite::Suite,
ctx: &crate::ctx::ArcCtx,
ctx: &dyn MinAppContext,
) -> Self {
let mut tests = HashMap::with_capacity(suite.tests.len());
for test in suite.tests.iter() {
@@ -91,10 +92,7 @@ impl SuiteRunnerState {
println!("starting test {}", test.name);
let mut env = HashMap::new();
env.insert("FTEST_AUTH".into(), auth.to_owned());
env.insert(
"FTEST_HOST".into(),
format!("http://ftest:{}", ctx.settings.server.port),
);
env.insert("FTEST_HOST".into(), format!("http://ftest:{}", ctx.port_()));
env.insert("FTEST_TARGET_HOST".into(), container_host.to_string());
env.insert("FTEST_USER".into(), "alice".into());
if let Some(custom_vars) = test.env_vars.clone() {
@@ -103,7 +101,7 @@ impl SuiteRunnerState {
let name = format!("{}---{}--{}", suite.name, test.name, &auth[0..5]);
tracing::info!("Starting {name} with env vars {:?}", &env);
ctx.docker.run_container(
ctx.docker_().run_container(
&name,
&test.container,
true,
@@ -114,7 +112,7 @@ impl SuiteRunnerState {
let (tx, rx) = mpsc::channel(1);
{
let mut w = ctx.results.write().unwrap();
let mut w = ctx.results_().write().unwrap();
w.insert(auth.to_owned(), tx);
}
tests.insert(
@@ -133,12 +131,12 @@ impl SuiteRunnerState {
}
}
async fn collect_results(mut self, ctx: &crate::ctx::ArcCtx) -> ArchivableSuiteResult {
async fn collect_results(mut self, ctx: &dyn MinAppContext) -> ArchivableSuiteResult {
let mut tests = Vec::with_capacity(self.tests.len());
for (_auth, mut r) in self.tests.drain() {
let result = r.rx.recv().await.unwrap();
let log = ctx.docker.get_logs(&r.container_name);
ctx.docker.rm_container(&r.container_name, true);
let log = ctx.docker_().get_logs(&r.container_name);
ctx.docker_().rm_container(&r.container_name, true);
let container = ArchivableContainer {
name: r.container_name,
logs: log,
@@ -174,7 +172,7 @@ mod tests {
use crate::complaince::result::Result as CResult;
use crate::complaince::suite::Test;
use crate::{AppCtx, Ctx, Settings};
use crate::{AppMinCtx, Ctx, Settings};
use std::sync::Arc;
@@ -190,18 +188,18 @@ mod tests {
// }
//
// #[derive(Clone)]
// struct TestDocker {
// state: Arc<RwLock<ContainerState>>,
// }
//
// impl TestDocker {
// pub fn new() -> Self {
// Self {
// state: Arc::new(RwLock::new(ContainerState::NoContainer)),
// }
// }
// }
// impl DockerLike for TestDocker {
// fn version(&self) -> String {
// unimplemented!();
// }
@@ -243,8 +241,8 @@ mod tests {
let settings = Settings::new().unwrap();
let ctx = Ctx::new(settings.clone()).await;
// ctx.docker = Arc::new(TestDocker::new());
let ctx = AppCtx::new(Arc::new(ctx));
let ctx = crate::AppFullCtx::new(Arc::new(ctx));
let mut dummy_test = Test {
name: "suite_runner_works".into(),
@@ -273,15 +271,15 @@ mod tests {
let state = SuiteRunnerState::launch_suite(
&Url::parse("http://suite_runner_works.service").unwrap(),
&suite,
&ctx,
ctx.as_ref(),
);
assert_eq!(state.tests.len(), 2);
std::thread::sleep(std::time::Duration::new(13, 0));
for (k, v) in state.tests.iter() {
assert_eq!(ctx.docker.get_exit_status(&v.container_name), 0);
assert_eq!(ctx.docker_().get_exit_status(&v.container_name), 0);
let tx = {
let r = ctx.results.read().unwrap();
let r = ctx.results_().read().unwrap();
r.get(k).unwrap().to_owned()
};
let tx_result = CResult {
@@ -292,7 +290,7 @@ mod tests {
tx.send(tx_result).await.unwrap();
}
let results = state.collect_results(&ctx).await;
let results = state.collect_results(ctx.as_ref()).await;
assert_eq!(results.tests.len(), 2);
for archivable_test in results.tests.iter() {
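
Tying the file together: launch_suite parks an mpsc Sender in the
ResultStore under each test's auth token, the test container posts its
result back over HTTP, and the results handler (first file) removes the
sender and forwards the payload to collect_results. A self-contained
model of that handshake, with String standing in for CResult:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::sync::mpsc;

// String stands in for the crate's CResult payload type.
type Store = Arc<RwLock<HashMap<String, mpsc::Sender<String>>>>;

#[tokio::main]
async fn main() {
    let store: Store = Arc::default();

    // launch_suite side: register a sender under the test's auth token.
    let (tx, mut rx) = mpsc::channel(1);
    store.write().unwrap().insert("auth-token".into(), tx);

    // results-endpoint side: take the sender out and forward the payload.
    let tx = store.write().unwrap().remove("auth-token").unwrap();
    tx.send("test passed".to_string()).await.unwrap();

    // collect_results side: receive what the container reported.
    assert_eq!(rx.recv().await.unwrap(), "test passed");
}
```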

View file

@@ -7,22 +7,22 @@ use std::path::Path;
use std::path::PathBuf;
use crate::complaince::target::Target;
use crate::ctx::MinAppContext;
use crate::git::Git;
use crate::runner::results::*;
use crate::AppCtx;
use crate::docker_compose::DockerCompose;
const FTEST_TARGET_FILE: &str = "ftest.toml";
pub async fn run_target(
ctx: &AppCtx,
ctx: &dyn MinAppContext,
target: PathBuf,
) -> (
Vec<ArchivableSuiteResult>,
Option<Vec<ArchivableInitResult>>,
) {
let compose = DockerCompose::new(target.canonicalize().unwrap(), ctx.docker.clone());
let compose = DockerCompose::new(target.canonicalize().unwrap(), ctx.docker_().clone());
compose.up();
let services = compose.services();