diff options
author | Nick Mills-Barrett <nick@beeper.com> | 2022-07-15 10:36:56 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-15 09:36:56 +0100 |
commit | 21eeacc99551febcddcef21db96a2bd82166fc7e (patch) | |
tree | 10cc25c6bd1c20ac1c206ee46e343aa4487cf5ce /synapse/federation/sender | |
parent | Rip out auth-event reconciliation code (#12943) (diff) | |
download | synapse-21eeacc99551febcddcef21db96a2bd82166fc7e.tar.xz |
Federation Sender & Appservice Pusher Stream Optimisations (#13251)
* Replace `get_new_events_for_appservice` with `get_all_new_events_stream` The functions were near identical and this brings the AS worker closer to the way federation senders work which can allow for multiple workers to handle AS traffic. * Pull received TS alongside events when processing the stream This avoids an extra query -per event- when both federation sender and appservice pusher process events.
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r-- | synapse/federation/sender/__init__.py | 10 |
1 files changed, 7 insertions, 3 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 99a794c042..94a65ac65f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -351,7 +351,11 @@ class FederationSender(AbstractFederationSender): self._is_processing = True while True: last_token = await self.store.get_federation_out_pos("events") - next_token, events = await self.store.get_all_new_events_stream( + ( + next_token, + events, + event_to_received_ts, + ) = await self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 ) @@ -476,7 +480,7 @@ class FederationSender(AbstractFederationSender): await self._send_pdu(event, sharded_destinations) now = self.clock.time_msec() - ts = await self.store.get_received_ts(event.event_id) + ts = event_to_received_ts[event.event_id] assert ts is not None synapse.metrics.event_processing_lag_by_event.labels( "federation_sender" @@ -509,7 +513,7 @@ class FederationSender(AbstractFederationSender): if events: now = self.clock.time_msec() - ts = await self.store.get_received_ts(events[-1].event_id) + ts = event_to_received_ts[events[-1].event_id] assert ts is not None synapse.metrics.event_processing_lag.labels( |