summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-01-27 13:47:50 +0000
committerGitHub <noreply@github.com>2020-01-27 13:47:50 +0000
commitd5275fc55f4edc42d1543825da2c13df63d96927 (patch)
tree2244270ee23c522bab16e28e7c3fac94aed78eab /synapse/replication
parentMerge pull request #6775 from matrix-org/jaywink/worker-docs-tweaks (diff)
downloadsynapse-d5275fc55f4edc42d1543825da2c13df63d96927.tar.xz
Propagate cache invalidates from workers to other workers. (#6748)
Currently if a worker invalidates a cache it will be streamed to master, which then didn't forward those to other workers.
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/protocol.py2
-rw-r--r--synapse/replication/tcp/resource.py9
2 files changed, 7 insertions, 4 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 131e5acb09..bc1482a9bb 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -459,7 +459,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
         await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
 
     async def on_INVALIDATE_CACHE(self, cmd):
-        self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
+        await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
 
     async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
         self.streamer.on_remote_server_up(cmd.data)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 6ebf944f66..ce60ae2e07 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -17,7 +17,7 @@
 
 import logging
 import random
-from typing import List
+from typing import Any, List
 
 from six import itervalues
 
@@ -271,11 +271,14 @@ class ReplicationStreamer(object):
         self.notifier.on_new_replication_data()
 
     @measure_func("repl.on_invalidate_cache")
-    def on_invalidate_cache(self, cache_func, keys):
+    async def on_invalidate_cache(self, cache_func: str, keys: List[Any]):
         """The client has asked us to invalidate a cache
         """
         invalidate_cache_counter.inc()
-        getattr(self.store, cache_func).invalidate(tuple(keys))
+
+        # We invalidate the cache locally, but then also stream that to other
+        # workers.
+        await self.store.invalidate_cache_and_stream(cache_func, tuple(keys))
 
     @measure_func("repl.on_user_ip")
     async def on_user_ip(