diff --git a/Cargo.toml b/Cargo.toml
index 4314e4b..e90f266 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,7 +33,7 @@ mime = "0.3.16"
 mime_guess = "2.0.3"
 rand = "0.8.5"
 tera = "1.15"
-tokio = { version = "1.17", features = ["fs", "time"] }
+tokio = { version = "1.17", features = ["fs", "time", "sync"] }
 url = { version = "2.2.2", features = ["serde"] }
 validator = { version = "0.15", features = ["derive"]}
 derive_more = "0.99.17"
diff --git a/src/main.rs b/src/main.rs
index 51894ac..d0b52fe 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 use actix_files::Files;
 use actix_web::{middleware, web::Data, App, HttpServer};
 use lazy_static::lazy_static;
+use tokio::sync::oneshot;
 
 pub mod ctx;
 pub mod db;
@@ -66,10 +67,20 @@ async fn main() {
     pretty_env_logger::init();
     lazy_static::initialize(&pages::TEMPLATES);
 
-    let ctx = WebCtx::new(Ctx::new(settings.clone()).await);
+    let ctx = Ctx::new(settings.clone()).await;
     let db = WebDB::new(sqlite::get_data(Some(settings.clone())).await);
     let federate = WebFederate::new(get_federate(Some(settings.clone())).await);
 
+    let (kill_crawler, rx) = oneshot::channel();
+    let crawler = spider::Crawler::new(
+        rx,
+        ctx.clone(),
+        db.as_ref().clone(),
+        federate.as_ref().clone(),
+    );
+
+    let crawler_fut = tokio::spawn(spider::Crawler::start(crawler.clone()));
+    let ctx = WebCtx::new(ctx);
     let socket_addr = settings.server.get_ip();
 
     HttpServer::new(move || {
@@ -90,4 +101,7 @@
     .run()
     .await
     .unwrap();
+
+    kill_crawler.send(true).unwrap();
+    crawler_fut.await.unwrap().await;
 }
diff --git a/src/spider.rs b/src/spider.rs
index e75b104..05ac171 100644
--- a/src/spider.rs
+++ b/src/spider.rs
@@ -16,6 +16,8 @@
  * along with this program. If not, see <https://www.gnu.org/licenses/>.
  */
 use std::future::Future;
+use std::sync::Arc;
+use std::sync::RwLock;
 
 use log::info;
 use tokio::sync::oneshot::{error::TryRecvError, Receiver};
@@ -119,6 +121,105 @@ impl Ctx {
     }
 }
 
+pub struct Crawler {
+    rx: RwLock<Option<Receiver<bool>>>,
+    ctx: ArcCtx,
+    db: BoxDB,
+    federate: ArcFederate,
+}
+
+impl Crawler {
+    pub fn new(rx: Receiver<bool>, ctx: ArcCtx, db: BoxDB, federate: ArcFederate) -> Arc<Self> {
+        let rx = RwLock::new(Some(rx));
+        Arc::new(Self {
+            rx,
+            ctx,
+            db,
+            federate,
+        })
+    }
+
+    pub fn is_running(&self) -> bool {
+        self.rx.read().unwrap().is_some()
+    }
+
+    fn shutdown(&self) -> bool {
+        let res = if let Some(rx) = self.rx.write().unwrap().as_mut() {
+            match rx.try_recv() {
+                Ok(x) => {
+                    info!("Received signal from tx");
+                    x
+                }
+                // the channel is currently empty, keep running
+                Err(TryRecvError::Empty) => false,
+                Err(TryRecvError::Closed) => {
+                    info!("Closed");
+                    true
+                }
+            }
+        } else {
+            // receiver already consumed: we have shut down before
+            true
+        };
+        if res {
+            let mut rx = self.rx.write().unwrap();
+            *rx = None;
+        }
+        res
+    }
+
+    // 'static is justified since the crawler is initialized when the program starts
+    // and only shut down when the program exits
+    pub async fn start(c: Arc<Self>) -> impl Future {
+        if c.shutdown() {
+            info!("Stopping crawling job");
+            return tokio::spawn(tokio::time::sleep(std::time::Duration::new(0, 0)));
+        }
+
+        let fut = async move {
+            const LIMIT: u32 = 2;
+            let mut page = 0;
+            loop {
+                info!("Running crawling job");
+                let offset = page * LIMIT;
+                if c.shutdown() {
+                    break;
+                }
+
+                let forges = c.db.get_all_forges(offset, LIMIT).await.unwrap();
+                if forges.is_empty() {
+                    // every forge has been crawled; archive the results,
+                    // sleep out the TTL and start over from the first page
+                    page = 0;
+                    tokio::time::sleep(std::time::Duration::new(c.ctx.settings.crawler.ttl, 0))
+                        .await;
+                    c.federate.tar().await.unwrap();
+                    if c.shutdown() {
+                        info!("Stopping crawling job");
+                        break;
+                    }
+                    continue;
+                }
+                for forge in forges.iter() {
+                    if c.shutdown() {
+                        info!("Stopping crawling job");
+                        break;
+                    }
+                    c.ctx
+                        .crawl(&Url::parse(&forge.url).unwrap(), &c.db, &c.federate)
+                        .await;
+                }
+                // advance to the next batch only after the current one is done;
+                // incrementing inside the for loop would skip forges
+                page += 1;
+            }
+        };
+
+        tokio::spawn(fut)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::tests::sqlx_sqlite;
@@ -146,4 +247,28 @@ mod tests {
         }
         assert!(db.forge_exists(&url).await.unwrap());
     }
+
+    // #[actix_rt::test]
+    // async fn crawlerd() {
+    //     use super::*;
+    //     use tokio::sync::oneshot;
+    //
+    //     let (db, ctx, federate, _tmp_dir) = sqlx_sqlite::get_ctx().await;
+    //     let (kill_crawler, rx) = oneshot::channel();
+    //     let crawler = Crawler::new(rx, ctx.clone(), db, federate);
+    //     let fut = tokio::spawn(Crawler::start(crawler.clone()));
+    //     assert!(crawler.is_running());
+    //
+    //     tokio::time::sleep(std::time::Duration::new(2, 0)).await;
+    //
+    //     kill_crawler.send(true).unwrap();
+    //     tokio::time::sleep(std::time::Duration::new(2, 0)).await;
+    //
+    //     fut.await.unwrap().await;
+    //     assert!(!crawler.is_running());
+    // }
 }
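
Note on the pattern above: Crawler polls a tokio::sync::oneshot receiver with try_recv() at every checkpoint and drops the receiver once shutdown is observed, which is why is_running() only has to ask whether the receiver is still present. The sketch below reduces that handshake to a self-contained program. It is illustrative only: Worker and shutdown_requested are hypothetical names, not part of this codebase, and it assumes a tokio build with the rt, macros, time and sync features enabled.

use std::sync::{Arc, RwLock};

use tokio::sync::oneshot::{self, error::TryRecvError, Receiver};

// Stand-in for the crawler: the receiver is consumed once shutdown is
// observed, so a Some/None check doubles as a liveness probe.
struct Worker {
    rx: RwLock<Option<Receiver<bool>>>,
}

impl Worker {
    fn shutdown_requested(&self) -> bool {
        let mut guard = self.rx.write().unwrap();
        let stop = match guard.as_mut() {
            // try_recv() never blocks, so polling it once per iteration
            // keeps the loop responsive without an extra .await point
            Some(rx) => match rx.try_recv() {
                Ok(stop) => stop,                  // explicit signal from the sender
                Err(TryRecvError::Empty) => false, // no signal yet, keep working
                Err(TryRecvError::Closed) => true, // sender dropped: shut down too
            },
            None => true, // already shut down earlier
        };
        if stop {
            *guard = None; // drop the receiver; the worker now reads as stopped
        }
        stop
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let worker = Arc::new(Worker {
        rx: RwLock::new(Some(rx)),
    });

    let handle = tokio::spawn({
        let worker = Arc::clone(&worker);
        async move {
            loop {
                if worker.shutdown_requested() {
                    break;
                }
                // stand-in for one unit of crawling work
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            }
        }
    });

    tx.send(true).unwrap(); // what main() does with kill_crawler
    handle.await.unwrap();
}

Treating TryRecvError::Closed the same as an explicit signal is what keeps the loop from spinning forever if the sender is dropped without ever sending, e.g. when main() unwinds after an error.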