summary refs log tree commit diff
path: root/synapse/replication/slave/storage/_base.py
diff options
context:
space:
mode:
authorAndrew Morgan <1342360+anoadragon453@users.noreply.github.com>2020-05-28 15:12:35 +0100
committerGitHub <noreply@github.com>2020-05-28 15:12:35 +0100
commit0aa9449cd412a19550a74d8d6d4b1714746ebba1 (patch)
treee5915221b57a9dc9c53aa293a373cd8fc46412c6 /synapse/replication/slave/storage/_base.py
parentFixes an attribute error when using the default display name during registrat... (diff)
parentMove sql schema delta files to their new location (diff)
downloadsynapse-0aa9449cd412a19550a74d8d6d4b1714746ebba1.tar.xz
Merge pull request #39 from matrix-org/dinsic-release-v1.12.x
Merge Synapse release v1.12.0 into 'dinsic'

Accompanying Sytest PR: https://github.com/matrix-org/sytest/issues/843
Diffstat (limited to 'synapse/replication/slave/storage/_base.py')
-rw-r--r--synapse/replication/slave/storage/_base.py34
1 files changed, 25 insertions, 9 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py

index 817d1f67f9..f45cbd37a0 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py
@@ -14,10 +14,13 @@ # limitations under the License. import logging +from typing import Dict, Optional import six -from synapse.storage._base import _CURRENT_STATE_CACHE_NAME, SQLBaseStore +from synapse.storage._base import SQLBaseStore +from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME +from synapse.storage.database import Database from synapse.storage.engines import PostgresEngine from ._slaved_id_tracker import SlavedIdTracker @@ -33,18 +36,25 @@ def __func__(inp): class BaseSlavedStore(SQLBaseStore): - def __init__(self, db_conn, hs): - super(BaseSlavedStore, self).__init__(db_conn, hs) + def __init__(self, database: Database, db_conn, hs): + super(BaseSlavedStore, self).__init__(database, db_conn, hs) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = SlavedIdTracker( - db_conn, "cache_invalidation_stream", "stream_id", - ) + db_conn, "cache_invalidation_stream", "stream_id" + ) # type: Optional[SlavedIdTracker] else: self._cache_id_gen = None self.hs = hs - def stream_positions(self): + def stream_positions(self) -> Dict[str, int]: + """ + Get the current positions of all the streams this store wants to subscribe to + + Returns: + map from stream name to the most recent update we have for + that stream (ie, the point we want to start replicating from) + """ pos = {} if self._cache_id_gen: pos["caches"] = self._cache_id_gen.get_current_token() @@ -52,14 +62,20 @@ class BaseSlavedStore(SQLBaseStore): def process_replication_rows(self, stream_name, token, rows): if stream_name == "caches": - self._cache_id_gen.advance(token) + if self._cache_id_gen: + self._cache_id_gen.advance(token) for row in rows: - if row.cache_func == _CURRENT_STATE_CACHE_NAME: + if row.cache_func == CURRENT_STATE_CACHE_NAME: + if row.keys is None: + raise Exception( + "Can't send an 'invalidate all' for current state cache" + ) + room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) else: - self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys)) + self._attempt_to_invalidate_cache(row.cache_func, row.keys) def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys)