diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 8cc9f0353b..715c8bef24 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn
)
+ def get_all_updated_pushers_rows(self, last_id, current_id, limit):
+ """Get all the pushers that have changed between the given tokens.
+
+ Returns:
+ list(tuple): each tuple consists of:
+ stream_id (str)
+ user_id (str)
+ app_id (str)
+ pushkey (str)
+ was_deleted (bool): whether the pusher was added/updated (False)
+ or deleted (True)
+ """
+
+ if last_id == current_id:
+ return defer.succeed([])
+
+ def get_all_updated_pushers_rows_txn(txn):
+ sql = (
+ "SELECT id, user_name, app_id, pushkey"
+ " FROM pushers"
+ " WHERE ? < id AND id <= ?"
+ " ORDER BY id ASC LIMIT ?"
+ )
+ txn.execute(sql, (last_id, current_id, limit))
+ results = [list(row) + [False] for row in txn]
+
+ sql = (
+ "SELECT stream_id, user_id, app_id, pushkey"
+ " FROM deleted_pushers"
+ " WHERE ? < stream_id AND stream_id <= ?"
+ " ORDER BY stream_id ASC LIMIT ?"
+ )
+ txn.execute(sql, (last_id, current_id, limit))
+
+ results.extend(list(row) + [True] for row in txn)
+ results.sort() # Sort so that they're ordered by stream id
+
+ return results
+ return self.runInteraction(
+ "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
+ )
+
@cachedInlineCallbacks(num_args=1, max_entries=15000)
def get_if_user_has_pusher(self, user_id):
# This only exists for the cachedList decorator
|