From fa31c4fac003c8db7bf933d587a8bd27468cfeb0 Mon Sep 17 00:00:00 2001
From: Aravinth Manivannan
Date: Fri, 10 Mar 2023 20:24:23 +0530
Subject: [PATCH] feat: introduce on loop in bg

---
 src/introduce.rs | 33 +++++++++++++++++++++++++++++++++
 src/main.rs      |  8 +++++++-
 src/spider.rs    |  1 -
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/src/introduce.rs b/src/introduce.rs
index 750aea2..58a0d85 100644
--- a/src/introduce.rs
+++ b/src/introduce.rs
@@ -16,11 +16,13 @@
  * along with this program. If not, see <https://www.gnu.org/licenses/>.
  */
 use std::collections::HashSet;
+use std::future::Future;

 use actix_web::web;
 use actix_web::{HttpResponse, Responder};
 use actix_web_codegen_const_routes::get;
 use actix_web_codegen_const_routes::post;
+use tokio::sync::oneshot::{self, error::TryRecvError, Sender};
 use url::Url;

 pub use api_routes::*;
@@ -138,6 +140,37 @@ impl Ctx {
         Ok(())
     }

+    pub async fn spawn_bootstrap(
+        self,
+        db: Box,
+    ) -> ServiceResult<(Sender, impl Future)> {
+        let (tx, mut rx) = oneshot::channel();
+        let fut = async {
+            loop {
+                let shutdown = match rx.try_recv() {
+                    // The channel is currently empty
+                    Ok(x) => {
+                        log::info!("Received signal from tx");
+                        x
+                    }
+                    Err(TryRecvError::Empty) => false,
+                    Err(TryRecvError::Closed) => {
+                        log::info!("Closed");
+                        true
+                    }
+                };
+                if shutdown {
+                    break;
+                }
+
+                let _ = self.bootstrap(db).await;
+            }
+        };
+
+        let join_handle = tokio::spawn(fut);
+        Ok((tx, join_handle))
+    }
+
     pub async fn bootstrap(&self, db: &Box) -> ServiceResult<()> {
         let mut known_starcharts = HashSet::with_capacity(self.settings.introducer.nodes.len());
         for starchart in self.settings.introducer.nodes.iter() {
diff --git a/src/main.rs b/src/main.rs
index 4ae9cd1..7ecd651 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -86,7 +86,11 @@ async fn main() {
     let crawler_fut = tokio::spawn(spider::Crawler::start(crawler.clone()));

     let ctx = WebCtx::new(ctx);
-    ctx.bootstrap(&db).await.unwrap();
+    let (kill_introducer, introducer_fut) = ctx
+        .clone()
+        .spawn_bootstrap(db.as_ref().clone())
+        .await
+        .unwrap();

     let c = ctx.clone();
     let d = db.clone();
@@ -121,6 +125,8 @@ async fn main() {
     //        .await
     //        .unwrap();
     kill_crawler.send(true).unwrap();
+    kill_introducer.send(true).unwrap();
     crawler_fut.await.unwrap().await;
+    introducer_fut.await;
     s.await.unwrap().unwrap();
 }
diff --git a/src/spider.rs b/src/spider.rs
index 6dd7a8e..cd9f92b 100644
--- a/src/spider.rs
+++ b/src/spider.rs
@@ -159,7 +159,6 @@ impl Crawler {
                     info!("Closed");
                     true
                 }
-                _ => false,
             }
         } else {
             true
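
For context, below is a minimal, self-contained sketch of the shutdown pattern the new spawn_bootstrap() follows: a background loop that polls a tokio oneshot receiver with try_recv() and exits when a kill signal arrives or the sender is dropped. The names do_work and spawn_worker are illustrative only and are not part of this codebase; the sketch assumes tokio with the "full" feature set.

// Hypothetical standalone example; not part of the patch above.
use std::time::Duration;

use tokio::sync::oneshot::{self, error::TryRecvError, Sender};
use tokio::task::JoinHandle;

async fn do_work() {
    // Placeholder for the periodic job (the patch calls self.bootstrap(db) here).
    tokio::time::sleep(Duration::from_millis(100)).await;
}

fn spawn_worker() -> (Sender<bool>, JoinHandle<()>) {
    let (tx, mut rx) = oneshot::channel();
    let handle = tokio::spawn(async move {
        loop {
            let shutdown = match rx.try_recv() {
                Ok(signal) => signal,              // explicit kill signal received
                Err(TryRecvError::Empty) => false, // no signal yet, keep working
                Err(TryRecvError::Closed) => true, // sender dropped, shut down
            };
            if shutdown {
                break;
            }
            do_work().await;
        }
    });
    (tx, handle)
}

#[tokio::main]
async fn main() {
    let (kill, worker) = spawn_worker();
    tokio::time::sleep(Duration::from_secs(1)).await;
    kill.send(true).unwrap(); // mirrors kill_introducer.send(true) in main.rs
    worker.await.unwrap();    // mirrors introducer_fut.await
}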