summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py41
-rw-r--r--synapse/federation/federation_client.py50
-rw-r--r--synapse/federation/sender/__init__.py3
-rw-r--r--synapse/federation/sender/per_destination_queue.py45
-rw-r--r--synapse/federation/transport/server.py45
5 files changed, 124 insertions, 60 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py

index cffa831d80..fc5cfb7d83 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py
@@ -223,9 +223,6 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): the signatures are valid, or fail (with a SynapseError) if not. """ - # (currently this is written assuming the v1 room structure; we'll probably want a - # separate function for checking v2 rooms) - # we want to check that the event is signed by: # # (a) the sender's server @@ -257,6 +254,10 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): for p in pdus ] + v = KNOWN_ROOM_VERSIONS.get(room_version) + if not v: + raise RuntimeError("Unrecognized room version %s" % (room_version,)) + # First we check that the sender event is signed by the sender's domain # (except if its a 3pid invite, in which case it may be sent by any server) pdus_to_check_sender = [ @@ -264,10 +265,17 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): if not _is_invite_via_3pid(p.pdu) ] - more_deferreds = keyring.verify_json_objects_for_server([ - (p.sender_domain, p.redacted_pdu_json) - for p in pdus_to_check_sender - ]) + more_deferreds = keyring.verify_json_objects_for_server( + [ + ( + p.sender_domain, + p.redacted_pdu_json, + p.pdu.origin_server_ts if v.enforce_key_validity else 0, + p.pdu.event_id, + ) + for p in pdus_to_check_sender + ] + ) def sender_err(e, pdu_to_check): errmsg = "event id %s: unable to verify signature for sender %s: %s" % ( @@ -287,20 +295,23 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): # event id's domain (normally only the case for joins/leaves), and add additional # checks. Only do this if the room version has a concept of event ID domain # (ie, the room version uses old-style non-hash event IDs). - v = KNOWN_ROOM_VERSIONS.get(room_version) - if not v: - raise RuntimeError("Unrecognized room version %s" % (room_version,)) - if v.event_format == EventFormatVersions.V1: pdus_to_check_event_id = [ p for p in pdus_to_check if p.sender_domain != get_domain_from_id(p.pdu.event_id) ] - more_deferreds = keyring.verify_json_objects_for_server([ - (get_domain_from_id(p.pdu.event_id), p.redacted_pdu_json) - for p in pdus_to_check_event_id - ]) + more_deferreds = keyring.verify_json_objects_for_server( + [ + ( + get_domain_from_id(p.pdu.event_id), + p.redacted_pdu_json, + p.pdu.origin_server_ts if v.enforce_key_validity else 0, + p.pdu.event_id, + ) + for p in pdus_to_check_event_id + ] + ) def event_err(e, pdu_to_check): errmsg = ( diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f3fc897a0a..70573746d6 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -17,7 +17,6 @@ import copy import itertools import logging -import random from six.moves import range @@ -233,7 +232,8 @@ class FederationClient(FederationBase): moving to the next destination. None indicates no timeout. Returns: - Deferred: Results in the requested PDU. + Deferred: Results in the requested PDU, or None if we were unable to find + it. """ # TODO: Rate limit the number of times we try and get the same event. @@ -258,7 +258,12 @@ class FederationClient(FederationBase): destination, event_id, timeout=timeout, ) - logger.debug("transaction_data %r", transaction_data) + logger.debug( + "retrieved event id %s from %s: %r", + event_id, + destination, + transaction_data, + ) pdu_list = [ event_from_pdu_json(p, format_ver, outlier=outlier) @@ -280,6 +285,7 @@ class FederationClient(FederationBase): "Failed to get PDU %s from %s because %s", event_id, destination, e, ) + continue except NotRetryingDestination as e: logger.info(str(e)) continue @@ -326,12 +332,16 @@ class FederationClient(FederationBase): state_event_ids = result["pdu_ids"] auth_event_ids = result.get("auth_chain_ids", []) - fetched_events, failed_to_fetch = yield self.get_events( - [destination], room_id, set(state_event_ids + auth_event_ids) + fetched_events, failed_to_fetch = yield self.get_events_from_store_or_dest( + destination, room_id, set(state_event_ids + auth_event_ids) ) if failed_to_fetch: - logger.warn("Failed to get %r", failed_to_fetch) + logger.warning( + "Failed to fetch missing state/auth events for %s: %s", + room_id, + failed_to_fetch + ) event_map = { ev.event_id: ev for ev in fetched_events @@ -397,27 +407,20 @@ class FederationClient(FederationBase): defer.returnValue((signed_pdus, signed_auth)) @defer.inlineCallbacks - def get_events(self, destinations, room_id, event_ids, return_local=True): - """Fetch events from some remote destinations, checking if we already - have them. + def get_events_from_store_or_dest(self, destination, room_id, event_ids): + """Fetch events from a remote destination, checking if we already have them. Args: - destinations (list) + destination (str) room_id (str) event_ids (list) - return_local (bool): Whether to include events we already have in - the DB in the returned list of events Returns: Deferred: A deferred resolving to a 2-tuple where the first is a list of events and the second is a list of event ids that we failed to fetch. """ - if return_local: - seen_events = yield self.store.get_events(event_ids, allow_rejected=True) - signed_events = list(seen_events.values()) - else: - seen_events = yield self.store.have_seen_events(event_ids) - signed_events = [] + seen_events = yield self.store.get_events(event_ids, allow_rejected=True) + signed_events = list(seen_events.values()) failed_to_fetch = set() @@ -428,10 +431,11 @@ class FederationClient(FederationBase): if not missing_events: defer.returnValue((signed_events, failed_to_fetch)) - def random_server_list(): - srvs = list(destinations) - random.shuffle(srvs) - return srvs + logger.debug( + "Fetching unknown state/auth events %s for room %s", + missing_events, + event_ids, + ) room_version = yield self.store.get_room_version(room_id) @@ -443,7 +447,7 @@ class FederationClient(FederationBase): deferreds = [ run_in_background( self.get_pdu, - destinations=random_server_list(), + destinations=[destination], event_id=e_id, room_version=room_version, ) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 4f0f939102..4224b29ecf 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -168,6 +168,9 @@ class FederationSender(object): if not is_mine and send_on_behalf_of is None: return + if not event.internal_metadata.should_proactively_send(): + return + try: # Get the state from before the event. # We need to make sure that this is the state from before diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index fae8bea392..22a2735405 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -189,11 +189,21 @@ class PerDestinationQueue(object): pending_pdus = [] while True: - device_message_edus, device_stream_id, dev_list_id = ( - # We have to keep 2 free slots for presence and rr_edus - yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2) + # We have to keep 2 free slots for presence and rr_edus + limit = MAX_EDUS_PER_TRANSACTION - 2 + + device_update_edus, dev_list_id = ( + yield self._get_device_update_edus(limit) + ) + + limit -= len(device_update_edus) + + to_device_edus, device_stream_id = ( + yield self._get_to_device_message_edus(limit) ) + pending_edus = device_update_edus + to_device_edus + # BEGIN CRITICAL SECTION # # In order to avoid a race condition, we need to make sure that @@ -208,10 +218,6 @@ class PerDestinationQueue(object): # We can only include at most 50 PDUs per transactions pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:] - pending_edus = [] - - # We can only include at most 100 EDUs per transactions - # rr_edus and pending_presence take at most one slot each pending_edus.extend(self._get_rr_edus(force_flush=False)) pending_presence = self._pending_presence self._pending_presence = {} @@ -232,7 +238,6 @@ class PerDestinationQueue(object): ) ) - pending_edus.extend(device_message_edus) pending_edus.extend( self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) ) @@ -272,10 +277,13 @@ class PerDestinationQueue(object): sent_edus_by_type.labels(edu.edu_type).inc() # Remove the acknowledged device messages from the database # Only bother if we actually sent some device messages - if device_message_edus: + if to_device_edus: yield self._store.delete_device_msgs_for_remote( self._destination, device_stream_id ) + + # also mark the device updates as sent + if device_update_edus: logger.info( "Marking as sent %r %r", self._destination, dev_list_id ) @@ -347,11 +355,12 @@ class PerDestinationQueue(object): return pending_edus @defer.inlineCallbacks - def _get_new_device_messages(self, limit): + def _get_device_update_edus(self, limit): last_device_list = self._last_device_list_stream_id - # Will return at most 20 entries + + # Retrieve list of new device updates to send to the destination now_stream_id, results = yield self._store.get_devices_by_remote( - self._destination, last_device_list + self._destination, last_device_list, limit=limit, ) edus = [ Edu( @@ -365,15 +374,19 @@ class PerDestinationQueue(object): assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs" + defer.returnValue((edus, now_stream_id)) + + @defer.inlineCallbacks + def _get_to_device_message_edus(self, limit): last_device_stream_id = self._last_device_stream_id to_device_stream_id = self._store.get_to_device_stream_token() contents, stream_id = yield self._store.get_new_device_msgs_for_remote( self._destination, last_device_stream_id, to_device_stream_id, - limit - len(edus), + limit, ) - edus.extend( + edus = [ Edu( origin=self._server_name, destination=self._destination, @@ -381,6 +394,6 @@ class PerDestinationQueue(object): content=content, ) for content in contents - ) + ] - defer.returnValue((edus, stream_id, now_stream_id)) + defer.returnValue((edus, stream_id)) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 385eda2dca..6cf213b895 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py
@@ -23,7 +23,11 @@ from twisted.internet import defer import synapse from synapse.api.errors import Codes, FederationDeniedError, SynapseError from synapse.api.room_versions import RoomVersions -from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX +from synapse.api.urls import ( + FEDERATION_UNSTABLE_PREFIX, + FEDERATION_V1_PREFIX, + FEDERATION_V2_PREFIX, +) from synapse.http.endpoint import parse_and_validate_server_name from synapse.http.server import JsonResource from synapse.http.servlet import ( @@ -90,6 +94,7 @@ class NoAuthenticationError(AuthenticationError): class Authenticator(object): def __init__(self, hs): + self._clock = hs.get_clock() self.keyring = hs.get_keyring() self.server_name = hs.hostname self.store = hs.get_datastore() @@ -98,6 +103,7 @@ class Authenticator(object): # A method just so we can pass 'self' as the authenticator to the Servlets @defer.inlineCallbacks def authenticate_request(self, request, content): + now = self._clock.time_msec() json_request = { "method": request.method.decode('ascii'), "uri": request.uri.decode('ascii'), @@ -134,7 +140,9 @@ class Authenticator(object): 401, "Missing Authorization headers", Codes.UNAUTHORIZED, ) - yield self.keyring.verify_json_for_server(origin, json_request) + yield self.keyring.verify_json_for_server( + origin, json_request, now, "Incoming request" + ) logger.info("Request from %s", origin) request.authenticated_entity = origin @@ -712,15 +720,15 @@ class PublicRoomList(BaseFederationServlet): PATH = "/publicRooms" - def __init__(self, handler, authenticator, ratelimiter, server_name, deny_access): + def __init__(self, handler, authenticator, ratelimiter, server_name, allow_access): super(PublicRoomList, self).__init__( handler, authenticator, ratelimiter, server_name, ) - self.deny_access = deny_access + self.allow_access = allow_access @defer.inlineCallbacks def on_GET(self, origin, content, query): - if self.deny_access: + if not self.allow_access: raise FederationDeniedError(origin) limit = parse_integer_from_args(query, "limit", 0) @@ -1304,6 +1312,30 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class RoomComplexityServlet(BaseFederationServlet): + """ + Indicates to other servers how complex (and therefore likely + resource-intensive) a public room this server knows about is. + """ + PATH = "/rooms/(?P<room_id>[^/]*)/complexity" + PREFIX = FEDERATION_UNSTABLE_PREFIX + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, room_id): + + store = self.handler.hs.get_datastore() + + is_public = yield store.is_room_world_readable_or_publicly_joinable( + room_id + ) + + if not is_public: + raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM) + + complexity = yield store.get_room_complexity(room_id) + defer.returnValue((200, complexity)) + + FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, FederationEventServlet, @@ -1327,6 +1359,7 @@ FEDERATION_SERVLET_CLASSES = ( FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, FederationVersionServlet, + RoomComplexityServlet, ) OPENID_SERVLET_CLASSES = ( @@ -1422,7 +1455,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=N authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, - deny_access=hs.config.restrict_public_rooms_to_local_users, + allow_access=hs.config.allow_public_rooms_over_federation, ).register(resource) if "group_server" in servlet_groups: