From a3f2c3632e9de0a50af7e4bca7a2ede2e12aabf8 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Sun, 12 Mar 2023 20:11:06 +0530 Subject: [PATCH] feat: publish benchmark data periodically (configurable) --- config/default.toml | 1 + src/archive.rs | 75 ++++++++++++++++++++++++++++++++++++--------- src/main.rs | 14 +++++++-- src/settings.rs | 2 ++ 4 files changed, 74 insertions(+), 18 deletions(-) diff --git a/config/default.toml b/config/default.toml index 4fe75cd..a034271 100644 --- a/config/default.toml +++ b/config/default.toml @@ -36,6 +36,7 @@ pool = 4 [publish] dir = "/tmp/mcaptcha-survey" +duration = 3600 [footer] about = "https://mcapthca.org/about" diff --git a/src/archive.rs b/src/archive.rs index 8e5866b..e152435 100644 --- a/src/archive.rs +++ b/src/archive.rs @@ -14,13 +14,15 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +use std::future::Future; use std::path::{Path, PathBuf}; use serde::{Deserialize, Serialize}; use sqlx::types::time::OffsetDateTime; +use sqlx::types::Uuid; use tokio::fs; use tokio::io::AsyncWriteExt; -use uuid::Uuid; +use tokio::sync::oneshot::{self, error::TryRecvError, Sender}; use crate::api::v1::admin::campaigns::runners::get_results; use crate::api::v1::admin::campaigns::SurveyResponse; @@ -68,7 +70,7 @@ impl Archive { impl Archiver { pub fn new(s: &Settings) -> Self { Archiver { - base_path: s.archive.base_path.clone(), + base_path: s.publish.dir.clone(), } } @@ -163,7 +165,7 @@ impl Archiver { WHERE survey_campaigns.ID = $1 ", - &c.id + &Uuid::parse_str(&c.id.to_string()).unwrap() ) .fetch_one(&data.db) .await?; @@ -183,8 +185,15 @@ impl Archiver { wri.write_record(&keys).await.unwrap(); loop { - let mut resp = - get_results(&owner.name, &c.id, data, page, limit, None).await?; + let mut resp = get_results( + &owner.name, + &Uuid::parse_str(&c.id.to_string()).unwrap(), + data, + page, + limit, + None, + ) + .await?; for r in resp.drain(0..) 
{ let rec = Self::extract_record(c, r); @@ -201,6 +210,40 @@ impl Archiver { Ok(()) } + pub async fn init_archive_job( + self, + data: AppData, + ) -> ServiceResult<(Sender, impl Future)> { + let (tx, mut rx) = oneshot::channel(); + + let job = async move { + loop { + // let rx = self.rx.as_mut().unwrap(); + match rx.try_recv() { + // Kill signal received: stop the archive loop + Ok(_) => { + log::info!("Killing archive loop: received signal"); + break; + } + Err(TryRecvError::Empty) => { + let _ = self.archive(&data).await; + + tokio::time::sleep(std::time::Duration::new( + data.settings.publish.duration, + 0, + )) + .await; + } + Err(TryRecvError::Closed) => break, + } + + let _ = self.archive(&data).await; + } + }; + let job_fut = tokio::spawn(job); + Ok((tx, job_fut)) + } + pub async fn archive(&self, data: &AppData) -> ServiceResult<()> { let mut db_campaigns = sqlx::query_as!( InnerCampaign, " @@ -209,8 +252,8 @@ impl Archiver { .fetch_all(&data.db) .await?; for c in db_campaigns.drain(0..) { + let archive = Archive::new(c.id.clone(), self.base_path.clone()); let campaign: Campaign = c.into(); - let archive = Archive::new(campaign.id.clone(), self.base_path.clone()); self.write_campaign_file(&campaign, &archive).await?; self.write_benchmark_file(&campaign, &archive, data).await?; } @@ -228,7 +271,7 @@ struct InnerCampaign { #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct Campaign { - pub id: Uuid, + pub id: uuid::Uuid, pub name: String, pub difficulties: Vec, pub created_at: i64, @@ -237,7 +280,7 @@ pub struct Campaign { impl From for Campaign { fn from(i: InnerCampaign) -> Self { Self { - id: i.id, + id: uuid::Uuid::parse_str(&i.id.to_string()).unwrap(), name: i.name, difficulties: i.difficulties.iter().map(|d| *d as u32).collect(), created_at: i.created_at.unix_timestamp(), @@ -263,14 +306,14 @@ mod tests { fn archive_path_works() { let mut settings = Settings::new().unwrap(); let tmp_dir = Temp::new_dir().unwrap(); - settings.archive.base_path =
tmp_dir.join("base_path").to_str().unwrap().into(); + settings.publish.dir = tmp_dir.join("base_path").to_str().unwrap().into(); let uuid = Uuid::new_v4(); - let archive = Archive::new(uuid.clone(), settings.archive.base_path.clone()); + let archive = Archive::new(uuid.clone(), settings.publish.dir.clone()); let archive_path = archive.archive_path_now(); assert_eq!( archive_path, - Path::new(&settings.archive.base_path) + Path::new(&settings.publish.dir) .join(&uuid.to_string()) .join(&archive.now.to_string()) ); @@ -278,7 +321,7 @@ mod tests { let campaign_file_path = archive.campaign_file_path(); assert_eq!( campaign_file_path, - Path::new(&settings.archive.base_path) + Path::new(&settings.publish.dir) .join(&uuid.to_string()) .join(&archive.now.to_string()) .join(CAMPAIGN_INFO_FILE) @@ -287,7 +330,7 @@ mod tests { let benchmark_file_path = archive.benchmark_file_path(); assert_eq!( benchmark_file_path, - Path::new(&settings.archive.base_path) + Path::new(&settings.publish.dir) .join(&uuid.to_string()) .join(&archive.now.to_string()) .join(BENCHMARK_FILE) @@ -348,8 +391,10 @@ mod tests { .await.unwrap(); let campaign: Campaign = db_campaign.into(); - let archive = - Archive::new(campaign.id.clone(), data.settings.archive.base_path.clone()); + let archive = Archive::new( + Uuid::parse_str(&campaign.id.to_string()).unwrap(), + data.settings.publish.dir.clone(), + ); let archiver = Archiver::new(&data.settings); archiver.archive(&AppData::new(data.clone())).await.unwrap(); let contents: Campaign = serde_json::from_str( diff --git a/src/main.rs b/src/main.rs index 39f2b92..b70c44d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -72,7 +72,9 @@ pub type AppData = actix_web::web::Data>; #[cfg(not(tarpaulin_include))] #[actix_web::main] async fn main() -> std::io::Result<()> { - //env::set_var("RUST_LOG", "info"); + if env::var("RUST_LOG").is_err() { + env::set_var("RUST_LOG", "info"); + } pretty_env_logger::init(); @@ -87,7 +89,8 @@ async fn main() -> std::io::Result<()> 
{ let data = actix_web::web::Data::new(data); let arch = archive::Archiver::new(&data.settings); - arch.archive(&data).await.unwrap(); + let (archive_killer, archive_job) = + arch.init_archive_job(data.clone()).await.unwrap(); let ip = settings.server.get_ip(); println!("Starting server on: http://{}", ip); @@ -106,14 +109,19 @@ async fn main() -> std::io::Result<()> { .wrap(actix_middleware::NormalizePath::new( actix_middleware::TrailingSlash::Trim, )) - .configure(services) .service(Files::new("/download", &settings.publish.dir).show_files_listing()) + .configure(services) .app_data(data.clone()) }) .bind(ip) .unwrap() .run() .await + .unwrap(); + + archive_killer.send(true).unwrap(); + archive_job.await; + Ok(()) } #[cfg(not(tarpaulin_include))] diff --git a/src/settings.rs b/src/settings.rs index 9976e24..e820f03 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -85,8 +85,10 @@ pub struct Footer { pub thanks: Url, } +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Publish { pub dir: String, + pub duration: u64, } impl Publish {