summary refs log tree commit diff
path: root/synapse/replication/slave/storage/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/slave/storage/_base.py')
-rw-r--r--synapse/replication/slave/storage/_base.py25
1 files changed, 10 insertions, 15 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py

index f45cbd37a0..5d7c8871a4 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py
@@ -14,12 +14,14 @@ # limitations under the License. import logging -from typing import Dict, Optional +from typing import Optional import six -from synapse.storage._base import SQLBaseStore -from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME +from synapse.storage.data_stores.main.cache import ( + CURRENT_STATE_CACHE_NAME, + CacheInvalidationWorkerStore, +) from synapse.storage.database import Database from synapse.storage.engines import PostgresEngine @@ -35,7 +37,7 @@ def __func__(inp): return inp.__func__ -class BaseSlavedStore(SQLBaseStore): +class BaseSlavedStore(CacheInvalidationWorkerStore): def __init__(self, database: Database, db_conn, hs): super(BaseSlavedStore, self).__init__(database, db_conn, hs) if isinstance(self.database_engine, PostgresEngine): @@ -47,18 +49,11 @@ class BaseSlavedStore(SQLBaseStore): self.hs = hs - def stream_positions(self) -> Dict[str, int]: - """ - Get the current positions of all the streams this store wants to subscribe to - - Returns: - map from stream name to the most recent update we have for - that stream (ie, the point we want to start replicating from) - """ - pos = {} + def get_cache_stream_token(self): if self._cache_id_gen: - pos["caches"] = self._cache_id_gen.get_current_token() - return pos + return self._cache_id_gen.get_current_token() + else: + return 0 def process_replication_rows(self, stream_name, token, rows): if stream_name == "caches":