survey/src/mcaptcha.rs

304 lines
8.0 KiB
Rust

// Copyright (C) 2023 Aravinth Manivannan <realaravinth@batsense.net>
// SPDX-FileCopyrightText: 2023 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::time::Duration;
use async_trait::async_trait;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use url::Url;
use crate::{api::v1::mcaptcha::db::SchedulerJob, errors::*, AppData};
/* TODO:
 * 1. Define traits to interact with mCaptcha
 * 2. Implement trait with request
 * 3. Implement mocking for testing
 * 4. Load to crate::data::Data
 */
/// One Proof-of-Work CAPTCHA performance record.
///
/// Values of this type are what [`MCaptchaClient::download_benchmarks`] returns; the type can be
/// serialized and deserialized with serde.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize)]
pub struct PerformanceAnalytics {
    /// ID of the log entry.
    pub id: usize,
    /// Time taken to generate the proof.
    pub time: u32,
    /// Difficulty factor the proof was generated for.
    pub difficulty_factor: u32,
    /// Kind of worker/client that generated the proof: wasm, javascript, python, etc.
    pub worker_type: String,
}
/// Operations this service performs against an mCaptcha instance.
///
/// Implementors must be `Send + Sync` and must be cloneable into a boxed trait object through
/// [`CloneMCaptchaClient`].
#[async_trait]
pub trait MCaptchaClient: Send + Sync + CloneMCaptchaClient {
    /// Hand `secret` and `auth_token` over to the mCaptcha instance located at `mcaptcha`.
    async fn share_secret(
        &self,
        mut mcaptcha: Url,
        secret: String,
        auth_token: String,
    ) -> ServiceResult<()>;

    /// Fetch page number `page` of the performance records of campaign `campaign_id` from the
    /// mCaptcha instance located at `mcaptcha`.
    async fn download_benchmarks(
        &self,
        mut mcaptcha: Url,
        campaign_id: &str,
        page: usize,
    ) -> ServiceResult<Vec<PerformanceAnalytics>>;
}
/// Helper trait that lets an [`MCaptchaClient`] be cloned without knowing its concrete type.
pub trait CloneMCaptchaClient {
    /// Produce a boxed copy of this client.
    fn clone_client(&self) -> Box<dyn MCaptchaClient>;
}
/// Every `'static` [`MCaptchaClient`] that is also [`Clone`] gets `clone_client` for free.
impl<T: MCaptchaClient + Clone + 'static> CloneMCaptchaClient for T {
    /// Clone the concrete client and move the copy into a box.
    fn clone_client(&self) -> Box<dyn MCaptchaClient> {
        let duplicate: T = self.clone();
        Box::new(duplicate)
    }
}
/// Makes a boxed client cloneable by delegating to the boxed value's `clone_client`.
impl Clone for Box<dyn MCaptchaClient> {
    fn clone(&self) -> Self {
        // Reborrow the trait object inside the box explicitly, so `clone_client` is called on
        // the boxed client rather than on the box.
        let inner = &**self;
        inner.clone_client()
    }
}
/// [`MCaptchaClient`] implementation that talks to mCaptcha over HTTP using `reqwest`.
#[derive(Clone)]
pub struct MCaptchaClientReqwest {
    /// HTTP client used for every request.
    client: Client,
}
impl Default for MCaptchaClientReqwest {
    /// Build a client around a new `reqwest::Client` created with `Client::new()`.
    fn default() -> Self {
        let client = Client::new();
        Self { client }
    }
}
/// HTTP-backed implementation of the mCaptcha client operations.
#[async_trait]
impl MCaptchaClient for MCaptchaClientReqwest {
    /// POST `secret` and `auth_token` as a JSON body to `/api/v1/survey/secret` on `mcaptcha`.
    ///
    /// The response is discarded; its status code is not inspected.
    ///
    /// # Panics
    ///
    /// Panics if sending the request fails.
    async fn share_secret(
        &self,
        mut mcaptcha: Url,
        secret: String,
        auth_token: String,
    ) -> ServiceResult<()> {
        /// JSON body of the request.
        #[derive(Serialize)]
        struct SecretPayload {
            secret: String,
            auth_token: String,
        }

        mcaptcha.set_path("/api/v1/survey/secret");
        let payload = SecretPayload { secret, auth_token };
        let request = self.client.post(mcaptcha).json(&payload);
        request.send().await.unwrap();
        Ok(())
    }

    /// GET `/api/v1/survey/takeout/{campaign_id}/get?page={page}` on `mcaptcha` and decode the
    /// JSON response body into a list of [`PerformanceAnalytics`].
    ///
    /// The response status code is not inspected before decoding.
    ///
    /// # Panics
    ///
    /// Panics if sending the request fails or if the body cannot be decoded.
    async fn download_benchmarks(
        &self,
        mut mcaptcha: Url,
        campaign_id: &str,
        page: usize,
    ) -> ServiceResult<Vec<PerformanceAnalytics>> {
        let path = format!("/api/v1/survey/takeout/{campaign_id}/get");
        let query = format!("page={page}");
        mcaptcha.set_path(&path);
        mcaptcha.set_query(Some(&query));

        let response = self.client.get(mcaptcha).send().await.unwrap();
        let records: Vec<PerformanceAnalytics> = response.json().await.unwrap();
        Ok(records)
    }
}
/// Background worker that picks up scheduled jobs and downloads mCaptcha performance records.
#[derive(Clone)]
pub struct MCaptchaDownloader {
    /// Application data used to read jobs, reach mCaptcha and store the downloaded records.
    data: AppData,
}
impl MCaptchaDownloader {
pub fn new(data: AppData) -> Self {
Self { data }
}
fn can_run(rx: &mut oneshot::Receiver<()>) -> bool {
matches!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty))
}
pub async fn start_job(
&self,
) -> ServiceResult<(oneshot::Sender<()>, JoinHandle<()>)> {
let (tx, mut rx) = oneshot::channel();
let this = self.clone();
let fut = async move {
loop {
if !Self::can_run(&mut rx) {
log::info!("stopping survey uploads");
break;
}
let task = this.data.get_next_job_to_run().await.unwrap();
if task.is_none() {
for _ in 0..5 {
if !Self::can_run(&mut rx) {
log::info!("Stopping survey uploads");
break;
}
sleep(Duration::new(1, 0)).await;
}
continue;
}
let task = task.unwrap();
this.data.mark_job_scheduled(&task).await.unwrap();
this.exec_job(&task, &mut rx).await.unwrap();
}
};
let handle = tokio::spawn(fut);
Ok((tx, handle))
}
async fn exec_job(
&self,
job: &SchedulerJob,
rx: &mut oneshot::Receiver<()>,
) -> ServiceResult<()> {
let checkpoint = self.data.mcaptcha_get_checkpoint(&job.campaign_id).await?;
const LIMIT: usize = 50;
let mut page = 1 + (checkpoint / LIMIT);
let campaign_str = job.campaign_id.to_string();
log::info!("getting page {page} from {campaign_str}");
loop {
if !Self::can_run(rx) {
log::info!("Stopping survey downloads");
break;
}
let mut res = self
.data
.mcaptcha
.download_benchmarks(job.url.clone(), &campaign_str, page)
.await?;
if !Self::can_run(rx) {
log::info!("Stopping survey downloads");
break;
}
let skip = checkpoint - ((page - 1) * LIMIT);
let new_records = res.len() - skip as usize;
let mut skip = skip as isize;
for r in res.drain(0..) {
if skip > 0 {
skip -= 1;
continue;
}
self.data
.mcaptcha_insert_analytics(&job.campaign_id, &r)
.await?;
}
self.data
.mcaptcha_set_checkpoint(&job.campaign_id, new_records)
.await?;
if !Self::can_run(rx) {
log::info!("Stopping survey downloads");
break;
}
page += 1;
if res.len() < LIMIT {
break;
}
}
self.data.mark_job_finished(job).await.unwrap();
Ok(())
}
}
/// Test doubles for the mCaptcha client.
#[cfg(test)]
pub mod tests {
    use super::*;

    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    use lazy_static::lazy_static;

    lazy_static! {
        /// Fixed records returned by every call to `TestClient::download_benchmarks`.
        pub static ref BENCHMARK: Vec<PerformanceAnalytics> = [(1, 2, 3, "foo"), (4, 5, 6, "bar")]
            .iter()
            .map(|&(id, time, difficulty_factor, worker_type)| PerformanceAnalytics {
                id,
                time,
                difficulty_factor,
                worker_type: worker_type.to_string(),
            })
            .collect();
    }

    /// In-memory stand-in for an mCaptcha instance.
    #[derive(Clone, Default)]
    pub struct TestClient {
        /// Secrets recorded by `share_secret`, keyed by the URL they were shared with.
        pub client: Arc<RwLock<HashMap<String, String>>>,
    }

    #[async_trait]
    impl MCaptchaClient for TestClient {
        /// Record `secret` under the `/api/v1/survey/secret` URL of `mcaptcha`.
        ///
        /// `auth_token` is not used.
        async fn share_secret(
            &self,
            mut mcaptcha: Url,
            secret: String,
            auth_token: String,
        ) -> ServiceResult<()> {
            mcaptcha.set_path("/api/v1/survey/secret");
            let key = mcaptcha.to_string();
            // The write guard is a temporary and is released at the end of this statement.
            self.client.write().unwrap().insert(key, secret);
            Ok(())
        }

        /// Print the request parameters and return a copy of `BENCHMARK`, whatever the page.
        async fn download_benchmarks(
            &self,
            mcaptcha: Url,
            campaign_id: &str,
            page: usize,
        ) -> ServiceResult<Vec<PerformanceAnalytics>> {
            println!(
                "mcaptcha_url {}, campaign_id {}, page: {page}",
                mcaptcha, campaign_id
            );
            Ok(BENCHMARK.clone())
        }
    }
}