First pass converting to new pub library.

Still a lot to do:
- Delete old deliverer folder
- Revisit handler funcs
- Constructors
- Side effects for the wrapped callback functions
- Any other TODOs
This commit is contained in:
Cory Slep 2019-02-12 00:16:33 +01:00
parent 293cf3e752
commit 16af404462
27 changed files with 3024 additions and 361 deletions

32
pub/activity.go Normal file
View File

@ -0,0 +1,32 @@
package pub
import (
"github.com/go-fed/activity/streams/vocab"
)
// Activity represents any ActivityStreams Activity type.
type Activity interface {
// Activity is also a vocab.Type
vocab.Type
// GetActivityStreamsActor returns the "actor" property if it exists, and
// nil otherwise.
GetActivityStreamsActor() vocab.ActivityStreamsActorProperty
// GetActivityStreamsAudience returns the "audience" property if it
// exists, and nil otherwise.
GetActivityStreamsAudience() vocab.ActivityStreamsAudienceProperty
// GetActivityStreamsBcc returns the "bcc" property if it exists, and nil
// otherwise.
GetActivityStreamsBcc() vocab.ActivityStreamsBccProperty
// GetActivityStreamsBto returns the "bto" property if it exists, and nil
// otherwise.
GetActivityStreamsBto() vocab.ActivityStreamsBtoProperty
// GetActivityStreamsCc returns the "cc" property if it exists, and nil
// otherwise.
GetActivityStreamsCc() vocab.ActivityStreamsCcProperty
// GetActivityStreamsTo returns the "to" property if it exists, and nil
// otherwise.
GetActivityStreamsTo() vocab.ActivityStreamsToProperty
// GetActivityStreamsAttributedTo returns the "attributedTo" property if
// it exists, and nil otherwise.
GetActivityStreamsAttributedTo() vocab.ActivityStreamsAttributedToProperty
}

77
pub/actor.go Normal file
View File

@ -0,0 +1,77 @@
package pub
import (
"context"
"net/http"
)
// Actor represents ActivityPub's actor concept. It conceptually has an inbox
// and outbox that receives either a POST or GET request, which triggers side
// effects in the federating application.
//
// An Actor within an application may federate server-to-server (Federation
// Protocol), client-to-server (Social API), or both. The Actor represents the
// server in either use case.
//
// Not all Actors have the same behaviors depending on the constructor used to
// create them. Refer to the constructor's documentation to determine the exact
// behavior of the Actor on an application.
//
// The behaviors documented here are common to all Actors returned by any
// constructor.
type Actor interface {
// PostInbox returns true if the request was handled as an ActivityPub
// POST to an actor's inbox. If false, the request was not an
// ActivityPub request and may still be handled by the caller in
// another way, such as serving a web page.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
//
// TODO: Move this to individual constructors.
// If the Federated Protocol is not enabled, writes the
// http.StatusMethodNotAllowed status code in the response. No side
// effects occur.
//
// If the Federated Protocol is enabled, side effects will occur.
PostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
// GetInbox returns true if the request was handled as an ActivityPub
// GET to an actor's inbox. If false, the request was not an ActivityPub
// request and may still be handled by the caller in another way, such
// as serving a web page.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
//
// If the request is an ActivityPub request, the Actor will defer to the
// application to determine the correct authorization of the request and
// the resulting OrderedCollection to respond with. The Actor handles
// serializing this OrderedCollection and responding with the correct
// headers and http.StatusOK.
GetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
// PostOutbox returns true if the request was handled as an ActivityPub
// POST to an actor's outbox. If false, the request was not an
// ActivityPub request and may still be handled by the caller in another
// way, such as serving a web page.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
PostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
// GetOutbox returns true if the request was handled as an ActivityPub
// GET to an actor's outbox. If false, the request was not an
// ActivityPub request.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
//
// If the request is an ActivityPub request, the Actor will defer to the
// application to determine the correct authorization of the request and
// the resulting OrderedCollection to respond with. The Actor handles
// serializing this OrderedCollection and responding with the correct
// headers and http.StatusOK.
GetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
}

297
pub/base_actor.go Normal file
View File

@ -0,0 +1,297 @@
package pub
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)
// TODO: Rename GetType and GetName
// baseActor must satisfy the Actor interface.
var _ Actor = &baseActor{}
// baseActor is an application-independent ActivityPub implementation. It does
// not implement the entire protocol, and relies on a delegate to do so. It
// only implements the part of the protocol that is side-effect-free, allowing
// an existing application to write a DelegateActor that glues their application
// into the ActivityPub world.
//
// It is preferred to use a DelegateActor provided by this library, so that the
// application does not need to worry about the ActivityPub implementation.
type baseActor struct {
// delegate contains application-specific delegation logic.
delegate DelegateActor
// EnableSocialAPI enables or disables the Social API, the client to
// server part of ActivityPub. Useful if permitting remote clients to
// act on behalf of the users of the client application.
EnableSocialAPI bool
// EnableFederatedProtocol enables or disables the Federated Protocol, or the
// server to server part of ActivityPub. Useful to permit integrating
// with the rest of the federative web.
EnableFederatedProtocol bool
// Clock simply tracks the current time.
Clock Clock
}
// PostInbox implements the generic algorithm for handling a POST request to an
// actor's inbox independent on an application. It relies on a delegate to
// implement application specific functionality.
func (b *baseActor) PostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
// Do nothing if it is not an ActivityPub POST request.
if !isActivityPubPost(r) {
return false, nil
}
// If the Federated Protocol is not enabled, then this endpoint is not
// enabled.
if !b.EnableFederatedProtocol {
w.WriteHeader(http.StatusMethodNotAllowed)
return true, nil
}
// Check the peer request is authentic.
shouldReturn, err := b.delegate.AuthenticatePostInbox(c, w, r)
if err != nil {
return true, err
} else if shouldReturn {
return true, nil
}
// Begin processing the request, but have not yet applied
// authorization (ex: blocks). Obtain the activity reject unknown
// activities.
raw, err := ioutil.ReadAll(r.Body)
if err != nil {
return true, err
}
var m map[string]interface{}
if err = json.Unmarshal(raw, &m); err != nil {
return true, err
}
// TODO: No longer reject unknown activities.
asValue, err := toType(c, m)
if err != nil {
return true, err
}
activity, ok := asValue.(Activity)
if !ok {
return true, fmt.Errorf("activity streams value is not an Activity: %T", asValue)
}
if activity.GetActivityStreamsId() == nil {
w.WriteHeader(http.StatusBadRequest)
return true, nil
}
// Check authorization of the activity.
shouldReturn, err = b.delegate.AuthorizePostInbox(c, w, activity)
if err != nil {
return true, err
} else if shouldReturn {
return true, nil
}
// Post the activity to the actor's inbox and trigger side effects for
// that particular Activity type. It is up to the delegate to resolve
// the given map.
err = b.delegate.PostInbox(c, r.URL, activity)
if err != nil {
// Special case: We know it is a bad request if the object or
// target properties needed to be populated, but weren't.
//
// Send the rejection to the peer.
if err == ErrObjectRequired || err == ErrTargetRequired {
w.WriteHeader(http.StatusBadRequest)
return true, nil
}
return true, err
}
// Our side effects are complete, now delegate determining whether to
// do inbox forwarding, as well as the action to do it.
if err := b.delegate.InboxForwarding(c, r.URL, activity); err != nil {
return true, err
}
// Request has been processed. Begin responding to the request.
//
// Simply respond with an OK status to the peer.
w.WriteHeader(http.StatusOK)
return true, nil
}
// GetInbox implements the generic algorithm for handling a GET request to an
// actor's inbox independent on an application. It relies on a delegate to
// implement application specific functionality.
func (b *baseActor) GetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
// Do nothing if it is not an ActivityPub GET request.
if !isActivityPubGet(r) {
return false, nil
}
// Delegate authenticating and authorizing the request.
shouldReturn, err := b.delegate.AuthenticateGetInbox(c, w, r)
if err != nil {
return true, err
} else if shouldReturn {
return true, nil
}
// Everything is good to begin processing the request.
oc, err := b.delegate.GetInbox(c, r)
if err != nil {
return true, err
}
// Deduplicate the 'orderedItems' property by ID.
err = dedupeOrderedItems(oc)
if err != nil {
return true, err
}
// Request has been processed. Begin responding to the request.
//
// Serialize the OrderedCollection.
m, err := serialize(oc)
if err != nil {
return true, err
}
raw, err := json.Marshal(m)
if err != nil {
return true, err
}
// Write the response.
addResponseHeaders(w.Header(), b.Clock, raw)
w.WriteHeader(http.StatusOK)
n, err := w.Write(raw)
if err != nil {
return true, err
} else if n != len(raw) {
return true, fmt.Errorf("ResponseWriter.Write wrote %d of %d bytes", n, len(raw))
}
return true, nil
}
// PostOutbox implements the generic algorithm for handling a POST request to an
// actor's outbox independent on an application. It relies on a delegate to
// implement application specific functionality.
func (b *baseActor) PostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
// Do nothing if it is not an ActivityPub POST request.
if !isActivityPubPost(r) {
return false, nil
}
// If the Social API is not enabled, then this endpoint is not enabled.
if !b.EnableSocialAPI {
w.WriteHeader(http.StatusMethodNotAllowed)
return true, nil
}
// Delegate authenticating and authorizing the request.
shouldReturn, err := b.delegate.AuthenticatePostOutbox(c, w, r)
if err != nil {
return true, err
} else if shouldReturn {
return true, nil
}
// Everything is good to begin processing the request.
raw, err := ioutil.ReadAll(r.Body)
if err != nil {
return true, err
}
var m map[string]interface{}
if err = json.Unmarshal(raw, &m); err != nil {
return true, err
}
// Note that converting to a Type will NOT successfully convert types
// not known to go-fed. This prevents accidentally wrapping an Activity
// type unknown to go-fed in a Create below. Instead,
// streams.ErrUnhandledType will be returned here.
//
// TODO: No longer reject unknown activities.
asValue, err := toType(c, m)
if err != nil {
return true, err
}
// If the value is not an Activity or type extending from Activity, then
// we need to wrap it in a Create Activity.
if !IsAnActivityType(asValue) {
asValue, err = b.delegate.WrapInCreate(c, asValue, r.URL)
if err != nil {
return true, err
}
}
// At this point, this should be a safe conversion. If this error is
// triggered, then there is either a bug in the delegation of
// WrapInCreate, behavior is not lining up in the generated ExtendedBy
// code, or something else is incorrect with the type system.
activity, ok := asValue.(Activity)
if !ok {
return true, fmt.Errorf("activity streams value is not an Activity: %T", asValue)
}
// Delegate generating new IDs for the activity and all new objects.
if err = b.delegate.AddNewIds(c, activity); err != nil {
return true, err
}
// Post the activity to the actor's outbox and trigger side effects for
// that particular Activity type.
deliverable, err := b.delegate.PostOutbox(c, activity, r.URL)
if err != nil {
// Special case: We know it is a bad request if the object or
// target properties needed to be populated, but weren't.
//
// Send the rejection to the peer.
if err == ErrObjectRequired || err == ErrTargetRequired {
w.WriteHeader(http.StatusBadRequest)
return true, nil
}
return true, err
}
// Request has been processed and all side effects internal to this
// application server have finished. Begin side effects affecting other
// servers and/or the client who sent this request.
//
// If we are federating and the type is a deliverable one, then deliver
// the activity to federating peers.
if b.EnableFederatedProtocol && deliverable {
if err := b.delegate.Deliver(c, r.URL, activity); err != nil {
return true, err
}
}
// Respond to the request with the new Activity's IRI location.
w.Header().Set("Location", activity.GetActivityStreamsId().Get().String())
w.WriteHeader(http.StatusCreated)
return true, nil
}
// GetOutbox implements the generic algorithm for handling a Get request to an
// actor's outbox independent on an application. It relies on a delegate to
// implement application specific functionality.
func (b *baseActor) GetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
// Do nothing if it is not an ActivityPub GET request.
if !isActivityPubGet(r) {
return false, nil
}
// Delegate authenticating and authorizing the request.
shouldReturn, err := b.delegate.AuthenticateGetOutbox(c, w, r)
if err != nil {
return true, err
} else if shouldReturn {
return true, nil
}
// Everything is good to begin processing the request.
oc, err := b.delegate.GetOutbox(c, r)
if err != nil {
return true, err
}
// Request has been processed. Begin responding to the request.
//
// Serialize the OrderedCollection.
m, err := serialize(oc)
if err != nil {
return true, err
}
raw, err := json.Marshal(m)
if err != nil {
return true, err
}
// Write the response.
addResponseHeaders(w.Header(), b.Clock, raw)
w.WriteHeader(http.StatusOK)
n, err := w.Write(raw)
if err != nil {
return true, err
} else if n != len(raw) {
return true, fmt.Errorf("ResponseWriter.Write wrote %d of %d bytes", n, len(raw))
}
return true, nil
}

11
pub/clock.go Normal file
View File

@ -0,0 +1,11 @@
package pub
import (
"time"
)
// Clock determines the time.
type Clock interface {
// Now returns the current time.
Now() time.Time
}

51
pub/common_behavior.go Normal file
View File

@ -0,0 +1,51 @@
package pub
import (
"context"
"net/http"
)
// Common contains functions required for both the Social API and Federating
// Protocol.
type CommonBehavior interface {
// AuthenticateGetInbox delegates the authentication of a GET to an
// inbox.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
//
// If an error is returned, it is passed back to the caller of
// GetInbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'shouldReturn' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then shouldReturn must be true and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// shouldReturn must be false and error nil. The request will continue
// to be processed.
AuthenticateGetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error)
// AuthenticateGetOutbox delegates the authentication of a GET to an
// outbox.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
//
// If an error is returned, it is passed back to the caller of
// GetOutbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'shouldReturn' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then shouldReturn must be true and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// shouldReturn must be false and error nil. The request will continue
// to be processed.
AuthenticateGetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error)
}

84
pub/database.go Normal file
View File

@ -0,0 +1,84 @@
package pub
import (
"context"
"github.com/go-fed/activity/streams/vocab"
"net/url"
)
type Database interface {
// Lock takes a lock for the object at the specified id. If an error
// is returned, the lock must not have been taken.
//
// The lock must be able to succeed for an id that does not exist in
// the database. This means acquiring the lock does not guarantee the
// entry exists in the database.
//
// Locks are encouraged to be lightweight and in the Go layer, as some
// processes require tight loops acquiring and releasing locks.
//
// Used to ensure race conditions in multiple requests do not occur.
Lock(c context.Context, id *url.URL) error
// Unlock makes the lock for the object at the specified id available.
// If an error is returned, the lock must have still been freed.
//
// Used to ensure race conditions in multiple requests do not occur.
Unlock(c context.Context, id *url.URL) error
// InboxContains returns true if the OrderedCollection at 'inbox'
// contains the specified 'id'.
//
// The library makes this call only after acquiring a lock first.
InboxContains(c context.Context, inbox, id *url.URL) (contains bool, err error)
// GetInbox returns the first ordered collection page of the outbox at
// the specified IRI, for prepending new items.
//
// The library makes this call only after acquiring a lock first.
GetInbox(c context.Context, inboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error)
// SetInbox saves the inbox value given from GetInbox, with new items
// prepended. Note that the new items must not be added as independent
// database entries. Separate calls to Create will do that.
//
// The library makes this call only after acquiring a lock first.
SetInbox(c context.Context, inbox vocab.ActivityStreamsOrderedCollectionPage) error
// Owns returns true if the database has an entry for the IRI and it
// exists in the database.
//
// Owns is called even without acquiring a lock.
Owns(c context.Context, id *url.URL) (owns bool, err error)
// Exists returns true if the database has an entry for the specified
// id. It may not be owned by this application instance.
//
// The library makes this call only after acquiring a lock first.
Exists(c context.Context, id *url.URL) (exists bool, err error)
// Get returns the database entry for the specified id.
//
// The library makes this call only after acquiring a lock first.
Get(c context.Context, id *url.URL) (value vocab.Type, err error)
// Create adds a new entry to the database which must be able to be
// keyed by its id.
//
// Note that Activity values received from federated peers may also be
// created in the database this way if the Federating Protocol is
// enabled.
//
// The library makes this call only after acquiring a lock first.
Create(c context.Context, asType vocab.Type) error
// GetOutbox returns the first ordered collection page of the outbox
// at the specified IRI, for prepending new items.
//
// The library makes this call only after acquiring a lock first.
GetOutbox(c context.Context, inboxIRI *url.URL) (inbox vocab.ActivityStreamsOrderedCollectionPage, err error)
// SetOutbox saves the outbox value given from GetOutbox, with new items
// prepended. Note that the new items must not be added as independent
// database entries. Separate calls to Create will do that.
//
// The library makes this call only after acquiring a lock first.
SetOutbox(c context.Context, inbox vocab.ActivityStreamsOrderedCollectionPage) error
// NewId creates a new IRI id for the provided activity or object. The
// implementation does not need to set the 'id' property and simply
// needs to determine the value.
//
// The go-fed library will handle setting the 'id' property on the
// activity or object provided with the value returned.
NewId(c context.Context, t vocab.Type) (id *url.URL, err error)
}

203
pub/delegate_actor.go Normal file
View File

@ -0,0 +1,203 @@
package pub
import (
"context"
"github.com/go-fed/activity/streams/vocab"
"net/http"
"net/url"
)
// DelegateActor contains the detailed interface an application must satisfy in
// order to implement the ActivityPub specification.
//
// Implementing the DelegateActor requires familiarity with the ActivityPub
// specification, it does not a strong enough abstraction for the client
// application to ignore the ActivityPub spec. It is very possible to implement
// this interface and build a foot-gun that trashes the fediverse without being
// ActivityPub compliant. Please use with due consideration.
//
// Alternatively, build an application that uses the parts of the pub library
// that does not require implementing a DelegateActor so that the ActivityPub
// implementation is completely provided out of the box.
type DelegateActor interface {
// AuthenticatePostInbox delegates the authentication of a POST to an
// inbox.
//
// Only called if the Federated Protocol is enabled.
//
// If an error is returned, it is passed back to the caller of
// PostInbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'shouldReturn' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then shouldReturn must be true and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// shouldReturn must be false and error nil. The request will continue
// to be processed.
AuthenticatePostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error)
// AuthenticateGetInbox delegates the authentication of a GET to an
// inbox.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
//
// If an error is returned, it is passed back to the caller of
// GetInbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'shouldReturn' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then shouldReturn must be true and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// shouldReturn must be false and error nil. The request will continue
// to be processed.
AuthenticateGetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error)
// AuthorizePostInbox delegates the authorization of an activity that
// has been sent by POST to an inbox.
//
// Only called if the Federated Protocol is enabled.
//
// If an error is returned, it is passed back to the caller of
// PostInbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'shouldReturn' is ignored.
//
// If no error is returned, but authorization fails, then shouldReturn
// must be true and error nil. It is expected that the implementation
// handles writing to the ResponseWriter in this case.
//
// Finally, if the authentication and authorization succeeds, then
// shouldReturn must be false and error nil. The request will continue
// to be processed.
AuthorizePostInbox(c context.Context, w http.ResponseWriter, activity Activity) (shouldReturn bool, err error)
// PostInbox delegates the side effects of adding to the inbox and
// determining if it is a request that should be blocked.
//
// Only called if the Federated Protocol is enabled.
//
// As a side effect, PostInbox sets the federated data in the inbox, but
// not on its own in the database, as InboxForwarding (which is called
// later) must decide whether it has seen this activity before in order
// to determine whether to do the forwarding algorithm.
//
// If the error is ErrObjectRequired or ErrTargetRequired, then a Bad
// Request status is sent in the response.
PostInbox(c context.Context, inboxIRI *url.URL, activity Activity) error
// InboxForwarding delegates inbox forwarding logic when a POST request
// is received in the Actor's inbox.
//
// Only called if the Federated Protocol is enabled.
//
// The delegate is responsible for determining whether to do the inbox
// forwarding, as well as actually conducting it if it determines it
// needs to.
//
// As a side effect, InboxForwarding must set the federated data in the
// database, independently of the inbox, however it sees fit in order to
// determine whether it has seen the activity before.
//
// The provided url is the inbox of the recipient of the Activity. The
// Activity is examined for the information about who to inbox forward
// to.
//
// If an error is returned, it is returned to the caller of PostInbox.
InboxForwarding(c context.Context, inboxIRI *url.URL, activity Activity) error
// PostOutbox delegates the logic for side effects and adding to the
// outbox.
//
// Only called if the Social API is enabled.
//
// The delegate is responsible for adding the activity to the database's
// general storage for independent retrieval, and not just within the
// actor's outbox.
//
// If the error is ErrObjectRequired or ErrTargetRequired, then a Bad
// Request status is sent in the response.
PostOutbox(c context.Context, a Activity, outboxIRI *url.URL) (deliverable bool, e error)
// AddNewIds sets new URL ids on the activity. It also does so for all
// 'object' properties if the Activity is a Create type.
//
// Only called if the Social API is enabled.
//
// If an error is returned, it is returned to the caller of PostOutbox.
AddNewIds(c context.Context, a Activity) error
// Deliver sends a federated message. Called only if federation is
// enabled.
//
// Only called if the Social API and Federated Protocol is enabled.
//
// The provided url is the outbox of the sender. The Activity contains
// the information about the intended recipients.
//
// If an error is returned, it is returned to the caller of PostOutbox.
Deliver(c context.Context, outbox *url.URL, activity Activity) error
// AuthenticatePostOutbox delegates the authentication of a POST to an
// outbox.
//
// Only called if the Social API is enabled.
//
// If an error is returned, it is passed back to the caller of
// PostOutbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'shouldReturn' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then shouldReturn must be true and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// shouldReturn must be false and error nil. The request will continue
// to be processed.
AuthenticatePostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error)
// AuthenticateGetOutbox delegates the authentication of a GET to an
// outbox.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
//
// If an error is returned, it is passed back to the caller of
// GetOutbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'shouldReturn' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then shouldReturn must be true and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// shouldReturn must be false and error nil. The request will continue
// to be processed.
AuthenticateGetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error)
// WrapInCreate wraps the provided object in a Create ActivityStreams
// activity. The provided URL is the actor's outbox endpoint.
//
// Only called if the Social API is enabled.
WrapInCreate(c context.Context, value vocab.Type, outboxIRI *url.URL) (vocab.ActivityStreamsCreate, error)
// GetOutbox returns the OrderedCollection inbox of the actor for this
// context. It is up to the implementation to provide the correct
// collection for the kind of authorization given in the request.
//
// AuthenticateGetOutbox will be called prior to this.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error)
// GetInbox returns the OrderedCollection inbox of the actor for this
// context. It is up to the implementation to provide the correct
// collection for the kind of authorization given in the request.
//
// AuthenticateGetInbox will be called prior to this.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
GetInbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error)
}

9
pub/doc.go Normal file
View File

@ -0,0 +1,9 @@
// Package pub implements the ActivityPub protocol.
//
// Note that every time the ActivityStreams types are changed (added, removed)
// due to code generation, the internal function toASType needs to be modified
// to know about these types.
//
// Note that every version change should also include a change in the version.go
// file.
package pub

108
pub/federating_protocol.go Normal file
View File

@ -0,0 +1,108 @@
package pub
import (
"context"
"github.com/go-fed/activity/streams/vocab"
"net/http"
"net/url"
)
// FederatingProtocol contains behaviors an application needs to satisfy for the
// full ActivityPub S2S implementation to be supported by this library.
//
// It is only required if the client application wants to support the server-to-
// server, or federating, protocol.
type FederatingProtocol interface {
// AuthenticatePostInbox delegates the authentication of a POST to an
// inbox.
//
// If an error is returned, it is passed back to the caller of
// PostInbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'shouldReturn' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then shouldReturn must be true and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// shouldReturn must be false and error nil. The request will continue
// to be processed.
AuthenticatePostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error)
// Blocked should determine whether to permit a set of actors given by
// their ids are able to interact with this particular end user due to
// being blocked or other application-specific logic.
//
// If an error is returned, it is passed back to the caller of
// PostInbox.
//
// If no error is returned, but authentication or authorization fails,
// then shouldReturn must be true and error nil. An http.StatusForbidden
// will be written in the wresponse.
//
// Finally, if the authentication and authorization succeeds, then
// shouldReturn must be false and error nil. The request will continue
// to be processed.
Blocked(c context.Context, actorIRIs []*url.URL) (blocked bool, err error)
// Callbacks returns the application logic that handles ActivityStreams
// received from federating peers. Note that certain types of callbacks
// will be 'wrapped' with default behaviors supported natively by the
// library. Other callbacks compatible with streams.TypeResolver can
// be specified by 'other'.
//
// Note that the functions in 'wrapped' cannot be provided in 'other'.
Callbacks() (wrapped WrappedCallbacks, other []interface{})
// MaxInboxForwardingRecursionDepth determines how deep to search within
// an activity to determine if inbox forwarding needs to occur.
//
// Zero or negative numbers indicate infinite recursion.
MaxInboxForwardingRecursionDepth() int
// MaxDeliveryRecursionDepth determines how deep to search within
// collections owned by peers when they are targeted to receive a
// delivery.
//
// Zero or negative numbers indicate infinite recursion.
MaxDeliveryRecursionDepth() int
// FilterForwarding allows the implementation to apply business logic
// such as blocks, spam filtering, and so on to a list of potential
// Collections and OrderedCollections of recipients when inbox
// forwarding has been triggered.
//
// The activity is provided as a reference for more intelligent
// logic to be used, but the implementation must not modify it.
FilterForwarding(c context.Context, potentialRecipients []*url.URL, a Activity) (filteredRecipients []*url.URL, err error)
// NewTransport returns a new Transport on behalf of a specific actor.
//
// The actorBoxIRI will be either the inbox or outbox of an actor who is
// attempting to do the dereferencing or delivery. Any authentication
// scheme applied on the request must be based on this actor. The
// request must contain some sort of credential of the user, such as a
// HTTP Signature.
//
// The gofedAgent passed in should be used by the Transport
// implementation in the User-Agent, as well as the application-specific
// user agent string. The gofedAgent will indicate this library's use as
// well as the library's version number.
//
// Any server-wide rate-limiting that needs to occur should happen in a
// Transport implementation. This factory function allows this to be
// created, so peer servers are not DOS'd.
//
// Any retry logic should also be handled by the Transport
// implementation.
//
// Note that the library will not maintain a long-lived pointer to the
// returned Transport so that any private credentials are able to be
// garbage collected.
NewTransport(actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error)
// GetInbox returns the OrderedCollection inbox of the actor for this
// context. It is up to the implementation to provide the correct
// collection for the kind of authorization given in the request.
//
// AuthenticateGetInbox will be called prior to this.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
GetInbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error)
}

View File

@ -5,8 +5,8 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/go-fed/activity/streams_old"
"github.com/go-fed/activity/vocab"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/go-fed/httpsig"
"io/ioutil"
"net/http"
@ -22,43 +22,6 @@ var (
// TODO: Helper for sending arbitrary ActivityPub objects.
// Pubber provides methods for interacting with ActivityPub clients and
// ActivityPub federating servers.
type Pubber interface {
// PostInbox returns true if the request was handled as an ActivityPub
// POST to an actor's inbox. If false, the request was not an
// ActivityPub request.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
PostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
// GetInbox returns true if the request was handled as an ActivityPub
// GET to an actor's inbox. If false, the request was not an ActivityPub
// request.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
GetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
// PostOutbox returns true if the request was handled as an ActivityPub
// POST to an actor's outbox. If false, the request was not an
// ActivityPub request.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
PostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
// GetOutbox returns true if the request was handled as an ActivityPub
// GET to an actor's outbox. If false, the request was not an
// ActivityPub request.
//
// If the error is nil, then the ResponseWriter's headers and response
// has already been written. If a non-nil error is returned, then no
// response has been written.
GetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error)
}
// NewSocialPubber provides a Pubber that implements only the Social API in
// ActivityPub.
func NewSocialPubber(clock Clock, app SocialApplication, cb Callbacker) Pubber {
@ -68,12 +31,14 @@ func NewSocialPubber(clock Clock, app SocialApplication, cb Callbacker) Pubber {
SocialAPI: app,
ClientCallbacker: cb,
EnableClient: true,
}
}, nil
}
// NewFederatingPubber provides a Pubber that implements only the Federating API
// in ActivityPub.
func NewFederatingPubber(clock Clock, app FederateApplication, cb Callbacker, d Deliverer, client HttpClient, userAgent string, maxDeliveryDepth, maxForwardingDepth int) Pubber {
//
// Returns an error if
func NewFederatingPubber(clock Clock, app FederateApplication, cb Callbacker, d Deliverer, client HttpClient, userAgent string, maxDeliveryDepth, maxForwardingDepth int) (p Pubber, e error) {
return &federator{
Clock: clock,
App: app,
@ -83,14 +48,14 @@ func NewFederatingPubber(clock Clock, app FederateApplication, cb Callbacker, d
Agent: userAgent,
MaxDeliveryDepth: maxDeliveryDepth,
MaxInboxForwardingDepth: maxForwardingDepth,
EnableServer: true,
EnableFederatedProtocol: true,
deliverer: d,
}
}
// NewPubber provides a Pubber that implements both the Social API and the
// Federating API in ActivityPub.
func NewPubber(clock Clock, app SocialFederateApplication, client, server Callbacker, d Deliverer, httpClient HttpClient, userAgent string, maxDeliveryDepth, maxForwardingDepth int) Pubber {
func NewPubber(clock Clock, app SocialFederateApplication, client, server Callbacker, d Deliverer, httpClient HttpClient, userAgent string, maxDeliveryDepth, maxForwardingDepth int) (p Pubber, e error) {
return &federator{
Clock: clock,
App: app,
@ -103,7 +68,7 @@ func NewPubber(clock Clock, app SocialFederateApplication, client, server Callba
ServerCallbacker: server,
ClientCallbacker: client,
EnableClient: true,
EnableServer: true,
EnableFederatedProtocol: true,
deliverer: d,
}
}
@ -113,10 +78,10 @@ type federator struct {
// server part of ActivityPub. Useful if permitting remote clients to
// act on behalf of the users of the client application.
EnableClient bool
// EnableServer enables or disables the Federated Protocol, or the
// EnableFederatedProtocol enables or disables the Federated Protocol, or the
// server to server part of ActivityPub. Useful to permit integrating
// with the rest of the federative web.
EnableServer bool
EnableFederatedProtocol bool
// Clock determines the time of this federator.
Clock Clock
// App is the client application that is ActivityPub aware.
@ -126,7 +91,7 @@ type federator struct {
// FederateAPI provides utility when handling incoming messages received
// via the Federated Protocol, or server-to-server communications.
//
// It is only required if EnableServer is true.
// It is only required if EnableFederatedProtocol is true.
FederateAPI FederateAPI
// SocialAPI provides utility when handling incoming messages
// received via the Social API, or client-to-server communications.
@ -139,38 +104,39 @@ type federator struct {
ServerCallbacker Callbacker
// Client is used to federate with other ActivityPub servers.
//
// It is only required if EnableServer is true.
// It is only required if EnableFederatedProtocol is true.
Client HttpClient
// Agent is the User-Agent string to use in HTTP headers when
// federating with another server. It will automatically be appended
// with '(go-fed ActivityPub)'.
//
// It is only required if EnableServer is true.
// It is only required if EnableFederatedProtocol is true.
Agent string
// MaxDeliveryDepth is how deep collections of recipients will be
// expanded for delivery. It must be at least 1 to be compliant with the
// ActivityPub spec.
//
// It is only required if EnableServer is true.
// It is only required if EnableFederatedProtocol is true.
MaxDeliveryDepth int
// MaxInboxForwardingDepth is how deep the values are examined for
// determining ownership of whether to forward an Activity to
// collections or followers. Once this maximum is exceeded, the ghost
// replies issue may become a problem, but users may not mind.
//
// It is only required if EnableServer is true.
// It is only required if EnableFederatedProtocol is true.
MaxInboxForwardingDepth int
// deliverer handles deliveries to other federated servers.
//
// It is only required if EnableServer is true.
// It is only required if EnableFederatedProtocol is true.
deliverer Deliverer
}
// PostInbox handles an HTTP request to an
func (f *federator) PostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (bool, error) {
if !isActivityPubPost(r) {
return false, nil
}
if !f.EnableServer {
if !f.EnableFederatedProtocol {
w.WriteHeader(http.StatusMethodNotAllowed)
return true, nil
}
@ -353,7 +319,7 @@ func (f *federator) PostOutbox(c context.Context, w http.ResponseWriter, r *http
if err = f.addToOutbox(c, r, m); err != nil {
return true, err
}
if f.EnableServer && deliverable {
if f.EnableFederatedProtocol && deliverable {
obj, err := toAnyActivity(m)
if err != nil {
return true, err

View File

@ -22,11 +22,6 @@ import (
// response to send to the requester.
type HandlerFunc func(context.Context, http.ResponseWriter, *http.Request) (bool, error)
// Clock determines the time.
type Clock interface {
Now() time.Time
}
// HttpClient sends http requests.
type HttpClient interface {
Do(req *http.Request) (*http.Response, error)
@ -115,16 +110,6 @@ type Application interface {
CanRemove(c context.Context, o vocab.ObjectType, t vocab.ObjectType) bool
}
// RWType indicates the kind of reading being done.
type RWType bool
const (
// Read indicates the object is only being read.
Read RWType = false
// ReadWrite indicates the object is being mutated as well.
ReadWrite = true
)
// SocialAPI is provided by users of this library and designed to handle
// receiving messages from ActivityPub clients through the Social API.
type SocialAPI interface {

View File

@ -220,7 +220,7 @@ func (f *federator) addNewIdsIntransitive(c context.Context, a vocab.Intransitiv
// wrapInCreate will automatically wrap the provided object in a Create
// activity. This will copy over the 'to', 'bto', 'cc', 'bcc', and 'audience'
// properties. It will also copy over the published time if present.
func (f *federator) wrapInCreate(o vocab.ObjectType, actor *url.URL) (c *vocab.Create, err error) {
func wrapInCreate(o vocab.ObjectType, actor *url.URL) (c *vocab.Create, err error) {
c = &vocab.Create{}
c.AppendType("Create")
c.AppendObject(o)
@ -719,45 +719,6 @@ func (f *federator) sameRecipients(a vocab.ActivityType) error {
// TODO: (Section 7) HTTP caching mechanisms [RFC7234] SHOULD be respected when appropriate, both when receiving responses from other servers as well as sending responses to other servers.
// deliver will complete the peer-to-peer sending of a federated message to
// another server.
func (f *federator) deliver(obj vocab.ActivityType, boxIRI *url.URL) error {
recipients, err := f.prepare(boxIRI, obj)
if err != nil {
return err
}
creds := &creds{}
creds.signer, err = f.FederateAPI.NewSigner()
if err != nil {
return err
}
creds.privKey, creds.pubKeyId, err = f.FederateAPI.PrivateKey(boxIRI)
if err != nil {
return err
}
return f.deliverToRecipients(obj, recipients, creds)
}
// deliverToRecipients will take a prepared Activity and send it to specific
// recipients without examining the activity.
func (f *federator) deliverToRecipients(obj vocab.ActivityType, recipients []*url.URL, creds *creds) error {
m, err := obj.Serialize()
if err != nil {
return err
}
addJSONLDContext(m)
b, err := json.Marshal(m)
if err != nil {
return err
}
for _, to := range recipients {
f.deliverer.Do(b, to, func(b []byte, u *url.URL) error {
return postToOutbox(f.Client, b, u, f.Agent, creds, f.Clock)
})
}
return nil
}
// prepare takes a deliverableObject and returns a list of the proper recipient
// target URIs. Additionally, the deliverableObject will have any hidden
// hidden recipients ("bto" and "bcc") stripped from it.
@ -1006,7 +967,7 @@ func dedupeIRIs(recipients, ignored []*url.URL) (out []*url.URL) {
// dedupeOrderedItems will deduplicate the 'orderedItems' within an ordered
// collection type. Deduplication happens by simply examining the 'id'.
func (f *federator) dedupeOrderedItems(oc vocab.OrderedCollectionType) (vocab.OrderedCollectionType, error) {
func dedupeOrderedItems(oc vocab.OrderedCollectionType) (vocab.OrderedCollectionType, error) {
i := 0
seen := make(map[string]bool, oc.OrderedItemsLen())
for i < oc.OrderedItemsLen() {
@ -1794,122 +1755,6 @@ func (f *federator) addToInboxIfNew(c context.Context, r *http.Request, m map[st
return nil
}
// Note: This is a mechanism for causing other victim servers to DDOS
// or forward spam on a malicious user's behalf. The trick is a simple
// one: Reply to a user, and CC a ton of 'follower' collections owned
// by the victim server. Bonus points for listing more 'follower'
// collections from other popular instances as well. Leveraging the
// Inbox Forwarding mechanism, a storm of messages will ensue.
//
// I don't want users of this library to be vulnerable to this kind of
// spam/DDOS storm. So here we allow the client application to filter
// out recipient collections.
func (f *federator) inboxForwarding(c context.Context, m map[string]interface{}) error {
a, err := toAnyActivity(m)
if err != nil {
return err
}
// 1. Must be first time we have seen this Activity.
if ok, err := f.App.Has(c, a.GetId()); err != nil {
return err
} else if ok {
return nil
}
// 2. The values of 'to', 'cc', or 'audience' are Collections owned by
// this server.
var r []*url.URL
r = append(r, getToIRIs(a)...)
r = append(r, getCcIRIs(a)...)
r = append(r, getAudienceIRIs(a)...)
var myIRIs []*url.URL
col := make(map[string]vocab.CollectionType, 0)
oCol := make(map[string]vocab.OrderedCollectionType, 0)
for _, iri := range r {
if ok, err := f.App.Has(c, iri); err != nil {
return err
} else if !ok {
continue
}
obj, err := f.App.Get(c, iri, Read)
if err != nil {
return err
}
if c, ok := obj.(vocab.CollectionType); ok {
col[iri.String()] = c
myIRIs = append(myIRIs, iri)
} else if oc, ok := obj.(vocab.OrderedCollectionType); ok {
oCol[iri.String()] = oc
myIRIs = append(myIRIs, iri)
}
}
if len(myIRIs) == 0 {
return nil
}
// 3. The values of 'inReplyTo', 'object', 'target', or 'tag' are owned
// by this server.
ownsValue := false
objs, l, iris := getInboxForwardingValues(a)
for _, obj := range objs {
if f.hasInboxForwardingValues(c, 0, f.MaxInboxForwardingDepth, obj) {
ownsValue = true
break
}
}
if !ownsValue && f.ownsAnyLinks(c, l) {
ownsValue = true
}
if !ownsValue && f.ownsAnyIRIs(c, iris) {
ownsValue = true
}
if !ownsValue {
return nil
}
// Do the inbox forwarding since the above conditions hold true. Support
// the behavior of letting the application filter out the resulting
// collections to be targeted.
toSend, err := f.FederateAPI.FilterForwarding(c, a, myIRIs)
if err != nil {
return err
}
recipients := make([]*url.URL, 0, len(toSend))
for _, iri := range toSend {
if c, ok := col[iri.String()]; ok {
for i := 0; i < c.ItemsLen(); i++ {
if c.IsItemsObject(i) {
obj := c.GetItemsObject(i)
if obj.HasId() {
recipients = append(recipients, obj.GetId())
}
} else if c.IsItemsLink(i) {
l := c.GetItemsLink(i)
if l.HasHref() {
recipients = append(recipients, l.GetHref())
}
} else if c.IsItemsIRI(i) {
recipients = append(recipients, c.GetItemsIRI(i))
}
}
} else if oc, ok := oCol[iri.String()]; ok {
for i := 0; i < oc.OrderedItemsLen(); i++ {
if oc.IsOrderedItemsObject(i) {
obj := oc.GetOrderedItemsObject(i)
if obj.HasId() {
recipients = append(recipients, obj.GetId())
}
} else if oc.IsOrderedItemsLink(i) {
l := oc.GetItemsLink(i)
if l.HasHref() {
recipients = append(recipients, l.GetHref())
}
} else if oc.IsOrderedItemsIRI(i) {
recipients = append(recipients, oc.GetOrderedItemsIRI(i))
}
}
}
}
return f.deliverToRecipients(a, recipients, nil)
}
// Given an 'inReplyTo', 'object', 'target', or 'tag' object, recursively
// examines those same values to determine if the app owns any, up to a maximum
// depth.

167
pub/old/resolvers.go Normal file
View File

@ -0,0 +1,167 @@
package pub
import (
"fmt"
"github.com/go-fed/activity/streams_old"
"github.com/go-fed/activity/vocab"
"net/url"
)
// ToPubObject transforms a json-deserialized ActivityStream object into a
// PubObject for use with the pub library. Note that for an object to be an
// ActivityPub object, it must have an 'id' and at least one 'type'.
func ToPubObject(m map[string]interface{}) (t []PubObject, e error) {
r := &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
if !i.HasId() {
return fmt.Errorf("object type does not have an id: %q", i)
} else if i.TypeLen() == 0 {
return fmt.Errorf("object type does not have a type: %q", i)
}
t = append(t, i)
return nil
},
AnyLinkCallback: func(i vocab.LinkType) error {
if !i.HasId() {
return fmt.Errorf("link type does not have an id: %q", i)
} else if i.TypeLen() == 0 {
return fmt.Errorf("link type does not have a type: %q", i)
}
t = append(t, i)
return nil
},
}
e = r.Deserialize(m)
return t, e
}
func getActorObject(m map[string]interface{}) (actorObject, error) {
var a actorObject
err := toActorObjectResolver(&a).Deserialize(m)
return a, err
}
func toActorObjectResolver(a *actorObject) *streams.Resolver {
return &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
if o, ok := i.(actorObject); ok {
*a = o
}
return nil
},
}
}
func toActorResolver(a *actor) *streams.Resolver {
return &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
if o, ok := i.(actor); ok {
*a = o
}
return nil
},
}
}
func toActorCollectionResolver(a *actor, c **streams.Collection, oc **streams.OrderedCollection, cp **streams.CollectionPage, ocp **streams.OrderedCollectionPage) *streams.Resolver {
r := toActorResolver(a)
r.CollectionCallback = func(i *streams.Collection) error {
*c = i
return nil
}
r.OrderedCollectionCallback = func(i *streams.OrderedCollection) error {
*oc = i
return nil
}
r.CollectionPageCallback = func(i *streams.CollectionPage) error {
*cp = i
return nil
}
r.OrderedCollectionPageCallback = func(i *streams.OrderedCollectionPage) error {
*ocp = i
return nil
}
return r
}
func toIdResolver(ok *bool, u **url.URL) *streams.Resolver {
return &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
*ok = i.HasId()
if *ok {
*u = i.GetId()
}
return nil
},
}
}
func toCollectionPage(m map[string]interface{}) (c *streams.CollectionPage, err error) {
r := &streams.Resolver{
CollectionPageCallback: func(i *streams.CollectionPage) error {
c = i
return nil
},
}
err = r.Deserialize(m)
return
}
func toOrderedCollectionPage(m map[string]interface{}) (c *streams.OrderedCollectionPage, err error) {
r := &streams.Resolver{
OrderedCollectionPageCallback: func(i *streams.OrderedCollectionPage) error {
c = i
return nil
},
}
err = r.Deserialize(m)
return
}
func toTypeIder(m map[string]interface{}) (tid typeIder, err error) {
var t []typeIder
r := &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
t = append(t, i)
return nil
},
AnyLinkCallback: func(i vocab.LinkType) error {
t = append(t, i)
return nil
},
}
err = r.Deserialize(m)
if err != nil {
return
}
// This should not be more than 1 as clients are not permitted to send
// an array of objects/links.
if len(t) != 1 {
err = fmt.Errorf("too many object/links: %d", len(t))
return
}
tid = t[0]
return
}
func toAnyActivity(m map[string]interface{}) (o vocab.ActivityType, err error) {
r := &streams.Resolver{
AnyActivityCallback: func(i vocab.ActivityType) error {
o = i
return nil
},
}
err = r.Deserialize(m)
return
}
func toAnyObject(m map[string]interface{}) (o vocab.ObjectType, err error) {
r := &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
o = i
return nil
},
}
err = r.Deserialize(m)
return
}

View File

@ -0,0 +1,80 @@
package pub
import (
"github.com/go-fed/activity/streams/vocab"
)
// inReplyToer is an ActivityStreams type with a 'inReplyTo' property
type inReplyToer interface {
GetActivityStreamsInReplyTo() vocab.ActivityStreamsInReplyToProperty
}
// objecter is an ActivityStreams type with a 'object' property
type objecter interface {
GetActivityStreamsObject() vocab.ActivityStreamsObjectProperty
}
// targeter is an ActivityStreams type with a 'target' property
type targeter interface {
GetActivityStreamsTarget() vocab.ActivityStreamsTargetProperty
}
// tagger is an ActivityStreams type with a 'tag' property
type tagger interface {
GetActivityStreamsTag() vocab.ActivityStreamsTagProperty
}
// hrefer is an ActivityStreams type with a 'href' property
type hrefer interface {
GetActivityStreamsHref() vocab.ActivityStreamsHrefProperty
}
// itemser is an ActivityStreams type with a 'items' property
type itemser interface {
GetActivityStreamsItems() vocab.ActivityStreamsItemsProperty
}
// orderedItemser is an ActivityStreams type with a 'orderedItems' property
type orderedItemser interface {
GetActivityStreamsOrderedItems() vocab.ActivityStreamsOrderedItemsProperty
}
// publisheder is an ActivityStreams type with a 'published' property
type publisheder interface {
GetActivityStreamsPublished() vocab.ActivityStreamsPublishedProperty
}
// toer is an ActivityStreams type with a 'to' property
type toer interface {
GetActivityStreamsTo() vocab.ActivityStreamsToProperty
}
// btoer is an ActivityStreams type with a 'bto' property
type btoer interface {
GetActivityStreamsBto() vocab.ActivityStreamsBtoProperty
}
// ccer is an ActivityStreams type with a 'cc' property
type ccer interface {
GetActivityStreamsCc() vocab.ActivityStreamsCcProperty
}
// bccer is an ActivityStreams type with a 'bcc' property
type bccer interface {
GetActivityStreamsBcc() vocab.ActivityStreamsBccProperty
}
// audiencer is an ActivityStreams type with a 'audience' property
type audiencer interface {
GetActivityStreamsAudience() vocab.ActivityStreamsAudienceProperty
}
// inboxer is an ActivityStreams type with a 'inbox' property
type inboxer interface {
GetActivityStreamsInbox() vocab.ActivityStreamsInboxProperty
}
// attributedToer is an ActivityStreams type with a 'attributedTo' property
type attributedToer interface {
GetActivityStreamsAttributedTo() vocab.ActivityStreamsAttributedToProperty
}

View File

@ -1,167 +1,481 @@
package pub
import (
"fmt"
"github.com/go-fed/activity/streams_old"
"github.com/go-fed/activity/vocab"
"net/url"
"context"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
)
// ToPubObject transforms a json-deserialized ActivityStream object into a
// PubObject for use with the pub library. Note that for an object to be an
// ActivityPub object, it must have an 'id' and at least one 'type'.
func ToPubObject(m map[string]interface{}) (t []PubObject, e error) {
r := &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
if !i.HasId() {
return fmt.Errorf("object type does not have an id: %q", i)
} else if i.TypeLen() == 0 {
return fmt.Errorf("object type does not have a type: %q", i)
}
t = append(t, i)
return nil
},
AnyLinkCallback: func(i vocab.LinkType) error {
if !i.HasId() {
return fmt.Errorf("link type does not have an id: %q", i)
} else if i.TypeLen() == 0 {
return fmt.Errorf("link type does not have a type: %q", i)
}
t = append(t, i)
return nil
},
}
e = r.Deserialize(m)
return t, e
// IsAnActivityType returns true if the ActivityStreams value is an Activity or
// extends from the Activity type.
func IsAnActivityType(value vocab.Type) bool {
return value.GetName() == "Activity" || streams.ActivityStreamsActivityIsExtendedBy(value)
}
func getActorObject(m map[string]interface{}) (actorObject, error) {
var a actorObject
err := toActorObjectResolver(&a).Deserialize(m)
return a, err
}
func toActorObjectResolver(a *actorObject) *streams.Resolver {
return &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
if o, ok := i.(actorObject); ok {
*a = o
}
// toAsType converts a generic map[string]interface{} into a known Type.
//
// Returns errors under the same conditions as streams.JSONResolver does.
func toType(c context.Context, m map[string]interface{}) (a vocab.Type, e error) {
var r *streams.JSONResolver
// Every time new types are added, need to update this list. It looks
// painful, but in practice VIM macros make it easier to manage.
//
// TODO: Somehow generate this more easily.
r, e = streams.NewJSONResolver(
func(ctx context.Context, i vocab.ActivityStreamsAccept) error {
a = i
return nil
},
}
}
func toActorResolver(a *actor) *streams.Resolver {
return &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
if o, ok := i.(actor); ok {
*a = o
}
func(ctx context.Context, i vocab.ActivityStreamsActivity) error {
a = i
return nil
},
}
}
func toActorCollectionResolver(a *actor, c **streams.Collection, oc **streams.OrderedCollection, cp **streams.CollectionPage, ocp **streams.OrderedCollectionPage) *streams.Resolver {
r := toActorResolver(a)
r.CollectionCallback = func(i *streams.Collection) error {
*c = i
return nil
}
r.OrderedCollectionCallback = func(i *streams.OrderedCollection) error {
*oc = i
return nil
}
r.CollectionPageCallback = func(i *streams.CollectionPage) error {
*cp = i
return nil
}
r.OrderedCollectionPageCallback = func(i *streams.OrderedCollectionPage) error {
*ocp = i
return nil
}
return r
}
func toIdResolver(ok *bool, u **url.URL) *streams.Resolver {
return &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
*ok = i.HasId()
if *ok {
*u = i.GetId()
}
func(ctx context.Context, i vocab.ActivityStreamsAdd) error {
a = i
return nil
},
}
}
func toCollectionPage(m map[string]interface{}) (c *streams.CollectionPage, err error) {
r := &streams.Resolver{
CollectionPageCallback: func(i *streams.CollectionPage) error {
c = i
func(ctx context.Context, i vocab.ActivityStreamsAnnounce) error {
a = i
return nil
},
}
err = r.Deserialize(m)
return
}
func toOrderedCollectionPage(m map[string]interface{}) (c *streams.OrderedCollectionPage, err error) {
r := &streams.Resolver{
OrderedCollectionPageCallback: func(i *streams.OrderedCollectionPage) error {
c = i
func(ctx context.Context, i vocab.ActivityStreamsApplication) error {
a = i
return nil
},
}
err = r.Deserialize(m)
return
}
func toTypeIder(m map[string]interface{}) (tid typeIder, err error) {
var t []typeIder
r := &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
t = append(t, i)
func(ctx context.Context, i vocab.ActivityStreamsArrive) error {
a = i
return nil
},
AnyLinkCallback: func(i vocab.LinkType) error {
t = append(t, i)
func(ctx context.Context, i vocab.ActivityStreamsArticle) error {
a = i
return nil
},
}
err = r.Deserialize(m)
if err != nil {
func(ctx context.Context, i vocab.ActivityStreamsAudio) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsBlock) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsCollection) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsCollectionPage) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsCreate) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsDelete) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsDislike) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsDocument) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsEvent) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsFlag) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsFollow) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsGroup) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsIgnore) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsImage) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsIntransitiveActivity) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsInvite) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsJoin) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsLeave) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsLike) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsLink) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsListen) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsMention) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsMove) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsNote) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsObject) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsOffer) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsOrderedCollection) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsOrderedCollectionPage) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsOrganization) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsPage) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsPerson) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsPlace) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsProfile) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsQuestion) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsRead) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsReject) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsRelationship) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsRemove) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsService) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsTentativeAccept) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsTentativeReject) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsTombstone) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsTravel) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsUndo) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsUpdate) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsVideo) error {
a = i
return nil
},
func(ctx context.Context, i vocab.ActivityStreamsView) error {
a = i
return nil
},
)
if e != nil {
return
}
// This should not be more than 1 as clients are not permitted to send
// an array of objects/links.
if len(t) != 1 {
err = fmt.Errorf("too many object/links: %d", len(t))
return
}
tid = t[0]
e = r.Resolve(c, m)
return
}
func toAnyActivity(m map[string]interface{}) (o vocab.ActivityType, err error) {
r := &streams.Resolver{
AnyActivityCallback: func(i vocab.ActivityType) error {
o = i
// addToCreate adds the object to the Create activity.
func addToCreate(ctx context.Context, c vocab.ActivityStreamsCreate, o vocab.Type) error {
obj := c.GetActivityStreamsObject()
if obj == nil {
obj = streams.NewActivityStreamsObjectProperty()
}
// Every time new types are added, need to update this list. It looks
// painful, but in practice VIM macros make it easier to manage.
//
// TODO: Somehow generate this more easily.
r, e := streams.NewTypeResolver(
func(ctx context.Context, v vocab.ActivityStreamsAccept) error {
obj.AppendActivityStreamsAccept(v)
return nil
},
}
err = r.Deserialize(m)
return
}
func toAnyObject(m map[string]interface{}) (o vocab.ObjectType, err error) {
r := &streams.Resolver{
AnyObjectCallback: func(i vocab.ObjectType) error {
o = i
func(ctx context.Context, v vocab.ActivityStreamsActivity) error {
obj.AppendActivityStreamsActivity(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsAdd) error {
obj.AppendActivityStreamsAdd(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsAnnounce) error {
obj.AppendActivityStreamsAnnounce(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsApplication) error {
obj.AppendActivityStreamsApplication(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsArrive) error {
obj.AppendActivityStreamsArrive(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsArticle) error {
obj.AppendActivityStreamsArticle(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsAudio) error {
obj.AppendActivityStreamsAudio(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsBlock) error {
obj.AppendActivityStreamsBlock(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsCollection) error {
obj.AppendActivityStreamsCollection(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsCollectionPage) error {
obj.AppendActivityStreamsCollectionPage(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsCreate) error {
obj.AppendActivityStreamsCreate(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsDelete) error {
obj.AppendActivityStreamsDelete(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsDislike) error {
obj.AppendActivityStreamsDislike(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsDocument) error {
obj.AppendActivityStreamsDocument(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsEvent) error {
obj.AppendActivityStreamsEvent(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsFlag) error {
obj.AppendActivityStreamsFlag(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsFollow) error {
obj.AppendActivityStreamsFollow(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsGroup) error {
obj.AppendActivityStreamsGroup(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsIgnore) error {
obj.AppendActivityStreamsIgnore(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsImage) error {
obj.AppendActivityStreamsImage(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsIntransitiveActivity) error {
obj.AppendActivityStreamsIntransitiveActivity(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsInvite) error {
obj.AppendActivityStreamsInvite(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsJoin) error {
obj.AppendActivityStreamsJoin(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsLeave) error {
obj.AppendActivityStreamsLeave(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsLike) error {
obj.AppendActivityStreamsLike(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsLink) error {
obj.AppendActivityStreamsLink(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsListen) error {
obj.AppendActivityStreamsListen(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsMention) error {
obj.AppendActivityStreamsMention(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsMove) error {
obj.AppendActivityStreamsMove(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsNote) error {
obj.AppendActivityStreamsNote(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsObject) error {
obj.AppendActivityStreamsObject(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsOffer) error {
obj.AppendActivityStreamsOffer(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsOrderedCollection) error {
obj.AppendActivityStreamsOrderedCollection(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsOrderedCollectionPage) error {
obj.AppendActivityStreamsOrderedCollectionPage(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsOrganization) error {
obj.AppendActivityStreamsOrganization(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsPage) error {
obj.AppendActivityStreamsPage(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsPerson) error {
obj.AppendActivityStreamsPerson(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsPlace) error {
obj.AppendActivityStreamsPlace(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsProfile) error {
obj.AppendActivityStreamsProfile(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsQuestion) error {
obj.AppendActivityStreamsQuestion(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsRead) error {
obj.AppendActivityStreamsRead(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsReject) error {
obj.AppendActivityStreamsReject(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsRelationship) error {
obj.AppendActivityStreamsRelationship(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsRemove) error {
obj.AppendActivityStreamsRemove(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsService) error {
obj.AppendActivityStreamsService(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsTentativeAccept) error {
obj.AppendActivityStreamsTentativeAccept(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsTentativeReject) error {
obj.AppendActivityStreamsTentativeReject(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsTombstone) error {
obj.AppendActivityStreamsTombstone(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsTravel) error {
obj.AppendActivityStreamsTravel(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsUndo) error {
obj.AppendActivityStreamsUndo(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsUpdate) error {
obj.AppendActivityStreamsUpdate(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsVideo) error {
obj.AppendActivityStreamsVideo(v)
return nil
},
func(ctx context.Context, v vocab.ActivityStreamsView) error {
obj.AppendActivityStreamsView(v)
return nil
},
)
if e != nil {
return e
}
err = r.Deserialize(m)
return
return r.Resolve(ctx, o)
}

686
pub/side_effect_actor.go Normal file
View File

@ -0,0 +1,686 @@
package pub
import (
"context"
"encoding/json"
"fmt"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"net/http"
"net/url"
"strings"
)
// sideEffectActor must satisfy the DelegateActor interface.
var _ DelegateActor = &sideEffectActor{}
// sideEffectActor is a DelegateActor that handles the ActivityPub
// implementation side effects, but requires a more opinionated application to
// be written.
//
// Note that when using the sideEffectActor with an application that good-faith
// implements its required interfaces, the ActivityPub specification is
// guaranteed to be correctly followed.
type sideEffectActor struct {
common CommonBehavior
s2s FederatingProtocol
c2s SocialProtocol
db Database
}
// AuthenticatePostInbox defers to the delegate to authenticate the request.
func (a *sideEffectActor) AuthenticatePostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error) {
return a.s2s.AuthenticatePostInbox(c, w, r)
}
// AuthenticateGetInbox defers to the delegate to authenticate the request.
func (a *sideEffectActor) AuthenticateGetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error) {
return a.common.AuthenticateGetInbox(c, w, r)
}
// AuthenticatePostOutbox defers to the delegate to authenticate the request.
func (a *sideEffectActor) AuthenticatePostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error) {
return a.c2s.AuthenticatePostOutbox(c, w, r)
}
// AuthenticateGetOutbox defers to the delegate to authenticate the request.
func (a *sideEffectActor) AuthenticateGetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error) {
return a.common.AuthenticateGetOutbox(c, w, r)
}
// GetOutbox delegates to the SocialProtocol.
func (a *sideEffectActor) GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
// Compiler bug? Cannot directly return here:
// cannot use <T> as type vocab.ActivityStreamsOrderedCollectionPage in return argument
v1, v2 := a.c2s.GetOutbox(c, r)
return v1, v2
}
// GetInbox delegates to the FederatingProtocol.
func (a *sideEffectActor) GetInbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
return a.s2s.GetInbox(c, r)
}
// AuthorizePostInbox defers to the federating protocol whether the peer request
// is authorized based on the actors' ids.
func (a *sideEffectActor) AuthorizePostInbox(c context.Context, w http.ResponseWriter, activity Activity) (shouldReturn bool, err error) {
actor := activity.GetActivityStreamsActor()
var iris []*url.URL
for i := 0; i < actor.Len(); i++ {
iter := actor.At(i)
if iter.IsIRI() {
iris = append(iris, iter.GetIRI())
} else if t := iter.GetType(); t != nil {
iris = append(iris, activity.GetActivityStreamsId().Get())
} else {
err = fmt.Errorf("actor at index %d is missing an id", i)
return
}
}
// Determine if the actor(s) sending this request are blocked.
if shouldReturn, err = a.s2s.Blocked(c, iris); err != nil {
return
} else if shouldReturn {
w.WriteHeader(http.StatusForbidden)
return
}
return
}
// PostInbox handles the side effects of determining whether to block the peer's
// request, adding the activity to the actor's inbox, and triggering side
// effects based on the activity's type.
func (a *sideEffectActor) PostInbox(c context.Context, inboxIRI *url.URL, activity Activity) error {
isNew, err := a.addToInboxIfNew(c, inboxIRI, activity)
if err != nil {
return err
}
if isNew {
wrapped, other := a.s2s.Callbacks()
// TODO: Wrap these callbacks with the old implementations.
res, err := streams.NewTypeResolver(append(wrapped.callbacks(), other...))
if err != nil {
return err
}
if err = res.Resolve(c, activity); err != nil {
return err
}
}
return nil
}
// InboxForwarding implements the 3-part inbox forwarding algorithm specified in
// the ActivityPub specification. Does not modify the Activity, but may send
// outbound requests as a side effect.
//
// InboxForwarding sets the federated data in the database.
func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, activity Activity) error {
// 1. Must be first time we have seen this Activity.
//
// Obtain the id of the activity
id := activity.GetActivityStreamsId()
// Acquire a lock for the id. To be held for the rest of execution.
err := a.db.Lock(c, id.Get())
if err != nil {
return err
}
defer a.db.Unlock(c, id.Get())
// If the database already contains the activity, exit early.
exists, err := a.db.Exists(c, id.Get())
if err != nil {
return err
} else if exists {
return nil
}
// Attempt to create the activity entry.
err = a.db.Create(c, activity)
if err != nil {
return err
}
// 2. The values of 'to', 'cc', or 'audience' are Collections owned by
// this server.
var r []*url.URL
to := activity.GetActivityStreamsTo()
for iter := to.Begin(); iter != to.End(); iter = iter.Next() {
val, err := ToId(iter)
if err != nil {
return err
}
r = append(r, val)
}
cc := activity.GetActivityStreamsCc()
for iter := cc.Begin(); iter != cc.End(); iter = iter.Next() {
val, err := ToId(iter)
if err != nil {
return err
}
r = append(r, val)
}
audience := activity.GetActivityStreamsAudience()
for iter := audience.Begin(); iter != audience.End(); iter = iter.Next() {
val, err := ToId(iter)
if err != nil {
return err
}
r = append(r, val)
}
// Find all IRIs owned by this server. We need to find all of them so
// that forwarding can properly occur.
var myIRIs []*url.URL
for _, iri := range r {
if err != nil {
return err
}
if owns, err := a.db.Owns(c, iri); err != nil {
return err
} else if !owns {
continue
}
myIRIs = append(myIRIs, iri)
}
// Finally, load our IRIs to determine if they are a Collection or
// OrderedCollection.
//
// Load the unfiltered IRIs.
var colIRIs []*url.URL
col := make(map[string]itemser)
oCol := make(map[string]orderedItemser)
for _, iri := range myIRIs {
err = a.db.Lock(c, iri)
if err != nil {
return err
}
defer a.db.Unlock(c, iri)
t, err := a.db.Get(c, iri)
if err != nil {
return err
}
if streams.ActivityStreamsOrderedCollectionIsExtendedBy(t) {
if im, ok := t.(orderedItemser); ok {
oCol[iri.String()] = im
colIRIs = append(colIRIs, iri)
} else {
a.db.Unlock(c, iri)
}
} else if streams.ActivityStreamsCollectionIsExtendedBy(t) {
if im, ok := t.(itemser); ok {
col[iri.String()] = im
colIRIs = append(colIRIs, iri)
} else {
a.db.Unlock(c, iri)
}
} else {
a.db.Unlock(c, iri)
}
}
// If we own none of the Collection IRIs in 'to', 'cc', or 'audience'
// then no need to do inbox forwarding. We have nothing to forward to.
if len(colIRIs) == 0 {
return nil
}
// 3. The values of 'inReplyTo', 'object', 'target', or 'tag' are owned
// by this server. This is only a boolean trigger: As soon as we get
// a hit that we own something, then we should do inbox forwarding.
maxDepth := a.s2s.MaxInboxForwardingRecursionDepth()
ownsValue, err := a.hasInboxForwardingValues(c, activity, maxDepth, 0)
if err != nil {
return err
}
// If we don't own any of the 'inReplyTo', 'object', 'target', or 'tag'
// values, then no need to do inbox forwarding.
if !ownsValue {
return nil
}
// Do the inbox forwarding since the above conditions hold true. Support
// the behavior of letting the application filter out the resulting
// collections to be targeted.
toSend, err := a.s2s.FilterForwarding(c, colIRIs, activity)
if err != nil {
return err
}
recipients := make([]*url.URL, 0, len(toSend))
for _, iri := range toSend {
if c, ok := col[iri.String()]; ok {
it := c.GetActivityStreamsItems()
for iter := it.Begin(); iter != it.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
recipients = append(recipients, id)
}
} else if oc, ok := oCol[iri.String()]; ok {
oit := oc.GetActivityStreamsOrderedItems()
for iter := oit.Begin(); iter != oit.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
recipients = append(recipients, id)
}
}
}
return a.deliverToRecipients(c, inboxIRI, activity, recipients)
}
// PostOutbox handles the side effects of adding the activity to the actor's
// outbox, and triggering side effects based on the activity's type.
//
// This implementation assumes all types are meant to be delivered except for
// the ActivityStreams Block type.
func (a *sideEffectActor) PostOutbox(c context.Context, activity Activity, outboxIRI *url.URL) (deliverable bool, e error) {
wrapped, other := a.c2s.Callbacks()
// TODO: Wrap these callbacks with the old implementations.
// TODO: populate deliverable
res, err := streams.NewTypeResolver(append(wrapped.callbacks(), other...))
if err != nil {
return
}
if err = res.Resolve(c, activity); err != nil {
return
}
err = a.addToOutbox(c, outboxIRI, activity)
return
}
// AddNewIds creates new 'id' entries on an activity and its objects if it is a
// Create activity.
func (a *sideEffectActor) AddNewIds(c context.Context, activity Activity) error {
id, err := a.db.NewId(c, activity)
if err != nil {
return err
}
activityId := streams.NewActivityStreamsIdProperty()
activityId.Set(id)
activity.SetActivityStreamsId(activityId)
if streams.ActivityStreamsCreateIsExtendedBy(activity) {
o, ok := activity.(objecter)
if !ok {
return fmt.Errorf("cannot add new id for Create: %T has no object property", activity)
}
oProp := o.GetActivityStreamsObject()
for iter := oProp.Begin(); iter != oProp.End(); iter = iter.Next() {
t := iter.GetType()
if t == nil {
return fmt.Errorf("cannot add new id for object in Create: object is not embedded as a value literal")
}
id, err = a.db.NewId(c, t)
if err != nil {
return err
}
objId := streams.NewActivityStreamsIdProperty()
objId.Set(id)
t.SetActivityStreamsId(objId)
}
}
return nil
}
// deliver will complete the peer-to-peer sending of a federated message to
// another server.
//
// Must only be called if both social and federated protocols are supported.
func (a *sideEffectActor) Deliver(c context.Context, outboxIRI *url.URL, activity Activity) error {
recipients, err := a.prepare(c, outboxIRI, activity)
if err != nil {
return err
}
return a.deliverToRecipients(c, outboxIRI, activity, recipients)
}
// WrapInCreate wraps an object with a Create activity.
func (a *sideEffectActor) WrapInCreate(c context.Context, obj vocab.Type, outboxIRI *url.URL) (create vocab.ActivityStreamsCreate, err error) {
actorIri, err := a.c2s.ActorIRI(c, outboxIRI)
if err != nil {
return
}
return wrapInCreate(c, obj, actorIri)
}
// deliverToRecipients will take a prepared Activity and send it to specific
// recipients on behalf of an actor.
func (a *sideEffectActor) deliverToRecipients(c context.Context, boxIRI *url.URL, activity Activity, recipients []*url.URL) error {
m, err := serialize(activity)
if err != nil {
return err
}
b, err := json.Marshal(m)
if err != nil {
return err
}
tp, err := a.s2s.NewTransport(boxIRI, goFedUserAgent())
if err != nil {
return err
}
mErr := make(map[string]error)
for _, to := range recipients {
err := tp.Deliver(c, b, to)
if err != nil {
mErr[to.String()] = err
}
}
if len(mErr) > 0 {
s := make([]string, 0, len(mErr))
for k, v := range mErr {
s = append(s, fmt.Sprintf("%s=%s", k, v.Error()))
}
return fmt.Errorf("requests failed: %s", strings.Join(s, ";"))
}
return nil
}
// addToOutbox adds the activity to the outbox and creates the activity in the
// internal database as its own entry.
func (a *sideEffectActor) addToOutbox(c context.Context, outboxIRI *url.URL, activity Activity) error {
// Set the activity in the database first.
id := activity.GetActivityStreamsId()
err := a.db.Lock(c, id.Get())
if err != nil {
return err
}
// WARNING: Unlock not deferred
err = a.db.Create(c, activity)
if err != nil {
a.db.Unlock(c, id.Get())
return err
}
a.db.Unlock(c, id.Get())
// WARNING: Unlock(c, id) should be called by this point and in every
// return before here.
// Acquire a lock to read the outbox. Defer release.
err = a.db.Lock(c, outboxIRI)
if err != nil {
return err
}
defer a.db.Unlock(c, outboxIRI)
outbox, err := a.db.GetOutbox(c, outboxIRI)
if err != nil {
return err
}
// Prepend the activity to the list of 'orderedItems'.
oi := outbox.GetActivityStreamsOrderedItems()
if oi == nil {
oi = streams.NewActivityStreamsOrderedItemsProperty()
}
oi.PrependIRI(id.Get())
outbox.SetActivityStreamsOrderedItems(oi)
// Save in the database.
err = a.db.SetOutbox(c, outbox)
return err
}
// addToInboxIfNew will add the activity to the inbox at the specified IRI if
// the activity's ID has not yet been added to the inbox.
//
// It does not add the activity to this database's know federated data.
//
// Returns true when the activity is novel.
func (a *sideEffectActor) addToInboxIfNew(c context.Context, inboxIRI *url.URL, activity Activity) (isNew bool, err error) {
// Acquire a lock to read the inbox. Defer release.
err = a.db.Lock(c, inboxIRI)
if err != nil {
return
}
defer a.db.Unlock(c, inboxIRI)
// Obtain the id of the activity
id := activity.GetActivityStreamsId()
// If the inbox already contains the URL, early exit.
contains, err := a.db.InboxContains(c, inboxIRI, id.Get())
if err != nil {
return
} else if contains {
return
}
// It is a new id, acquire the inbox.
isNew = true
inbox, err := a.db.GetInbox(c, inboxIRI)
if err != nil {
return
}
// Prepend the activity to the list of 'orderedItems'.
oi := inbox.GetActivityStreamsOrderedItems()
if oi == nil {
oi = streams.NewActivityStreamsOrderedItemsProperty()
}
oi.PrependIRI(id.Get())
inbox.SetActivityStreamsOrderedItems(oi)
// Save in the database.
err = a.db.SetInbox(c, inbox)
return
}
// Given an ActivityStreams value, recursively examines ownership of the id or
// href and the ones on properties applicable to inbox forwarding.
//
// Recursion may be limited by providing a 'maxDepth' greater than zero. A
// value of zero or a negative number will result in infinite recursion.
func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, val vocab.Type, maxDepth, currDepth int) (bool, error) {
// Stop recurring if we are exceeding the maximum depth and the maximum
// is a positive number.
if maxDepth > 0 && currDepth >= maxDepth {
return false, nil
}
// Determine if we own the 'id' for this value.
id, err := GetId(val)
if err != nil {
return false, err
}
if owns, err := a.db.Owns(c, id); err != nil {
return false, err
} else if owns {
return true, nil
}
// Determine if we own the 'id' of any values on the properties we care
// about.
types, iris := getInboxForwardingValues(val)
// For IRIs, simply check if we own them.
// TODO: Dereference and recur.
for _, iri := range iris {
if owns, err := a.db.Owns(c, iri); err != nil {
return false, err
} else if owns {
return true, nil
}
}
// For embedded literals, recur.
for _, nextVal := range types {
if has, err := a.hasInboxForwardingValues(c, nextVal, maxDepth, currDepth+1); err != nil {
return false, err
} else if has {
return true, nil
}
}
return false, nil
}
// prepare takes a deliverableObject and returns a list of the proper recipient
// target URIs. Additionally, the deliverableObject will have any hidden
// hidden recipients ("bto" and "bcc") stripped from it.
//
// Only call if both the social and federated protocol are supported.
func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activity Activity) (r []*url.URL, err error) {
// Get inboxes of recipients
if to := activity.GetActivityStreamsTo(); to != nil {
for iter := to.Begin(); iter != to.End(); iter = iter.Next() {
var val *url.URL
val, err = ToId(iter)
if err != nil {
return
}
r = append(r, val)
}
}
if bto := activity.GetActivityStreamsBto(); bto != nil {
for iter := bto.Begin(); iter != bto.End(); iter = iter.Next() {
var val *url.URL
val, err = ToId(iter)
if err != nil {
return
}
r = append(r, val)
}
}
if cc := activity.GetActivityStreamsCc(); cc != nil {
for iter := cc.Begin(); iter != cc.End(); iter = iter.Next() {
var val *url.URL
val, err = ToId(iter)
if err != nil {
return
}
r = append(r, val)
}
}
if bcc := activity.GetActivityStreamsBcc(); bcc != nil {
for iter := bcc.Begin(); iter != bcc.End(); iter = iter.Next() {
var val *url.URL
val, err = ToId(iter)
if err != nil {
return
}
r = append(r, val)
}
}
if audience := activity.GetActivityStreamsAudience(); audience != nil {
for iter := audience.Begin(); iter != audience.End(); iter = iter.Next() {
var val *url.URL
val, err = ToId(iter)
if err != nil {
return
}
r = append(r, val)
}
}
// TODO: Support delivery to shared inbox
// 1. When an object is being delivered to the originating actor's
// followers, a server MAY reduce the number of receiving actors
// delivered to by identifying all followers which share the same
// sharedInbox who would otherwise be individual recipients and
// instead deliver objects to said sharedInbox.
// 2. If an object is addressed to the Public special collection, a
// server MAY deliver that object to all known sharedInbox endpoints
// on the network.
r = filterURLs(r, IsPublic)
t, err := a.s2s.NewTransport(outboxIRI, goFedUserAgent())
if err != nil {
return nil, err
}
receiverActors, err := a.resolveInboxes(c, t, r, 0, a.s2s.MaxDeliveryRecursionDepth())
if err != nil {
return nil, err
}
targets, err := getInboxes(receiverActors)
if err != nil {
return nil, err
}
// Get inboxes of sender.
actorIRI, err := a.c2s.ActorIRI(c, outboxIRI)
if err != nil {
return nil, err
}
// Make sure this matches the 'attributedTo' on the activity.
attrTo := activity.GetActivityStreamsAttributedTo()
if attrTo.Len() != 1 {
return nil, fmt.Errorf("federated c2s object does not have exactly one attributedTo value: %d", attrTo.Len())
} else if attrToIRI, err := ToId(attrTo.At(0)); err != nil {
return nil, err
} else if attrToIRI.String() != actorIRI.String() {
return nil, fmt.Errorf("federated c2s object attributedTo value does not match this actor")
}
// Get the inbox on the sender.
err = a.db.Lock(c, actorIRI)
if err != nil {
return nil, err
}
// BEGIN LOCK
thisActor, err := a.db.Get(c, actorIRI)
a.db.Unlock(c, actorIRI)
// END LOCK -- Still need to handle err
if err != nil {
return nil, err
}
// Post-processing
var ignore *url.URL
ignore, err = getInbox(thisActor)
if err != nil {
return nil, err
}
r = dedupeIRIs(targets, []*url.URL{ignore})
stripHiddenRecipients(activity)
return r, nil
}
// resolveInboxes takes a list of Actor id URIs and returns them as concrete
// instances of actorObject. It attempts to apply recursively when it encounters
// a target that is a Collection or OrderedCollection.
//
// If maxDepth is zero or negative, then recursion is infinitely applied.
//
// If a recipient is a Collection or OrderedCollection, then the server MUST
// dereference the collection, WITH the user's credentials.
//
// Note that this also applies to CollectionPage and OrderedCollectionPage.
// TODO: Handle Page types by paginating.
func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*url.URL, depth, maxDepth int) (actors []vocab.Type, err error) {
if maxDepth > 0 && depth >= maxDepth {
return
}
for _, u := range r {
var act vocab.Type
var more []*url.URL
act, more, err = a.dereferenceForResolvingInboxes(c, t, u)
if err != nil {
return
}
var recurActors []vocab.Type
recurActors, err = a.resolveInboxes(c, t, more, depth+1, maxDepth)
if err != nil {
return
}
actors = append(actors, act)
actors = append(actors, recurActors...)
}
return
}
// dereferenceForResolvingInboxes dereferences an IRI solely for finding an
// actor's inbox IRI to deliver to.
func (a *sideEffectActor) dereferenceForResolvingInboxes(c context.Context, t Transport, actorIRI *url.URL) (actor vocab.Type, moreActorIRIs []*url.URL, err error) {
var resp []byte
resp, err = t.Dereference(c, actorIRI)
if err != nil {
return
}
var m map[string]interface{}
if err = json.Unmarshal(resp, &m); err != nil {
return
}
actor, err = toType(c, m)
if err != nil {
return
}
// Attempt to see if the 'actor' is really some sort of type that has
// an 'items' or 'orderedItems' property.
if v, ok := actor.(itemser); ok {
i := v.GetActivityStreamsItems()
for iter := i.Begin(); iter != i.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
}
moreActorIRIs = append(moreActorIRIs, id)
}
} else if v, ok := actor.(orderedItemser); ok {
i := v.GetActivityStreamsOrderedItems()
for iter := i.Begin(); iter != i.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
}
moreActorIRIs = append(moreActorIRIs, id)
}
}
return
}

54
pub/social_protocol.go Normal file
View File

@ -0,0 +1,54 @@
package pub
import (
"context"
"github.com/go-fed/activity/streams/vocab"
"net/http"
"net/url"
)
// SocialProtocol contains behaviors an application needs to satisfy for the
// full ActivityPub C2S implementation to be supported by this library.
//
// It is only required if the client application wants to support the client-to-
// server, or social, protocol.
type SocialProtocol interface {
// AuthenticatePostOutbox delegates the authentication of a POST to an
// outbox.
//
// Only called if the Social API is enabled.
//
// If an error is returned, it is passed back to the caller of
// PostOutbox. In this case, the implementation must not write a
// response to the ResponseWriter as is expected that the client will
// do so when handling the error. The 'shouldReturn' is ignored.
//
// If no error is returned, but authentication or authorization fails,
// then shouldReturn must be true and error nil. It is expected that
// the implementation handles writing to the ResponseWriter in this
// case.
//
// Finally, if the authentication and authorization succeeds, then
// shouldReturn must be false and error nil. The request will continue
// to be processed.
AuthenticatePostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (shouldReturn bool, err error)
// Callbacks returns the application logic that handles ActivityStreams
// received from C2S clients. Note that certain types of callbacks
// will be 'wrapped' with default behaviors supported natively by the
// library. Other callbacks compatible with streams.TypeResolver can
// be specified by 'other'.
//
// Note that the functions in 'wrapped' cannot be provided in 'other'.
Callbacks() (wrapped WrappedCallbacks, other []interface{})
// ActorIRI fetches the outbox's actor's IRI.
ActorIRI(c context.Context, outboxIRI *url.URL) (actorIRI *url.URL, err error)
// GetOutbox returns the OrderedCollection inbox of the actor for this
// context. It is up to the implementation to provide the correct
// collection for the kind of authorization given in the request.
//
// AuthenticateGetOutbox will be called prior to this.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error)
}

130
pub/transport.go Normal file
View File

@ -0,0 +1,130 @@
package pub
import (
"bytes"
"context"
"crypto"
"fmt"
"github.com/go-fed/httpsig"
"io/ioutil"
"net/http"
"net/url"
)
const (
// acceptHeaderValue is the Accept header value indicating that the
// response should contain an ActivityStreams object.
acceptHeaderValue = "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\""
)
// Transport makes ActivityStreams calls to other servers in order to POST or
// GET ActivityStreams data.
type Transport interface {
// Dereference fetches the ActivityStreams object located at this IRI
// with a GET request.
Dereference(c context.Context, iri *url.URL) ([]byte, error)
// Deliver sends an ActivityStreams object.
Deliver(c context.Context, b []byte, to *url.URL) error
}
// Transport must be implemented by HttpSigTransport.
var _ Transport = &HttpSigTransport{}
// HttpSigTransport makes a dereference call using HTTP signatures to
// authenticate the request on behalf of a particular actor.
//
// No rate limiting is applied.
//
// Only one request is tried per call.
type HttpSigTransport struct {
client HttpClient
appAgent string
gofedAgent string
clock Clock
signer httpsig.Signer
pubKeyId string
privKey crypto.PrivateKey
}
// NewHttpSigTransport returns a new HttpSigTransport.
func NewHttpSigTransport(
client HttpClient,
appAgent, gofedAgent string,
clock Clock,
signer httpsig.Signer,
pubKeyId string,
privKey crypto.PrivateKey) *HttpSigTransport {
return &HttpSigTransport{
client: client,
appAgent: appAgent,
gofedAgent: gofedAgent,
clock: clock,
signer: signer,
pubKeyId: pubKeyId,
privKey: privKey,
}
}
// Dereferences with a request signed with an HTTP Signature.
func (h HttpSigTransport) Dereference(c context.Context, iri *url.URL) ([]byte, error) {
req, err := http.NewRequest("GET", iri.String(), nil)
if err != nil {
return nil, err
}
req.WithContext(c)
req.Header.Add(acceptHeader, acceptHeaderValue)
req.Header.Add("Accept-Charset", "utf-8")
req.Header.Add("Date", h.clock.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
req.Header.Add("User-Agent", fmt.Sprintf("%s %s", h.appAgent, h.gofedAgent))
err = h.signer.SignRequest(h.privKey, h.pubKeyId, req)
if err != nil {
return nil, err
}
resp, err := h.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("GET request to %s failed (%d): %s", iri.String(), resp.StatusCode, resp.Status)
}
return ioutil.ReadAll(resp.Body)
}
// Deliver sends a POST request with an HTTP Signature.
func (h HttpSigTransport) Deliver(c context.Context, b []byte, to *url.URL) error {
byteCopy := make([]byte, len(b))
copy(byteCopy, b)
buf := bytes.NewBuffer(byteCopy)
req, err := http.NewRequest("POST", to.String(), buf)
if err != nil {
return err
}
req.WithContext(c)
req.Header.Add(contentTypeHeader, contentTypeHeaderValue)
req.Header.Add("Accept-Charset", "utf-8")
req.Header.Add("Date", h.clock.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
req.Header.Add("User-Agent", fmt.Sprintf("%s %s", h.appAgent, h.gofedAgent))
err = h.signer.SignRequest(h.privKey, h.pubKeyId, req)
if err != nil {
return err
}
resp, err := h.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("POST request to %s failed (%d): %s", to.String(), resp.StatusCode, resp.Status)
}
return nil
}
// HttpClient sends http requests, and is an abstraction only needed by the
// HttpSigTransport. The standard library's Client satisfies this interface.
type HttpClient interface {
Do(req *http.Request) (*http.Response, error)
}
// HttpClient must be implemented by http.Client.
var _ HttpClient = &http.Client{}

450
pub/util.go Normal file
View File

@ -0,0 +1,450 @@
package pub
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"errors"
"fmt"
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"net/http"
"net/url"
"strings"
)
var (
// ErrObjectRequired indicates the activity needs its object property
// set. Can be returned by DelegateActor's PostInbox or PostOutbox so a
// Bad Request response is set.
ErrObjectRequired = errors.New("object property required on the provided activity")
// ErrTargetRequired indicates the activity needs its target property
// set. Can be returned by DelegateActor's PostInbox or PostOutbox so a
// Bad Request response is set.
ErrTargetRequired = errors.New("target property required on the provided activity")
)
// activityStreamsMediaTypes contains all of the accepted ActivityStreams media
// types. Generated at init time.
var activityStreamsMediaTypes []string
func init() {
activityStreamsMediaTypes = []string{
"application/activity+json",
}
jsonLdType := "application/ld+json"
for _, semi := range []string{";", " ;", " ; ", "; "} {
for _, profile := range []string{
"profile=https://www.w3.org/ns/activitystreams",
"profile=\"https://www.w3.org/ns/activitystreams\"",
} {
activityStreamsMediaTypes = append(
activityStreamsMediaTypes,
fmt.Sprintf("%s%s%s", jsonLdType, semi, profile))
}
}
}
// headerIsActivityPubMediaType returns true if the header string contains one
// of the accepted ActivityStreams media types.
//
// Note we don't try to build a comprehensive parser and instead accept a
// tolerable amount of whitespace since the HTTP specification is ambiguous
// about the format and significance of whitespace.
func headerIsActivityPubMediaType(header string) bool {
for _, mediaType := range activityStreamsMediaTypes {
if strings.Contains(header, mediaType) {
return true
}
}
return false
}
const (
// The Content-Type header.
contentTypeHeader = "Content-Type"
// The Accept header.
acceptHeader = "Accept"
)
// isActivityPubPost returns true if the request is a POST request that has the
// ActivityStreams content type header
func isActivityPubPost(r *http.Request) bool {
return r.Method == "POST" && headerIsActivityPubMediaType(r.Header.Get(contentTypeHeader))
}
// isActivityPubGet returns true if the request is a GET request that has the
// ActivityStreams content type header
func isActivityPubGet(r *http.Request) bool {
return r.Method == "GET" && headerIsActivityPubMediaType(r.Header.Get(acceptHeader))
}
// dedupeOrderedItems deduplicates the 'orderedItems' within an ordered
// collection type. Deduplication happens by the 'id' property.
func dedupeOrderedItems(oc orderedItemser) error {
oi := oc.GetActivityStreamsOrderedItems()
if oi == nil {
return nil
}
seen := make(map[string]bool, oi.Len())
for i := 0; i < oi.Len(); {
var id *url.URL
iter := oi.At(i)
asType := iter.GetType()
if asType != nil {
var err error
id, err = GetId(asType)
if err != nil {
return err
}
} else if iter.IsIRI() {
id = iter.GetIRI()
} else {
return fmt.Errorf("element %d in OrderedCollection does not have an ID nor is an IRI", i)
}
if seen[id.String()] {
oi.Remove(i)
} else {
seen[id.String()] = true
i++
}
}
return nil
}
const (
// jsonLDContext is the key for the JSON-LD specification's context
// value. It contains the definitions of the types contained within the
// rest of the payload. Important for linked-data representations, but
// only applicable to go-fed at code-generation time.
jsonLDContext = "@context"
)
// addJSONLDContext adds the
func serialize(a vocab.Type) (m map[string]interface{}, e error) {
m, e = a.Serialize()
if e != nil {
return
}
v := a.JSONLDContext()
// Transform the map of vocabulary-to-aliases into a context payload,
// but do so in a way that at least keeps it readable for other humans.
var contextValue interface{}
if len(v) == 1 {
for vocab, alias := range v {
if len(alias) == 0 {
contextValue = vocab
} else {
contextValue = map[string]string{
alias: vocab,
}
}
}
} else {
var arr []interface{}
aliases := make(map[string]string)
for vocab, alias := range v {
if len(alias) == 0 {
arr = append(arr, vocab)
} else {
aliases[alias] = vocab
}
}
contextValue = append(arr, aliases)
}
m[jsonLDContext] = contextValue
return
}
const (
// Contains the ActivityStreams Content-Type value.
contentTypeHeaderValue = "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\""
// The Date header.
dateHeader = "Date"
// The Digest header.
digestHeader = "Digest"
// The delimiter used in the Digest header.
digestDelimiter = "="
// SHA-256 string for the Digest header.
sha256Digest = "SHA-256"
)
// addResponseHeaders sets headers needed in the HTTP response, such but not
// limited to the Content-Type, Date, and Digest headers.
func addResponseHeaders(h http.Header, c Clock, responseContent []byte) {
h.Set(contentTypeHeader, contentTypeHeaderValue)
// RFC 7231 §7.1.1.2
h.Set(dateHeader, c.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
// RFC 3230 and RFC 5843
var b bytes.Buffer
b.WriteString(sha256Digest)
b.WriteString(digestDelimiter)
hashed := sha256.Sum256(responseContent)
b.WriteString(base64.StdEncoding.EncodeToString(hashed[:]))
h.Set(digestHeader, b.String())
}
// IdProperty is a property that can readily have its id obtained
type IdProperty interface {
// GetIRI returns the IRI of this property. When IsIRI returns false,
// GetIRI will return an arbitrary value.
GetIRI() *url.URL
// GetType returns the value in this property as a Type. Returns nil if
// the value is not an ActivityStreams type, such as an IRI or another
// value.
GetType() vocab.Type
// IsIRI returns true if this property is an IRI.
IsIRI() bool
}
// ToId returns an IdProperty's id.
func ToId(i IdProperty) (*url.URL, error) {
if i.GetType() != nil {
return GetId(i.GetType())
} else if i.IsIRI() {
return i.GetIRI(), nil
}
return nil, fmt.Errorf("cannot determine id of activitystreams property")
}
// GetId will attempt to find the 'id' property or, if it happens to be a
// Link or derived from Link type, the 'href' property instead.
//
// Returns an error if the id is not set and either the 'href' property is not
// valid on this type, or it is also not set.
func GetId(t vocab.Type) (*url.URL, error) {
if id := t.GetActivityStreamsId(); id != nil {
return id.Get(), nil
} else if h, ok := t.(hrefer); ok {
if href := h.GetActivityStreamsHref(); href != nil {
return href.Get(), nil
}
}
return nil, fmt.Errorf("cannot determine id of activitystreams value")
}
// getInboxForwardingValues obtains the 'inReplyTo', 'object', 'target', and
// 'tag' values on an ActivityStreams value.
func getInboxForwardingValues(o vocab.Type) (t []vocab.Type, iri []*url.URL) {
// 'inReplyTo'
if i, ok := o.(inReplyToer); ok {
irt := i.GetActivityStreamsInReplyTo()
for iter := irt.Begin(); iter != irt.End(); iter = iter.Next() {
if tv := iter.GetType(); tv != nil {
t = append(t, tv)
} else {
iri = append(iri, iter.GetIRI())
}
}
}
// 'tag'
if i, ok := o.(tagger); ok {
tag := i.GetActivityStreamsTag()
for iter := tag.Begin(); iter != tag.End(); iter = iter.Next() {
if tv := iter.GetType(); tv != nil {
t = append(t, tv)
} else {
iri = append(iri, iter.GetIRI())
}
}
}
// 'object'
if i, ok := o.(objecter); ok {
obj := i.GetActivityStreamsObject()
for iter := obj.Begin(); iter != obj.End(); iter = iter.Next() {
if tv := iter.GetType(); tv != nil {
t = append(t, tv)
} else {
iri = append(iri, iter.GetIRI())
}
}
}
// 'target'
if i, ok := o.(targeter); ok {
tar := i.GetActivityStreamsTarget()
for iter := tar.Begin(); iter != tar.End(); iter = iter.Next() {
if tv := iter.GetType(); tv != nil {
t = append(t, tv)
} else {
iri = append(iri, iter.GetIRI())
}
}
}
return
}
// wrapInCreate will automatically wrap the provided object in a Create
// activity. This will copy over the 'to', 'bto', 'cc', 'bcc', and 'audience'
// properties. It will also copy over the published time if present.
func wrapInCreate(ctx context.Context, o vocab.Type, actor *url.URL) (c vocab.ActivityStreamsCreate, err error) {
c = streams.NewActivityStreamsCreate()
// Object property
oProp := streams.NewActivityStreamsObjectProperty()
addToCreate(ctx, c, o)
c.SetActivityStreamsObject(oProp)
// Actor Property
actorProp := streams.NewActivityStreamsActorProperty()
actorProp.AppendIRI(actor)
c.SetActivityStreamsActor(actorProp)
// Published Property
if v, ok := o.(publisheder); ok {
c.SetActivityStreamsPublished(v.GetActivityStreamsPublished())
}
// Copying over properties.
if v, ok := o.(toer); ok {
activityTo := streams.NewActivityStreamsToProperty()
to := v.GetActivityStreamsTo()
for iter := to.Begin(); iter != to.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
}
activityTo.AppendIRI(id)
}
c.SetActivityStreamsTo(activityTo)
}
if v, ok := o.(btoer); ok {
activityBto := streams.NewActivityStreamsBtoProperty()
bto := v.GetActivityStreamsBto()
for iter := bto.Begin(); iter != bto.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
}
activityBto.AppendIRI(id)
}
c.SetActivityStreamsBto(activityBto)
}
if v, ok := o.(ccer); ok {
activityCc := streams.NewActivityStreamsCcProperty()
cc := v.GetActivityStreamsCc()
for iter := cc.Begin(); iter != cc.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
}
activityCc.AppendIRI(id)
}
c.SetActivityStreamsCc(activityCc)
}
if v, ok := o.(bccer); ok {
activityBcc := streams.NewActivityStreamsBccProperty()
bcc := v.GetActivityStreamsBcc()
for iter := bcc.Begin(); iter != bcc.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
}
activityBcc.AppendIRI(id)
}
c.SetActivityStreamsBcc(activityBcc)
}
if v, ok := o.(audiencer); ok {
activityAudience := streams.NewActivityStreamsAudienceProperty()
aud := v.GetActivityStreamsAudience()
for iter := aud.Begin(); iter != aud.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
}
activityAudience.AppendIRI(id)
}
c.SetActivityStreamsAudience(activityAudience)
}
return
}
// filterURLs removes urls whose strings match the provided filter
func filterURLs(u []*url.URL, fn func(s string) bool) []*url.URL {
i := 0
for i < len(u) {
if fn(u[i].String()) {
u = append(u[:i], u[i+1:]...)
} else {
i++
}
}
return u
}
const (
// PublicActivityPubIRI is the IRI that indicates an Activity is meant
// to be visible for general public consumption.
PublicActivityPubIRI = "https://www.w3.org/ns/activitystreams#Public"
publicJsonLD = "Public"
publicJsonLDAS = "as:Public"
)
// IsPublic determines if an IRI string is the Public collection as defined in
// the spec, including JSON-LD compliant collections.
func IsPublic(s string) bool {
return s == PublicActivityPubIRI || s == publicJsonLD || s == publicJsonLDAS
}
// getInboxes extracts the 'inbox' IRIs from actor types.
func getInboxes(t []vocab.Type) (u []*url.URL, err error) {
for _, elem := range t {
var iri *url.URL
iri, err = getInbox(elem)
if err != nil {
return
}
u = append(u, iri)
}
return
}
// getInbox extracts the 'inbox' IRI from an actor type.
func getInbox(t vocab.Type) (u *url.URL, err error) {
ib, ok := t.(inboxer)
if !ok {
err = fmt.Errorf("actor type %T has no inbox", t)
return
}
inbox := ib.GetActivityStreamsInbox()
return ToId(inbox)
}
// dedupeIRIs will deduplicate final inbox IRIs. The ignore list is applied to
// the final list.
func dedupeIRIs(recipients, ignored []*url.URL) (out []*url.URL) {
ignoredMap := make(map[string]bool, len(ignored))
for _, elem := range ignored {
ignoredMap[elem.String()] = true
}
outMap := make(map[string]bool, len(recipients))
for _, k := range recipients {
kStr := k.String()
if !ignoredMap[kStr] && !outMap[kStr] {
out = append(out, k)
outMap[kStr] = true
}
}
return
}
// stripHiddenRecipients removes "bto" and "bcc" from the activity.
//
// Note that this requirement of the specification is under "Section 6: Client
// to Server Interactions", the Social API, and not the Federative API.
func stripHiddenRecipients(activity Activity) {
bto := activity.GetActivityStreamsBto()
if bto != nil {
for i := bto.Len() - 1; i >= 0; i-- {
bto.Remove(i)
}
}
bcc := activity.GetActivityStreamsBcc()
if bcc != nil {
for i := bcc.Len() - 1; i >= 0; i-- {
bcc.Remove(i)
}
}
}

15
pub/version.go Normal file
View File

@ -0,0 +1,15 @@
package pub
import (
"fmt"
)
const (
// Version string, used in the User-Agent
version = "v1.0.0"
)
// goFedUserAgent returns the user agent string for the go-fed library.
func goFedUserAgent() string {
return fmt.Sprintf("(go-fed/activity %s)", version)
}

99
pub/wrapped_callbacks.go Normal file
View File

@ -0,0 +1,99 @@
package pub
import (
"context"
"github.com/go-fed/activity/streams/vocab"
)
// WrappedCallbacks lists the callback functions that already have some side
// effect behavior provided by the pub library.
//
// These functions may be wrapped for either the Federating Protocol or the
// Social API. However, the side effects in these callbacks should not be the
// same for both of these use cases.
//
// These are not used when using a DelegateActor directly. The wrapping
// behaviors defined below would need to be handled by another implementation
// of that interface.
type WrappedCallbacks struct {
// Create handles additional side effects for the Create ActivityStreams
// type.
//
// The wrapping callback for the Federating Protocol will ensure the
// 'object' property exists, create an entry in the database, and add it
// to the recipient(s) inbox if not yet already in the inbox.
//
// The wrapping callback for the Social API copies the actor(s) to the
// 'attributedTo' property, copying recipients between the Create
// activity and all objects, save the entry in the database, and adds it
// to the outbox.
Create func(context.Context, vocab.ActivityStreamsCreate) error
// Update handles additional side effects for the Update ActivityStreams
// type.
//
// TODO: Describe
Update func(context.Context, vocab.ActivityStreamsUpdate) error
// Delete handles additional side effects for the Delete ActivityStreams
// type.
//
// TODO: Describe
Delete func(context.Context, vocab.ActivityStreamsDelete) error
// Follow handles additional side effects for the Follow ActivityStreams
// type.
//
// TODO: Describe
Follow func(context.Context, vocab.ActivityStreamsFollow) error
// Accept handles additional side effects for the Accept ActivityStreams
// type.
//
// TODO: Describe
Accept func(context.Context, vocab.ActivityStreamsAccept) error
// Reject handles additional side effects for the Reject ActivityStreams
// type.
//
// TODO: Describe
Reject func(context.Context, vocab.ActivityStreamsReject) error
// Add handles additional side effects for the Add ActivityStreams
// type.
//
// TODO: Describe
Add func(context.Context, vocab.ActivityStreamsAdd) error
// Remove handles additional side effects for the Remove ActivityStreams
// type.
//
// TODO: Describe
Remove func(context.Context, vocab.ActivityStreamsRemove) error
// Like handles additional side effects for the Like ActivityStreams
// type.
//
// TODO: Describe
Like func(context.Context, vocab.ActivityStreamsLike) error
// Undo handles additional side effects for the Undo ActivityStreams
// type.
//
// TODO: Describe
Undo func(context.Context, vocab.ActivityStreamsUndo) error
// Block handles additional side effects for the Block ActivityStreams
// type.
//
// TODO: Describe
Block func(context.Context, vocab.ActivityStreamsBlock) error
}
// callbacks returns the WrappedCallbacks members into a single interface slice
// for use in streams.Resolver callbacks.
func (w WrappedCallbacks) callbacks() []interface{} {
return []interface{}{
w.Create,
w.Update,
w.Delete,
w.Follow,
w.Accept,
w.Reject,
w.Add,
w.Remove,
w.Like,
w.Undo,
w.Block,
}
}