feat: async crawl

This commit is contained in:
Aravinth Manivannan 2022-07-15 20:20:59 +05:30
parent 993ba2fe25
commit 0d1f42d5e4
Signed by: realaravinth
GPG key ID: AD9F0F08E855ED88
3 changed files with 141 additions and 2 deletions

View file

@ -33,7 +33,7 @@ mime = "0.3.16"
mime_guess = "2.0.3" mime_guess = "2.0.3"
rand = "0.8.5" rand = "0.8.5"
tera = "1.15" tera = "1.15"
tokio = { version = "1.17", features = ["fs", "time"] } tokio = { version = "1.17", features = ["fs", "time", "sync"] }
url = { version = "2.2.2", features = ["serde"] } url = { version = "2.2.2", features = ["serde"] }
validator = { version = "0.15", features = ["derive"]} validator = { version = "0.15", features = ["derive"]}
derive_more = "0.99.17" derive_more = "0.99.17"

View file

@ -20,6 +20,7 @@ use std::sync::Arc;
use actix_files::Files; use actix_files::Files;
use actix_web::{middleware, web::Data, App, HttpServer}; use actix_web::{middleware, web::Data, App, HttpServer};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tokio::sync::oneshot;
pub mod ctx; pub mod ctx;
pub mod db; pub mod db;
@ -66,10 +67,20 @@ async fn main() {
pretty_env_logger::init(); pretty_env_logger::init();
lazy_static::initialize(&pages::TEMPLATES); lazy_static::initialize(&pages::TEMPLATES);
let ctx = WebCtx::new(Ctx::new(settings.clone()).await); let ctx = Ctx::new(settings.clone()).await;
let db = WebDB::new(sqlite::get_data(Some(settings.clone())).await); let db = WebDB::new(sqlite::get_data(Some(settings.clone())).await);
let federate = WebFederate::new(get_federate(Some(settings.clone())).await); let federate = WebFederate::new(get_federate(Some(settings.clone())).await);
let (kill_crawler, rx) = oneshot::channel();
let crawler = spider::Crawler::new(
rx,
ctx.clone(),
db.as_ref().clone(),
federate.as_ref().clone(),
);
let crawler_fut = tokio::spawn(spider::Crawler::start(crawler.clone()));
let ctx = WebCtx::new(ctx);
let socket_addr = settings.server.get_ip(); let socket_addr = settings.server.get_ip();
HttpServer::new(move || { HttpServer::new(move || {
@ -90,4 +101,7 @@ async fn main() {
.run() .run()
.await .await
.unwrap(); .unwrap();
kill_crawler.send(true).unwrap();
crawler_fut.await.unwrap().await;
} }

View file

@ -16,6 +16,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
use std::future::Future; use std::future::Future;
use std::sync::Arc;
use std::sync::RwLock;
use log::info; use log::info;
use tokio::sync::oneshot::{error::TryRecvError, Receiver}; use tokio::sync::oneshot::{error::TryRecvError, Receiver};
@ -119,6 +121,105 @@ impl Ctx {
} }
} }
/// Background crawling job together with the handles it needs to run.
///
/// Instances are created through `Crawler::new`, which hands back an
/// `Arc<Crawler>` so the same value can be given to the spawned crawl task
/// and kept by the caller (e.g. to query `is_running`).
pub struct Crawler {
/// Receiving half of the oneshot shutdown channel. Holds `Some(..)` from
/// construction and is replaced by `None` once `shutdown` has observed a
/// stop request; `is_running` reports whether it is still `Some`.
rx: RwLock<Option<Receiver<bool>>>,
/// Application context: provides `crawl(..)` and the
/// `settings.crawler.ttl` value used as the pause between crawl cycles.
ctx: ArcCtx,
/// Database handle: queried with `get_all_forges(offset, limit)` and
/// passed on to `ctx.crawl`.
db: BoxDB,
/// Federation handle: passed on to `ctx.crawl`; its `tar()` is invoked
/// after the pause that follows an empty batch of forges.
federate: ArcFederate,
}
impl Crawler {
    /// Builds a crawler and wraps it in an [`Arc`] so it can be shared between
    /// the spawned crawl task and the caller.
    ///
    /// * `rx` - receiving half of the oneshot channel used to request shutdown.
    /// * `ctx`, `db`, `federate` - handles stored as-is and used by
    ///   [`Crawler::start`].
    pub fn new(rx: Receiver<bool>, ctx: ArcCtx, db: BoxDB, federate: ArcFederate) -> Arc<Self> {
        Arc::new(Self {
            rx: RwLock::new(Some(rx)),
            ctx,
            db,
            federate,
        })
    }

    /// Returns `true` as long as no shutdown has been observed, i.e. while the
    /// shutdown receiver is still stored in the crawler.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn is_running(&self) -> bool {
        self.rx.read().unwrap().is_some()
    }

    /// Polls the shutdown channel without blocking and reports whether the
    /// crawler should stop.
    ///
    /// Returns `true` when the receiver was already discarded by an earlier
    /// call, when the channel reports `Closed`, or when the value received
    /// from the sender is `true`. A received `false` is returned as-is.
    /// Once the answer is `true` the receiver is dropped, so every later call
    /// (and [`Crawler::is_running`]) sees the stopped state.
    fn shutdown(&self) -> bool {
        // A single write guard covers both the poll and the state update.
        let mut slot = self.rx.write().unwrap();
        let stop = match slot.as_mut() {
            // Receiver already discarded: shutdown was observed earlier.
            None => true,
            Some(rx) => match rx.try_recv() {
                Ok(x) => {
                    info!("Received signal from tx");
                    x
                }
                // Nothing has been sent yet: keep running.
                Err(TryRecvError::Empty) => false,
                Err(TryRecvError::Closed) => {
                    info!("Closed");
                    true
                }
            },
        };
        if stop {
            *slot = None;
        }
        stop
    }

    /// Spawns the crawl loop on the tokio runtime and returns the handle of
    /// the spawned task as an opaque future; await it to wait for the loop to
    /// finish.
    ///
    /// If a shutdown has already been requested, a task that completes
    /// immediately is spawned instead.
    ///
    /// The loop walks all forges in batches of `LIMIT`, crawling each one.
    /// When a batch comes back empty it sleeps for `settings.crawler.ttl`
    /// seconds, calls `federate.tar()`, and starts over from the first batch.
    /// Shutdown is polled before each batch, before each forge, after each
    /// batch and after the pause.
    ///
    /// # Panics
    ///
    /// The spawned task panics if fetching forges, parsing a forge URL or
    /// `federate.tar()` fails.
    pub async fn start(c: Arc<Crawler>) -> impl Future {
        if c.shutdown() {
            info!("Stopping crawling job");
            return tokio::spawn(tokio::time::sleep(std::time::Duration::new(0, 0)));
        }

        let fut = async move {
            // Number of forges fetched per batch.
            const LIMIT: u32 = 2;
            let mut page = 0;
            loop {
                info!("Running crawling job");
                if c.shutdown() {
                    break;
                }

                // NOTE(review): assumes `get_all_forges(offset, limit)` treats
                // `offset` as a row offset, as `page * LIMIT` implies; confirm
                // against the DB layer.
                let offset = page * LIMIT;
                let forges = c.db.get_all_forges(offset, LIMIT).await.unwrap();
                if forges.is_empty() {
                    // End of the list reached: pause, then archive.
                    tokio::time::sleep(std::time::Duration::new(c.ctx.settings.crawler.ttl, 0))
                        .await;
                    c.federate.tar().await.unwrap();
                    if c.shutdown() {
                        info!("Stopping crawling job");
                        break;
                    }
                    // Restart from the first batch so the next cycle re-crawls
                    // every forge instead of polling the same offset forever.
                    page = 0;
                    continue;
                }

                for forge in forges.iter() {
                    if c.shutdown() {
                        info!("Stopping crawling job");
                        break;
                    }
                    c.ctx
                        .crawl(&Url::parse(&forge.url).unwrap(), &c.db, &c.federate)
                        .await;
                }
                if c.shutdown() {
                    info!("Stopping crawling job");
                    break;
                }
                // Advance one batch at a time. Incrementing per forge would
                // move the offset by LIMIT * forges.len() and skip forges.
                page += 1;
            }
        };
        tokio::spawn(fut)
    }
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::tests::sqlx_sqlite; use crate::tests::sqlx_sqlite;
@ -146,4 +247,28 @@ mod tests {
} }
assert!(db.forge_exists(&url).await.unwrap()); assert!(db.forge_exists(&url).await.unwrap());
} }
// #[actix_rt::test]
// async fn crawlerd() {
// use super::*;
//use tokio::sync::oneshot;
//
// let (db, ctx, federate, _tmp_dir) = sqlx_sqlite::get_ctx().await;
// let (kill_crawler, rx) = oneshot::channel();
// let crawler = Crawler::new(rx, ctx.clone(), db, federate);
// let fut = tokio::spawn(Crawler::start(crawler.clone()));
// assert!(crawler.is_running());
//
// tokio::time::sleep(std::time::Duration::new(2, 0))
// .await;
//
//
// kill_crawler.send(true).unwrap();
// tokio::time::sleep(std::time::Duration::new(2, 0))
// .await;
//
//
// fut.await.unwrap().await;
// assert!(!crawler.is_running());
// }
} }