diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index 0613b49f4a..9a1178fb39 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -207,6 +207,50 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
)
+ def get_all_new_device_messages(self, last_pos, current_pos, limit):
+ """
+ Args:
+ last_pos(int):
+ current_pos(int):
+ limit(int):
+ Returns:
+ A deferred list of rows from the device inbox
+ """
+ if last_pos == current_pos:
+ return defer.succeed([])
+
+ def get_all_new_device_messages_txn(txn):
+ # We limit like this as we might have multiple rows per stream_id, and
+ # we want to make sure we always get all entries for any stream_id
+ # we return.
+ upper_pos = min(current_pos, last_pos + limit)
+ sql = (
+ "SELECT max(stream_id), user_id"
+ " FROM device_inbox"
+ " WHERE ? < stream_id AND stream_id <= ?"
+ " GROUP BY user_id"
+ )
+ txn.execute(sql, (last_pos, upper_pos))
+ rows = txn.fetchall()
+
+ sql = (
+ "SELECT max(stream_id), destination"
+ " FROM device_federation_outbox"
+ " WHERE ? < stream_id AND stream_id <= ?"
+ " GROUP BY destination"
+ )
+ txn.execute(sql, (last_pos, upper_pos))
+ rows.extend(txn)
+
+ # Order by ascending stream ordering
+ rows.sort()
+
+ return rows
+
+ return self.db.runInteraction(
+ "get_all_new_device_messages", get_all_new_device_messages_txn
+ )
+
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
@@ -411,47 +455,3 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
rows.append((user_id, device_id, stream_id, message_json))
txn.executemany(sql, rows)
-
- def get_all_new_device_messages(self, last_pos, current_pos, limit):
- """
- Args:
- last_pos(int):
- current_pos(int):
- limit(int):
- Returns:
- A deferred list of rows from the device inbox
- """
- if last_pos == current_pos:
- return defer.succeed([])
-
- def get_all_new_device_messages_txn(txn):
- # We limit like this as we might have multiple rows per stream_id, and
- # we want to make sure we always get all entries for any stream_id
- # we return.
- upper_pos = min(current_pos, last_pos + limit)
- sql = (
- "SELECT max(stream_id), user_id"
- " FROM device_inbox"
- " WHERE ? < stream_id AND stream_id <= ?"
- " GROUP BY user_id"
- )
- txn.execute(sql, (last_pos, upper_pos))
- rows = txn.fetchall()
-
- sql = (
- "SELECT max(stream_id), destination"
- " FROM device_federation_outbox"
- " WHERE ? < stream_id AND stream_id <= ?"
- " GROUP BY destination"
- )
- txn.execute(sql, (last_pos, upper_pos))
- rows.extend(txn)
-
- # Order by ascending stream ordering
- rows.sort()
-
- return rows
-
- return self.db.runInteraction(
- "get_all_new_device_messages", get_all_new_device_messages_txn
- )
|