feat: introduce on loop in bg
This commit is contained in:
parent
d0bc6627f2
commit
fa31c4fac0
3 changed files with 40 additions and 2 deletions
|
@ -16,11 +16,13 @@
|
||||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
use std::future::Future;
|
||||||
|
|
||||||
use actix_web::web;
|
use actix_web::web;
|
||||||
use actix_web::{HttpResponse, Responder};
|
use actix_web::{HttpResponse, Responder};
|
||||||
use actix_web_codegen_const_routes::get;
|
use actix_web_codegen_const_routes::get;
|
||||||
use actix_web_codegen_const_routes::post;
|
use actix_web_codegen_const_routes::post;
|
||||||
|
use tokio::sync::oneshot::{self, error::TryRecvError, Sender};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
pub use api_routes::*;
|
pub use api_routes::*;
|
||||||
|
@ -138,6 +140,37 @@ impl Ctx {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn spawn_bootstrap(
|
||||||
|
self,
|
||||||
|
db: Box<dyn SCDatabase>,
|
||||||
|
) -> ServiceResult<(Sender<bool>, impl Future)> {
|
||||||
|
let (tx, mut rx) = oneshot::channel();
|
||||||
|
let fut = async {
|
||||||
|
loop {
|
||||||
|
let shutdown = match rx.try_recv() {
|
||||||
|
// The channel is currently empty
|
||||||
|
Ok(x) => {
|
||||||
|
log::info!("Received signal from tx");
|
||||||
|
x
|
||||||
|
}
|
||||||
|
Err(TryRecvError::Empty) => false,
|
||||||
|
Err(TryRecvError::Closed) => {
|
||||||
|
log::info!("Closed");
|
||||||
|
true
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if shutdown {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = self.bootstrap(db).await;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let join_handle = tokio::spawn(fut);
|
||||||
|
Ok((tx, join_handle))
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn bootstrap(&self, db: &Box<dyn SCDatabase>) -> ServiceResult<()> {
|
pub async fn bootstrap(&self, db: &Box<dyn SCDatabase>) -> ServiceResult<()> {
|
||||||
let mut known_starcharts = HashSet::with_capacity(self.settings.introducer.nodes.len());
|
let mut known_starcharts = HashSet::with_capacity(self.settings.introducer.nodes.len());
|
||||||
for starchart in self.settings.introducer.nodes.iter() {
|
for starchart in self.settings.introducer.nodes.iter() {
|
||||||
|
|
|
@ -86,7 +86,11 @@ async fn main() {
|
||||||
|
|
||||||
let crawler_fut = tokio::spawn(spider::Crawler::start(crawler.clone()));
|
let crawler_fut = tokio::spawn(spider::Crawler::start(crawler.clone()));
|
||||||
let ctx = WebCtx::new(ctx);
|
let ctx = WebCtx::new(ctx);
|
||||||
ctx.bootstrap(&db).await.unwrap();
|
let (kill_introducer, introducer_fut) = ctx
|
||||||
|
.clone()
|
||||||
|
.spawn_bootstrap(db.as_ref().clone())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let c = ctx.clone();
|
let c = ctx.clone();
|
||||||
let d = db.clone();
|
let d = db.clone();
|
||||||
|
@ -121,6 +125,8 @@ async fn main() {
|
||||||
// .await
|
// .await
|
||||||
// .unwrap();
|
// .unwrap();
|
||||||
kill_crawler.send(true).unwrap();
|
kill_crawler.send(true).unwrap();
|
||||||
|
kill_introducer.send(true).unwrap();
|
||||||
crawler_fut.await.unwrap().await;
|
crawler_fut.await.unwrap().await;
|
||||||
|
introducer_fut.await;
|
||||||
s.await.unwrap().unwrap();
|
s.await.unwrap().unwrap();
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,7 +159,6 @@ impl Crawler {
|
||||||
info!("Closed");
|
info!("Closed");
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
_ => false,
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
true
|
true
|
||||||
|
|
Loading…
Add table
Reference in a new issue