diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index fa4ec2ad3c..a8034bddc6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -12,14 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
from twisted.internet import defer
from .federation_base import FederationBase
from .units import Transaction, Edu
-from synapse.util.async import Linearizer
+from synapse.util import async
from synapse.util.logutils import log_function
from synapse.util.caches.response_cache import ResponseCache
from synapse.events import FrozenEvent
@@ -33,6 +31,9 @@ from synapse.crypto.event_signing import compute_event_signature
import simplejson as json
import logging
+# when processing incoming transactions, we try to handle multiple rooms in
+# parallel, up to this limit.
+TRANSACTION_CONCURRENCY_LIMIT = 10
logger = logging.getLogger(__name__)
@@ -52,7 +53,8 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
- self._server_linearizer = Linearizer("fed_server")
+ self._server_linearizer = async.Linearizer("fed_server")
+ self._transaction_linearizer = async.Linearizer("fed_txn_handler")
# We cache responses to state queries, as they take a while and often
# come in waves.
@@ -111,12 +113,39 @@ class FederationServer(FederationBase):
def on_incoming_transaction(self, transaction_data):
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
- request_time = int(self._clock.time_msec())
+ request_time = self._clock.time_msec()
transaction = Transaction(**transaction_data)
+ if not transaction.transaction_id:
+ raise Exception("Transaction missing transaction_id")
+ if not transaction.origin:
+ raise Exception("Transaction missing origin")
+
logger.debug("[%s] Got transaction", transaction.transaction_id)
+ # use a linearizer to ensure that we don't process the same transaction
+ # multiple times in parallel.
+ with (yield self._transaction_linearizer.queue(
+ (transaction.origin, transaction.transaction_id),
+ )):
+ result = yield self._handle_incoming_transaction(
+ transaction, request_time,
+ )
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _handle_incoming_transaction(self, transaction, request_time):
+ """ Process an incoming transaction and return the HTTP response
+
+ Args:
+ transaction (Transaction): incoming transaction
+ request_time (int): timestamp that the HTTP request arrived at
+
+ Returns:
+ Deferred[(int, object)]: http response code and body
+ """
response = yield self.transaction_actions.have_responded(transaction)
if response:
@@ -131,7 +160,7 @@ class FederationServer(FederationBase):
received_pdus_counter.inc_by(len(transaction.pdus))
- pdu_list = []
+ pdus_by_room = {}
for p in transaction.pdus:
if "unsigned" in p:
@@ -143,22 +172,36 @@ class FederationServer(FederationBase):
del p["age"]
event = self.event_from_pdu_json(p)
- pdu_list.append(event)
+ room_id = event.room_id
+ pdus_by_room.setdefault(room_id, []).append(event)
pdu_results = {}
- for pdu in pdu_list:
- event_id = pdu.event_id
- try:
- yield self._handle_received_pdu(transaction.origin, pdu)
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warn("Error handling PDU %s: %s", event_id, e)
- self.send_failure(e, transaction.origin)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- pdu_results[event_id] = {"error": str(e)}
- logger.exception("Failed to handle PDU")
+ # we can process different rooms in parallel (which is useful if they
+ # require callouts to other servers to fetch missing events), but
+ # impose a limit to avoid going too crazy with ram/cpu.
+ @defer.inlineCallbacks
+ def process_pdus_for_room(room_id):
+ logger.debug("Processing PDUs for %s", room_id)
+ for pdu in pdus_by_room[room_id]:
+ event_id = pdu.event_id
+ try:
+ yield self._handle_received_pdu(
+ transaction.origin, pdu
+ )
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warn("Error handling PDU %s: %s", event_id, e)
+ self.send_failure(e, transaction.origin)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ pdu_results[event_id] = {"error": str(e)}
+ logger.exception("Failed to handle PDU %s", event_id)
+
+ yield async.concurrently_execute(
+ process_pdus_for_room, pdus_by_room.keys(),
+ TRANSACTION_CONCURRENCY_LIMIT,
+ )
if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus):
@@ -168,8 +211,9 @@ class FederationServer(FederationBase):
edu.content
)
- for failure in getattr(transaction, "pdu_failures", []):
- logger.info("Got failure %r", failure)
+ pdu_failures = getattr(transaction, "pdu_failures", [])
+ for failure in pdu_failures:
+ logger.info("Got failure %r", failure)
response = {
"pdus": pdu_results,
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 52b2a717d2..f96561c1fe 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -471,3 +471,371 @@ class TransportLayerClient(object):
)
defer.returnValue(content)
+
+ @log_function
+ def get_group_profile(self, destination, group_id, requester_user_id):
+ """Get a group profile
+ """
+ path = PREFIX + "/groups/%s/profile" % (group_id,)
+
+ return self.client.get_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def get_group_summary(self, destination, group_id, requester_user_id):
+ """Get a group summary
+ """
+ path = PREFIX + "/groups/%s/summary" % (group_id,)
+
+ return self.client.get_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def get_rooms_in_group(self, destination, group_id, requester_user_id):
+ """Get all rooms in a group
+ """
+ path = PREFIX + "/groups/%s/rooms" % (group_id,)
+
+ return self.client.get_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ def add_room_to_group(self, destination, group_id, requester_user_id, room_id,
+ content):
+ """Add a room to a group
+ """
+ path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ data=content,
+ ignore_backoff=True,
+ )
+
+ def remove_room_from_group(self, destination, group_id, requester_user_id, room_id):
+ """Remove a room from a group
+ """
+ path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
+
+ return self.client.delete_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def get_users_in_group(self, destination, group_id, requester_user_id):
+ """Get users in a group
+ """
+ path = PREFIX + "/groups/%s/users" % (group_id,)
+
+ return self.client.get_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def accept_group_invite(self, destination, group_id, user_id, content):
+ """Accept a group invite
+ """
+ path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
+ """Invite a user to a group
+ """
+ path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def invite_to_group_notification(self, destination, group_id, user_id, content):
+ """Sent by group server to inform a user's server that they have been
+ invited.
+ """
+
+ path = PREFIX + "/groups/local/%s/users/%s/invite" % (group_id, user_id)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def remove_user_from_group(self, destination, group_id, requester_user_id,
+ user_id, content):
+ """Remove a user fron a group
+ """
+ path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def remove_user_from_group_notification(self, destination, group_id, user_id,
+ content):
+ """Sent by group server to inform a user's server that they have been
+ kicked from the group.
+ """
+
+ path = PREFIX + "/groups/local/%s/users/%s/remove" % (group_id, user_id)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def renew_group_attestation(self, destination, group_id, user_id, content):
+ """Sent by either a group server or a user's server to periodically update
+ the attestations
+ """
+
+ path = PREFIX + "/groups/%s/renew_attestation/%s" % (group_id, user_id)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def update_group_summary_room(self, destination, group_id, user_id, room_id,
+ category_id, content):
+ """Update a room entry in a group summary
+ """
+ if category_id:
+ path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
+ group_id, category_id, room_id,
+ )
+ else:
+ path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": user_id},
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def delete_group_summary_room(self, destination, group_id, user_id, room_id,
+ category_id):
+ """Delete a room entry in a group summary
+ """
+ if category_id:
+ path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
+ group_id, category_id, room_id,
+ )
+ else:
+ path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
+
+ return self.client.delete_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def get_group_categories(self, destination, group_id, requester_user_id):
+ """Get all categories in a group
+ """
+ path = PREFIX + "/groups/%s/categories" % (group_id,)
+
+ return self.client.get_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def get_group_category(self, destination, group_id, requester_user_id, category_id):
+ """Get category info in a group
+ """
+ path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+
+ return self.client.get_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def update_group_category(self, destination, group_id, requester_user_id, category_id,
+ content):
+ """Update a category in a group
+ """
+ path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def delete_group_category(self, destination, group_id, requester_user_id,
+ category_id):
+ """Delete a category in a group
+ """
+ path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+
+ return self.client.delete_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def get_group_roles(self, destination, group_id, requester_user_id):
+ """Get all roles in a group
+ """
+ path = PREFIX + "/groups/%s/roles" % (group_id,)
+
+ return self.client.get_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def get_group_role(self, destination, group_id, requester_user_id, role_id):
+ """Get a roles info
+ """
+ path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+
+ return self.client.get_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def update_group_role(self, destination, group_id, requester_user_id, role_id,
+ content):
+ """Update a role in a group
+ """
+ path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def delete_group_role(self, destination, group_id, requester_user_id, role_id):
+ """Delete a role in a group
+ """
+ path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+
+ return self.client.delete_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def update_group_summary_user(self, destination, group_id, requester_user_id,
+ user_id, role_id, content):
+ """Update a users entry in a group
+ """
+ if role_id:
+ path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
+ group_id, role_id, user_id,
+ )
+ else:
+ path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def delete_group_summary_user(self, destination, group_id, requester_user_id,
+ user_id, role_id):
+ """Delete a users entry in a group
+ """
+ if role_id:
+ path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
+ group_id, role_id, user_id,
+ )
+ else:
+ path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
+
+ return self.client.delete_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ ignore_backoff=True,
+ )
+
+ def bulk_get_publicised_groups(self, destination, user_ids):
+ """Get the groups a list of users are publicising
+ """
+
+ path = PREFIX + "/get_groups_publicised"
+
+ content = {"user_ids": user_ids}
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a78f01e442..c7565e0737 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -25,7 +25,7 @@ from synapse.http.servlet import (
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
from synapse.util.logcontext import preserve_fn
-from synapse.types import ThirdPartyInstanceID
+from synapse.types import ThirdPartyInstanceID, get_domain_from_id
import functools
import logging
@@ -609,6 +609,475 @@ class FederationVersionServlet(BaseFederationServlet):
}))
+class FederationGroupsProfileServlet(BaseFederationServlet):
+ """Get the basic profile of a group on behalf of a user
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/profile$"
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query, group_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.get_group_profile(
+ group_id, requester_user_id
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsSummaryServlet(BaseFederationServlet):
+ PATH = "/groups/(?P<group_id>[^/]*)/summary$"
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query, group_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.get_group_summary(
+ group_id, requester_user_id
+ )
+
+ defer.returnValue((200, new_content))
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.update_group_profile(
+ group_id, requester_user_id, content
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsRoomsServlet(BaseFederationServlet):
+ """Get the rooms in a group on behalf of a user
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/rooms$"
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query, group_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.get_rooms_in_group(
+ group_id, requester_user_id
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsAddRoomsServlet(BaseFederationServlet):
+ """Add/remove room from group
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/room/(?<room_id>)$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, room_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.add_room_to_group(
+ group_id, requester_user_id, room_id, content
+ )
+
+ defer.returnValue((200, new_content))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, origin, content, query, group_id, room_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.remove_room_from_group(
+ group_id, requester_user_id, room_id,
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsUsersServlet(BaseFederationServlet):
+ """Get the users in a group on behalf of a user
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/users$"
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query, group_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.get_users_in_group(
+ group_id, requester_user_id
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsInviteServlet(BaseFederationServlet):
+ """Ask a group server to invite someone to the group
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, user_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.invite_to_group(
+ group_id, user_id, requester_user_id, content,
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
+ """Accept an invitation from the group server
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, user_id):
+ if get_domain_from_id(user_id) != origin:
+ raise SynapseError(403, "user_id doesn't match origin")
+
+ new_content = yield self.handler.accept_invite(
+ group_id, user_id, content,
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsRemoveUserServlet(BaseFederationServlet):
+ """Leave or kick a user from the group
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, user_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.remove_user_from_group(
+ group_id, user_id, requester_user_id, content,
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsLocalInviteServlet(BaseFederationServlet):
+ """A group server has invited a local user
+ """
+ PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, user_id):
+ if get_domain_from_id(group_id) != origin:
+ raise SynapseError(403, "group_id doesn't match origin")
+
+ new_content = yield self.handler.on_invite(
+ group_id, user_id, content,
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet):
+ """A group server has removed a local user
+ """
+ PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, user_id):
+ if get_domain_from_id(group_id) != origin:
+ raise SynapseError(403, "user_id doesn't match origin")
+
+ new_content = yield self.handler.user_removed_from_group(
+ group_id, user_id, content,
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsRenewAttestaionServlet(BaseFederationServlet):
+ """A group or user's server renews their attestation
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, user_id):
+ # We don't need to check auth here as we check the attestation signatures
+
+ new_content = yield self.handler.on_renew_group_attestation(
+ origin, content, group_id, user_id
+ )
+
+ defer.returnValue((200, new_content))
+
+
+class FederationGroupsSummaryRoomsServlet(BaseFederationServlet):
+ """Add/remove a room from the group summary, with optional category.
+
+ Matches both:
+ - /groups/:group/summary/rooms/:room_id
+ - /groups/:group/summary/categories/:category/rooms/:room_id
+ """
+ PATH = (
+ "/groups/(?P<group_id>[^/]*)/summary"
+ "(/categories/(?P<category_id>[^/]+))?"
+ "/rooms/(?P<room_id>[^/]*)$"
+ )
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, category_id, room_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ if category_id == "":
+ raise SynapseError(400, "category_id cannot be empty string")
+
+ resp = yield self.handler.update_group_summary_room(
+ group_id, requester_user_id,
+ room_id=room_id,
+ category_id=category_id,
+ content=content,
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, origin, content, query, group_id, category_id, room_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ if category_id == "":
+ raise SynapseError(400, "category_id cannot be empty string")
+
+ resp = yield self.handler.delete_group_summary_room(
+ group_id, requester_user_id,
+ room_id=room_id,
+ category_id=category_id,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class FederationGroupsCategoriesServlet(BaseFederationServlet):
+ """Get all categories for a group
+ """
+ PATH = (
+ "/groups/(?P<group_id>[^/]*)/categories/$"
+ )
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query, group_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ resp = yield self.handler.get_group_categories(
+ group_id, requester_user_id,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class FederationGroupsCategoryServlet(BaseFederationServlet):
+ """Add/remove/get a category in a group
+ """
+ PATH = (
+ "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)$"
+ )
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query, group_id, category_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ resp = yield self.handler.get_group_category(
+ group_id, requester_user_id, category_id
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, category_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ if category_id == "":
+ raise SynapseError(400, "category_id cannot be empty string")
+
+ resp = yield self.handler.upsert_group_category(
+ group_id, requester_user_id, category_id, content,
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, origin, content, query, group_id, category_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ if category_id == "":
+ raise SynapseError(400, "category_id cannot be empty string")
+
+ resp = yield self.handler.delete_group_category(
+ group_id, requester_user_id, category_id,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class FederationGroupsRolesServlet(BaseFederationServlet):
+ """Get roles in a group
+ """
+ PATH = (
+ "/groups/(?P<group_id>[^/]*)/roles/$"
+ )
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query, group_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ resp = yield self.handler.get_group_roles(
+ group_id, requester_user_id,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class FederationGroupsRoleServlet(BaseFederationServlet):
+ """Add/remove/get a role in a group
+ """
+ PATH = (
+ "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)$"
+ )
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query, group_id, role_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ resp = yield self.handler.get_group_role(
+ group_id, requester_user_id, role_id
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, role_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ if role_id == "":
+ raise SynapseError(400, "role_id cannot be empty string")
+
+ resp = yield self.handler.update_group_role(
+ group_id, requester_user_id, role_id, content,
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, origin, content, query, group_id, role_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ if role_id == "":
+ raise SynapseError(400, "role_id cannot be empty string")
+
+ resp = yield self.handler.delete_group_role(
+ group_id, requester_user_id, role_id,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class FederationGroupsSummaryUsersServlet(BaseFederationServlet):
+ """Add/remove a user from the group summary, with optional role.
+
+ Matches both:
+ - /groups/:group/summary/users/:user_id
+ - /groups/:group/summary/roles/:role/users/:user_id
+ """
+ PATH = (
+ "/groups/(?P<group_id>[^/]*)/summary"
+ "(/roles/(?P<role_id>[^/]+))?"
+ "/users/(?P<user_id>[^/]*)$"
+ )
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, role_id, user_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ if role_id == "":
+ raise SynapseError(400, "role_id cannot be empty string")
+
+ resp = yield self.handler.update_group_summary_user(
+ group_id, requester_user_id,
+ user_id=user_id,
+ role_id=role_id,
+ content=content,
+ )
+
+ defer.returnValue((200, resp))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, origin, content, query, group_id, role_id, user_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ if role_id == "":
+ raise SynapseError(400, "role_id cannot be empty string")
+
+ resp = yield self.handler.delete_group_summary_user(
+ group_id, requester_user_id,
+ user_id=user_id,
+ role_id=role_id,
+ )
+
+ defer.returnValue((200, resp))
+
+
+class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
+ """Get roles in a group
+ """
+ PATH = (
+ "/get_groups_publicised$"
+ )
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query):
+ resp = yield self.handler.bulk_get_publicised_groups(
+ content["user_ids"], proxy=False,
+ )
+
+ defer.returnValue((200, resp))
+
+
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
@@ -635,11 +1104,41 @@ FEDERATION_SERVLET_CLASSES = (
FederationVersionServlet,
)
+
ROOM_LIST_CLASSES = (
PublicRoomList,
)
+GROUP_SERVER_SERVLET_CLASSES = (
+ FederationGroupsProfileServlet,
+ FederationGroupsSummaryServlet,
+ FederationGroupsRoomsServlet,
+ FederationGroupsUsersServlet,
+ FederationGroupsInviteServlet,
+ FederationGroupsAcceptInviteServlet,
+ FederationGroupsRemoveUserServlet,
+ FederationGroupsSummaryRoomsServlet,
+ FederationGroupsCategoriesServlet,
+ FederationGroupsCategoryServlet,
+ FederationGroupsRolesServlet,
+ FederationGroupsRoleServlet,
+ FederationGroupsSummaryUsersServlet,
+)
+
+
+GROUP_LOCAL_SERVLET_CLASSES = (
+ FederationGroupsLocalInviteServlet,
+ FederationGroupsRemoveLocalUserServlet,
+ FederationGroupsBulkPublicisedServlet,
+)
+
+
+GROUP_ATTESTATION_SERVLET_CLASSES = (
+ FederationGroupsRenewAttestaionServlet,
+)
+
+
def register_servlets(hs, resource, authenticator, ratelimiter):
for servletclass in FEDERATION_SERVLET_CLASSES:
servletclass(
@@ -656,3 +1155,27 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
ratelimiter=ratelimiter,
server_name=hs.hostname,
).register(resource)
+
+ for servletclass in GROUP_SERVER_SERVLET_CLASSES:
+ servletclass(
+ handler=hs.get_groups_server_handler(),
+ authenticator=authenticator,
+ ratelimiter=ratelimiter,
+ server_name=hs.hostname,
+ ).register(resource)
+
+ for servletclass in GROUP_LOCAL_SERVLET_CLASSES:
+ servletclass(
+ handler=hs.get_groups_local_handler(),
+ authenticator=authenticator,
+ ratelimiter=ratelimiter,
+ server_name=hs.hostname,
+ ).register(resource)
+
+ for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES:
+ servletclass(
+ handler=hs.get_groups_attestation_renewer(),
+ authenticator=authenticator,
+ ratelimiter=ratelimiter,
+ server_name=hs.hostname,
+ ).register(resource)
|