summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py9
1 files changed, 6 insertions, 3 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 6ebf944f66..ce60ae2e07 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -17,7 +17,7 @@
 
 import logging
 import random
-from typing import List
+from typing import Any, List
 
 from six import itervalues
 
@@ -271,11 +271,14 @@ class ReplicationStreamer(object):
         self.notifier.on_new_replication_data()
 
     @measure_func("repl.on_invalidate_cache")
-    def on_invalidate_cache(self, cache_func, keys):
+    async def on_invalidate_cache(self, cache_func: str, keys: List[Any]):
         """The client has asked us to invalidate a cache
         """
         invalidate_cache_counter.inc()
-        getattr(self.store, cache_func).invalidate(tuple(keys))
+
+        # We invalidate the cache locally, but then also stream that to other
+        # workers.
+        await self.store.invalidate_cache_and_stream(cache_func, tuple(keys))
 
     @measure_func("repl.on_user_ip")
     async def on_user_ip(