Compare commits
10 commits
487c988377
...
169137be02
Author | SHA1 | Date | |
---|---|---|---|
|
169137be02 | ||
|
fbcd16aa95 | ||
|
1c29f4e66b | ||
|
6814ff1932 | ||
|
6dfd30a8ab | ||
|
df8876c096 | ||
|
a35c8cbea5 | ||
|
1126603b61 | ||
|
027b386514 | ||
|
2079b82de7 |
4 changed files with 73 additions and 7 deletions
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "activitypub_federation"
|
name = "activitypub_federation"
|
||||||
version = "0.5.8"
|
version = "0.6.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
description = "High-level Activitypub framework"
|
description = "High-level Activitypub framework"
|
||||||
keywords = ["activitypub", "activitystreams", "federation", "fediverse"]
|
keywords = ["activitypub", "activitystreams", "federation", "fediverse"]
|
||||||
|
|
|
@ -24,9 +24,9 @@ use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::{
|
use std::{
|
||||||
fmt::{Debug, Display},
|
fmt::{Debug, Display},
|
||||||
time::{Duration, SystemTime},
|
time::{Duration, Instant, SystemTime},
|
||||||
};
|
};
|
||||||
use tracing::debug;
|
use tracing::{debug, warn};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -92,7 +92,17 @@ impl SendActivityTask {
|
||||||
self.http_signature_compat,
|
self.http_signature_compat,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// Send the activity, and log a warning if its too slow.
|
||||||
|
let now = Instant::now();
|
||||||
let response = client.execute(request).await?;
|
let response = client.execute(request).await?;
|
||||||
|
let elapsed = now.elapsed().as_secs();
|
||||||
|
if elapsed > 10 {
|
||||||
|
warn!(
|
||||||
|
"Sending activity {} to {} took {}s",
|
||||||
|
self.activity_id, self.inbox, elapsed
|
||||||
|
);
|
||||||
|
}
|
||||||
self.handle_response(response).await
|
self.handle_response(response).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,6 +73,14 @@ pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
|
||||||
|
|
||||||
// Ensure id field matches final url after redirect
|
// Ensure id field matches final url after redirect
|
||||||
if res.object_id.as_ref() != Some(&res.url) {
|
if res.object_id.as_ref() != Some(&res.url) {
|
||||||
|
if let Some(res_object_id) = res.object_id {
|
||||||
|
// If id is different but still on the same domain, attempt to request object
|
||||||
|
// again from url in id field.
|
||||||
|
if res_object_id.domain() == res.url.domain() {
|
||||||
|
return Box::pin(fetch_object_http(&res_object_id, data)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Failed to fetch the object from its specified id
|
||||||
return Err(Error::FetchWrongId(res.url));
|
return Err(Error::FetchWrongId(res.url));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,3 +154,34 @@ async fn fetch_object_http_with_accept<T: Clone, Kind: DeserializeOwned>(
|
||||||
)),
|
)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
#[allow(clippy::unwrap_used)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::{
|
||||||
|
config::FederationConfig,
|
||||||
|
traits::tests::{DbConnection, Person},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_request_limit() -> Result<(), Error> {
|
||||||
|
let config = FederationConfig::builder()
|
||||||
|
.domain("example.com")
|
||||||
|
.app_data(DbConnection)
|
||||||
|
.http_fetch_limit(0)
|
||||||
|
.build()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let data = config.to_request_data();
|
||||||
|
|
||||||
|
let fetch_url = "https://example.net/".to_string();
|
||||||
|
|
||||||
|
let res: Result<FetchObjectResponse<Person>, Error> =
|
||||||
|
fetch_object_http(&Url::parse(&fetch_url).map_err(Error::UrlParse)?, &data).await;
|
||||||
|
|
||||||
|
assert_eq!(res.err(), Some(Error::RequestLimit));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ where
|
||||||
// object found in database
|
// object found in database
|
||||||
if let Some(object) = db_object {
|
if let Some(object) = db_object {
|
||||||
if let Some(last_refreshed_at) = object.last_refreshed_at() {
|
if let Some(last_refreshed_at) = object.last_refreshed_at() {
|
||||||
let is_local = data.config.is_local_url(&self.0);
|
let is_local = self.is_local(data);
|
||||||
if !is_local && should_refetch_object(last_refreshed_at) {
|
if !is_local && should_refetch_object(last_refreshed_at) {
|
||||||
// object is outdated and should be refetched
|
// object is outdated and should be refetched
|
||||||
return self.dereference_from_http(data, Some(object)).await;
|
return self.dereference_from_http(data, Some(object)).await;
|
||||||
|
@ -120,6 +120,7 @@ where
|
||||||
.await
|
.await
|
||||||
.map(|o| o.ok_or(Error::NotFound.into()))?
|
.map(|o| o.ok_or(Error::NotFound.into()))?
|
||||||
} else {
|
} else {
|
||||||
|
// Don't pass in any db object, otherwise it would be returned in case http fetch fails
|
||||||
self.dereference_from_http(data, None).await
|
self.dereference_from_http(data, None).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -146,6 +147,10 @@ where
|
||||||
Object::read_from_id(*id, data).await
|
Object::read_from_id(*id, data).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetch object from origin instance over HTTP, then verify and parse it.
|
||||||
|
///
|
||||||
|
/// Uses Box::pin to wrap futures to reduce stack size and avoid stack overflow when
|
||||||
|
/// when fetching objects recursively.
|
||||||
async fn dereference_from_http(
|
async fn dereference_from_http(
|
||||||
&self,
|
&self,
|
||||||
data: &Data<<Kind as Object>::DataType>,
|
data: &Data<<Kind as Object>::DataType>,
|
||||||
|
@ -154,7 +159,7 @@ where
|
||||||
where
|
where
|
||||||
<Kind as Object>::Error: From<Error>,
|
<Kind as Object>::Error: From<Error>,
|
||||||
{
|
{
|
||||||
let res = fetch_object_http(&self.0, data).await;
|
let res = Box::pin(fetch_object_http(&self.0, data)).await;
|
||||||
|
|
||||||
if let Err(Error::ObjectDeleted(url)) = res {
|
if let Err(Error::ObjectDeleted(url)) = res {
|
||||||
if let Some(db_object) = db_object {
|
if let Some(db_object) = db_object {
|
||||||
|
@ -163,11 +168,23 @@ where
|
||||||
return Err(Error::ObjectDeleted(url).into());
|
return Err(Error::ObjectDeleted(url).into());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If fetch failed, return the existing object from local database
|
||||||
|
if let (Err(_), Some(db_object)) = (&res, db_object) {
|
||||||
|
return Ok(db_object);
|
||||||
|
}
|
||||||
let res = res?;
|
let res = res?;
|
||||||
let redirect_url = &res.url;
|
let redirect_url = &res.url;
|
||||||
|
|
||||||
Kind::verify(&res.object, redirect_url, data).await?;
|
// Prevent overwriting local object
|
||||||
Kind::from_json(res.object, data).await
|
if data.config.is_local_url(redirect_url) {
|
||||||
|
return self
|
||||||
|
.dereference_from_db(data)
|
||||||
|
.await?
|
||||||
|
.ok_or(Error::NotFound.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
Box::pin(Kind::verify(&res.object, redirect_url, data)).await?;
|
||||||
|
Box::pin(Kind::from_json(res.object, data)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns true if the object's domain matches the one defined in [[FederationConfig.domain]].
|
/// Returns true if the object's domain matches the one defined in [[FederationConfig.domain]].
|
||||||
|
|
Loading…
Reference in a new issue