feat: job runner to execute download requests from mCaptcha/mCaptcha

This commit is contained in:
Aravinth Manivannan 2023-10-20 01:40:32 +05:30
parent c0a125d5f1
commit c8ecd29e94
Signed by: realaravinth
GPG key ID: F8F50389936984FF
3 changed files with 130 additions and 10 deletions

12
Cargo.lock generated
View file

@ -3301,9 +3301,21 @@ dependencies = [
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.45.0",
]
[[package]]
name = "tokio-macros"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"

View file

@ -70,7 +70,7 @@ mime = "0.3.16"
#sailfish = "0.3.2"
tracing = { version = "0.1.37", features = ["log"] }
tera = { version="1.17.1", features=["builtins"]}
tokio = { version = "1.25.0", features = ["fs"] }
tokio = { version = "1.25.0", features = ["fs", "macros"] }
csv-async = { version = "1.2.5", features = ["serde", "tokio"] }
async-trait = "0.1.68"
reqwest = { version = "0.11.18", features = ["json", "gzip"] }

View file

@ -14,12 +14,17 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::time::Duration;
use async_trait::async_trait;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use url::Url;
use crate::errors::*;
use crate::{api::v1::mcaptcha::db::SchedulerJob, errors::*, AppData};
/* TODO:
* 1. Define traits to interact with mCaptcha
@ -79,11 +84,6 @@ impl Clone for Box<dyn MCaptchaClient> {
}
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
pub struct Secret {
pub secret: String,
}
#[derive(Clone)]
pub struct MCaptchaClientReqwest {
client: Client,
@ -122,7 +122,8 @@ impl MCaptchaClient for MCaptchaClientReqwest {
campaign_id: &str,
page: usize,
) -> ServiceResult<Vec<PerformanceAnalytics>> {
mcaptcha.set_path(&format!("/api/v1/survey/{campaign_id}/get?page={page}"));
mcaptcha.set_path(&format!("/api/v1/survey/takeout/{campaign_id}/get"));
mcaptcha.set_query(Some(&format!("page={page}")));
let res = self
.client
.get(mcaptcha)
@ -136,6 +137,114 @@ impl MCaptchaClient for MCaptchaClientReqwest {
}
}
#[derive(Clone)]
pub struct MCaptchaDownloader {
data: AppData,
}
impl MCaptchaDownloader {
pub fn new(data: AppData) -> Self {
Self { data }
}
fn can_run(rx: &mut oneshot::Receiver<()>) -> bool {
matches!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty))
}
pub async fn start_job(
&self,
) -> ServiceResult<(oneshot::Sender<()>, JoinHandle<()>)> {
let (tx, mut rx) = oneshot::channel();
let this = self.clone();
let fut = async move {
loop {
if !Self::can_run(&mut rx) {
log::info!("stopping survey uploads");
break;
}
let task = this.data.get_next_job_to_run().await.unwrap();
if task.is_none() {
for _ in 0..5 {
if !Self::can_run(&mut rx) {
log::info!("Stopping survey uploads");
break;
}
sleep(Duration::new(1, 0)).await;
}
continue;
}
let task = task.unwrap();
this.data.mark_job_scheduled(&task).await.unwrap();
this.exec_job(&task, &mut rx).await.unwrap();
}
};
let handle = tokio::spawn(fut);
Ok((tx, handle))
}
async fn exec_job(
&self,
job: &SchedulerJob,
rx: &mut oneshot::Receiver<()>,
) -> ServiceResult<()> {
let checkpoint = self.data.mcaptcha_get_checkpoint(&job.campaign_id).await?;
const LIMIT: usize = 50;
let mut page = 1 + (checkpoint / LIMIT);
let campaign_str = job.campaign_id.to_string();
log::info!("getting page {page} from {campaign_str}");
loop {
if !Self::can_run(rx) {
log::info!("Stopping survey downloads");
break;
}
let mut res = self
.data
.mcaptcha
.download_benchmarks(job.url.clone(), &campaign_str, page)
.await?;
if !Self::can_run(rx) {
log::info!("Stopping survey downloads");
break;
}
let skip = checkpoint - ((page - 1) * LIMIT);
let new_records = res.len() - skip as usize;
let mut skip = skip as isize;
for r in res.drain(0..) {
if skip > 0 {
skip -= 1;
continue;
}
self.data
.mcaptcha_insert_analytics(&job.campaign_id, &r)
.await?;
}
self.data
.mcaptcha_set_checkpoint(&job.campaign_id, new_records)
.await?;
if !Self::can_run(rx) {
log::info!("Stopping survey downloads");
break;
}
page += 1;
if res.len() < LIMIT {
break;
}
}
self.data.mark_job_finished(job).await.unwrap();
Ok(())
}
}
#[cfg(test)]
pub mod tests {
use super::*;
@ -196,8 +305,7 @@ pub mod tests {
) -> ServiceResult<Vec<PerformanceAnalytics>> {
println!(
"mcaptcha_url {}, campaign_id {}, page: {page}",
mcaptcha.to_string(),
campaign_id
mcaptcha, campaign_id
);
let res = BENCHMARK.clone();
Ok(res)