summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/replication/slave/storage/_base.py2
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py12
-rw-r--r--synapse/storage/schema/delta/34/cache_stream.py6
4 files changed, 16 insertions, 6 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 24c9946d6a..d839d169ab 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -29,7 +29,7 @@ class BaseSlavedStore(SQLBaseStore):
         super(BaseSlavedStore, self).__init__(hs)
         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen = SlavedIdTracker(
-                db_conn, "cache_stream", "stream_id",
+                db_conn, "cache_invalidation_stream", "stream_id",
             )
         else:
             self._cache_id_gen = None
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 8af492b69f..7efc5bfeef 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -126,7 +126,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen = StreamIdGenerator(
-                db_conn, "cache_stream", "stream_id",
+                db_conn, "cache_invalidation_stream", "stream_id",
             )
         else:
             self._cache_id_gen = None
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e3edc2cde6..c55776994d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -863,6 +863,13 @@ class SQLBaseStore(object):
         return cache, min_val
 
     def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+        """Invalidates the cache and adds it to the cache stream so slaves
+        will know to invalidate their caches.
+
+        This should only be used to invalidate caches where slaves won't
+        otherwise know from other replication streams that the cache should
+        be invalidated.
+        """
         txn.call_after(cache_func.invalidate, keys)
 
         if isinstance(self.database_engine, PostgresEngine):
@@ -872,7 +879,7 @@ class SQLBaseStore(object):
 
             self._simple_insert_txn(
                 txn,
-                table="cache_stream",
+                table="cache_invalidation_stream",
                 values={
                     "stream_id": stream_id,
                     "cache_func": cache_func.__name__,
@@ -887,7 +894,8 @@ class SQLBaseStore(object):
             # send across cache invalidations as quickly as possible. Cache
             # invalidations are idempotent, so duplicates are fine.
             sql = (
-                "SELECT stream_id, cache_func, keys, invalidation_ts FROM cache_stream"
+                "SELECT stream_id, cache_func, keys, invalidation_ts"
+                " FROM cache_invalidation_stream"
                 " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
             )
             txn.execute(sql, (last_id, limit,))
diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py
index 4c350bfb11..3b63a1562d 100644
--- a/synapse/storage/schema/delta/34/cache_stream.py
+++ b/synapse/storage/schema/delta/34/cache_stream.py
@@ -20,15 +20,17 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+# This stream is used to notify replication slaves that some caches have
+# been invalidated that they cannot infer from the other streams.
 CREATE_TABLE = """
-CREATE TABLE cache_stream (
+CREATE TABLE cache_invalidation_stream (
     stream_id       BIGINT,
     cache_func      TEXT,
     keys            TEXT[],
     invalidation_ts BIGINT
 );
 
-CREATE INDEX cache_stream_id ON cache_stream(stream_id);
+CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id);
 """