feat: log federated seraches in leaky-bucket counter
This commit is contained in:
parent
5e18cad34c
commit
d3b59c0072
2 changed files with 66 additions and 22 deletions
11
src/ctx.rs
11
src/ctx.rs
|
@ -18,9 +18,11 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use actix::dev::*;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use reqwest::{Client, ClientBuilder};
|
use reqwest::{Client, ClientBuilder};
|
||||||
|
|
||||||
|
use crate::master::Master;
|
||||||
use crate::settings::Settings;
|
use crate::settings::Settings;
|
||||||
use crate::{DOMAIN, PKG_NAME, VERSION};
|
use crate::{DOMAIN, PKG_NAME, VERSION};
|
||||||
|
|
||||||
|
@ -34,6 +36,7 @@ const CLIENT_TIMEOUT: u64 = 60;
|
||||||
pub struct Ctx {
|
pub struct Ctx {
|
||||||
pub client: Client,
|
pub client: Client,
|
||||||
pub settings: Settings,
|
pub settings: Settings,
|
||||||
|
pub master: Addr<Master>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Ctx {
|
impl Ctx {
|
||||||
|
@ -48,6 +51,12 @@ impl Ctx {
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
Arc::new(Self { client, settings })
|
let master = Master::new(45).start();
|
||||||
|
|
||||||
|
Arc::new(Self {
|
||||||
|
client,
|
||||||
|
settings,
|
||||||
|
master,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
use crate::counter::AddSearch;
|
||||||
|
use crate::master::{AddCounter, GetSite};
|
||||||
/*
|
/*
|
||||||
* ForgeFlux StarChart - A federated software forge spider
|
* ForgeFlux StarChart - A federated software forge spider
|
||||||
* Copyright (C) 2022 Aravinth Manivannan <realaravinth@batsense.net>
|
* Copyright (C) 2022 Aravinth Manivannan <realaravinth@batsense.net>
|
||||||
|
@ -15,11 +17,11 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
use crate::{errors::*, WebCtx};
|
use crate::{counter, errors::*, WebCtx};
|
||||||
use actix_web::web;
|
use actix_web::web;
|
||||||
use actix_web::{HttpResponse, Responder};
|
use actix_web::{HttpResponse, Responder};
|
||||||
use actix_web_codegen_const_routes::post;
|
use actix_web_codegen_const_routes::post;
|
||||||
use db_core::Repository;
|
use db_core::prelude::*;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use crate::Ctx;
|
use crate::Ctx;
|
||||||
|
@ -43,6 +45,53 @@ impl Ctx {
|
||||||
.await
|
.await
|
||||||
.unwrap())
|
.unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn search_repository(
|
||||||
|
&self,
|
||||||
|
db: &Box<dyn SCDatabase>,
|
||||||
|
query: String,
|
||||||
|
) -> ServiceResult<Vec<Repository>> {
|
||||||
|
let query = if query.contains('*') {
|
||||||
|
query
|
||||||
|
} else {
|
||||||
|
format!("*{}*", query)
|
||||||
|
};
|
||||||
|
let local_resp = db.search_repository(&query).await?;
|
||||||
|
let mut federated_resp = Vec::default();
|
||||||
|
|
||||||
|
for starchart in db.search_mini_index(&query).await?.iter() {
|
||||||
|
if db.is_starchart_imported(&Url::parse(&starchart)?).await? {
|
||||||
|
log::debug!("{starchart} is imported");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let addr = if let Some(addr) = self.master.send(GetSite(starchart.clone())).await? {
|
||||||
|
addr
|
||||||
|
} else {
|
||||||
|
self.master
|
||||||
|
.send(AddCounter {
|
||||||
|
id: starchart.clone(),
|
||||||
|
counter: counter::Count {
|
||||||
|
duration: 54,
|
||||||
|
search_threshold: 0,
|
||||||
|
}
|
||||||
|
.into(),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
self.master.send(GetSite(starchart.clone())).await?.unwrap()
|
||||||
|
};
|
||||||
|
|
||||||
|
let count = addr.send(AddSearch).await?;
|
||||||
|
if count > 50 {
|
||||||
|
todo!("Clone index");
|
||||||
|
} else {
|
||||||
|
let resp = self.client_federated_search(Url::parse(starchart)?).await?;
|
||||||
|
federated_resp.extend(resp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
federated_resp.extend(local_resp);
|
||||||
|
Ok(federated_resp)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[post(path = "ROUTES.search.repository")]
|
#[post(path = "ROUTES.search.repository")]
|
||||||
|
@ -51,23 +100,10 @@ pub async fn search_repository(
|
||||||
ctx: WebCtx,
|
ctx: WebCtx,
|
||||||
db: WebDB,
|
db: WebDB,
|
||||||
) -> ServiceResult<impl Responder> {
|
) -> ServiceResult<impl Responder> {
|
||||||
let payload = payload.into_inner();
|
let resp = ctx
|
||||||
let query = if payload.query.contains('*') {
|
.search_repository(&db, payload.into_inner().query)
|
||||||
payload.query
|
.await?;
|
||||||
} else {
|
Ok(HttpResponse::Ok().json(resp))
|
||||||
format!("*{}*", payload.query)
|
|
||||||
};
|
|
||||||
let local_resp = db.search_repository(&query).await?;
|
|
||||||
let mut federated_resp = Vec::default();
|
|
||||||
|
|
||||||
for starchart in db.search_mini_index(&query).await?.iter() {
|
|
||||||
let resp = ctx.client_federated_search(Url::parse(starchart)?).await?;
|
|
||||||
federated_resp.extend(resp);
|
|
||||||
}
|
|
||||||
|
|
||||||
federated_resp.extend(local_resp);
|
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(federated_resp))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn services(cfg: &mut web::ServiceConfig) {
|
pub fn services(cfg: &mut web::ServiceConfig) {
|
||||||
|
@ -76,12 +112,11 @@ pub fn services(cfg: &mut web::ServiceConfig) {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use actix_web::http::StatusCode;
|
|
||||||
use actix_web::test;
|
use actix_web::test;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use db_core::prelude::*;
|
use actix_web::http::StatusCode;
|
||||||
|
|
||||||
use crate::tests::*;
|
use crate::tests::*;
|
||||||
use crate::*;
|
use crate::*;
|
||||||
|
|
Loading…
Reference in a new issue