author     Erik Johnston <erik@matrix.org>  2019-02-18 17:53:31 +0000
committer  Erik Johnston <erik@matrix.org>  2019-02-18 17:53:31 +0000
commit     a9b5ea6fc1e26ff791118b67af01fdad8e9c68c8 (patch)
tree       3fc801c2c0ba1106363da1344dffc52728335f2d /synapse/replication/slave/storage
parent     Merge pull request #4632 from matrix-org/erikj/basic_sentry (diff)
download   synapse-a9b5ea6fc1e26ff791118b67af01fdad8e9c68c8.tar.xz
Batch cache invalidation over replication
Currently, whenever the current state in a room changes we invalidate a lot
of caches, which causes *a lot* of traffic over replication. Instead, let's
batch up all those invalidations and send a single poke down the
replication streams.

Hopefully this will reduce load on the master process by substantially
reducing traffic.
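
For concreteness, here is a hedged sketch of the shape of the batched poke, contrasted with the per-cache pokes it replaces. `CacheRow`, the cache names, and the sentinel value below are illustrative stand-ins, not Synapse's actual classes or constants.

    from collections import namedtuple

    # Illustrative stand-in for a row on the "caches" replication stream.
    CacheRow = namedtuple("CacheRow", ["cache_func", "keys"])

    # Placeholder sentinel; the real constant is _CURRENT_STATE_CACHE_NAME in
    # synapse/storage/_base.py (its value is not shown in this diff).
    CURRENT_STATE_SENTINEL = "cs_cache_sentinel"

    # Before: one poke per invalidated (cache, key) pair affected by a state
    # change, i.e. many rows down the replication stream for a single room.
    rows_before = [
        CacheRow("get_rooms_for_user", ["@alice:example.com"]),
        CacheRow("get_rooms_for_user", ["@bob:example.com"]),
        CacheRow("get_current_state_ids", ["!room:example.com"]),
        # ...and so on for every invalidated cache entry
    ]

    # After: a single batched poke; keys[0] is the room_id and keys[1:] are
    # the users whose membership changed (see the worker-side diff below).
    rows_after = [
        CacheRow(CURRENT_STATE_SENTINEL,
                 ["!room:example.com", "@alice:example.com", "@bob:example.com"]),
    ]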
Diffstat (limited to 'synapse/replication/slave/storage')
-rw-r--r--  synapse/replication/slave/storage/_base.py | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 2d81d49e9a..1353a32d00 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -17,7 +17,7 @@ import logging
 
 import six
 
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import _CURRENT_STATE_CACHE_NAME, SQLBaseStore
 from synapse.storage.engines import PostgresEngine
 
 from ._slaved_id_tracker import SlavedIdTracker
@@ -54,12 +54,17 @@ class BaseSlavedStore(SQLBaseStore):
         if stream_name == "caches":
             self._cache_id_gen.advance(token)
             for row in rows:
-                try:
-                    getattr(self, row.cache_func).invalidate(tuple(row.keys))
-                except AttributeError:
-                    # We probably haven't pulled in the cache in this worker,
-                    # which is fine.
-                    pass
+                if row.cache_func == _CURRENT_STATE_CACHE_NAME:
+                    room_id = row.keys[0]
+                    members_changed = set(row.keys[1:])
+                    self._invalidate_state_caches(room_id, members_changed)
+                else:
+                    try:
+                        getattr(self, row.cache_func).invalidate(tuple(row.keys))
+                    except AttributeError:
+                        # We probably haven't pulled in the cache in this worker,
+                        # which is fine.
+                        pass
 
     def _invalidate_cache_and_stream(self, txn, cache_func, keys):
         txn.call_after(cache_func.invalidate, keys)
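
To show how a worker consumes such a batched row after this change, here is a minimal, self-contained sketch of the unpacking logic; the store stub, `CacheRow`, and the sentinel value are assumptions for illustration, not Synapse's real classes.

    from collections import namedtuple

    CacheRow = namedtuple("CacheRow", ["cache_func", "keys"])
    CURRENT_STATE_SENTINEL = "cs_cache_sentinel"  # placeholder, see above


    class StoreStub:
        """Minimal stand-in for a slaved store, for illustration only."""

        def _invalidate_state_caches(self, room_id, members_changed):
            print("invalidating state caches for %s, members %s"
                  % (room_id, sorted(members_changed)))


    def process_cache_rows(store, rows):
        # Mirrors the worker-side branch added in the diff above.
        for row in rows:
            if row.cache_func == CURRENT_STATE_SENTINEL:
                room_id = row.keys[0]
                members_changed = set(row.keys[1:])
                store._invalidate_state_caches(room_id, members_changed)
            else:
                try:
                    getattr(store, row.cache_func).invalidate(tuple(row.keys))
                except AttributeError:
                    # Cache not present on this worker, which is fine.
                    pass


    process_cache_rows(
        StoreStub(),
        [CacheRow(CURRENT_STATE_SENTINEL,
                  ["!room:example.com", "@alice:example.com"])],
    )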