diff options
author | Erik Johnston <erik@matrix.org> | 2017-03-27 14:03:38 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-03-30 11:48:35 +0100 |
commit | 24d35ab47bdec651706a221974424409d9ab036b (patch) | |
tree | 17231cca811506a14ab23094c0ffc2c7c621df95 /synapse/storage/pusher.py | |
parent | Use txn.fetchall() so we can reuse txn (diff) | |
download | synapse-24d35ab47bdec651706a221974424409d9ab036b.tar.xz |
Add new storage functions for new replication
The new replication protocol will keep all the streams separate, rather than muxing multiple streams into one.
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r-- | synapse/storage/pusher.py | 42 |
1 files changed, 42 insertions, 0 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8cc9f0353b..715c8bef24 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) + def get_all_updated_pushers_rows(self, last_id, current_id, limit): + """Get all the pushers that have changed between the given tokens. + + Returns: + list(tuple): each tuple consists of: + stream_id (str) + user_id (str) + app_id (str) + pushkey (str) + was_deleted (bool): whether the pusher was added/updated (False) + or deleted (True) + """ + + if last_id == current_id: + return defer.succeed([]) + + def get_all_updated_pushers_rows_txn(txn): + sql = ( + "SELECT id, user_name, app_id, pushkey" + " FROM pushers" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + results = [list(row) + [False] for row in txn] + + sql = ( + "SELECT stream_id, user_id, app_id, pushkey" + " FROM deleted_pushers" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + + results.extend(list(row) + [True] for row in txn) + results.sort() # Sort so that they're ordered by stream id + + return results + return self.runInteraction( + "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn + ) + @cachedInlineCallbacks(num_args=1, max_entries=15000) def get_if_user_has_pusher(self, user_id): # This only exists for the cachedList decorator |