diff options
author | Erik Johnston <erikj@jki.re> | 2017-04-04 10:07:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-04 10:07:57 +0100 |
commit | 27cc627e425a4a1d6baf3887b435005a571c2271 (patch) | |
tree | a5fa797ac7e65f85cae52cccd88451e5672257c6 /synapse/storage/pusher.py | |
parent | Merge pull request #2095 from matrix-org/rav/cull_log_preserves (diff) | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into erikj/repl_tcp_s... (diff) | |
download | synapse-27cc627e425a4a1d6baf3887b435005a571c2271.tar.xz |
Merge pull request #2082 from matrix-org/erikj/repl_tcp_server
Replace HTTP replication with TCP replication (Server side part)
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r-- | synapse/storage/pusher.py | 42 |
1 files changed, 42 insertions, 0 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8cc9f0353b..34d2f82b7f 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) + def get_all_updated_pushers_rows(self, last_id, current_id, limit): + """Get all the pushers that have changed between the given tokens. + + Returns: + Deferred(list(tuple)): each tuple consists of: + stream_id (str) + user_id (str) + app_id (str) + pushkey (str) + was_deleted (bool): whether the pusher was added/updated (False) + or deleted (True) + """ + + if last_id == current_id: + return defer.succeed([]) + + def get_all_updated_pushers_rows_txn(txn): + sql = ( + "SELECT id, user_name, app_id, pushkey" + " FROM pushers" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + results = [list(row) + [False] for row in txn] + + sql = ( + "SELECT stream_id, user_id, app_id, pushkey" + " FROM deleted_pushers" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + + results.extend(list(row) + [True] for row in txn) + results.sort() # Sort so that they're ordered by stream id + + return results + return self.runInteraction( + "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn + ) + @cachedInlineCallbacks(num_args=1, max_entries=15000) def get_if_user_has_pusher(self, user_id): # This only exists for the cachedList decorator |