diff options
author | Nick Mills-Barrett <nick@beeper.com> | 2022-07-15 10:36:56 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-15 09:36:56 +0100 |
commit | 21eeacc99551febcddcef21db96a2bd82166fc7e (patch) | |
tree | 10cc25c6bd1c20ac1c206ee46e343aa4487cf5ce /synapse/handlers/appservice.py | |
parent | Rip out auth-event reconciliation code (#12943) (diff) | |
download | synapse-21eeacc99551febcddcef21db96a2bd82166fc7e.tar.xz |
Federation Sender & Appservice Pusher Stream Optimisations (#13251)
* Replace `get_new_events_for_appservice` with `get_all_new_events_stream` The functions were near identical and this brings the AS worker closer to the way federation senders work which can allow for multiple workers to handle AS traffic. * Pull received TS alongside events when processing the stream This avoids an extra query -per event- when both federation sender and appservice pusher process events.
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r-- | synapse/handlers/appservice.py | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 814553e098..203b62e015 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -104,14 +104,15 @@ class ApplicationServicesHandler: with Measure(self.clock, "notify_interested_services"): self.is_processing = True try: - limit = 100 upper_bound = -1 while upper_bound < self.current_max: + last_token = await self.store.get_appservice_last_pos() ( upper_bound, events, - ) = await self.store.get_new_events_for_appservice( - self.current_max, limit + event_to_received_ts, + ) = await self.store.get_all_new_events_stream( + last_token, self.current_max, limit=100, get_prev_content=True ) events_by_room: Dict[str, List[EventBase]] = {} @@ -150,7 +151,7 @@ class ApplicationServicesHandler: ) now = self.clock.time_msec() - ts = await self.store.get_received_ts(event.event_id) + ts = event_to_received_ts[event.event_id] assert ts is not None synapse.metrics.event_processing_lag_by_event.labels( @@ -187,7 +188,7 @@ class ApplicationServicesHandler: if events: now = self.clock.time_msec() - ts = await self.store.get_received_ts(events[-1].event_id) + ts = event_to_received_ts[events[-1].event_id] assert ts is not None synapse.metrics.event_processing_lag.labels( |