summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@beeper.com>2022-07-15 10:36:56 +0200
committerGitHub <noreply@github.com>2022-07-15 09:36:56 +0100
commit21eeacc99551febcddcef21db96a2bd82166fc7e (patch)
tree10cc25c6bd1c20ac1c206ee46e343aa4487cf5ce /synapse/handlers
parentRip out auth-event reconciliation code (#12943) (diff)
downloadsynapse-21eeacc99551febcddcef21db96a2bd82166fc7e.tar.xz
Federation Sender & Appservice Pusher Stream Optimisations (#13251)
* Replace `get_new_events_for_appservice` with `get_all_new_events_stream`

The functions were near identical and this brings the AS worker closer
to the way federation senders work which can allow for multiple workers
to handle AS traffic.

* Pull received TS alongside events when processing the stream

This avoids an extra query -per event- when both federation sender
and appservice pusher process events.
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py11
1 files changed, 6 insertions, 5 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 814553e098..203b62e015 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -104,14 +104,15 @@ class ApplicationServicesHandler:
         with Measure(self.clock, "notify_interested_services"):
             self.is_processing = True
             try:
-                limit = 100
                 upper_bound = -1
                 while upper_bound < self.current_max:
+                    last_token = await self.store.get_appservice_last_pos()
                     (
                         upper_bound,
                         events,
-                    ) = await self.store.get_new_events_for_appservice(
-                        self.current_max, limit
+                        event_to_received_ts,
+                    ) = await self.store.get_all_new_events_stream(
+                        last_token, self.current_max, limit=100, get_prev_content=True
                     )
 
                     events_by_room: Dict[str, List[EventBase]] = {}
@@ -150,7 +151,7 @@ class ApplicationServicesHandler:
                             )
 
                         now = self.clock.time_msec()
-                        ts = await self.store.get_received_ts(event.event_id)
+                        ts = event_to_received_ts[event.event_id]
                         assert ts is not None
 
                         synapse.metrics.event_processing_lag_by_event.labels(
@@ -187,7 +188,7 @@ class ApplicationServicesHandler:
 
                     if events:
                         now = self.clock.time_msec()
-                        ts = await self.store.get_received_ts(events[-1].event_id)
+                        ts = event_to_received_ts[events[-1].event_id]
                         assert ts is not None
 
                         synapse.metrics.event_processing_lag.labels(