summary refs log tree commit diff
path: root/synapse/storage/pusher.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-04-04 10:07:57 +0100
committerGitHub <noreply@github.com>2017-04-04 10:07:57 +0100
commit27cc627e425a4a1d6baf3887b435005a571c2271 (patch)
treea5fa797ac7e65f85cae52cccd88451e5672257c6 /synapse/storage/pusher.py
parentMerge pull request #2095 from matrix-org/rav/cull_log_preserves (diff)
parentMerge branch 'develop' of github.com:matrix-org/synapse into erikj/repl_tcp_s... (diff)
downloadsynapse-27cc627e425a4a1d6baf3887b435005a571c2271.tar.xz
Merge pull request #2082 from matrix-org/erikj/repl_tcp_server
Replace HTTP replication with TCP replication (Server side part)
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r--synapse/storage/pusher.py42
1 files changed, 42 insertions, 0 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 8cc9f0353b..34d2f82b7f 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore):
             "get_all_updated_pushers", get_all_updated_pushers_txn
         )
 
+    def get_all_updated_pushers_rows(self, last_id, current_id, limit):
+        """Get all the pushers that have changed between the given tokens.
+
+        Returns:
+            Deferred(list(tuple)): each tuple consists of:
+                stream_id (str)
+                user_id (str)
+                app_id (str)
+                pushkey (str)
+                was_deleted (bool): whether the pusher was added/updated (False)
+                    or deleted (True)
+        """
+
+        if last_id == current_id:
+            return defer.succeed([])
+
+        def get_all_updated_pushers_rows_txn(txn):
+            sql = (
+                "SELECT id, user_name, app_id, pushkey"
+                " FROM pushers"
+                " WHERE ? < id AND id <= ?"
+                " ORDER BY id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+            results = [list(row) + [False] for row in txn]
+
+            sql = (
+                "SELECT stream_id, user_id, app_id, pushkey"
+                " FROM deleted_pushers"
+                " WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+
+            results.extend(list(row) + [True] for row in txn)
+            results.sort()  # Sort so that they're ordered by stream id
+
+            return results
+        return self.runInteraction(
+            "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
+        )
+
     @cachedInlineCallbacks(num_args=1, max_entries=15000)
     def get_if_user_has_pusher(self, user_id):
         # This only exists for the cachedList decorator