diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 15ba417e06..85e2757227 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -73,8 +73,6 @@ class FederationHandler(BaseHandler):
# When joining a room we need to queue any events for that room up
self.room_queues = {}
- @log_function
- @defer.inlineCallbacks
def handle_new_event(self, event, destinations):
""" Takes in an event from the client to server side, that has already
been authed and handled by the state module, and sends it to any
@@ -89,9 +87,7 @@ class FederationHandler(BaseHandler):
processing.
"""
- yield run_on_reactor()
-
- self.replication_layer.send_pdu(event, destinations)
+ return self.replication_layer.send_pdu(event, destinations)
@log_function
@defer.inlineCallbacks
@@ -179,7 +175,7 @@ class FederationHandler(BaseHandler):
# it's probably a good idea to mark it as not in retry-state
# for sending (although this is a bit of a leap)
retry_timings = yield self.store.get_destination_retry_timings(origin)
- if (retry_timings and retry_timings.retry_last_ts):
+ if retry_timings and retry_timings["retry_last_ts"]:
self.store.set_destination_retry_timings(origin, 0, 0)
room = yield self.store.get_room(event.room_id)
@@ -201,10 +197,18 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- yield self.notifier.on_new_room_event(
+ d = self.notifier.on_new_room_event(
event, extra_users=extra_users
)
+ def log_failure(f):
+ logger.warn(
+ "Failed to notify about %s: %s",
+ event.event_id, f.value
+ )
+
+ d.addErrback(log_failure)
+
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
user = UserID.from_string(event.state_key)
@@ -427,10 +431,18 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
- yield self.notifier.on_new_room_event(
+ d = self.notifier.on_new_room_event(
new_event, extra_users=[joinee]
)
+ def log_failure(f):
+ logger.warn(
+ "Failed to notify about %s: %s",
+ new_event.event_id, f.value
+ )
+
+ d.addErrback(log_failure)
+
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
@@ -500,10 +512,18 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- yield self.notifier.on_new_room_event(
+ d = self.notifier.on_new_room_event(
event, extra_users=extra_users
)
+ def log_failure(f):
+ logger.warn(
+ "Failed to notify about %s: %s",
+ event.event_id, f.value
+ )
+
+ d.addErrback(log_failure)
+
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
@@ -574,10 +594,18 @@ class FederationHandler(BaseHandler):
)
target_user = UserID.from_string(event.state_key)
- yield self.notifier.on_new_room_event(
+ d = self.notifier.on_new_room_event(
event, extra_users=[target_user],
)
+ def log_failure(f):
+ logger.warn(
+ "Failed to notify about %s: %s",
+ event.event_id, f.value
+ )
+
+ d.addErrback(log_failure)
+
defer.returnValue(event)
@defer.inlineCallbacks
|