diff --git a/Cargo.lock b/Cargo.lock
index d8f0e51..8ed4986 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3301,9 +3301,21 @@ dependencies = [
  "pin-project-lite",
  "signal-hook-registry",
  "socket2",
+ "tokio-macros",
  "windows-sys 0.45.0",
 ]
 
+[[package]]
+name = "tokio-macros"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "tokio-native-tls"
 version = "0.3.1"
diff --git a/Cargo.toml b/Cargo.toml
index db88d76..67b6feb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -70,7 +70,7 @@ mime = "0.3.16"
 #sailfish = "0.3.2"
 tracing = { version = "0.1.37", features = ["log"] }
 tera = { version="1.17.1", features=["builtins"]}
-tokio = { version = "1.25.0", features = ["fs"] }
+tokio = { version = "1.25.0", features = ["fs", "macros", "rt", "sync", "time"] }
 csv-async = { version = "1.2.5", features = ["serde", "tokio"] }
 async-trait = "0.1.68"
 reqwest = { version = "0.11.18", features = ["json", "gzip"] }
diff --git a/src/mcaptcha.rs b/src/mcaptcha.rs
index c57e695..f7c69c5 100644
--- a/src/mcaptcha.rs
+++ b/src/mcaptcha.rs
@@ -14,12 +14,17 @@
  * You should have received a copy of the GNU Affero General Public License
  * along with this program. If not, see .
  */
+use std::time::Duration;
+
 use async_trait::async_trait;
 use reqwest::Client;
 use serde::{Deserialize, Serialize};
+use tokio::sync::oneshot;
+use tokio::task::JoinHandle;
+use tokio::time::sleep;
 use url::Url;
 
-use crate::errors::*;
+use crate::{api::v1::mcaptcha::db::SchedulerJob, errors::*, AppData};
 
 /* TODO:
  * 1. Define traits to interact with mCaptcha
@@ -79,11 +84,6 @@ impl Clone for Box {
     }
 }
 
-#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
-pub struct Secret {
-    pub secret: String,
-}
-
 #[derive(Clone)]
 pub struct MCaptchaClientReqwest {
     client: Client,
@@ -122,7 +122,8 @@ impl MCaptchaClient for MCaptchaClientReqwest {
         campaign_id: &str,
         page: usize,
     ) -> ServiceResult> {
-        mcaptcha.set_path(&format!("/api/v1/survey/{campaign_id}/get?page={page}"));
+        mcaptcha.set_path(&format!("/api/v1/survey/takeout/{campaign_id}/get"));
+        mcaptcha.set_query(Some(&format!("page={page}")));
         let res = self
             .client
             .get(mcaptcha)
@@ -136,6 +137,182 @@ impl MCaptchaClient for MCaptchaClientReqwest {
     }
 }
 
+/// Background worker that downloads mCaptcha survey benchmarks for scheduled
+/// jobs and stores them through `AppData`.
+#[derive(Clone)]
+pub struct MCaptchaDownloader {
+    data: AppData,
+}
+
+impl MCaptchaDownloader {
+    /// Creates a downloader that works on `data`.
+    pub fn new(data: AppData) -> Self {
+        Self { data }
+    }
+
+    /// Returns `true` while no stop signal has been observed on `rx`.
+    ///
+    /// Every `try_recv` outcome other than `Empty` (a received value or a
+    /// closed channel) is treated as a request to stop.
+    fn can_run(rx: &mut oneshot::Receiver<()>) -> bool {
+        matches!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty))
+    }
+
+    /// Sleeps for up to five seconds in one-second steps, returning early once
+    /// a stop signal is observed so that shutdown is not held up by the wait.
+    async fn idle(rx: &mut oneshot::Receiver<()>) {
+        for _ in 0..5 {
+            if !Self::can_run(rx) {
+                return;
+            }
+            sleep(Duration::from_secs(1)).await;
+        }
+    }
+
+    /// Spawns the download loop as a Tokio task.
+    ///
+    /// Returns the sender that stops the loop together with the handle of the
+    /// spawned task. Sending `()` on the sender, or dropping it, makes the loop
+    /// exit at its next check.
+    ///
+    /// Errors raised while fetching, scheduling or executing a job are logged
+    /// and followed by a short pause; they no longer panic inside the task.
+    pub async fn start_job(
+        &self,
+    ) -> ServiceResult<(oneshot::Sender<()>, JoinHandle<()>)> {
+        let (tx, mut rx) = oneshot::channel();
+        let this = self.clone();
+        let fut = async move {
+            loop {
+                if !Self::can_run(&mut rx) {
+                    log::info!("stopping survey uploads");
+                    break;
+                }
+
+                // Errors are logged inside the `match` arms so that the error
+                // value is dropped before the next `.await`.
+                let next = match this.data.get_next_job_to_run().await {
+                    Ok(next) => next,
+                    Err(e) => {
+                        log::error!("failed to fetch the next job: {e:?}");
+                        None
+                    }
+                };
+                let task = match next {
+                    Some(task) => task,
+                    None => {
+                        Self::idle(&mut rx).await;
+                        continue;
+                    }
+                };
+
+                let scheduled = match this.data.mark_job_scheduled(&task).await {
+                    Ok(_) => true,
+                    Err(e) => {
+                        log::error!("failed to mark job as scheduled: {e:?}");
+                        false
+                    }
+                };
+                if !scheduled {
+                    Self::idle(&mut rx).await;
+                    continue;
+                }
+
+                let completed = match this.exec_job(&task, &mut rx).await {
+                    Ok(()) => true,
+                    Err(e) => {
+                        log::error!("survey download job failed: {e:?}");
+                        false
+                    }
+                };
+                if !completed {
+                    // Pause so a persistently failing job cannot spin the loop.
+                    Self::idle(&mut rx).await;
+                }
+            }
+        };
+        let handle = tokio::spawn(fut);
+        Ok((tx, handle))
+    }
+
+    /// Downloads the benchmarks of `job`'s campaign page by page and stores
+    /// every record that lies past the saved checkpoint.
+    ///
+    /// Downloading starts on the page that contains the checkpoint and ends
+    /// after a page with fewer than `LIMIT` records or once a stop signal is
+    /// observed on `rx`; the job is then marked as finished.
+    ///
+    /// # Errors
+    ///
+    /// Returns the first error reported by the data layer or by the mCaptcha
+    /// client.
+    async fn exec_job(
+        &self,
+        job: &SchedulerJob,
+        rx: &mut oneshot::Receiver<()>,
+    ) -> ServiceResult<()> {
+        const LIMIT: usize = 50;
+
+        let checkpoint = self.data.mcaptcha_get_checkpoint(&job.campaign_id).await?;
+        let mut page = 1 + (checkpoint / LIMIT);
+        let campaign_str = job.campaign_id.to_string();
+        loop {
+            if !Self::can_run(rx) {
+                log::info!("Stopping survey downloads");
+                break;
+            }
+
+            log::info!("getting page {page} from {campaign_str}");
+            let res = self
+                .data
+                .mcaptcha
+                .download_benchmarks(job.url.clone(), &campaign_str, page)
+                .await?;
+
+            if !Self::can_run(rx) {
+                log::info!("Stopping survey downloads");
+                break;
+            }
+
+            // Record the page length before the records are consumed; it
+            // decides below whether another page must be requested.
+            let fetched = res.len();
+            // Only the first requested page can contain records that are
+            // already stored. Saturating/clamped arithmetic keeps later pages
+            // and unexpectedly short pages from underflowing.
+            let skip = checkpoint.saturating_sub((page - 1) * LIMIT).min(fetched);
+            let new_records = fetched - skip;
+            for r in res.into_iter().skip(skip) {
+                self.data
+                    .mcaptcha_insert_analytics(&job.campaign_id, &r)
+                    .await?;
+            }
+            // NOTE(review): this passes the number of records stored from this
+            // page; confirm whether `mcaptcha_set_checkpoint` expects an
+            // increment or the absolute position.
+            self.data
+                .mcaptcha_set_checkpoint(&job.campaign_id, new_records)
+                .await?;
+
+            if !Self::can_run(rx) {
+                log::info!("Stopping survey downloads");
+                break;
+            }
+
+            page += 1;
+            if fetched < LIMIT {
+                break;
+            }
+        }
+
+        // NOTE(review): the job is marked finished even when the loop stopped
+        // early on a stop signal; confirm that this is intended.
+        self.data.mark_job_finished(job).await?;
+
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 pub mod tests {
     use super::*;
@@ -196,8 +373,7 @@ pub mod tests {
         ) -> ServiceResult> {
             println!(
                 "mcaptcha_url {}, campaign_id {}, page: {page}",
-                mcaptcha.to_string(),
-                campaign_id
+                mcaptcha, campaign_id
             );
             let res = BENCHMARK.clone();
             Ok(res)