From 24d35ab47bdec651706a221974424409d9ab036b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:03:38 +0100 Subject: Add new storage functions for new replication The new replication protocol will keep all the streams separate, rather than muxing multiple streams into one. --- synapse/storage/devices.py | 6 ++-- synapse/storage/events.py | 88 ++++++++++++++++++++++++++++++++++++++++++++++ synapse/storage/pusher.py | 42 ++++++++++++++++++++++ 3 files changed, 133 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 53e36791d5..c8d5f5ba8b 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -533,7 +533,7 @@ class DeviceStore(SQLBaseStore): rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) defer.returnValue(set(row[0] for row in rows)) - def get_all_device_list_changes_for_remotes(self, from_key): + def get_all_device_list_changes_for_remotes(self, from_key, to_key): """Return a list of `(stream_id, user_id, destination)` which is the combined list of changes to devices, and which destinations need to be poked. `destination` may be None if no destinations need to be poked. @@ -541,11 +541,11 @@ class DeviceStore(SQLBaseStore): sql = """ SELECT stream_id, user_id, destination FROM device_lists_stream LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) - WHERE stream_id > ? + WHERE ? < stream_id AND stream_id <= ? """ return self._execute( "get_all_device_list_changes_for_remotes", None, - sql, from_key, + sql, from_key, to_key ) @defer.inlineCallbacks diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3f6833fad2..64fe937bdc 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1771,6 +1771,94 @@ class EventsStore(SQLBaseStore): """The current minimum token that backfilled events have reached""" return -self._backfill_id_gen.get_current_token() + def get_current_events_token(self): + """The current maximum token that events have reached""" + return self._stream_id_gen.get_current_token() + + def get_all_new_forward_event_rows(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_new_forward_event_rows(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? < stream_ordering AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + new_event_updates = txn.fetchall() + + if len(new_event_updates) == limit: + upper_bound = new_event_updates[-1][0] + else: + upper_bound = current_id + + sql = ( + "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " INNER JOIN ex_outlier_stream USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" 
+ " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (last_id, upper_bound)) + new_event_updates.extend(txn) + + return new_event_updates + return self.runInteraction( + "get_all_new_forward_event_rows", get_all_new_forward_event_rows + ) + + def get_all_new_backfill_event_rows(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_new_backfill_event_rows(txn): + sql = ( + "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? > stream_ordering AND stream_ordering >= ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (-last_id, -current_id, limit)) + new_event_updates = txn.fetchall() + + if len(new_event_updates) == limit: + upper_bound = new_event_updates[-1][0] + else: + upper_bound = current_id + + sql = ( + "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " INNER JOIN ex_outlier_stream USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (-last_id, -upper_bound)) + new_event_updates.extend(txn.fetchall()) + + return new_event_updates + return self.runInteraction( + "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows + ) + @cached(num_args=5, max_entries=10) def get_all_new_events(self, last_backfill_id, last_forward_id, current_backfill_id, current_forward_id, limit): diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8cc9f0353b..715c8bef24 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore): "get_all_updated_pushers", get_all_updated_pushers_txn ) + def get_all_updated_pushers_rows(self, last_id, current_id, limit): + """Get all the pushers that have changed between the given tokens. + + Returns: + list(tuple): each tuple consists of: + stream_id (str) + user_id (str) + app_id (str) + pushkey (str) + was_deleted (bool): whether the pusher was added/updated (False) + or deleted (True) + """ + + if last_id == current_id: + return defer.succeed([]) + + def get_all_updated_pushers_rows_txn(txn): + sql = ( + "SELECT id, user_name, app_id, pushkey" + " FROM pushers" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + results = [list(row) + [False] for row in txn] + + sql = ( + "SELECT stream_id, user_id, app_id, pushkey" + " FROM deleted_pushers" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" 
+ ) + txn.execute(sql, (last_id, current_id, limit)) + + results.extend(list(row) + [True] for row in txn) + results.sort() # Sort so that they're ordered by stream id + + return results + return self.runInteraction( + "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn + ) + @cachedInlineCallbacks(num_args=1, max_entries=15000) def get_if_user_has_pusher(self, user_id): # This only exists for the cachedList decorator -- cgit 1.4.1 From bfcf016714575edb0ad2c19b2f00694d62ca08ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:19:24 +0100 Subject: Fix up docs --- docs/tcp_replication.rst | 16 ++++++++-------- synapse/replication/tcp/__init__.py | 18 +----------------- synapse/replication/tcp/streams.py | 4 ++-- synapse/storage/pusher.py | 2 +- 4 files changed, 12 insertions(+), 28 deletions(-) (limited to 'synapse/storage') diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst index be0aa6b28c..7393527f6f 100644 --- a/docs/tcp_replication.rst +++ b/docs/tcp_replication.rst @@ -1,20 +1,20 @@ TCP Replication =============== -This describes the TCP replication protocol that replaces the HTTP protocol. - Motivation ---------- -The HTTP API used long poll from the workers to the master, this has the problem -of causing a lot of duplicate work on the server. This TCP protocol aims to -solve. +Previously the workers used an HTTP long poll mechanism to get updates from the +master, which had the problem of causing a lot of duplicate work on the server. +This TCP protocol replaces those APIs with the aim of increased efficiency. + + Overview -------- The protocol is based on fire and forget, line based commands. An example flow -would be (where '>' indicates master->worker and '<' worker->master flows):: +would be (where '>' indicates master to worker and '<' worker to master flows):: > SERVER example.com < REPLICATE events 53 @@ -24,7 +24,7 @@ would be (where '>' indicates master->worker and '<' worker->master flows):: The example shows the server accepting a new connection and sending its identity with the ``SERVER`` command, followed by the client asking to subscribe to the ``events`` stream from the token ``53``. The server then periodically sends ``RDATA`` -commands which have the format ``RDATA ```, where the +commands which have the format ``RDATA ``, where the format of ```` is defined by the individual streams. Error reporting happens by either the client or server sending an `ERROR` @@ -125,7 +125,7 @@ recovers it can reconnect to the server and ask for missed messages. Reliability ~~~~~~~~~~~ -In general the replication stream should be consisdered an unreliable transport +In general the replication stream should be considered an unreliable transport since e.g. commands are not resent if the connection disappears. The exception to that are the replication streams, i.e. RDATA commands, since diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py index 8b4e886d4e..81c2ea7ee9 100644 --- a/synapse/replication/tcp/__init__.py +++ b/synapse/replication/tcp/__init__.py @@ -16,22 +16,7 @@ """This module implements the TCP replication protocol used by synapse to communicate between the master process and its workers (when they're enabled). -The protocol is based on fire and forget, line based commands. An example flow -would be (where '>' indicates master->worker and '<' worker->master flows):: - - > SERVER example.com - < REPLICATE events 53 - > RDATA events 54 ["$foo1:bar.com", ...] 
-    > RDATA events 55 ["$foo4:bar.com", ...]
-
-The example shows the server accepting a new connection and sending its identity
-with the `SERVER` command, followed by the client asking to subscribe to the
-`events` stream from the token `53`. The server then periodically sends `RDATA`
-commands which have the format `RDATA <stream_name> <token> <row>`, where the
-format of `<row>` is defined by the individual streams.
-
-Error reporting happens by either the client or server sending an `ERROR`
-command, and usually the connection will be closed.
+Further details can be found in docs/tcp_replication.rst
 
 Structure of the module:
@@ -42,5 +27,4 @@ Structure of the module:
  * resource.py - the server classes that accepts and handle client connections
  * streams.py - the definitons of all the valid streams
 
-Further details can be found in docs/tcp_replication.rst
 """
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 07adf9412e..fada40c6ef 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -154,8 +154,8 @@ class Stream(object):
             True then limit is provided, otherwise it's not.
 
         Returns:
-            list(tuple): the first entry in the tuple is the token for that
-                update, and the rest of the tuple gets used to construct
+            Deferred(list(tuple)): the first entry in the tuple is the token for
+                that update, and the rest of the tuple gets used to construct
                 a ``ROW_TYPE`` instance
         """
         raise NotImplementedError()
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 715c8bef24..34d2f82b7f 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -139,7 +139,7 @@ class PusherStore(SQLBaseStore):
         """Get all the pushers that have changed between the given tokens.
 
         Returns:
-            list(tuple): each tuple consists of:
+            Deferred(list(tuple)): each tuple consists of:
                 stream_id (str)
                 user_id (str)
                 app_id (str)
--
cgit 1.4.1


From 9f26d3b75b6ce369663ee9db7b1241e376772af1 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 4 Apr 2017 16:21:21 +0100
Subject: Deduplicate new deviceinbox rows for replication

---
 synapse/storage/deviceinbox.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
(limited to 'synapse/storage')

diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 2714519d21..0b62b493d5 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -325,23 +325,26 @@ class DeviceInboxStore(BackgroundUpdateStore):
             #   we return.
             upper_pos = min(current_pos, last_pos + limit)
             sql = (
-                "SELECT stream_id, user_id"
+                "SELECT max(stream_id), user_id"
                 " FROM device_inbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
+                " GROUP BY user_id"
             )
             txn.execute(sql, (last_pos, upper_pos))
             rows = txn.fetchall()
 
             sql = (
-                "SELECT stream_id, destination"
+                "SELECT max(stream_id), destination"
                 " FROM device_federation_outbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
+                " GROUP BY destination"
            )
             txn.execute(sql, (last_pos, upper_pos))
             rows.extend(txn)
 
+            # Order by ascending stream ordering
+            rows.sort()
+
             return rows
 
         return self.runInteraction(
--
cgit 1.4.1


From d72667fcce6f751c55ea510c964d68499cb67305 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 7 Apr 2017 10:10:49 +0100
Subject: Speed up get_current_state_ids

Using _simple_select_list is fairly expensive for functions that return
a lot of rows and/or get called a lot. (This is because it carefully
constructs a list of dicts).
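For illustration, the two access patterns being swapped reduce to roughly
the following (a simplified sketch of the diff below, keeping only the
query and the shape of the result; decorators and error handling are
elided):

    # Before: the generic helper materialises one dict per row...
    rows = yield self._simple_select_list(
        table="current_state_events",
        keyvalues={"room_id": room_id},
        retcols=["event_id", "type", "state_key"],
    )
    # ...which the caller immediately reshapes into the final mapping.
    state = {(r["type"], r["state_key"]): r["event_id"] for r in rows}

    # After: a hand-written transaction function iterates the cursor
    # directly and builds the final dict in a single pass, with no
    # intermediate list of row dicts.
    def _get_current_state_ids_txn(txn):
        txn.execute(
            "SELECT type, state_key, event_id FROM current_state_events"
            " WHERE room_id = ?",
            (room_id,),
        )
        return {(r[0], r[1]): r[2] for r in txn}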
get_current_state_ids gets called a lot on startup and e.g. when the IRC bridge decided to send tonnes of joins/leaves (as it invalidates the cache). We therefore replace it with a custon txn function that builds up the final result dict without building up and intermediate representation. --- synapse/storage/state.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index fb23f6f462..86e5fdb76b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches import intern_string from synapse.storage.engines import PostgresEngine @@ -69,17 +69,24 @@ class StateStore(SQLBaseStore): where_clause="type='m.room.member'", ) - @cachedInlineCallbacks(max_entries=100000, iterable=True) + @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): - rows = yield self._simple_select_list( - table="current_state_events", - keyvalues={"room_id": room_id}, - retcols=["event_id", "type", "state_key"], - desc="_calculate_state_delta", + def _get_current_state_ids_txn(txn): + txn.execute( + """SELECT type, state_key, event_id FROM current_state_events + WHERE room_id = ? + """, + (room_id,) + ) + + return { + (r[0], r[1]): r[2] for r in txn + } + + return self.runInteraction( + "get_current_state_ids", + _get_current_state_ids_txn, ) - defer.returnValue({ - (r["type"], r["state_key"]): r["event_id"] for r in rows - }) @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): -- cgit 1.4.1 From 2a3e822f4494e42c1b118c2fa0132b3a2f13bbfb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 13:47:04 +0100 Subject: Comment --- synapse/storage/state.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 86e5fdb76b..acd69944c4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -71,6 +71,15 @@ class StateStore(SQLBaseStore): @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): + """Get the current state event ids for a room based on the + current_state_events table. + + Args: + room_id (str) + + Returns: + deferred: dict of (type, state_key) -> event_id + """ def _get_current_state_ids_txn(txn): txn.execute( """SELECT type, state_key, event_id FROM current_state_events -- cgit 1.4.1 From 34840cdcef9ecdf721604934b6142040d7561816 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 09:56:54 +0100 Subject: Fix getting latest device IP for user with no devices --- synapse/storage/client_ips.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 71e5ea112f..5bed255eb2 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -90,6 +90,8 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): are (user_id, device_id) tuples. 
The values are also dicts, with keys giving the column names """ + if not devices: + defer.returnValue({}) res = yield self.runInteraction( "get_last_client_ip_by_device", @@ -110,6 +112,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): @classmethod def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols): + if not devices: + return [] + where_clauses = [] bindings = [] for (user_id, device_id) in devices: -- cgit 1.4.1 From b48045a8f536a503001994066552255c2f6c18ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 16:23:24 +0100 Subject: Don't bother with outer check for now --- synapse/storage/client_ips.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 5bed255eb2..f95921d731 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -90,9 +90,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): are (user_id, device_id) tuples. The values are also dicts, with keys giving the column names """ - if not devices: - defer.returnValue({}) - res = yield self.runInteraction( "get_last_client_ip_by_device", self._get_last_client_ip_by_device_txn, -- cgit 1.4.1 From 85657eedf8ce54acf0c78e673e58dd33e12c7f75 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 16:24:31 +0100 Subject: Bail on where clause instead --- synapse/storage/client_ips.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index f95921d731..b01f0046e9 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -90,6 +90,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): are (user_id, device_id) tuples. The values are also dicts, with keys giving the column names """ + res = yield self.runInteraction( "get_last_client_ip_by_device", self._get_last_client_ip_by_device_txn, @@ -109,9 +110,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): @classmethod def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols): - if not devices: - return [] - where_clauses = [] bindings = [] for (user_id, device_id) in devices: @@ -122,6 +120,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): where_clauses.append("(user_id = ? 
AND device_id = ?)") bindings.extend((user_id, device_id)) + if not where_clauses: + return [] + inner_select = ( "SELECT MAX(last_seen) mls, user_id, device_id FROM user_ips " "WHERE %(where)s " -- cgit 1.4.1 From 73880268ef7184d17ec369074d1d0d72de56f33c Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 17:34:09 +0100 Subject: Refactor event ordering check to events store --- synapse/handlers/read_marker.py | 32 ++++---------------------------- synapse/storage/events.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 28 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 19decc2c63..c3882313e6 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -42,41 +42,17 @@ class ReadMarkerHandler(BaseHandler): """ # Get ordering for existing read marker - with (yield self.read_marker_linearizer.queue(room_id + "_" + user_id)): + with (yield self.read_marker_linearizer.queue((room_id, user_id))): account_data = yield self.store.get_account_data_for_room(user_id, room_id) existing_read_marker = account_data["m.read_marker"] should_update = True - res = yield self.store._simple_select_one( - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True - ) - - if not res: - raise SynapseError(404, 'Event does not exist') - if existing_read_marker: - new_to = int(res["topological_ordering"]) - new_so = int(res["stream_ordering"]) - - # Get ordering for existing read marker - res = yield self.store._simple_select_one( - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": existing_read_marker['marker']}, - allow_none=True + should_update = yield self.store.is_event_after( + existing_read_marker['marker'], + event_id ) - existing_to = int(res["topological_ordering"]) if res else None - existing_so = int(res["stream_ordering"]) if res else None - - # Prevent updating if the existing marker is ahead in the stream - if existing_to > new_to: - should_update = False - elif existing_to == new_to and existing_so >= new_so: - should_update = False if should_update: content = { diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 64fe937bdc..3c6df5c2d2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2159,6 +2159,34 @@ class EventsStore(SQLBaseStore): ] ) + @defer.inlineCallbacks + def is_event_after(self, event_id1, event_id2): + is_after = True + + to_1, so_1 = yield self._get_event_ordering(event_id1) + to_2, so_2 = yield self._get_event_ordering(event_id2) + + # Prevent updating if the existing marker is ahead in the stream + if to_1 > to_2: + is_after = False + elif to_1 == to_2 and so_1 >= so_2: + is_after = False + + defer.returnValue(is_after) + + @defer.inlineCallbacks + def _get_event_ordering(self, event_id): + res = yield self._simple_select_one( + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + + if not res: + raise SynapseError(404, "Could not find event %s" % (event_id,)) + + defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) AllNewEventsResult = namedtuple("AllNewEventsResult", [ "new_forward_events", "new_backfill_events", -- cgit 1.4.1 From 7f9470906636f169f9925f8e5205dc3ffb1a057d Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 18:35:45 +0100 Subject: travis flake8.. 
--- synapse/storage/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3c6df5c2d2..702bd64b2e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2188,6 +2188,7 @@ class EventsStore(SQLBaseStore): defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) + AllNewEventsResult = namedtuple("AllNewEventsResult", [ "new_forward_events", "new_backfill_events", "forward_ex_outliers", "backward_ex_outliers", -- cgit 1.4.1 From b9557064bf6003a666b8fb6813dd3618fe9e48b4 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 12 Apr 2017 14:36:20 +0100 Subject: Simplify is_event_after logic --- synapse/handlers/read_marker.py | 5 +++-- synapse/storage/events.py | 13 +++---------- 2 files changed, 6 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 800240b8a9..3f46a16b90 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -48,9 +48,10 @@ class ReadMarkerHandler(BaseHandler): should_update = True if existing_read_marker: + # Only update if the new marker is ahead in the stream should_update = yield self.store.is_event_after( - existing_read_marker['marker'], - event_id + event_id, + existing_read_marker['marker'] ) if should_update: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 702bd64b2e..221cb563d8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2161,18 +2161,11 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def is_event_after(self, event_id1, event_id2): - is_after = True - + """Returns True if event_id1 is after event_id2 in the stream + """ to_1, so_1 = yield self._get_event_ordering(event_id1) to_2, so_2 = yield self._get_event_ordering(event_id2) - - # Prevent updating if the existing marker is ahead in the stream - if to_1 > to_2: - is_after = False - elif to_1 == to_2 and so_1 >= so_2: - is_after = False - - defer.returnValue(is_after) + defer.returnValue(to_1 > to_2 and so_1 > so_2) @defer.inlineCallbacks def _get_event_ordering(self, event_id): -- cgit 1.4.1 From 6a70647d453ddeae9599601cc49a6b7cde5de519 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Thu, 13 Apr 2017 13:46:17 +0100 Subject: Correct logic in is_event_after --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 221cb563d8..a3790419dd 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2165,7 +2165,7 @@ class EventsStore(SQLBaseStore): """ to_1, so_1 = yield self._get_event_ordering(event_id1) to_2, so_2 = yield self._get_event_ordering(event_id2) - defer.returnValue(to_1 > to_2 and so_1 > so_2) + defer.returnValue((to_1, so_1) > (to_2, so_2)) @defer.inlineCallbacks def _get_event_ordering(self, event_id): -- cgit 1.4.1 From e4f34311166cc93ad26fc12969867e1db30b4a30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Apr 2017 13:27:38 +0100 Subject: Remove unused cache --- synapse/replication/slave/storage/events.py | 3 --- synapse/storage/state.py | 7 +------ 2 files changed, 1 insertion(+), 9 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 4ca1e5aa8c..ab48ff925e 100644 --- a/synapse/replication/slave/storage/events.py +++ 
b/synapse/replication/slave/storage/events.py @@ -102,9 +102,6 @@ class SlavedEventStore(BaseSlavedStore): _get_state_groups_from_groups_txn = ( DataStore._get_state_groups_from_groups_txn.__func__ ) - _get_state_group_from_group = ( - StateStore.__dict__["_get_state_group_from_group"] - ) get_recent_event_ids_for_room = ( StreamStore.__dict__["get_recent_event_ids_for_room"] ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index acd69944c4..927a936013 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -279,12 +279,7 @@ class StateStore(SQLBaseStore): return count - @cached(num_args=2, max_entries=100000, iterable=True) - def _get_state_group_from_group(self, group, types): - raise NotImplementedError() - - @cachedList(cached_method_name="_get_state_group_from_group", - list_name="groups", num_args=2, inlineCallbacks=True) + @defer.inlineCallbacks def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) """ -- cgit 1.4.1 From 119cb9bbcf429321d27cbc1422049974e2ad8982 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Apr 2017 10:23:11 +0100 Subject: Reduce cache size by not storing deferreds Currently the cache descriptors store deferreds rather than raw values, this is a simple way of triggering only one database hit and sharing the result if two callers attempt to get the same value. However, there are a few caches that simply store a mapping from string to string (or int). These caches can have a large number of entries, under the assumption that each entry is small. However, the size of a deferred (specifically the size of ObservableDeferred) is signigicantly larger than that of the raw value, 2kb vs 32b. This PR therefore changes the cache descriptors to store the raw values rather than the deferreds. As a side effect cached storage function now either return a deferred or the actual value, as the cached list decriptor already does. This is fine as we always end up just yield'ing on the returned value eventually, which handles that case correctly. --- synapse/storage/receipts.py | 11 +++++++---- synapse/util/caches/descriptors.py | 39 ++++++++++++++++++++------------------ 2 files changed, 28 insertions(+), 22 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 6b0f8c2787..efb90c3c91 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -47,10 +47,13 @@ class ReceiptsStore(SQLBaseStore): # Returns an ObservableDeferred res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None) - if res and res.called and user_id in res.result: - # We'd only be adding to the set, so no point invalidating if the - # user is already there - return + if res: + if isinstance(res, defer.Deferred) and res.called: + res = res.result + if user_id in res: + # We'd only be adding to the set, so no point invalidating if the + # user is already there + return self.get_users_with_read_receipts_in_room.invalidate((room_id,)) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 9d0d0be1f9..807e147657 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -19,7 +19,7 @@ from synapse.util import unwrapFirstError, logcontext from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry -from . import DEBUG_CACHES, register_cache +from . 
import register_cache from twisted.internet import defer from collections import namedtuple @@ -76,7 +76,7 @@ class Cache(object): self.cache = LruCache( max_size=max_entries, keylen=keylen, cache_type=cache_type, - size_callback=(lambda d: len(d.result)) if iterable else None, + size_callback=(lambda d: len(d)) if iterable else None, ) self.name = name @@ -96,6 +96,17 @@ class Cache(object): ) def get(self, key, default=_CacheSentinel, callback=None): + """Looks the key up in the caches. + + Args: + key(tuple) + default: What is returned if key is not in the caches. If not + specified then function throws KeyError instead + callback(fn): Gets called when the entry in the cache is invalidated + + Returns: + Either a Deferred or the raw result + """ callbacks = [callback] if callback else [] val = self._pending_deferred_cache.get(key, _CacheSentinel) if val is not _CacheSentinel: @@ -137,7 +148,7 @@ class Cache(object): if self.sequence == entry.sequence: existing_entry = self._pending_deferred_cache.pop(key, None) if existing_entry is entry: - self.cache.set(key, entry.deferred, entry.callbacks) + self.cache.set(key, result, entry.callbacks) else: entry.invalidate() else: @@ -335,20 +346,10 @@ class CacheDescriptor(_CacheDescriptorBase): try: cached_result_d = cache.get(cache_key, callback=invalidate_callback) - observer = cached_result_d.observe() - if DEBUG_CACHES: - @defer.inlineCallbacks - def check_result(cached_result): - actual_result = yield self.function_to_call(obj, *args, **kwargs) - if actual_result != cached_result: - logger.error( - "Stale cache entry %s%r: cached: %r, actual %r", - self.orig.__name__, cache_key, - cached_result, actual_result, - ) - raise ValueError("Stale cache entry") - defer.returnValue(cached_result) - observer.addCallback(check_result) + if isinstance(cached_result_d, ObservableDeferred): + observer = cached_result_d.observe() + else: + observer = cached_result_d except KeyError: ret = defer.maybeDeferred( @@ -447,7 +448,9 @@ class CacheListDescriptor(_CacheDescriptorBase): try: res = cache.get(tuple(key), callback=invalidate_callback) - if not res.has_succeeded(): + if not isinstance(res, ObservableDeferred): + results[arg] = res + elif not res.has_succeeded(): res = res.observe() res.addCallback(lambda r, arg: (arg, r), arg) cached_defers[arg] = res -- cgit 1.4.1 From 22f3d3ae769a031c42cda1fb233dd91e28e585d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Apr 2017 11:43:03 +0100 Subject: Reduce _get_state_group_for_event cache size --- synapse/storage/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 927a936013..e89001d994 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -507,7 +507,7 @@ class StateStore(SQLBaseStore): state_map = yield self.get_state_ids_for_events([event_id], types) defer.returnValue(state_map[event_id]) - @cached(num_args=2, max_entries=100000) + @cached(num_args=2, max_entries=50000) def _get_state_group_for_event(self, room_id, event_id): return self._simple_select_one_onecol( table="event_to_state_groups", -- cgit 1.4.1 From d9aa645f86db733bb0419b5f5428ba9a9c799735 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Apr 2017 14:38:51 +0100 Subject: Reduce size of joined_user cache The _get_joined_users_from_context cache stores a mapping from user_id to avatar_url and display_name. Instead of storing those in a dict, store them in a namedtuple as that uses much less memory. 
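As a rough illustration of the difference (sizes are indicative for a
64-bit CPython and ignore the referenced strings, which are shared either
way):

    import sys
    from collections import namedtuple

    ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))

    as_dict = {"avatar_url": "mxc://example/abc", "display_name": "Alice"}
    as_tuple = ProfileInfo(avatar_url="mxc://example/abc", display_name="Alice")

    # A dict carries a per-instance hash table; a namedtuple is just a
    # tuple with named attribute access.
    print(sys.getsizeof(as_dict))   # ~280 bytes
    print(sys.getsizeof(as_tuple))  # ~72 bytes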
We also try converting the string to ascii to further reduce the size. --- synapse/push/bulk_push_rule_evaluator.py | 7 +++++-- synapse/rest/client/v1/room.py | 8 +++++++- synapse/storage/roommember.py | 22 ++++++++++++++-------- synapse/util/stringutils.py | 14 ++++++++++++++ 4 files changed, 40 insertions(+), 11 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 78b095c903..503fef4261 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -87,8 +87,11 @@ class BulkPushRuleEvaluator: condition_cache = {} for uid, rules in self.rules_by_user.items(): - display_name = room_members.get(uid, {}).get("display_name", None) - if not display_name: + display_name = None + profile_info = room_members.get(uid, {}) + if profile_info: + display_name = profile_info.display_name + else: # Handle the case where we are pushing a membership event to # that user, as they might not be already joined. if event.type == EventTypes.Member and event.state_key == uid: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 0bdd6b5b36..c376ab8fd7 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -406,7 +406,13 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet): users_with_profile = yield self.state.get_current_user_in_room(room_id) defer.returnValue((200, { - "joined": users_with_profile + "joined": { + user_id: { + "avatar_url": profile.avatar_url, + "display_name": profile.display_name, + } + for user_id, profile in users_with_profile.iteritems() + } })) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 367dbbbcf6..68e724ac7f 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -19,6 +19,7 @@ from collections import namedtuple from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.stringutils import to_ascii from synapse.api.constants import Membership, EventTypes from synapse.types import get_domain_from_id @@ -35,6 +36,11 @@ RoomsForUser = namedtuple( ) +ProfileInfo = namedtuple( + "ProfileInfo", ("avatar_url", "display_name") +) + + _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" @@ -422,20 +428,20 @@ class RoomMemberStore(SQLBaseStore): ) users_in_room = { - row["user_id"]: { - "display_name": row["display_name"], - "avatar_url": row["avatar_url"], - } + to_ascii(row["user_id"]): ProfileInfo( + avatar_url=to_ascii(row["avatar_url"]), + display_name=to_ascii(row["display_name"]), + ) for row in rows } if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: if event.event_id in member_event_ids: - users_in_room[event.state_key] = { - "display_name": event.content.get("displayname", None), - "avatar_url": event.content.get("avatar_url", None), - } + users_in_room[to_ascii(event.state_key)] = ProfileInfo( + display_name=to_ascii(event.content.get("displayname", None)), + avatar_url=to_ascii(event.content.get("avatar_url", None)), + ) defer.returnValue(users_in_room) diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py index a100f151d4..95a6168e16 100644 --- a/synapse/util/stringutils.py +++ b/synapse/util/stringutils.py @@ -40,3 +40,17 @@ def is_ascii(s): return False else: return True + + +def to_ascii(s): + """Converts a string to ascii if it is ascii, otherwise leave it alone. 
+ + If given None then will return None. + """ + if s is None: + return None + + try: + return s.encode("ascii") + except UnicodeEncodeError: + return s -- cgit 1.4.1 From f144365281e0c925ea467669cbfc6253e00f10e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Apr 2017 15:18:26 +0100 Subject: Comment --- synapse/storage/roommember.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 68e724ac7f..7ad2198d96 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -36,6 +36,8 @@ RoomsForUser = namedtuple( ) +# We store this using a namedtuple so that we save about 3x space over using a +# dict. ProfileInfo = namedtuple( "ProfileInfo", ("avatar_url", "display_name") ) -- cgit 1.4.1 From f053a1409e2c0105859fe6c9af02efd49206eb91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Apr 2017 17:22:55 +0100 Subject: Make state caches cache in ascii --- synapse/storage/state.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e89001d994..a16afa8df5 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches import intern_string +from synapse.util.stringutils import to_ascii from synapse.storage.engines import PostgresEngine from twisted.internet import defer @@ -89,7 +90,7 @@ class StateStore(SQLBaseStore): ) return { - (r[0], r[1]): r[2] for r in txn + (intern_string(r[0]), intern_string(r[1])): to_ascii(r[2]) for r in txn } return self.runInteraction( @@ -655,7 +656,7 @@ class StateStore(SQLBaseStore): state_dict = results[group] state_dict.update( - ((intern_string(k[0]), intern_string(k[1])), v) + ((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) for k, v in group_state_dict.iteritems() ) -- cgit 1.4.1 From c84770b87724c6cd84f4127e52b5a66354b8912e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 27 Apr 2017 15:27:48 +0100 Subject: Fix bgupdate error if index already exists (#2167) When creating a new table index in the background, guard against it existing already. Fixes https://github.com/matrix-org/synapse/issues/2135. Also, make sure we restore the autocommit flag when we're done, otherwise we get more failures from other operations later on. Fixes https://github.com/matrix-org/synapse/issues/1890 (hopefully). --- synapse/storage/background_updates.py | 83 ++++++++++++++++++++++------------- 1 file changed, 53 insertions(+), 30 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 813ad59e56..d4cf0fc59b 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -228,46 +228,69 @@ class BackgroundUpdateStore(SQLBaseStore): columns (list[str]): columns/expressions to include in index """ - # if this is postgres, we add the indexes concurrently. 
Otherwise - # we fall back to doing it inline - if isinstance(self.database_engine, engines.PostgresEngine): - conc = True - else: - conc = False - # We don't use partial indices on SQLite as it wasn't introduced - # until 3.8, and wheezy has 3.7 - where_clause = None - - sql = ( - "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" - " %(where_clause)s" - ) % { - "conc": "CONCURRENTLY" if conc else "", - "name": index_name, - "table": table, - "columns": ", ".join(columns), - "where_clause": "WHERE " + where_clause if where_clause else "" - } - - def create_index_concurrently(conn): + def create_index_psql(conn): conn.rollback() # postgres insists on autocommit for the index conn.set_session(autocommit=True) - c = conn.cursor() - c.execute(sql) - conn.set_session(autocommit=False) - def create_index(conn): + try: + c = conn.cursor() + + # If a previous attempt to create the index was interrupted, + # we may already have a half-built index. Let's just drop it + # before trying to create it again. + + sql = "DROP INDEX IF EXISTS %s" % (index_name,) + logger.debug("[SQL] %s", sql) + c.execute(sql) + + sql = ( + "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s" + " (%(columns)s) %(where_clause)s" + ) % { + "name": index_name, + "table": table, + "columns": ", ".join(columns), + "where_clause": "WHERE " + where_clause if where_clause else "" + } + logger.debug("[SQL] %s", sql) + c.execute(sql) + finally: + conn.set_session(autocommit=False) + + def create_index_sqlite(conn): + # Sqlite doesn't support concurrent creation of indexes. + # + # We don't use partial indices on SQLite as it wasn't introduced + # until 3.8, and wheezy has 3.7 + # + # We assume that sqlite doesn't give us invalid indices; however + # we may still end up with the index existing but the + # background_updates not having been recorded if synapse got shut + # down at the wrong moment - hance we use IF NOT EXISTS. (SQLite + # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.) 
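+            # (Unlike the postgres path above there is no DROP INDEX here:
+            # on the assumption stated above that sqlite never leaves an
+            # invalid index behind, IF NOT EXISTS alone makes retries safe.)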
+ sql = ( + "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s" + " (%(columns)s)" + ) % { + "name": index_name, + "table": table, + "columns": ", ".join(columns), + } + c = conn.cursor() + logger.debug("[SQL] %s", sql) c.execute(sql) + if isinstance(self.database_engine, engines.PostgresEngine): + runner = create_index_psql + else: + runner = create_index_sqlite + @defer.inlineCallbacks def updater(progress, batch_size): logger.info("Adding index %s to %s", index_name, table) - if conc: - yield self.runWithConnection(create_index_concurrently) - else: - yield self.runWithConnection(create_index) + yield self.runWithConnection(runner) yield self._end_background_update(update_name) defer.returnValue(1) -- cgit 1.4.1 From 421fdf74609439edaaffce117436e6a6df147841 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 29 Mar 2017 16:44:21 +0100 Subject: Speed up filtering of a single event in push --- synapse/push/bulk_push_rule_evaluator.py | 27 ++++++++------------------- synapse/storage/account_data.py | 13 +++++++++++++ synapse/storage/push_rule.py | 18 ++++++++++++++++-- synapse/visibility.py | 19 ------------------- 4 files changed, 37 insertions(+), 40 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f943ff640f..cb13874ccf 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -20,7 +20,6 @@ from twisted.internet import defer from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes -from synapse.visibility import filter_events_for_clients_context logger = logging.getLogger(__name__) @@ -67,17 +66,6 @@ class BulkPushRuleEvaluator: def action_for_event_by_user(self, event, context): actions_by_user = {} - # None of these users can be peeking since this list of users comes - # from the set of users in the room, so we know for sure they're all - # actually in the room. - user_tuples = [ - (u, False) for u in self.rules_by_user.keys() - ] - - filtered_by_user = yield filter_events_for_clients_context( - self.store, user_tuples, [event], {event.event_id: context} - ) - room_members = yield self.store.get_joined_users_from_context( event, context ) @@ -87,6 +75,14 @@ class BulkPushRuleEvaluator: condition_cache = {} for uid, rules in self.rules_by_user.items(): + if event.sender == uid: + continue + + if not event.is_state(): + is_ignored = yield self.store.is_ignored_by(event.sender, uid) + if is_ignored: + continue + display_name = None profile_info = room_members.get(uid) if profile_info: @@ -98,13 +94,6 @@ class BulkPushRuleEvaluator: if event.type == EventTypes.Member and event.state_key == uid: display_name = event.content.get("displayname", None) - filtered = filtered_by_user[uid] - if len(filtered) == 0: - continue - - if filtered[0].sender == uid: - continue - for rule in rules: if 'enabled' in rule and not rule['enabled']: continue diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index aa84ffc2b0..ff14e54c11 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -308,3 +308,16 @@ class AccountDataStore(SQLBaseStore): " WHERE stream_id < ?" 
) txn.execute(update_max_id_sql, (next_id, next_id)) + + @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) + def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): + ignored_account_data = yield self.get_global_account_data_by_type_for_user( + "m.ignored_user_list", ignorer_user_id, + on_invalidate=cache_context.invalidate, + ) + if not ignored_account_data: + defer.returnValue(False) + + defer.returnValue( + ignored_user_id in ignored_account_data.get("ignored_users", {}) + ) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index cbec255966..10b700a9b4 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.push.baserules import list_with_base_rules +from synapse.api.constants import EventTypes from twisted.internet import defer import logging @@ -184,11 +185,23 @@ class PushRuleStore(SQLBaseStore): if uid in local_users_in_room: user_ids.add(uid) + forgotten = yield self.who_forgot_in_room( + event.room_id, on_invalidate=cache_context.invalidate, + ) + + for row in forgotten: + user_id = row["user_id"] + event_id = row["event_id"] + + mem_id = current_state_ids.get((EventTypes.Member, user_id), None) + if event_id == mem_id: + user_ids.discard(user_id) + rules_by_user = yield self.bulk_get_push_rules( user_ids, on_invalidate=cache_context.invalidate, ) - rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} + rules_by_user = {k: v for k, v in rules_by_user.iteritems() if v is not None} defer.returnValue(rules_by_user) @@ -398,7 +411,8 @@ class PushRuleStore(SQLBaseStore): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids yield self.runInteraction( - "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering + "delete_push_rule", delete_push_rule_txn, stream_id, + event_stream_ordering, ) @defer.inlineCallbacks diff --git a/synapse/visibility.py b/synapse/visibility.py index c4dd9ae2c7..5590b866ed 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -188,25 +188,6 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): }) -@defer.inlineCallbacks -def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context): - user_ids = set(u[0] for u in user_tuples) - event_id_to_state = {} - for event_id, context in event_id_to_context.items(): - state = yield store.get_events([ - e_id - for key, e_id in context.current_state_ids.iteritems() - if key == (EventTypes.RoomHistoryVisibility, "") - or (key[0] == EventTypes.Member and key[1] in user_ids) - ]) - event_id_to_state[event_id] = state - - res = yield filter_events_for_clients( - store, user_tuples, events, event_id_to_state - ) - defer.returnValue(res) - - @defer.inlineCallbacks def filter_events_for_client(store, user_id, events, is_peeking=False): """ -- cgit 1.4.1 From ad8b316939d59230526e60660caf9094cff62c8f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Apr 2017 09:50:33 +0100 Subject: We don't care about forgotten rooms --- synapse/storage/push_rule.py | 12 ------------ 1 file changed, 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 10b700a9b4..0d88f81a66 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -185,18 +185,6 @@ class PushRuleStore(SQLBaseStore): if 
uid in local_users_in_room: user_ids.add(uid) - forgotten = yield self.who_forgot_in_room( - event.room_id, on_invalidate=cache_context.invalidate, - ) - - for row in forgotten: - user_id = row["user_id"] - event_id = row["event_id"] - - mem_id = current_state_ids.get((EventTypes.Member, user_id), None) - if event_id == mem_id: - user_ids.discard(user_id) - rules_by_user = yield self.bulk_get_push_rules( user_ids, on_invalidate=cache_context.invalidate, ) -- cgit 1.4.1 From ab37bef83bebd7cdaeb7cfd98553d18883d09103 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Apr 2017 09:57:23 +0100 Subject: Remove unused import --- synapse/storage/push_rule.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 0d88f81a66..353a135c4e 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -16,7 +16,6 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.push.baserules import list_with_base_rules -from synapse.api.constants import EventTypes from twisted.internet import defer import logging -- cgit 1.4.1 From 7166854f4169999fee0cd40a5ed389cc684b6dc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 May 2017 10:36:35 +0100 Subject: Add cache for get_current_hosts_in_room --- synapse/federation/transaction_queue.py | 6 +----- synapse/state.py | 11 ++++++++++ synapse/storage/roommember.py | 38 +++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index dee387eb7f..695f1a7375 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -24,7 +24,6 @@ from synapse.util.async import run_on_reactor from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func -from synapse.types import get_domain_from_id from synapse.handlers.presence import format_user_presence_state, get_interested_remotes import synapse.metrics @@ -183,15 +182,12 @@ class TransactionQueue(object): # Otherwise if the last member on a server in a room is # banned then it won't receive the event because it won't # be in the room after the ban. 
- users_in_room = yield self.state.get_current_user_in_room( + destinations = yield self.state.get_current_hosts_in_room( event.room_id, latest_event_ids=[ prev_id for prev_id, _ in event.prev_events ], ) - destinations = set( - get_domain_from_id(user_id) for user_id in users_in_room - ) if send_on_behalf_of is not None: # If we are sending the event on behalf of another server # then it already has the event and there is no reason to diff --git a/synapse/state.py b/synapse/state.py index f6b83d888a..f8b18a4a2d 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -175,6 +175,17 @@ class StateHandler(object): ) defer.returnValue(joined_users) + @defer.inlineCallbacks + def get_current_hosts_in_room(self, room_id, latest_event_ids=None): + if not latest_event_ids: + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + logger.debug("calling resolve_state_groups from get_current_user_in_room") + entry = yield self.resolve_state_groups(room_id, latest_event_ids) + joined_hosts = yield self.store.get_joined_hosts( + room_id, entry.state_id, entry.state + ) + defer.returnValue(joined_hosts) + @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): """Build an EventContext structure for the event. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7ad2198d96..1c0fa8a680 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -482,6 +482,44 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(False) + def get_joined_hosts(self, room_id, state_group, state_ids): + if not state_group: + # If state_group is None it means it has yet to be assigned a + # state group, i.e. we need to make sure that calls with a state_group + # of None don't hit previous cached calls with a None state_group. + # To do this we set the state_group to a new object as object() != object() + state_group = object() + + return self._get_joined_hosts( + room_id, state_group, state_ids + ) + + @cachedInlineCallbacks(num_args=3) + def _get_joined_hosts(self, room_id, state_group, current_state_ids): + # We don't use `state_group`, its there so that we can cache based + # on it. However, its important that its never None, since two current_state's + # with a state_group of None are likely to be different. + # See bulk_get_push_rules_for_room for how we work around this. 
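+        # (The assert below enforces the caching contract described above:
+        # a state_group of None would let different state sets collide in
+        # the cache.)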
+ assert state_group is not None + + joined_hosts = set() + for (etype, state_key), event_id in current_state_ids.items(): + if etype == EventTypes.Member: + try: + host = get_domain_from_id(state_key) + except: + logger.warn("state_key not user_id: %s", state_key) + continue + + if host in joined_hosts: + continue + + event = yield self.get_event(event_id, allow_none=True) + if event and event.content["membership"] == Membership.JOIN: + joined_hosts.add(host) + + defer.returnValue(joined_hosts) + @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( -- cgit 1.4.1 From a2c89a225c567df53cc6d29af397547f4e9a4a2f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 May 2017 10:40:31 +0100 Subject: Prefill state caches --- synapse/storage/_base.py | 8 ++++---- synapse/storage/events.py | 8 ++++++-- synapse/storage/state.py | 8 ++++++++ 3 files changed, 18 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c659004e8d..58b73af7d2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -60,12 +60,12 @@ class LoggingTransaction(object): object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "after_callbacks", after_callbacks) - def call_after(self, callback, *args): + def call_after(self, callback, *args, **kwargs): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. """ - self.after_callbacks.append((callback, args)) + self.after_callbacks.append((callback, args, kwargs)) def __getattr__(self, name): return getattr(self.txn, name) @@ -319,8 +319,8 @@ class SQLBaseStore(object): inner_func, *args, **kwargs ) finally: - for after_callback, after_args in after_callbacks: - after_callback(*after_args) + for after_callback, after_args, after_kwargs in after_callbacks: + after_callback(*after_args, **after_kwargs) defer.returnValue(result) @defer.inlineCallbacks diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3790419dd..a8d1b93d99 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -370,6 +370,10 @@ class EventsStore(SQLBaseStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + for room_id, (_, _, new_state) in current_state_for_room.iteritems(): + self.get_current_state_ids.prefill( + (room_id, ), new_state + ) @defer.inlineCallbacks def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): @@ -529,7 +533,7 @@ class EventsStore(SQLBaseStore): if ev_id in events_to_insert } - defer.returnValue((to_delete, to_insert)) + defer.returnValue((to_delete, to_insert, current_state)) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, @@ -682,7 +686,7 @@ class EventsStore(SQLBaseStore): def _update_current_state_txn(self, txn, state_delta_by_room): for room_id, current_state_tuple in state_delta_by_room.iteritems(): - to_delete, to_insert = current_state_tuple + to_delete, to_insert, _ = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", [(ev_id,) for ev_id in to_delete.itervalues()], diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a16afa8df5..1e1ce87e0e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -227,6 +227,14 @@ class StateStore(SQLBaseStore): ], ) + txn.call_after( + 
self._state_group_cache.update, + self._state_group_cache.sequence, + key=context.state_group, + value=context.current_state_ids, + full=True, + ) + self._simple_insert_many_txn( txn, table="event_to_state_groups", -- cgit 1.4.1 From 3e5a62ecd8839fbfb56aa33b92127822a053ef6d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 May 2017 11:36:11 +0100 Subject: Add more granular event send metrics --- synapse/events/snapshot.py | 3 +++ synapse/handlers/message.py | 10 ++++++++-- synapse/handlers/room_member.py | 1 + synapse/rest/client/v1/room.py | 1 + synapse/storage/events.py | 16 ++++++++++++++++ tests/storage/event_injector.py | 4 ++-- tests/storage/test_events.py | 2 +- 7 files changed, 32 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 6be18880b9..e9a732ff03 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -50,6 +50,7 @@ class EventContext(object): "prev_group", "delta_ids", "prev_state_events", + "app_service", ] def __init__(self): @@ -68,3 +69,5 @@ class EventContext(object): self.delta_ids = None self.prev_state_events = None + + self.app_service = None diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 82a2ade1f6..57265c6d7d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -175,7 +175,8 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None): + def create_event(self, requester, event_dict, token_id=None, txn_id=None, + prev_event_ids=None): """ Given a dict from a client, create a new event. @@ -185,6 +186,7 @@ class MessageHandler(BaseHandler): Adds display names to Join membership events. Args: + requester event_dict (dict): An entire event token_id (str) txn_id (str) @@ -226,6 +228,7 @@ class MessageHandler(BaseHandler): event, context = yield self._create_new_client_event( builder=builder, + requester=requester, prev_event_ids=prev_event_ids, ) @@ -319,6 +322,7 @@ class MessageHandler(BaseHandler): See self.create_event and self.send_nonmember_event. 
""" event, context = yield self.create_event( + requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id @@ -416,7 +420,7 @@ class MessageHandler(BaseHandler): @measure_func("_create_new_client_event") @defer.inlineCallbacks - def _create_new_client_event(self, builder, prev_event_ids=None): + def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): if prev_event_ids: prev_events = yield self.store.add_event_hashes(prev_event_ids) prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) @@ -456,6 +460,8 @@ class MessageHandler(BaseHandler): state_handler = self.state_handler context = yield state_handler.compute_event_context(builder) + if requester: + context.app_service = requester.app_service if builder.is_state(): builder.prev_state = yield self.store.add_event_hashes( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 28b2c80a93..ab87632d99 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -70,6 +70,7 @@ class RoomMemberHandler(BaseHandler): content["kind"] = "guest" event, context = yield msg_handler.create_event( + requester, { "type": EventTypes.Member, "content": content, diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c376ab8fd7..cd388770c8 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -164,6 +164,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): else: msg_handler = self.handlers.message_handler event, context = yield msg_handler.create_event( + requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3790419dd..98707d40ee 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -29,6 +29,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.state import resolve_events from synapse.util.caches.descriptors import cached +from synapse.types import get_domain_from_id from canonicaljson import encode_canonical_json from collections import deque, namedtuple, OrderedDict @@ -49,6 +50,9 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) persist_event_counter = metrics.register_counter("persisted_events") +event_counter = metrics.register_counter( + "persisted_events_sep", labels=["type", "origin_type", "origin_entity"] +) def encode_json(json_object): @@ -370,6 +374,18 @@ class EventsStore(SQLBaseStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + for event, context in chunk: + if context.app_service: + origin_type = "local" + origin_entity = context.app_service.id + elif self.hs.is_mine_id(event.sender): + origin_type = "local" + origin_entity = "*client*" + else: + origin_type = "remote" + origin_entity = get_domain_from_id(event.sender) + + event_counter.inc(event.type, origin_type, origin_entity) @defer.inlineCallbacks def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): diff --git a/tests/storage/event_injector.py b/tests/storage/event_injector.py index 38556da9a7..024ac15069 100644 --- a/tests/storage/event_injector.py +++ b/tests/storage/event_injector.py @@ -27,10 +27,10 @@ class EventInjector: self.event_builder_factory = hs.get_event_builder_factory() @defer.inlineCallbacks - def create_room(self, room): + def create_room(self, room, user): builder = self.event_builder_factory.new({ "type": 
EventTypes.Create, - "sender": "", + "sender": user.to_string(), "room_id": room.to_string(), "content": {}, }) diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py index 3762b38e37..14443b53bc 100644 --- a/tests/storage/test_events.py +++ b/tests/storage/test_events.py @@ -50,7 +50,7 @@ class EventsStoreTestCase(unittest.TestCase): # Create something to report room = RoomID.from_string("!abc123:test") user = UserID.from_string("@raccoonlover:test") - yield self.event_injector.create_room(room) + yield self.event_injector.create_room(room, user) self.base_event = yield self._get_last_stream_token() -- cgit 1.4.1 From 1827057acc2b8f821ec20538cbf781f798e34700 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 May 2017 09:56:05 +0100 Subject: Comments --- synapse/storage/events.py | 6 +++--- synapse/storage/state.py | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a37b7bad5a..d946024c9b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -441,10 +441,10 @@ class EventsStore(SQLBaseStore): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, i.e. - (type, state_key) -> event_id. `to_delete` are the entries to + 3-tuple (to_delete, to_insert, new_state) where both are state dicts, + i.e. (type, state_key) -> event_id. `to_delete` are the entries to first be deleted from current_state_events, `to_insert` are entries - to insert. + to insert. `new_state` is the full set of state. May return None if there are no changes to be applied. """ # Now we need to work out the different state sets for diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1e1ce87e0e..5d6f7dfa28 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -227,6 +227,9 @@ class StateStore(SQLBaseStore): ], ) + # Prefill the state group cache with this group. + # It's fine to use the sequence like this as the state group map + # is immutable. txn.call_after( self._state_group_cache.update, self._state_group_cache.sequence, -- cgit 1.4.1 From 2c2dcf81d03fabfe4a018b60b4b840069627b3fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 May 2017 10:00:29 +0100 Subject: Update comment --- synapse/storage/state.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 5d6f7dfa28..03981f5d2b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -229,7 +229,8 @@ class StateStore(SQLBaseStore): # Prefill the state group cache with this group. # It's fine to use the sequence like this as the state group map - # is immutable. + # is immutable. 
(If the map wasn't immutable then this prefill could + # race with another update) txn.call_after( self._state_group_cache.update, self._state_group_cache.sequence, -- cgit 1.4.1 From 7ebf518c028088d1932262649c1042fb76f9d013 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 May 2017 10:43:34 +0100 Subject: Make get_joined_users faster --- synapse/storage/roommember.py | 54 ++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 16 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7ad2198d96..46efdfb3e2 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -417,25 +417,47 @@ class RoomMemberStore(SQLBaseStore): if key[0] == EventTypes.Member ] - rows = yield self._simple_select_many_batch( - table="room_memberships", - column="event_id", - iterable=member_event_ids, - retcols=['user_id', 'display_name', 'avatar_url'], - keyvalues={ - "membership": Membership.JOIN, - }, - batch_size=500, - desc="_get_joined_users_from_context", + event_map = self._get_events_from_cache( + member_event_ids, + allow_rejected=False, ) - users_in_room = { - to_ascii(row["user_id"]): ProfileInfo( - avatar_url=to_ascii(row["avatar_url"]), - display_name=to_ascii(row["display_name"]), + missing_member_event_ids = [] + users_in_room = {} + for event_id, ev_entry in event_map.iteritems(): + if event_id: + if ev_entry.event.membership == Membership.JOIN: + users_in_room[to_ascii(ev_entry.event.state_key)] = ProfileInfo( + display_name=to_ascii( + ev_entry.event.content.get("displayname", None) + ), + avatar_url=to_ascii( + ev_entry.event.content.get("avatar_url", None) + ), + ) + else: + missing_member_event_ids.append(event_id) + + if missing_member_event_ids: + rows = yield self._simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=member_event_ids, + retcols=('user_id', 'display_name', 'avatar_url',), + keyvalues={ + "membership": Membership.JOIN, + }, + batch_size=500, + desc="_get_joined_users_from_context", ) - for row in rows - } + + users_in_room.update({ + to_ascii(row["user_id"]): ProfileInfo( + avatar_url=to_ascii(row["avatar_url"]), + display_name=to_ascii(row["display_name"]), + ) + for row in rows + }) if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: -- cgit 1.4.1 From 5d8290429ce0ae37a6ef86dd5b96ca82d090ae39 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 May 2017 13:43:19 +0100 Subject: Reduce size of get_users_in_room --- synapse/storage/roommember.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7ad2198d96..f2630787ba 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -147,7 +147,7 @@ class RoomMemberStore(SQLBaseStore): hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids) defer.returnValue(hosts) - @cached(max_entries=500000, iterable=True) + @cached(max_entries=100000, iterable=True) def get_users_in_room(self, room_id): def f(txn): sql = ( @@ -160,7 +160,7 @@ class RoomMemberStore(SQLBaseStore): ) txn.execute(sql, (room_id, Membership.JOIN,)) - return [r[0] for r in txn] + return [to_ascii(r[0]) for r in txn] return self.runInteraction("get_users_in_room", f) @cached() -- cgit 1.4.1 From dfaa58f72d3455affcac58c8b604d21935183e88 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 May 2017 14:50:24 +0100 
Subject: Fix comment and num args --- synapse/state.py | 2 +- synapse/storage/roommember.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/state.py b/synapse/state.py index f8b18a4a2d..02fee47f39 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -179,7 +179,7 @@ class StateHandler(object): def get_current_hosts_in_room(self, room_id, latest_event_ids=None): if not latest_event_ids: latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - logger.debug("calling resolve_state_groups from get_current_user_in_room") + logger.debug("calling resolve_state_groups from get_current_hosts_in_room") entry = yield self.resolve_state_groups(room_id, latest_event_ids) joined_hosts = yield self.store.get_joined_hosts( room_id, entry.state_id, entry.state diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 1c0fa8a680..c571da2ce4 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -494,7 +494,7 @@ class RoomMemberStore(SQLBaseStore): room_id, state_group, state_ids ) - @cachedInlineCallbacks(num_args=3) + @cachedInlineCallbacks(num_args=2) def _get_joined_hosts(self, room_id, state_group, current_state_ids): # We don't use `state_group`, its there so that we can cache based # on it. However, its important that its never None, since two current_state's -- cgit 1.4.1 From 07a07588a01a644837ccea57f2307d0450cd28d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 May 2017 14:52:28 +0100 Subject: Make caches bigger --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index c571da2ce4..1963b95724 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -494,7 +494,7 @@ class RoomMemberStore(SQLBaseStore): room_id, state_group, state_ids ) - @cachedInlineCallbacks(num_args=2) + @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) def _get_joined_hosts(self, room_id, state_group, current_state_ids): # We don't use `state_group`, its there so that we can cache based # on it. 
However, its important that its never None, since two current_state's -- cgit 1.4.1 From 537dbadea05dfcc7fa855d06ae0f81405ed4e81f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 May 2017 14:55:28 +0100 Subject: Intern host strings --- synapse/storage/roommember.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 1963b95724..6ec8a6345d 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -18,6 +18,7 @@ from twisted.internet import defer from collections import namedtuple from ._base import SQLBaseStore +from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.stringutils import to_ascii @@ -516,7 +517,7 @@ class RoomMemberStore(SQLBaseStore): event = yield self.get_event(event_id, allow_none=True) if event and event.content["membership"] == Membership.JOIN: - joined_hosts.add(host) + joined_hosts.add(intern_string(host)) defer.returnValue(joined_hosts) -- cgit 1.4.1 From aa93cb9f442019dde1704a0c12b47e5b664e192a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 May 2017 14:59:28 +0100 Subject: Add comment --- synapse/storage/roommember.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 46efdfb3e2..dcb95b924a 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -417,6 +417,9 @@ class RoomMemberStore(SQLBaseStore): if key[0] == EventTypes.Member ] + # We check if we have any of the member event ids in the event cache + # before we ask the DB + event_map = self._get_events_from_cache( member_event_ids, allow_rejected=False, -- cgit 1.4.1 From 587f07543fcf604da58903b22a7448e631d797ea Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 May 2017 15:07:27 +0100 Subject: Revert "Prefill state caches" --- synapse/storage/_base.py | 8 ++++---- synapse/storage/events.py | 16 +++++----------- synapse/storage/state.py | 12 ------------ 3 files changed, 9 insertions(+), 27 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 58b73af7d2..c659004e8d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -60,12 +60,12 @@ class LoggingTransaction(object): object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "after_callbacks", after_callbacks) - def call_after(self, callback, *args, **kwargs): + def call_after(self, callback, *args): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. 
""" - self.after_callbacks.append((callback, args, kwargs)) + self.after_callbacks.append((callback, args)) def __getattr__(self, name): return getattr(self.txn, name) @@ -319,8 +319,8 @@ class SQLBaseStore(object): inner_func, *args, **kwargs ) finally: - for after_callback, after_args, after_kwargs in after_callbacks: - after_callback(*after_args, **after_kwargs) + for after_callback, after_args in after_callbacks: + after_callback(*after_args) defer.returnValue(result) @defer.inlineCallbacks diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d946024c9b..98707d40ee 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -374,12 +374,6 @@ class EventsStore(SQLBaseStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) - - for room_id, (_, _, new_state) in current_state_for_room.iteritems(): - self.get_current_state_ids.prefill( - (room_id, ), new_state - ) - for event, context in chunk: if context.app_service: origin_type = "local" @@ -441,10 +435,10 @@ class EventsStore(SQLBaseStore): Assumes that we are only persisting events for one room at a time. Returns: - 3-tuple (to_delete, to_insert, new_state) where both are state dicts, - i.e. (type, state_key) -> event_id. `to_delete` are the entries to + 2-tuple (to_delete, to_insert) where both are state dicts, i.e. + (type, state_key) -> event_id. `to_delete` are the entries to first be deleted from current_state_events, `to_insert` are entries - to insert. `new_state` is the full set of state. + to insert. May return None if there are no changes to be applied. """ # Now we need to work out the different state sets for @@ -551,7 +545,7 @@ class EventsStore(SQLBaseStore): if ev_id in events_to_insert } - defer.returnValue((to_delete, to_insert, current_state)) + defer.returnValue((to_delete, to_insert)) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, @@ -704,7 +698,7 @@ class EventsStore(SQLBaseStore): def _update_current_state_txn(self, txn, state_delta_by_room): for room_id, current_state_tuple in state_delta_by_room.iteritems(): - to_delete, to_insert, _ = current_state_tuple + to_delete, to_insert = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", [(ev_id,) for ev_id in to_delete.itervalues()], diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 03981f5d2b..a16afa8df5 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -227,18 +227,6 @@ class StateStore(SQLBaseStore): ], ) - # Prefill the state group cache with this group. - # It's fine to use the sequence like this as the state group map - # is immutable. 
(If the map wasn't immutable then this prefill could - # race with another update) - txn.call_after( - self._state_group_cache.update, - self._state_group_cache.sequence, - key=context.state_group, - value=context.current_state_ids, - full=True, - ) - self._simple_insert_many_txn( txn, table="event_to_state_groups", -- cgit 1.4.1 From cf589f2c1e68b71db80cbc2cf56f88118faf545b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 May 2017 10:17:56 +0100 Subject: Fixes --- synapse/storage/roommember.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index dcb95b924a..c63c0622dd 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -427,8 +427,9 @@ class RoomMemberStore(SQLBaseStore): missing_member_event_ids = [] users_in_room = {} - for event_id, ev_entry in event_map.iteritems(): - if event_id: + for event_id in member_event_ids: + ev_entry = event_map.get(event_id) + if ev_entry: if ev_entry.event.membership == Membership.JOIN: users_in_room[to_ascii(ev_entry.event.state_key)] = ProfileInfo( display_name=to_ascii( @@ -445,7 +446,7 @@ class RoomMemberStore(SQLBaseStore): rows = yield self._simple_select_many_batch( table="room_memberships", column="event_id", - iterable=member_event_ids, + iterable=missing_member_event_ids, retcols=('user_id', 'display_name', 'avatar_url',), keyvalues={ "membership": Membership.JOIN, -- cgit 1.4.1 From 78f306a6f70552672f8c70171b2d9d79f20f8f8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 May 2017 13:07:41 +0100 Subject: Revert "Speed up filtering of a single event in push" This reverts commit 421fdf74609439edaaffce117436e6a6df147841. --- synapse/push/bulk_push_rule_evaluator.py | 27 +++++++++++++++++++-------- synapse/storage/account_data.py | 13 ------------- synapse/storage/push_rule.py | 5 ++--- synapse/visibility.py | 19 +++++++++++++++++++ 4 files changed, 40 insertions(+), 24 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index cb13874ccf..f943ff640f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -20,6 +20,7 @@ from twisted.internet import defer from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes +from synapse.visibility import filter_events_for_clients_context logger = logging.getLogger(__name__) @@ -66,6 +67,17 @@ class BulkPushRuleEvaluator: def action_for_event_by_user(self, event, context): actions_by_user = {} + # None of these users can be peeking since this list of users comes + # from the set of users in the room, so we know for sure they're all + # actually in the room. 
+ user_tuples = [ + (u, False) for u in self.rules_by_user.keys() + ] + + filtered_by_user = yield filter_events_for_clients_context( + self.store, user_tuples, [event], {event.event_id: context} + ) + room_members = yield self.store.get_joined_users_from_context( event, context ) @@ -75,14 +87,6 @@ class BulkPushRuleEvaluator: condition_cache = {} for uid, rules in self.rules_by_user.items(): - if event.sender == uid: - continue - - if not event.is_state(): - is_ignored = yield self.store.is_ignored_by(event.sender, uid) - if is_ignored: - continue - display_name = None profile_info = room_members.get(uid) if profile_info: @@ -94,6 +98,13 @@ class BulkPushRuleEvaluator: if event.type == EventTypes.Member and event.state_key == uid: display_name = event.content.get("displayname", None) + filtered = filtered_by_user[uid] + if len(filtered) == 0: + continue + + if filtered[0].sender == uid: + continue + for rule in rules: if 'enabled' in rule and not rule['enabled']: continue diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index ff14e54c11..aa84ffc2b0 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -308,16 +308,3 @@ class AccountDataStore(SQLBaseStore): " WHERE stream_id < ?" ) txn.execute(update_max_id_sql, (next_id, next_id)) - - @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) - def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): - ignored_account_data = yield self.get_global_account_data_by_type_for_user( - "m.ignored_user_list", ignorer_user_id, - on_invalidate=cache_context.invalidate, - ) - if not ignored_account_data: - defer.returnValue(False) - - defer.returnValue( - ignored_user_id in ignored_account_data.get("ignored_users", {}) - ) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 353a135c4e..cbec255966 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -188,7 +188,7 @@ class PushRuleStore(SQLBaseStore): user_ids, on_invalidate=cache_context.invalidate, ) - rules_by_user = {k: v for k, v in rules_by_user.iteritems() if v is not None} + rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} defer.returnValue(rules_by_user) @@ -398,8 +398,7 @@ class PushRuleStore(SQLBaseStore): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids yield self.runInteraction( - "delete_push_rule", delete_push_rule_txn, stream_id, - event_stream_ordering, + "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering ) @defer.inlineCallbacks diff --git a/synapse/visibility.py b/synapse/visibility.py index 5590b866ed..c4dd9ae2c7 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -188,6 +188,25 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): }) +@defer.inlineCallbacks +def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context): + user_ids = set(u[0] for u in user_tuples) + event_id_to_state = {} + for event_id, context in event_id_to_context.items(): + state = yield store.get_events([ + e_id + for key, e_id in context.current_state_ids.iteritems() + if key == (EventTypes.RoomHistoryVisibility, "") + or (key[0] == EventTypes.Member and key[1] in user_ids) + ]) + event_id_to_state[event_id] = state + + res = yield filter_events_for_clients( + store, user_tuples, events, event_id_to_state + ) + defer.returnValue(res) + + @defer.inlineCallbacks def filter_events_for_client(store, user_id, 
events, is_peeking=False): """ -- cgit 1.4.1 From fe7c1b969c5b0f51b6fe86e78e96350224fd0fb1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 May 2017 13:07:43 +0100 Subject: Revert "We don't care about forgotten rooms" This reverts commit ad8b316939d59230526e60660caf9094cff62c8f. --- synapse/storage/push_rule.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index cbec255966..5467ba51c7 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -184,6 +184,18 @@ class PushRuleStore(SQLBaseStore): if uid in local_users_in_room: user_ids.add(uid) + forgotten = yield self.who_forgot_in_room( + event.room_id, on_invalidate=cache_context.invalidate, + ) + + for row in forgotten: + user_id = row["user_id"] + event_id = row["event_id"] + + mem_id = current_state_ids.get((EventTypes.Member, user_id), None) + if event_id == mem_id: + user_ids.discard(user_id) + rules_by_user = yield self.bulk_get_push_rules( user_ids, on_invalidate=cache_context.invalidate, ) -- cgit 1.4.1 From e0f20e9425f5fa0aecf0b8bf5b58ce72c2363d8b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 May 2017 13:07:43 +0100 Subject: Revert "Remove unused import" This reverts commit ab37bef83bebd7cdaeb7cfd98553d18883d09103. --- synapse/storage/push_rule.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 5467ba51c7..0a819d32c5 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.push.baserules import list_with_base_rules +from synapse.api.constants import EventTypes from twisted.internet import defer import logging -- cgit 1.4.1 From dcabef952c0c75ec756a364fc225a72eea391e1b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 May 2017 15:09:19 +0100 Subject: Increase client_ip cache size --- synapse/storage/client_ips.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index b01f0046e9..747d2df622 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -33,6 +33,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, + max_entries=5000, ) super(ClientIpStore, self).__init__(hs) -- cgit 1.4.1 From 738ccf61c01df04e1aef521ea7d1ae2844784214 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 May 2017 15:32:18 +0100 Subject: Cache check to see if device exists --- synapse/storage/devices.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index c8d5f5ba8b..fc87c92182 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -18,7 +18,7 @@ import ujson as json from twisted.internet import defer from synapse.api.errors import StoreError -from ._base import SQLBaseStore +from ._base import SQLBaseStore, Cache from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks @@ -29,6 +29,12 @@ class DeviceStore(SQLBaseStore): def __init__(self, hs): super(DeviceStore, self).__init__(hs) + self.device_id_exists_cache = Cache( + name="device_id_exists", + keylen=2, + 
max_entries=10000, + ) + self._clock.looping_call( self._prune_old_outbound_device_pokes, 60 * 60 * 1000 ) @@ -54,6 +60,10 @@ class DeviceStore(SQLBaseStore): defer.Deferred: boolean whether the device was inserted or an existing device existed with that ID. """ + key = (user_id, device_id) + if self.device_id_exists_cache.get(key, None): + defer.returnValue(False) + try: inserted = yield self._simple_insert( "devices", @@ -65,6 +75,7 @@ class DeviceStore(SQLBaseStore): desc="store_device", or_ignore=True, ) + self.device_id_exists_cache.prefill(key, True) defer.returnValue(inserted) except Exception as e: logger.error("store_device with device_id=%s(%r) user_id=%s(%r)" -- cgit 1.4.1 From fc6d4974a60a0d47492f5c5c8dff45abbf9abe03 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 May 2017 15:33:57 +0100 Subject: Comment --- synapse/storage/devices.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index fc87c92182..6727861eb5 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -29,6 +29,8 @@ class DeviceStore(SQLBaseStore): def __init__(self, hs): super(DeviceStore, self).__init__(hs) + # Map of (user_id, device_id) -> bool. If there is an entry that implies + # the device exists. self.device_id_exists_cache = Cache( name="device_id_exists", keylen=2, -- cgit 1.4.1 From 8571f864d2fc20986341b7e9d6e18c3e061e48e0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 May 2017 15:34:27 +0100 Subject: Cache one time key counts --- synapse/storage/end_to_end_keys.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 7cbc1470fd..c96dae352d 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -15,6 +15,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.util.caches.descriptors import cached from canonicaljson import encode_canonical_json import ujson as json @@ -177,10 +178,14 @@ class EndToEndKeyStore(SQLBaseStore): for algorithm, key_id, json_bytes in new_keys ], ) + txn.call_after( + self.count_e2e_one_time_keys.invalidate, (user_id, device_id,) + ) yield self.runInteraction( "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys ) + @cached(max_entries=10000) def count_e2e_one_time_keys(self, user_id, device_id): """ Count the number of one time keys the server has for a device Returns: @@ -225,6 +230,9 @@ class EndToEndKeyStore(SQLBaseStore): ) for user_id, device_id, algorithm, key_id in delete: txn.execute(sql, (user_id, device_id, algorithm, key_id)) + txn.call_after( + self.count_e2e_one_time_keys.invalidate, (user_id, device_id,) + ) return result return self.runInteraction( "claim_e2e_one_time_keys", _claim_e2e_one_time_keys @@ -242,3 +250,4 @@ class EndToEndKeyStore(SQLBaseStore): keyvalues={"user_id": user_id, "device_id": device_id}, desc="delete_e2e_one_time_keys_by_device" ) + self.count_e2e_one_time_keys.invalidate((user_id, device_id,)) -- cgit 1.4.1 From 94e6ad71f5445e014f3c9f6c260ab664635c7b59 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 May 2017 15:55:59 +0100 Subject: Invalidate cache on device deletion --- synapse/storage/devices.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 6727861eb5..75c30abc28 100644 --- 
a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -115,12 +115,14 @@ class DeviceStore(SQLBaseStore):
         Returns:
             defer.Deferred
         """
-        return self._simple_delete_one(
+        self._simple_delete_one(
             table="devices",
             keyvalues={"user_id": user_id, "device_id": device_id},
             desc="delete_device",
         )
+        self.device_id_exists_cache.invalidate((user_id, device_id))
+
     def delete_devices(self, user_id, device_ids):
         """Deletes several devices.

@@ -130,13 +132,15 @@
         Returns:
             defer.Deferred
         """
-        return self._simple_delete_many(
+        self._simple_delete_many(
             table="devices",
             column="device_id",
             iterable=device_ids,
             keyvalues={"user_id": user_id},
             desc="delete_devices",
         )
+        for device_id in device_ids:
+            self.device_id_exists_cache.invalidate((user_id, device_id))

     def update_device(self, user_id, device_id, new_display_name=None):
         """Update a device.
-- cgit 1.4.1

From ffad4fe35be3baba5b2fffaa4e9b31f3008d09af Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 8 May 2017 16:06:17 +0100
Subject: Don't update event cache hit ratio from get_joined_users

Otherwise the hit ratio of plain get_events gets completely skewed by
calls to get_joined_users* functions.
---
 synapse/storage/events.py          | 13 +++++++++++--
 synapse/storage/roommember.py      |  4 ++++
 synapse/util/caches/descriptors.py |  9 ++++++---
 3 files changed, 21 insertions(+), 5 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 98707d40ee..d944984d61 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1343,11 +1343,20 @@ class EventsStore(SQLBaseStore):
     def _invalidate_get_event_cache(self, event_id):
         self._get_event_cache.invalidate((event_id,))

-    def _get_events_from_cache(self, events, allow_rejected):
+    def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
+        """
+        Args:
+            events (list(str)): list of event_ids to fetch
+            allow_rejected (bool): Whether to return events that were rejected
+            update_metrics (bool): Whether to update the cache hit ratio metrics
+        """
         event_map = {}

         for event_id in events:
-            ret = self._get_event_cache.get((event_id,), None)
+            ret = self._get_event_cache.get(
+                (event_id,), None,
+                update_metrics=update_metrics,
+            )
             if not ret:
                 continue

diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index ad3c9b06d9..2fa20bd87c 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -421,9 +421,13 @@ class RoomMemberStore(SQLBaseStore):

         # We check if we have any of the member event ids in the event cache
         # before we ask the DB

+        # We don't update the event cache hit ratio as it completely throws off
+        # the hit ratio counts. After all, we don't populate the cache if we
+        # miss it here
         event_map = self._get_events_from_cache(
             member_event_ids,
             allow_rejected=False,
+            update_metrics=False,
         )

         missing_member_event_ids = []
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index aa182eeac7..48dcbafeef 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -96,7 +96,7 @@ class Cache(object):
                 "Cache objects can only be accessed from the main thread"
             )

-    def get(self, key, default=_CacheSentinel, callback=None):
+    def get(self, key, default=_CacheSentinel, callback=None, update_metrics=True):
         """Looks the key up in the caches.

         Args:
@@ -104,6 +104,7 @@ class Cache(object):
             default: What is returned if key is not in the caches.
                 If not specified then function throws KeyError instead
             callback(fn): Gets called when the entry in the cache is invalidated
+            update_metrics (bool): whether to update the cache hit rate metrics

         Returns:
             Either a Deferred or the raw result
@@ -113,7 +114,8 @@
         if val is not _CacheSentinel:
             if val.sequence == self.sequence:
                 val.callbacks.update(callbacks)
-                self.metrics.inc_hits()
+                if update_metrics:
+                    self.metrics.inc_hits()
                 return val.deferred

         val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
@@ -121,7 +123,8 @@
             self.metrics.inc_hits()
             return val

-        self.metrics.inc_misses()
+        if update_metrics:
+            self.metrics.inc_misses()

         if default is _CacheSentinel:
             raise KeyError()
-- cgit 1.4.1

From 6a12998a83791137db0b7988646cfc4bff572427 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 8 May 2017 16:10:51 +0100
Subject: Add missing yields
---
 synapse/storage/devices.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 75c30abc28..d9936c88bb 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -106,6 +106,7 @@ class DeviceStore(SQLBaseStore):
             desc="get_device",
         )

+    @defer.inlineCallbacks
     def delete_device(self, user_id, device_id):
         """Delete a device.

@@ -115,7 +116,7 @@
         Returns:
             defer.Deferred
         """
-        self._simple_delete_one(
+        yield self._simple_delete_one(
             table="devices",
             keyvalues={"user_id": user_id, "device_id": device_id},
             desc="delete_device",
@@ -123,6 +124,7 @@

         self.device_id_exists_cache.invalidate((user_id, device_id))

+    @defer.inlineCallbacks
     def delete_devices(self, user_id, device_ids):
         """Deletes several devices.

@@ -132,7 +134,7 @@
         Returns:
             defer.Deferred
         """
-        self._simple_delete_many(
+        yield self._simple_delete_many(
             table="devices",
             column="device_id",
             iterable=device_ids,
-- cgit 1.4.1

From 093f7e47ccf318181c262c79bb60ffd3b83edaee Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 8 May 2017 16:13:51 +0100
Subject: Expand docstring a bit
---
 synapse/storage/events.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d944984d61..2ab44ceaa7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1344,11 +1344,17 @@ class EventsStore(SQLBaseStore):
         self._get_event_cache.invalidate((event_id,))

     def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
-        """
+        """Fetch events from the caches
+
         Args:
             events (list(str)): list of event_ids to fetch
             allow_rejected (bool): Whether to return events that were rejected
             update_metrics (bool): Whether to update the cache hit ratio metrics
+
+        Returns:
+            dict of event_id -> _EventCacheEntry for each event_id in cache. If
+            allow_rejected is `False` then there will still be an entry but it
+            will be `None`
         """
         event_map = {}

-- cgit 1.4.1

From a7e9d8762ddbcea0fcb7ab87c2c4f4e0d91e639a Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Tue, 9 May 2017 18:26:54 +0100
Subject: Allow clients to upload one-time-keys with new sigs

When a client retries a key upload, don't give an error if the
signature has changed (but the key is the same).

Fixes https://github.com/vector-im/riot-android/issues/1208, hopefully.
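The key comparison described here ignores the "signatures" object when deciding whether a retried upload matches the stored key, since a client may legitimately re-sign the same one-time key on retry. A minimal standalone sketch of that rule (the helper name and example payloads are illustrative, not part of this patch):

    def _strip_signatures(key):
        # signatures may differ between upload attempts, so ignore them
        # when deciding whether a retried upload is the same key
        if not isinstance(key, dict):
            return key
        key = dict(key)
        key.pop("signatures", None)
        return key

    # the same key uploaded twice, re-signed the second time: accepted
    first = {"key": "key2", "signatures": {"k1": "sig1"}}
    retry = {"key": "key2", "signatures": {"k1": "sig2"}}
    assert _strip_signatures(first) == _strip_signatures(retry)

    # a genuinely different key under the same key_id: still rejected
    assert _strip_signatures(first) != _strip_signatures({"key": "key3"})

A key whose content actually changes under the same key_id still fails the check and is rejected with a 400, which is what the tests added by this patch exercise.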
--- synapse/handlers/e2e_keys.py | 70 ++++++++++++++++++++++----- synapse/storage/end_to_end_keys.py | 47 ++++++++++-------- tests/handlers/test_e2e_keys.py | 98 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 182 insertions(+), 33 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index c2b38d72a9..9d994a8f71 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -288,19 +288,8 @@ class E2eKeysHandler(object): one_time_keys = keys.get("one_time_keys", None) if one_time_keys: - logger.info( - "Adding %d one_time_keys for device %r for user %r at %d", - len(one_time_keys), device_id, user_id, time_now - ) - key_list = [] - for key_id, key_json in one_time_keys.items(): - algorithm, key_id = key_id.split(":") - key_list.append(( - algorithm, key_id, encode_canonical_json(key_json) - )) - - yield self.store.add_e2e_one_time_keys( - user_id, device_id, time_now, key_list + yield self._upload_one_time_keys_for_user( + user_id, device_id, time_now, one_time_keys, ) # the device should have been registered already, but it may have been @@ -313,3 +302,58 @@ class E2eKeysHandler(object): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue({"one_time_key_counts": result}) + + @defer.inlineCallbacks + def _upload_one_time_keys_for_user(self, user_id, device_id, time_now, + one_time_keys): + logger.info( + "Adding one_time_keys %r for device %r for user %r at %d", + one_time_keys.keys(), device_id, user_id, time_now, + ) + + # make a list of (alg, id, key) tuples + key_list = [] + for key_id, key_obj in one_time_keys.items(): + algorithm, key_id = key_id.split(":") + key_list.append(( + algorithm, key_id, key_obj + )) + + # First we check if we have already persisted any of the keys. + existing_key_map = yield self.store.get_e2e_one_time_keys( + user_id, device_id, [k_id for _, k_id, _ in key_list] + ) + + new_keys = [] # Keys that we need to insert. (alg, id, json) tuples. + for algorithm, key_id, key in key_list: + ex_json = existing_key_map.get((algorithm, key_id), None) + if ex_json: + if not _one_time_keys_match(ex_json, key): + raise SynapseError( + 400, + ("One time key %s:%s already exists. " + "Old key: %s; new key: %r") % + (algorithm, key_id, ex_json, key) + ) + else: + new_keys.append((algorithm, key_id, encode_canonical_json(key))) + + yield self.store.add_e2e_one_time_keys( + user_id, device_id, time_now, new_keys + ) + + +def _one_time_keys_match(old_key_json, new_key): + old_key = json.loads(old_key_json) + + # if either is a string rather than an object, they must match exactly + if not isinstance(old_key, dict) or not isinstance(new_key, dict): + return old_key == new_key + + # otherwise, we strip off the 'signatures' if any, because it's legitimate + # for different upload attempts to have different signatures. + old_key.pop("signatures", None) + new_key_copy = dict(new_key) + new_key_copy.pop("signatures", None) + + return old_key == new_key_copy diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index c96dae352d..e00f31da2b 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -14,7 +14,6 @@ # limitations under the License. 
from twisted.internet import defer -from synapse.api.errors import SynapseError from synapse.util.caches.descriptors import cached from canonicaljson import encode_canonical_json @@ -124,18 +123,24 @@ class EndToEndKeyStore(SQLBaseStore): return result @defer.inlineCallbacks - def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list): - """Insert some new one time keys for a device. + def get_e2e_one_time_keys(self, user_id, device_id, key_ids): + """Retrieve a number of one-time keys for a user - Checks if any of the keys are already inserted, if they are then check - if they match. If they don't then we raise an error. + Args: + user_id(str): id of user to get keys for + device_id(str): id of device to get keys for + key_ids(list[str]): list of key ids (excluding algorithm) to + retrieve + + Returns: + deferred resolving to Dict[(str, str), str]: map from (algorithm, + key_id) to json string for key """ - # First we check if we have already persisted any of the keys. rows = yield self._simple_select_many_batch( table="e2e_one_time_keys_json", column="key_id", - iterable=[key_id for _, key_id, _ in key_list], + iterable=key_ids, retcols=("algorithm", "key_id", "key_json",), keyvalues={ "user_id": user_id, @@ -144,20 +149,22 @@ class EndToEndKeyStore(SQLBaseStore): desc="add_e2e_one_time_keys_check", ) - existing_key_map = { + defer.returnValue({ (row["algorithm"], row["key_id"]): row["key_json"] for row in rows - } - - new_keys = [] # Keys that we need to insert - for algorithm, key_id, json_bytes in key_list: - ex_bytes = existing_key_map.get((algorithm, key_id), None) - if ex_bytes: - if json_bytes != ex_bytes: - raise SynapseError( - 400, "One time key with key_id %r already exists" % (key_id,) - ) - else: - new_keys.append((algorithm, key_id, json_bytes)) + }) + + @defer.inlineCallbacks + def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): + """Insert some new one time keys for a device. Errors if any of the + keys already exist. + + Args: + user_id(str): id of user to get keys for + device_id(str): id of device to get keys for + time_now(long): insertion time to record (ms since epoch) + new_keys(iterable[(str, str, str)]: keys to add - each a tuple of + (algorithm, key_id, key json) + """ def _add_e2e_one_time_keys(txn): # We are protected from race between lookup and insertion due to diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index 878a54dc34..f10a80a8e1 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -14,6 +14,7 @@ # limitations under the License. 
import mock +from synapse.api import errors from twisted.internet import defer import synapse.api.errors @@ -44,3 +45,100 @@ class E2eKeysHandlerTestCase(unittest.TestCase): local_user = "@boris:" + self.hs.hostname res = yield self.handler.query_local_devices({local_user: None}) self.assertDictEqual(res, {local_user: {}}) + + @defer.inlineCallbacks + def test_reupload_one_time_keys(self): + """we should be able to re-upload the same keys""" + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + "alg2:k2": { + "key": "key2", + "signatures": {"k1": "sig1"} + }, + "alg2:k3": { + "key": "key3", + }, + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + # we should be able to change the signature without a problem + keys["alg2:k2"]["signatures"]["k1"] = "sig2" + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + @defer.inlineCallbacks + def test_change_one_time_keys(self): + """attempts to change one-time-keys should be rejected""" + + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + "alg2:k2": { + "key": "key2", + "signatures": {"k1": "sig1"} + }, + "alg2:k3": { + "key": "key3", + }, + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": {"alg1:k1": "key2"}}, + ) + self.fail("No error when changing string key") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": {"alg2:k3": "key2"}}, + ) + self.fail("No error when replacing dict key with string") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, { + "one_time_keys": {"alg1:k1": {"key": "key"}} + }, + ) + self.fail("No error when replacing string key with dict") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, { + "one_time_keys": { + "alg2:k2": { + "key": "key3", + "signatures": {"k1": "sig1"}, + } + }, + }, + ) + self.fail("No error when replacing dict key") + except errors.SynapseError: + pass -- cgit 1.4.1 From b990b2fce5bb96c1c8eebcb0525ffcd011a22556 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 May 2017 11:05:43 +0100 Subject: Add per user ratelimiting overrides --- synapse/handlers/_base.py | 34 ++++++++++++++++++++++--- synapse/handlers/message.py | 16 +++--------- synapse/handlers/profile.py | 2 +- synapse/handlers/room.py | 2 +- synapse/storage/room.py | 36 ++++++++++++++++++++++++++- synapse/storage/schema/delta/41/ratelimit.sql | 22 ++++++++++++++++ 6 files changed, 93 insertions(+), 19 deletions(-) create mode 100644 synapse/storage/schema/delta/41/ratelimit.sql (limited to 'synapse/storage') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index e83adc8339..faa5609c0c 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -53,7 +53,20 @@ class BaseHandler(object): self.event_builder_factory = hs.get_event_builder_factory() - def ratelimit(self, requester): + @defer.inlineCallbacks + def 
ratelimit(self, requester, update=True):
+        """Ratelimits requests.
+
+        Args:
+            requester (Requester)
+            update (bool): Whether to record that a request is being processed.
+                Set to False when doing multiple checks for one request (e.g.
+                to check up front if we would reject the request), and set to
+                True for the last call for a given request.
+
+        Raises:
+            LimitExceededError if the request should be ratelimited
+        """
         time_now = self.clock.time()
         user_id = requester.user.to_string()

@@ -67,10 +80,25 @@
         if requester.app_service and not requester.app_service.is_rate_limited():
             return

+        # Check if there is a per user override in the DB.
+        override = yield self.store.get_ratelimit_for_user(user_id)
+        if override:
+            # If overridden with a null Hz then ratelimiting has been entirely
+            # disabled for the user
+            if not override.messages_per_second:
+                return
+
+            messages_per_second = override.messages_per_second
+            burst_count = override.burst_count
+        else:
+            messages_per_second = self.hs.config.rc_messages_per_second
+            burst_count = self.hs.config.rc_message_burst_count
+
         allowed, time_allowed = self.ratelimiter.send_message(
             user_id, time_now,
-            msg_rate_hz=self.hs.config.rc_messages_per_second,
-            burst_count=self.hs.config.rc_message_burst_count,
+            msg_rate_hz=messages_per_second,
+            burst_count=burst_count,
+            update=update,
         )
         if not allowed:
             raise LimitExceededError(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 57265c6d7d..196925edad 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,7 +16,7 @@ from twisted.internet import defer

 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError, LimitExceededError
+from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
@@ -254,17 +254,7 @@ class MessageHandler(BaseHandler):
         # We check here if we are currently being rate limited, so that we
         # don't do unnecessary work. We check again just before we actually
         # send the event.
-        time_now = self.clock.time()
-        allowed, time_allowed = self.ratelimiter.send_message(
-            event.sender, time_now,
-            msg_rate_hz=self.hs.config.rc_messages_per_second,
-            burst_count=self.hs.config.rc_message_burst_count,
-            update=False,
-        )
-        if not allowed:
-            raise LimitExceededError(
-                retry_after_ms=int(1000 * (time_allowed - time_now)),
-            )
+        yield self.ratelimit(requester, update=False)

         user = UserID.from_string(event.sender)

@@ -499,7 +489,7 @@ class MessageHandler(BaseHandler):
         # We now need to go and hit out to wherever we need to hit out to.

         if ratelimit:
-            self.ratelimit(requester)
+            yield self.ratelimit(requester)

         try:
             yield self.auth.check_from_context(event, context)
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 9bf638f818..7abee98dea 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -156,7 +156,7 @@ class ProfileHandler(BaseHandler):
         if not self.hs.is_mine(user):
             return

-        self.ratelimit(requester)
+        yield self.ratelimit(requester)

         room_ids = yield self.store.get_rooms_for_user(
             user.to_string(),
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 99cb7db0db..d2a0d6520a 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -75,7 +75,7 @@ class RoomCreationHandler(BaseHandler):
         """
         user_id = requester.user.to_string()

-        self.ratelimit(requester)
+        yield self.ratelimit(requester)

         if "room_alias_name" in config:
             for wchar in string.whitespace:
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index e4c56cc175..5d543652bb 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -16,7 +16,7 @@ from twisted.internet import defer

 from synapse.api.errors import StoreError
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks

 from ._base import SQLBaseStore
 from .engines import PostgresEngine, Sqlite3Engine
@@ -33,6 +33,11 @@ OpsLevel = collections.namedtuple(
     ("ban_level", "kick_level", "redact_level",)
 )

+RatelimitOverride = collections.namedtuple(
+    "RatelimitOverride",
+    ("messages_per_second", "burst_count",)
+)
+

 class RoomStore(SQLBaseStore):

@@ -473,3 +478,32 @@ class RoomStore(SQLBaseStore):
         return self.runInteraction(
             "get_all_new_public_rooms", get_all_new_public_rooms
         )
+
+    @cachedInlineCallbacks(max_entries=10000)
+    def get_ratelimit_for_user(self, user_id):
+        """Check if there are any overrides for ratelimiting for the given
+        user
+
+        Args:
+            user_id (str)
+
+        Returns:
+            RatelimitOverride if there is an override, else None. If the contents
+            of RatelimitOverride are None or 0 then ratelimiting has been
+            disabled for that user entirely.
+        """
+        row = yield self._simple_select_one(
+            table="ratelimit_override",
+            keyvalues={"user_id": user_id},
+            retcols=("messages_per_second", "burst_count"),
+            allow_none=True,
+            desc="get_ratelimit_for_user",
+        )
+
+        if row:
+            defer.returnValue(RatelimitOverride(
+                messages_per_second=row["messages_per_second"],
+                burst_count=row["burst_count"],
+            ))
+        else:
+            defer.returnValue(None)
diff --git a/synapse/storage/schema/delta/41/ratelimit.sql b/synapse/storage/schema/delta/41/ratelimit.sql
new file mode 100644
index 0000000000..a194bf0238
--- /dev/null
+++ b/synapse/storage/schema/delta/41/ratelimit.sql
@@ -0,0 +1,22 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +CREATE TABLE ratelimit_override ( + user_id TEXT NOT NULL, + messages_per_second BIGINT, + burst_count BIGINT +); + +CREATE UNIQUE INDEX ratelimit_override_idx ON ratelimit_override(user_id); -- cgit 1.4.1 From b64d312421976162a8d41246f11652b5003bb66f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 10 May 2017 17:46:41 +0100 Subject: add some logging to purge_history --- synapse/storage/events.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2ab44ceaa7..512828cf34 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2033,6 +2033,8 @@ class EventsStore(SQLBaseStore): for event_id, state_key in event_rows: txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) + logger.debug("[purge] Finding new backward extremities") + # We calculate the new entries for the backward extremeties by finding # all events that point to events that are to be purged txn.execute( @@ -2045,6 +2047,8 @@ class EventsStore(SQLBaseStore): ) new_backwards_extrems = txn.fetchall() + logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems) + txn.execute( "DELETE FROM event_backward_extremities WHERE room_id = ?", (room_id,) @@ -2059,6 +2063,8 @@ class EventsStore(SQLBaseStore): ] ) + logger.debug("[purge] finding redundant state groups") + # Get all state groups that are only referenced by events that are # to be deleted. txn.execute( @@ -2076,6 +2082,10 @@ class EventsStore(SQLBaseStore): state_rows = txn.fetchall() state_groups_to_delete = [sg for sg, in state_rows] + logger.debug( + "[purge] finding state groups which depend on redundant state groups" + ) + # Now we get all the state groups that rely on these state groups new_state_edges = [] chunks = [ @@ -2096,6 +2106,8 @@ class EventsStore(SQLBaseStore): # Now we turn the state groups that reference to-be-deleted state groups # to non delta versions. for new_state_edge in new_state_edges: + logger.debug("[purge] de-delta-ing remaining state group %s", + new_state_edge) curr_state = self._get_state_groups_from_groups_txn( txn, [new_state_edge], types=None ) @@ -2132,6 +2144,7 @@ class EventsStore(SQLBaseStore): ], ) + logger.debug("[purge] removing redundant state groups") txn.executemany( "DELETE FROM state_groups_state WHERE state_group = ?", state_rows @@ -2140,12 +2153,15 @@ class EventsStore(SQLBaseStore): "DELETE FROM state_groups WHERE id = ?", state_rows ) + # Delete all non-state + logger.debug("[purge] removing events from event_to_state_groups") txn.executemany( "DELETE FROM event_to_state_groups WHERE event_id = ?", [(event_id,) for event_id, _ in event_rows] ) + logger.debug("[purge] updating room_depth") txn.execute( "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", (topological_ordering, room_id,) @@ -2171,16 +2187,15 @@ class EventsStore(SQLBaseStore): "event_signatures", "rejections", ): + logger.debug("[purge] removing non-state events from %s", table) + txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), to_delete ) - txn.executemany( - "DELETE FROM events WHERE event_id = ?", - to_delete - ) # Mark all state and own events as outliers + logger.debug("[purge] marking events as outliers") txn.executemany( "UPDATE events SET outlier = ?" 
" WHERE event_id = ?", @@ -2190,6 +2205,8 @@ class EventsStore(SQLBaseStore): ] ) + logger.debug("[purge] done") + @defer.inlineCallbacks def is_event_after(self, event_id1, event_id2): """Returns True if event_id1 is after event_id2 in the stream -- cgit 1.4.1 From 8e345ce46532974aac08c15cf4c90924ec4496d5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 10 May 2017 18:17:41 +0100 Subject: Don't de-delta state groups we're about to delete --- synapse/storage/events.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 512828cf34..2a37e6f1a8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2080,19 +2080,14 @@ class EventsStore(SQLBaseStore): ) state_rows = txn.fetchall() - state_groups_to_delete = [sg for sg, in state_rows] - - logger.debug( - "[purge] finding state groups which depend on redundant state groups" - ) + state_groups_to_delete = set([sg for sg, in state_rows]) # Now we get all the state groups that rely on these state groups + logger.debug("[purge] finding state groups which depend on redundant" + " state groups") new_state_edges = [] - chunks = [ - state_groups_to_delete[i:i + 100] - for i in xrange(0, len(state_groups_to_delete), 100) - ] - for chunk in chunks: + for i in xrange(0, len(state_rows), 100): + chunk = [sg for sg, in state_rows[i:i + 100]] rows = self._simple_select_many_txn( txn, table="state_group_edges", @@ -2101,7 +2096,10 @@ class EventsStore(SQLBaseStore): retcols=["state_group"], keyvalues={}, ) - new_state_edges.extend(row["state_group"] for row in rows) + new_state_edges.extend( + row["state_group"] for row in rows + if row["state_group"] not in state_groups_to_delete + ) # Now we turn the state groups that reference to-be-deleted state groups # to non delta versions. 
From dc026bb16ff552e9424be217ec5c64104c8b193f Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 11 May 2017 10:56:12 +0100
Subject: Tidy purge code and add some comments

Try to make this clearer with more comments and some variable renames

---
 synapse/storage/events.py | 31 +++++++++++++++++++------------
 1 file changed, 19 insertions(+), 12 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2a37e6f1a8..dbd63078c6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2080,14 +2080,19 @@ class EventsStore(SQLBaseStore):
         )
         state_rows = txn.fetchall()
 
+        # make a set of the redundant state groups, so that we can look them up
+        # efficiently
         state_groups_to_delete = set([sg for sg, in state_rows])
 
         # Now we get all the state groups that rely on these state groups
         logger.debug("[purge] finding state groups which depend on redundant"
                      " state groups")
-        new_state_edges = []
+        remaining_state_groups = []
         for i in xrange(0, len(state_rows), 100):
             chunk = [sg for sg, in state_rows[i:i + 100]]
+            # look for state groups whose prev_state_group is one we are about
+            # to delete
             rows = self._simple_select_many_txn(
                 txn,
                 table="state_group_edges",
@@ -2096,26 +2101,28 @@
                 retcols=["state_group"],
                 keyvalues={},
             )
-            new_state_edges.extend(
+            remaining_state_groups.extend(
                 row["state_group"] for row in rows
+
+                # exclude state groups we are about to delete: no point in
+                # updating them
                 if row["state_group"] not in state_groups_to_delete
             )
 
-        # Now we turn the state groups that reference to-be-deleted state groups
-        # to non delta versions.
-        for new_state_edge in new_state_edges:
-            logger.debug("[purge] de-delta-ing remaining state group %s",
-                         new_state_edge)
+        # Now we turn the state groups that reference to-be-deleted state
+        # groups to non delta versions.
+        for sg in remaining_state_groups:
+            logger.debug("[purge] de-delta-ing remaining state group %s", sg)
             curr_state = self._get_state_groups_from_groups_txn(
-                txn, [new_state_edge], types=None
+                txn, [sg], types=None
             )
-            curr_state = curr_state[new_state_edge]
+            curr_state = curr_state[sg]
 
             self._simple_delete_txn(
                 txn,
                 table="state_groups_state",
                 keyvalues={
-                    "state_group": new_state_edge,
+                    "state_group": sg,
                 }
             )
 
@@ -2123,7 +2130,7 @@
                 txn,
                 table="state_group_edges",
                 keyvalues={
-                    "state_group": new_state_edge,
+                    "state_group": sg,
                 }
             )
 
@@ -2132,7 +2139,7 @@
                 table="state_groups_state",
                 values=[
                     {
-                        "state_group": new_state_edge,
+                        "state_group": sg,
                         "room_id": room_id,
                         "type": key[0],
                         "state_key": key[1],
-- cgit 1.4.1
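Note: stripped of the renames, the tidy-up makes the shape of de-delta-ing easier to see: resolve a group's full state by walking its prev_state_group chain, then rewrite the group as self-contained. A rough sketch over plain dicts, where edges (group -> parent) and deltas (group -> partial state dict) stand in for the state_group_edges and state_groups_state tables:

    def resolve_full_state(group, edges, deltas):
        # Collect the chain from `group` up to its root...
        chain = []
        g = group
        while g is not None:
            chain.append(g)
            g = edges.get(g)
        # ...then apply each delta root-first, so children override parents.
        state = {}
        for g in reversed(chain):
            state.update(deltas.get(g, {}))
        return state

    def de_delta(group, edges, deltas):
        # Store the fully resolved state and detach the group from its
        # parent, so it no longer depends on a group that is being deleted.
        deltas[group] = resolve_full_state(group, edges, deltas)
        edges.pop(group, None)

This mirrors the _get_state_groups_from_groups_txn call followed by the delete-and-reinsert of state_groups_state rows in the diff above.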
From baafb85ba461b7d5073de94422fed0c43a417f46 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 11 May 2017 11:57:02 +0100
Subject: Add an index to event_search - to make the purge API quicker

---
 synapse/storage/background_updates.py              | 10 +++++++---
 synapse/storage/events.py                          | 11 +++++++++++
 .../schema/delta/41/event_search_event_id_idx.sql  | 17 +++++++++++++++++
 3 files changed, 35 insertions(+), 3 deletions(-)
 create mode 100644 synapse/storage/schema/delta/41/event_search_event_id_idx.sql

(limited to 'synapse/storage')

diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index d4cf0fc59b..12a8b8259c 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -210,7 +210,8 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_handlers[update_name] = update_handler
 
     def register_background_index_update(self, update_name, index_name,
-                                         table, columns, where_clause=None):
+                                         table, columns, where_clause=None,
+                                         unique=False):
         """Helper for store classes to do a background index addition
 
         To use:
@@ -245,9 +246,11 @@
             c.execute(sql)
 
             sql = (
-                "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s"
+                "CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
+                " ON %(table)s"
                 " (%(columns)s) %(where_clause)s"
             ) % {
+                "unique": "UNIQUE" if unique else "",
                 "name": index_name,
                 "table": table,
                 "columns": ", ".join(columns),
@@ -270,9 +273,10 @@
             # down at the wrong moment - hence we use IF NOT EXISTS. (SQLite
             # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.)
             sql = (
-                "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s"
+                "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s"
                 " (%(columns)s)"
             ) % {
+                "unique": "UNIQUE" if unique else "",
                 "name": index_name,
                 "table": table,
                 "columns": ", ".join(columns),
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index dbd63078c6..1fae1aeacf 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -207,6 +207,17 @@ class EventsStore(SQLBaseStore):
             where_clause="contains_url = true AND outlier = false",
         )
 
+        # an event_id index on event_search is useful for the purge_history
+        # api. Plus it means we get to enforce some integrity with a UNIQUE
+        # clause
+        self.register_background_index_update(
+            "event_search_event_id_idx",
+            index_name="event_search_event_id_idx",
+            table="event_search",
+            columns=["event_id"],
+            unique=True,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
 
     def persist_events(self, events_and_contexts, backfilled=False):
diff --git a/synapse/storage/schema/delta/41/event_search_event_id_idx.sql b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql
new file mode 100644
index 0000000000..5d9cfecf36
--- /dev/null
+++ b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('event_search_event_id_idx', '{}');
-- cgit 1.4.1


From 114f2909479b4396f3da8cff651990f075b4bfba Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 11 May 2017 12:06:28 +0100
Subject: Add more logging for purging

Log the number of events we will be deleting at info.

---
 synapse/storage/events.py | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1fae1aeacf..627e91a522 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2033,6 +2033,8 @@ class EventsStore(SQLBaseStore):
                 400, "topological_ordering is greater than forward extremeties"
             )
 
+        logger.debug("[purge] looking for events to delete")
+
         txn.execute(
             "SELECT event_id, state_key FROM events"
             " LEFT JOIN state_events USING (room_id, event_id)"
             " WHERE room_id = ? AND topological_ordering < ?",
             (room_id, topological_ordering,)
         )
         event_rows = txn.fetchall()
 
+        to_delete = [
+            (event_id,) for event_id, state_key in event_rows
+            if state_key is None and not self.hs.is_mine_id(event_id)
+        ]
+        logger.info(
+            "[purge] found %i events before cutoff, of which %i are remote"
+            " non-state events to delete", len(event_rows), len(to_delete))
+
         for event_id, state_key in event_rows:
             txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 
@@ -2091,6 +2101,7 @@
         )
         state_rows = txn.fetchall()
+        logger.debug("[purge] found %i redundant state groups", len(state_rows))
 
         # make a set of the redundant state groups, so that we can look them up
         # efficiently
@@ -2184,10 +2195,6 @@
         )
 
         # Delete all remote non-state events
-        to_delete = [
-            (event_id,) for event_id, state_key in event_rows
-            if state_key is None and not self.hs.is_mine_id(event_id)
-        ]
         for table in (
             "events",
             "event_json",
@@ -2203,7 +2210,7 @@
             "event_signatures",
             "rejections",
         ):
-            logger.debug("[purge] removing non-state events from %s", table)
+            logger.debug("[purge] removing remote non-state events from %s", table)
 
             txn.executemany(
                 "DELETE FROM %s WHERE event_id = ?" % (table,),
                 to_delete
             )
 
         # Mark all state and own events as outliers
-        logger.debug("[purge] marking events as outliers")
+        logger.debug("[purge] marking remaining events as outliers")
         txn.executemany(
             "UPDATE events SET outlier = ?"
             " WHERE event_id = ?",
@@ -2221,7 +2228,7 @@
             ]
         )
 
-        logger.debug("[purge] done")
+        logger.info("[purge] done")
 
     @defer.inlineCallbacks
     def is_event_after(self, event_id1, event_id2):
-- cgit 1.4.1


From 34194aaff7c86d57c6dabfb016b7597d999b2001 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 11 May 2017 12:46:55 +0100
Subject: Don't create event_search index on sqlite

... because the table is virtual

---
 synapse/storage/background_updates.py | 13 ++++++++++---
 synapse/storage/events.py             |  1 +
 2 files changed, 11 insertions(+), 3 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 12a8b8259c..7157fb1dfb 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -211,7 +211,8 @@ class BackgroundUpdateStore(SQLBaseStore):
 
     def register_background_index_update(self, update_name, index_name,
                                          table, columns, where_clause=None,
-                                         unique=False):
+                                         unique=False,
+                                         psql_only=False):
         """Helper for store classes to do a background index addition
 
         To use:
@@ -227,6 +228,9 @@
             index_name (str): name of index to add
             table (str): table to add index to
             columns (list[str]): columns/expressions to include in index
+            unique (bool): true to make a UNIQUE index
+            psql_only (bool): true to only create this index on psql databases
+                (useful for virtual sqlite tables)
         """
 
         def create_index_psql(conn):
@@ -288,13 +292,16 @@
 
         if isinstance(self.database_engine, engines.PostgresEngine):
             runner = create_index_psql
+        elif psql_only:
+            runner = None
         else:
             runner = create_index_sqlite
 
         @defer.inlineCallbacks
         def updater(progress, batch_size):
-            logger.info("Adding index %s to %s", index_name, table)
-            yield self.runWithConnection(runner)
+            if runner is not None:
+                logger.info("Adding index %s to %s", index_name, table)
+                yield self.runWithConnection(runner)
             yield self._end_background_update(update_name)
             defer.returnValue(1)
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 627e91a522..ea6879c619 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -216,6 +216,7 @@ class EventsStore(SQLBaseStore):
             table="event_search",
             columns=["event_id"],
             unique=True,
+            psql_only=True,
         )
 
         self._event_persist_queue = _EventPeristenceQueue()
-- cgit 1.4.1
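Note: the last three commits converge on one helper with three switches: unique or not, Postgres-only or not, and CONCURRENTLY (Postgres) versus IF NOT EXISTS (SQLite — which cannot index a virtual table like event_search at all). A condensed sketch of just the SQL-building decision, with an is_postgres flag standing in for the engine check:

    def build_index_sql(is_postgres, name, table, columns,
                        unique=False, where_clause=None, psql_only=False):
        # Returns None when no index should be created at all, e.g. a
        # psql-only index requested on a SQLite database.
        if psql_only and not is_postgres:
            return None
        unique_sql = "UNIQUE " if unique else ""
        cols = ", ".join(columns)
        if is_postgres:
            # CONCURRENTLY builds the index without holding a long write lock.
            where = " WHERE %s" % (where_clause,) if where_clause else ""
            return "CREATE %sINDEX CONCURRENTLY %s ON %s (%s)%s" % (
                unique_sql, name, table, cols, where,
            )
        # IF NOT EXISTS (supported by SQLite since 3.3.0) makes the update
        # safe to re-run if the server shut down partway through.
        return "CREATE %sINDEX IF NOT EXISTS %s ON %s (%s)" % (
            unique_sql, name, table, cols,
        )

    # e.g.: build_index_sql(True, "event_search_event_id_idx", "event_search",
    #                       ["event_id"], unique=True, psql_only=True)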
From ff3d810ea8e84a48508a08e6246c7d70739c94ea Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Thu, 11 May 2017 12:48:50 +0100
Subject: Add a comment to old delta

---
 synapse/storage/schema/delta/37/remove_auth_idx.py | 4 ++++
 1 file changed, 4 insertions(+)

(limited to 'synapse/storage')

diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py
index 784f3b348f..20ad8bd5a6 100644
--- a/synapse/storage/schema/delta/37/remove_auth_idx.py
+++ b/synapse/storage/schema/delta/37/remove_auth_idx.py
@@ -36,6 +36,10 @@ DROP INDEX IF EXISTS transactions_have_ref;
 -- and is used incredibly rarely.
 DROP INDEX IF EXISTS events_order_topo_stream_room;
 
+-- an equivalent index to this actually gets re-created in delta 41, because it
+-- turned out that deleting it wasn't a great plan :/. In any case, let's
+-- delete it here, and delta 41 will create a new one with an added UNIQUE
+-- constraint
 DROP INDEX IF EXISTS event_search_ev_idx;
 """
-- cgit 1.4.1


From bfbc907cec96ce9a64730930f63ed400c1aa3b5b Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 2 May 2017 10:40:31 +0100
Subject: Prefill state caches

---
 synapse/storage/_base.py  |  8 ++++----
 synapse/storage/events.py | 10 ++++++++--
 synapse/storage/state.py  |  8 ++++++++
 3 files changed, 20 insertions(+), 6 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index c659004e8d..58b73af7d2 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -60,12 +60,12 @@ class LoggingTransaction(object):
         object.__setattr__(self, "database_engine", database_engine)
         object.__setattr__(self, "after_callbacks", after_callbacks)
 
-    def call_after(self, callback, *args):
+    def call_after(self, callback, *args, **kwargs):
         """Call the given callback on the main twisted thread after the
         transaction has finished. Used to invalidate the caches on the
         correct thread.
         """
-        self.after_callbacks.append((callback, args))
+        self.after_callbacks.append((callback, args, kwargs))
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -319,8 +319,8 @@ class SQLBaseStore(object):
                 inner_func, *args, **kwargs
             )
         finally:
-            for after_callback, after_args in after_callbacks:
-                after_callback(*after_args)
+            for after_callback, after_args, after_kwargs in after_callbacks:
+                after_callback(*after_args, **after_kwargs)
         defer.returnValue(result)
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index dbd63078c6..0dffafd90d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -374,6 +374,7 @@ class EventsStore(SQLBaseStore):
                     new_forward_extremeties=new_forward_extremeties,
                 )
                 persist_event_counter.inc_by(len(chunk))
+
             for event, context in chunk:
                 if context.app_service:
                     origin_type = "local"
@@ -387,6 +388,11 @@
 
                 event_counter.inc(event.type, origin_type, origin_entity)
 
+            for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+                self.get_current_state_ids.prefill(
+                    (room_id, ), new_state
+                )
+
     @defer.inlineCallbacks
     def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
         """Calculates the new forward extremities for a room given events to
@@ -545,7 +551,7 @@
             if ev_id in events_to_insert
         }
 
-        defer.returnValue((to_delete, to_insert))
+        defer.returnValue((to_delete, to_insert, current_state))
 
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
@@ -698,7 +704,7 @@
     def _update_current_state_txn(self, txn, state_delta_by_room):
         for room_id, current_state_tuple in state_delta_by_room.iteritems():
-            to_delete, to_insert = current_state_tuple
+            to_delete, to_insert, _ = current_state_tuple
             txn.executemany(
                 "DELETE FROM current_state_events WHERE event_id = ?",
                 [(ev_id,) for ev_id in to_delete.itervalues()],
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a16afa8df5..1e1ce87e0e 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -227,6 +227,14 @@ class StateStore(SQLBaseStore):
             ],
         )
 
+        txn.call_after(
+            self._state_group_cache.update,
+            self._state_group_cache.sequence,
+            key=context.state_group,
+            value=context.current_state_ids,
+            full=True,
+        )
+
         self._simple_insert_many_txn(
             txn,
             table="event_to_state_groups",
-- cgit 1.4.1


From e0d2f6d5b02dd208bc55434b5c2d386827486e9f Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 2 May 2017 11:36:11 +0100
Subject: Add more granular event send metrics

---
 synapse/storage/events.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

(limited to 'synapse/storage')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0dffafd90d..36574f78b8 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -374,6 +374,18 @@ class EventsStore(SQLBaseStore):
                     new_forward_extremeties=new_forward_extremeties,
                 )
                 persist_event_counter.inc_by(len(chunk))
+            for event, context in chunk:
+                if context.app_service:
+                    origin_type = "local"
+                    origin_entity = context.app_service.id
+                elif self.hs.is_mine_id(event.sender):
+                    origin_type = "local"
+                    origin_entity = "*client*"
+                else:
+                    origin_type = "remote"
+                    origin_entity = get_domain_from_id(event.sender)
+
+                event_counter.inc(event.type, origin_type, origin_entity)
 
             for event, context in chunk:
                 if context.app_service:
-- cgit 1.4.1
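Note: the prefill leans on a small piece of plumbing worth spelling out: cache writes must not happen inside the transaction (it can fail or be retried), so they are queued as (callback, args, kwargs) triples and fired only once the transaction is done — the kwargs element being exactly what the first commit above adds. A stripped-down sketch of the pattern, not Synapse's actual classes:

    class AfterCommitQueue(object):
        def __init__(self):
            self._callbacks = []

        def call_after(self, callback, *args, **kwargs):
            # Record the call; do not run it yet, the transaction may
            # still fail or be retried.
            self._callbacks.append((callback, args, kwargs))

        def run_transaction(self, txn_func):
            try:
                return txn_func(self)
            finally:
                # Like the code above, run the queued callbacks in a
                # finally block once the transaction has finished.
                callbacks, self._callbacks = self._callbacks, []
                for callback, args, kwargs in callbacks:
                    callback(*args, **kwargs)

Prefilling get_current_state_ids and the state-group cache this way means the next reader finds a warm cache instead of re-running the state query the writer has just performed.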
From 871605f4e20cce3f093b2eae0f3d2ad7fb43a640 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 3 May 2017 09:56:05 +0100
Subject: Comments

---
 synapse/storage/events.py | 6 +++---
 synapse/storage/state.py  | 3 +++
 2 files changed, 6 insertions(+), 3 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 36574f78b8..5db7ec1622 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -453,10 +453,10 @@ class EventsStore(SQLBaseStore):
         Assumes that we are only persisting events for one room at a time.
 
         Returns:
-            2-tuple (to_delete, to_insert) where both are state dicts, i.e.
-            (type, state_key) -> event_id. `to_delete` are the entries to
+            3-tuple (to_delete, to_insert, new_state) where the first two are state
+            dicts, i.e. (type, state_key) -> event_id. `to_delete` are the entries to
             first be deleted from current_state_events, `to_insert` are entries
-            to insert.
+            to insert. `new_state` is the full set of state.
 
             May return None if there are no changes to be applied.
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 1e1ce87e0e..5d6f7dfa28 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -227,6 +227,9 @@ class StateStore(SQLBaseStore):
             ],
         )
 
+        # Prefill the state group cache with this group.
+        # It's fine to use the sequence like this as the state group map
+        # is immutable.
         txn.call_after(
             self._state_group_cache.update,
             self._state_group_cache.sequence,
-- cgit 1.4.1


From e4435b014e50a10ad89c201d6f91b6be35a9b02f Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 3 May 2017 10:00:29 +0100
Subject: Update comment

---
 synapse/storage/state.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 5d6f7dfa28..03981f5d2b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -229,7 +229,8 @@ class StateStore(SQLBaseStore):
 
         # Prefill the state group cache with this group.
         # It's fine to use the sequence like this as the state group map
-        # is immutable.
+        # is immutable. (If the map wasn't immutable then this prefill could
+        # race with another update)
         txn.call_after(
             self._state_group_cache.update,
-- cgit 1.4.1


From 608b5a6317ce3797ff279f6d1a8a39f475b55736 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 16 May 2017 12:55:29 +0100
Subject: Take a copy before prefilling, as it may be a frozendict

---
 synapse/storage/state.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 03981f5d2b..85acf2ad1e 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -235,7 +235,7 @@ class StateStore(SQLBaseStore):
             self._state_group_cache.update,
             self._state_group_cache.sequence,
             key=context.state_group,
-            value=context.current_state_ids,
+            value=dict(context.current_state_ids),
             full=True,
         )
-- cgit 1.4.1


From 331570ea6f97d570cf2774cd0700eb588e9fb1d7 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 16 May 2017 15:33:07 +0100
Subject: Remove spurious merge artifacts

---
 synapse/storage/events.py | 13 -------------
 1 file changed, 13 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5db7ec1622..12dd74daa3 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -387,19 +387,6 @@ class EventsStore(SQLBaseStore):
 
                 event_counter.inc(event.type, origin_type, origin_entity)
 
-            for event, context in chunk:
-                if context.app_service:
-                    origin_type = "local"
-                    origin_entity = context.app_service.id
-                elif self.hs.is_mine_id(event.sender):
-                    origin_type = "local"
-                    origin_entity = "*client*"
-                else:
-                    origin_type = "remote"
-                    origin_entity = get_domain_from_id(event.sender)
-
-                event_counter.inc(event.type, origin_type, origin_entity)
-
             for room_id, (_, _, new_state) in current_state_for_room.iteritems():
                 self.get_current_state_ids.prefill(
                     (room_id, ), new_state
-- cgit 1.4.1


From 13f540ef1b94e6173bdd4f2d84d90e0948cf5bf2 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 16 May 2017 14:07:24 +0100
Subject: Speed up get_joined_hosts

---
 synapse/handlers/room_member.py | 3 ++-
 synapse/storage/roommember.py   | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ab87632d99..1ca88517a2 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -739,10 +739,11 @@ class RoomMemberHandler(BaseHandler):
         if len(current_state_ids) == 1 and create_event_id:
             defer.returnValue(self.hs.is_mine_id(create_event_id))
 
-        for (etype, state_key), event_id in current_state_ids.items():
+        for etype, state_key in current_state_ids:
             if etype != EventTypes.Member or not self.hs.is_mine_id(state_key):
                 continue
 
+            event_id = current_state_ids[(etype, state_key)]
             event = yield self.store.get_event(event_id, allow_none=True)
             if not event:
                 continue
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 2fa20bd87c..404f3583eb 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -534,7 +534,7 @@ class RoomMemberStore(SQLBaseStore):
         assert state_group is not None
 
         joined_hosts = set()
-        for (etype, state_key), event_id in current_state_ids.items():
+        for etype, state_key in current_state_ids:
             if etype == EventTypes.Member:
                 try:
                     host = get_domain_from_id(state_key)
@@ -545,6 +545,7 @@
                 if host in joined_hosts:
                     continue
 
+                event_id = current_state_ids[(etype, state_key)]
                 event = yield self.get_event(event_id, allow_none=True)
                 if event and event.content["membership"] == Membership.JOIN:
                     joined_hosts.add(intern_string(host))
-- cgit 1.4.1


From ad53fc3cf49492fca55cd775f6ff6b2cd353f588 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 16 May 2017 13:40:01 +0100
Subject: Short circuit when we have delta ids

---
 synapse/storage/roommember.py | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

(limited to 'synapse/storage')

diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 404f3583eb..a5df1d46b4 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -387,7 +387,9 @@ class RoomMemberStore(SQLBaseStore):
             state_group = object()
 
         return self._get_joined_users_from_context(
-            event.room_id, state_group, context.current_state_ids, event=event,
+            event.room_id, state_group, context.current_state_ids,
+            event=event,
+            context=context,
         )
 
     def get_joined_users_from_state(self, room_id, state_group, state_ids):
@@ -405,18 +407,33 @@
     @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True,
                            max_entries=100000)
     def _get_joined_users_from_context(self, room_id, state_group, current_state_ids,
-                                       cache_context, event=None):
+                                       cache_context, event=None, context=None):
         # We don't use `state_group`, it's there so that we can cache based
         # on it. However, it's important that it's never None, since two current_states
         # with a state_group of None are likely to be different.
         # See bulk_get_push_rules_for_room for how we work around this.
         assert state_group is not None
 
+        users_in_room = {}
         member_event_ids = [
             e_id
             for key, e_id in current_state_ids.iteritems()
             if key[0] == EventTypes.Member
         ]
 
+        if context is not None:
+            if context.prev_group and context.delta_ids:
+                prev_res = self._get_joined_users_from_context.cache.get(
+                    (room_id, context.prev_group), None
+                )
+                if prev_res and isinstance(prev_res, dict):
+                    users_in_room = dict(prev_res)
+                    member_event_ids = [
+                        e_id
+                        for key, e_id in context.delta_ids.iteritems()
+                        if key[0] == EventTypes.Member
+                    ]
+                    for etype, state_key in context.delta_ids:
+                        users_in_room.pop(state_key, None)
 
         # We check if we have any of the member event ids in the event cache
         # before we ask the DB
@@ -431,7 +448,6 @@
         )
 
         missing_member_event_ids = []
-        users_in_room = {}
         for event_id in member_event_ids:
             ev_entry = event_map.get(event_id)
             if ev_entry:
-- cgit 1.4.1


From 85e8092ccab2b7a479b71225fc24e69a2f58f980 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 17 May 2017 10:03:09 +0100
Subject: Comment

---
 synapse/storage/roommember.py | 5 +++++
 1 file changed, 5 insertions(+)

(limited to 'synapse/storage')

diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index a5df1d46b4..0829ae5bee 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -420,7 +420,12 @@ class RoomMemberStore(SQLBaseStore):
             for key, e_id in current_state_ids.iteritems()
             if key[0] == EventTypes.Member
         ]
+
         if context is not None:
+            # If we have a context with a delta from a previous state group,
+            # check if we also have the result from the previous group in cache.
+            # If we do then we can reuse that result and simply update it with
+            # any membership changes in `delta_ids`
             if context.prev_group and context.delta_ids:
                 prev_res = self._get_joined_users_from_context.cache.get(
                     (room_id, context.prev_group), None
-- cgit 1.4.1
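Note: the closing pair of commits is a general incremental-caching trick: if the result for context.prev_group is still cached and context.delta_ids says exactly what changed, the old answer can be patched rather than rebuilt from the full room state. A rough standalone sketch, with a plain dict standing in for the descriptor cache and get_membership for the event lookup:

    MEMBER = "m.room.member"  # the Matrix membership event type

    def joined_users_incremental(cache, prev_group, delta_ids, get_membership):
        # Returns a patched copy of the cached result for prev_group, or
        # None when there is nothing usable and the caller must fall back
        # to recomputing from the full current_state_ids.
        prev_res = cache.get(prev_group) if prev_group else None
        if not delta_ids or not isinstance(prev_res, dict):
            return None

        # Copy first: never mutate a value that still lives in the cache.
        users_in_room = dict(prev_res)
        for (etype, state_key), event_id in delta_ids.items():
            if etype != MEMBER:
                continue
            # Any membership change invalidates the old entry for this
            # user; only a join puts them back.
            users_in_room.pop(state_key, None)
            if get_membership(event_id) == "join":
                users_in_room[state_key] = {"event_id": event_id}
        return users_in_room

The isinstance check mirrors the code above: the real cache can hold still-pending lookups, and only a completed plain dict is safe to patch.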