summary refs log tree commit diff
path: root/synapse/storage/transactions.py
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2018-10-09 10:05:02 +0100
committerDavid Baker <dave@matrix.org>2018-10-09 10:05:02 +0100
commitdc045ef20222bfbe8dcb5dae297e741509cce8d1 (patch)
treeee03ab45ce9791a06c12d15c01d3412cd101330a /synapse/storage/transactions.py
parentApparently this blank line is Very Important (diff)
parentMerge pull request #4017 from matrix-org/rav/optimise_filter_events_for_server (diff)
downloadsynapse-dc045ef20222bfbe8dcb5dae297e741509cce8d1.tar.xz
Merge remote-tracking branch 'origin/develop' into dbkr/e2e_backups
Diffstat (limited to 'synapse/storage/transactions.py')
-rw-r--r--synapse/storage/transactions.py38
1 files changed, 26 insertions, 12 deletions
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 428e7fa36e..a3032cdce9 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -18,14 +18,14 @@ from collections import namedtuple
 
 import six
 
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
 
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.expiringcache import ExpiringCache
 
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, db_to_json
 
 # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
 # despite being deprecated and removed in favor of memoryview
@@ -50,6 +50,8 @@ _UpdateTransactionRow = namedtuple(
     )
 )
 
+SENTINEL = object()
+
 
 class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
@@ -60,6 +62,12 @@ class TransactionStore(SQLBaseStore):
 
         self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
 
+        self._destination_retry_cache = ExpiringCache(
+            cache_name="get_destination_retry_timings",
+            clock=self._clock,
+            expiry_ms=5 * 60 * 1000,
+        )
+
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
@@ -95,7 +103,8 @@ class TransactionStore(SQLBaseStore):
         )
 
         if result and result["response_code"]:
-            return result["response_code"], json.loads(str(result["response_json"]))
+            return result["response_code"], db_to_json(result["response_json"])
+
         else:
             return None
 
@@ -155,7 +164,7 @@ class TransactionStore(SQLBaseStore):
         """
         pass
 
-    @cached(max_entries=10000)
+    @defer.inlineCallbacks
     def get_destination_retry_timings(self, destination):
         """Gets the current retry timings (if any) for a given destination.
 
@@ -166,10 +175,20 @@ class TransactionStore(SQLBaseStore):
             None if not retrying
             Otherwise a dict for the retry scheme
         """
-        return self.runInteraction(
+
+        result = self._destination_retry_cache.get(destination, SENTINEL)
+        if result is not SENTINEL:
+            defer.returnValue(result)
+
+        result = yield self.runInteraction(
             "get_destination_retry_timings",
             self._get_destination_retry_timings, destination)
 
+        # We don't hugely care about race conditions between getting and
+        # invalidating the cache, since we time out fairly quickly anyway.
+        self._destination_retry_cache[destination] = result
+        defer.returnValue(result)
+
     def _get_destination_retry_timings(self, txn, destination):
         result = self._simple_select_one_txn(
             txn,
@@ -197,8 +216,7 @@ class TransactionStore(SQLBaseStore):
             retry_interval (int) - how long until next retry in ms
         """
 
-        # XXX: we could chose to not bother persisting this if our cache thinks
-        # this is a NOOP
+        self._destination_retry_cache.pop(destination, None)
         return self.runInteraction(
             "set_destination_retry_timings",
             self._set_destination_retry_timings,
@@ -211,10 +229,6 @@ class TransactionStore(SQLBaseStore):
                                        retry_last_ts, retry_interval):
         self.database_engine.lock_table(txn, "destinations")
 
-        self._invalidate_cache_and_stream(
-            txn, self.get_destination_retry_timings, (destination,)
-        )
-
         # We need to be careful here as the data may have changed from under us
         # due to a worker setting the timings.