diff options
author | Erik Johnston <erik@matrix.org> | 2017-03-27 14:03:38 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-03-30 11:48:35 +0100 |
commit | 24d35ab47bdec651706a221974424409d9ab036b (patch) | |
tree | 17231cca811506a14ab23094c0ffc2c7c621df95 | |
parent | Use txn.fetchall() so we can reuse txn (diff) | |
download | synapse-24d35ab47bdec651706a221974424409d9ab036b.tar.xz |
Add new storage functions for new replication
The new replication protocol will keep all the streams separate, rather than muxing multiple streams into one.
-rw-r--r-- | synapse/handlers/typing.py | 3 | ||||
-rw-r--r-- | synapse/replication/resource.py | 2 | ||||
-rw-r--r-- | synapse/storage/devices.py | 6 | ||||
-rw-r--r-- | synapse/storage/events.py | 88 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 42 |
5 files changed, 137 insertions, 4 deletions
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0eea7f8f9c..d6809862e0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -293,6 +293,9 @@ class TypingHandler(object): rows.sort() return rows + def get_current_token(self): + return self._latest_room_serial + class TypingNotificationEventSource(object): def __init__(self, hs): diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 03930fe958..2d3ec2eca2 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -504,7 +504,7 @@ class ReplicationResource(Resource): if device_lists is not None and device_lists != current_position: changes = yield self.store.get_all_device_list_changes_for_remotes( - device_lists, + device_lists, current_position, ) writer.write_header_and_rows("device_lists", changes, ( "position", "user_id", "destination", diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 53e36791d5..c8d5f5ba8b 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -533,7 +533,7 @@ class DeviceStore(SQLBaseStore): rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) defer.returnValue(set(row[0] for row in rows)) - def get_all_device_list_changes_for_remotes(self, from_key): + def get_all_device_list_changes_for_remotes(self, from_key, to_key): """Return a list of `(stream_id, user_id, destination)` which is the combined list of changes to devices, and which destinations need to be poked. `destination` may be None if no destinations need to be poked. @@ -541,11 +541,11 @@ class DeviceStore(SQLBaseStore): sql = """ SELECT stream_id, user_id, destination FROM device_lists_stream LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) - WHERE stream_id > ? + WHERE ? < stream_id AND stream_id <= ? """ return self._execute( "get_all_device_list_changes_for_remotes", None, - sql, from_key, + sql, from_key, to_key ) @defer.inlineCallbacks diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3f6833fad2..64fe937bdc 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1771,6 +1771,94 @@ class EventsStore(SQLBaseStore): """The current minimum token that backfilled events have reached""" return -self._backfill_id_gen.get_current_token() + def get_current_events_token(self): + """The current maximum token that events have reached""" + return self._stream_id_gen.get_current_token() + + def get_all_new_forward_event_rows(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_new_forward_event_rows(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? < stream_ordering AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + new_event_updates = txn.fetchall() + + if len(new_event_updates) == limit: + upper_bound = new_event_updates[-1][0] + else: + upper_bound = current_id + + sql = ( + "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " INNER JOIN ex_outlier_stream USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (last_id, upper_bound)) + new_event_updates.extend(txn) + + return new_event_updates + return self.runInteraction( + "get_all_new_forward_event_rows", get_all_new_forward_event_rows + ) + + def get_all_new_backfill_event_rows(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_new_backfill_event_rows(txn): + sql = ( + "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? > stream_ordering AND stream_ordering >= ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (-last_id, -current_id, limit)) + new_event_updates = txn.fetchall() + + if len(new_event_updates) == limit: + upper_bound = new_event_updates[-1][0] + else: + upper_bound = current_id + + sql = ( + "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " INNER JOIN ex_outlier_stream USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (-last_id, -upper_bound)) + new_event_updates.extend(txn.fetchall()) + + return new_event_updates + return self.runInteraction( + "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows + ) + @cached(num_args=5, max_entries=10) def get_all_new_events(self, last_backfill_id, last_forward_id, current_backfill_id, current_forward_id, limit): diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8cc9f0353b..715c8bef24 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) + def get_all_updated_pushers_rows(self, last_id, current_id, limit): + """Get all the pushers that have changed between the given tokens. + + Returns: + list(tuple): each tuple consists of: + stream_id (str) + user_id (str) + app_id (str) + pushkey (str) + was_deleted (bool): whether the pusher was added/updated (False) + or deleted (True) + """ + + if last_id == current_id: + return defer.succeed([]) + + def get_all_updated_pushers_rows_txn(txn): + sql = ( + "SELECT id, user_name, app_id, pushkey" + " FROM pushers" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + results = [list(row) + [False] for row in txn] + + sql = ( + "SELECT stream_id, user_id, app_id, pushkey" + " FROM deleted_pushers" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + + results.extend(list(row) + [True] for row in txn) + results.sort() # Sort so that they're ordered by stream id + + return results + return self.runInteraction( + "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn + ) + @cachedInlineCallbacks(num_args=1, max_entries=15000) def get_if_user_has_pusher(self, user_id): # This only exists for the cachedList decorator |