summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-04-09 11:45:22 +0100
committerErik Johnston <erik@matrix.org>2018-04-09 11:47:10 +0100
commit11974f37871f41a5827f243a3fc2057703f04d72 (patch)
treeb012b1c171c01f24f1f971a2921dfa1803bcf110 /synapse/federation
parentHandle exceptions in get_hosts_for_room when sending events over federation (diff)
downloadsynapse-11974f37871f41a5827f243a3fc2057703f04d72.tar.xz
Send federation events concurrently
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/transaction_queue.py22
1 files changed, 18 insertions, 4 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 12e8df9cc6..43daf673c0 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -169,7 +169,7 @@ class TransactionQueue(object):
             while True:
                 last_token = yield self.store.get_federation_out_pos("events")
                 next_token, events = yield self.store.get_all_new_events_stream(
-                    last_token, self._last_poked_id, limit=20,
+                    last_token, self._last_poked_id, limit=100,
                 )
 
                 logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,12 +177,13 @@ class TransactionQueue(object):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                for event in events:
+                @defer.inlineCallbacks
+                def handle_event(event):
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.event_id)
                     if not is_mine and send_on_behalf_of is None:
-                        continue
+                        return
 
                     try:
                         # Get the state from before the event.
@@ -198,7 +199,7 @@ class TransactionQueue(object):
                         )
                     except Exception:
                         logger.exception("Failed to calculate hosts in room")
-                        continue
+                        return
 
                     destinations = set(destinations)
 
@@ -212,6 +213,19 @@ class TransactionQueue(object):
 
                     self._send_pdu(event, destinations)
 
+                def handle_room_events(events):
+                    for event in events:
+                        return handle_event(event)
+
+                events_by_room = {}
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                yield logcontext.make_deferred_yieldable(defer.gatherResults(
+                    [handle_room_events(evs) for evs in events_by_room.itervalues()],
+                    consumeErrors=True
+                ))
+
                 events_processed_counter.inc_by(len(events))
 
                 yield self.store.update_federation_out_pos(