summary refs log tree commit diff
path: root/synapse/replication/slave
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-25 14:54:01 +0000
committerGitHub <noreply@github.com>2020-03-25 14:54:01 +0000
commit4cff617df1ba6f241fee6957cc44859f57edcc0e (patch)
treec0c0c3ffc496262981fc917362942468191da9d4 /synapse/replication/slave
parentVarious cleanups to INSTALL.md (#7141) (diff)
downloadsynapse-4cff617df1ba6f241fee6957cc44859f57edcc0e.tar.xz
Move catchup of replication streams to worker. (#7024)
This changes the replication protocol so that the server does not send down `RDATA` for rows that happened before the client connected. Instead, the server will send a `POSITION` and clients then query the database (or master out of band) to get up to date.
Diffstat (limited to 'synapse/replication/slave')
-rw-r--r--synapse/replication/slave/storage/_base.py14
-rw-r--r--synapse/replication/slave/storage/pushers.py3
2 files changed, 14 insertions, 3 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index f45cbd37a0..751c799d94 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -18,8 +18,10 @@ from typing import Dict, Optional
 
 import six
 
-from synapse.storage._base import SQLBaseStore
-from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME
+from synapse.storage.data_stores.main.cache import (
+    CURRENT_STATE_CACHE_NAME,
+    CacheInvalidationWorkerStore,
+)
 from synapse.storage.database import Database
 from synapse.storage.engines import PostgresEngine
 
@@ -35,7 +37,7 @@ def __func__(inp):
         return inp.__func__
 
 
-class BaseSlavedStore(SQLBaseStore):
+class BaseSlavedStore(CacheInvalidationWorkerStore):
     def __init__(self, database: Database, db_conn, hs):
         super(BaseSlavedStore, self).__init__(database, db_conn, hs)
         if isinstance(self.database_engine, PostgresEngine):
@@ -60,6 +62,12 @@ class BaseSlavedStore(SQLBaseStore):
             pos["caches"] = self._cache_id_gen.get_current_token()
         return pos
 
+    def get_cache_stream_token(self):
+        if self._cache_id_gen:
+            return self._cache_id_gen.get_current_token()
+        else:
+            return 0
+
     def process_replication_rows(self, stream_name, token, rows):
         if stream_name == "caches":
             if self._cache_id_gen:
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index f22c2d44a3..bce8a3d115 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -33,6 +33,9 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
         result["pushers"] = self._pushers_id_gen.get_current_token()
         return result
 
+    def get_pushers_stream_token(self):
+        return self._pushers_id_gen.get_current_token()
+
     def process_replication_rows(self, stream_name, token, rows):
         if stream_name == "pushers":
             self._pushers_id_gen.advance(token)