diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 264e625bd7..ae3afdd5d2 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -134,7 +134,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
limit: The maximum number of messages to retrieve.
Returns:
- A list of messages for the device and where in the stream the messages got to.
+ A tuple containing:
+ * A list of messages for the device.
+ * The max stream token of these messages. There may be more to retrieve
+ if the given limit was reached.
"""
has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_stream_id
@@ -153,12 +156,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
txn.execute(
sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
)
+
messages = []
+ stream_pos = current_stream_id
+
for row in txn:
stream_pos = row[0]
messages.append(db_to_json(row[1]))
+
+ # If the limit was not reached we know that there's no more data for this
+ # user/device pair up to current_stream_id.
if len(messages) < limit:
stream_pos = current_stream_id
+
return messages, stream_pos
return await self.db_pool.runInteraction(
@@ -260,13 +270,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
" LIMIT ?"
)
txn.execute(sql, (destination, last_stream_id, current_stream_id, limit))
+
messages = []
+ stream_pos = current_stream_id
+
for row in txn:
stream_pos = row[0]
messages.append(db_to_json(row[1]))
+
+ # If the limit was not reached we know that there's no more data for this
+ # user/device pair up to current_stream_id.
if len(messages) < limit:
log_kv({"message": "Set stream position to current position"})
stream_pos = current_stream_id
+
return messages, stream_pos
return await self.db_pool.runInteraction(
@@ -372,8 +389,8 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"""Used to send messages from this server.
Args:
- local_messages_by_user_and_device:
- Dictionary of user_id to device_id to message.
+ local_messages_by_user_then_device:
+ Dictionary of recipient user_id to recipient device_id to message.
remote_messages_by_destination:
Dictionary of destination server_name to the EDU JSON to send.
|