From 604fca0a622f160b5ad50533ccc269b2d6639d10 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Thu, 2 Feb 2023 21:55:13 +0530 Subject: [PATCH] feat: archive campaign and benchmark data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DESCRIPTION FORMATS - Campaign configuration is stored in JSON format - Benchmark data is stored in CSV format DIRECTORY STRUCTURE Each campaign gets a separate directory. A campaign can have multiple archives. Archives are stored in directories whose names would be the same as the UNIX timestamp of when they were recorded. EXAMPLE The example below shows three campaigns with one archive each. Each archive is stored in a directory denoting the moment in which the archive was generated. Each archive includes campaign configuration and benchmark. ```bash 14:53 atm@lab archive → tree . ├── 4e951e01-71ee-4a18-9b97-782965495ae3 │   └── 1675329764 │   ├── benchmark.csv │   └── challenge.json ├── 9d16df08-dffc-484e-bbe2-10c00b431c7e │   └── 1675329764 │   ├── benchmark.csv │   └── challenge.json └── fa9f7c24-afec-4505-adb9-8e0c3ce54d37 └── 1675329764 ├── benchmark.csv └── challenge.json 7 directories, 6 files ``` --- Cargo.lock | 36 +++++++ Cargo.toml | 2 + src/archive.rs | 249 +++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 6 +- 4 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 src/archive.rs diff --git a/Cargo.lock b/Cargo.lock index fa4e85f..4596e49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -502,6 +502,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f0778972c64420fdedc63f09919c8a88bda7b25135357fd25a5d9f3257e832" dependencies = [ "memchr", + "once_cell", + "regex-automata", "serde 1.0.152", ] @@ -737,6 +739,32 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv-async" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c652d4c48e4dc80b26fadd169c02fb6053d9f57507ddd3e6b8706e7d0242235e" +dependencies = [ + "bstr", + "cfg-if", + "csv-core", + "futures", + "itoa", + "ryu", + "serde 1.0.152", + "tokio", + "tokio-stream", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.9.2" @@ -2021,6 +2049,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" + [[package]] name = "regex-syntax" version = "0.6.28" @@ -2484,6 +2518,7 @@ dependencies = [ "argon2-creds", "cache-buster", "config", + "csv-async", "derive_builder", "derive_more", "futures", @@ -2498,6 +2533,7 @@ dependencies = [ "serde_json", "sqlx", "tera", + "tokio", "tracing", "url", "urlencoding", diff --git a/Cargo.toml b/Cargo.toml index 91e6d0e..134edd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,8 @@ mime = "0.3.16" #sailfish = "0.3.2" tracing = { version = "0.1.37", features = ["log"] } tera = { version="1.17.1", features=["builtins"]} +tokio = { version = "1.25.0", features = ["fs"] } +csv-async = { version = "1.2.5", features = ["serde", "tokio"] } #tokio = "1.11.0" diff --git a/src/archive.rs b/src/archive.rs new file mode 100644 index 0000000..8bdfc88 --- /dev/null +++ b/src/archive.rs @@ -0,0 +1,249 @@ +use std::collections::HashMap; +/* + * Copyright (C) 2023 Aravinth Manivannan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +use std::path::{Path, PathBuf}; + +use serde::{Deserialize, Serialize}; +use sqlx::types::time::OffsetDateTime; +use tokio::fs; +use tokio::io::AsyncWriteExt; +use uuid::Uuid; + +use crate::api::v1::admin::campaigns::runners::get_results; +use crate::api::v1::admin::campaigns::SurveyResponse; +use crate::{errors::ServiceResult, AppData, Settings}; + +const CAMPAIGN_INFO_FILE: &str = "campaign.json"; + +const BENCHMARK_FILE: &str = "benchmark.csv"; + +pub struct Archiver { + base_path: String, +} + +impl Archiver { + pub fn new(s: &Settings) -> Self { + Archiver { + base_path: s.archive.base_path.clone(), + } + } + fn campaign_path(&self, id: &Uuid) -> PathBuf { + Path::new(&self.base_path).join(&id.to_string()) + } + + async fn create_dir_util(p: &PathBuf) -> ServiceResult<()> { + if p.exists() { + if !p.is_dir() { + fs::remove_file(&p).await.unwrap(); + fs::create_dir_all(&p).await.unwrap(); + } + } else { + fs::create_dir_all(&p).await.unwrap(); + } + Ok(()) + } + + fn archive_path_now(&self, id: &Uuid) -> PathBuf { + let unix_time = OffsetDateTime::now_utc().unix_timestamp(); + self.campaign_path(id).join(unix_time.to_string()) + } + + fn campaign_file_path(&self, id: &Uuid) -> PathBuf { + self.archive_path_now(id).join(CAMPAIGN_INFO_FILE) + } + + fn benchmark_file_path(&self, id: &Uuid) -> PathBuf { + self.archive_path_now(id).join(BENCHMARK_FILE) + } + + async fn write_campaign_file(&self, c: &Campaign) -> ServiceResult<()> { + let archive_path = self.archive_path_now(&c.id); + Self::create_dir_util(&archive_path).await?; + let campaign_file_path = self.campaign_file_path(&c.id); + let contents = serde_json::to_string(c).unwrap(); + // fs::write(campaign_file_path, contents).await.unwrap(); + let mut file = fs::File::create(&campaign_file_path).await.unwrap(); + file.write(contents.as_bytes()).await.unwrap(); + file.flush().await.unwrap(); + + Ok(()) + } + + async fn write_benchmark_file( + &self, + c: &Campaign, + data: &AppData, + ) -> ServiceResult<()> { + let archive_path = self.archive_path_now(&c.id); + Self::create_dir_util(&archive_path).await?; + + let benchmark_file_path = self.benchmark_file_path(&c.id); + struct Username { + name: String, + } + let owner = sqlx::query_as!( + Username, + "SELECT + survey_admins.name + FROM + survey_admins + INNER JOIN survey_campaigns ON + survey_admins.ID = survey_campaigns.user_id + WHERE + survey_campaigns.ID = $1 + ", + &c.id + ) + .fetch_one(&data.db) + .await?; + + let mut page = 0; + let limit = 50; + let file = fs::OpenOptions::new() + .read(true) + .append(true) + .create(true) + .open(&benchmark_file_path) + .await + .unwrap(); + let mut wri = csv_async::AsyncWriter::from_writer(file); + + loop { + let mut resp = + get_results(&owner.name, &c.id, data, page, limit, None).await?; + + for r in resp.drain(0..) { + let csv_resp = to_hashmap(r, c); + let keys: Vec<&str> = csv_resp.keys().map(|k| k.as_str()).collect(); + wri.write_record(&keys).await.unwrap(); + + let values: Vec<&str> = csv_resp.values().map(|v| v.as_str()).collect(); + wri.write_record(&values).await.unwrap(); + wri.flush().await.unwrap(); + + //wri.serialize(csv_resp).await.unwrap(); + wri.flush().await.unwrap(); + } + page += 1; + + wri.flush().await.unwrap(); + if resp.len() < limit { + break; + } + } + + Ok(()) + } + + pub async fn archive(&self, data: &AppData) -> ServiceResult<()> { + let mut db_campaigns = sqlx::query_as!( + InnerCampaign, + "SELECT ID, name, difficulties, created_at FROM survey_campaigns" + ) + .fetch_all(&data.db) + .await?; + for c in db_campaigns.drain(0..) { + let campaign: Campaign = c.into(); + self.write_campaign_file(&campaign).await?; + self.write_benchmark_file(&campaign, data).await?; + } + Ok(()) + } +} + +pub fn to_hashmap(s: SurveyResponse, c: &Campaign) -> HashMap { + let mut map = HashMap::with_capacity(7 + c.difficulties.len()); + map.insert("user".into(), s.user.id.to_string()); + map.insert("device_user_provided".into(), s.device_user_provided); + map.insert( + "device_software_recognised".into(), + s.device_software_recognised, + ); + map.insert( + "threads".into(), + s.threads.map_or_else(|| "-".into(), |v| v.to_string()), + ); + map.insert("submitted_at".into(), s.submitted_at.to_string()); + map.insert("submission_type".into(), s.submission_type.to_string()); + for d in c.difficulties.iter() { + let bench = s + .benches + .iter() + .find(|b| b.difficulty == *d as i32) + .map_or_else(|| "-".into(), |v| v.duration.to_string()); + map.insert(format!("Difficulty: {d}"), bench); + } + map +} + +//#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +//pub struct CSVSurveyResp { +// pub user: Uuid, +// pub device_user_provided: String, +// pub device_software_recognised: String, +// pub id: usize, +// pub threads: Option, +// pub submitted_at: i64, +// pub submission_type: SubmissionType, +// pub benches: String, +//} +// +//impl From for CSVSurveyResp { +// fn from(s: SurveyResponse) -> Self { +// let mut benches = String::default(); +// for b in s.benches.iter() { +// benches = format!("{benches} ({})", b.to_csv_resp()); +// } +// Self { +// user: s.user.id, +// device_software_recognised: s.device_software_recognised, +// device_user_provided: s.device_user_provided, +// id: s.id, +// threads: s.threads, +// submission_type: s.submission_type, +// benches, +// submitted_at: s.submitted_at, +// } +// } +//} + +#[derive(Clone, Debug, Eq, PartialEq)] +struct InnerCampaign { + id: Uuid, + name: String, + difficulties: Vec, + created_at: OffsetDateTime, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct Campaign { + pub id: Uuid, + pub name: String, + pub difficulties: Vec, + pub created_at: i64, +} + +impl From for Campaign { + fn from(i: InnerCampaign) -> Self { + Self { + id: i.id, + name: i.name, + difficulties: i.difficulties.iter().map(|d| *d as u32).collect(), + created_at: i.created_at.unix_timestamp(), + } + } +} diff --git a/src/main.rs b/src/main.rs index beaf39e..6822b59 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,6 +27,7 @@ use lazy_static::lazy_static; use log::info; mod api; +mod archive; mod data; mod errors; mod pages; @@ -70,7 +71,7 @@ pub type AppData = actix_web::web::Data>; #[cfg(not(tarpaulin_include))] #[actix_web::main] async fn main() -> std::io::Result<()> { - env::set_var("RUST_LOG", "info"); + //env::set_var("RUST_LOG", "info"); pretty_env_logger::init(); @@ -84,6 +85,9 @@ async fn main() -> std::io::Result<()> { sqlx::migrate!("./migrations/").run(&data.db).await.unwrap(); let data = actix_web::web::Data::new(data); + let arch = archive::Archiver::new(&data.settings); + arch.archive(&data).await.unwrap(); + let ip = settings.server.get_ip(); println!("Starting server on: http://{}", ip);