diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index ab133db872..b962641166 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -15,7 +15,6 @@
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
-from twisted.internet import defer
from ._slaved_id_tracker import SlavedIdTracker
@@ -34,8 +33,7 @@ class BaseSlavedStore(SQLBaseStore):
else:
self._cache_id_gen = None
- self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache"
- self.http_client = hs.get_simple_http_client()
+ self.hs = hs
def stream_positions(self):
pos = {}
@@ -43,35 +41,20 @@ class BaseSlavedStore(SQLBaseStore):
pos["caches"] = self._cache_id_gen.get_current_token()
return pos
- def process_replication(self, result):
- stream = result.get("caches")
- if stream:
- for row in stream["rows"]:
- (
- position, cache_func, keys, invalidation_ts,
- ) = row
-
+ def process_replication_rows(self, stream_name, token, rows):
+ if stream_name == "caches":
+ self._cache_id_gen.advance(token)
+ for row in rows:
try:
- getattr(self, cache_func).invalidate(tuple(keys))
+ getattr(self, row.cache_func).invalidate(tuple(row.keys))
except AttributeError:
# We probably haven't pulled in the cache in this worker,
# which is fine.
pass
- self._cache_id_gen.advance(int(stream["position"]))
- return defer.succeed(None)
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)
txn.call_after(self._send_invalidation_poke, cache_func, keys)
- @defer.inlineCallbacks
def _send_invalidation_poke(self, cache_func, keys):
- try:
- yield self.http_client.post_json_get_json(self.expire_cache_url, {
- "invalidate": [{
- "name": cache_func.__name__,
- "keys": list(keys),
- }]
- })
- except:
- logger.exception("Failed to poke on expire_cache")
+ self.hs.get_tcp_replication().send_invalidate_cache(cache_func, keys)
|