summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2018-04-10 16:43:48 +0100
committerGitHub <noreply@github.com>2018-04-10 16:43:48 +0100
commit9daf82278fed652d5c7c10b16a2e7952d0d108c4 (patch)
tree5ecbcd0897666824eaa98d66f8aa9d09aa605865 /synapse/federation/transaction_queue.py
parentMerge branch 'release-v0.27.0' of https://github.com/matrix-org/synapse into ... (diff)
parentUse run_in_background instead (diff)
downloadsynapse-9daf82278fed652d5c7c10b16a2e7952d0d108c4.tar.xz
Merge pull request #3078 from matrix-org/erikj/federation_sender
Send federation events concurrently
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py56
1 files changed, 41 insertions, 15 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a141ec9953..5b0b798e57 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -169,7 +169,7 @@ class TransactionQueue(object):
             while True:
                 last_token = yield self.store.get_federation_out_pos("events")
                 next_token, events = yield self.store.get_all_new_events_stream(
-                    last_token, self._last_poked_id, limit=20,
+                    last_token, self._last_poked_id, limit=100,
                 )
 
                 logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,24 +177,33 @@ class TransactionQueue(object):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                for event in events:
+                @defer.inlineCallbacks
+                def handle_event(event):
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.event_id)
                     if not is_mine and send_on_behalf_of is None:
-                        continue
-
-                    # Get the state from before the event.
-                    # We need to make sure that this is the state from before
-                    # the event and not from after it.
-                    # Otherwise if the last member on a server in a room is
-                    # banned then it won't receive the event because it won't
-                    # be in the room after the ban.
-                    destinations = yield self.state.get_current_hosts_in_room(
-                        event.room_id, latest_event_ids=[
-                            prev_id for prev_id, _ in event.prev_events
-                        ],
-                    )
+                        return
+
+                    try:
+                        # Get the state from before the event.
+                        # We need to make sure that this is the state from before
+                        # the event and not from after it.
+                        # Otherwise if the last member on a server in a room is
+                        # banned then it won't receive the event because it won't
+                        # be in the room after the ban.
+                        destinations = yield self.state.get_current_hosts_in_room(
+                            event.room_id, latest_event_ids=[
+                                prev_id for prev_id, _ in event.prev_events
+                            ],
+                        )
+                    except Exception:
+                        logger.exception(
+                            "Failed to calculate hosts in room for event: %s",
+                            event.event_id,
+                        )
+                        return
+
                     destinations = set(destinations)
 
                     if send_on_behalf_of is not None:
@@ -207,6 +216,23 @@ class TransactionQueue(object):
 
                     self._send_pdu(event, destinations)
 
+                @defer.inlineCallbacks
+                def handle_room_events(events):
+                    for event in events:
+                        yield handle_event(event)
+
+                events_by_room = {}
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                yield logcontext.make_deferred_yieldable(defer.gatherResults(
+                    [
+                        logcontext.run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ],
+                    consumeErrors=True
+                ))
+
                 events_processed_counter.inc_by(len(events))
 
                 yield self.store.update_federation_out_pos(