diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_base.py | 5 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 166 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 48 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 381 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 543 |
5 files changed, 1064 insertions, 79 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index babd9ea078..a0f5d40eb3 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -16,7 +16,6 @@ import logging from synapse.api.errors import SynapseError from synapse.crypto.event_signing import check_event_content_hash -from synapse.events import spamcheck from synapse.events.utils import prune_event from synapse.util import unwrapFirstError, logcontext from twisted.internet import defer @@ -26,7 +25,7 @@ logger = logging.getLogger(__name__) class FederationBase(object): def __init__(self, hs): - pass + self.spam_checker = hs.get_spam_checker() @defer.inlineCallbacks def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, @@ -144,7 +143,7 @@ class FederationBase(object): ) return redacted - if spamcheck.check_event_for_spam(pdu): + if self.spam_checker.check_event_for_spam(pdu): logger.warn( "Event contains spam, redacting %s: %s", pdu.event_id, pdu.get_pdu_json() diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 51e3fdea06..e15228e70b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -12,14 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - from twisted.internet import defer from .federation_base import FederationBase from .units import Transaction, Edu -from synapse.util.async import Linearizer +from synapse.util import async from synapse.util.logutils import log_function from synapse.util.caches.response_cache import ResponseCache from synapse.events import FrozenEvent @@ -33,6 +31,9 @@ from synapse.crypto.event_signing import compute_event_signature import simplejson as json import logging +# when processing incoming transactions, we try to handle multiple rooms in +# parallel, up to this limit. +TRANSACTION_CONCURRENCY_LIMIT = 10 logger = logging.getLogger(__name__) @@ -52,7 +53,8 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() - self._server_linearizer = Linearizer("fed_server") + self._server_linearizer = async.Linearizer("fed_server") + self._transaction_linearizer = async.Linearizer("fed_txn_handler") # We cache responses to state queries, as they take a while and often # come in waves. @@ -109,25 +111,41 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_incoming_transaction(self, transaction_data): + # keep this as early as possible to make the calculated origin ts as + # accurate as possible. + request_time = self._clock.time_msec() + transaction = Transaction(**transaction_data) - received_pdus_counter.inc_by(len(transaction.pdus)) + if not transaction.transaction_id: + raise Exception("Transaction missing transaction_id") + if not transaction.origin: + raise Exception("Transaction missing origin") - for p in transaction.pdus: - if "unsigned" in p: - unsigned = p["unsigned"] - if "age" in unsigned: - p["age"] = unsigned["age"] - if "age" in p: - p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) - del p["age"] + logger.debug("[%s] Got transaction", transaction.transaction_id) - pdu_list = [ - self.event_from_pdu_json(p) for p in transaction.pdus - ] + # use a linearizer to ensure that we don't process the same transaction + # multiple times in parallel. + with (yield self._transaction_linearizer.queue( + (transaction.origin, transaction.transaction_id), + )): + result = yield self._handle_incoming_transaction( + transaction, request_time, + ) - logger.debug("[%s] Got transaction", transaction.transaction_id) + defer.returnValue(result) + + @defer.inlineCallbacks + def _handle_incoming_transaction(self, transaction, request_time): + """ Process an incoming transaction and return the HTTP response + + Args: + transaction (Transaction): incoming transaction + request_time (int): timestamp that the HTTP request arrived at + Returns: + Deferred[(int, object)]: http response code and body + """ response = yield self.transaction_actions.have_responded(transaction) if response: @@ -140,42 +158,49 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) - results = [] - - for pdu in pdu_list: - # check that it's actually being sent from a valid destination to - # workaround bug #1753 in 0.18.5 and 0.18.6 - if transaction.origin != get_domain_from_id(pdu.event_id): - # We continue to accept join events from any server; this is - # necessary for the federation join dance to work correctly. - # (When we join over federation, the "helper" server is - # responsible for sending out the join event, rather than the - # origin. See bug #1893). - if not ( - pdu.type == 'm.room.member' and - pdu.content and - pdu.content.get("membership", None) == 'join' - ): - logger.info( - "Discarding PDU %s from invalid origin %s", - pdu.event_id, transaction.origin - ) - continue - else: - logger.info( - "Accepting join PDU %s from %s", - pdu.event_id, transaction.origin - ) + received_pdus_counter.inc_by(len(transaction.pdus)) - try: - yield self._handle_received_pdu(transaction.origin, pdu) - results.append({}) - except FederationError as e: - self.send_failure(e, transaction.origin) - results.append({"error": str(e)}) - except Exception as e: - results.append({"error": str(e)}) - logger.exception("Failed to handle PDU") + pdus_by_room = {} + + for p in transaction.pdus: + if "unsigned" in p: + unsigned = p["unsigned"] + if "age" in unsigned: + p["age"] = unsigned["age"] + if "age" in p: + p["age_ts"] = request_time - int(p["age"]) + del p["age"] + + event = self.event_from_pdu_json(p) + room_id = event.room_id + pdus_by_room.setdefault(room_id, []).append(event) + + pdu_results = {} + + # we can process different rooms in parallel (which is useful if they + # require callouts to other servers to fetch missing events), but + # impose a limit to avoid going too crazy with ram/cpu. + @defer.inlineCallbacks + def process_pdus_for_room(room_id): + logger.debug("Processing PDUs for %s", room_id) + for pdu in pdus_by_room[room_id]: + event_id = pdu.event_id + try: + yield self._handle_received_pdu( + transaction.origin, pdu + ) + pdu_results[event_id] = {} + except FederationError as e: + logger.warn("Error handling PDU %s: %s", event_id, e) + pdu_results[event_id] = {"error": str(e)} + except Exception as e: + pdu_results[event_id] = {"error": str(e)} + logger.exception("Failed to handle PDU %s", event_id) + + yield async.concurrently_execute( + process_pdus_for_room, pdus_by_room.keys(), + TRANSACTION_CONCURRENCY_LIMIT, + ) if hasattr(transaction, "edus"): for edu in (Edu(**x) for x in transaction.edus): @@ -185,17 +210,16 @@ class FederationServer(FederationBase): edu.content ) - for failure in getattr(transaction, "pdu_failures", []): - logger.info("Got failure %r", failure) - - logger.debug("Returning: %s", str(results)) + pdu_failures = getattr(transaction, "pdu_failures", []) + for failure in pdu_failures: + logger.info("Got failure %r", failure) response = { - "pdus": dict(zip( - (p.event_id for p in pdu_list), results - )), + "pdus": pdu_results, } + logger.debug("Returning: %s", str(response)) + yield self.transaction_actions.set_response( transaction, 200, response @@ -520,6 +544,30 @@ class FederationServer(FederationBase): Returns (Deferred): completes with None Raises: FederationError if the signatures / hash do not match """ + # check that it's actually being sent from a valid destination to + # workaround bug #1753 in 0.18.5 and 0.18.6 + if origin != get_domain_from_id(pdu.event_id): + # We continue to accept join events from any server; this is + # necessary for the federation join dance to work correctly. + # (When we join over federation, the "helper" server is + # responsible for sending out the join event, rather than the + # origin. See bug #1893). + if not ( + pdu.type == 'm.room.member' and + pdu.content and + pdu.content.get("membership", None) == 'join' + ): + logger.info( + "Discarding PDU %s from invalid origin %s", + pdu.event_id, origin + ) + return + else: + logger.info( + "Accepting join PDU %s from %s", + pdu.event_id, origin + ) + # Check signature. try: pdu = yield self._check_sigs_and_hash(pdu) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 003eaba893..7a3c9cbb70 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -20,8 +20,8 @@ from .persistence import TransactionActions from .units import Transaction, Edu from synapse.api.errors import HttpResponseException +from synapse.util import logcontext from synapse.util.async import run_on_reactor -from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.handlers.presence import format_user_presence_state, get_interested_remotes @@ -231,11 +231,9 @@ class TransactionQueue(object): (pdu, order) ) - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + self._attempt_new_transaction(destination) - @preserve_fn # the caller should not yield on this + @logcontext.preserve_fn # the caller should not yield on this @defer.inlineCallbacks def send_presence(self, states): """Send the new presence states to the appropriate destinations. @@ -299,7 +297,7 @@ class TransactionQueue(object): state.user_id: state for state in states }) - preserve_fn(self._attempt_new_transaction)(destination) + self._attempt_new_transaction(destination) def send_edu(self, destination, edu_type, content, key=None): edu = Edu( @@ -321,9 +319,7 @@ class TransactionQueue(object): else: self.pending_edus_by_dest.setdefault(destination, []).append(edu) - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + self._attempt_new_transaction(destination) def send_failure(self, failure, destination): if destination == self.server_name or destination == "localhost": @@ -336,9 +332,7 @@ class TransactionQueue(object): destination, [] ).append(failure) - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + self._attempt_new_transaction(destination) def send_device_messages(self, destination): if destination == self.server_name or destination == "localhost": @@ -347,15 +341,24 @@ class TransactionQueue(object): if not self.can_send_to(destination): return - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + self._attempt_new_transaction(destination) def get_current_token(self): return 0 - @defer.inlineCallbacks def _attempt_new_transaction(self, destination): + """Try to start a new transaction to this destination + + If there is already a transaction in progress to this destination, + returns immediately. Otherwise kicks off the process of sending a + transaction in the background. + + Args: + destination (str): + + Returns: + None + """ # list of (pending_pdu, deferred, order) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending @@ -368,6 +371,19 @@ class TransactionQueue(object): ) return + logger.debug("TX [%s] Starting transaction loop", destination) + + # Drop the logcontext before starting the transaction. It doesn't + # really make sense to log all the outbound transactions against + # whatever path led us to this point: that's pretty arbitrary really. + # + # (this also means we can fire off _perform_transaction without + # yielding) + with logcontext.PreserveLoggingContext(): + self._transaction_transmission_loop(destination) + + @defer.inlineCallbacks + def _transaction_transmission_loop(self, destination): pending_pdus = [] try: self.pending_transactions[destination] = 1 diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 52b2a717d2..125d8f3598 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -471,3 +471,384 @@ class TransportLayerClient(object): ) defer.returnValue(content) + + @log_function + def get_group_profile(self, destination, group_id, requester_user_id): + """Get a group profile + """ + path = PREFIX + "/groups/%s/profile" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_summary(self, destination, group_id, requester_user_id): + """Get a group summary + """ + path = PREFIX + "/groups/%s/summary" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_rooms_in_group(self, destination, group_id, requester_user_id): + """Get all rooms in a group + """ + path = PREFIX + "/groups/%s/rooms" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + def add_room_to_group(self, destination, group_id, requester_user_id, room_id, + content): + """Add a room to a group + """ + path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + data=content, + ignore_backoff=True, + ) + + def remove_room_from_group(self, destination, group_id, requester_user_id, room_id): + """Remove a room from a group + """ + path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_users_in_group(self, destination, group_id, requester_user_id): + """Get users in a group + """ + path = PREFIX + "/groups/%s/users" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_invited_users_in_group(self, destination, group_id, requester_user_id): + """Get users that have been invited to a group + """ + path = PREFIX + "/groups/%s/invited_users" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def accept_group_invite(self, destination, group_id, user_id, content): + """Accept a group invite + """ + path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + + @log_function + def invite_to_group(self, destination, group_id, user_id, requester_user_id, content): + """Invite a user to a group + """ + path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + data=content, + ignore_backoff=True, + ) + + @log_function + def invite_to_group_notification(self, destination, group_id, user_id, content): + """Sent by group server to inform a user's server that they have been + invited. + """ + + path = PREFIX + "/groups/local/%s/users/%s/invite" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + + @log_function + def remove_user_from_group(self, destination, group_id, requester_user_id, + user_id, content): + """Remove a user fron a group + """ + path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + data=content, + ignore_backoff=True, + ) + + @log_function + def remove_user_from_group_notification(self, destination, group_id, user_id, + content): + """Sent by group server to inform a user's server that they have been + kicked from the group. + """ + + path = PREFIX + "/groups/local/%s/users/%s/remove" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + + @log_function + def renew_group_attestation(self, destination, group_id, user_id, content): + """Sent by either a group server or a user's server to periodically update + the attestations + """ + + path = PREFIX + "/groups/%s/renew_attestation/%s" % (group_id, user_id) + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) + + @log_function + def update_group_summary_room(self, destination, group_id, user_id, room_id, + category_id, content): + """Update a room entry in a group summary + """ + if category_id: + path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % ( + group_id, category_id, room_id, + ) + else: + path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": user_id}, + data=content, + ignore_backoff=True, + ) + + @log_function + def delete_group_summary_room(self, destination, group_id, user_id, room_id, + category_id): + """Delete a room entry in a group summary + """ + if category_id: + path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % ( + group_id, category_id, room_id, + ) + else: + path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_categories(self, destination, group_id, requester_user_id): + """Get all categories in a group + """ + path = PREFIX + "/groups/%s/categories" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_category(self, destination, group_id, requester_user_id, category_id): + """Get category info in a group + """ + path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def update_group_category(self, destination, group_id, requester_user_id, category_id, + content): + """Update a category in a group + """ + path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + data=content, + ignore_backoff=True, + ) + + @log_function + def delete_group_category(self, destination, group_id, requester_user_id, + category_id): + """Delete a category in a group + """ + path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_roles(self, destination, group_id, requester_user_id): + """Get all roles in a group + """ + path = PREFIX + "/groups/%s/roles" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def get_group_role(self, destination, group_id, requester_user_id, role_id): + """Get a roles info + """ + path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def update_group_role(self, destination, group_id, requester_user_id, role_id, + content): + """Update a role in a group + """ + path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + data=content, + ignore_backoff=True, + ) + + @log_function + def delete_group_role(self, destination, group_id, requester_user_id, role_id): + """Delete a role in a group + """ + path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + @log_function + def update_group_summary_user(self, destination, group_id, requester_user_id, + user_id, role_id, content): + """Update a users entry in a group + """ + if role_id: + path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % ( + group_id, role_id, user_id, + ) + else: + path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,) + + return self.client.post_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + data=content, + ignore_backoff=True, + ) + + @log_function + def delete_group_summary_user(self, destination, group_id, requester_user_id, + user_id, role_id): + """Delete a users entry in a group + """ + if role_id: + path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % ( + group_id, role_id, user_id, + ) + else: + path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,) + + return self.client.delete_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + + def bulk_get_publicised_groups(self, destination, user_ids): + """Get the groups a list of users are publicising + """ + + path = PREFIX + "/get_groups_publicised" + + content = {"user_ids": user_ids} + + return self.client.post_json( + destination=destination, + path=path, + data=content, + ignore_backoff=True, + ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a78f01e442..f0778c65c5 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -25,7 +25,7 @@ from synapse.http.servlet import ( from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string from synapse.util.logcontext import preserve_fn -from synapse.types import ThirdPartyInstanceID +from synapse.types import ThirdPartyInstanceID, get_domain_from_id import functools import logging @@ -609,6 +609,493 @@ class FederationVersionServlet(BaseFederationServlet): })) +class FederationGroupsProfileServlet(BaseFederationServlet): + """Get the basic profile of a group on behalf of a user + """ + PATH = "/groups/(?P<group_id>[^/]*)/profile$" + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_group_profile( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsSummaryServlet(BaseFederationServlet): + PATH = "/groups/(?P<group_id>[^/]*)/summary$" + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_group_summary( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.update_group_profile( + group_id, requester_user_id, content + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsRoomsServlet(BaseFederationServlet): + """Get the rooms in a group on behalf of a user + """ + PATH = "/groups/(?P<group_id>[^/]*)/rooms$" + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_rooms_in_group( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsAddRoomsServlet(BaseFederationServlet): + """Add/remove room from group + """ + PATH = "/groups/(?P<group_id>[^/]*)/room/(?<room_id>)$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, room_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.add_room_to_group( + group_id, requester_user_id, room_id, content + ) + + defer.returnValue((200, new_content)) + + @defer.inlineCallbacks + def on_DELETE(self, origin, content, query, group_id, room_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.remove_room_from_group( + group_id, requester_user_id, room_id, + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsUsersServlet(BaseFederationServlet): + """Get the users in a group on behalf of a user + """ + PATH = "/groups/(?P<group_id>[^/]*)/users$" + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_users_in_group( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsInvitedUsersServlet(BaseFederationServlet): + """Get the users that have been invited to a group + """ + PATH = "/groups/(?P<group_id>[^/]*)/invited_users$" + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_invited_users_in_group( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsInviteServlet(BaseFederationServlet): + """Ask a group server to invite someone to the group + """ + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.invite_to_group( + group_id, user_id, requester_user_id, content, + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsAcceptInviteServlet(BaseFederationServlet): + """Accept an invitation from the group server + """ + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(user_id) != origin: + raise SynapseError(403, "user_id doesn't match origin") + + new_content = yield self.handler.accept_invite( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsRemoveUserServlet(BaseFederationServlet): + """Leave or kick a user from the group + """ + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.remove_user_from_group( + group_id, user_id, requester_user_id, content, + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsLocalInviteServlet(BaseFederationServlet): + """A group server has invited a local user + """ + PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(group_id) != origin: + raise SynapseError(403, "group_id doesn't match origin") + + new_content = yield self.handler.on_invite( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): + """A group server has removed a local user + """ + PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + if get_domain_from_id(group_id) != origin: + raise SynapseError(403, "user_id doesn't match origin") + + new_content = yield self.handler.user_removed_from_group( + group_id, user_id, content, + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): + """A group or user's server renews their attestation + """ + PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)$" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, user_id): + # We don't need to check auth here as we check the attestation signatures + + new_content = yield self.handler.on_renew_attestation( + group_id, user_id, content + ) + + defer.returnValue((200, new_content)) + + +class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): + """Add/remove a room from the group summary, with optional category. + + Matches both: + - /groups/:group/summary/rooms/:room_id + - /groups/:group/summary/categories/:category/rooms/:room_id + """ + PATH = ( + "/groups/(?P<group_id>[^/]*)/summary" + "(/categories/(?P<category_id>[^/]+))?" + "/rooms/(?P<room_id>[^/]*)$" + ) + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, category_id, room_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + if category_id == "": + raise SynapseError(400, "category_id cannot be empty string") + + resp = yield self.handler.update_group_summary_room( + group_id, requester_user_id, + room_id=room_id, + category_id=category_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, origin, content, query, group_id, category_id, room_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + if category_id == "": + raise SynapseError(400, "category_id cannot be empty string") + + resp = yield self.handler.delete_group_summary_room( + group_id, requester_user_id, + room_id=room_id, + category_id=category_id, + ) + + defer.returnValue((200, resp)) + + +class FederationGroupsCategoriesServlet(BaseFederationServlet): + """Get all categories for a group + """ + PATH = ( + "/groups/(?P<group_id>[^/]*)/categories/$" + ) + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + resp = yield self.handler.get_group_categories( + group_id, requester_user_id, + ) + + defer.returnValue((200, resp)) + + +class FederationGroupsCategoryServlet(BaseFederationServlet): + """Add/remove/get a category in a group + """ + PATH = ( + "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)$" + ) + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id, category_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + resp = yield self.handler.get_group_category( + group_id, requester_user_id, category_id + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, category_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + if category_id == "": + raise SynapseError(400, "category_id cannot be empty string") + + resp = yield self.handler.upsert_group_category( + group_id, requester_user_id, category_id, content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, origin, content, query, group_id, category_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + if category_id == "": + raise SynapseError(400, "category_id cannot be empty string") + + resp = yield self.handler.delete_group_category( + group_id, requester_user_id, category_id, + ) + + defer.returnValue((200, resp)) + + +class FederationGroupsRolesServlet(BaseFederationServlet): + """Get roles in a group + """ + PATH = ( + "/groups/(?P<group_id>[^/]*)/roles/$" + ) + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + resp = yield self.handler.get_group_roles( + group_id, requester_user_id, + ) + + defer.returnValue((200, resp)) + + +class FederationGroupsRoleServlet(BaseFederationServlet): + """Add/remove/get a role in a group + """ + PATH = ( + "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)$" + ) + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id, role_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + resp = yield self.handler.get_group_role( + group_id, requester_user_id, role_id + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, role_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + if role_id == "": + raise SynapseError(400, "role_id cannot be empty string") + + resp = yield self.handler.update_group_role( + group_id, requester_user_id, role_id, content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, origin, content, query, group_id, role_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + if role_id == "": + raise SynapseError(400, "role_id cannot be empty string") + + resp = yield self.handler.delete_group_role( + group_id, requester_user_id, role_id, + ) + + defer.returnValue((200, resp)) + + +class FederationGroupsSummaryUsersServlet(BaseFederationServlet): + """Add/remove a user from the group summary, with optional role. + + Matches both: + - /groups/:group/summary/users/:user_id + - /groups/:group/summary/roles/:role/users/:user_id + """ + PATH = ( + "/groups/(?P<group_id>[^/]*)/summary" + "(/roles/(?P<role_id>[^/]+))?" + "/users/(?P<user_id>[^/]*)$" + ) + + @defer.inlineCallbacks + def on_POST(self, origin, content, query, group_id, role_id, user_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + if role_id == "": + raise SynapseError(400, "role_id cannot be empty string") + + resp = yield self.handler.update_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=role_id, + content=content, + ) + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def on_DELETE(self, origin, content, query, group_id, role_id, user_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + if role_id == "": + raise SynapseError(400, "role_id cannot be empty string") + + resp = yield self.handler.delete_group_summary_user( + group_id, requester_user_id, + user_id=user_id, + role_id=role_id, + ) + + defer.returnValue((200, resp)) + + +class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): + """Get roles in a group + """ + PATH = ( + "/get_groups_publicised$" + ) + + @defer.inlineCallbacks + def on_POST(self, origin, content, query): + resp = yield self.handler.bulk_get_publicised_groups( + content["user_ids"], proxy=False, + ) + + defer.returnValue((200, resp)) + + FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -635,10 +1122,40 @@ FEDERATION_SERVLET_CLASSES = ( FederationVersionServlet, ) + ROOM_LIST_CLASSES = ( PublicRoomList, ) +GROUP_SERVER_SERVLET_CLASSES = ( + FederationGroupsProfileServlet, + FederationGroupsSummaryServlet, + FederationGroupsRoomsServlet, + FederationGroupsUsersServlet, + FederationGroupsInvitedUsersServlet, + FederationGroupsInviteServlet, + FederationGroupsAcceptInviteServlet, + FederationGroupsRemoveUserServlet, + FederationGroupsSummaryRoomsServlet, + FederationGroupsCategoriesServlet, + FederationGroupsCategoryServlet, + FederationGroupsRolesServlet, + FederationGroupsRoleServlet, + FederationGroupsSummaryUsersServlet, +) + + +GROUP_LOCAL_SERVLET_CLASSES = ( + FederationGroupsLocalInviteServlet, + FederationGroupsRemoveLocalUserServlet, + FederationGroupsBulkPublicisedServlet, +) + + +GROUP_ATTESTATION_SERVLET_CLASSES = ( + FederationGroupsRenewAttestaionServlet, +) + def register_servlets(hs, resource, authenticator, ratelimiter): for servletclass in FEDERATION_SERVLET_CLASSES: @@ -656,3 +1173,27 @@ def register_servlets(hs, resource, authenticator, ratelimiter): ratelimiter=ratelimiter, server_name=hs.hostname, ).register(resource) + + for servletclass in GROUP_SERVER_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_server_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + for servletclass in GROUP_LOCAL_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_local_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_attestation_renewer(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) |