diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 8af492b69f..7efc5bfeef 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -126,7 +126,7 @@ class DataStore(RoomMemberStore, RoomStore,
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = StreamIdGenerator(
- db_conn, "cache_stream", "stream_id",
+ db_conn, "cache_invalidation_stream", "stream_id",
)
else:
self._cache_id_gen = None
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e3edc2cde6..c55776994d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -863,6 +863,13 @@ class SQLBaseStore(object):
return cache, min_val
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+ """Invalidates the cache and adds it to the cache stream so slaves
+ will know to invalidate their caches.
+
+ This should only be used to invalidate caches where slaves won't
+ otherwise know from other replication streams that the cache should
+ be invalidated.
+ """
txn.call_after(cache_func.invalidate, keys)
if isinstance(self.database_engine, PostgresEngine):
@@ -872,7 +879,7 @@ class SQLBaseStore(object):
self._simple_insert_txn(
txn,
- table="cache_stream",
+ table="cache_invalidation_stream",
values={
"stream_id": stream_id,
"cache_func": cache_func.__name__,
@@ -887,7 +894,8 @@ class SQLBaseStore(object):
# send across cache invalidations as quickly as possible. Cache
# invalidations are idempotent, so duplicates are fine.
sql = (
- "SELECT stream_id, cache_func, keys, invalidation_ts FROM cache_stream"
+ "SELECT stream_id, cache_func, keys, invalidation_ts"
+ " FROM cache_invalidation_stream"
" WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
)
txn.execute(sql, (last_id, limit,))
diff --git a/synapse/storage/schema/delta/34/cache_stream.py b/synapse/storage/schema/delta/34/cache_stream.py
index 4c350bfb11..3b63a1562d 100644
--- a/synapse/storage/schema/delta/34/cache_stream.py
+++ b/synapse/storage/schema/delta/34/cache_stream.py
@@ -20,15 +20,17 @@ import logging
logger = logging.getLogger(__name__)
+# This stream is used to notify replication slaves that some caches have
+# been invalidated that they cannot infer from the other streams.
CREATE_TABLE = """
-CREATE TABLE cache_stream (
+CREATE TABLE cache_invalidation_stream (
stream_id BIGINT,
cache_func TEXT,
keys TEXT[],
invalidation_ts BIGINT
);
-CREATE INDEX cache_stream_id ON cache_stream(stream_id);
+CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id);
"""
|