diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index fd11935b40..b63a660c06 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -111,6 +111,11 @@ class E2eKeysHandler(object):
failures[destination] = {
"status": 503, "message": "Not ready for retry",
}
+ except Exception as e:
+ # include ConnectionRefused and other errors
+ failures[destination] = {
+ "status": 503, "message": e.message
+ }
yield preserve_context_over_deferred(defer.gatherResults([
preserve_fn(do_remote_query)(destination)
@@ -222,6 +227,11 @@ class E2eKeysHandler(object):
failures[destination] = {
"status": 503, "message": "Not ready for retry",
}
+ except Exception as e:
+ # include ConnectionRefused and other errors
+ failures[destination] = {
+ "status": 503, "message": e.message
+ }
yield preserve_context_over_deferred(defer.gatherResults([
preserve_fn(claim_client_keys)(destination)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index fd09397226..7a57a69bd3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -24,7 +24,7 @@ from synapse.push.action_generator import ActionGenerator
from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
-from synapse.util.async import run_on_reactor, ReadWriteLock
+from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
@@ -50,6 +50,10 @@ class MessageHandler(BaseHandler):
self.pagination_lock = ReadWriteLock()
+ # We arbitrarily limit concurrent event creation for a room to 5.
+ # This is to stop us from diverging history *too* much.
+ self.limiter = Limiter(max_count=5)
+
@defer.inlineCallbacks
def purge_history(self, room_id, event_id):
event = yield self.store.get_event(event_id)
@@ -191,36 +195,38 @@ class MessageHandler(BaseHandler):
"""
builder = self.event_builder_factory.new(event_dict)
- self.validator.validate_new(builder)
-
- if builder.type == EventTypes.Member:
- membership = builder.content.get("membership", None)
- target = UserID.from_string(builder.state_key)
+ with (yield self.limiter.queue(builder.room_id)):
+ self.validator.validate_new(builder)
+
+ if builder.type == EventTypes.Member:
+ membership = builder.content.get("membership", None)
+ target = UserID.from_string(builder.state_key)
+
+ if membership in {Membership.JOIN, Membership.INVITE}:
+ # If event doesn't include a display name, add one.
+ profile = self.hs.get_handlers().profile_handler
+ content = builder.content
+
+ try:
+ content["displayname"] = yield profile.get_displayname(target)
+ content["avatar_url"] = yield profile.get_avatar_url(target)
+ except Exception as e:
+ logger.info(
+ "Failed to get profile information for %r: %s",
+ target, e
+ )
- if membership in {Membership.JOIN, Membership.INVITE}:
- # If event doesn't include a display name, add one.
- profile = self.hs.get_handlers().profile_handler
- content = builder.content
+ if token_id is not None:
+ builder.internal_metadata.token_id = token_id
- try:
- content["displayname"] = yield profile.get_displayname(target)
- content["avatar_url"] = yield profile.get_avatar_url(target)
- except Exception as e:
- logger.info(
- "Failed to get profile information for %r: %s",
- target, e
- )
+ if txn_id is not None:
+ builder.internal_metadata.txn_id = txn_id
- if token_id is not None:
- builder.internal_metadata.token_id = token_id
-
- if txn_id is not None:
- builder.internal_metadata.txn_id = txn_id
+ event, context = yield self._create_new_client_event(
+ builder=builder,
+ prev_event_ids=prev_event_ids,
+ )
- event, context = yield self._create_new_client_event(
- builder=builder,
- prev_event_ids=prev_event_ids,
- )
defer.returnValue((event, context))
@defer.inlineCallbacks
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 916e80a48e..50aa513935 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -100,7 +100,7 @@ class ReceiptsHandler(BaseHandler):
if not res:
# res will be None if this read receipt is 'old'
- defer.returnValue(False)
+ continue
stream_id, max_persisted_id = res
@@ -109,6 +109,10 @@ class ReceiptsHandler(BaseHandler):
if max_batch_id is None or max_persisted_id > max_batch_id:
max_batch_id = max_persisted_id
+ if min_batch_id is None:
+ # no new receipts
+ defer.returnValue(False)
+
affected_room_ids = list(set([r["room_id"] for r in receipts]))
with PreserveLoggingContext():
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 886fec8701..286f0cef0a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -81,7 +81,7 @@ class RegistrationHandler(BaseHandler):
"User ID already taken.",
errcode=Codes.USER_IN_USE,
)
- user_data = yield self.auth.get_user_from_macaroon(guest_access_token)
+ user_data = yield self.auth.get_user_by_access_token(guest_access_token)
if not user_data["is_guest"] or user_data["user"].localpart != localpart:
raise AuthError(
403,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a86996689c..b62773dcbe 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -510,6 +510,7 @@ class SyncHandler(object):
Returns:
Deferred(SyncResult)
"""
+ logger.info("Calculating sync response for %r", sync_config.user)
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
|