summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-01-27 13:47:50 +0000
committerGitHub <noreply@github.com>2020-01-27 13:47:50 +0000
commitd5275fc55f4edc42d1543825da2c13df63d96927 (patch)
tree2244270ee23c522bab16e28e7c3fac94aed78eab /synapse/replication/tcp/resource.py
parentMerge pull request #6775 from matrix-org/jaywink/worker-docs-tweaks (diff)
downloadsynapse-d5275fc55f4edc42d1543825da2c13df63d96927.tar.xz
Propagate cache invalidates from workers to other workers. (#6748)
Currently if a worker invalidates a cache it will be streamed to master, which then didn't forward those to other workers.
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py9
1 files changed, 6 insertions, 3 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 6ebf944f66..ce60ae2e07 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -17,7 +17,7 @@
 
 import logging
 import random
-from typing import List
+from typing import Any, List
 
 from six import itervalues
 
@@ -271,11 +271,14 @@ class ReplicationStreamer(object):
         self.notifier.on_new_replication_data()
 
     @measure_func("repl.on_invalidate_cache")
-    def on_invalidate_cache(self, cache_func, keys):
+    async def on_invalidate_cache(self, cache_func: str, keys: List[Any]):
         """The client has asked us to invalidate a cache
         """
         invalidate_cache_counter.inc()
-        getattr(self.store, cache_func).invalidate(tuple(keys))
+
+        # We invalidate the cache locally, but then also stream that to other
+        # workers.
+        await self.store.invalidate_cache_and_stream(cache_func, tuple(keys))
 
     @measure_func("repl.on_user_ip")
     async def on_user_ip(