summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py60
-rw-r--r--synapse/federation/federation_server.py18
-rw-r--r--synapse/federation/send_queue.py53
-rw-r--r--synapse/federation/transport/client.py5
-rw-r--r--synapse/federation/transport/server.py26
5 files changed, 69 insertions, 93 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index af652a7659..f99d17a7de 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -17,6 +17,7 @@
 import copy
 import itertools
 import logging
+from typing import Dict, Iterable
 
 from prometheus_client import Counter
 
@@ -29,6 +30,7 @@ from synapse.api.errors import (
     FederationDeniedError,
     HttpResponseException,
     SynapseError,
+    UnsupportedRoomVersionError,
 )
 from synapse.api.room_versions import (
     KNOWN_ROOM_VERSIONS,
@@ -196,7 +198,7 @@ class FederationClient(FederationBase):
 
         logger.debug("backfill transaction_data=%r", transaction_data)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
         format_ver = room_version_to_event_format(room_version)
 
         pdus = [
@@ -334,7 +336,7 @@ class FederationClient(FederationBase):
     def get_event_auth(self, destination, room_id, event_id):
         res = yield self.transport_layer.get_event_auth(destination, room_id, event_id)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
         format_ver = room_version_to_event_format(room_version)
 
         auth_chain = [
@@ -385,6 +387,8 @@ class FederationClient(FederationBase):
                 return res
             except InvalidResponseError as e:
                 logger.warning("Failed to %s via %s: %s", description, destination, e)
+            except UnsupportedRoomVersionError:
+                raise
             except HttpResponseException as e:
                 if not 500 <= e.code < 600:
                     raise e.to_synapse_error()
@@ -404,7 +408,13 @@ class FederationClient(FederationBase):
         raise SynapseError(502, "Failed to %s via any server" % (description,))
 
     def make_membership_event(
-        self, destinations, room_id, user_id, membership, content, params
+        self,
+        destinations: Iterable[str],
+        room_id: str,
+        user_id: str,
+        membership: str,
+        content: dict,
+        params: Dict[str, str],
     ):
         """
         Creates an m.room.member event, with context, without participating in the room.
@@ -417,21 +427,23 @@ class FederationClient(FederationBase):
         Note that this does not append any events to any graphs.
 
         Args:
-            destinations (Iterable[str]): Candidate homeservers which are probably
+            destinations: Candidate homeservers which are probably
                 participating in the room.
-            room_id (str): The room in which the event will happen.
-            user_id (str): The user whose membership is being evented.
-            membership (str): The "membership" property of the event. Must be
-                one of "join" or "leave".
-            content (dict): Any additional data to put into the content field
-                of the event.
-            params (dict[str, str|Iterable[str]]): Query parameters to include in the
-                request.
+            room_id: The room in which the event will happen.
+            user_id: The user whose membership is being evented.
+            membership: The "membership" property of the event. Must be one of
+                "join" or "leave".
+            content: Any additional data to put into the content field of the
+                event.
+            params: Query parameters to include in the request.
         Return:
-            Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of
-            `(origin, event, event_format)` where origin is the remote
-            homeserver which generated the event, and event_format is one of
-            `synapse.api.room_versions.EventFormatVersions`.
+            Deferred[Tuple[str, FrozenEvent, RoomVersion]]: resolves to a tuple of
+            `(origin, event, room_version)` where origin is the remote
+            homeserver which generated the event, and room_version is the
+            version of the room.
+
+            Fails with a `UnsupportedRoomVersionError` if remote responds with
+            a room version we don't understand.
 
             Fails with a ``SynapseError`` if the chosen remote server
             returns a 300/400 code.
@@ -453,8 +465,10 @@ class FederationClient(FederationBase):
 
             # Note: If not supplied, the room version may be either v1 or v2,
             # however either way the event format version will be v1.
-            room_version = ret.get("room_version", RoomVersions.V1.identifier)
-            event_format = room_version_to_event_format(room_version)
+            room_version_id = ret.get("room_version", RoomVersions.V1.identifier)
+            room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
+            if not room_version:
+                raise UnsupportedRoomVersionError()
 
             pdu_dict = ret.get("event", None)
             if not isinstance(pdu_dict, dict):
@@ -474,11 +488,11 @@ class FederationClient(FederationBase):
                 self._clock,
                 self.hostname,
                 self.signing_key,
-                format_version=event_format,
+                room_version=room_version,
                 event_dict=pdu_dict,
             )
 
-            return (destination, ev, event_format)
+            return (destination, ev, room_version)
 
         return self._try_destination_list(
             "make_" + membership, destinations, send_request
@@ -633,7 +647,7 @@ class FederationClient(FederationBase):
 
     @defer.inlineCallbacks
     def send_invite(self, destination, room_id, event_id, pdu):
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
 
         content = yield self._do_send_invite(destination, pdu, room_version)
 
@@ -641,7 +655,7 @@ class FederationClient(FederationBase):
 
         logger.debug("Got response to send_invite: %s", pdu_dict)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
         format_ver = room_version_to_event_format(room_version)
 
         pdu = event_from_pdu_json(pdu_dict, format_ver)
@@ -843,7 +857,7 @@ class FederationClient(FederationBase):
                 timeout=timeout,
             )
 
-            room_version = yield self.store.get_room_version(room_id)
+            room_version = yield self.store.get_room_version_id(room_id)
             format_ver = room_version_to_event_format(room_version)
 
             events = [
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8eddb3bf2c..a4c97ed458 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -234,7 +234,7 @@ class FederationServer(FederationBase):
                 continue
 
             try:
-                room_version = await self.store.get_room_version(room_id)
+                room_version = await self.store.get_room_version_id(room_id)
             except NotFoundError:
                 logger.info("Ignoring PDU for unknown room_id: %s", room_id)
                 continue
@@ -334,7 +334,7 @@ class FederationServer(FederationBase):
                 )
             )
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
         resp["room_version"] = room_version
 
         return 200, resp
@@ -385,7 +385,7 @@ class FederationServer(FederationBase):
         origin_host, _ = parse_server_name(origin)
         await self.check_server_matches_acl(origin_host, room_id)
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
         if room_version not in supported_versions:
             logger.warning(
                 "Room version %s not in %s", room_version, supported_versions
@@ -410,14 +410,14 @@ class FederationServer(FederationBase):
         origin_host, _ = parse_server_name(origin)
         await self.check_server_matches_acl(origin_host, pdu.room_id)
         pdu = await self._check_sigs_and_hash(room_version, pdu)
-        ret_pdu = await self.handler.on_invite_request(origin, pdu)
+        ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version)
         time_now = self._clock.time_msec()
         return {"event": ret_pdu.get_pdu_json(time_now)}
 
     async def on_send_join_request(self, origin, content, room_id):
         logger.debug("on_send_join_request: content: %s", content)
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
         format_ver = room_version_to_event_format(room_version)
         pdu = event_from_pdu_json(content, format_ver)
 
@@ -440,7 +440,7 @@ class FederationServer(FederationBase):
         await self.check_server_matches_acl(origin_host, room_id)
         pdu = await self.handler.on_make_leave_request(origin, room_id, user_id)
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
 
         time_now = self._clock.time_msec()
         return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
@@ -448,7 +448,7 @@ class FederationServer(FederationBase):
     async def on_send_leave_request(self, origin, content, room_id):
         logger.debug("on_send_leave_request: content: %s", content)
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
         format_ver = room_version_to_event_format(room_version)
         pdu = event_from_pdu_json(content, format_ver)
 
@@ -495,7 +495,7 @@ class FederationServer(FederationBase):
             origin_host, _ = parse_server_name(origin)
             await self.check_server_matches_acl(origin_host, room_id)
 
-            room_version = await self.store.get_room_version(room_id)
+            room_version = await self.store.get_room_version_id(room_id)
             format_ver = room_version_to_event_format(room_version)
 
             auth_chain = [
@@ -664,7 +664,7 @@ class FederationServer(FederationBase):
                 logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
 
         # We've already checked that we know the room version by this point
-        room_version = await self.store.get_room_version(pdu.room_id)
+        room_version = await self.store.get_room_version_id(pdu.room_id)
 
         # Check signature.
         try:
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 174f6e42be..001bb304ae 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -69,8 +69,6 @@ class FederationRemoteSendQueue(object):
 
         self.edus = SortedDict()  # stream position -> Edu
 
-        self.device_messages = SortedDict()  # stream position -> destination
-
         self.pos = 1
         self.pos_time = SortedDict()
 
@@ -92,7 +90,6 @@ class FederationRemoteSendQueue(object):
             "keyed_edu",
             "keyed_edu_changed",
             "edus",
-            "device_messages",
             "pos_time",
             "presence_destinations",
         ]:
@@ -171,12 +168,6 @@ class FederationRemoteSendQueue(object):
             for key in keys[:i]:
                 del self.edus[key]
 
-            # Delete things out of device map
-            keys = self.device_messages.keys()
-            i = self.device_messages.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.device_messages[key]
-
     def notify_new_events(self, current_id):
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
@@ -249,9 +240,8 @@ class FederationRemoteSendQueue(object):
 
     def send_device_messages(self, destination):
         """As per FederationSender"""
-        pos = self._next_pos()
-        self.device_messages[pos] = destination
-        self.notifier.on_new_replication_data()
+        # We don't need to replicate this as it gets sent down a different
+        # stream.
 
     def get_current_token(self):
         return self.pos - 1
@@ -339,14 +329,6 @@ class FederationRemoteSendQueue(object):
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
-        # Fetch changed device messages
-        i = self.device_messages.bisect_right(from_token)
-        j = self.device_messages.bisect_right(to_token) + 1
-        device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
-
-        for (destination, pos) in iteritems(device_messages):
-            rows.append((pos, DeviceRow(destination=destination)))
-
         # Sort rows based on pos
         rows.sort()
 
@@ -472,28 +454,9 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
         buff.edus.setdefault(self.edu.destination, []).append(self.edu)
 
 
-class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ("destination",))):  # str
-    """Streams the fact that either a) there is pending to device messages for
-    users on the remote, or b) a local users device has changed and needs to
-    be sent to the remote.
-    """
-
-    TypeId = "d"
-
-    @staticmethod
-    def from_data(data):
-        return DeviceRow(destination=data["destination"])
-
-    def to_data(self):
-        return {"destination": self.destination}
-
-    def add_to_buffer(self, buff):
-        buff.device_destinations.add(self.destination)
-
-
 TypeToRow = {
     Row.TypeId: Row
-    for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow)
+    for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow,)
 }
 
 
@@ -504,7 +467,6 @@ ParsedFederationStreamData = namedtuple(
         "presence_destinations",  # list of tuples of UserPresenceState and destinations
         "keyed_edus",  # dict of destination -> { key -> Edu }
         "edus",  # dict of destination -> [Edu]
-        "device_destinations",  # set of destinations
     ),
 )
 
@@ -523,11 +485,7 @@ def process_rows_for_federation(transaction_queue, rows):
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence=[],
-        presence_destinations=[],
-        keyed_edus={},
-        edus={},
-        device_destinations=set(),
+        presence=[], presence_destinations=[], keyed_edus={}, edus={},
     )
 
     # Parse the rows in the stream and add to the buffer
@@ -555,6 +513,3 @@ def process_rows_for_federation(transaction_queue, rows):
     for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
             transaction_queue.send_edu(edu, None)
-
-    for destination in buff.device_destinations:
-        transaction_queue.send_device_messages(destination)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 198257414b..dc563538de 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import logging
+from typing import Any, Dict
 
 from six.moves import urllib
 
@@ -352,7 +353,9 @@ class TransportLayerClient(object):
         else:
             path = _create_v1_path("/publicRooms")
 
-            args = {"include_all_networks": "true" if include_all_networks else "false"}
+            args = {
+                "include_all_networks": "true" if include_all_networks else "false"
+            }  # type: Dict[str, Any]
             if third_party_instance_id:
                 args["third_party_instance_id"] = (third_party_instance_id,)
             if limit:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index d8cf9ed299..125eadd796 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,6 +18,7 @@
 import functools
 import logging
 import re
+from typing import Optional, Tuple, Type
 
 from twisted.internet.defer import maybeDeferred
 
@@ -267,6 +268,8 @@ class BaseFederationServlet(object):
                 returned.
     """
 
+    PATH = ""  # Overridden in subclasses, the regex to match against the path.
+
     REQUIRE_AUTH = True
 
     PREFIX = FEDERATION_V1_PREFIX  # Allows specifying the API version
@@ -347,9 +350,6 @@ class BaseFederationServlet(object):
 
             return response
 
-        # Extra logic that functools.wraps() doesn't finish
-        new_func.__self__ = func.__self__
-
         return new_func
 
     def register(self, server):
@@ -824,7 +824,7 @@ class PublicRoomList(BaseFederationServlet):
         if not self.allow_access:
             raise FederationDeniedError(origin)
 
-        limit = int(content.get("limit", 100))
+        limit = int(content.get("limit", 100))  # type: Optional[int]
         since_token = content.get("since", None)
         search_filter = content.get("filter", None)
 
@@ -971,7 +971,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        result = await self.groups_handler.update_room_in_group(
+        result = await self.handler.update_room_in_group(
             group_id, requester_user_id, room_id, config_key, content
         )
 
@@ -1422,11 +1422,13 @@ FEDERATION_SERVLET_CLASSES = (
     On3pidBindServlet,
     FederationVersionServlet,
     RoomComplexityServlet,
-)
+)  # type: Tuple[Type[BaseFederationServlet], ...]
 
-OPENID_SERVLET_CLASSES = (OpenIdUserInfo,)
+OPENID_SERVLET_CLASSES = (
+    OpenIdUserInfo,
+)  # type: Tuple[Type[BaseFederationServlet], ...]
 
-ROOM_LIST_CLASSES = (PublicRoomList,)
+ROOM_LIST_CLASSES = (PublicRoomList,)  # type: Tuple[Type[PublicRoomList], ...]
 
 GROUP_SERVER_SERVLET_CLASSES = (
     FederationGroupsProfileServlet,
@@ -1447,17 +1449,19 @@ GROUP_SERVER_SERVLET_CLASSES = (
     FederationGroupsAddRoomsServlet,
     FederationGroupsAddRoomsConfigServlet,
     FederationGroupsSettingJoinPolicyServlet,
-)
+)  # type: Tuple[Type[BaseFederationServlet], ...]
 
 
 GROUP_LOCAL_SERVLET_CLASSES = (
     FederationGroupsLocalInviteServlet,
     FederationGroupsRemoveLocalUserServlet,
     FederationGroupsBulkPublicisedServlet,
-)
+)  # type: Tuple[Type[BaseFederationServlet], ...]
 
 
-GROUP_ATTESTATION_SERVLET_CLASSES = (FederationGroupsRenewAttestaionServlet,)
+GROUP_ATTESTATION_SERVLET_CLASSES = (
+    FederationGroupsRenewAttestaionServlet,
+)  # type: Tuple[Type[BaseFederationServlet], ...]
 
 DEFAULT_SERVLET_GROUPS = (
     "federation",