feat: archive campaign and benchmark data
DESCRIPTION FORMATS - Campaign configuration is stored in JSON format - Benchmark data is stored in CSV format DIRECTORY STRUCTURE Each campaign gets a separate directory. A campaign can have multiple archives. Archives are stored in directories whose names are the UNIX timestamp of the moment they were recorded. EXAMPLE The example below shows three campaigns with one archive each. Each archive is stored in a directory denoting the moment in which the archive was generated. Each archive includes the campaign configuration and the benchmark data. ```bash 14:53 atm@lab archive → tree . ├── 4e951e01-71ee-4a18-9b97-782965495ae3 │ └── 1675329764 │ ├── benchmark.csv │ └── campaign.json ├── 9d16df08-dffc-484e-bbe2-10c00b431c7e │ └── 1675329764 │ ├── benchmark.csv │ └── campaign.json └── fa9f7c24-afec-4505-adb9-8e0c3ce54d37 └── 1675329764 ├── benchmark.csv └── campaign.json 7 directories, 6 files ```
This commit is contained in:
parent
a44f6f1748
commit
604fca0a62
4 changed files with 292 additions and 1 deletions
36
Cargo.lock
generated
36
Cargo.lock
generated
|
@ -502,6 +502,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "b7f0778972c64420fdedc63f09919c8a88bda7b25135357fd25a5d9f3257e832"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"once_cell",
|
||||
"regex-automata",
|
||||
"serde 1.0.152",
|
||||
]
|
||||
|
||||
|
@ -737,6 +739,32 @@ dependencies = [
|
|||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "csv-async"
|
||||
version = "1.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c652d4c48e4dc80b26fadd169c02fb6053d9f57507ddd3e6b8706e7d0242235e"
|
||||
dependencies = [
|
||||
"bstr",
|
||||
"cfg-if",
|
||||
"csv-core",
|
||||
"futures",
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde 1.0.152",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "csv-core"
|
||||
version = "0.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ctr"
|
||||
version = "0.9.2"
|
||||
|
@ -2021,6 +2049,12 @@ dependencies = [
|
|||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex-automata"
|
||||
version = "0.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.6.28"
|
||||
|
@ -2484,6 +2518,7 @@ dependencies = [
|
|||
"argon2-creds",
|
||||
"cache-buster",
|
||||
"config",
|
||||
"csv-async",
|
||||
"derive_builder",
|
||||
"derive_more",
|
||||
"futures",
|
||||
|
@ -2498,6 +2533,7 @@ dependencies = [
|
|||
"serde_json",
|
||||
"sqlx",
|
||||
"tera",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"url",
|
||||
"urlencoding",
|
||||
|
|
|
@ -67,6 +67,8 @@ mime = "0.3.16"
|
|||
#sailfish = "0.3.2"
|
||||
tracing = { version = "0.1.37", features = ["log"] }
|
||||
tera = { version="1.17.1", features=["builtins"]}
|
||||
tokio = { version = "1.25.0", features = ["fs"] }
|
||||
csv-async = { version = "1.2.5", features = ["serde", "tokio"] }
|
||||
|
||||
#tokio = "1.11.0"
|
||||
|
||||
|
|
249
src/archive.rs
Normal file
249
src/archive.rs
Normal file
|
@ -0,0 +1,249 @@
|
|||
use std::collections::HashMap;
|
||||
/*
|
||||
* Copyright (C) 2023 Aravinth Manivannan <realaravinth@batsense.net>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::types::time::OffsetDateTime;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::api::v1::admin::campaigns::runners::get_results;
|
||||
use crate::api::v1::admin::campaigns::SurveyResponse;
|
||||
use crate::{errors::ServiceResult, AppData, Settings};
|
||||
|
||||
/// File name under which the campaign configuration (JSON) is stored
/// inside each timestamped archive directory.
const CAMPAIGN_INFO_FILE: &str = "campaign.json";

/// File name under which the benchmark results (CSV) are stored
/// inside each timestamped archive directory.
const BENCHMARK_FILE: &str = "benchmark.csv";
|
||||
|
||||
/// Archives campaign configuration (JSON) and benchmark data (CSV) into a
/// per-campaign, per-timestamp directory tree rooted at `base_path`.
pub struct Archiver {
    // Root directory of the archive; copied from `Settings.archive.base_path`.
    base_path: String,
}
|
||||
|
||||
impl Archiver {
|
||||
pub fn new(s: &Settings) -> Self {
|
||||
Archiver {
|
||||
base_path: s.archive.base_path.clone(),
|
||||
}
|
||||
}
|
||||
fn campaign_path(&self, id: &Uuid) -> PathBuf {
|
||||
Path::new(&self.base_path).join(&id.to_string())
|
||||
}
|
||||
|
||||
async fn create_dir_util(p: &PathBuf) -> ServiceResult<()> {
|
||||
if p.exists() {
|
||||
if !p.is_dir() {
|
||||
fs::remove_file(&p).await.unwrap();
|
||||
fs::create_dir_all(&p).await.unwrap();
|
||||
}
|
||||
} else {
|
||||
fs::create_dir_all(&p).await.unwrap();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn archive_path_now(&self, id: &Uuid) -> PathBuf {
|
||||
let unix_time = OffsetDateTime::now_utc().unix_timestamp();
|
||||
self.campaign_path(id).join(unix_time.to_string())
|
||||
}
|
||||
|
||||
fn campaign_file_path(&self, id: &Uuid) -> PathBuf {
|
||||
self.archive_path_now(id).join(CAMPAIGN_INFO_FILE)
|
||||
}
|
||||
|
||||
fn benchmark_file_path(&self, id: &Uuid) -> PathBuf {
|
||||
self.archive_path_now(id).join(BENCHMARK_FILE)
|
||||
}
|
||||
|
||||
async fn write_campaign_file(&self, c: &Campaign) -> ServiceResult<()> {
|
||||
let archive_path = self.archive_path_now(&c.id);
|
||||
Self::create_dir_util(&archive_path).await?;
|
||||
let campaign_file_path = self.campaign_file_path(&c.id);
|
||||
let contents = serde_json::to_string(c).unwrap();
|
||||
// fs::write(campaign_file_path, contents).await.unwrap();
|
||||
let mut file = fs::File::create(&campaign_file_path).await.unwrap();
|
||||
file.write(contents.as_bytes()).await.unwrap();
|
||||
file.flush().await.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_benchmark_file(
|
||||
&self,
|
||||
c: &Campaign,
|
||||
data: &AppData,
|
||||
) -> ServiceResult<()> {
|
||||
let archive_path = self.archive_path_now(&c.id);
|
||||
Self::create_dir_util(&archive_path).await?;
|
||||
|
||||
let benchmark_file_path = self.benchmark_file_path(&c.id);
|
||||
struct Username {
|
||||
name: String,
|
||||
}
|
||||
let owner = sqlx::query_as!(
|
||||
Username,
|
||||
"SELECT
|
||||
survey_admins.name
|
||||
FROM
|
||||
survey_admins
|
||||
INNER JOIN survey_campaigns ON
|
||||
survey_admins.ID = survey_campaigns.user_id
|
||||
WHERE
|
||||
survey_campaigns.ID = $1
|
||||
",
|
||||
&c.id
|
||||
)
|
||||
.fetch_one(&data.db)
|
||||
.await?;
|
||||
|
||||
let mut page = 0;
|
||||
let limit = 50;
|
||||
let file = fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.append(true)
|
||||
.create(true)
|
||||
.open(&benchmark_file_path)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut wri = csv_async::AsyncWriter::from_writer(file);
|
||||
|
||||
loop {
|
||||
let mut resp =
|
||||
get_results(&owner.name, &c.id, data, page, limit, None).await?;
|
||||
|
||||
for r in resp.drain(0..) {
|
||||
let csv_resp = to_hashmap(r, c);
|
||||
let keys: Vec<&str> = csv_resp.keys().map(|k| k.as_str()).collect();
|
||||
wri.write_record(&keys).await.unwrap();
|
||||
|
||||
let values: Vec<&str> = csv_resp.values().map(|v| v.as_str()).collect();
|
||||
wri.write_record(&values).await.unwrap();
|
||||
wri.flush().await.unwrap();
|
||||
|
||||
//wri.serialize(csv_resp).await.unwrap();
|
||||
wri.flush().await.unwrap();
|
||||
}
|
||||
page += 1;
|
||||
|
||||
wri.flush().await.unwrap();
|
||||
if resp.len() < limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn archive(&self, data: &AppData) -> ServiceResult<()> {
|
||||
let mut db_campaigns = sqlx::query_as!(
|
||||
InnerCampaign,
|
||||
"SELECT ID, name, difficulties, created_at FROM survey_campaigns"
|
||||
)
|
||||
.fetch_all(&data.db)
|
||||
.await?;
|
||||
for c in db_campaigns.drain(0..) {
|
||||
let campaign: Campaign = c.into();
|
||||
self.write_campaign_file(&campaign).await?;
|
||||
self.write_benchmark_file(&campaign, data).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Flatten one survey submission into CSV-ready column/value pairs.
///
/// Fixed columns cover the submitter and submission metadata; one extra
/// column per campaign difficulty holds the matching bench duration, with
/// "-" standing in for values that are absent.
pub fn to_hashmap(s: SurveyResponse, c: &Campaign) -> HashMap<String, String> {
    let mut columns = HashMap::with_capacity(7 + c.difficulties.len());
    columns.insert("user".into(), s.user.id.to_string());
    columns.insert("device_user_provided".into(), s.device_user_provided);
    columns.insert(
        "device_software_recognised".into(),
        s.device_software_recognised,
    );
    let threads = match s.threads {
        Some(v) => v.to_string(),
        None => "-".into(),
    };
    columns.insert("threads".into(), threads);
    columns.insert("submitted_at".into(), s.submitted_at.to_string());
    columns.insert("submission_type".into(), s.submission_type.to_string());
    // One column per configured difficulty, "-" when no bench matches it.
    for d in c.difficulties.iter() {
        let duration = s
            .benches
            .iter()
            .find(|b| b.difficulty == *d as i32)
            .map(|b| b.duration.to_string())
            .unwrap_or_else(|| "-".into());
        columns.insert(format!("Difficulty: {d}"), duration);
    }
    columns
}
|
||||
|
||||
//#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
|
||||
//pub struct CSVSurveyResp {
|
||||
// pub user: Uuid,
|
||||
// pub device_user_provided: String,
|
||||
// pub device_software_recognised: String,
|
||||
// pub id: usize,
|
||||
// pub threads: Option<usize>,
|
||||
// pub submitted_at: i64,
|
||||
// pub submission_type: SubmissionType,
|
||||
// pub benches: String,
|
||||
//}
|
||||
//
|
||||
//impl From<SurveyResponse> for CSVSurveyResp {
|
||||
// fn from(s: SurveyResponse) -> Self {
|
||||
// let mut benches = String::default();
|
||||
// for b in s.benches.iter() {
|
||||
// benches = format!("{benches} ({})", b.to_csv_resp());
|
||||
// }
|
||||
// Self {
|
||||
// user: s.user.id,
|
||||
// device_software_recognised: s.device_software_recognised,
|
||||
// device_user_provided: s.device_user_provided,
|
||||
// id: s.id,
|
||||
// threads: s.threads,
|
||||
// submission_type: s.submission_type,
|
||||
// benches,
|
||||
// submitted_at: s.submitted_at,
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
/// Row shape of `survey_campaigns` as fetched by `sqlx::query_as!` in
/// `Archiver::archive`; converted into `Campaign` before archiving.
#[derive(Clone, Debug, Eq, PartialEq)]
struct InnerCampaign {
    // Campaign ID (`survey_campaigns.ID`).
    id: Uuid,
    // Human-readable campaign name.
    name: String,
    // Stored as i32 in the database; cast to u32 in `Campaign`.
    difficulties: Vec<i32>,
    // Database timestamp; flattened to a UNIX timestamp in `Campaign`.
    created_at: OffsetDateTime,
}
|
||||
|
||||
/// Serializable campaign configuration, as written to the archive's
/// campaign JSON file.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct Campaign {
    /// Campaign ID.
    pub id: Uuid,
    /// Human-readable campaign name.
    pub name: String,
    /// Difficulty levels configured for the campaign.
    pub difficulties: Vec<u32>,
    /// Creation time as a UNIX timestamp (seconds).
    pub created_at: i64,
}
|
||||
|
||||
impl From<InnerCampaign> for Campaign {
|
||||
fn from(i: InnerCampaign) -> Self {
|
||||
Self {
|
||||
id: i.id,
|
||||
name: i.name,
|
||||
difficulties: i.difficulties.iter().map(|d| *d as u32).collect(),
|
||||
created_at: i.created_at.unix_timestamp(),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,6 +27,7 @@ use lazy_static::lazy_static;
|
|||
use log::info;
|
||||
|
||||
mod api;
|
||||
mod archive;
|
||||
mod data;
|
||||
mod errors;
|
||||
mod pages;
|
||||
|
@ -70,7 +71,7 @@ pub type AppData = actix_web::web::Data<Arc<crate::data::Data>>;
|
|||
#[cfg(not(tarpaulin_include))]
|
||||
#[actix_web::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
env::set_var("RUST_LOG", "info");
|
||||
//env::set_var("RUST_LOG", "info");
|
||||
|
||||
pretty_env_logger::init();
|
||||
|
||||
|
@ -84,6 +85,9 @@ async fn main() -> std::io::Result<()> {
|
|||
sqlx::migrate!("./migrations/").run(&data.db).await.unwrap();
|
||||
let data = actix_web::web::Data::new(data);
|
||||
|
||||
let arch = archive::Archiver::new(&data.settings);
|
||||
arch.archive(&data).await.unwrap();
|
||||
|
||||
let ip = settings.server.get_ip();
|
||||
println!("Starting server on: http://{}", ip);
|
||||
|
||||
|
|
Loading…
Reference in a new issue