summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py26
-rw-r--r--synapse/federation/federation_client.py50
-rw-r--r--synapse/federation/federation_server.py14
-rw-r--r--synapse/federation/sender/per_destination_queue.py124
-rw-r--r--synapse/federation/transport/server.py51
5 files changed, 174 insertions, 91 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index dfe6b4aa5c..4b38f7c759 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -265,11 +265,22 @@ def _check_sigs_on_pdus(keyring, room_version, pdus):
     ]
 
     more_deferreds = keyring.verify_json_objects_for_server([
-        (p.sender_domain, p.redacted_pdu_json)
+        (p.sender_domain, p.redacted_pdu_json, 0)
         for p in pdus_to_check_sender
     ])
 
+    def sender_err(e, pdu_to_check):
+        errmsg = "event id %s: unable to verify signature for sender %s: %s" % (
+            pdu_to_check.pdu.event_id,
+            pdu_to_check.sender_domain,
+            e.getErrorMessage(),
+        )
+        # XX not really sure if these are the right codes, but they are what
+        # we've done for ages
+        raise SynapseError(400, errmsg, Codes.UNAUTHORIZED)
+
     for p, d in zip(pdus_to_check_sender, more_deferreds):
+        d.addErrback(sender_err, p)
         p.deferreds.append(d)
 
     # now let's look for events where the sender's domain is different to the
@@ -287,11 +298,22 @@ def _check_sigs_on_pdus(keyring, room_version, pdus):
         ]
 
         more_deferreds = keyring.verify_json_objects_for_server([
-            (get_domain_from_id(p.pdu.event_id), p.redacted_pdu_json)
+            (get_domain_from_id(p.pdu.event_id), p.redacted_pdu_json, 0)
             for p in pdus_to_check_event_id
         ])
 
+        def event_err(e, pdu_to_check):
+            errmsg = (
+                "event id %s: unable to verify signature for event id domain: %s" % (
+                    pdu_to_check.pdu.event_id,
+                    e.getErrorMessage(),
+                )
+            )
+            # XX as above: not really sure if these are the right codes
+            raise SynapseError(400, errmsg, Codes.UNAUTHORIZED)
+
         for p, d in zip(pdus_to_check_event_id, more_deferreds):
+            d.addErrback(event_err, p)
             p.deferreds.append(d)
 
     # replace lists of deferreds with single Deferreds
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f3fc897a0a..70573746d6 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -17,7 +17,6 @@
 import copy
 import itertools
 import logging
-import random
 
 from six.moves import range
 
@@ -233,7 +232,8 @@ class FederationClient(FederationBase):
                 moving to the next destination. None indicates no timeout.
 
         Returns:
-            Deferred: Results in the requested PDU.
+            Deferred: Results in the requested PDU, or None if we were unable to find
+               it.
         """
 
         # TODO: Rate limit the number of times we try and get the same event.
@@ -258,7 +258,12 @@ class FederationClient(FederationBase):
                     destination, event_id, timeout=timeout,
                 )
 
-                logger.debug("transaction_data %r", transaction_data)
+                logger.debug(
+                    "retrieved event id %s from %s: %r",
+                    event_id,
+                    destination,
+                    transaction_data,
+                )
 
                 pdu_list = [
                     event_from_pdu_json(p, format_ver, outlier=outlier)
@@ -280,6 +285,7 @@ class FederationClient(FederationBase):
                     "Failed to get PDU %s from %s because %s",
                     event_id, destination, e,
                 )
+                continue
             except NotRetryingDestination as e:
                 logger.info(str(e))
                 continue
@@ -326,12 +332,16 @@ class FederationClient(FederationBase):
             state_event_ids = result["pdu_ids"]
             auth_event_ids = result.get("auth_chain_ids", [])
 
-            fetched_events, failed_to_fetch = yield self.get_events(
-                [destination], room_id, set(state_event_ids + auth_event_ids)
+            fetched_events, failed_to_fetch = yield self.get_events_from_store_or_dest(
+                destination, room_id, set(state_event_ids + auth_event_ids)
             )
 
             if failed_to_fetch:
-                logger.warn("Failed to get %r", failed_to_fetch)
+                logger.warning(
+                    "Failed to fetch missing state/auth events for %s: %s",
+                    room_id,
+                    failed_to_fetch
+                )
 
             event_map = {
                 ev.event_id: ev for ev in fetched_events
@@ -397,27 +407,20 @@ class FederationClient(FederationBase):
         defer.returnValue((signed_pdus, signed_auth))
 
     @defer.inlineCallbacks
-    def get_events(self, destinations, room_id, event_ids, return_local=True):
-        """Fetch events from some remote destinations, checking if we already
-        have them.
+    def get_events_from_store_or_dest(self, destination, room_id, event_ids):
+        """Fetch events from a remote destination, checking if we already have them.
 
         Args:
-            destinations (list)
+            destination (str)
             room_id (str)
             event_ids (list)
-            return_local (bool): Whether to include events we already have in
-                the DB in the returned list of events
 
         Returns:
             Deferred: A deferred resolving to a 2-tuple where the first is a list of
             events and the second is a list of event ids that we failed to fetch.
         """
-        if return_local:
-            seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
-            signed_events = list(seen_events.values())
-        else:
-            seen_events = yield self.store.have_seen_events(event_ids)
-            signed_events = []
+        seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
+        signed_events = list(seen_events.values())
 
         failed_to_fetch = set()
 
@@ -428,10 +431,11 @@ class FederationClient(FederationBase):
         if not missing_events:
             defer.returnValue((signed_events, failed_to_fetch))
 
-        def random_server_list():
-            srvs = list(destinations)
-            random.shuffle(srvs)
-            return srvs
+        logger.debug(
+            "Fetching unknown state/auth events %s for room %s",
+            missing_events,
+            event_ids,
+        )
 
         room_version = yield self.store.get_room_version(room_id)
 
@@ -443,7 +447,7 @@ class FederationClient(FederationBase):
             deferreds = [
                 run_in_background(
                     self.get_pdu,
-                    destinations=random_server_list(),
+                    destinations=[destination],
                     event_id=e_id,
                     room_version=room_version,
                 )
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index df60828dba..4c28c1dc3c 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -33,6 +33,7 @@ from synapse.api.errors import (
     IncompatibleRoomVersionError,
     NotFoundError,
     SynapseError,
+    UnsupportedRoomVersionError,
 )
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.crypto.event_signing import compute_event_signature
@@ -198,11 +199,22 @@ class FederationServer(FederationBase):
 
             try:
                 room_version = yield self.store.get_room_version(room_id)
-                format_ver = room_version_to_event_format(room_version)
             except NotFoundError:
                 logger.info("Ignoring PDU for unknown room_id: %s", room_id)
                 continue
 
+            try:
+                format_ver = room_version_to_event_format(room_version)
+            except UnsupportedRoomVersionError:
+                # this can happen if support for a given room version is withdrawn,
+                # so that we still get events for said room.
+                logger.info(
+                    "Ignoring PDU for room %s with unknown version %s",
+                    room_id,
+                    room_version,
+                )
+                continue
+
             event = event_from_pdu_json(p, format_ver)
             pdus_by_room.setdefault(room_id, []).append(event)
 
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index be99211003..fae8bea392 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -33,12 +33,14 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage import UserPresenceState
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
+# This is defined in the Matrix spec and enforced by the receiver.
+MAX_EDUS_PER_TRANSACTION = 100
+
 logger = logging.getLogger(__name__)
 
 
 sent_edus_counter = Counter(
-    "synapse_federation_client_sent_edus",
-    "Total number of EDUs successfully sent",
+    "synapse_federation_client_sent_edus", "Total number of EDUs successfully sent"
 )
 
 sent_edus_by_type = Counter(
@@ -58,6 +60,7 @@ class PerDestinationQueue(object):
         destination (str): the server_name of the destination that we are managing
             transmission for.
     """
+
     def __init__(self, hs, transaction_manager, destination):
         self._server_name = hs.hostname
         self._clock = hs.get_clock()
@@ -68,17 +71,17 @@ class PerDestinationQueue(object):
         self.transmission_loop_running = False
 
         # a list of tuples of (pending pdu, order)
-        self._pending_pdus = []    # type: list[tuple[EventBase, int]]
-        self._pending_edus = []    # type: list[Edu]
+        self._pending_pdus = []  # type: list[tuple[EventBase, int]]
+        self._pending_edus = []  # type: list[Edu]
 
         # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
         # based on their key (e.g. typing events by room_id)
         # Map of (edu_type, key) -> Edu
-        self._pending_edus_keyed = {}   # type: dict[tuple[str, str], Edu]
+        self._pending_edus_keyed = {}  # type: dict[tuple[str, str], Edu]
 
         # Map of user_id -> UserPresenceState of pending presence to be sent to this
         # destination
-        self._pending_presence = {}   # type: dict[str, UserPresenceState]
+        self._pending_presence = {}  # type: dict[str, UserPresenceState]
 
         # room_id -> receipt_type -> user_id -> receipt_dict
         self._pending_rrs = {}
@@ -120,9 +123,7 @@ class PerDestinationQueue(object):
         Args:
             states (iterable[UserPresenceState]): presence to send
         """
-        self._pending_presence.update({
-            state.user_id: state for state in states
-        })
+        self._pending_presence.update({state.user_id: state for state in states})
         self.attempt_new_transaction()
 
     def queue_read_receipt(self, receipt):
@@ -132,14 +133,9 @@ class PerDestinationQueue(object):
         Args:
             receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
         """
-        self._pending_rrs.setdefault(
-            receipt.room_id, {},
-        ).setdefault(
+        self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
             receipt.receipt_type, {}
-        )[receipt.user_id] = {
-            "event_ids": receipt.event_ids,
-            "data": receipt.data,
-        }
+        )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
 
     def flush_read_receipts_for_room(self, room_id):
         # if we don't have any read-receipts for this room, it may be that we've already
@@ -170,10 +166,7 @@ class PerDestinationQueue(object):
             # request at which point pending_pdus just keeps growing.
             # we need application-layer timeouts of some flavour of these
             # requests
-            logger.debug(
-                "TX [%s] Transaction already in progress",
-                self._destination
-            )
+            logger.debug("TX [%s] Transaction already in progress", self._destination)
             return
 
         logger.debug("TX [%s] Starting transaction loop", self._destination)
@@ -197,7 +190,8 @@ class PerDestinationQueue(object):
             pending_pdus = []
             while True:
                 device_message_edus, device_stream_id, dev_list_id = (
-                    yield self._get_new_device_messages()
+                    # We have to keep 2 free slots for presence and rr_edus
+                    yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
                 )
 
                 # BEGIN CRITICAL SECTION
@@ -216,19 +210,9 @@ class PerDestinationQueue(object):
 
                 pending_edus = []
 
-                pending_edus.extend(self._get_rr_edus(force_flush=False))
-
                 # We can only include at most 100 EDUs per transactions
-                pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))
-
-                pending_edus.extend(
-                    self._pending_edus_keyed.values()
-                )
-
-                self._pending_edus_keyed = {}
-
-                pending_edus.extend(device_message_edus)
-
+                # rr_edus and pending_presence take at most one slot each
+                pending_edus.extend(self._get_rr_edus(force_flush=False))
                 pending_presence = self._pending_presence
                 self._pending_presence = {}
                 if pending_presence:
@@ -248,9 +232,23 @@ class PerDestinationQueue(object):
                         )
                     )
 
+                pending_edus.extend(device_message_edus)
+                pending_edus.extend(
+                    self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
+                )
+                while (
+                    len(pending_edus) < MAX_EDUS_PER_TRANSACTION
+                    and self._pending_edus_keyed
+                ):
+                    _, val = self._pending_edus_keyed.popitem()
+                    pending_edus.append(val)
+
                 if pending_pdus:
-                    logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
-                                 self._destination, len(pending_pdus))
+                    logger.debug(
+                        "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+                        self._destination,
+                        len(pending_pdus),
+                    )
 
                 if not pending_pdus and not pending_edus:
                     logger.debug("TX [%s] Nothing to send", self._destination)
@@ -259,7 +257,7 @@ class PerDestinationQueue(object):
 
                 # if we've decided to send a transaction anyway, and we have room, we
                 # may as well send any pending RRs
-                if len(pending_edus) < 100:
+                if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
                     pending_edus.extend(self._get_rr_edus(force_flush=True))
 
                 # END CRITICAL SECTION
@@ -303,22 +301,25 @@ class PerDestinationQueue(object):
         except HttpResponseException as e:
             logger.warning(
                 "TX [%s] Received %d response to transaction: %s",
-                self._destination, e.code, e,
+                self._destination,
+                e.code,
+                e,
             )
         except RequestSendFailed as e:
-            logger.warning("TX [%s] Failed to send transaction: %s", self._destination, e)
+            logger.warning(
+                "TX [%s] Failed to send transaction: %s", self._destination, e
+            )
 
             for p, _ in pending_pdus:
-                logger.info("Failed to send event %s to %s", p.event_id,
-                            self._destination)
+                logger.info(
+                    "Failed to send event %s to %s", p.event_id, self._destination
+                )
         except Exception:
-            logger.exception(
-                "TX [%s] Failed to send transaction",
-                self._destination,
-            )
+            logger.exception("TX [%s] Failed to send transaction", self._destination)
             for p, _ in pending_pdus:
-                logger.info("Failed to send event %s to %s", p.event_id,
-                            self._destination)
+                logger.info(
+                    "Failed to send event %s to %s", p.event_id, self._destination
+                )
         finally:
             # We want to be *very* sure we clear this after we stop processing
             self.transmission_loop_running = False
@@ -346,33 +347,40 @@ class PerDestinationQueue(object):
         return pending_edus
 
     @defer.inlineCallbacks
-    def _get_new_device_messages(self):
-        last_device_stream_id = self._last_device_stream_id
-        to_device_stream_id = self._store.get_to_device_stream_token()
-        contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
-            self._destination, last_device_stream_id, to_device_stream_id
+    def _get_new_device_messages(self, limit):
+        last_device_list = self._last_device_list_stream_id
+        # Will return at most 20 entries
+        now_stream_id, results = yield self._store.get_devices_by_remote(
+            self._destination, last_device_list
         )
         edus = [
             Edu(
                 origin=self._server_name,
                 destination=self._destination,
-                edu_type="m.direct_to_device",
+                edu_type="m.device_list_update",
                 content=content,
             )
-            for content in contents
+            for content in results
         ]
 
-        last_device_list = self._last_device_list_stream_id
-        now_stream_id, results = yield self._store.get_devices_by_remote(
-            self._destination, last_device_list
+        assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"
+
+        last_device_stream_id = self._last_device_stream_id
+        to_device_stream_id = self._store.get_to_device_stream_token()
+        contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
+            self._destination,
+            last_device_stream_id,
+            to_device_stream_id,
+            limit - len(edus),
         )
         edus.extend(
             Edu(
                 origin=self._server_name,
                 destination=self._destination,
-                edu_type="m.device_list_update",
+                edu_type="m.direct_to_device",
                 content=content,
             )
-            for content in results
+            for content in contents
         )
+
         defer.returnValue((edus, stream_id, now_stream_id))
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 452599e1a1..0db8858cf1 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -23,7 +23,11 @@ from twisted.internet import defer
 import synapse
 from synapse.api.errors import Codes, FederationDeniedError, SynapseError
 from synapse.api.room_versions import RoomVersions
-from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
+from synapse.api.urls import (
+    FEDERATION_UNSTABLE_PREFIX,
+    FEDERATION_V1_PREFIX,
+    FEDERATION_V2_PREFIX,
+)
 from synapse.http.endpoint import parse_and_validate_server_name
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
@@ -63,11 +67,7 @@ class TransportLayerServer(JsonResource):
         self.authenticator = Authenticator(hs)
         self.ratelimiter = FederationRateLimiter(
             self.clock,
-            window_size=hs.config.federation_rc_window_size,
-            sleep_limit=hs.config.federation_rc_sleep_limit,
-            sleep_msec=hs.config.federation_rc_sleep_delay,
-            reject_limit=hs.config.federation_rc_reject_limit,
-            concurrent_requests=hs.config.federation_rc_concurrent,
+            config=hs.config.rc_federation,
         )
 
         self.register_servlets()
@@ -94,6 +94,7 @@ class NoAuthenticationError(AuthenticationError):
 
 class Authenticator(object):
     def __init__(self, hs):
+        self._clock = hs.get_clock()
         self.keyring = hs.get_keyring()
         self.server_name = hs.hostname
         self.store = hs.get_datastore()
@@ -102,6 +103,7 @@ class Authenticator(object):
     # A method just so we can pass 'self' as the authenticator to the Servlets
     @defer.inlineCallbacks
     def authenticate_request(self, request, content):
+        now = self._clock.time_msec()
         json_request = {
             "method": request.method.decode('ascii'),
             "uri": request.uri.decode('ascii'),
@@ -138,7 +140,7 @@ class Authenticator(object):
                 401, "Missing Authorization headers", Codes.UNAUTHORIZED,
             )
 
-        yield self.keyring.verify_json_for_server(origin, json_request)
+        yield self.keyring.verify_json_for_server(origin, json_request, now)
 
         logger.info("Request from %s", origin)
         request.authenticated_entity = origin
@@ -716,8 +718,17 @@ class PublicRoomList(BaseFederationServlet):
 
     PATH = "/publicRooms"
 
+    def __init__(self, handler, authenticator, ratelimiter, server_name, deny_access):
+        super(PublicRoomList, self).__init__(
+            handler, authenticator, ratelimiter, server_name,
+        )
+        self.deny_access = deny_access
+
     @defer.inlineCallbacks
     def on_GET(self, origin, content, query):
+        if self.deny_access:
+            raise FederationDeniedError(origin)
+
         limit = parse_integer_from_args(query, "limit", 0)
         since_token = parse_string_from_args(query, "since", None)
         include_all_networks = parse_boolean_from_args(
@@ -1299,6 +1310,30 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
         defer.returnValue((200, new_content))
 
 
+class RoomComplexityServlet(BaseFederationServlet):
+    """
+    Indicates to other servers how complex (and therefore likely
+    resource-intensive) a public room this server knows about is.
+    """
+    PATH = "/rooms/(?P<room_id>[^/]*)/complexity"
+    PREFIX = FEDERATION_UNSTABLE_PREFIX
+
+    @defer.inlineCallbacks
+    def on_GET(self, origin, content, query, room_id):
+
+        store = self.handler.hs.get_datastore()
+
+        is_public = yield store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        if not is_public:
+            raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM)
+
+        complexity = yield store.get_room_complexity(room_id)
+        defer.returnValue((200, complexity))
+
+
 FEDERATION_SERVLET_CLASSES = (
     FederationSendServlet,
     FederationEventServlet,
@@ -1322,6 +1357,7 @@ FEDERATION_SERVLET_CLASSES = (
     FederationThirdPartyInviteExchangeServlet,
     On3pidBindServlet,
     FederationVersionServlet,
+    RoomComplexityServlet,
 )
 
 OPENID_SERVLET_CLASSES = (
@@ -1417,6 +1453,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=N
                 authenticator=authenticator,
                 ratelimiter=ratelimiter,
                 server_name=hs.hostname,
+                deny_access=hs.config.restrict_public_rooms_to_local_users,
             ).register(resource)
 
     if "group_server" in servlet_groups: