summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-25 14:54:01 +0000
committerGitHub <noreply@github.com>2020-03-25 14:54:01 +0000
commit4cff617df1ba6f241fee6957cc44859f57edcc0e (patch)
treec0c0c3ffc496262981fc917362942468191da9d4 /synapse/replication/tcp/streams/events.py
parentVarious cleanups to INSTALL.md (#7141) (diff)
downloadsynapse-4cff617df1ba6f241fee6957cc44859f57edcc0e.tar.xz
Move catchup of replication streams to worker. (#7024)
This changes the replication protocol so that the server does not send down `RDATA` for rows that happened before the client connected. Instead, the server will send a `POSITION` and clients then query the database (or master out of band) to get up to date.
Diffstat (limited to 'synapse/replication/tcp/streams/events.py')
-rw-r--r--synapse/replication/tcp/streams/events.py5
1 files changed, 3 insertions, 2 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index b3afabb8cd..c6a595629f 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,7 @@ from typing import Tuple, Type
 
 import attr
 
-from ._base import Stream
+from ._base import Stream, db_query_to_update_function
 
 
 """Handling of the 'events' replication stream
@@ -117,10 +117,11 @@ class EventsStream(Stream):
     def __init__(self, hs):
         self._store = hs.get_datastore()
         self.current_token = self._store.get_current_events_token  # type: ignore
+        self.update_function = db_query_to_update_function(self._update_function)  # type: ignore
 
         super(EventsStream, self).__init__(hs)
 
-    async def update_function(self, from_token, current_token, limit=None):
+    async def _update_function(self, from_token, current_token, limit=None):
         event_rows = await self._store.get_all_new_forward_event_rows(
             from_token, current_token, limit
         )