Remove activity queue and add raw sending (#75)

* make prepare_raw, sign_raw, send_raw functions public

* remove in-memory activity queue

* rename module

* comment

* don't clone

* fix doc comment

* remove send_activity function

---------

Co-authored-by: Nutomic <me@nutomic.com>
phiresky 2023-09-01 11:19:22 +02:00 committed by GitHub
parent 9477180b4e
commit 51443aa57c
11 changed files with 340 additions and 725 deletions
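
Taken together, the call sites below all move from the queued `send_activity` call to the now-public two-step `SendActivityTask` API. For orientation, the pattern they converge on can be summarised as a small helper; this is a sketch distilled from the diff, and the helper name `send_to_inboxes` is invented here rather than part of the crate:

```rust
use activitypub_federation::{
    activity_sending::SendActivityTask,
    config::Data,
    traits::{ActivityHandler, Actor},
};
use serde::Serialize;
use url::Url;

/// Hypothetical helper mirroring the updated examples: prepare one task per
/// remote inbox, then sign and send each request directly.
async fn send_to_inboxes<Activity, Datatype, ActorType>(
    activity: &Activity,
    actor: &ActorType,
    inboxes: Vec<Url>,
    data: &Data<Datatype>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
    Activity: ActivityHandler + Serialize,
    <Activity as ActivityHandler>::Error: From<anyhow::Error> + From<serde_json::Error>,
    Datatype: Clone,
    ActorType: Actor,
{
    // prepare() serializes the activity once and skips local or invalid inbox URLs.
    let sends = SendActivityTask::prepare(activity, actor, inboxes, data).await?;
    for send in sends {
        // Each delivery is signed and sent synchronously; with the in-memory
        // queue gone, retry and queueing policy is left to the caller.
        send.sign_and_send(data).await?;
    }
    Ok(())
}
```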


@@ -56,6 +56,8 @@ axum = { version = "0.6.18", features = [
 ], default-features = false, optional = true }
 tower = { version = "0.4.13", optional = true }
 hyper = { version = "0.14", optional = true }
+futures = "0.3.28"
+moka = { version = "0.11.2", features = ["future"] }
 
 [features]
 default = ["actix-web", "axum"]


@@ -14,4 +14,4 @@ let config = FederationConfig::builder()
 # }).unwrap()
 ```
 
-`debug` is necessary to test federation with http and localhost URLs, but it should never be used in production. The `worker_count` value can be adjusted depending on the instance size. A lower value saves resources on a small instance, while a higher value is necessary on larger instances to keep up with send jobs. `url_verifier` can be used to implement a domain blacklist.
+`debug` is necessary to test federation with http and localhost URLs, but it should never be used in production. `url_verifier` can be used to implement a domain blacklist.
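
As a hedged illustration of the `url_verifier` hook mentioned above (the blocklist idea, not an API touched by this commit): the sketch assumes the 0.4-era `UrlVerifier` trait shape, an async `verify(&self, url: &Url) -> Result<(), &'static str>` on a cloneable type, which may differ in other versions of the crate.

```rust
use activitypub_federation::config::UrlVerifier;
use async_trait::async_trait;
use url::Url;

// Assumed trait shape; check the crate's config module for the exact signature.
#[derive(Clone)]
struct DomainBlocklist {
    blocked: Vec<String>,
}

#[async_trait]
impl UrlVerifier for DomainBlocklist {
    async fn verify(&self, url: &Url) -> Result<(), &'static str> {
        match url.domain() {
            Some(domain) if self.blocked.iter().any(|b| b == domain) => Err("domain is blocked"),
            _ => Ok(()),
        }
    }
}
```

Such a verifier would then be handed to the config builder through its `url_verifier` setter (for example `.url_verifier(Box::new(DomainBlocklist { blocked: vec!["evil.example".into()] }))`), again assuming the builder's boxed-setter shape.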


@@ -4,7 +4,7 @@ To send an activity we need to initialize our previously defined struct, and pic
 ```
 # use activitypub_federation::config::FederationConfig;
-# use activitypub_federation::activity_queue::send_activity;
+# use activitypub_federation::activity_sending::SendActivityTask;
 # use activitypub_federation::http_signatures::generate_actor_keypair;
 # use activitypub_federation::traits::Actor;
 # use activitypub_federation::fetch::object_id::ObjectId;
@@ -25,7 +25,11 @@ let activity = Follow {
     id: "https://lemmy.ml/activities/321".try_into()?
 };
 let inboxes = vec![recipient.shared_inbox_or_inbox()];
-send_activity(activity, &sender, inboxes, &data).await?;
+let sends = SendActivityTask::prepare(&activity, &sender, inboxes, &data).await?;
+for send in sends {
+    send.sign_and_send(&data).await?;
+}
 # Ok::<(), anyhow::Error>(())
 # }).unwrap()
 ```
@@ -38,6 +42,7 @@ private key. Finally the activity is delivered to the inbox.
 It is possible that delivery fails because the target instance is temporarily unreachable. In
 this case the task is scheduled for retry after a certain waiting time. For each task delivery
 is retried up to 3 times after the initial attempt. The retry intervals are as follows:
 - one minute, in case of service restart
 - one hour, in case of instance maintenance
 - 2.5 days, in case of major incident with rebuild from backup
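
These three intervals line up with the base-60 exponential backoff used by the retry helper that this commit removes (`backoff.pow(attempt)` with a 60-second base); a quick arithmetic check:

```rust
fn main() {
    // Same arithmetic as the removed retry() helper: sleep = backoff^attempt seconds.
    let backoff: u64 = 60;
    for attempt in 1..=3u32 {
        let secs = backoff.pow(attempt);
        println!("attempt {attempt}: {secs} s ({:.1} h)", secs as f64 / 3600.0);
    }
    // attempt 1:     60 s  -> one minute
    // attempt 2:   3600 s  -> one hour
    // attempt 3: 216000 s  -> 60 hours, i.e. roughly 2.5 days
}
```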


@@ -6,7 +6,7 @@ use crate::{
     DbPost,
 };
 use activitypub_federation::{
-    activity_queue::send_activity,
+    activity_sending::SendActivityTask,
     config::Data,
     fetch::object_id::ObjectId,
     kinds::activity::CreateType,
@@ -39,7 +39,12 @@ impl CreatePost {
             id: generate_object_id(data.domain())?,
         };
         let create_with_context = WithContext::new_default(create);
-        send_activity(create_with_context, &data.local_user(), vec![inbox], data).await?;
+        let sends =
+            SendActivityTask::prepare(&create_with_context, &data.local_user(), vec![inbox], data)
+                .await?;
+        for send in sends {
+            send.sign_and_send(data).await?;
+        }
         Ok(())
     }
 }


@@ -6,7 +6,7 @@ use crate::{
     utils::generate_object_id,
 };
 use activitypub_federation::{
-    activity_queue::send_activity,
+    activity_sending::SendActivityTask,
     config::Data,
     fetch::{object_id::ObjectId, webfinger::webfinger_resolve_actor},
     http_signatures::generate_actor_keypair,
@@ -113,7 +113,10 @@ impl DbUser {
         <Activity as ActivityHandler>::Error: From<anyhow::Error> + From<serde_json::Error>,
     {
         let activity = WithContext::new_default(activity);
-        send_activity(activity, self, recipients, data).await?;
+        let sends = SendActivityTask::prepare(&activity, self, recipients, data).await?;
+        for send in sends {
+            send.sign_and_send(data).await?;
+        }
         Ok(())
     }
 }


src/activity_queue.rs (deleted file)
@@ -1,667 +0,0 @@
//! Queue for signing and sending outgoing activities with retry
//!
#![doc = include_str!("../docs/09_sending_activities.md")]
use crate::{
config::Data,
error::Error,
http_signatures::sign_request,
reqwest_shim::ResponseExt,
traits::{ActivityHandler, Actor},
FEDERATION_CONTENT_TYPE,
};
use anyhow::{anyhow, Context};
use bytes::Bytes;
use futures_core::Future;
use http::{header::HeaderName, HeaderMap, HeaderValue};
use httpdate::fmt_http_date;
use itertools::Itertools;
use openssl::pkey::{PKey, Private};
use reqwest::Request;
use reqwest_middleware::ClientWithMiddleware;
use serde::Serialize;
use std::{
fmt::{Debug, Display},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, SystemTime},
};
use tokio::{
sync::mpsc::{unbounded_channel, UnboundedSender},
task::{JoinHandle, JoinSet},
};
use tracing::{debug, info, warn};
use url::Url;
/// Send a new activity to the given inboxes
///
/// - `activity`: The activity to be sent, gets converted to json
/// - `private_key`: Private key belonging to the actor who sends the activity, for signing HTTP
/// signature. Generated with [crate::http_signatures::generate_actor_keypair].
/// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor
/// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox]
/// for each target actor.
pub async fn send_activity<Activity, Datatype, ActorType>(
activity: Activity,
actor: &ActorType,
inboxes: Vec<Url>,
data: &Data<Datatype>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler + Serialize,
<Activity as ActivityHandler>::Error: From<anyhow::Error> + From<serde_json::Error>,
Datatype: Clone,
ActorType: Actor,
{
let config = &data.config;
let actor_id = activity.actor();
let activity_id = activity.id();
let activity_serialized: Bytes = serde_json::to_vec(&activity)?.into();
let private_key_pem = actor
.private_key_pem()
.ok_or_else(|| anyhow!("Actor {actor_id} does not contain a private key for signing"))?;
// This is a mostly expensive blocking call, we don't want to tie up other tasks while this is happening
let private_key = tokio::task::spawn_blocking(move || {
PKey::private_key_from_pem(private_key_pem.as_bytes())
.map_err(|err| anyhow!("Could not create private key from PEM data:{err}"))
})
.await
.map_err(|err| anyhow!("Error joining:{err}"))??;
let inboxes: Vec<Url> = inboxes
.into_iter()
.unique()
.filter(|i| !config.is_local_url(i))
.collect();
// This field is only optional to make builder work, its always present at this point
let activity_queue = config
.activity_queue
.as_ref()
.expect("Config has activity queue");
for inbox in inboxes {
if let Err(err) = config.verify_url_valid(&inbox).await {
debug!("inbox url invalid, skipping: {inbox}: {err}");
continue;
}
let message = SendActivityTask {
actor_id: actor_id.clone(),
activity_id: activity_id.clone(),
inbox,
activity: activity_serialized.clone(),
private_key: private_key.clone(),
http_signature_compat: config.http_signature_compat,
};
// Don't use the activity queue if this is in debug mode, send and wait directly
if config.debug {
if let Err(err) = sign_and_send(
&message,
&config.client,
config.request_timeout,
Default::default(),
)
.await
{
warn!("{err}");
}
} else {
activity_queue.queue(message).await?;
let stats = activity_queue.get_stats();
let running = stats.running.load(Ordering::Relaxed);
if running == config.worker_count && config.worker_count != 0 {
warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count);
warn!("{:?}", stats);
} else {
info!("{:?}", stats);
}
}
}
Ok(())
}
#[derive(Clone, Debug)]
struct SendActivityTask {
actor_id: Url,
activity_id: Url,
activity: Bytes,
inbox: Url,
private_key: PKey<Private>,
http_signature_compat: bool,
}
async fn sign_and_send(
task: &SendActivityTask,
client: &ClientWithMiddleware,
timeout: Duration,
retry_strategy: RetryStrategy,
) -> Result<(), anyhow::Error> {
debug!(
"Sending {} to {}, contents:\n {}",
task.activity_id,
task.inbox,
serde_json::from_slice::<serde_json::Value>(&task.activity)?
);
let request_builder = client
.post(task.inbox.to_string())
.timeout(timeout)
.headers(generate_request_headers(&task.inbox));
let request = sign_request(
request_builder,
&task.actor_id,
task.activity.clone(),
task.private_key.clone(),
task.http_signature_compat,
)
.await
.context("signing request")?;
retry(
|| {
send(
task,
client,
request
.try_clone()
.expect("The body of the request is not cloneable"),
)
},
retry_strategy,
)
.await
}
async fn send(
task: &SendActivityTask,
client: &ClientWithMiddleware,
request: Request,
) -> Result<(), anyhow::Error> {
let response = client.execute(request).await;
match response {
Ok(o) if o.status().is_success() => {
debug!(
"Activity {} delivered successfully to {}",
task.activity_id, task.inbox
);
Ok(())
}
Ok(o) if o.status().is_client_error() => {
let text = o.text_limited().await.map_err(Error::other)?;
debug!(
"Activity {} was rejected by {}, aborting: {}",
task.activity_id, task.inbox, text,
);
Ok(())
}
Ok(o) => {
let status = o.status();
let text = o.text_limited().await.map_err(Error::other)?;
Err(anyhow!(
"Queueing activity {} to {} for retry after failure with status {}: {}",
task.activity_id,
task.inbox,
status,
text,
))
}
Err(e) => Err(anyhow!(
"Queueing activity {} to {} for retry after connection failure: {}",
task.activity_id,
task.inbox,
e
)),
}
}
pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
let mut host = inbox_url.domain().expect("read inbox domain").to_string();
if let Some(port) = inbox_url.port() {
host = format!("{}:{}", host, port);
}
let mut headers = HeaderMap::new();
headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static(FEDERATION_CONTENT_TYPE),
);
headers.insert(
HeaderName::from_static("host"),
HeaderValue::from_str(&host).expect("Hostname is valid"),
);
headers.insert(
"date",
HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"),
);
headers
}
/// A simple activity queue which spawns tokio workers to send out requests
/// When creating a queue, it will spawn a task per worker thread
/// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory)
pub(crate) struct ActivityQueue {
// Stats shared between the queue and workers
stats: Arc<Stats>,
sender: UnboundedSender<SendActivityTask>,
sender_task: JoinHandle<()>,
retry_sender_task: JoinHandle<()>,
}
/// Simple stat counter to show where we're up to with sending messages
/// This is a lock-free way to share things between tasks
/// When reading these values it's possible (but extremely unlikely) to get stale data if a worker task is in the middle of transitioning
#[derive(Default)]
pub(crate) struct Stats {
pending: AtomicUsize,
running: AtomicUsize,
retries: AtomicUsize,
dead_last_hour: AtomicUsize,
completed_last_hour: AtomicUsize,
}
impl Debug for Stats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Activity queue stats: pending: {}, running: {}, retries: {}, dead: {}, complete: {}",
self.pending.load(Ordering::Relaxed),
self.running.load(Ordering::Relaxed),
self.retries.load(Ordering::Relaxed),
self.dead_last_hour.load(Ordering::Relaxed),
self.completed_last_hour.load(Ordering::Relaxed)
)
}
}
#[derive(Clone, Copy, Default)]
struct RetryStrategy {
/// Amount of time in seconds to back off
backoff: usize,
/// Amount of times to retry
retries: usize,
/// If this particular request has already been retried, you can add an offset here to increment the count to start
offset: usize,
/// Number of seconds to sleep before trying
initial_sleep: usize,
}
/// A tokio spawned worker which is responsible for submitting requests to federated servers
/// This will retry up to one time with the same signature, and if it fails, will move it to the retry queue.
/// We need to retry activity sending in case the target instances is temporarily unreachable.
/// In this case, the task is stored and resent when the instance is hopefully back up. This
/// list shows the retry intervals, and which events of the target instance can be covered:
/// - 60s (one minute, service restart) -- happens in the worker w/ same signature
/// - 60min (one hour, instance maintenance) --- happens in the retry worker
/// - 60h (2.5 days, major incident with rebuild from backup) --- happens in the retry worker
async fn worker(
client: ClientWithMiddleware,
timeout: Duration,
message: SendActivityTask,
retry_queue: UnboundedSender<SendActivityTask>,
stats: Arc<Stats>,
strategy: RetryStrategy,
) {
stats.pending.fetch_sub(1, Ordering::Relaxed);
stats.running.fetch_add(1, Ordering::Relaxed);
let outcome = sign_and_send(&message, &client, timeout, strategy).await;
// "Running" has finished, check the outcome
stats.running.fetch_sub(1, Ordering::Relaxed);
match outcome {
Ok(_) => {
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
}
Err(_err) => {
stats.retries.fetch_add(1, Ordering::Relaxed);
warn!(
"Sending activity {} to {} to the retry queue to be tried again later",
message.activity_id, message.inbox
);
// Send to the retry queue. Ignoring whether it succeeds or not
retry_queue.send(message).ok();
}
}
}
async fn retry_worker(
client: ClientWithMiddleware,
timeout: Duration,
message: SendActivityTask,
stats: Arc<Stats>,
strategy: RetryStrategy,
) {
// Because the times are pretty extravagant between retries, we have to re-sign each time
let outcome = retry(
|| {
sign_and_send(
&message,
&client,
timeout,
RetryStrategy {
backoff: 0,
retries: 0,
offset: 0,
initial_sleep: 0,
},
)
},
strategy,
)
.await;
stats.retries.fetch_sub(1, Ordering::Relaxed);
match outcome {
Ok(_) => {
stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
}
Err(_err) => {
stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
}
}
}
impl ActivityQueue {
fn new(
client: ClientWithMiddleware,
worker_count: usize,
retry_count: usize,
timeout: Duration,
backoff: usize, // This should be 60 seconds by default or 1 second in tests
) -> Self {
let stats: Arc<Stats> = Default::default();
// This task clears the dead/completed stats every hour
let hour_stats = stats.clone();
tokio::spawn(async move {
let duration = Duration::from_secs(3600);
loop {
tokio::time::sleep(duration).await;
hour_stats.completed_last_hour.store(0, Ordering::Relaxed);
hour_stats.dead_last_hour.store(0, Ordering::Relaxed);
}
});
let (retry_sender, mut retry_receiver) = unbounded_channel();
let retry_stats = stats.clone();
let retry_client = client.clone();
// The "fast path" retry
// The backoff should be < 5 mins for this to work otherwise signatures may expire
// This strategy is the one that is used with the *same* signature
let strategy = RetryStrategy {
backoff,
retries: 1,
offset: 0,
initial_sleep: 0,
};
// The "retry path" strategy
// After the fast path fails, a task will sleep up to backoff ^ 2 and then retry again
let retry_strategy = RetryStrategy {
backoff,
retries: 3,
offset: 2,
initial_sleep: backoff.pow(2), // wait 60 mins before even trying
};
let retry_sender_task = tokio::spawn(async move {
let mut join_set = JoinSet::new();
while let Some(message) = retry_receiver.recv().await {
let retry_task = retry_worker(
retry_client.clone(),
timeout,
message,
retry_stats.clone(),
retry_strategy,
);
if retry_count > 0 {
// If we're over the limit of retries, wait for them to finish before spawning
while join_set.len() >= retry_count {
join_set.join_next().await;
}
join_set.spawn(retry_task);
} else {
// If the retry worker count is `0` then just spawn and don't use the join_set
tokio::spawn(retry_task);
}
}
while !join_set.is_empty() {
join_set.join_next().await;
}
});
let (sender, mut receiver) = unbounded_channel();
let sender_stats = stats.clone();
let sender_task = tokio::spawn(async move {
let mut join_set = JoinSet::new();
while let Some(message) = receiver.recv().await {
let task = worker(
client.clone(),
timeout,
message,
retry_sender.clone(),
sender_stats.clone(),
strategy,
);
if worker_count > 0 {
// If we're over the limit of workers, wait for them to finish before spawning
while join_set.len() >= worker_count {
join_set.join_next().await;
}
join_set.spawn(task);
} else {
// If the worker count is `0` then just spawn and don't use the join_set
tokio::spawn(task);
}
}
drop(retry_sender);
while !join_set.is_empty() {
join_set.join_next().await;
}
});
Self {
stats,
sender,
sender_task,
retry_sender_task,
}
}
async fn queue(&self, message: SendActivityTask) -> Result<(), anyhow::Error> {
self.stats.pending.fetch_add(1, Ordering::Relaxed);
self.sender.send(message)?;
Ok(())
}
fn get_stats(&self) -> &Stats {
&self.stats
}
#[allow(unused)]
// Drops all the senders and shuts down the workers
pub(crate) async fn shutdown(
self,
wait_for_retries: bool,
) -> Result<Arc<Stats>, anyhow::Error> {
drop(self.sender);
self.sender_task.await?;
if wait_for_retries {
self.retry_sender_task.await?;
}
Ok(self.stats)
}
}
/// Creates an activity queue using tokio spawned tasks
/// Note: requires a tokio runtime
pub(crate) fn create_activity_queue(
client: ClientWithMiddleware,
worker_count: usize,
retry_count: usize,
request_timeout: Duration,
) -> ActivityQueue {
ActivityQueue::new(client, worker_count, retry_count, request_timeout, 60)
}
/// Retries a future action factory function up to `amount` times with an exponential backoff timer between tries
async fn retry<T, E: Display + Debug, F: Future<Output = Result<T, E>>, A: FnMut() -> F>(
mut action: A,
strategy: RetryStrategy,
) -> Result<T, E> {
let mut count = strategy.offset;
// Do an initial sleep if it's called for
if strategy.initial_sleep > 0 {
let sleep_dur = Duration::from_secs(strategy.initial_sleep as u64);
tokio::time::sleep(sleep_dur).await;
}
loop {
match action().await {
Ok(val) => return Ok(val),
Err(err) => {
if count < strategy.retries {
count += 1;
let sleep_amt = strategy.backoff.pow(count as u32) as u64;
let sleep_dur = Duration::from_secs(sleep_amt);
warn!("{err:?}. Sleeping for {sleep_dur:?} and trying again");
tokio::time::sleep(sleep_dur).await;
continue;
} else {
return Err(err);
}
}
}
}
}
#[cfg(test)]
mod tests {
use axum::extract::State;
use bytes::Bytes;
use http::StatusCode;
use std::time::Instant;
use crate::http_signatures::generate_actor_keypair;
use super::*;
#[allow(unused)]
// This will periodically send back internal errors to test the retry
async fn dodgy_handler(
State(state): State<Arc<AtomicUsize>>,
headers: HeaderMap,
body: Bytes,
) -> Result<(), StatusCode> {
debug!("Headers:{:?}", headers);
debug!("Body len:{}", body.len());
if state.fetch_add(1, Ordering::Relaxed) % 20 == 0 {
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
Ok(())
}
async fn test_server() {
use axum::{routing::post, Router};
// We should break every now and then ;)
let state = Arc::new(AtomicUsize::new(0));
let app = Router::new()
.route("/", post(dodgy_handler))
.with_state(state);
axum::Server::bind(&"0.0.0.0:8001".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
// Queues 100 messages and then asserts that the worker runs them
async fn test_activity_queue_workers() {
let num_workers = 64;
let num_messages: usize = 100;
tokio::spawn(test_server());
/*
// uncomment for debug logs & stats
use tracing::log::LevelFilter;
env_logger::builder()
.filter_level(LevelFilter::Warn)
.filter_module("activitypub_federation", LevelFilter::Info)
.format_timestamp(None)
.init();
*/
let activity_queue = ActivityQueue::new(
reqwest::Client::default().into(),
num_workers,
num_workers,
Duration::from_secs(10),
1,
);
let keypair = generate_actor_keypair().unwrap();
let message = SendActivityTask {
actor_id: "http://localhost:8001".parse().unwrap(),
activity_id: "http://localhost:8001/activity".parse().unwrap(),
activity: "{}".into(),
inbox: "http://localhost:8001".parse().unwrap(),
private_key: keypair.private_key().unwrap(),
http_signature_compat: true,
};
let start = Instant::now();
for _ in 0..num_messages {
activity_queue.queue(message.clone()).await.unwrap();
}
info!("Queue Sent: {:?}", start.elapsed());
let stats = activity_queue.shutdown(true).await.unwrap();
info!(
"Queue Finished. Num msgs: {}, Time {:?}, msg/s: {:0.0}",
num_messages,
start.elapsed(),
num_messages as f64 / start.elapsed().as_secs_f64()
);
assert_eq!(
stats.completed_last_hour.load(Ordering::Relaxed),
num_messages
);
}
}

src/activity_sending.rs (new file, 300 lines)

@@ -0,0 +1,300 @@
//! Queue for signing and sending outgoing activities with retry
//!
#![doc = include_str!("../docs/09_sending_activities.md")]
use crate::{
config::Data,
error::Error,
http_signatures::sign_request,
reqwest_shim::ResponseExt,
traits::{ActivityHandler, Actor},
FEDERATION_CONTENT_TYPE,
};
use anyhow::{anyhow, Context};
use bytes::Bytes;
use futures::StreamExt;
use http::{header::HeaderName, HeaderMap, HeaderValue};
use httpdate::fmt_http_date;
use itertools::Itertools;
use openssl::pkey::{PKey, Private};
use reqwest::Request;
use reqwest_middleware::ClientWithMiddleware;
use serde::Serialize;
use std::{
self,
fmt::{Debug, Display},
time::{Duration, SystemTime},
};
use tracing::debug;
use url::Url;
#[derive(Clone, Debug)]
/// all info needed to send one activity to one inbox
pub struct SendActivityTask<'a> {
actor_id: &'a Url,
activity_id: &'a Url,
activity: Bytes,
inbox: Url,
private_key: PKey<Private>,
http_signature_compat: bool,
}
impl Display for SendActivityTask<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} to {}", self.activity_id, self.inbox)
}
}
impl SendActivityTask<'_> {
/// prepare an activity for sending
///
/// - `activity`: The activity to be sent, gets converted to json
/// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor
/// inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox]
/// for each target actor.
pub async fn prepare<'a, Activity, Datatype, ActorType>(
activity: &'a Activity,
actor: &ActorType,
inboxes: Vec<Url>,
data: &Data<Datatype>,
) -> Result<Vec<SendActivityTask<'a>>, <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler + Serialize,
<Activity as ActivityHandler>::Error: From<anyhow::Error> + From<serde_json::Error>,
Datatype: Clone,
ActorType: Actor,
{
let config = &data.config;
let actor_id = activity.actor();
let activity_id = activity.id();
let activity_serialized: Bytes = serde_json::to_vec(&activity)?.into();
let private_key = get_pkey_cached(data, actor).await?;
Ok(futures::stream::iter(
inboxes
.into_iter()
.unique()
.filter(|i| !config.is_local_url(i)),
)
.filter_map(|inbox| async {
if let Err(err) = config.verify_url_valid(&inbox).await {
debug!("inbox url invalid, skipping: {inbox}: {err}");
return None;
};
Some(SendActivityTask {
actor_id,
activity_id,
inbox,
activity: activity_serialized.clone(),
private_key: private_key.clone(),
http_signature_compat: config.http_signature_compat,
})
})
.collect()
.await)
}
/// convert a sendactivitydata to a request, signing and sending it
pub async fn sign_and_send<Datatype: Clone>(
&self,
data: &Data<Datatype>,
) -> Result<(), anyhow::Error> {
let req = self
.sign(&data.config.client, data.config.request_timeout)
.await?;
self.send(&data.config.client, req).await
}
async fn sign(
&self,
client: &ClientWithMiddleware,
timeout: Duration,
) -> Result<Request, anyhow::Error> {
let task = self;
let request_builder = client
.post(task.inbox.to_string())
.timeout(timeout)
.headers(generate_request_headers(&task.inbox));
let request = sign_request(
request_builder,
task.actor_id,
task.activity.clone(),
task.private_key.clone(),
task.http_signature_compat,
)
.await
.context("signing request")?;
Ok(request)
}
async fn send(
&self,
client: &ClientWithMiddleware,
request: Request,
) -> Result<(), anyhow::Error> {
let response = client.execute(request).await;
match response {
Ok(o) if o.status().is_success() => {
debug!("Activity {self} delivered successfully");
Ok(())
}
Ok(o) if o.status().is_client_error() => {
let text = o.text_limited().await.map_err(Error::other)?;
debug!("Activity {self} was rejected, aborting: {text}");
Ok(())
}
Ok(o) => {
let status = o.status();
let text = o.text_limited().await.map_err(Error::other)?;
Err(anyhow!(
"Activity {self} failure with status {status}: {text}",
))
}
Err(e) => Err(anyhow!("Activity {self} connection failure: {e}")),
}
}
}
async fn get_pkey_cached<ActorType>(
data: &Data<impl Clone>,
actor: &ActorType,
) -> Result<PKey<Private>, anyhow::Error>
where
ActorType: Actor,
{
let actor_id = actor.id();
// PKey is internally like an Arc<>, so cloning is ok
data.config
.actor_pkey_cache
.try_get_with_by_ref(&actor_id, async {
let private_key_pem = actor.private_key_pem().ok_or_else(|| {
anyhow!("Actor {actor_id} does not contain a private key for signing")
})?;
// This is a mostly expensive blocking call, we don't want to tie up other tasks while this is happening
let pkey = tokio::task::spawn_blocking(move || {
PKey::private_key_from_pem(private_key_pem.as_bytes())
.map_err(|err| anyhow!("Could not create private key from PEM data:{err}"))
})
.await
.map_err(|err| anyhow!("Error joining: {err}"))??;
std::result::Result::<PKey<Private>, anyhow::Error>::Ok(pkey)
})
.await
.map_err(|e| anyhow!("cloned error: {e}"))
}
pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
let mut host = inbox_url.domain().expect("read inbox domain").to_string();
if let Some(port) = inbox_url.port() {
host = format!("{}:{}", host, port);
}
let mut headers = HeaderMap::new();
headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static(FEDERATION_CONTENT_TYPE),
);
headers.insert(
HeaderName::from_static("host"),
HeaderValue::from_str(&host).expect("Hostname is valid"),
);
headers.insert(
"date",
HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid"),
);
headers
}
#[cfg(test)]
mod tests {
use axum::extract::State;
use bytes::Bytes;
use http::StatusCode;
use std::{
sync::{atomic::AtomicUsize, Arc},
time::Instant,
};
use tracing::info;
use crate::{config::FederationConfig, http_signatures::generate_actor_keypair};
use super::*;
#[allow(unused)]
// This will periodically send back internal errors to test the retry
async fn dodgy_handler(
State(state): State<Arc<AtomicUsize>>,
headers: HeaderMap,
body: Bytes,
) -> Result<(), StatusCode> {
debug!("Headers:{:?}", headers);
debug!("Body len:{}", body.len());
/*if state.fetch_add(1, Ordering::Relaxed) % 20 == 0 {
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}*/
Ok(())
}
async fn test_server() {
use axum::{routing::post, Router};
// We should break every now and then ;)
let state = Arc::new(AtomicUsize::new(0));
let app = Router::new()
.route("/", post(dodgy_handler))
.with_state(state);
axum::Server::bind(&"0.0.0.0:8001".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
// Sends 100 messages
async fn test_activity_sending() -> anyhow::Result<()> {
let num_messages: usize = 100;
tokio::spawn(test_server());
/*
// uncomment for debug logs & stats
use tracing::log::LevelFilter;
env_logger::builder()
.filter_level(LevelFilter::Warn)
.filter_module("activitypub_federation", LevelFilter::Info)
.format_timestamp(None)
.init();
*/
let keypair = generate_actor_keypair().unwrap();
let message = SendActivityTask {
actor_id: &"http://localhost:8001".parse().unwrap(),
activity_id: &"http://localhost:8001/activity".parse().unwrap(),
activity: "{}".into(),
inbox: "http://localhost:8001".parse().unwrap(),
private_key: keypair.private_key().unwrap(),
http_signature_compat: true,
};
let data = FederationConfig::builder()
.app_data(())
.domain("localhost")
.build()
.await?
.to_request_data();
let start = Instant::now();
for _ in 0..num_messages {
message.sign_and_send(&data).await?;
}
info!("Queue Sent: {:?}", start.elapsed());
Ok(())
}
}


@@ -57,7 +57,7 @@ where
 mod test {
     use super::*;
     use crate::{
-        activity_queue::generate_request_headers,
+        activity_sending::generate_request_headers,
         config::FederationConfig,
         http_signatures::sign_request,
         traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},


@@ -9,22 +9,21 @@
 //!     .domain("example.com")
 //!     .app_data(())
 //!     .http_fetch_limit(50)
-//!     .worker_count(16)
 //!     .build().await?;
 //! # Ok::<(), anyhow::Error>(())
 //! # }).unwrap()
 //! ```
 use crate::{
-    activity_queue::{create_activity_queue, ActivityQueue},
     error::Error,
     protocol::verification::verify_domains_match,
     traits::{ActivityHandler, Actor},
 };
-use anyhow::{anyhow, Context};
+use anyhow::anyhow;
 use async_trait::async_trait;
 use derive_builder::Builder;
 use dyn_clone::{clone_trait_object, DynClone};
+use moka::future::Cache;
 use openssl::pkey::{PKey, Private};
 use reqwest_middleware::ClientWithMiddleware;
 use serde::de::DeserializeOwned;
@@ -56,16 +55,6 @@ pub struct FederationConfig<T: Clone> {
     /// HTTP client used for all outgoing requests. Middleware can be used to add functionality
     /// like log tracing or retry of failed requests.
     pub(crate) client: ClientWithMiddleware,
-    /// Number of tasks that can be in-flight concurrently.
-    /// Tasks are retried once after a minute, then put into the retry queue.
-    /// Setting this count to `0` means that there is no limit to concurrency
-    #[builder(default = "0")]
-    pub(crate) worker_count: usize,
-    /// Number of concurrent tasks that are being retried in-flight concurrently.
-    /// Tasks are retried after an hour, then again in 60 hours.
-    /// Setting this count to `0` means that there is no limit to concurrency
-    #[builder(default = "0")]
-    pub(crate) retry_count: usize,
     /// Run library in debug mode. This allows usage of http and localhost urls. It also sends
     /// outgoing activities synchronously, not in background thread. This helps to make tests
     /// more consistent. Do not use for production.
@@ -92,10 +81,11 @@ pub struct FederationConfig<T: Clone> {
     /// <https://docs.joinmastodon.org/spec/activitypub/#secure-mode>
     #[builder(default = "None", setter(custom))]
     pub(crate) signed_fetch_actor: Option<Arc<(Url, PKey<Private>)>>,
-    /// Queue for sending outgoing activities. Only optional to make builder work, its always
-    /// present once constructed.
-    #[builder(setter(skip))]
-    pub(crate) activity_queue: Option<Arc<ActivityQueue>>,
+    #[builder(
+        default = "Cache::builder().max_capacity(10000).build()",
+        setter(custom)
+    )]
+    pub(crate) actor_pkey_cache: Cache<Url, PKey<Private>>,
 }
 impl<T: Clone> FederationConfig<T> {
@@ -186,28 +176,6 @@ impl<T: Clone> FederationConfig<T> {
     pub fn domain(&self) -> &str {
         &self.domain
     }
-    /// Shut down this federation, waiting for the outgoing queue to be sent.
-    /// If the activityqueue is still in use in other requests or was never constructed, returns an error.
-    /// If wait_retries is true, also wait for requests that have initially failed and are being retried.
-    /// Returns a stats object that can be printed for debugging (structure currently not part of the public interface).
-    ///
-    /// Currently, this method does not work correctly if worker_count = 0 (unlimited)
-    pub async fn shutdown(mut self, wait_retries: bool) -> anyhow::Result<impl std::fmt::Debug> {
-        let q = self
-            .activity_queue
-            .take()
-            .context("ActivityQueue never constructed, build() not called?")?;
-        // Todo: use Arc::into_inner but is only part of rust 1.70.
-        let stats = Arc::<ActivityQueue>::try_unwrap(q)
-            .map_err(|_| {
-                anyhow::anyhow!(
-                    "Could not cleanly shut down: activityqueue arc was still in use elsewhere "
-                )
-            })?
-            .shutdown(wait_retries)
-            .await?;
-        Ok(stats)
-    }
 }
 impl<T: Clone> FederationConfigBuilder<T> {
@@ -223,20 +191,19 @@ impl<T: Clone> FederationConfigBuilder<T> {
         self
     }
+    /// sets the number of parsed actor private keys to keep in memory
+    pub fn actor_pkey_cache(&mut self, cache_size: u64) -> &mut Self {
+        self.actor_pkey_cache = Some(Cache::builder().max_capacity(cache_size).build());
+        self
+    }
     /// Constructs a new config instance with the values supplied to builder.
     ///
     /// Values which are not explicitly specified use the defaults. Also initializes the
     /// queue for outgoing activities, which is stored internally in the config struct.
     /// Requires a tokio runtime for the background queue.
     pub async fn build(&mut self) -> Result<FederationConfig<T>, FederationConfigBuilderError> {
-        let mut config = self.partial_build()?;
-        let queue = create_activity_queue(
-            config.client.clone(),
-            config.worker_count,
-            config.retry_count,
-            config.request_timeout,
-        );
-        config.activity_queue = Some(Arc::new(queue));
+        let config = self.partial_build()?;
         Ok(config)
     }
 }
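
The `actor_pkey_cache` setter added above sizes the new moka cache of parsed actor private keys (default capacity 10,000). A minimal sketch of using it, with an arbitrary example size of 100:

```rust
use activitypub_federation::config::FederationConfig;

async fn build_config() -> anyhow::Result<FederationConfig<()>> {
    let config = FederationConfig::builder()
        .domain("example.com")
        .app_data(())
        // Keep up to 100 parsed actor private keys in memory; parsing the PEM
        // is the expensive step that get_pkey_cached() avoids repeating.
        .actor_pkey_cache(100)
        .build()
        .await?;
    Ok(config)
}
```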


@@ -1,7 +1,7 @@
 //! Generating keypairs, creating and verifying signatures
 //!
 //! Signature creation and verification is handled internally in the library. See
-//! [send_activity](crate::activity_queue::send_activity) and
+//! [send_activity](crate::activity_sending::send_activity) and
 //! [receive_activity (actix-web)](crate::actix_web::inbox::receive_activity) /
 //! [receive_activity (axum)](crate::axum::inbox::receive_activity).
@@ -274,7 +274,7 @@ pub(crate) fn verify_body_hash(
 #[cfg(test)]
 pub mod test {
     use super::*;
-    use crate::activity_queue::generate_request_headers;
+    use crate::activity_sending::generate_request_headers;
     use reqwest::Client;
     use reqwest_middleware::ClientWithMiddleware;
     use std::str::FromStr;


@@ -10,7 +10,7 @@
 #![doc = include_str!("../docs/10_fetching_objects_with_unknown_type.md")]
 #![deny(missing_docs)]
-pub mod activity_queue;
+pub mod activity_sending;
 #[cfg(feature = "actix-web")]
 pub mod actix_web;
 #[cfg(feature = "axum")]