summary refs log tree commit diff
path: root/synapse/federation/sender
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@beeper.com>2022-07-15 10:36:56 +0200
committerGitHub <noreply@github.com>2022-07-15 09:36:56 +0100
commit21eeacc99551febcddcef21db96a2bd82166fc7e (patch)
tree10cc25c6bd1c20ac1c206ee46e343aa4487cf5ce /synapse/federation/sender
parentRip out auth-event reconciliation code (#12943) (diff)
downloadsynapse-21eeacc99551febcddcef21db96a2bd82166fc7e.tar.xz
Federation Sender & Appservice Pusher Stream Optimisations (#13251)
* Replace `get_new_events_for_appservice` with `get_all_new_events_stream`

The functions were near identical and this brings the AS worker closer
to the way federation senders work which can allow for multiple workers
to handle AS traffic.

* Pull received TS alongside events when processing the stream

This avoids an extra query -per event- when both federation sender
and appservice pusher process events.
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r--synapse/federation/sender/__init__.py10
1 files changed, 7 insertions, 3 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py

index 99a794c042..94a65ac65f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -351,7 +351,11 @@ class FederationSender(AbstractFederationSender): self._is_processing = True while True: last_token = await self.store.get_federation_out_pos("events") - next_token, events = await self.store.get_all_new_events_stream( + ( + next_token, + events, + event_to_received_ts, + ) = await self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 ) @@ -476,7 +480,7 @@ class FederationSender(AbstractFederationSender): await self._send_pdu(event, sharded_destinations) now = self.clock.time_msec() - ts = await self.store.get_received_ts(event.event_id) + ts = event_to_received_ts[event.event_id] assert ts is not None synapse.metrics.event_processing_lag_by_event.labels( "federation_sender" @@ -509,7 +513,7 @@ class FederationSender(AbstractFederationSender): if events: now = self.clock.time_msec() - ts = await self.store.get_received_ts(events[-1].event_id) + ts = event_to_received_ts[events[-1].event_id] assert ts is not None synapse.metrics.event_processing_lag.labels(