summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/directory.py2
-rw-r--r--synapse/handlers/e2e_keys.py19
-rw-r--r--synapse/handlers/federation.py81
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/room_list.py3
-rw-r--r--synapse/handlers/room_member.py46
-rw-r--r--synapse/handlers/sync.py5
7 files changed, 101 insertions, 57 deletions
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py

index 69051101a6..a07d2f1a17 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler): if not service.is_interested_in_alias(room_alias.to_string()): raise SynapseError( 400, - "This application service has not reserved" " this kind of alias.", + "This application service has not reserved this kind of alias.", errcode=Codes.EXCLUSIVE, ) else: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index f09a0b73c8..28c12753c1 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py
@@ -30,6 +30,7 @@ from twisted.internet import defer from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace +from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.types import ( UserID, get_domain_from_id, @@ -53,6 +54,12 @@ class E2eKeysHandler(object): self._edu_updater = SigningKeyEduUpdater(hs, self) + self._is_master = hs.config.worker_app is None + if not self._is_master: + self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client( + hs + ) + federation_registry = hs.get_federation_registry() # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec @@ -191,9 +198,15 @@ class E2eKeysHandler(object): # probably be tracking their device lists. However, we haven't # done an initial sync on the device list so we do it now. try: - user_devices = yield self.device_handler.device_list_updater.user_device_resync( - user_id - ) + if self._is_master: + user_devices = yield self.device_handler.device_list_updater.user_device_resync( + user_id + ) + else: + user_devices = yield self._user_device_resync_client( + user_id=user_id + ) + user_devices = user_devices["devices"] for device in user_devices: results[user_id] = {device["device_id"]: device["keys"]} diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0e904f2da0..97d045db10 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -2040,8 +2040,10 @@ class FederationHandler(BaseHandler): auth_events (dict[(str, str)->synapse.events.EventBase]): Map from (event_type, state_key) to event - What we expect the event's auth_events to be, based on the event's - position in the dag. I think? maybe?? + Normally, our calculated auth_events based on the state of the room + at the event's position in the DAG, though occasionally (eg if the + event is an outlier), may be the auth events claimed by the remote + server. Also NB that this function adds entries to it. Returns: @@ -2091,30 +2093,35 @@ class FederationHandler(BaseHandler): origin (str): event (synapse.events.EventBase): context (synapse.events.snapshot.EventContext): + auth_events (dict[(str, str)->synapse.events.EventBase]): + Map from (event_type, state_key) to event + + Normally, our calculated auth_events based on the state of the room + at the event's position in the DAG, though occasionally (eg if the + event is an outlier), may be the auth events claimed by the remote + server. + + Also NB that this function adds entries to it. Returns: defer.Deferred[EventContext]: updated context """ event_auth_events = set(event.auth_event_ids()) - if event.is_state(): - event_key = (event.type, event.state_key) - else: - event_key = None - - # if the event's auth_events refers to events which are not in our - # calculated auth_events, we need to fetch those events from somewhere. - # - # we start by fetching them from the store, and then try calling /event_auth/. + # missing_auth is the set of the event's auth_events which we don't yet have + # in auth_events. missing_auth = event_auth_events.difference( e.event_id for e in auth_events.values() ) + # if we have missing events, we need to fetch those events from somewhere. + # + # we start by checking if they are in the store, and then try calling /event_auth/. if missing_auth: # TODO: can we use store.have_seen_events here instead? have_events = yield self.store.get_seen_events_with_rejections(missing_auth) - logger.debug("Got events %s from store", have_events) + logger.debug("Found events %s in the store", have_events) missing_auth.difference_update(have_events.keys()) else: have_events = {} @@ -2169,15 +2176,17 @@ class FederationHandler(BaseHandler): event.auth_event_ids() ) except Exception: - # FIXME: logger.exception("Failed to get auth chain") if event.internal_metadata.is_outlier(): + # XXX: given that, for an outlier, we'll be working with the + # event's *claimed* auth events rather than those we calculated: + # (a) is there any point in this test, since different_auth below will + # obviously be empty + # (b) alternatively, why don't we do it earlier? logger.info("Skipping auth_event fetch for outlier") return context - # FIXME: Assumes we have and stored all the state for all the - # prev_events different_auth = event_auth_events.difference( e.event_id for e in auth_events.values() ) @@ -2191,27 +2200,22 @@ class FederationHandler(BaseHandler): different_auth, ) + # now we state-resolve between our own idea of the auth events, and the remote's + # idea of them. + room_version = yield self.store.get_room_version(event.room_id) + different_event_ids = [ + d for d in different_auth if d in have_events and not have_events[d] + ] - different_events = yield make_deferred_yieldable( - defer.gatherResults( - [ - run_in_background( - self.store.get_event, d, allow_none=True, allow_rejected=False - ) - for d in different_auth - if d in have_events and not have_events[d] - ], - consumeErrors=True, - ) - ).addErrback(unwrapFirstError) + if different_event_ids: + # XXX: currently this checks for redactions but I'm not convinced that is + # necessary? + different_events = yield self.store.get_events_as_list(different_event_ids) - if different_events: local_view = dict(auth_events) remote_view = dict(auth_events) - remote_view.update( - {(d.type, d.state_key): d for d in different_events if d} - ) + remote_view.update({(d.type, d.state_key): d for d in different_events}) new_state = yield self.state_handler.resolve_events( room_version, @@ -2231,13 +2235,13 @@ class FederationHandler(BaseHandler): auth_events.update(new_state) context = yield self._update_context_for_auth_events( - event, context, auth_events, event_key + event, context, auth_events ) return context @defer.inlineCallbacks - def _update_context_for_auth_events(self, event, context, auth_events, event_key): + def _update_context_for_auth_events(self, event, context, auth_events): """Update the state_ids in an event context after auth event resolution, storing the changes as a new state group. @@ -2246,18 +2250,21 @@ class FederationHandler(BaseHandler): context (synapse.events.snapshot.EventContext): initial event context - auth_events (dict[(str, str)->str]): Events to update in the event + auth_events (dict[(str, str)->EventBase]): Events to update in the event context. - event_key ((str, str)): (type, state_key) for the current event. - this will not be included in the current_state in the context. - Returns: Deferred[EventContext]: new event context """ + # exclude the state key of the new event from the current_state in the context. + if event.is_state(): + event_key = (event.type, event.state_key) + else: + event_key = None state_updates = { k: a.event_id for k, a in iteritems(auth_events) if k != event_key } + current_state_ids = yield context.get_current_state_ids(self.store) current_state_ids = dict(current_state_ids) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d682dc2b7a..c05b9e83be 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -209,7 +209,7 @@ class MessageHandler(object): # If this is an AS, double check that they are allowed to see the members. # This can either be because the AS user is in the room or because there # is a user in the room that the AS is "interested in" - if requester.app_service and user_id not in users_with_profile: + if False and requester.app_service and user_id not in users_with_profile: for uid in users_with_profile: if requester.app_service.is_interested_in_user(uid): break diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index c615206df1..2252a86f77 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py
@@ -43,7 +43,8 @@ class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) self.enable_room_list_search = hs.config.enable_room_list_search - self.response_cache = ResponseCache(hs, "room_list") + + self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000) self.remote_response_cache = ResponseCache( hs, "remote_room_list", timeout_ms=30 * 1000 ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 6cfee4b361..dd096a8608 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py
@@ -62,6 +62,7 @@ class RoomMemberHandler(object): self.event_creation_handler = hs.get_event_creation_handler() self.member_linearizer = Linearizer(name="member") + self.member_limiter = Linearizer(max_count=10, name="member_as_limiter") self.clock = hs.get_clock() self.spam_checker = hs.get_spam_checker() @@ -266,19 +267,38 @@ class RoomMemberHandler(object): ): key = (room_id,) - with (yield self.member_linearizer.queue(key)): - result = yield self._update_membership( - requester, - target, - room_id, - action, - txn_id=txn_id, - remote_room_hosts=remote_room_hosts, - third_party_signed=third_party_signed, - ratelimit=ratelimit, - content=content, - require_consent=require_consent, - ) + as_id = object() + if requester.app_service: + as_id = requester.app_service.id + + then = self.clock.time_msec() + + with (yield self.member_limiter.queue(as_id)): + diff = self.clock.time_msec() - then + + if diff > 80 * 1000: + # haproxy would have timed the request out anyway... + raise SynapseError(504, "took to long to process") + + with (yield self.member_linearizer.queue(key)): + diff = self.clock.time_msec() - then + + if diff > 80 * 1000: + # haproxy would have timed the request out anyway... + raise SynapseError(504, "took to long to process") + + result = yield self._update_membership( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + content=content, + require_consent=require_consent, + ) return result diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b536d410e5..49c025e991 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -42,6 +42,7 @@ logger = logging.getLogger(__name__) # Debug logger for https://github.com/matrix-org/synapse/issues/4422 issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug") +SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000 # Counts the number of times we returned a non-empty sync. `type` is one of # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is @@ -227,7 +228,9 @@ class SyncHandler(object): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache(hs, "sync") + self.response_cache = ResponseCache( + hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS + ) self.state = hs.get_state_handler() self.auth = hs.get_auth() self.storage = hs.get_storage()