feat: ActivityPub client port&adapter to send activities to remote instances

This commit is contained in:
Aravinth Manivannan 2024-12-01 16:05:59 +05:30
parent 69de4f217a
commit 147e1563ca
Signed by: realaravinth
GPG key ID: F8F50389936984FF
9 changed files with 261 additions and 0 deletions

View file

@ -0,0 +1,17 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use activitypub_federation::error::Error as APError;
use crate::federation::application::port::out::activity_pub::errors::FederationOutAPPortError;
use crate::log::*;
impl From<APError> for FederationOutAPPortError {
fn from(v: APError) -> Self {
log::error!("AP error {:?}", v);
match v {
// APError::UrlVerificationError(_) => Self::InternalError,
_ => Self::InternalError,
}
}
}

View file

@ -0,0 +1,20 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use crate::federation::adapter::WebFederationConfig;
mod errors;
mod push_activity;
pub mod send_activity;
#[derive(Clone)]
pub struct APOutAdapter {
data: WebFederationConfig,
}
impl APOutAdapter {
pub fn new(data: WebFederationConfig) -> Self {
Self { data }
}
}

View file

@ -0,0 +1,38 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use activitypub_federation::config::Data as APData;
use activitypub_federation::traits::ActivityHandler;
use url::Url;
use crate::federation::application::port::out::activity_pub::errors::FederationOutAPPortError;
use crate::federation::domain::commit_aggregate::*;
#[async_trait::async_trait]
impl ActivityHandler for Push {
type DataType = crate::federation::adapter::FData;
type Error = FederationOutAPPortError;
fn id(&self) -> &Url {
self.id()
}
fn actor(&self) -> &Url {
self.actor()
}
async fn verify(&self, data: &APData<Self::DataType>) -> Result<(), Self::Error> {
// let settings = data.get::<WebSettings>().unwrap().clone();
// verify_domains_match(self.id(), self.actor())?;
// verify_domains_match(self.object(), self.actor())?;
// for to in self.to().iter() {
// verify_domains_match(to, &absolute_url(&settings, "").unwrap())?;
// }
// Ok(())
todo!()
}
async fn receive(self, _data: &APData<Self::DataType>) -> Result<(), Self::Error> {
todo!();
}
}

View file

@ -0,0 +1,97 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use activitypub_federation::activity_sending::SendActivityTask;
use url::Url;
use super::APOutAdapter;
use crate::federation::application::port::out::activity_pub::{errors::*, send_activity::*};
use crate::federation::domain::{followers::*, person_event_aggregate::*, remote_actor::*, *};
use crate::log::*;
#[async_trait::async_trait]
impl SendActivity for APOutAdapter {
async fn send_activity_person(
&self,
event: PersonEvent,
p: Person,
remote_actors: &[Actor],
) -> FederationOutAPPortResult<()> {
let inboxes = remote_actors.iter().map(|a| a.inbox().clone()).collect();
let data = self.data.to_request_data();
match event {
PersonEvent::PushCommit(event) => {
let sends = SendActivityTask::prepare(&event, &p, inboxes, &data).await?;
for send in sends {
// info!(
// "Sending activity {} to actor {}",
// &send.activity_id, &send.actor_id,
// );
send.sign_and_send(&data).await?;
}
}
}
Ok(())
}
}
//#[cfg(test)]
//mod tests {
// use remote_actor::Actor;
//
// use super::*;
// use crate::federation::application::port::out::db::{save_person::*, save_remote_actor::*};
//
// #[actix_rt::test]
// async fn test_postgres_follow_person() {
// let settings = crate::settings::tests::get_settings().await;
// let db = super::APOutAdapter::new(
// sqlx::postgres::PgPool::connect(&settings.database.url)
// .await
// .unwrap(),
// );
// let remote_actor = Actor::default();
// db.save(&remote_actor).await.unwrap();
//
// let mut person = Person::default();
// person.generate_keys();
// db.save_person(&person).await.unwrap();
//
// let cmd = FollowPerson::default();
//
// assert_eq!(db.count_followers(&person).await.unwrap(), 0);
//
// assert_eq!(db.count_followers(&person).await.unwrap(), 0);
// let followers = db.get_person_followers(&person, 0).await.unwrap();
// assert_eq!(*followers.total_followers(), 0);
// assert!(followers.followers().is_empty());
//
// db.create(&cmd).await.unwrap();
// db.create(&cmd).await.unwrap();
//
// assert_eq!(
// db.get_person_from_init_activity_id(cmd.init_activity_id())
// .await
// .unwrap()
// .unwrap(),
// person
// );
// let followers = db.get_person_followers(&person, 0).await.unwrap();
// assert_eq!(*followers.total_followers(), 1);
// assert_eq!(*followers.next_page(), None);
// assert_eq!(followers.followers().as_ref(), [cmd.follower().clone()]);
//
// db.delete(cmd.follower(), cmd.init_activity_id())
// .await
// .unwrap();
//
// assert_eq!(db.count_followers(&person).await.unwrap(), 0);
// let followers = db.get_person_followers(&person, 0).await.unwrap();
// assert_eq!(*followers.total_followers(), 0);
// assert_eq!(*followers.next_page(), None);
// assert!(followers.followers().is_empty());
//
// settings.drop_db().await;
// }
//}

View file

@ -0,0 +1,13 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use derive_more::Display;
use serde::{Deserialize, Serialize};
pub type FederationOutAPPortResult<V> = Result<V, FederationOutAPPortError>;
#[derive(Debug, Display, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum FederationOutAPPortError {
InternalError,
}

View file

@ -0,0 +1,6 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
pub mod errors;
pub mod send_activity;

View file

@ -0,0 +1,53 @@
// SPDX-FileCopyrightText: 2024 Aravinth Manivannan <realaravinth@batsense.net>
//
// SPDX-License-Identifier: AGPL-3.0-or-later
use std::fmt::Debug;
use std::sync::Arc;
use activitypub_federation::traits::{ActivityHandler, Actor as APActor, Object};
use mockall::predicate::*;
use mockall::*;
use serde::*;
use url::Url;
use super::errors::*;
use crate::federation::domain::{followers::*, person_event_aggregate::*, remote_actor::Actor, *};
#[allow(unused_imports)]
#[cfg(test)]
pub use tests::*;
//pub trait SendableObject: ActivityHandler + Serialize + Debug {}
#[automock]
#[async_trait::async_trait]
pub trait SendActivity: Send + Sync {
async fn send_activity_person(
&self,
event: PersonEvent,
p: Person,
// remote_actors: Vec<Actor>,
remote_actors: &[Actor],
) -> FederationOutAPPortResult<()>;
}
pub type FederationOutAPSendActivityObj = std::sync::Arc<dyn SendActivity>;
#[cfg(test)]
pub mod tests {
use super::*;
use std::sync::Arc;
pub fn mock_send_activity(times: Option<usize>) -> FederationOutAPSendActivityObj {
let mut m = MockSendActivity::new();
if let Some(times) = times {
m.expect_send_activity_person()
.times(times)
.return_const(Ok(()));
} else {
m.expect_send_activity_person().return_const(Ok(()));
}
Arc::new(m)
}
}

View file

@ -5,6 +5,7 @@
use derive_more::Display; use derive_more::Display;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::federation::application::port::out::activity_pub::errors::FederationOutAPPortError;
use crate::federation::application::port::out::db::errors::FederationOutDBPortError; use crate::federation::application::port::out::db::errors::FederationOutDBPortError;
use crate::federation::application::port::out::forge::errors::FederationOutForgePortError; use crate::federation::application::port::out::forge::errors::FederationOutForgePortError;
use crate::federation::domain::SupportedActorError; use crate::federation::domain::SupportedActorError;
@ -51,3 +52,15 @@ impl From<SupportedActorError> for FederationServiceError {
} }
} }
} }
impl From<FederationOutAPPortError> for FederationServiceError {
fn from(v: FederationOutAPPortError) -> Self {
match v {
// FederationOutDBPortError::DuplicateState => Self::InternalError, // only happens when there's a
// bug in code
FederationOutAPPortError::InternalError => Self::InternalError,
// FederationOutDBPortError::DuplicateAccessToken => Self::InternalError, // only happens when bug in
// code
}
}
}

View file

@ -8,6 +8,7 @@ use actix_identity::IdentityMiddleware;
use actix_session::{storage::CookieSessionStore, SessionMiddleware}; use actix_session::{storage::CookieSessionStore, SessionMiddleware};
use actix_web::{cookie::Key, middleware, App, HttpServer}; use actix_web::{cookie::Key, middleware, App, HttpServer};
use db::migrate::RunMigrations; use db::migrate::RunMigrations;
use federation::adapter::out::activity_pub::send_activity;
mod auth; mod auth;
mod db; mod db;
@ -48,6 +49,8 @@ async fn main() {
let s = settings.clone(); let s = settings.clone();
let federation_config = federation::adapter::federation_config(db.pool.clone(), &s).await; let federation_config = federation::adapter::federation_config(db.pool.clone(), &s).await;
let out_ap_send_activity_adapter =
federation::adapter::load_async_adapters(federation_config.clone()).await;
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.wrap(IdentityMiddleware::default()) .wrap(IdentityMiddleware::default())
@ -68,6 +71,7 @@ async fn main() {
)) ))
// .app_data(utils::data::Extensions::default()) // .app_data(utils::data::Extensions::default())
.app_data(federation_config.clone()) .app_data(federation_config.clone())
.app_data(out_ap_send_activity_adapter.clone())
.configure(utils::random_string::GenerateRandomString::inject()) .configure(utils::random_string::GenerateRandomString::inject())
}) })
.bind(&socket_addr) .bind(&socket_addr)