summary refs log tree commit diff
path: root/synapse/replication/slave/storage/events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-02-25 16:56:55 +0000
committerGitHub <noreply@github.com>2020-02-25 16:56:55 +0000
commitbbf8886a05be6a929556d6f09a1b6ce053a3c403 (patch)
treed3747d92a7e15d7470cb7f603dec4ca617bf7f82 /synapse/replication/slave/storage/events.py
parentcontrib/docker: remove quotes for POSTGRES_INITDB_ARGS (#6984) (diff)
downloadsynapse-bbf8886a05be6a929556d6f09a1b6ce053a3c403.tar.xz
Merge worker apps into one. (#6964)
Diffstat (limited to 'synapse/replication/slave/storage/events.py')
-rw-r--r--synapse/replication/slave/storage/events.py20
1 files changed, 20 insertions, 0 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 3aa6cb8b96..e73342c657 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -32,6 +32,7 @@ from synapse.storage.data_stores.main.state import StateGroupWorkerStore
 from synapse.storage.data_stores.main.stream import StreamWorkerStore
 from synapse.storage.data_stores.main.user_erasure_store import UserErasureWorkerStore
 from synapse.storage.database import Database
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
@@ -68,6 +69,21 @@ class SlavedEventStore(
 
         super(SlavedEventStore, self).__init__(database, db_conn, hs)
 
+        events_max = self._stream_id_gen.get_current_token()
+        curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
+            db_conn,
+            "current_state_delta_stream",
+            entity_column="room_id",
+            stream_column="stream_id",
+            max_value=events_max,  # As we share the stream id with events token
+            limit=1000,
+        )
+        self._curr_state_delta_stream_cache = StreamChangeCache(
+            "_curr_state_delta_stream_cache",
+            min_curr_state_delta_id,
+            prefilled_cache=curr_state_delta_prefill,
+        )
+
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
 
@@ -120,6 +136,10 @@ class SlavedEventStore(
                 backfilled=False,
             )
         elif row.type == EventsStreamCurrentStateRow.TypeId:
+            self._curr_state_delta_stream_cache.entity_has_changed(
+                row.data.room_id, token
+            )
+
             if data.type == EventTypes.Member:
                 self.get_rooms_for_user_with_stream_ordering.invalidate(
                     (data.state_key,)