From 5e7d1cae65d06f07df6fe1ca8dee1930425af3d4 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Fri, 20 Oct 2023 01:42:54 +0530 Subject: [PATCH] feat: schedule download jobs on reqs from mCaptcha/mCaptcha --- src/api/v1/mcaptcha/hooks.rs | 75 +++++++++++++++++------------------- src/main.rs | 9 ++++- 2 files changed, 42 insertions(+), 42 deletions(-) diff --git a/src/api/v1/mcaptcha/hooks.rs b/src/api/v1/mcaptcha/hooks.rs index 5474bce..4f123de 100644 --- a/src/api/v1/mcaptcha/hooks.rs +++ b/src/api/v1/mcaptcha/hooks.rs @@ -18,10 +18,10 @@ use actix_web::web::ServiceConfig; use actix_web::{web, HttpResponse, Responder}; use serde::{Deserialize, Serialize}; use url::Url; +use uuid::Uuid; use crate::api::v1::ROUTES; use crate::errors::*; -use crate::mcaptcha::Secret; use crate::AppData; pub fn services(cfg: &mut ServiceConfig) { @@ -62,6 +62,16 @@ async fn register( Ok(HttpResponse::Ok()) } +#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] +pub struct UploadJobCreated { + id: Uuid, +} + +#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)] +pub struct Secret { + pub secret: String, +} + #[actix_web_codegen_const_routes::post(path = "ROUTES.mcaptcha.upload")] async fn upload( data: AppData, @@ -76,10 +86,8 @@ async fn upload( * 5. Download results * 6. Update sync point */ - let url = data - .mcaptcha_authenticate_and_get_url(&payload.secret) - .await?; - let campaign_str = campaign.to_string(); + data.mcaptcha_authenticate(&payload.secret).await?; + // let campaign_str = campaign.to_string(); if !data .mcaptcha_campaign_is_registered(&campaign, &payload.secret) @@ -89,37 +97,11 @@ async fn upload( .await?; } - let checkpoint = data - .mcaptcha_get_checkpoint(&campaign, &payload.secret) - .await?; - const LIMIT: usize = 50; - let mut page = 1 + (checkpoint / LIMIT); - loop { - let mut res = data - .mcaptcha - .download_benchmarks(url.clone(), &campaign_str, page) - .await?; - let skip = checkpoint - ((page - 1) * LIMIT); - let new_records = res.len() - skip as usize; - let mut skip = skip as isize; - for r in res.drain(0..) { - if skip > 0 { - skip -= 1; - continue; - } - data.mcaptcha_insert_analytics(&campaign, &payload.secret, &r) - .await?; - } - data.mcaptcha_set_checkpoint(&campaign, &payload.secret, new_records) - .await?; + let res = UploadJobCreated { + id: data.add_job(&campaign).await?, + }; - page += 1; - if res.len() < LIMIT { - break; - } - } - - Ok(HttpResponse::Ok()) + Ok(HttpResponse::Created().json(res)) } #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)] @@ -145,16 +127,13 @@ async fn download( #[cfg(test)] mod tests { - use crate::api::v1::bench::Submission; - use crate::api::v1::bench::SubmissionType; use crate::api::v1::get_random; - use crate::errors::*; use crate::mcaptcha::PerformanceAnalytics; use crate::mcaptcha::Secret; use crate::tests::*; use crate::*; - use actix_web::{http::header, test}; + use actix_web::test; #[actix_rt::test] async fn mcaptcha_hooks_work() { @@ -166,6 +145,11 @@ mod tests { let (data, client) = get_test_data_with_mcaptcha_client().await; let app = get_app!(data).await; + let mcaptcha_downloader = + crate::mcaptcha::MCaptchaDownloader::new(AppData::new(data.clone())); + let (mcaptcha_downloader_killer, mcaptcha_downloader_job) = + mcaptcha_downloader.start_job().await.unwrap(); + if data .mcaptcha_url_exists(&mcaptcha_instance_str) .await @@ -241,7 +225,16 @@ mod tests { .to_request(), ) .await; - assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(resp.status(), StatusCode::CREATED); + let job: super::UploadJobCreated = test::read_body_json(resp).await; + loop { + if data.get_job(&job.id).await.unwrap().unwrap().state + == *crate::db::JOB_STATE_FINISH + { + break; + } + tokio::time::sleep(std::time::Duration::new(1, 0)).await; + } let public_id = data .mcaptcha_get_campaign_public_id(&campaign_id, &secret) @@ -271,5 +264,7 @@ mod tests { let resp: Vec = test::read_body_json(resp).await; assert_eq!(resp.len(), 2); assert_eq!(resp, got); + mcaptcha_downloader_killer.send(()).unwrap(); + mcaptcha_downloader_job.await.unwrap(); } } diff --git a/src/main.rs b/src/main.rs index 8666d5c..09575ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -99,6 +99,10 @@ async fn main() -> std::io::Result<()> { let (archive_kiler, archive_job) = arch.init_archive_job(data.clone()).await.unwrap(); + let mcaptcha_downloader = mcaptcha::MCaptchaDownloader::new(data.clone()); + let (mcaptcha_downloader_killer, mcaptcha_downloader_job) = + mcaptcha_downloader.start_job().await.unwrap(); + let ip = settings.server.get_ip(); println!("Starting server on: http://{}", ip); @@ -128,8 +132,9 @@ async fn main() -> std::io::Result<()> { .await .unwrap(); - archive_kiler.send(true).unwrap(); - archive_job.await; + let _ = mcaptcha_downloader_killer.send(()); + let _ = archive_kiler.send(true); + let _ = tokio::join!(archive_job, mcaptcha_downloader_job); Ok(()) }