Retry activity send in case of timeout or rate limit (#102)

This commit is contained in:
Nutomic 2024-04-09 10:38:08 +02:00 committed by GitHub
parent 1b46dd6f80
commit 5402bc9c19
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 68 additions and 13 deletions

View file

@ -12,14 +12,17 @@ use crate::{
}; };
use bytes::Bytes; use bytes::Bytes;
use futures::StreamExt; use futures::StreamExt;
use http::StatusCode;
use httpdate::fmt_http_date; use httpdate::fmt_http_date;
use itertools::Itertools; use itertools::Itertools;
use openssl::pkey::{PKey, Private}; use openssl::pkey::{PKey, Private};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue},
Response,
};
use reqwest_middleware::ClientWithMiddleware; use reqwest_middleware::ClientWithMiddleware;
use serde::Serialize; use serde::Serialize;
use std::{ use std::{
self,
fmt::{Debug, Display}, fmt::{Debug, Display},
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
@ -90,20 +93,30 @@ impl SendActivityTask {
) )
.await?; .await?;
let response = client.execute(request).await?; let response = client.execute(request).await?;
self.handle_response(response).await
}
match response { /// Based on the HTTP status code determines if an activity was delivered successfully. In that case
o if o.status().is_success() => { /// Ok is returned. Otherwise it returns Err and the activity send should be retried later.
///
/// Equivalent code in mastodon: https://github.com/mastodon/mastodon/blob/v4.2.8/app/helpers/jsonld_helper.rb#L215-L217
async fn handle_response(&self, response: Response) -> Result<(), Error> {
match response.status() {
status if status.is_success() => {
debug!("Activity {self} delivered successfully"); debug!("Activity {self} delivered successfully");
Ok(()) Ok(())
} }
o if o.status().is_client_error() => { status
let text = o.text_limited().await?; if status.is_client_error()
&& status != StatusCode::REQUEST_TIMEOUT
&& status != StatusCode::TOO_MANY_REQUESTS =>
{
let text = response.text_limited().await?;
debug!("Activity {self} was rejected, aborting: {text}"); debug!("Activity {self} was rejected, aborting: {text}");
Ok(()) Ok(())
} }
o => { status => {
let status = o.status(); let text = response.text_limited().await?;
let text = o.text_limited().await?;
Err(Error::Other(format!( Err(Error::Other(format!(
"Activity {self} failure with status {status}: {text}", "Activity {self} failure with status {status}: {text}",
@ -214,8 +227,6 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
mod tests { mod tests {
use super::*; use super::*;
use crate::{config::FederationConfig, http_signatures::generate_actor_keypair}; use crate::{config::FederationConfig, http_signatures::generate_actor_keypair};
use bytes::Bytes;
use http::StatusCode;
use std::{ use std::{
sync::{atomic::AtomicUsize, Arc}, sync::{atomic::AtomicUsize, Arc},
time::Instant, time::Instant,
@ -289,4 +300,48 @@ mod tests {
info!("Queue Sent: {:?}", start.elapsed()); info!("Queue Sent: {:?}", start.elapsed());
Ok(()) Ok(())
} }
#[tokio::test]
async fn test_handle_response() {
let keypair = generate_actor_keypair().unwrap();
let message = SendActivityTask {
actor_id: "http://localhost:8001".parse().unwrap(),
activity_id: "http://localhost:8001/activity".parse().unwrap(),
activity: "{}".into(),
inbox: "http://localhost:8001".parse().unwrap(),
private_key: keypair.private_key().unwrap(),
http_signature_compat: true,
};
let res = |status| {
http::Response::builder()
.status(status)
.body(vec![])
.unwrap()
.into()
};
assert!(message.handle_response(res(StatusCode::OK)).await.is_ok());
assert!(message
.handle_response(res(StatusCode::BAD_REQUEST))
.await
.is_ok());
assert!(message
.handle_response(res(StatusCode::MOVED_PERMANENTLY))
.await
.is_err());
assert!(message
.handle_response(res(StatusCode::REQUEST_TIMEOUT))
.await
.is_err());
assert!(message
.handle_response(res(StatusCode::TOO_MANY_REQUESTS))
.await
.is_err());
assert!(message
.handle_response(res(StatusCode::INTERNAL_SERVER_ERROR))
.await
.is_err());
}
} }

View file

@ -347,7 +347,7 @@ const _IMPL_DIESEL_NEW_TYPE_FOR_OBJECT_ID: () = {
#[cfg(test)] #[cfg(test)]
pub mod tests { pub mod tests {
use super::*; use super::*;
use crate::{fetch::object_id::should_refetch_object, traits::tests::DbUser}; use crate::traits::tests::DbUser;
#[test] #[test]
fn test_deserialize() { fn test_deserialize() {

View file

@ -343,7 +343,7 @@ pub mod tests {
error::Error, error::Error,
fetch::object_id::ObjectId, fetch::object_id::ObjectId,
http_signatures::{generate_actor_keypair, Keypair}, http_signatures::{generate_actor_keypair, Keypair},
protocol::{public_key::PublicKey, verification::verify_domains_match}, protocol::verification::verify_domains_match,
}; };
use activitystreams_kinds::{activity::FollowType, actor::PersonType}; use activitystreams_kinds::{activity::FollowType, actor::PersonType};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;