summary refs log tree commit diff
path: root/synapse/replication/slave/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-22 17:45:44 +0000
committerErik Johnston <erik@matrix.org>2016-11-22 17:45:44 +0000
commit90565d015e97a494f516cc6f06596ca5c6d490ec (patch)
tree77e37de19c6476bf5b9312168b06e74347e005db /synapse/replication/slave/storage
parentComments (diff)
downloadsynapse-90565d015e97a494f516cc6f06596ca5c6d490ec.tar.xz
Invalidate retry cache in both directions
Diffstat (limited to 'synapse/replication/slave/storage')
-rw-r--r--synapse/replication/slave/storage/_base.py19
-rw-r--r--synapse/replication/slave/storage/transactions.py9
2 files changed, 22 insertions, 6 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index f19540d6bb..18076e0f3b 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -34,6 +34,9 @@ class BaseSlavedStore(SQLBaseStore):
         else:
             self._cache_id_gen = None
 
+        self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache"
+        self.http_client = hs.get_simple_http_client()
+
     def stream_positions(self):
         pos = {}
         if self._cache_id_gen:
@@ -54,3 +57,19 @@ class BaseSlavedStore(SQLBaseStore):
                     logger.info("Got unexpected cache_func: %r", cache_func)
             self._cache_id_gen.advance(int(stream["position"]))
         return defer.succeed(None)
+
+    def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+        txn.call_after(cache_func.invalidate, keys)
+        txn.call_after(self._send_invalidation_poke, cache_func, keys)
+
+    @defer.inlineCallbacks
+    def _send_invalidation_poke(self, cache_func, keys):
+        try:
+            yield self.http_client.post_json_get_json(self.expire_cache_url, {
+                "invalidate": [{
+                    "name": cache_func.__name__,
+                    "keys": list(keys),
+                }]
+            })
+        except:
+            logger.exception("Failed to poke on expire_cache")
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index d92cea4ab1..fbb58f35da 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
 from ._base import BaseSlavedStore
 from synapse.storage import DataStore
 from synapse.storage.transactions import TransactionStore
@@ -22,12 +21,10 @@ from synapse.storage.transactions import TransactionStore
 class TransactionStore(BaseSlavedStore):
     get_destination_retry_timings = TransactionStore.__dict__[
         "get_destination_retry_timings"
-    ].orig
+    ]
     _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
+    set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
+    _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
 
     prep_send_transaction = DataStore.prep_send_transaction.__func__
     delivered_txn = DataStore.delivered_txn.__func__
-
-    # For now, don't record the destination rety timings
-    def set_destination_retry_timings(*args, **kwargs):
-        return defer.succeed(None)