summary refs log tree commit diff
path: root/synapse/storage/pusher.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-03-27 14:03:38 +0100
committerErik Johnston <erik@matrix.org>2017-03-30 11:48:35 +0100
commit24d35ab47bdec651706a221974424409d9ab036b (patch)
tree17231cca811506a14ab23094c0ffc2c7c621df95 /synapse/storage/pusher.py
parentUse txn.fetchall() so we can reuse txn (diff)
downloadsynapse-24d35ab47bdec651706a221974424409d9ab036b.tar.xz
Add new storage functions for new replication
The new replication protocol will keep all the streams separate, rather
than muxing multiple streams into one.
Diffstat (limited to '')
-rw-r--r--synapse/storage/pusher.py42
1 files changed, 42 insertions, 0 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 8cc9f0353b..715c8bef24 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore):
             "get_all_updated_pushers", get_all_updated_pushers_txn
         )
 
+    def get_all_updated_pushers_rows(self, last_id, current_id, limit):
+        """Get all the pushers that have changed between the given tokens.
+
+        Returns:
+            list(tuple): each tuple consists of:
+                stream_id (str)
+                user_id (str)
+                app_id (str)
+                pushkey (str)
+                was_deleted (bool): whether the pusher was added/updated (False)
+                    or deleted (True)
+        """
+
+        if last_id == current_id:
+            return defer.succeed([])
+
+        def get_all_updated_pushers_rows_txn(txn):
+            sql = (
+                "SELECT id, user_name, app_id, pushkey"
+                " FROM pushers"
+                " WHERE ? < id AND id <= ?"
+                " ORDER BY id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+            results = [list(row) + [False] for row in txn]
+
+            sql = (
+                "SELECT stream_id, user_id, app_id, pushkey"
+                " FROM deleted_pushers"
+                " WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+
+            results.extend(list(row) + [True] for row in txn)
+            results.sort()  # Sort so that they're ordered by stream id
+
+            return results
+        return self.runInteraction(
+            "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
+        )
+
     @cachedInlineCallbacks(num_args=1, max_entries=15000)
     def get_if_user_has_pusher(self, user_id):
         # This only exists for the cachedList decorator