Merge handler params into single struct (#25)

This commit is contained in:
Nutomic 2023-02-11 21:32:35 +09:00 committed by GitHub
parent 6d9682f4e6
commit 83ad4bfdc1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 489 additions and 356 deletions

View file

@ -1,5 +1,9 @@
use crate::{activities::follow::Follow, instance::InstanceHandle, objects::person::MyUser}; use crate::{activities::follow::Follow, instance::DatabaseHandle, objects::person::MyUser};
use activitypub_federation::{core::object_id::ObjectId, data::Data, traits::ActivityHandler}; use activitypub_federation::{
core::object_id::ObjectId,
request_data::RequestData,
traits::ActivityHandler,
};
use activitystreams_kinds::activity::AcceptType; use activitystreams_kinds::activity::AcceptType;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@ -27,7 +31,7 @@ impl Accept {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ActivityHandler for Accept { impl ActivityHandler for Accept {
type DataType = InstanceHandle; type DataType = DatabaseHandle;
type Error = crate::error::Error; type Error = crate::error::Error;
fn id(&self) -> &Url { fn id(&self) -> &Url {
@ -38,11 +42,7 @@ impl ActivityHandler for Accept {
self.actor.inner() self.actor.inner()
} }
async fn receive( async fn receive(self, _data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
self,
_data: &Data<Self::DataType>,
_request_counter: &mut i32,
) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
} }

View file

@ -1,12 +1,12 @@
use crate::{ use crate::{
instance::InstanceHandle, instance::DatabaseHandle,
objects::{note::Note, person::MyUser}, objects::{note::Note, person::MyUser},
MyPost, MyPost,
}; };
use activitypub_federation::{ use activitypub_federation::{
core::object_id::ObjectId, core::object_id::ObjectId,
data::Data,
deser::helpers::deserialize_one_or_many, deser::helpers::deserialize_one_or_many,
request_data::RequestData,
traits::{ActivityHandler, ApubObject}, traits::{ActivityHandler, ApubObject},
}; };
use activitystreams_kinds::activity::CreateType; use activitystreams_kinds::activity::CreateType;
@ -39,7 +39,7 @@ impl CreateNote {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ActivityHandler for CreateNote { impl ActivityHandler for CreateNote {
type DataType = InstanceHandle; type DataType = DatabaseHandle;
type Error = crate::error::Error; type Error = crate::error::Error;
fn id(&self) -> &Url { fn id(&self) -> &Url {
@ -50,12 +50,8 @@ impl ActivityHandler for CreateNote {
self.actor.inner() self.actor.inner()
} }
async fn receive( async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
self, MyPost::from_apub(self.object, data).await?;
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
MyPost::from_apub(self.object, data, request_counter).await?;
Ok(()) Ok(())
} }
} }

View file

@ -1,12 +1,12 @@
use crate::{ use crate::{
activities::accept::Accept, activities::accept::Accept,
generate_object_id, generate_object_id,
instance::InstanceHandle, instance::DatabaseHandle,
objects::person::MyUser, objects::person::MyUser,
}; };
use activitypub_federation::{ use activitypub_federation::{
core::object_id::ObjectId, core::object_id::ObjectId,
data::Data, request_data::RequestData,
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
}; };
use activitystreams_kinds::activity::FollowType; use activitystreams_kinds::activity::FollowType;
@ -36,7 +36,7 @@ impl Follow {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ActivityHandler for Follow { impl ActivityHandler for Follow {
type DataType = InstanceHandle; type DataType = DatabaseHandle;
type Error = crate::error::Error; type Error = crate::error::Error;
fn id(&self) -> &Url { fn id(&self) -> &Url {
@ -49,11 +49,7 @@ impl ActivityHandler for Follow {
// Ignore clippy false positive: https://github.com/rust-lang/rust-clippy/issues/6446 // Ignore clippy false positive: https://github.com/rust-lang/rust-clippy/issues/6446
#[allow(clippy::await_holding_lock)] #[allow(clippy::await_holding_lock)]
async fn receive( async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
self,
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
// add to followers // add to followers
let local_user = { let local_user = {
let mut users = data.users.lock().unwrap(); let mut users = data.users.lock().unwrap();
@ -63,18 +59,11 @@ impl ActivityHandler for Follow {
}; };
// send back an accept // send back an accept
let follower = self let follower = self.actor.dereference(data).await?;
.actor
.dereference(data, data.local_instance(), request_counter)
.await?;
let id = generate_object_id(data.local_instance().hostname())?; let id = generate_object_id(data.local_instance().hostname())?;
let accept = Accept::new(local_user.ap_id.clone(), self, id.clone()); let accept = Accept::new(local_user.ap_id.clone(), self, id.clone());
local_user local_user
.send( .send(accept, vec![follower.shared_inbox_or_inbox()], data)
accept,
vec![follower.shared_inbox_or_inbox()],
data.local_instance(),
)
.await?; .await?;
Ok(()) Ok(())
} }

View file

@ -6,40 +6,32 @@ use crate::{
person::{MyUser, PersonAcceptedActivities}, person::{MyUser, PersonAcceptedActivities},
}, },
}; };
use activitypub_federation::{ use activitypub_federation::{
core::{ core::{
actix::inbox::receive_activity, actix::inbox::receive_activity,
object_id::ObjectId, object_id::ObjectId,
signatures::generate_actor_keypair, signatures::generate_actor_keypair,
}, },
data::Data,
deser::context::WithContext, deser::context::WithContext,
request_data::{ApubContext, ApubMiddleware, RequestData},
traits::ApubObject, traits::ApubObject,
InstanceSettings, FederationSettings,
LocalInstance, InstanceConfig,
UrlVerifier, UrlVerifier,
APUB_JSON_CONTENT_TYPE, APUB_JSON_CONTENT_TYPE,
}; };
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer}; use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use async_trait::async_trait; use async_trait::async_trait;
use reqwest::Client; use reqwest::Client;
use std::{ use std::sync::{Arc, Mutex};
ops::Deref,
sync::{Arc, Mutex},
};
use tokio::task; use tokio::task;
use url::Url; use url::Url;
pub type InstanceHandle = Arc<Instance>; pub type DatabaseHandle = Arc<Database>;
pub struct Instance { /// Our "database" which contains all known posts users (local and federated)
/// This holds all library data pub struct Database {
local_instance: LocalInstance,
/// Our "database" which contains all known users (local and federated)
pub users: Mutex<Vec<MyUser>>, pub users: Mutex<Vec<MyUser>>,
/// Same, but for posts
pub posts: Mutex<Vec<MyPost>>, pub posts: Mutex<Vec<MyPost>>,
} }
@ -58,37 +50,32 @@ impl UrlVerifier for MyUrlVerifier {
} }
} }
impl Instance { impl Database {
pub fn new(hostname: String) -> Result<InstanceHandle, Error> { pub fn new(hostname: String) -> Result<ApubContext<DatabaseHandle>, Error> {
let settings = InstanceSettings::builder() let settings = FederationSettings::builder()
.debug(true) .debug(true)
.url_verifier(Box::new(MyUrlVerifier())) .url_verifier(Box::new(MyUrlVerifier()))
.build()?; .build()?;
let local_instance = let local_instance =
LocalInstance::new(hostname.clone(), Client::default().into(), settings); InstanceConfig::new(hostname.clone(), Client::default().into(), settings);
let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?); let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?);
let instance = Arc::new(Instance { let instance = Arc::new(Database {
local_instance,
users: Mutex::new(vec![local_user]), users: Mutex::new(vec![local_user]),
posts: Mutex::new(vec![]), posts: Mutex::new(vec![]),
}); });
Ok(instance) Ok(ApubContext::new(instance, local_instance))
} }
pub fn local_user(&self) -> MyUser { pub fn local_user(&self) -> MyUser {
self.users.lock().unwrap().first().cloned().unwrap() self.users.lock().unwrap().first().cloned().unwrap()
} }
pub fn local_instance(&self) -> &LocalInstance { pub fn listen(data: &ApubContext<DatabaseHandle>) -> Result<(), Error> {
&self.local_instance let hostname = data.local_instance().hostname();
} let data = data.clone();
pub fn listen(instance: &InstanceHandle) -> Result<(), Error> {
let hostname = instance.local_instance.hostname();
let instance = instance.clone();
let server = HttpServer::new(move || { let server = HttpServer::new(move || {
App::new() App::new()
.app_data(web::Data::new(instance.clone())) .wrap(ApubMiddleware::new(data.clone()))
.route("/objects/{user_name}", web::get().to(http_get_user)) .route("/objects/{user_name}", web::get().to(http_get_user))
.service( .service(
web::scope("") web::scope("")
@ -106,10 +93,9 @@ impl Instance {
/// Handles requests to fetch user json over HTTP /// Handles requests to fetch user json over HTTP
async fn http_get_user( async fn http_get_user(
request: HttpRequest, request: HttpRequest,
data: web::Data<InstanceHandle>, data: RequestData<DatabaseHandle>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let data: InstanceHandle = data.into_inner().deref().clone(); let hostname: String = data.local_instance().hostname().to_string();
let hostname: String = data.local_instance.hostname().to_string();
let request_url = format!("http://{}{}", hostname, &request.uri().to_string()); let request_url = format!("http://{}{}", hostname, &request.uri().to_string());
let url = Url::parse(&request_url)?; let url = Url::parse(&request_url)?;
let user = ObjectId::<MyUser>::new(url) let user = ObjectId::<MyUser>::new(url)
@ -127,15 +113,11 @@ async fn http_get_user(
async fn http_post_user_inbox( async fn http_post_user_inbox(
request: HttpRequest, request: HttpRequest,
payload: String, payload: String,
data: web::Data<InstanceHandle>, data: RequestData<DatabaseHandle>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let data: InstanceHandle = data.into_inner().deref().clone();
let activity = serde_json::from_str(&payload)?; let activity = serde_json::from_str(&payload)?;
receive_activity::<WithContext<PersonAcceptedActivities>, MyUser, InstanceHandle>( receive_activity::<WithContext<PersonAcceptedActivities>, MyUser, DatabaseHandle>(
request, request, activity, &data,
activity,
&data.clone().local_instance,
&Data::new(data),
) )
.await .await
} }

View file

@ -1,4 +1,4 @@
use crate::{error::Error, instance::Instance, objects::note::MyPost, utils::generate_object_id}; use crate::{error::Error, instance::Database, objects::note::MyPost, utils::generate_object_id};
use tracing::log::LevelFilter; use tracing::log::LevelFilter;
mod activities; mod activities;
@ -13,15 +13,15 @@ async fn main() -> Result<(), Error> {
.filter_level(LevelFilter::Debug) .filter_level(LevelFilter::Debug)
.init(); .init();
let alpha = Instance::new("localhost:8001".to_string())?; let alpha = Database::new("localhost:8001".to_string())?;
let beta = Instance::new("localhost:8002".to_string())?; let beta = Database::new("localhost:8002".to_string())?;
Instance::listen(&alpha)?; Database::listen(&alpha)?;
Instance::listen(&beta)?; Database::listen(&beta)?;
// alpha user follows beta user // alpha user follows beta user
alpha alpha
.local_user() .local_user()
.follow(&beta.local_user(), &alpha) .follow(&beta.local_user(), &alpha.to_request_data())
.await?; .await?;
// assert that follow worked correctly // assert that follow worked correctly
assert_eq!( assert_eq!(
@ -31,7 +31,9 @@ async fn main() -> Result<(), Error> {
// beta sends a post to its followers // beta sends a post to its followers
let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id); let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id);
beta.local_user().post(sent_post.clone(), &beta).await?; beta.local_user()
.post(sent_post.clone(), &beta.to_request_data())
.await?;
let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap(); let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap();
// assert that alpha received the post // assert that alpha received the post

View file

@ -1,7 +1,8 @@
use crate::{generate_object_id, instance::InstanceHandle, objects::person::MyUser}; use crate::{generate_object_id, instance::DatabaseHandle, objects::person::MyUser};
use activitypub_federation::{ use activitypub_federation::{
core::object_id::ObjectId, core::object_id::ObjectId,
deser::helpers::deserialize_one_or_many, deser::helpers::deserialize_one_or_many,
request_data::RequestData,
traits::ApubObject, traits::ApubObject,
}; };
use activitystreams_kinds::{object::NoteType, public}; use activitystreams_kinds::{object::NoteType, public};
@ -41,19 +42,22 @@ pub struct Note {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ApubObject for MyPost { impl ApubObject for MyPost {
type DataType = InstanceHandle; type DataType = DatabaseHandle;
type ApubType = Note; type ApubType = Note;
type DbType = (); type DbType = ();
type Error = crate::error::Error; type Error = crate::error::Error;
async fn read_from_apub_id( async fn read_from_apub_id(
_object_id: Url, _object_id: Url,
_data: &Self::DataType, _data: &RequestData<Self::DataType>,
) -> Result<Option<Self>, Self::Error> { ) -> Result<Option<Self>, Self::Error> {
todo!() todo!()
} }
async fn into_apub(self, data: &Self::DataType) -> Result<Self::ApubType, Self::Error> { async fn into_apub(
self,
data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error> {
let creator = self.creator.dereference_local(data).await?; let creator = self.creator.dereference_local(data).await?;
Ok(Note { Ok(Note {
kind: Default::default(), kind: Default::default(),
@ -66,8 +70,7 @@ impl ApubObject for MyPost {
async fn from_apub( async fn from_apub(
apub: Self::ApubType, apub: Self::ApubType,
data: &Self::DataType, data: &RequestData<Self::DataType>,
_request_counter: &mut i32,
) -> Result<Self, Self::Error> { ) -> Result<Self, Self::Error> {
let post = MyPost { let post = MyPost {
text: apub.content, text: apub.content,

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
activities::{accept::Accept, create_note::CreateNote, follow::Follow}, activities::{accept::Accept, create_note::CreateNote, follow::Follow},
error::Error, error::Error,
instance::InstanceHandle, instance::DatabaseHandle,
objects::note::MyPost, objects::note::MyPost,
utils::generate_object_id, utils::generate_object_id,
}; };
@ -11,10 +11,9 @@ use activitypub_federation::{
object_id::ObjectId, object_id::ObjectId,
signatures::{Keypair, PublicKey}, signatures::{Keypair, PublicKey},
}, },
data::Data,
deser::context::WithContext, deser::context::WithContext,
request_data::RequestData,
traits::{ActivityHandler, Actor, ApubObject}, traits::{ActivityHandler, Actor, ApubObject},
LocalInstance,
}; };
use activitystreams_kinds::actor::PersonType; use activitystreams_kinds::actor::PersonType;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -81,30 +80,31 @@ impl MyUser {
PublicKey::new_main_key(self.ap_id.clone().into_inner(), self.public_key.clone()) PublicKey::new_main_key(self.ap_id.clone().into_inner(), self.public_key.clone())
} }
pub async fn follow(&self, other: &MyUser, instance: &InstanceHandle) -> Result<(), Error> { pub async fn follow(
&self,
other: &MyUser,
instance: &RequestData<DatabaseHandle>,
) -> Result<(), Error> {
let id = generate_object_id(instance.local_instance().hostname())?; let id = generate_object_id(instance.local_instance().hostname())?;
let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone()); let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone());
self.send( self.send(follow, vec![other.shared_inbox_or_inbox()], instance)
follow,
vec![other.shared_inbox_or_inbox()],
instance.local_instance(),
)
.await?; .await?;
Ok(()) Ok(())
} }
pub async fn post(&self, post: MyPost, instance: &InstanceHandle) -> Result<(), Error> { pub async fn post(
&self,
post: MyPost,
instance: &RequestData<DatabaseHandle>,
) -> Result<(), Error> {
let id = generate_object_id(instance.local_instance().hostname())?; let id = generate_object_id(instance.local_instance().hostname())?;
let create = CreateNote::new(post.into_apub(instance).await?, id.clone()); let create = CreateNote::new(post.into_apub(instance).await?, id.clone());
let mut inboxes = vec![]; let mut inboxes = vec![];
for f in self.followers.clone() { for f in self.followers.clone() {
let user: MyUser = ObjectId::new(f) let user: MyUser = ObjectId::new(f).dereference(instance).await?;
.dereference(instance, instance.local_instance(), &mut 0)
.await?;
inboxes.push(user.shared_inbox_or_inbox()); inboxes.push(user.shared_inbox_or_inbox());
} }
self.send(create, inboxes, instance.local_instance()) self.send(create, inboxes, instance).await?;
.await?;
Ok(()) Ok(())
} }
@ -112,7 +112,7 @@ impl MyUser {
&self, &self,
activity: Activity, activity: Activity,
recipients: Vec<Url>, recipients: Vec<Url>,
local_instance: &LocalInstance, data: &RequestData<DatabaseHandle>,
) -> Result<(), <Activity as ActivityHandler>::Error> ) -> Result<(), <Activity as ActivityHandler>::Error>
where where
Activity: ActivityHandler + Serialize + Send + Sync, Activity: ActivityHandler + Serialize + Send + Sync,
@ -124,7 +124,7 @@ impl MyUser {
self.public_key(), self.public_key(),
self.private_key.clone().expect("has private key"), self.private_key.clone().expect("has private key"),
recipients, recipients,
local_instance, data.local_instance(),
) )
.await?; .await?;
Ok(()) Ok(())
@ -133,14 +133,14 @@ impl MyUser {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ApubObject for MyUser { impl ApubObject for MyUser {
type DataType = InstanceHandle; type DataType = DatabaseHandle;
type ApubType = Person; type ApubType = Person;
type DbType = MyUser; type DbType = MyUser;
type Error = crate::error::Error; type Error = crate::error::Error;
async fn read_from_apub_id( async fn read_from_apub_id(
object_id: Url, object_id: Url,
data: &Self::DataType, data: &RequestData<Self::DataType>,
) -> Result<Option<Self>, Self::Error> { ) -> Result<Option<Self>, Self::Error> {
let users = data.users.lock().unwrap(); let users = data.users.lock().unwrap();
let res = users let res = users
@ -150,7 +150,10 @@ impl ApubObject for MyUser {
Ok(res) Ok(res)
} }
async fn into_apub(self, _data: &Self::DataType) -> Result<Self::ApubType, Self::Error> { async fn into_apub(
self,
_data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error> {
Ok(Person { Ok(Person {
kind: Default::default(), kind: Default::default(),
id: self.ap_id.clone(), id: self.ap_id.clone(),
@ -161,8 +164,7 @@ impl ApubObject for MyUser {
async fn from_apub( async fn from_apub(
apub: Self::ApubType, apub: Self::ApubType,
_data: &Self::DataType, _data: &RequestData<Self::DataType>,
_request_counter: &mut i32,
) -> Result<Self, Self::Error> { ) -> Result<Self, Self::Error> {
Ok(MyUser { Ok(MyUser {
ap_id: apub.id, ap_id: apub.id,

View file

@ -1,5 +1,9 @@
use crate::{activities::follow::Follow, instance::InstanceHandle, objects::person::MyUser}; use crate::{activities::follow::Follow, instance::DatabaseHandle, objects::person::MyUser};
use activitypub_federation::{core::object_id::ObjectId, data::Data, traits::ActivityHandler}; use activitypub_federation::{
core::object_id::ObjectId,
request_data::RequestData,
traits::ActivityHandler,
};
use activitystreams_kinds::activity::AcceptType; use activitystreams_kinds::activity::AcceptType;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
@ -27,7 +31,7 @@ impl Accept {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ActivityHandler for Accept { impl ActivityHandler for Accept {
type DataType = InstanceHandle; type DataType = DatabaseHandle;
type Error = crate::error::Error; type Error = crate::error::Error;
fn id(&self) -> &Url { fn id(&self) -> &Url {
@ -38,11 +42,7 @@ impl ActivityHandler for Accept {
self.actor.inner() self.actor.inner()
} }
async fn receive( async fn receive(self, _data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
self,
_data: &Data<Self::DataType>,
_request_counter: &mut i32,
) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
} }

View file

@ -1,12 +1,12 @@
use crate::{ use crate::{
instance::InstanceHandle, instance::DatabaseHandle,
objects::{note::Note, person::MyUser}, objects::{note::Note, person::MyUser},
MyPost, MyPost,
}; };
use activitypub_federation::{ use activitypub_federation::{
core::object_id::ObjectId, core::object_id::ObjectId,
data::Data,
deser::helpers::deserialize_one_or_many, deser::helpers::deserialize_one_or_many,
request_data::RequestData,
traits::{ActivityHandler, ApubObject}, traits::{ActivityHandler, ApubObject},
}; };
use activitystreams_kinds::activity::CreateType; use activitystreams_kinds::activity::CreateType;
@ -39,7 +39,7 @@ impl CreateNote {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ActivityHandler for CreateNote { impl ActivityHandler for CreateNote {
type DataType = InstanceHandle; type DataType = DatabaseHandle;
type Error = crate::error::Error; type Error = crate::error::Error;
fn id(&self) -> &Url { fn id(&self) -> &Url {
@ -50,12 +50,8 @@ impl ActivityHandler for CreateNote {
self.actor.inner() self.actor.inner()
} }
async fn receive( async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
self, MyPost::from_apub(self.object, data).await?;
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
MyPost::from_apub(self.object, data, request_counter).await?;
Ok(()) Ok(())
} }
} }

View file

@ -1,12 +1,12 @@
use crate::{ use crate::{
activities::accept::Accept, activities::accept::Accept,
generate_object_id, generate_object_id,
instance::InstanceHandle, instance::DatabaseHandle,
objects::person::MyUser, objects::person::MyUser,
}; };
use activitypub_federation::{ use activitypub_federation::{
core::object_id::ObjectId, core::object_id::ObjectId,
data::Data, request_data::RequestData,
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
}; };
use activitystreams_kinds::activity::FollowType; use activitystreams_kinds::activity::FollowType;
@ -36,7 +36,7 @@ impl Follow {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ActivityHandler for Follow { impl ActivityHandler for Follow {
type DataType = InstanceHandle; type DataType = DatabaseHandle;
type Error = crate::error::Error; type Error = crate::error::Error;
fn id(&self) -> &Url { fn id(&self) -> &Url {
@ -49,11 +49,7 @@ impl ActivityHandler for Follow {
// Ignore clippy false positive: https://github.com/rust-lang/rust-clippy/issues/6446 // Ignore clippy false positive: https://github.com/rust-lang/rust-clippy/issues/6446
#[allow(clippy::await_holding_lock)] #[allow(clippy::await_holding_lock)]
async fn receive( async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
self,
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
// add to followers // add to followers
let local_user = { let local_user = {
let mut users = data.users.lock().unwrap(); let mut users = data.users.lock().unwrap();
@ -63,18 +59,11 @@ impl ActivityHandler for Follow {
}; };
// send back an accept // send back an accept
let follower = self let follower = self.actor.dereference(data).await?;
.actor
.dereference(data, data.local_instance(), request_counter)
.await?;
let id = generate_object_id(data.local_instance().hostname())?; let id = generate_object_id(data.local_instance().hostname())?;
let accept = Accept::new(local_user.ap_id.clone(), self, id.clone()); let accept = Accept::new(local_user.ap_id.clone(), self, id.clone());
local_user local_user
.send( .send(accept, vec![follower.shared_inbox_or_inbox()], data)
accept,
vec![follower.shared_inbox_or_inbox()],
data.local_instance(),
)
.await?; .await?;
Ok(()) Ok(())
} }

View file

@ -3,26 +3,27 @@ use crate::{
generate_object_id, generate_object_id,
objects::{ objects::{
note::MyPost, note::MyPost,
person::{MyUser, PersonAcceptedActivities}, person::{MyUser, Person, PersonAcceptedActivities},
}, },
}; };
use activitypub_federation::{ use activitypub_federation::{
core::{object_id::ObjectId, signatures::generate_actor_keypair}, core::{
data::Data, axum::{inbox::receive_activity, json::ApubJson, verify_request_payload, DigestVerified},
object_id::ObjectId,
signatures::generate_actor_keypair,
},
deser::context::WithContext, deser::context::WithContext,
request_data::{ApubContext, ApubMiddleware, RequestData},
traits::ApubObject, traits::ApubObject,
InstanceSettings, FederationSettings,
LocalInstance, InstanceConfig,
UrlVerifier, UrlVerifier,
}; };
use activitypub_federation::core::axum::{verify_request_payload, DigestVerified};
use async_trait::async_trait; use async_trait::async_trait;
use axum::{ use axum::{
body, body,
body::Body, body::Body,
extract::{Json, OriginalUri, State}, extract::{Json, OriginalUri},
middleware, middleware,
response::IntoResponse, response::IntoResponse,
routing::{get, post}, routing::{get, post},
@ -37,17 +38,14 @@ use std::{
}; };
use tokio::task; use tokio::task;
use tower::ServiceBuilder; use tower::ServiceBuilder;
use tower_http::ServiceBuilderExt; use tower_http::{trace::TraceLayer, ServiceBuilderExt};
use url::Url; use url::Url;
pub type InstanceHandle = Arc<Instance>; pub type DatabaseHandle = Arc<Database>;
pub struct Instance { /// Our "database" which contains all known posts and users (local and federated)
/// This holds all library data pub struct Database {
local_instance: LocalInstance,
/// Our "database" which contains all known users (local and federated)
pub users: Mutex<Vec<MyUser>>, pub users: Mutex<Vec<MyUser>>,
/// Same, but for posts
pub posts: Mutex<Vec<MyPost>>, pub posts: Mutex<Vec<MyPost>>,
} }
@ -66,34 +64,29 @@ impl UrlVerifier for MyUrlVerifier {
} }
} }
impl Instance { impl Database {
pub fn new(hostname: String) -> Result<InstanceHandle, Error> { pub fn new(hostname: String) -> Result<ApubContext<DatabaseHandle>, Error> {
let settings = InstanceSettings::builder() let settings = FederationSettings::builder()
.debug(true) .debug(true)
.url_verifier(Box::new(MyUrlVerifier())) .url_verifier(Box::new(MyUrlVerifier()))
.build()?; .build()?;
let local_instance = let local_instance =
LocalInstance::new(hostname.clone(), Client::default().into(), settings); InstanceConfig::new(hostname.clone(), Client::default().into(), settings);
let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?); let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?);
let instance = Arc::new(Instance { let instance = Arc::new(Database {
local_instance,
users: Mutex::new(vec![local_user]), users: Mutex::new(vec![local_user]),
posts: Mutex::new(vec![]), posts: Mutex::new(vec![]),
}); });
Ok(instance) Ok(ApubContext::new(instance, local_instance))
} }
pub fn local_user(&self) -> MyUser { pub fn local_user(&self) -> MyUser {
self.users.lock().unwrap().first().cloned().unwrap() self.users.lock().unwrap().first().cloned().unwrap()
} }
pub fn local_instance(&self) -> &LocalInstance { pub fn listen(data: &ApubContext<DatabaseHandle>) -> Result<(), Error> {
&self.local_instance let hostname = data.local_instance().hostname();
} let data = data.clone();
pub fn listen(instance: &InstanceHandle) -> Result<(), Error> {
let hostname = instance.local_instance.hostname();
let instance = instance.clone();
let app = Router::new() let app = Router::new()
.route("/inbox", post(http_post_user_inbox)) .route("/inbox", post(http_post_user_inbox))
.layer( .layer(
@ -102,7 +95,7 @@ impl Instance {
.layer(middleware::from_fn(verify_request_payload)), .layer(middleware::from_fn(verify_request_payload)),
) )
.route("/objects/:user_name", get(http_get_user)) .route("/objects/:user_name", get(http_get_user))
.with_state(instance) .layer(ApubMiddleware::new(data))
.layer(TraceLayer::new_for_http()); .layer(TraceLayer::new_for_http());
// run it // run it
@ -117,15 +110,11 @@ impl Instance {
} }
} }
use crate::objects::person::Person;
use activitypub_federation::core::axum::{inbox::receive_activity, json::ApubJson};
use tower_http::trace::TraceLayer;
async fn http_get_user( async fn http_get_user(
State(data): State<InstanceHandle>, data: RequestData<DatabaseHandle>,
request: Request<Body>, request: Request<Body>,
) -> Result<ApubJson<WithContext<Person>>, Error> { ) -> Result<ApubJson<WithContext<Person>>, Error> {
let hostname: String = data.local_instance.hostname().to_string(); let hostname: String = data.local_instance().hostname().to_string();
let request_url = format!("http://{}{}", hostname, &request.uri()); let request_url = format!("http://{}{}", hostname, &request.uri());
let url = Url::parse(&request_url).expect("Failed to parse url"); let url = Url::parse(&request_url).expect("Failed to parse url");
@ -143,15 +132,14 @@ async fn http_post_user_inbox(
headers: HeaderMap, headers: HeaderMap,
method: Method, method: Method,
OriginalUri(uri): OriginalUri, OriginalUri(uri): OriginalUri,
State(data): State<InstanceHandle>, data: RequestData<DatabaseHandle>,
Extension(digest_verified): Extension<DigestVerified>, Extension(digest_verified): Extension<DigestVerified>,
Json(activity): Json<WithContext<PersonAcceptedActivities>>, Json(activity): Json<WithContext<PersonAcceptedActivities>>,
) -> impl IntoResponse { ) -> impl IntoResponse {
receive_activity::<WithContext<PersonAcceptedActivities>, MyUser, InstanceHandle>( receive_activity::<WithContext<PersonAcceptedActivities>, MyUser, DatabaseHandle>(
digest_verified, digest_verified,
activity, activity,
&data.clone().local_instance, &data,
&Data::new(data),
headers, headers,
method, method,
uri, uri,

View file

@ -1,4 +1,4 @@
use crate::{error::Error, instance::Instance, objects::note::MyPost, utils::generate_object_id}; use crate::{error::Error, instance::Database, objects::note::MyPost, utils::generate_object_id};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod activities; mod activities;
@ -18,15 +18,15 @@ async fn main() -> Result<(), Error> {
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.init(); .init();
let alpha = Instance::new("localhost:8001".to_string())?; let alpha = Database::new("localhost:8001".to_string())?;
let beta = Instance::new("localhost:8002".to_string())?; let beta = Database::new("localhost:8002".to_string())?;
Instance::listen(&alpha)?; Database::listen(&alpha)?;
Instance::listen(&beta)?; Database::listen(&beta)?;
// alpha user follows beta user // alpha user follows beta user
alpha alpha
.local_user() .local_user()
.follow(&beta.local_user(), &alpha) .follow(&beta.local_user(), &alpha.to_request_data())
.await?; .await?;
// assert that follow worked correctly // assert that follow worked correctly
@ -37,7 +37,9 @@ async fn main() -> Result<(), Error> {
// beta sends a post to its followers // beta sends a post to its followers
let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id); let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id);
beta.local_user().post(sent_post.clone(), &beta).await?; beta.local_user()
.post(sent_post.clone(), &beta.to_request_data())
.await?;
let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap(); let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap();
// assert that alpha received the post // assert that alpha received the post

View file

@ -1,7 +1,8 @@
use crate::{generate_object_id, instance::InstanceHandle, objects::person::MyUser}; use crate::{generate_object_id, instance::DatabaseHandle, objects::person::MyUser};
use activitypub_federation::{ use activitypub_federation::{
core::object_id::ObjectId, core::object_id::ObjectId,
deser::helpers::deserialize_one_or_many, deser::helpers::deserialize_one_or_many,
request_data::RequestData,
traits::ApubObject, traits::ApubObject,
}; };
use activitystreams_kinds::{object::NoteType, public}; use activitystreams_kinds::{object::NoteType, public};
@ -41,19 +42,22 @@ pub struct Note {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ApubObject for MyPost { impl ApubObject for MyPost {
type DataType = InstanceHandle; type DataType = DatabaseHandle;
type ApubType = Note; type ApubType = Note;
type DbType = (); type DbType = ();
type Error = crate::error::Error; type Error = crate::error::Error;
async fn read_from_apub_id( async fn read_from_apub_id(
_object_id: Url, _object_id: Url,
_data: &Self::DataType, _data: &RequestData<Self::DataType>,
) -> Result<Option<Self>, Self::Error> { ) -> Result<Option<Self>, Self::Error> {
todo!() todo!()
} }
async fn into_apub(self, data: &Self::DataType) -> Result<Self::ApubType, Self::Error> { async fn into_apub(
self,
data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error> {
let creator = self.creator.dereference_local(data).await?; let creator = self.creator.dereference_local(data).await?;
Ok(Note { Ok(Note {
kind: Default::default(), kind: Default::default(),
@ -66,8 +70,7 @@ impl ApubObject for MyPost {
async fn from_apub( async fn from_apub(
apub: Self::ApubType, apub: Self::ApubType,
data: &Self::DataType, data: &RequestData<Self::DataType>,
_request_counter: &mut i32,
) -> Result<Self, Self::Error> { ) -> Result<Self, Self::Error> {
let post = MyPost { let post = MyPost {
text: apub.content, text: apub.content,

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
activities::{accept::Accept, create_note::CreateNote, follow::Follow}, activities::{accept::Accept, create_note::CreateNote, follow::Follow},
error::Error, error::Error,
instance::InstanceHandle, instance::DatabaseHandle,
objects::note::MyPost, objects::note::MyPost,
utils::generate_object_id, utils::generate_object_id,
}; };
@ -11,10 +11,9 @@ use activitypub_federation::{
object_id::ObjectId, object_id::ObjectId,
signatures::{Keypair, PublicKey}, signatures::{Keypair, PublicKey},
}, },
data::Data,
deser::context::WithContext, deser::context::WithContext,
request_data::RequestData,
traits::{ActivityHandler, Actor, ApubObject}, traits::{ActivityHandler, Actor, ApubObject},
LocalInstance,
}; };
use activitystreams_kinds::actor::PersonType; use activitystreams_kinds::actor::PersonType;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -81,30 +80,31 @@ impl MyUser {
PublicKey::new_main_key(self.ap_id.clone().into_inner(), self.public_key.clone()) PublicKey::new_main_key(self.ap_id.clone().into_inner(), self.public_key.clone())
} }
pub async fn follow(&self, other: &MyUser, instance: &InstanceHandle) -> Result<(), Error> { pub async fn follow(
let id = generate_object_id(instance.local_instance().hostname())?; &self,
other: &MyUser,
data: &RequestData<DatabaseHandle>,
) -> Result<(), Error> {
let id = generate_object_id(data.local_instance().hostname())?;
let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone()); let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone());
self.send( self.send(follow, vec![other.shared_inbox_or_inbox()], data)
follow,
vec![other.shared_inbox_or_inbox()],
instance.local_instance(),
)
.await?; .await?;
Ok(()) Ok(())
} }
pub async fn post(&self, post: MyPost, instance: &InstanceHandle) -> Result<(), Error> { pub async fn post(
let id = generate_object_id(instance.local_instance().hostname())?; &self,
let create = CreateNote::new(post.into_apub(instance).await?, id.clone()); post: MyPost,
data: &RequestData<DatabaseHandle>,
) -> Result<(), Error> {
let id = generate_object_id(data.local_instance().hostname())?;
let create = CreateNote::new(post.into_apub(data).await?, id.clone());
let mut inboxes = vec![]; let mut inboxes = vec![];
for f in self.followers.clone() { for f in self.followers.clone() {
let user: MyUser = ObjectId::new(f) let user: MyUser = ObjectId::new(f).dereference(data).await?;
.dereference(instance, instance.local_instance(), &mut 0)
.await?;
inboxes.push(user.shared_inbox_or_inbox()); inboxes.push(user.shared_inbox_or_inbox());
} }
self.send(create, inboxes, instance.local_instance()) self.send(create, inboxes, data).await?;
.await?;
Ok(()) Ok(())
} }
@ -112,7 +112,7 @@ impl MyUser {
&self, &self,
activity: Activity, activity: Activity,
recipients: Vec<Url>, recipients: Vec<Url>,
local_instance: &LocalInstance, data: &RequestData<DatabaseHandle>,
) -> Result<(), <Activity as ActivityHandler>::Error> ) -> Result<(), <Activity as ActivityHandler>::Error>
where where
Activity: ActivityHandler + Serialize + Send + Sync, Activity: ActivityHandler + Serialize + Send + Sync,
@ -124,7 +124,7 @@ impl MyUser {
self.public_key(), self.public_key(),
self.private_key.clone().expect("has private key"), self.private_key.clone().expect("has private key"),
recipients, recipients,
local_instance, data.local_instance(),
) )
.await?; .await?;
Ok(()) Ok(())
@ -133,14 +133,14 @@ impl MyUser {
#[async_trait::async_trait] #[async_trait::async_trait]
impl ApubObject for MyUser { impl ApubObject for MyUser {
type DataType = InstanceHandle; type DataType = DatabaseHandle;
type ApubType = Person; type ApubType = Person;
type DbType = MyUser; type DbType = MyUser;
type Error = crate::error::Error; type Error = crate::error::Error;
async fn read_from_apub_id( async fn read_from_apub_id(
object_id: Url, object_id: Url,
data: &Self::DataType, data: &RequestData<Self::DataType>,
) -> Result<Option<Self>, Self::Error> { ) -> Result<Option<Self>, Self::Error> {
let users = data.users.lock().unwrap(); let users = data.users.lock().unwrap();
let res = users let res = users
@ -150,7 +150,10 @@ impl ApubObject for MyUser {
Ok(res) Ok(res)
} }
async fn into_apub(self, _data: &Self::DataType) -> Result<Self::ApubType, Self::Error> { async fn into_apub(
self,
_data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error> {
Ok(Person { Ok(Person {
kind: Default::default(), kind: Default::default(),
id: self.ap_id.clone(), id: self.ap_id.clone(),
@ -161,8 +164,7 @@ impl ApubObject for MyUser {
async fn from_apub( async fn from_apub(
apub: Self::ApubType, apub: Self::ApubType,
_data: &Self::DataType, _data: &RequestData<Self::DataType>,
_request_counter: &mut i32,
) -> Result<Self, Self::Error> { ) -> Result<Self, Self::Error> {
Ok(MyUser { Ok(MyUser {
ap_id: apub.id, ap_id: apub.id,

View file

@ -3,8 +3,8 @@ use crate::{
traits::ActivityHandler, traits::ActivityHandler,
utils::reqwest_shim::ResponseExt, utils::reqwest_shim::ResponseExt,
Error, Error,
InstanceSettings, FederationSettings,
LocalInstance, InstanceConfig,
APUB_JSON_CONTENT_TYPE, APUB_JSON_CONTENT_TYPE,
}; };
use anyhow::anyhow; use anyhow::anyhow;
@ -44,7 +44,7 @@ pub async fn send_activity<Activity>(
public_key: PublicKey, public_key: PublicKey,
private_key: String, private_key: String,
recipients: Vec<Url>, recipients: Vec<Url>,
instance: &LocalInstance, instance: &InstanceConfig,
) -> Result<(), <Activity as ActivityHandler>::Error> ) -> Result<(), <Activity as ActivityHandler>::Error>
where where
Activity: ActivityHandler + Serialize, Activity: ActivityHandler + Serialize,
@ -211,7 +211,7 @@ fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
pub(crate) fn create_activity_queue( pub(crate) fn create_activity_queue(
client: ClientWithMiddleware, client: ClientWithMiddleware,
settings: &InstanceSettings, settings: &FederationSettings,
) -> Manager { ) -> Manager {
// queue is not used in debug mod, so dont create any workers to avoid log spam // queue is not used in debug mod, so dont create any workers to avoid log spam
let worker_count = if settings.debug { let worker_count = if settings.debug {

View file

@ -1,12 +1,9 @@
use crate::{ use crate::{
core::object_id::ObjectId, core::{object_id::ObjectId, signatures::verify_signature},
data::Data, request_data::RequestData,
traits::{ActivityHandler, Actor, ApubObject}, traits::{ActivityHandler, Actor, ApubObject},
Error, Error,
LocalInstance,
}; };
use crate::core::signatures::verify_signature;
use actix_web::{HttpRequest, HttpResponse}; use actix_web::{HttpRequest, HttpResponse};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use tracing::debug; use tracing::debug;
@ -15,8 +12,7 @@ use tracing::debug;
pub async fn receive_activity<Activity, ActorT, Datatype>( pub async fn receive_activity<Activity, ActorT, Datatype>(
request: HttpRequest, request: HttpRequest,
activity: Activity, activity: Activity,
local_instance: &LocalInstance, data: &RequestData<Datatype>,
data: &Data<Datatype>,
) -> Result<HttpResponse, <Activity as ActivityHandler>::Error> ) -> Result<HttpResponse, <Activity as ActivityHandler>::Error>
where where
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static, Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
@ -28,11 +24,12 @@ where
+ From<serde_json::Error>, + From<serde_json::Error>,
<ActorT as ApubObject>::Error: From<Error> + From<anyhow::Error>, <ActorT as ApubObject>::Error: From<Error> + From<anyhow::Error>,
{ {
local_instance.verify_url_and_domain(&activity).await?; data.local_instance()
.verify_url_and_domain(&activity)
.await?;
let request_counter = &mut 0;
let actor = ObjectId::<ActorT>::new(activity.actor().clone()) let actor = ObjectId::<ActorT>::new(activity.actor().clone())
.dereference(data, local_instance, request_counter) .dereference(data)
.await?; .await?;
verify_signature( verify_signature(
@ -43,6 +40,6 @@ where
)?; )?;
debug!("Receiving activity {}", activity.id().to_string()); debug!("Receiving activity {}", activity.id().to_string());
activity.receive(data, request_counter).await?; activity.receive(data).await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }

View file

@ -0,0 +1,74 @@
use crate::request_data::{ApubContext, ApubMiddleware, RequestData};
use actix_web::{
dev::{forward_ready, Payload, Service, ServiceRequest, ServiceResponse, Transform},
Error,
FromRequest,
HttpMessage,
HttpRequest,
};
use std::future::{ready, Ready};
impl<S, B, T> Transform<S, ServiceRequest> for ApubMiddleware<T>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
T: Clone + Sync + 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Transform = ApubService<S, T>;
type InitError = ();
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(ApubService {
service,
context: self.0.clone(),
}))
}
}
pub struct ApubService<S, T>
where
S: Service<ServiceRequest, Error = Error>,
S::Future: 'static,
T: Sync,
{
service: S,
context: ApubContext<T>,
}
impl<S, B, T> Service<ServiceRequest> for ApubService<S, T>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
T: Clone + Sync + 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Future = S::Future;
forward_ready!(service);
fn call(&self, req: ServiceRequest) -> Self::Future {
req.extensions_mut().insert(self.context.clone());
self.service.call(req)
}
}
impl<T: Clone + 'static> FromRequest for RequestData<T> {
type Error = Error;
type Future = Ready<Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
ready(match req.extensions().get::<ApubContext<T>>() {
Some(c) => Ok(c.to_request_data()),
None => Err(actix_web::error::ErrorBadRequest(
"Missing extension, did you register ApubMiddleware?",
)),
})
}
}

View file

@ -1 +1,2 @@
pub mod inbox; pub mod inbox;
pub mod middleware;

View file

@ -1,9 +1,8 @@
use crate::{ use crate::{
core::{axum::DigestVerified, object_id::ObjectId, signatures::verify_signature}, core::{axum::DigestVerified, object_id::ObjectId, signatures::verify_signature},
data::Data, request_data::RequestData,
traits::{ActivityHandler, Actor, ApubObject}, traits::{ActivityHandler, Actor, ApubObject},
Error, Error,
LocalInstance,
}; };
use http::{HeaderMap, Method, Uri}; use http::{HeaderMap, Method, Uri};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
@ -13,8 +12,7 @@ use tracing::debug;
pub async fn receive_activity<Activity, ActorT, Datatype>( pub async fn receive_activity<Activity, ActorT, Datatype>(
_digest_verified: DigestVerified, _digest_verified: DigestVerified,
activity: Activity, activity: Activity,
local_instance: &LocalInstance, data: &RequestData<Datatype>,
data: &Data<Datatype>,
headers: HeaderMap, headers: HeaderMap,
method: Method, method: Method,
uri: Uri, uri: Uri,
@ -29,16 +27,17 @@ where
+ From<serde_json::Error>, + From<serde_json::Error>,
<ActorT as ApubObject>::Error: From<Error> + From<anyhow::Error>, <ActorT as ApubObject>::Error: From<Error> + From<anyhow::Error>,
{ {
local_instance.verify_url_and_domain(&activity).await?; data.local_instance()
.verify_url_and_domain(&activity)
.await?;
let request_counter = &mut 0;
let actor = ObjectId::<ActorT>::new(activity.actor().clone()) let actor = ObjectId::<ActorT>::new(activity.actor().clone())
.dereference(data, local_instance, request_counter) .dereference(data)
.await?; .await?;
verify_signature(&headers, &method, &uri, actor.public_key())?; verify_signature(&headers, &method, &uri, actor.public_key())?;
debug!("Receiving activity {}", activity.id().to_string()); debug!("Receiving activity {}", activity.id().to_string());
activity.receive(data, request_counter).await?; activity.receive(data).await?;
Ok(()) Ok(())
} }

View file

@ -0,0 +1,62 @@
use crate::request_data::{ApubContext, ApubMiddleware, RequestData};
use axum::{async_trait, body::Body, extract::FromRequestParts, http::Request, response::Response};
use http::{request::Parts, StatusCode};
use std::task::{Context, Poll};
use tower::{Layer, Service};
impl<S, T: Clone> Layer<S> for ApubMiddleware<T> {
type Service = ApubService<S, T>;
fn layer(&self, inner: S) -> Self::Service {
ApubService {
inner,
context: self.0.clone(),
}
}
}
#[derive(Clone)]
pub struct ApubService<S, T> {
inner: S,
context: ApubContext<T>,
}
impl<S, T> Service<Request<Body>> for ApubService<S, T>
where
S: Service<Request<Body>, Response = Response> + Send + 'static,
S::Future: Send + 'static,
T: Clone + Send + Sync + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, mut request: Request<Body>) -> Self::Future {
request.extensions_mut().insert(self.context.clone());
self.inner.call(request)
}
}
#[async_trait]
impl<S, T: Clone + 'static> FromRequestParts<S> for RequestData<T>
where
S: Send + Sync,
T: Send + Sync,
{
type Rejection = (StatusCode, &'static str);
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
// TODO: need to set this extension from middleware
match parts.extensions.get::<ApubContext<T>>() {
Some(c) => Ok(c.to_request_data()),
None => Err((
StatusCode::INTERNAL_SERVER_ERROR,
"Missing extension, did you register ApubMiddleware?",
)),
}
}
}

View file

@ -11,6 +11,7 @@ use digest::{verify_sha256, DigestPart};
mod digest; mod digest;
pub mod inbox; pub mod inbox;
pub mod json; pub mod json;
pub mod middleware;
/// A request guard to ensure digest has been verified request has been /// A request guard to ensure digest has been verified request has been
/// see [`receive_activity`] /// see [`receive_activity`]

View file

View file

@ -1,4 +1,4 @@
use crate::{traits::ApubObject, utils::fetch_object_http, Error, LocalInstance}; use crate::{request_data::RequestData, traits::ApubObject, utils::fetch_object_http, Error};
use anyhow::anyhow; use anyhow::anyhow;
use chrono::{Duration as ChronoDuration, NaiveDateTime, Utc}; use chrono::{Duration as ChronoDuration, NaiveDateTime, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -13,7 +13,7 @@ use url::Url;
#[serde(transparent)] #[serde(transparent)]
pub struct ObjectId<Kind>(Box<Url>, PhantomData<Kind>) pub struct ObjectId<Kind>(Box<Url>, PhantomData<Kind>)
where where
Kind: ApubObject + Send + 'static, Kind: ApubObject,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>; for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>;
impl<Kind> ObjectId<Kind> impl<Kind> ObjectId<Kind>
@ -39,9 +39,7 @@ where
/// Fetches an activitypub object, either from local database (if possible), or over http. /// Fetches an activitypub object, either from local database (if possible), or over http.
pub async fn dereference( pub async fn dereference(
&self, &self,
data: &<Kind as ApubObject>::DataType, data: &RequestData<<Kind as ApubObject>::DataType>,
instance: &LocalInstance,
request_counter: &mut i32,
) -> Result<Kind, <Kind as ApubObject>::Error> ) -> Result<Kind, <Kind as ApubObject>::Error>
where where
<Kind as ApubObject>::Error: From<Error> + From<anyhow::Error>, <Kind as ApubObject>::Error: From<Error> + From<anyhow::Error>,
@ -49,7 +47,7 @@ where
let db_object = self.dereference_from_db(data).await?; let db_object = self.dereference_from_db(data).await?;
// if its a local object, only fetch it from the database and not over http // if its a local object, only fetch it from the database and not over http
if instance.is_local_url(&self.0) { if data.local_instance().is_local_url(&self.0) {
return match db_object { return match db_object {
None => Err(Error::NotFound.into()), None => Err(Error::NotFound.into()),
Some(o) => Ok(o), Some(o) => Ok(o),
@ -61,17 +59,14 @@ where
// object is old and should be refetched // object is old and should be refetched
if let Some(last_refreshed_at) = object.last_refreshed_at() { if let Some(last_refreshed_at) = object.last_refreshed_at() {
if should_refetch_object(last_refreshed_at) { if should_refetch_object(last_refreshed_at) {
return self return self.dereference_from_http(data, Some(object)).await;
.dereference_from_http(data, instance, request_counter, Some(object))
.await;
} }
} }
Ok(object) Ok(object)
} }
// object not found, need to fetch over http // object not found, need to fetch over http
else { else {
self.dereference_from_http(data, instance, request_counter, None) self.dereference_from_http(data, None).await
.await
} }
} }
@ -79,7 +74,7 @@ where
/// the object is not found in the database. /// the object is not found in the database.
pub async fn dereference_local( pub async fn dereference_local(
&self, &self,
data: &<Kind as ApubObject>::DataType, data: &RequestData<<Kind as ApubObject>::DataType>,
) -> Result<Kind, <Kind as ApubObject>::Error> ) -> Result<Kind, <Kind as ApubObject>::Error>
where where
<Kind as ApubObject>::Error: From<Error>, <Kind as ApubObject>::Error: From<Error>,
@ -91,7 +86,7 @@ where
/// returning none means the object was not found in local db /// returning none means the object was not found in local db
async fn dereference_from_db( async fn dereference_from_db(
&self, &self,
data: &<Kind as ApubObject>::DataType, data: &RequestData<<Kind as ApubObject>::DataType>,
) -> Result<Option<Kind>, <Kind as ApubObject>::Error> { ) -> Result<Option<Kind>, <Kind as ApubObject>::Error> {
let id = self.0.clone(); let id = self.0.clone();
ApubObject::read_from_apub_id(*id, data).await ApubObject::read_from_apub_id(*id, data).await
@ -99,15 +94,13 @@ where
async fn dereference_from_http( async fn dereference_from_http(
&self, &self,
data: &<Kind as ApubObject>::DataType, data: &RequestData<<Kind as ApubObject>::DataType>,
instance: &LocalInstance,
request_counter: &mut i32,
db_object: Option<Kind>, db_object: Option<Kind>,
) -> Result<Kind, <Kind as ApubObject>::Error> ) -> Result<Kind, <Kind as ApubObject>::Error>
where where
<Kind as ApubObject>::Error: From<Error> + From<anyhow::Error>, <Kind as ApubObject>::Error: From<Error> + From<anyhow::Error>,
{ {
let res = fetch_object_http(&self.0, instance, request_counter).await; let res = fetch_object_http(&self.0, data).await;
if let Err(Error::ObjectDeleted) = &res { if let Err(Error::ObjectDeleted) = &res {
if let Some(db_object) = db_object { if let Some(db_object) = db_object {
@ -118,14 +111,14 @@ where
let res2 = res?; let res2 = res?;
Kind::from_apub(res2, data, request_counter).await Kind::from_apub(res2, data).await
} }
} }
/// Need to implement clone manually, to avoid requiring Kind to be Clone /// Need to implement clone manually, to avoid requiring Kind to be Clone
impl<Kind> Clone for ObjectId<Kind> impl<Kind> Clone for ObjectId<Kind>
where where
Kind: ApubObject + Send + 'static, Kind: ApubObject,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>, for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
@ -155,7 +148,7 @@ fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool {
impl<Kind> Display for ObjectId<Kind> impl<Kind> Display for ObjectId<Kind>
where where
Kind: ApubObject + Send + 'static, Kind: ApubObject,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>, for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{ {
#[allow(clippy::recursive_format_impl)] #[allow(clippy::recursive_format_impl)]
@ -167,7 +160,7 @@ where
impl<Kind> From<ObjectId<Kind>> for Url impl<Kind> From<ObjectId<Kind>> for Url
where where
Kind: ApubObject + Send + 'static, Kind: ApubObject,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>, for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{ {
fn from(id: ObjectId<Kind>) -> Self { fn from(id: ObjectId<Kind>) -> Self {
@ -175,10 +168,20 @@ where
} }
} }
impl<Kind> PartialEq for ObjectId<Kind> impl<Kind> From<Url> for ObjectId<Kind>
where where
Kind: ApubObject + Send + 'static, Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>, for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
fn from(url: Url) -> Self {
ObjectId::new(url)
}
}
impl<Kind> PartialEq for ObjectId<Kind>
where
Kind: ApubObject,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{ {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.0.eq(&other.0) && self.1 == other.1 self.0.eq(&other.0) && self.1 == other.1
@ -191,7 +194,7 @@ mod tests {
use crate::core::object_id::should_refetch_object; use crate::core::object_id::should_refetch_object;
use anyhow::Error; use anyhow::Error;
#[derive(Debug)] #[derive(Debug, Clone)]
struct TestObject {} struct TestObject {}
#[async_trait::async_trait] #[async_trait::async_trait]
@ -203,7 +206,7 @@ mod tests {
async fn read_from_apub_id( async fn read_from_apub_id(
_object_id: Url, _object_id: Url,
_data: &Self::DataType, _data: &RequestData<Self::DataType>,
) -> Result<Option<Self>, Self::Error> ) -> Result<Option<Self>, Self::Error>
where where
Self: Sized, Self: Sized,
@ -211,14 +214,16 @@ mod tests {
todo!() todo!()
} }
async fn into_apub(self, _data: &Self::DataType) -> Result<Self::ApubType, Self::Error> { async fn into_apub(
self,
_data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error> {
todo!() todo!()
} }
async fn from_apub( async fn from_apub(
_apub: Self::ApubType, _apub: Self::ApubType,
_data: &Self::DataType, _data: &RequestData<Self::DataType>,
_request_counter: &mut i32,
) -> Result<Self, Self::Error> ) -> Result<Self, Self::Error>
where where
Self: Sized, Self: Sized,

View file

@ -1,37 +0,0 @@
use std::{ops::Deref, sync::Arc};
/// This type can be used to pass your own data into library functions and traits. It can be useful
/// to pass around database connections or other context.
#[derive(Debug)]
pub struct Data<T: ?Sized>(Arc<T>);
impl<T> Data<T> {
/// Create new `Data` instance.
pub fn new(state: T) -> Data<T> {
Data(Arc::new(state))
}
/// Get reference to inner app data.
pub fn get_ref(&self) -> &T {
self.0.as_ref()
}
/// Convert to the internal Arc<T>
pub fn into_inner(self) -> Arc<T> {
self.0
}
}
impl<T: ?Sized> Deref for Data<T> {
type Target = Arc<T>;
fn deref(&self) -> &Arc<T> {
&self.0
}
}
impl<T: ?Sized> Clone for Data<T> {
fn clone(&self) -> Data<T> {
Data(self.0.clone())
}
}

View file

@ -1,4 +1,8 @@
use crate::{data::Data, deser::helpers::deserialize_one_or_many, traits::ActivityHandler}; use crate::{
deser::helpers::deserialize_one_or_many,
request_data::RequestData,
traits::ActivityHandler,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use std::str::FromStr; use std::str::FromStr;
@ -48,11 +52,7 @@ where
self.inner.actor() self.inner.actor()
} }
async fn receive( async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
self, self.inner.receive(data).await
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
self.inner.receive(data, request_counter).await
} }
} }

View file

@ -13,24 +13,24 @@ use std::time::Duration;
use url::Url; use url::Url;
pub mod core; pub mod core;
pub mod data;
pub mod deser; pub mod deser;
pub mod request_data;
pub mod traits; pub mod traits;
pub mod utils; pub mod utils;
/// Mime type for Activitypub, used for `Accept` and `Content-Type` HTTP headers /// Mime type for Activitypub, used for `Accept` and `Content-Type` HTTP headers
pub static APUB_JSON_CONTENT_TYPE: &str = "application/activity+json"; pub static APUB_JSON_CONTENT_TYPE: &str = "application/activity+json";
/// Represents a single, federated instance (for example lemmy.ml). There should only be one of /// Represents configuration for a single, federated instance. There should usually be only one of
/// this in your application (except for testing). /// this per application.
pub struct LocalInstance { pub struct InstanceConfig {
hostname: String, hostname: String,
client: ClientWithMiddleware, client: ClientWithMiddleware,
activity_queue: Manager, activity_queue: Manager,
settings: InstanceSettings, settings: FederationSettings,
} }
impl LocalInstance { impl InstanceConfig {
async fn verify_url_and_domain<Activity, Datatype>( async fn verify_url_and_domain<Activity, Datatype>(
&self, &self,
activity: &Activity, activity: &Activity,
@ -92,9 +92,11 @@ pub trait UrlVerifier: DynClone + Send {
} }
clone_trait_object!(UrlVerifier); clone_trait_object!(UrlVerifier);
// Use InstanceSettingsBuilder to initialize this /// Various settings related to Activitypub federation.
///
/// Use [FederationSettings.builder()] to initialize this.
#[derive(Builder)] #[derive(Builder)]
pub struct InstanceSettings { pub struct FederationSettings {
/// Maximum number of outgoing HTTP requests per incoming activity /// Maximum number of outgoing HTTP requests per incoming activity
#[builder(default = "20")] #[builder(default = "20")]
http_fetch_limit: i32, http_fetch_limit: i32,
@ -124,9 +126,9 @@ pub struct InstanceSettings {
http_signature_compat: bool, http_signature_compat: bool,
} }
impl InstanceSettings { impl FederationSettings {
/// Returns a new settings builder. /// Returns a new settings builder.
pub fn builder() -> InstanceSettingsBuilder { pub fn builder() -> FederationSettingsBuilder {
<_>::default() <_>::default()
} }
} }
@ -141,10 +143,10 @@ impl UrlVerifier for DefaultUrlVerifier {
} }
} }
impl LocalInstance { impl InstanceConfig {
pub fn new(domain: String, client: ClientWithMiddleware, settings: InstanceSettings) -> Self { pub fn new(domain: String, client: ClientWithMiddleware, settings: FederationSettings) -> Self {
let activity_queue = create_activity_queue(client.clone(), &settings); let activity_queue = create_activity_queue(client.clone(), &settings);
LocalInstance { InstanceConfig {
hostname: domain, hostname: domain,
client, client,
activity_queue, activity_queue,

76
src/request_data.rs Normal file
View file

@ -0,0 +1,76 @@
use crate::InstanceConfig;
use std::{
ops::Deref,
sync::{atomic::AtomicI32, Arc},
};
/// Stores context data which is necessary for the library to work.
#[derive(Clone)]
pub struct ApubContext<T> {
/// Data which the application requires in handlers, such as database connection
/// or configuration.
application_data: Arc<T>,
/// Configuration of this library.
pub(crate) local_instance: Arc<InstanceConfig>,
}
impl<T: Clone> ApubContext<T> {
pub fn new(state: T, local_instance: InstanceConfig) -> ApubContext<T> {
ApubContext {
application_data: Arc::new(state),
local_instance: Arc::new(local_instance),
}
}
pub fn local_instance(&self) -> &InstanceConfig {
self.local_instance.deref()
}
/// Create new [RequestData] from this. You should prefer to use a middleware if possible.
pub fn to_request_data(&self) -> RequestData<T> {
RequestData {
apub_context: self.clone(),
request_counter: AtomicI32::default(),
}
}
}
/// Stores data for handling one specific HTTP request. Most importantly this contains a
/// counter for outgoing HTTP requests. This is necessary to prevent denial of service attacks,
/// where an attacker triggers fetching of recursive objects.
///
/// https://www.w3.org/TR/activitypub/#security-recursive-objects
pub struct RequestData<T> {
pub(crate) apub_context: ApubContext<T>,
pub(crate) request_counter: AtomicI32,
}
impl<T> RequestData<T> {
pub fn local_instance(&self) -> &InstanceConfig {
self.apub_context.local_instance.deref()
}
}
impl<T: Clone> Deref for ApubContext<T> {
type Target = T;
fn deref(&self) -> &T {
&self.application_data
}
}
impl<T: Clone> Deref for RequestData<T> {
type Target = T;
fn deref(&self) -> &T {
&self.apub_context.application_data
}
}
#[derive(Clone)]
pub struct ApubMiddleware<T: Clone>(pub(crate) ApubContext<T>);
impl<T: Clone> ApubMiddleware<T> {
pub fn new(apub_context: ApubContext<T>) -> Self {
ApubMiddleware(apub_context)
}
}

View file

View file

@ -1,4 +1,4 @@
use crate::data::Data; use crate::request_data::RequestData;
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use std::ops::Deref; use std::ops::Deref;
use url::Url; use url::Url;
@ -17,18 +17,14 @@ pub trait ActivityHandler {
fn actor(&self) -> &Url; fn actor(&self) -> &Url;
/// Receives the activity and stores its action in database. /// Receives the activity and stores its action in database.
async fn receive( async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error>;
self,
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error>;
} }
/// Allow for boxing of enum variants /// Allow for boxing of enum variants
#[async_trait::async_trait] #[async_trait::async_trait]
impl<T> ActivityHandler for Box<T> impl<T> ActivityHandler for Box<T>
where where
T: ActivityHandler + Send + Sync, T: ActivityHandler + Send,
{ {
type DataType = T::DataType; type DataType = T::DataType;
type Error = T::Error; type Error = T::Error;
@ -41,12 +37,8 @@ where
self.deref().actor() self.deref().actor()
} }
async fn receive( async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
self, (*self).receive(data).await
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
(*self).receive(data, request_counter).await
} }
} }
@ -66,14 +58,14 @@ pub trait ApubObject {
/// Try to read the object with given ID from local database. Returns Ok(None) if it doesn't exist. /// Try to read the object with given ID from local database. Returns Ok(None) if it doesn't exist.
async fn read_from_apub_id( async fn read_from_apub_id(
object_id: Url, object_id: Url,
data: &Self::DataType, data: &RequestData<Self::DataType>,
) -> Result<Option<Self>, Self::Error> ) -> Result<Option<Self>, Self::Error>
where where
Self: Sized; Self: Sized;
/// Marks the object as deleted in local db. Called when a delete activity is received, or if /// Marks the object as deleted in local db. Called when a delete activity is received, or if
/// fetch returns a tombstone. /// fetch returns a tombstone.
async fn delete(self, _data: &Self::DataType) -> Result<(), Self::Error> async fn delete(self, _data: &RequestData<Self::DataType>) -> Result<(), Self::Error>
where where
Self: Sized, Self: Sized,
{ {
@ -81,7 +73,10 @@ pub trait ApubObject {
} }
/// Trait for converting an object or actor into the respective ActivityPub type. /// Trait for converting an object or actor into the respective ActivityPub type.
async fn into_apub(self, data: &Self::DataType) -> Result<Self::ApubType, Self::Error>; async fn into_apub(
self,
data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error>;
/// Converts an object from ActivityPub type to Lemmy internal type. /// Converts an object from ActivityPub type to Lemmy internal type.
/// ///
@ -91,8 +86,7 @@ pub trait ApubObject {
/// * `mod_action_allowed` True if the object can be a mod activity, ignore `expected_domain` in this case /// * `mod_action_allowed` True if the object can be a mod activity, ignore `expected_domain` in this case
async fn from_apub( async fn from_apub(
apub: Self::ApubType, apub: Self::ApubType,
data: &Self::DataType, data: &RequestData<Self::DataType>,
request_counter: &mut i32,
) -> Result<Self, Self::Error> ) -> Result<Self, Self::Error>
where where
Self: Sized; Self: Sized;

View file

@ -1,24 +1,29 @@
use crate::{utils::reqwest_shim::ResponseExt, Error, LocalInstance, APUB_JSON_CONTENT_TYPE}; use crate::{
request_data::RequestData,
utils::reqwest_shim::ResponseExt,
Error,
APUB_JSON_CONTENT_TYPE,
};
use http::{header::HeaderName, HeaderValue, StatusCode}; use http::{header::HeaderName, HeaderValue, StatusCode};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use std::collections::BTreeMap; use std::{collections::BTreeMap, sync::atomic::Ordering};
use tracing::info; use tracing::info;
use url::Url; use url::Url;
pub(crate) mod reqwest_shim; pub(crate) mod reqwest_shim;
pub async fn fetch_object_http<Kind: DeserializeOwned>( pub async fn fetch_object_http<T, Kind: DeserializeOwned>(
url: &Url, url: &Url,
instance: &LocalInstance, data: &RequestData<T>,
request_counter: &mut i32,
) -> Result<Kind, Error> { ) -> Result<Kind, Error> {
let instance = &data.local_instance();
// dont fetch local objects this way // dont fetch local objects this way
debug_assert!(url.domain() != Some(&instance.hostname)); debug_assert!(url.domain() != Some(&instance.hostname));
instance.verify_url_valid(url).await?; instance.verify_url_valid(url).await?;
info!("Fetching remote object {}", url.to_string()); info!("Fetching remote object {}", url.to_string());
*request_counter += 1; let counter = data.request_counter.fetch_add(1, Ordering::SeqCst);
if *request_counter > instance.settings.http_fetch_limit { if counter > instance.settings.http_fetch_limit {
return Err(Error::RequestLimit); return Err(Error::RequestLimit);
} }