Better error when activity receive fails (#89)
* Minor refactoring * Better error when receive fails * clippy * add test case * comments * take ref
This commit is contained in:
parent
12aad8bf3c
commit
50db596ce0
5 changed files with 111 additions and 53 deletions
|
@ -16,16 +16,12 @@ use futures::StreamExt;
|
||||||
use httpdate::fmt_http_date;
|
use httpdate::fmt_http_date;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use openssl::pkey::{PKey, Private};
|
use openssl::pkey::{PKey, Private};
|
||||||
use reqwest::{
|
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
|
||||||
header::{HeaderMap, HeaderName, HeaderValue},
|
|
||||||
Request,
|
|
||||||
};
|
|
||||||
use reqwest_middleware::ClientWithMiddleware;
|
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::{
|
use std::{
|
||||||
self,
|
self,
|
||||||
fmt::{Debug, Display},
|
fmt::{Debug, Display},
|
||||||
time::{Duration, SystemTime},
|
time::SystemTime,
|
||||||
};
|
};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
@ -96,33 +92,19 @@ impl SendActivityTask<'_> {
|
||||||
|
|
||||||
/// convert a sendactivitydata to a request, signing and sending it
|
/// convert a sendactivitydata to a request, signing and sending it
|
||||||
pub async fn sign_and_send<Datatype: Clone>(&self, data: &Data<Datatype>) -> Result<(), Error> {
|
pub async fn sign_and_send<Datatype: Clone>(&self, data: &Data<Datatype>) -> Result<(), Error> {
|
||||||
let req = self
|
let client = &data.config.client;
|
||||||
.sign(&data.config.client, data.config.request_timeout)
|
|
||||||
.await?;
|
|
||||||
self.send(&data.config.client, req).await
|
|
||||||
}
|
|
||||||
async fn sign(
|
|
||||||
&self,
|
|
||||||
client: &ClientWithMiddleware,
|
|
||||||
timeout: Duration,
|
|
||||||
) -> Result<Request, Error> {
|
|
||||||
let task = self;
|
|
||||||
let request_builder = client
|
let request_builder = client
|
||||||
.post(task.inbox.to_string())
|
.post(self.inbox.to_string())
|
||||||
.timeout(timeout)
|
.timeout(data.config.request_timeout)
|
||||||
.headers(generate_request_headers(&task.inbox));
|
.headers(generate_request_headers(&self.inbox));
|
||||||
let request = sign_request(
|
let request = sign_request(
|
||||||
request_builder,
|
request_builder,
|
||||||
task.actor_id,
|
self.actor_id,
|
||||||
task.activity.clone(),
|
self.activity.clone(),
|
||||||
task.private_key.clone(),
|
self.private_key.clone(),
|
||||||
task.http_signature_compat,
|
self.http_signature_compat,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(request)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send(&self, client: &ClientWithMiddleware, request: Request) -> Result<(), Error> {
|
|
||||||
let response = client.execute(request).await?;
|
let response = client.execute(request).await?;
|
||||||
|
|
||||||
match response {
|
match response {
|
||||||
|
@ -286,7 +268,7 @@ mod tests {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
for _ in 0..num_messages {
|
for _ in 0..num_messages {
|
||||||
message.sign_and_send(&data).await?;
|
message.clone().sign_and_send(&data).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Queue Sent: {:?}", start.elapsed());
|
info!("Queue Sent: {:?}", start.elapsed());
|
||||||
|
|
|
@ -3,8 +3,8 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
config::Data,
|
config::Data,
|
||||||
error::Error,
|
error::Error,
|
||||||
fetch::object_id::ObjectId,
|
|
||||||
http_signatures::{verify_body_hash, verify_signature},
|
http_signatures::{verify_body_hash, verify_signature},
|
||||||
|
parse_received_activity,
|
||||||
traits::{ActivityHandler, Actor, Object},
|
traits::{ActivityHandler, Actor, Object},
|
||||||
};
|
};
|
||||||
use actix_web::{web::Bytes, HttpRequest, HttpResponse};
|
use actix_web::{web::Bytes, HttpRequest, HttpResponse};
|
||||||
|
@ -29,11 +29,7 @@ where
|
||||||
{
|
{
|
||||||
verify_body_hash(request.headers().get("Digest"), &body)?;
|
verify_body_hash(request.headers().get("Digest"), &body)?;
|
||||||
|
|
||||||
let activity: Activity = serde_json::from_slice(&body).map_err(Error::Json)?;
|
let (activity, actor) = parse_received_activity::<Activity, ActorT, _>(&body, data).await?;
|
||||||
data.config.verify_url_and_domain(&activity).await?;
|
|
||||||
let actor = ObjectId::<ActorT>::from(activity.actor().clone())
|
|
||||||
.dereference(data)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
verify_signature(
|
verify_signature(
|
||||||
request.headers(),
|
request.headers(),
|
||||||
|
@ -54,12 +50,14 @@ mod test {
|
||||||
use crate::{
|
use crate::{
|
||||||
activity_sending::generate_request_headers,
|
activity_sending::generate_request_headers,
|
||||||
config::FederationConfig,
|
config::FederationConfig,
|
||||||
|
fetch::object_id::ObjectId,
|
||||||
http_signatures::sign_request,
|
http_signatures::sign_request,
|
||||||
traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
|
traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
|
||||||
};
|
};
|
||||||
use actix_web::test::TestRequest;
|
use actix_web::test::TestRequest;
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use reqwest_middleware::ClientWithMiddleware;
|
use reqwest_middleware::ClientWithMiddleware;
|
||||||
|
use serde_json::json;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -105,22 +103,49 @@ mod test {
|
||||||
assert_eq!(&err, &Error::ActivitySignatureInvalid)
|
assert_eq!(&err, &Error::ActivitySignatureInvalid)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
|
#[tokio::test]
|
||||||
|
async fn test_receive_unparseable_activity() {
|
||||||
|
let (_, _, config) = setup_receive_test().await;
|
||||||
|
|
||||||
|
let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap();
|
||||||
|
let id = "http://localhost:123/1";
|
||||||
|
let activity = json!({
|
||||||
|
"actor": actor.as_str(),
|
||||||
|
"to": ["https://www.w3.org/ns/activitystreams#Public"],
|
||||||
|
"object": "http://ds9.lemmy.ml/post/1",
|
||||||
|
"cc": ["http://enterprise.lemmy.ml/c/main"],
|
||||||
|
"type": "Delete",
|
||||||
|
"id": id
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
|
||||||
|
let incoming_request = construct_request(&body, &actor).await;
|
||||||
|
|
||||||
|
// intentionally cause a parse error by using wrong type for deser
|
||||||
|
let res = receive_activity::<Follow, DbUser, DbConnection>(
|
||||||
|
incoming_request.to_http_request(),
|
||||||
|
body,
|
||||||
|
&config.to_request_data(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Err(Error::ParseReceivedActivity(url, _)) => {
|
||||||
|
assert_eq!(id, url.as_str());
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest {
|
||||||
let inbox = "https://example.com/inbox";
|
let inbox = "https://example.com/inbox";
|
||||||
let headers = generate_request_headers(&Url::parse(inbox).unwrap());
|
let headers = generate_request_headers(&Url::parse(inbox).unwrap());
|
||||||
let request_builder = ClientWithMiddleware::from(Client::default())
|
let request_builder = ClientWithMiddleware::from(Client::default())
|
||||||
.post(inbox)
|
.post(inbox)
|
||||||
.headers(headers);
|
.headers(headers);
|
||||||
let activity = Follow {
|
|
||||||
actor: ObjectId::parse("http://localhost:123").unwrap(),
|
|
||||||
object: ObjectId::parse("http://localhost:124").unwrap(),
|
|
||||||
kind: Default::default(),
|
|
||||||
id: "http://localhost:123/1".try_into().unwrap(),
|
|
||||||
};
|
|
||||||
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
|
|
||||||
let outgoing_request = sign_request(
|
let outgoing_request = sign_request(
|
||||||
request_builder,
|
request_builder,
|
||||||
&activity.actor.into_inner(),
|
actor,
|
||||||
body.clone(),
|
body.clone(),
|
||||||
DB_USER_KEYPAIR.private_key().unwrap(),
|
DB_USER_KEYPAIR.private_key().unwrap(),
|
||||||
false,
|
false,
|
||||||
|
@ -131,6 +156,18 @@ mod test {
|
||||||
for h in outgoing_request.headers() {
|
for h in outgoing_request.headers() {
|
||||||
incoming_request = incoming_request.append_header(h);
|
incoming_request = incoming_request.append_header(h);
|
||||||
}
|
}
|
||||||
|
incoming_request
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
|
||||||
|
let activity = Follow {
|
||||||
|
actor: ObjectId::parse("http://localhost:123").unwrap(),
|
||||||
|
object: ObjectId::parse("http://localhost:124").unwrap(),
|
||||||
|
kind: Default::default(),
|
||||||
|
id: "http://localhost:123/1".try_into().unwrap(),
|
||||||
|
};
|
||||||
|
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
|
||||||
|
let incoming_request = construct_request(&body, activity.actor.inner()).await;
|
||||||
|
|
||||||
let config = FederationConfig::builder()
|
let config = FederationConfig::builder()
|
||||||
.domain("localhost:8002")
|
.domain("localhost:8002")
|
||||||
|
|
|
@ -5,8 +5,8 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
config::Data,
|
config::Data,
|
||||||
error::Error,
|
error::Error,
|
||||||
fetch::object_id::ObjectId,
|
http_signatures::verify_signature,
|
||||||
http_signatures::{verify_body_hash, verify_signature},
|
parse_received_activity,
|
||||||
traits::{ActivityHandler, Actor, Object},
|
traits::{ActivityHandler, Actor, Object},
|
||||||
};
|
};
|
||||||
use axum::{
|
use axum::{
|
||||||
|
@ -33,13 +33,8 @@ where
|
||||||
<ActorT as Object>::Error: From<Error>,
|
<ActorT as Object>::Error: From<Error>,
|
||||||
Datatype: Clone,
|
Datatype: Clone,
|
||||||
{
|
{
|
||||||
verify_body_hash(activity_data.headers.get("Digest"), &activity_data.body)?;
|
let (activity, actor) =
|
||||||
|
parse_received_activity::<Activity, ActorT, _>(&activity_data.body, data).await?;
|
||||||
let activity: Activity = serde_json::from_slice(&activity_data.body).map_err(Error::Json)?;
|
|
||||||
data.config.verify_url_and_domain(&activity).await?;
|
|
||||||
let actor = ObjectId::<ActorT>::from(activity.actor().clone())
|
|
||||||
.dereference(data)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
verify_signature(
|
verify_signature(
|
||||||
&activity_data.headers,
|
&activity_data.headers,
|
||||||
|
|
|
@ -38,6 +38,9 @@ pub enum Error {
|
||||||
/// JSON Error
|
/// JSON Error
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
Json(#[from] serde_json::Error),
|
Json(#[from] serde_json::Error),
|
||||||
|
/// Failed to parse an activity received from another instance
|
||||||
|
#[error("Failed to parse incoming activity with id {0}: {1}")]
|
||||||
|
ParseReceivedActivity(Url, serde_json::Error),
|
||||||
/// Reqwest Middleware Error
|
/// Reqwest Middleware Error
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
ReqwestMiddleware(#[from] reqwest_middleware::Error),
|
ReqwestMiddleware(#[from] reqwest_middleware::Error),
|
||||||
|
|
41
src/lib.rs
41
src/lib.rs
|
@ -23,7 +23,48 @@ pub mod protocol;
|
||||||
pub(crate) mod reqwest_shim;
|
pub(crate) mod reqwest_shim;
|
||||||
pub mod traits;
|
pub mod traits;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
config::Data,
|
||||||
|
error::Error,
|
||||||
|
fetch::object_id::ObjectId,
|
||||||
|
traits::{ActivityHandler, Actor, Object},
|
||||||
|
};
|
||||||
pub use activitystreams_kinds as kinds;
|
pub use activitystreams_kinds as kinds;
|
||||||
|
|
||||||
|
use serde::{de::DeserializeOwned, Deserialize};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
/// Mime type for Activitypub data, used for `Accept` and `Content-Type` HTTP headers
|
/// Mime type for Activitypub data, used for `Accept` and `Content-Type` HTTP headers
|
||||||
pub static FEDERATION_CONTENT_TYPE: &str = "application/activity+json";
|
pub static FEDERATION_CONTENT_TYPE: &str = "application/activity+json";
|
||||||
|
|
||||||
|
/// Deserialize incoming inbox activity to the given type, perform basic
|
||||||
|
/// validation and extract the actor.
|
||||||
|
async fn parse_received_activity<Activity, ActorT, Datatype>(
|
||||||
|
body: &[u8],
|
||||||
|
data: &Data<Datatype>,
|
||||||
|
) -> Result<(Activity, ActorT), <Activity as ActivityHandler>::Error>
|
||||||
|
where
|
||||||
|
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
|
||||||
|
ActorT: Object<DataType = Datatype> + Actor + Send + 'static,
|
||||||
|
for<'de2> <ActorT as Object>::Kind: serde::Deserialize<'de2>,
|
||||||
|
<Activity as ActivityHandler>::Error: From<Error> + From<<ActorT as Object>::Error>,
|
||||||
|
<ActorT as Object>::Error: From<Error>,
|
||||||
|
Datatype: Clone,
|
||||||
|
{
|
||||||
|
let activity: Activity = serde_json::from_slice(body).map_err(|e| {
|
||||||
|
// Attempt to include activity id in error message
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct Id {
|
||||||
|
id: Url,
|
||||||
|
}
|
||||||
|
match serde_json::from_slice::<Id>(body) {
|
||||||
|
Ok(id) => Error::ParseReceivedActivity(id.id, e),
|
||||||
|
Err(e) => Error::Json(e),
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
data.config.verify_url_and_domain(&activity).await?;
|
||||||
|
let actor = ObjectId::<ActorT>::from(activity.actor().clone())
|
||||||
|
.dereference(data)
|
||||||
|
.await?;
|
||||||
|
Ok((activity, actor))
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue