1 files changed, 19 insertions, 0 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index f19540d6bb..18076e0f3b 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -34,6 +34,9 @@ class BaseSlavedStore(SQLBaseStore):
else:
self._cache_id_gen = None
+ self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache"
+ self.http_client = hs.get_simple_http_client()
+
def stream_positions(self):
pos = {}
if self._cache_id_gen:
@@ -54,3 +57,19 @@ class BaseSlavedStore(SQLBaseStore):
logger.info("Got unexpected cache_func: %r", cache_func)
self._cache_id_gen.advance(int(stream["position"]))
return defer.succeed(None)
+
+ def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+ txn.call_after(cache_func.invalidate, keys)
+ txn.call_after(self._send_invalidation_poke, cache_func, keys)
+
+ @defer.inlineCallbacks
+ def _send_invalidation_poke(self, cache_func, keys):
+ try:
+ yield self.http_client.post_json_get_json(self.expire_cache_url, {
+ "invalidate": [{
+ "name": cache_func.__name__,
+ "keys": list(keys),
+ }]
+ })
+ except:
+ logger.exception("Failed to poke on expire_cache")
|