diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index f45cbd37a0..5d7c8871a4 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -14,12 +14,14 @@
# limitations under the License.
import logging
-from typing import Dict, Optional
+from typing import Optional
import six
-from synapse.storage._base import SQLBaseStore
-from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME
+from synapse.storage.data_stores.main.cache import (
+ CURRENT_STATE_CACHE_NAME,
+ CacheInvalidationWorkerStore,
+)
from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine
@@ -35,7 +37,7 @@ def __func__(inp):
return inp.__func__
-class BaseSlavedStore(SQLBaseStore):
+class BaseSlavedStore(CacheInvalidationWorkerStore):
def __init__(self, database: Database, db_conn, hs):
super(BaseSlavedStore, self).__init__(database, db_conn, hs)
if isinstance(self.database_engine, PostgresEngine):
@@ -47,18 +49,11 @@ class BaseSlavedStore(SQLBaseStore):
self.hs = hs
- def stream_positions(self) -> Dict[str, int]:
- """
- Get the current positions of all the streams this store wants to subscribe to
-
- Returns:
- map from stream name to the most recent update we have for
- that stream (ie, the point we want to start replicating from)
- """
- pos = {}
+ def get_cache_stream_token(self):
if self._cache_id_gen:
- pos["caches"] = self._cache_id_gen.get_current_token()
- return pos
+ return self._cache_id_gen.get_current_token()
+ else:
+ return 0
def process_replication_rows(self, stream_name, token, rows):
if stream_name == "caches":
|