From 5402bc9c195f7c48431fdb7a38c99ee8f341cc79 Mon Sep 17 00:00:00 2001 From: Nutomic Date: Tue, 9 Apr 2024 10:38:08 +0200 Subject: [PATCH] Retry activity send in case of timeout or rate limit (#102) --- src/activity_sending.rs | 77 +++++++++++++++++++++++++++++++++++------ src/fetch/object_id.rs | 2 +- src/traits.rs | 2 +- 3 files changed, 68 insertions(+), 13 deletions(-) diff --git a/src/activity_sending.rs b/src/activity_sending.rs index f16ef37..ce53f94 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -12,14 +12,17 @@ use crate::{ }; use bytes::Bytes; use futures::StreamExt; +use http::StatusCode; use httpdate::fmt_http_date; use itertools::Itertools; use openssl::pkey::{PKey, Private}; -use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; +use reqwest::{ + header::{HeaderMap, HeaderName, HeaderValue}, + Response, +}; use reqwest_middleware::ClientWithMiddleware; use serde::Serialize; use std::{ - self, fmt::{Debug, Display}, time::{Duration, SystemTime}, }; @@ -90,20 +93,30 @@ impl SendActivityTask { ) .await?; let response = client.execute(request).await?; + self.handle_response(response).await + } - match response { - o if o.status().is_success() => { + /// Based on the HTTP status code determines if an activity was delivered successfully. In that case + /// Ok is returned. Otherwise it returns Err and the activity send should be retried later. + /// + /// Equivalent code in mastodon: https://github.com/mastodon/mastodon/blob/v4.2.8/app/helpers/jsonld_helper.rb#L215-L217 + async fn handle_response(&self, response: Response) -> Result<(), Error> { + match response.status() { + status if status.is_success() => { debug!("Activity {self} delivered successfully"); Ok(()) } - o if o.status().is_client_error() => { - let text = o.text_limited().await?; + status + if status.is_client_error() + && status != StatusCode::REQUEST_TIMEOUT + && status != StatusCode::TOO_MANY_REQUESTS => + { + let text = response.text_limited().await?; debug!("Activity {self} was rejected, aborting: {text}"); Ok(()) } - o => { - let status = o.status(); - let text = o.text_limited().await?; + status => { + let text = response.text_limited().await?; Err(Error::Other(format!( "Activity {self} failure with status {status}: {text}", @@ -214,8 +227,6 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { mod tests { use super::*; use crate::{config::FederationConfig, http_signatures::generate_actor_keypair}; - use bytes::Bytes; - use http::StatusCode; use std::{ sync::{atomic::AtomicUsize, Arc}, time::Instant, @@ -289,4 +300,48 @@ mod tests { info!("Queue Sent: {:?}", start.elapsed()); Ok(()) } + + #[tokio::test] + async fn test_handle_response() { + let keypair = generate_actor_keypair().unwrap(); + let message = SendActivityTask { + actor_id: "http://localhost:8001".parse().unwrap(), + activity_id: "http://localhost:8001/activity".parse().unwrap(), + activity: "{}".into(), + inbox: "http://localhost:8001".parse().unwrap(), + private_key: keypair.private_key().unwrap(), + http_signature_compat: true, + }; + + let res = |status| { + http::Response::builder() + .status(status) + .body(vec![]) + .unwrap() + .into() + }; + + assert!(message.handle_response(res(StatusCode::OK)).await.is_ok()); + assert!(message + .handle_response(res(StatusCode::BAD_REQUEST)) + .await + .is_ok()); + + assert!(message + .handle_response(res(StatusCode::MOVED_PERMANENTLY)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::REQUEST_TIMEOUT)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::TOO_MANY_REQUESTS)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::INTERNAL_SERVER_ERROR)) + .await + .is_err()); + } } diff --git a/src/fetch/object_id.rs b/src/fetch/object_id.rs index f3fa560..061a9f5 100644 --- a/src/fetch/object_id.rs +++ b/src/fetch/object_id.rs @@ -347,7 +347,7 @@ const _IMPL_DIESEL_NEW_TYPE_FOR_OBJECT_ID: () = { #[cfg(test)] pub mod tests { use super::*; - use crate::{fetch::object_id::should_refetch_object, traits::tests::DbUser}; + use crate::traits::tests::DbUser; #[test] fn test_deserialize() { diff --git a/src/traits.rs b/src/traits.rs index 9fdec27..720f731 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -343,7 +343,7 @@ pub mod tests { error::Error, fetch::object_id::ObjectId, http_signatures::{generate_actor_keypair, Keypair}, - protocol::{public_key::PublicKey, verification::verify_domains_match}, + protocol::verification::verify_domains_match, }; use activitystreams_kinds::{activity::FollowType, actor::PersonType}; use once_cell::sync::Lazy;