summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/sync.py6
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py8
-rw-r--r--synapse/storage/deviceinbox.py36
3 files changed, 46 insertions, 4 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d7dcd1ce5b..5572cb883f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -609,14 +609,14 @@ class SyncHandler(object):
             deleted = yield self.store.delete_messages_for_device(
                 user_id, device_id, since_stream_id
             )
-            logger.info("Deleted %d to-device messages up to %d",
-                        deleted, since_stream_id)
+            logger.debug("Deleted %d to-device messages up to %d",
+                         deleted, since_stream_id)
 
             messages, stream_id = yield self.store.get_new_messages_for_device(
                 user_id, device_id, since_stream_id, now_token.to_device_key
             )
 
-            logger.info(
+            logger.debug(
                 "Returning %d to-device messages between %d and %d (current token: %d)",
                 len(messages), since_stream_id, stream_id, now_token.to_device_key
             )
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index cc860f9f9b..f9102e0d89 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -17,6 +17,7 @@ from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 from synapse.storage import DataStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.caches.expiringcache import ExpiringCache
 
 
 class SlavedDeviceInboxStore(BaseSlavedStore):
@@ -34,6 +35,13 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
             self._device_inbox_id_gen.get_current_token()
         )
 
+        self._last_device_delete_cache = ExpiringCache(
+            cache_name="last_device_delete_cache",
+            clock=self._clock,
+            max_len=10000,
+            expiry_ms=30 * 60 * 1000,
+        )
+
     get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
     get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
     get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index bde3b5cbbc..5c7db5e5f6 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -20,6 +20,8 @@ from twisted.internet import defer
 
 from .background_updates import BackgroundUpdateStore
 
+from synapse.util.caches.expiringcache import ExpiringCache
+
 
 logger = logging.getLogger(__name__)
 
@@ -42,6 +44,15 @@ class DeviceInboxStore(BackgroundUpdateStore):
             self._background_drop_index_device_inbox,
         )
 
+        # Map of (user_id, device_id) to the last stream_id that has been
+        # deleted up to. This is so that we can no op deletions.
+        self._last_device_delete_cache = ExpiringCache(
+            cache_name="last_device_delete_cache",
+            clock=self._clock,
+            max_len=10000,
+            expiry_ms=30 * 60 * 1000,
+        )
+
     @defer.inlineCallbacks
     def add_messages_to_device_inbox(self, local_messages_by_user_then_device,
                                      remote_messages_by_destination):
@@ -251,6 +262,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             "get_new_messages_for_device", get_new_messages_for_device_txn,
         )
 
+    @defer.inlineCallbacks
     def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
         """
         Args:
@@ -260,6 +272,18 @@ class DeviceInboxStore(BackgroundUpdateStore):
         Returns:
             A deferred that resolves to the number of messages deleted.
         """
+        # If we have cached the last stream id we've deleted up to, we can
+        # check if there is likely to be anything that needs deleting
+        last_deleted_stream_id = self._last_device_delete_cache.get(
+            (user_id, device_id), None
+        )
+        if last_deleted_stream_id:
+            has_changed = self._device_inbox_stream_cache.has_entity_changed(
+                user_id, last_deleted_stream_id
+            )
+            if not has_changed:
+                defer.returnValue(0)
+
         def delete_messages_for_device_txn(txn):
             sql = (
                 "DELETE FROM device_inbox"
@@ -269,10 +293,20 @@ class DeviceInboxStore(BackgroundUpdateStore):
             txn.execute(sql, (user_id, device_id, up_to_stream_id))
             return txn.rowcount
 
-        return self.runInteraction(
+        count = yield self.runInteraction(
             "delete_messages_for_device", delete_messages_for_device_txn
         )
 
+        # Update the cache, ensuring that we only ever increase the value
+        last_deleted_stream_id = self._last_device_delete_cache.get(
+            (user_id, device_id), 0
+        )
+        self._last_device_delete_cache[(user_id, device_id)] = max(
+            last_deleted_stream_id, up_to_stream_id
+        )
+
+        defer.returnValue(count)
+
     def get_all_new_device_messages(self, last_pos, current_pos, limit):
         """
         Args: