diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d7dcd1ce5b..5572cb883f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -609,14 +609,14 @@ class SyncHandler(object):
deleted = yield self.store.delete_messages_for_device(
user_id, device_id, since_stream_id
)
- logger.info("Deleted %d to-device messages up to %d",
- deleted, since_stream_id)
+ logger.debug("Deleted %d to-device messages up to %d",
+ deleted, since_stream_id)
messages, stream_id = yield self.store.get_new_messages_for_device(
user_id, device_id, since_stream_id, now_token.to_device_key
)
- logger.info(
+ logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
len(messages), since_stream_id, stream_id, now_token.to_device_key
)
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index bde3b5cbbc..5c7db5e5f6 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -20,6 +20,8 @@ from twisted.internet import defer
from .background_updates import BackgroundUpdateStore
+from synapse.util.caches.expiringcache import ExpiringCache
+
logger = logging.getLogger(__name__)
@@ -42,6 +44,15 @@ class DeviceInboxStore(BackgroundUpdateStore):
self._background_drop_index_device_inbox,
)
+ # Map of (user_id, device_id) to the last stream_id that has been
+ # deleted up to. This is so that we can no op deletions.
+ self._last_device_delete_cache = ExpiringCache(
+ cache_name="last_device_delete_cache",
+ clock=self._clock,
+ max_len=10000,
+ expiry_ms=30 * 60 * 1000,
+ )
+
@defer.inlineCallbacks
def add_messages_to_device_inbox(self, local_messages_by_user_then_device,
remote_messages_by_destination):
@@ -251,6 +262,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
"get_new_messages_for_device", get_new_messages_for_device_txn,
)
+ @defer.inlineCallbacks
def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
"""
Args:
@@ -260,6 +272,18 @@ class DeviceInboxStore(BackgroundUpdateStore):
Returns:
A deferred that resolves to the number of messages deleted.
"""
+ # If we have cached the last stream id we've deleted up to, we can
+ # check if there is likely to be anything that needs deleting
+ last_deleted_stream_id = self._last_device_delete_cache.get(
+ (user_id, device_id), None
+ )
+ if last_deleted_stream_id:
+ has_changed = self._device_inbox_stream_cache.has_entity_changed(
+ user_id, last_deleted_stream_id
+ )
+ if not has_changed:
+ defer.returnValue(0)
+
def delete_messages_for_device_txn(txn):
sql = (
"DELETE FROM device_inbox"
@@ -269,10 +293,20 @@ class DeviceInboxStore(BackgroundUpdateStore):
txn.execute(sql, (user_id, device_id, up_to_stream_id))
return txn.rowcount
- return self.runInteraction(
+ count = yield self.runInteraction(
"delete_messages_for_device", delete_messages_for_device_txn
)
+ # Update the cache, ensuring that we only ever increase the value
+ last_deleted_stream_id = self._last_device_delete_cache.get(
+ (user_id, device_id), 0
+ )
+ self._last_device_delete_cache[(user_id, device_id)] = max(
+ last_deleted_stream_id, up_to_stream_id
+ )
+
+ defer.returnValue(count)
+
def get_all_new_device_messages(self, last_pos, current_pos, limit):
"""
Args:
|