feat: schedule download jobs on reqs from mCaptcha/mCaptcha

This commit is contained in:
Aravinth Manivannan 2023-10-20 01:42:54 +05:30
parent 3e5dca9069
commit 5e7d1cae65
Signed by: realaravinth
GPG key ID: F8F50389936984FF
2 changed files with 42 additions and 42 deletions

View file

@ -18,10 +18,10 @@ use actix_web::web::ServiceConfig;
use actix_web::{web, HttpResponse, Responder}; use actix_web::{web, HttpResponse, Responder};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
use uuid::Uuid;
use crate::api::v1::ROUTES; use crate::api::v1::ROUTES;
use crate::errors::*; use crate::errors::*;
use crate::mcaptcha::Secret;
use crate::AppData; use crate::AppData;
pub fn services(cfg: &mut ServiceConfig) { pub fn services(cfg: &mut ServiceConfig) {
@ -62,6 +62,16 @@ async fn register(
Ok(HttpResponse::Ok()) Ok(HttpResponse::Ok())
} }
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
pub struct UploadJobCreated {
id: Uuid,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
pub struct Secret {
pub secret: String,
}
#[actix_web_codegen_const_routes::post(path = "ROUTES.mcaptcha.upload")] #[actix_web_codegen_const_routes::post(path = "ROUTES.mcaptcha.upload")]
async fn upload( async fn upload(
data: AppData, data: AppData,
@ -76,10 +86,8 @@ async fn upload(
* 5. Download results * 5. Download results
* 6. Update sync point * 6. Update sync point
*/ */
let url = data data.mcaptcha_authenticate(&payload.secret).await?;
.mcaptcha_authenticate_and_get_url(&payload.secret) // let campaign_str = campaign.to_string();
.await?;
let campaign_str = campaign.to_string();
if !data if !data
.mcaptcha_campaign_is_registered(&campaign, &payload.secret) .mcaptcha_campaign_is_registered(&campaign, &payload.secret)
@ -89,37 +97,11 @@ async fn upload(
.await?; .await?;
} }
let checkpoint = data let res = UploadJobCreated {
.mcaptcha_get_checkpoint(&campaign, &payload.secret) id: data.add_job(&campaign).await?,
.await?; };
const LIMIT: usize = 50;
let mut page = 1 + (checkpoint / LIMIT);
loop {
let mut res = data
.mcaptcha
.download_benchmarks(url.clone(), &campaign_str, page)
.await?;
let skip = checkpoint - ((page - 1) * LIMIT);
let new_records = res.len() - skip as usize;
let mut skip = skip as isize;
for r in res.drain(0..) {
if skip > 0 {
skip -= 1;
continue;
}
data.mcaptcha_insert_analytics(&campaign, &payload.secret, &r)
.await?;
}
data.mcaptcha_set_checkpoint(&campaign, &payload.secret, new_records)
.await?;
page += 1; Ok(HttpResponse::Created().json(res))
if res.len() < LIMIT {
break;
}
}
Ok(HttpResponse::Ok())
} }
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
@ -145,16 +127,13 @@ async fn download(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::api::v1::bench::Submission;
use crate::api::v1::bench::SubmissionType;
use crate::api::v1::get_random; use crate::api::v1::get_random;
use crate::errors::*;
use crate::mcaptcha::PerformanceAnalytics; use crate::mcaptcha::PerformanceAnalytics;
use crate::mcaptcha::Secret; use crate::mcaptcha::Secret;
use crate::tests::*; use crate::tests::*;
use crate::*; use crate::*;
use actix_web::{http::header, test}; use actix_web::test;
#[actix_rt::test] #[actix_rt::test]
async fn mcaptcha_hooks_work() { async fn mcaptcha_hooks_work() {
@ -166,6 +145,11 @@ mod tests {
let (data, client) = get_test_data_with_mcaptcha_client().await; let (data, client) = get_test_data_with_mcaptcha_client().await;
let app = get_app!(data).await; let app = get_app!(data).await;
let mcaptcha_downloader =
crate::mcaptcha::MCaptchaDownloader::new(AppData::new(data.clone()));
let (mcaptcha_downloader_killer, mcaptcha_downloader_job) =
mcaptcha_downloader.start_job().await.unwrap();
if data if data
.mcaptcha_url_exists(&mcaptcha_instance_str) .mcaptcha_url_exists(&mcaptcha_instance_str)
.await .await
@ -241,7 +225,16 @@ mod tests {
.to_request(), .to_request(),
) )
.await; .await;
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::CREATED);
let job: super::UploadJobCreated = test::read_body_json(resp).await;
loop {
if data.get_job(&job.id).await.unwrap().unwrap().state
== *crate::db::JOB_STATE_FINISH
{
break;
}
tokio::time::sleep(std::time::Duration::new(1, 0)).await;
}
let public_id = data let public_id = data
.mcaptcha_get_campaign_public_id(&campaign_id, &secret) .mcaptcha_get_campaign_public_id(&campaign_id, &secret)
@ -271,5 +264,7 @@ mod tests {
let resp: Vec<PerformanceAnalytics> = test::read_body_json(resp).await; let resp: Vec<PerformanceAnalytics> = test::read_body_json(resp).await;
assert_eq!(resp.len(), 2); assert_eq!(resp.len(), 2);
assert_eq!(resp, got); assert_eq!(resp, got);
mcaptcha_downloader_killer.send(()).unwrap();
mcaptcha_downloader_job.await.unwrap();
} }
} }

View file

@ -99,6 +99,10 @@ async fn main() -> std::io::Result<()> {
let (archive_kiler, archive_job) = let (archive_kiler, archive_job) =
arch.init_archive_job(data.clone()).await.unwrap(); arch.init_archive_job(data.clone()).await.unwrap();
let mcaptcha_downloader = mcaptcha::MCaptchaDownloader::new(data.clone());
let (mcaptcha_downloader_killer, mcaptcha_downloader_job) =
mcaptcha_downloader.start_job().await.unwrap();
let ip = settings.server.get_ip(); let ip = settings.server.get_ip();
println!("Starting server on: http://{}", ip); println!("Starting server on: http://{}", ip);
@ -128,8 +132,9 @@ async fn main() -> std::io::Result<()> {
.await .await
.unwrap(); .unwrap();
archive_kiler.send(true).unwrap(); let _ = mcaptcha_downloader_killer.send(());
archive_job.await; let _ = archive_kiler.send(true);
let _ = tokio::join!(archive_job, mcaptcha_downloader_job);
Ok(()) Ok(())
} }