summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-05-03 14:59:29 +0100
committerRichard van der Hoff <richard@matrix.org>2018-05-03 14:59:29 +0100
commit093d8c415a303fa7c8900e3fe685843b60a85eed (patch)
tree8dd5ceabb4c8ecbae18fa95f10108ba335c6bbbf /synapse/federation/transaction_queue.py
parentMake 'unexpected logging context' into warnings (diff)
parentMerge pull request #3183 from matrix-org/rav/moar_logcontext_leaks (diff)
downloadsynapse-093d8c415a303fa7c8900e3fe685843b60a85eed.tar.xz
Merge remote-tracking branch 'origin/develop' into rav/warn_on_logcontext_fail
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py75
1 files changed, 59 insertions, 16 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a141ec9953..ded2b1871a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -169,7 +169,7 @@ class TransactionQueue(object):
             while True:
                 last_token = yield self.store.get_federation_out_pos("events")
                 next_token, events = yield self.store.get_all_new_events_stream(
-                    last_token, self._last_poked_id, limit=20,
+                    last_token, self._last_poked_id, limit=100,
                 )
 
                 logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,24 +177,33 @@ class TransactionQueue(object):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                for event in events:
+                @defer.inlineCallbacks
+                def handle_event(event):
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.event_id)
                     if not is_mine and send_on_behalf_of is None:
-                        continue
-
-                    # Get the state from before the event.
-                    # We need to make sure that this is the state from before
-                    # the event and not from after it.
-                    # Otherwise if the last member on a server in a room is
-                    # banned then it won't receive the event because it won't
-                    # be in the room after the ban.
-                    destinations = yield self.state.get_current_hosts_in_room(
-                        event.room_id, latest_event_ids=[
-                            prev_id for prev_id, _ in event.prev_events
-                        ],
-                    )
+                        return
+
+                    try:
+                        # Get the state from before the event.
+                        # We need to make sure that this is the state from before
+                        # the event and not from after it.
+                        # Otherwise if the last member on a server in a room is
+                        # banned then it won't receive the event because it won't
+                        # be in the room after the ban.
+                        destinations = yield self.state.get_current_hosts_in_room(
+                            event.room_id, latest_event_ids=[
+                                prev_id for prev_id, _ in event.prev_events
+                            ],
+                        )
+                    except Exception:
+                        logger.exception(
+                            "Failed to calculate hosts in room for event: %s",
+                            event.event_id,
+                        )
+                        return
+
                     destinations = set(destinations)
 
                     if send_on_behalf_of is not None:
@@ -207,12 +216,44 @@ class TransactionQueue(object):
 
                     self._send_pdu(event, destinations)
 
-                events_processed_counter.inc_by(len(events))
+                @defer.inlineCallbacks
+                def handle_room_events(events):
+                    for event in events:
+                        yield handle_event(event)
+
+                events_by_room = {}
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                yield logcontext.make_deferred_yieldable(defer.gatherResults(
+                    [
+                        logcontext.run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ],
+                    consumeErrors=True
+                ))
 
                 yield self.store.update_federation_out_pos(
                     "events", next_token
                 )
 
+                if events:
+                    now = self.clock.time_msec()
+                    ts = yield self.store.get_received_ts(events[-1].event_id)
+
+                    synapse.metrics.event_processing_lag.set(
+                        now - ts, "federation_sender",
+                    )
+                    synapse.metrics.event_processing_last_ts.set(
+                        ts, "federation_sender",
+                    )
+
+                events_processed_counter.inc_by(len(events))
+
+                synapse.metrics.event_processing_positions.set(
+                    next_token, "federation_sender",
+                )
+
         finally:
             self._is_processing = False
 
@@ -282,6 +323,8 @@ class TransactionQueue(object):
                     break
 
                 yield self._process_presence_inner(states_map.values())
+        except Exception:
+            logger.exception("Error sending presence states to servers")
         finally:
             self._processing_pending_presence = False