diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 69051101a6..a07d2f1a17 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler):
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400,
- "This application service has not reserved" " this kind of alias.",
+ "This application service has not reserved this kind of alias.",
errcode=Codes.EXCLUSIVE,
)
else:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index f09a0b73c8..28c12753c1 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -30,6 +30,7 @@ from twisted.internet import defer
from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import (
UserID,
get_domain_from_id,
@@ -53,6 +54,12 @@ class E2eKeysHandler(object):
self._edu_updater = SigningKeyEduUpdater(hs, self)
+ self._is_master = hs.config.worker_app is None
+ if not self._is_master:
+ self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
+ hs
+ )
+
federation_registry = hs.get_federation_registry()
# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@@ -191,9 +198,15 @@ class E2eKeysHandler(object):
# probably be tracking their device lists. However, we haven't
# done an initial sync on the device list so we do it now.
try:
- user_devices = yield self.device_handler.device_list_updater.user_device_resync(
- user_id
- )
+ if self._is_master:
+ user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+ user_id
+ )
+ else:
+ user_devices = yield self._user_device_resync_client(
+ user_id=user_id
+ )
+
user_devices = user_devices["devices"]
for device in user_devices:
results[user_id] = {device["device_id"]: device["keys"]}
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0e904f2da0..97d045db10 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2040,8 +2040,10 @@ class FederationHandler(BaseHandler):
auth_events (dict[(str, str)->synapse.events.EventBase]):
Map from (event_type, state_key) to event
- What we expect the event's auth_events to be, based on the event's
- position in the dag. I think? maybe??
+ Normally, our calculated auth_events based on the state of the room
+ at the event's position in the DAG, though occasionally (eg if the
+ event is an outlier), may be the auth events claimed by the remote
+ server.
Also NB that this function adds entries to it.
Returns:
@@ -2091,30 +2093,35 @@ class FederationHandler(BaseHandler):
origin (str):
event (synapse.events.EventBase):
context (synapse.events.snapshot.EventContext):
+
auth_events (dict[(str, str)->synapse.events.EventBase]):
+ Map from (event_type, state_key) to event
+
+ Normally, our calculated auth_events based on the state of the room
+ at the event's position in the DAG, though occasionally (eg if the
+ event is an outlier), may be the auth events claimed by the remote
+ server.
+
+ Also NB that this function adds entries to it.
Returns:
defer.Deferred[EventContext]: updated context
"""
event_auth_events = set(event.auth_event_ids())
- if event.is_state():
- event_key = (event.type, event.state_key)
- else:
- event_key = None
-
- # if the event's auth_events refers to events which are not in our
- # calculated auth_events, we need to fetch those events from somewhere.
- #
- # we start by fetching them from the store, and then try calling /event_auth/.
+ # missing_auth is the set of the event's auth_events which we don't yet have
+ # in auth_events.
missing_auth = event_auth_events.difference(
e.event_id for e in auth_events.values()
)
+ # if we have missing events, we need to fetch those events from somewhere.
+ #
+ # we start by checking if they are in the store, and then try calling /event_auth/.
if missing_auth:
# TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_seen_events_with_rejections(missing_auth)
- logger.debug("Got events %s from store", have_events)
+ logger.debug("Found events %s in the store", have_events)
missing_auth.difference_update(have_events.keys())
else:
have_events = {}
@@ -2169,15 +2176,17 @@ class FederationHandler(BaseHandler):
event.auth_event_ids()
)
except Exception:
- # FIXME:
logger.exception("Failed to get auth chain")
if event.internal_metadata.is_outlier():
+ # XXX: given that, for an outlier, we'll be working with the
+ # event's *claimed* auth events rather than those we calculated:
+ # (a) is there any point in this test, since different_auth below will
+ # obviously be empty
+ # (b) alternatively, why don't we do it earlier?
logger.info("Skipping auth_event fetch for outlier")
return context
- # FIXME: Assumes we have and stored all the state for all the
- # prev_events
different_auth = event_auth_events.difference(
e.event_id for e in auth_events.values()
)
@@ -2191,27 +2200,22 @@ class FederationHandler(BaseHandler):
different_auth,
)
+ # now we state-resolve between our own idea of the auth events, and the remote's
+ # idea of them.
+
room_version = yield self.store.get_room_version(event.room_id)
+ different_event_ids = [
+ d for d in different_auth if d in have_events and not have_events[d]
+ ]
- different_events = yield make_deferred_yieldable(
- defer.gatherResults(
- [
- run_in_background(
- self.store.get_event, d, allow_none=True, allow_rejected=False
- )
- for d in different_auth
- if d in have_events and not have_events[d]
- ],
- consumeErrors=True,
- )
- ).addErrback(unwrapFirstError)
+ if different_event_ids:
+ # XXX: currently this checks for redactions but I'm not convinced that is
+ # necessary?
+ different_events = yield self.store.get_events_as_list(different_event_ids)
- if different_events:
local_view = dict(auth_events)
remote_view = dict(auth_events)
- remote_view.update(
- {(d.type, d.state_key): d for d in different_events if d}
- )
+ remote_view.update({(d.type, d.state_key): d for d in different_events})
new_state = yield self.state_handler.resolve_events(
room_version,
@@ -2231,13 +2235,13 @@ class FederationHandler(BaseHandler):
auth_events.update(new_state)
context = yield self._update_context_for_auth_events(
- event, context, auth_events, event_key
+ event, context, auth_events
)
return context
@defer.inlineCallbacks
- def _update_context_for_auth_events(self, event, context, auth_events, event_key):
+ def _update_context_for_auth_events(self, event, context, auth_events):
"""Update the state_ids in an event context after auth event resolution,
storing the changes as a new state group.
@@ -2246,18 +2250,21 @@ class FederationHandler(BaseHandler):
context (synapse.events.snapshot.EventContext): initial event context
- auth_events (dict[(str, str)->str]): Events to update in the event
+ auth_events (dict[(str, str)->EventBase]): Events to update in the event
context.
- event_key ((str, str)): (type, state_key) for the current event.
- this will not be included in the current_state in the context.
-
Returns:
Deferred[EventContext]: new event context
"""
+ # exclude the state key of the new event from the current_state in the context.
+ if event.is_state():
+ event_key = (event.type, event.state_key)
+ else:
+ event_key = None
state_updates = {
k: a.event_id for k, a in iteritems(auth_events) if k != event_key
}
+
current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = dict(current_state_ids)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d682dc2b7a..c05b9e83be 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -209,7 +209,7 @@ class MessageHandler(object):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
+ if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index c615206df1..2252a86f77 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -43,7 +43,8 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.enable_room_list_search = hs.config.enable_room_list_search
- self.response_cache = ResponseCache(hs, "room_list")
+
+ self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(
hs, "remote_room_list", timeout_ms=30 * 1000
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 6cfee4b361..dd096a8608 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -62,6 +62,7 @@ class RoomMemberHandler(object):
self.event_creation_handler = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
+ self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -266,19 +267,38 @@ class RoomMemberHandler(object):
):
key = (room_id,)
- with (yield self.member_linearizer.queue(key)):
- result = yield self._update_membership(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- require_consent=require_consent,
- )
+ as_id = object()
+ if requester.app_service:
+ as_id = requester.app_service.id
+
+ then = self.clock.time_msec()
+
+ with (yield self.member_limiter.queue(as_id)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
+ with (yield self.member_linearizer.queue(key)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
+ result = yield self._update_membership(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ require_consent=require_consent,
+ )
return result
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b536d410e5..49c025e991 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -42,6 +42,7 @@ logger = logging.getLogger(__name__)
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -227,7 +228,9 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(hs, "sync")
+ self.response_cache = ResponseCache(
+ hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS
+ )
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.storage = hs.get_storage()
|