summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py38
-rw-r--r--synapse/federation/federation_client.py29
-rw-r--r--synapse/federation/federation_server.py3
-rw-r--r--synapse/federation/send_queue.py73
-rw-r--r--synapse/federation/sender/__init__.py19
-rw-r--r--synapse/federation/sender/per_destination_queue.py124
-rw-r--r--synapse/federation/transport/server.py14
7 files changed, 214 insertions, 86 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py

index a7a2ec4523..cffa831d80 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py
@@ -20,8 +20,9 @@ import six from twisted.internet import defer from twisted.internet.defer import DeferredList -from synapse.api.constants import MAX_DEPTH, EventTypes, Membership, RoomVersions +from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions from synapse.crypto.event_signing import check_event_content_hash from synapse.events import event_type_from_format_version from synapse.events.utils import prune_event @@ -268,15 +269,29 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): for p in pdus_to_check_sender ]) + def sender_err(e, pdu_to_check): + errmsg = "event id %s: unable to verify signature for sender %s: %s" % ( + pdu_to_check.pdu.event_id, + pdu_to_check.sender_domain, + e.getErrorMessage(), + ) + # XX not really sure if these are the right codes, but they are what + # we've done for ages + raise SynapseError(400, errmsg, Codes.UNAUTHORIZED) + for p, d in zip(pdus_to_check_sender, more_deferreds): + d.addErrback(sender_err, p) p.deferreds.append(d) # now let's look for events where the sender's domain is different to the # event id's domain (normally only the case for joins/leaves), and add additional # checks. Only do this if the room version has a concept of event ID domain - if room_version in ( - RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST, - ): + # (ie, the room version uses old-style non-hash event IDs). + v = KNOWN_ROOM_VERSIONS.get(room_version) + if not v: + raise RuntimeError("Unrecognized room version %s" % (room_version,)) + + if v.event_format == EventFormatVersions.V1: pdus_to_check_event_id = [ p for p in pdus_to_check if p.sender_domain != get_domain_from_id(p.pdu.event_id) @@ -287,12 +302,19 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): for p in pdus_to_check_event_id ]) + def event_err(e, pdu_to_check): + errmsg = ( + "event id %s: unable to verify signature for event id domain: %s" % ( + pdu_to_check.pdu.event_id, + e.getErrorMessage(), + ) + ) + # XX as above: not really sure if these are the right codes + raise SynapseError(400, errmsg, Codes.UNAUTHORIZED) + for p, d in zip(pdus_to_check_event_id, more_deferreds): + d.addErrback(event_err, p) p.deferreds.append(d) - elif room_version in (RoomVersions.V3,): - pass # No further checks needed, as event IDs are hashes here - else: - raise RuntimeError("Unrecognized room version %s" % (room_version,)) # replace lists of deferreds with single Deferreds return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check] diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 58e04d81ab..f3fc897a0a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -25,12 +25,7 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import ( - KNOWN_ROOM_VERSIONS, - EventTypes, - Membership, - RoomVersions, -) +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import ( CodeMessageException, Codes, @@ -38,6 +33,11 @@ from synapse.api.errors import ( HttpResponseException, SynapseError, ) +from synapse.api.room_versions import ( + KNOWN_ROOM_VERSIONS, + EventFormatVersions, + RoomVersions, +) from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.util import logcontext, unwrapFirstError @@ -570,7 +570,7 @@ class FederationClient(FederationBase): Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of `(origin, event, event_format)` where origin is the remote homeserver which generated the event, and event_format is one of - `synapse.api.constants.EventFormatVersions`. + `synapse.api.room_versions.EventFormatVersions`. Fails with a ``SynapseError`` if the chosen remote server returns a 300/400 code. @@ -592,7 +592,7 @@ class FederationClient(FederationBase): # Note: If not supplied, the room version may be either v1 or v2, # however either way the event format version will be v1. - room_version = ret.get("room_version", RoomVersions.V1) + room_version = ret.get("room_version", RoomVersions.V1.identifier) event_format = room_version_to_event_format(room_version) pdu_dict = ret.get("event", None) @@ -695,7 +695,9 @@ class FederationClient(FederationBase): room_version = None for e in state: if (e.type, e.state_key) == (EventTypes.Create, ""): - room_version = e.content.get("room_version", RoomVersions.V1) + room_version = e.content.get( + "room_version", RoomVersions.V1.identifier + ) break if room_version is None: @@ -802,11 +804,10 @@ class FederationClient(FederationBase): raise err # Otherwise, we assume that the remote server doesn't understand - # the v2 invite API. - - if room_version in (RoomVersions.V1, RoomVersions.V2): - pass # We'll fall through - else: + # the v2 invite API. That's ok provided the room uses old-style event + # IDs. + v = KNOWN_ROOM_VERSIONS.get(room_version) + if v.event_format != EventFormatVersions.V1: raise SynapseError( 400, "User's homeserver does not support this room version", diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 81f3b4b1ff..df60828dba 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -25,7 +25,7 @@ from twisted.internet import defer from twisted.internet.abstract import isIPAddress from twisted.python import failure -from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import ( AuthError, Codes, @@ -34,6 +34,7 @@ from synapse.api.errors import ( NotFoundError, SynapseError, ) +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.crypto.event_signing import compute_event_signature from synapse.events import room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 04d04a4457..0240b339b0 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py
@@ -55,7 +55,12 @@ class FederationRemoteSendQueue(object): self.is_mine_id = hs.is_mine_id self.presence_map = {} # Pending presence map user_id -> UserPresenceState - self.presence_changed = SortedDict() # Stream position -> user_id + self.presence_changed = SortedDict() # Stream position -> list[user_id] + + # Stores the destinations we need to explicitly send presence to about a + # given user. + # Stream position -> (user_id, destinations) + self.presence_destinations = SortedDict() self.keyed_edu = {} # (destination, key) -> EDU self.keyed_edu_changed = SortedDict() # stream position -> (destination, key) @@ -77,7 +82,7 @@ class FederationRemoteSendQueue(object): for queue_name in [ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", - "edus", "device_messages", "pos_time", + "edus", "device_messages", "pos_time", "presence_destinations", ]: register(queue_name, getattr(self, queue_name)) @@ -121,6 +126,15 @@ class FederationRemoteSendQueue(object): for user_id in uids ) + keys = self.presence_destinations.keys() + i = self.presence_destinations.bisect_left(position_to_delete) + for key in keys[:i]: + del self.presence_destinations[key] + + user_ids.update( + user_id for user_id, _ in self.presence_destinations.values() + ) + to_del = [ user_id for user_id in self.presence_map if user_id not in user_ids ] @@ -209,6 +223,20 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() + def send_presence_to_destinations(self, states, destinations): + """As per FederationSender + + Args: + states (list[UserPresenceState]) + destinations (list[str]) + """ + for state in states: + pos = self._next_pos() + self.presence_map.update({state.user_id: state for state in states}) + self.presence_destinations[pos] = (state.user_id, destinations) + + self.notifier.on_new_replication_data() + def send_device_messages(self, destination): """As per FederationSender""" pos = self._next_pos() @@ -261,6 +289,16 @@ class FederationRemoteSendQueue(object): state=self.presence_map[user_id], ))) + # Fetch presence to send to destinations + i = self.presence_destinations.bisect_right(from_token) + j = self.presence_destinations.bisect_right(to_token) + 1 + + for pos, (user_id, dests) in self.presence_destinations.items()[i:j]: + rows.append((pos, PresenceDestinationsRow( + state=self.presence_map[user_id], + destinations=list(dests), + ))) + # Fetch changes keyed edus i = self.keyed_edu_changed.bisect_right(from_token) j = self.keyed_edu_changed.bisect_right(to_token) + 1 @@ -357,6 +395,29 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( buff.presence.append(self.state) +class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", ( + "state", # UserPresenceState + "destinations", # list[str] +))): + TypeId = "pd" + + @staticmethod + def from_data(data): + return PresenceDestinationsRow( + state=UserPresenceState.from_dict(data["state"]), + destinations=data["dests"], + ) + + def to_data(self): + return { + "state": self.state.as_dict(), + "dests": self.destinations, + } + + def add_to_buffer(self, buff): + buff.presence_destinations.append((self.state, self.destinations)) + + class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( "key", # tuple(str) - the edu key passed to send_edu "edu", # Edu @@ -428,6 +489,7 @@ TypeToRow = { Row.TypeId: Row for Row in ( PresenceRow, + PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow, @@ -437,6 +499,7 @@ TypeToRow = { ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( "presence", # list(UserPresenceState) + "presence_destinations", # list of tuples of UserPresenceState and destinations "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] "device_destinations", # set of destinations @@ -458,6 +521,7 @@ def process_rows_for_federation(transaction_queue, rows): buff = ParsedFederationStreamData( presence=[], + presence_destinations=[], keyed_edus={}, edus={}, device_destinations=set(), @@ -476,6 +540,11 @@ def process_rows_for_federation(transaction_queue, rows): if buff.presence: transaction_queue.send_presence(buff.presence) + for state, destinations in buff.presence_destinations: + transaction_queue.send_presence_to_destinations( + states=[state], destinations=destinations, + ) + for destination, edu_map in iteritems(buff.keyed_edus): for key, edu in edu_map.items(): transaction_queue.send_edu(edu, key) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 1dc041752b..4f0f939102 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -371,7 +371,7 @@ class FederationSender(object): return # First we queue up the new presence by user ID, so multiple presence - # updates in quick successtion are correctly handled + # updates in quick succession are correctly handled. # We only want to send presence for our own users, so lets always just # filter here just in case. self.pending_presence.update({ @@ -402,6 +402,23 @@ class FederationSender(object): finally: self._processing_pending_presence = False + def send_presence_to_destinations(self, states, destinations): + """Send the given presence states to the given destinations. + + Args: + states (list[UserPresenceState]) + destinations (list[str]) + """ + + if not states or not self.hs.config.use_presence: + # No-op if presence is disabled. + return + + for destination in destinations: + if destination == self.server_name: + continue + self._get_per_destination_queue(destination).send_presence(states) + @measure_func("txnqueue._process_presence") @defer.inlineCallbacks def _process_presence_inner(self, states): diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index be99211003..fae8bea392 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -33,12 +33,14 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage import UserPresenceState from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter +# This is defined in the Matrix spec and enforced by the receiver. +MAX_EDUS_PER_TRANSACTION = 100 + logger = logging.getLogger(__name__) sent_edus_counter = Counter( - "synapse_federation_client_sent_edus", - "Total number of EDUs successfully sent", + "synapse_federation_client_sent_edus", "Total number of EDUs successfully sent" ) sent_edus_by_type = Counter( @@ -58,6 +60,7 @@ class PerDestinationQueue(object): destination (str): the server_name of the destination that we are managing transmission for. """ + def __init__(self, hs, transaction_manager, destination): self._server_name = hs.hostname self._clock = hs.get_clock() @@ -68,17 +71,17 @@ class PerDestinationQueue(object): self.transmission_loop_running = False # a list of tuples of (pending pdu, order) - self._pending_pdus = [] # type: list[tuple[EventBase, int]] - self._pending_edus = [] # type: list[Edu] + self._pending_pdus = [] # type: list[tuple[EventBase, int]] + self._pending_edus = [] # type: list[Edu] # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered # based on their key (e.g. typing events by room_id) # Map of (edu_type, key) -> Edu - self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu] + self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu] # Map of user_id -> UserPresenceState of pending presence to be sent to this # destination - self._pending_presence = {} # type: dict[str, UserPresenceState] + self._pending_presence = {} # type: dict[str, UserPresenceState] # room_id -> receipt_type -> user_id -> receipt_dict self._pending_rrs = {} @@ -120,9 +123,7 @@ class PerDestinationQueue(object): Args: states (iterable[UserPresenceState]): presence to send """ - self._pending_presence.update({ - state.user_id: state for state in states - }) + self._pending_presence.update({state.user_id: state for state in states}) self.attempt_new_transaction() def queue_read_receipt(self, receipt): @@ -132,14 +133,9 @@ class PerDestinationQueue(object): Args: receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued """ - self._pending_rrs.setdefault( - receipt.room_id, {}, - ).setdefault( + self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( receipt.receipt_type, {} - )[receipt.user_id] = { - "event_ids": receipt.event_ids, - "data": receipt.data, - } + )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data} def flush_read_receipts_for_room(self, room_id): # if we don't have any read-receipts for this room, it may be that we've already @@ -170,10 +166,7 @@ class PerDestinationQueue(object): # request at which point pending_pdus just keeps growing. # we need application-layer timeouts of some flavour of these # requests - logger.debug( - "TX [%s] Transaction already in progress", - self._destination - ) + logger.debug("TX [%s] Transaction already in progress", self._destination) return logger.debug("TX [%s] Starting transaction loop", self._destination) @@ -197,7 +190,8 @@ class PerDestinationQueue(object): pending_pdus = [] while True: device_message_edus, device_stream_id, dev_list_id = ( - yield self._get_new_device_messages() + # We have to keep 2 free slots for presence and rr_edus + yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2) ) # BEGIN CRITICAL SECTION @@ -216,19 +210,9 @@ class PerDestinationQueue(object): pending_edus = [] - pending_edus.extend(self._get_rr_edus(force_flush=False)) - # We can only include at most 100 EDUs per transactions - pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus))) - - pending_edus.extend( - self._pending_edus_keyed.values() - ) - - self._pending_edus_keyed = {} - - pending_edus.extend(device_message_edus) - + # rr_edus and pending_presence take at most one slot each + pending_edus.extend(self._get_rr_edus(force_flush=False)) pending_presence = self._pending_presence self._pending_presence = {} if pending_presence: @@ -248,9 +232,23 @@ class PerDestinationQueue(object): ) ) + pending_edus.extend(device_message_edus) + pending_edus.extend( + self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) + ) + while ( + len(pending_edus) < MAX_EDUS_PER_TRANSACTION + and self._pending_edus_keyed + ): + _, val = self._pending_edus_keyed.popitem() + pending_edus.append(val) + if pending_pdus: - logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - self._destination, len(pending_pdus)) + logger.debug( + "TX [%s] len(pending_pdus_by_dest[dest]) = %d", + self._destination, + len(pending_pdus), + ) if not pending_pdus and not pending_edus: logger.debug("TX [%s] Nothing to send", self._destination) @@ -259,7 +257,7 @@ class PerDestinationQueue(object): # if we've decided to send a transaction anyway, and we have room, we # may as well send any pending RRs - if len(pending_edus) < 100: + if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: pending_edus.extend(self._get_rr_edus(force_flush=True)) # END CRITICAL SECTION @@ -303,22 +301,25 @@ class PerDestinationQueue(object): except HttpResponseException as e: logger.warning( "TX [%s] Received %d response to transaction: %s", - self._destination, e.code, e, + self._destination, + e.code, + e, ) except RequestSendFailed as e: - logger.warning("TX [%s] Failed to send transaction: %s", self._destination, e) + logger.warning( + "TX [%s] Failed to send transaction: %s", self._destination, e + ) for p, _ in pending_pdus: - logger.info("Failed to send event %s to %s", p.event_id, - self._destination) + logger.info( + "Failed to send event %s to %s", p.event_id, self._destination + ) except Exception: - logger.exception( - "TX [%s] Failed to send transaction", - self._destination, - ) + logger.exception("TX [%s] Failed to send transaction", self._destination) for p, _ in pending_pdus: - logger.info("Failed to send event %s to %s", p.event_id, - self._destination) + logger.info( + "Failed to send event %s to %s", p.event_id, self._destination + ) finally: # We want to be *very* sure we clear this after we stop processing self.transmission_loop_running = False @@ -346,33 +347,40 @@ class PerDestinationQueue(object): return pending_edus @defer.inlineCallbacks - def _get_new_device_messages(self): - last_device_stream_id = self._last_device_stream_id - to_device_stream_id = self._store.get_to_device_stream_token() - contents, stream_id = yield self._store.get_new_device_msgs_for_remote( - self._destination, last_device_stream_id, to_device_stream_id + def _get_new_device_messages(self, limit): + last_device_list = self._last_device_list_stream_id + # Will return at most 20 entries + now_stream_id, results = yield self._store.get_devices_by_remote( + self._destination, last_device_list ) edus = [ Edu( origin=self._server_name, destination=self._destination, - edu_type="m.direct_to_device", + edu_type="m.device_list_update", content=content, ) - for content in contents + for content in results ] - last_device_list = self._last_device_list_stream_id - now_stream_id, results = yield self._store.get_devices_by_remote( - self._destination, last_device_list + assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs" + + last_device_stream_id = self._last_device_stream_id + to_device_stream_id = self._store.get_to_device_stream_token() + contents, stream_id = yield self._store.get_new_device_msgs_for_remote( + self._destination, + last_device_stream_id, + to_device_stream_id, + limit - len(edus), ) edus.extend( Edu( origin=self._server_name, destination=self._destination, - edu_type="m.device_list_update", + edu_type="m.direct_to_device", content=content, ) - for content in results + for content in contents ) + defer.returnValue((edus, stream_id, now_stream_id)) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index efb6bdca48..9030eb18c5 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py
@@ -21,8 +21,8 @@ import re from twisted.internet import defer import synapse -from synapse.api.constants import RoomVersions from synapse.api.errors import Codes, FederationDeniedError, SynapseError +from synapse.api.room_versions import RoomVersions from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX from synapse.http.endpoint import parse_and_validate_server_name from synapse.http.server import JsonResource @@ -513,7 +513,7 @@ class FederationV1InviteServlet(BaseFederationServlet): # state resolution algorithm, and we don't use that for processing # invites content = yield self.handler.on_invite_request( - origin, content, room_version=RoomVersions.V1, + origin, content, room_version=RoomVersions.V1.identifier, ) # V1 federation API is defined to return a content of `[200, {...}]` @@ -716,8 +716,17 @@ class PublicRoomList(BaseFederationServlet): PATH = "/publicRooms" + def __init__(self, handler, authenticator, ratelimiter, server_name, deny_access): + super(PublicRoomList, self).__init__( + handler, authenticator, ratelimiter, server_name, + ) + self.deny_access = deny_access + @defer.inlineCallbacks def on_GET(self, origin, content, query): + if self.deny_access: + raise FederationDeniedError(origin) + limit = parse_integer_from_args(query, "limit", 0) since_token = parse_string_from_args(query, "since", None) include_all_networks = parse_boolean_from_args( @@ -1417,6 +1426,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=N authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, + deny_access=hs.config.restrict_public_rooms_to_local_users, ).register(resource) if "group_server" in servlet_groups: