// Copyright (C) 2023 Aravinth Manivannan // SPDX-FileCopyrightText: 2023 Aravinth Manivannan // // SPDX-License-Identifier: AGPL-3.0-or-later use std::time::Duration; use async_trait::async_trait; use reqwest::Client; use serde::{Deserialize, Serialize}; use tokio::sync::oneshot; use tokio::task::JoinHandle; use tokio::time::sleep; use url::Url; use crate::{api::v1::mcaptcha::db::SchedulerJob, errors::*, AppData}; /* TODO: * 1. Define traits to interact with mCaptcha * 2. Implement trait with request 3. Implement mocking for testing * 4. Load to crate::data::Data */ #[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)] /// Proof-of-Work CAPTCHA performance analytics pub struct PerformanceAnalytics { /// log ID pub id: usize, /// time taken to generate proof pub time: u32, /// difficulty factor for which the proof was generated pub difficulty_factor: u32, /// worker/client type: wasm, javascript, python, etc. pub worker_type: String, } #[async_trait] pub trait MCaptchaClient: std::marker::Send + std::marker::Sync + CloneMCaptchaClient { async fn share_secret( &self, mut mcaptcha: Url, secret: String, auth_token: String, ) -> ServiceResult<()>; async fn download_benchmarks( &self, mut mcaptcha: Url, campaign_id: &str, page: usize, ) -> ServiceResult>; } /// Trait to clone MCaptchaClient pub trait CloneMCaptchaClient { /// clone client fn clone_client(&self) -> Box; } impl CloneMCaptchaClient for T where T: MCaptchaClient + Clone + 'static, { fn clone_client(&self) -> Box { Box::new(self.clone()) } } impl Clone for Box { fn clone(&self) -> Self { (**self).clone_client() } } #[derive(Clone)] pub struct MCaptchaClientReqwest { client: Client, } impl Default for MCaptchaClientReqwest { fn default() -> Self { Self { client: Client::new(), } } } #[async_trait] impl MCaptchaClient for MCaptchaClientReqwest { async fn share_secret( &self, mut mcaptcha: Url, secret: String, auth_token: String, ) -> ServiceResult<()> { #[derive(Serialize)] struct S { secret: String, auth_token: String, } let msg = S { secret, auth_token }; mcaptcha.set_path("/api/v1/survey/secret"); self.client.post(mcaptcha).json(&msg).send().await.unwrap(); Ok(()) } async fn download_benchmarks( &self, mut mcaptcha: Url, campaign_id: &str, page: usize, ) -> ServiceResult> { mcaptcha.set_path(&format!("/api/v1/survey/takeout/{campaign_id}/get")); mcaptcha.set_query(Some(&format!("page={page}"))); let res = self .client .get(mcaptcha) .send() .await .unwrap() .json() .await .unwrap(); Ok(res) } } #[derive(Clone)] pub struct MCaptchaDownloader { data: AppData, } impl MCaptchaDownloader { pub fn new(data: AppData) -> Self { Self { data } } fn can_run(rx: &mut oneshot::Receiver<()>) -> bool { matches!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty)) } pub async fn start_job( &self, ) -> ServiceResult<(oneshot::Sender<()>, JoinHandle<()>)> { let (tx, mut rx) = oneshot::channel(); let this = self.clone(); let fut = async move { loop { if !Self::can_run(&mut rx) { log::info!("stopping survey uploads"); break; } let task = this.data.get_next_job_to_run().await.unwrap(); if task.is_none() { for _ in 0..5 { if !Self::can_run(&mut rx) { log::info!("Stopping survey uploads"); break; } sleep(Duration::new(1, 0)).await; } continue; } let task = task.unwrap(); this.data.mark_job_scheduled(&task).await.unwrap(); this.exec_job(&task, &mut rx).await.unwrap(); } }; let handle = tokio::spawn(fut); Ok((tx, handle)) } async fn exec_job( &self, job: &SchedulerJob, rx: &mut oneshot::Receiver<()>, ) -> ServiceResult<()> { let checkpoint = self.data.mcaptcha_get_checkpoint(&job.campaign_id).await?; const LIMIT: usize = 50; let mut page = 1 + (checkpoint / LIMIT); let campaign_str = job.campaign_id.to_string(); log::info!("getting page {page} from {campaign_str}"); loop { if !Self::can_run(rx) { log::info!("Stopping survey downloads"); break; } let mut res = self .data .mcaptcha .download_benchmarks(job.url.clone(), &campaign_str, page) .await?; if !Self::can_run(rx) { log::info!("Stopping survey downloads"); break; } let skip = checkpoint - ((page - 1) * LIMIT); let new_records = res.len() - skip as usize; let mut skip = skip as isize; for r in res.drain(0..) { if skip > 0 { skip -= 1; continue; } self.data .mcaptcha_insert_analytics(&job.campaign_id, &r) .await?; } self.data .mcaptcha_set_checkpoint(&job.campaign_id, new_records) .await?; if !Self::can_run(rx) { log::info!("Stopping survey downloads"); break; } page += 1; if res.len() < LIMIT { break; } } self.data.mark_job_finished(job).await.unwrap(); Ok(()) } } #[cfg(test)] pub mod tests { use super::*; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use lazy_static::lazy_static; lazy_static! { pub static ref BENCHMARK: Vec = vec![ PerformanceAnalytics { id: 1, time: 2, difficulty_factor: 3, worker_type: "foo".to_string(), }, PerformanceAnalytics { id: 4, time: 5, difficulty_factor: 6, worker_type: "bar".to_string(), }, ]; } #[derive(Clone)] pub struct TestClient { pub client: Arc>>, } impl Default for TestClient { fn default() -> Self { Self { client: Arc::new(RwLock::new(HashMap::default())), } } } #[async_trait] impl MCaptchaClient for TestClient { async fn share_secret( &self, mut mcaptcha: Url, secret: String, auth_token: String, ) -> ServiceResult<()> { mcaptcha.set_path("/api/v1/survey/secret"); let mut x = self.client.write().unwrap(); x.insert(mcaptcha.to_string(), secret); drop(x); Ok(()) } async fn download_benchmarks( &self, mcaptcha: Url, campaign_id: &str, page: usize, ) -> ServiceResult> { println!( "mcaptcha_url {}, campaign_id {}, page: {page}", mcaptcha, campaign_id ); let res = BENCHMARK.clone(); Ok(res) } } }