diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 48816a242d..dffb033fbd 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -16,7 +16,6 @@
from twisted.internet import defer
from synapse.api.errors import LimitExceededError, SynapseError
-from synapse.util.async import run_on_reactor
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID
@@ -58,8 +57,6 @@ class BaseHandler(object):
@defer.inlineCallbacks
def _create_new_client_event(self, builder):
- yield run_on_reactor()
-
latest_ret = yield self.store.get_latest_events_in_room(
builder.room_id,
)
@@ -101,8 +98,6 @@ class BaseHandler(object):
@defer.inlineCallbacks
def handle_new_client_event(self, event, context, extra_destinations=[],
extra_users=[], suppress_auth=False):
- yield run_on_reactor()
-
# We now need to go and hit out to wherever we need to hit out to.
if not suppress_auth:
@@ -143,7 +138,9 @@ class BaseHandler(object):
)
# Don't block waiting on waking up all the listeners.
- d = self.notifier.on_new_room_event(event, extra_users=extra_users)
+ notify_d = self.notifier.on_new_room_event(
+ event, extra_users=extra_users
+ )
def log_failure(f):
logger.warn(
@@ -151,8 +148,10 @@ class BaseHandler(object):
event.event_id, f.value
)
- d.addErrback(log_failure)
+ notify_d.addErrback(log_failure)
- yield federation_handler.handle_new_event(
+ fed_d = federation_handler.handle_new_event(
event, destinations=destinations,
)
+
+ fed_d.addErrback(log_failure)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8aceac28cf..98148c13d7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -179,7 +179,7 @@ class FederationHandler(BaseHandler):
# it's probably a good idea to mark it as not in retry-state
# for sending (although this is a bit of a leap)
retry_timings = yield self.store.get_destination_retry_timings(origin)
- if (retry_timings and retry_timings.retry_last_ts):
+ if retry_timings and retry_timings["retry_last_ts"]:
self.store.set_destination_retry_timings(origin, 0, 0)
room = yield self.store.get_room(event.room_id)
diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py
index f7f3698340..91d87d503d 100644
--- a/synapse/handlers/login.py
+++ b/synapse/handlers/login.py
@@ -53,7 +53,7 @@ class LoginHandler(BaseHandler):
logger.warn("Attempted to login as %s but they do not exist", user)
raise LoginError(403, "", errcode=Codes.FORBIDDEN)
- stored_hash = user_info[0]["password_hash"]
+ stored_hash = user_info["password_hash"]
if bcrypt.checkpw(password, stored_hash):
# generate an access token and store it.
token = self.reg_handler._generate_token(user)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7b9685be7f..9667bb8674 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -274,7 +274,8 @@ class MessageHandler(BaseHandler):
if limit is None:
limit = 10
- for event in room_list:
+ @defer.inlineCallbacks
+ def handle_room(event):
d = {
"room_id": event.room_id,
"membership": event.membership,
@@ -290,12 +291,19 @@ class MessageHandler(BaseHandler):
rooms_ret.append(d)
if event.membership != Membership.JOIN:
- continue
+ return
try:
- messages, token = yield self.store.get_recent_events_for_room(
- event.room_id,
- limit=limit,
- end_token=now_token.room_key,
+ (messages, token), current_state = yield defer.gatherResults(
+ [
+ self.store.get_recent_events_for_room(
+ event.room_id,
+ limit=limit,
+ end_token=now_token.room_key,
+ ),
+ self.state_handler.get_current_state(
+ event.room_id
+ ),
+ ]
)
start_token = now_token.copy_and_replace("room_key", token[0])
@@ -311,9 +319,6 @@ class MessageHandler(BaseHandler):
"end": end_token.to_string(),
}
- current_state = yield self.state_handler.get_current_state(
- event.room_id
- )
d["state"] = [
serialize_event(c, time_now, as_client_event)
for c in current_state.values()
@@ -321,6 +326,11 @@ class MessageHandler(BaseHandler):
except:
logger.exception("Failed to get snapshot")
+ yield defer.gatherResults(
+ [handle_room(e) for e in room_list],
+ consumeErrors=True
+ )
+
ret = {
"rooms": rooms_ret,
"presence": presence,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 1226b23bc7..47456a28e9 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -312,25 +312,6 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue(chunk_data)
@defer.inlineCallbacks
- def get_room_member(self, room_id, member_user_id, auth_user_id):
- """Retrieve a room member from a room.
-
- Args:
- room_id : The room the member is in.
- member_user_id : The member's user ID
- auth_user_id : The user ID of the user making this request.
- Returns:
- The room member, or None if this member does not exist.
- Raises:
- SynapseError if something goes wrong.
- """
- yield self.auth.check_joined_room(room_id, auth_user_id)
-
- member = yield self.store.get_room_member(user_id=member_user_id,
- room_id=room_id)
- defer.returnValue(member)
-
- @defer.inlineCallbacks
def change_membership(self, event, context, do_auth=True):
""" Change the membership status of a user in a room.
|