summary refs log tree commit diff
path: root/synapse/storage/databases/main/deviceinbox.py
diff options
context:
space:
mode:
authorAndrew Morgan <1342360+anoadragon453@users.noreply.github.com>2021-11-09 14:31:15 +0000
committerGitHub <noreply@github.com>2021-11-09 14:31:15 +0000
commita026695083d66f9fc67a892b59eee66f039e2038 (patch)
treeef8e4185c84135f82fd7a3ece87196ffaf1bbcab /synapse/storage/databases/main/deviceinbox.py
parentAllow admins to proactively block rooms (#11228) (diff)
downloadsynapse-a026695083d66f9fc67a892b59eee66f039e2038.tar.xz
Clarifications and small fixes to to-device related code (#11247)
Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py23
1 files changed, 20 insertions, 3 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py

index 264e625bd7..ae3afdd5d2 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py
@@ -134,7 +134,10 @@ class DeviceInboxWorkerStore(SQLBaseStore): limit: The maximum number of messages to retrieve. Returns: - A list of messages for the device and where in the stream the messages got to. + A tuple containing: + * A list of messages for the device. + * The max stream token of these messages. There may be more to retrieve + if the given limit was reached. """ has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_stream_id @@ -153,12 +156,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): txn.execute( sql, (user_id, device_id, last_stream_id, current_stream_id, limit) ) + messages = [] + stream_pos = current_stream_id + for row in txn: stream_pos = row[0] messages.append(db_to_json(row[1])) + + # If the limit was not reached we know that there's no more data for this + # user/device pair up to current_stream_id. if len(messages) < limit: stream_pos = current_stream_id + return messages, stream_pos return await self.db_pool.runInteraction( @@ -260,13 +270,20 @@ class DeviceInboxWorkerStore(SQLBaseStore): " LIMIT ?" ) txn.execute(sql, (destination, last_stream_id, current_stream_id, limit)) + messages = [] + stream_pos = current_stream_id + for row in txn: stream_pos = row[0] messages.append(db_to_json(row[1])) + + # If the limit was not reached we know that there's no more data for this + # user/device pair up to current_stream_id. if len(messages) < limit: log_kv({"message": "Set stream position to current position"}) stream_pos = current_stream_id + return messages, stream_pos return await self.db_pool.runInteraction( @@ -372,8 +389,8 @@ class DeviceInboxWorkerStore(SQLBaseStore): """Used to send messages from this server. Args: - local_messages_by_user_and_device: - Dictionary of user_id to device_id to message. + local_messages_by_user_then_device: + Dictionary of recipient user_id to recipient device_id to message. remote_messages_by_destination: Dictionary of destination server_name to the EDU JSON to send.