summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@beeper.com>2022-07-15 10:36:56 +0200
committerGitHub <noreply@github.com>2022-07-15 09:36:56 +0100
commit21eeacc99551febcddcef21db96a2bd82166fc7e (patch)
tree10cc25c6bd1c20ac1c206ee46e343aa4487cf5ce /synapse/federation
parentRip out auth-event reconciliation code (#12943) (diff)
downloadsynapse-21eeacc99551febcddcef21db96a2bd82166fc7e.tar.xz
Federation Sender & Appservice Pusher Stream Optimisations (#13251)
* Replace `get_new_events_for_appservice` with `get_all_new_events_stream`

The functions were near identical and this brings the AS worker closer
to the way federation senders work which can allow for multiple workers
to handle AS traffic.

* Pull received TS alongside events when processing the stream

This avoids an extra query -per event- when both federation sender
and appservice pusher process events.
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/sender/__init__.py10
1 files changed, 7 insertions, 3 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 99a794c042..94a65ac65f 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -351,7 +351,11 @@ class FederationSender(AbstractFederationSender):
             self._is_processing = True
             while True:
                 last_token = await self.store.get_federation_out_pos("events")
-                next_token, events = await self.store.get_all_new_events_stream(
+                (
+                    next_token,
+                    events,
+                    event_to_received_ts,
+                ) = await self.store.get_all_new_events_stream(
                     last_token, self._last_poked_id, limit=100
                 )
 
@@ -476,7 +480,7 @@ class FederationSender(AbstractFederationSender):
                         await self._send_pdu(event, sharded_destinations)
 
                         now = self.clock.time_msec()
-                        ts = await self.store.get_received_ts(event.event_id)
+                        ts = event_to_received_ts[event.event_id]
                         assert ts is not None
                         synapse.metrics.event_processing_lag_by_event.labels(
                             "federation_sender"
@@ -509,7 +513,7 @@ class FederationSender(AbstractFederationSender):
 
                 if events:
                     now = self.clock.time_msec()
-                    ts = await self.store.get_received_ts(events[-1].event_id)
+                    ts = event_to_received_ts[events[-1].event_id]
                     assert ts is not None
 
                     synapse.metrics.event_processing_lag.labels(