From 16cb9a71b8b46604d49944f0b9c316687becca93 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 30 Sep 2019 09:38:41 +0100 Subject: Drop unused tables (#6115) These tables are unused since #5893 (as amended by #6047), so we can now drop them. Fixes #6048. --- .../schema/delta/56/drop_unused_event_tables.sql | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 synapse/storage/schema/delta/56/drop_unused_event_tables.sql (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/drop_unused_event_tables.sql b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql new file mode 100644 index 0000000000..9f09922c67 --- /dev/null +++ b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql @@ -0,0 +1,20 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- these tables are never used. +DROP TABLE IF EXISTS room_names; +DROP TABLE IF EXISTS topics; +DROP TABLE IF EXISTS history_visibility; +DROP TABLE IF EXISTS guest_access; -- cgit 1.4.1 From 9267741a5f7732d7d16f8445edc68bc68b730601 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Sep 2019 11:58:36 +0100 Subject: Fix `devices_last_seen` background update. Fixes #6134. --- synapse/storage/client_ips.py | 46 +++++++++++++++++++++++++++++++------ synapse/storage/engines/postgres.py | 7 ++++++ synapse/storage/engines/sqlite.py | 8 +++++++ 3 files changed, 54 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 539584288d..bb135166ce 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -463,14 +463,46 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): last_device_id = progress.get("last_device_id", "") def _devices_last_seen_update_txn(txn): + # This consists of two queries: + # + # 1. The sub-query searches for the next N devices and joins + # against user_ips to find the max last_seen associated with + # that device. + # 2. The outer query then joins again against user_ips on + # user/device/last_seen. This *should* hopefully only + # return one row, but if it does return more than one then + # we'll just end up updating the same device row multiple + # times, which is fine. + + if self.database_engine.supports_tuple_comparison: + where_clause = "(user_id, device_id) > (?, ?)" + where_args = [last_user_id, last_device_id] + else: + # We explicitly do a `user_id >= ? AND (...)` here to ensure + # that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)` + # makes it hard for query optimiser to tell that it can use the + # index on user_id + where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)" + where_args = [last_user_id, last_user_id, last_device_id] + sql = """ - SELECT u.last_seen, u.ip, u.user_agent, user_id, device_id FROM devices - INNER JOIN user_ips AS u USING (user_id, device_id) - WHERE user_id > ? OR (user_id = ? AND device_id > ?) - ORDER BY user_id ASC, device_id ASC - LIMIT ? - """ - txn.execute(sql, (last_user_id, last_user_id, last_device_id, batch_size)) + SELECT + last_seen, ip, user_agent, user_id, device_id + FROM ( + SELECT + user_id, device_id, MAX(u.last_seen) AS last_seen + FROM devices + INNER JOIN user_ips AS u USING (user_id, device_id) + WHERE %(where_clause)s + GROUP BY user_id, device_id + ORDER BY user_id ASC, device_id ASC + LIMIT ? + ) c + INNER JOIN user_ips AS u USING (user_id, device_id, last_seen) + """ % { + "where_clause": where_clause + } + txn.execute(sql, where_args + [batch_size]) rows = txn.fetchall() if not rows: diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 289b6bc281..601617b21e 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -72,6 +72,13 @@ class PostgresEngine(object): """ return True + @property + def supports_tuple_comparison(self): + """ + Do we support comparing tuples, i.e. `(a, b) > (c, d)`? + """ + return True + def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): # https://www.postgresql.org/docs/current/static/errcodes-appendix.html diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index e9b9caa49a..ac92109366 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -38,6 +38,14 @@ class Sqlite3Engine(object): """ return self.module.sqlite_version_info >= (3, 24, 0) + @property + def supports_tuple_comparison(self): + """ + Do we support comparing tuples, i.e. `(a, b) > (c, d)`? This requires + SQLite 3.15+. + """ + return self.module.sqlite_version_info >= (3, 15, 0) + def check_database(self, txn): pass -- cgit 1.4.1 From a27fb7d5cac3cacc55a1c778f02d074d4115eea6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Oct 2019 11:05:48 +0100 Subject: Don't repeatedly attempt to censor events we don't have. Currently we don't set `have_censored` column if we don't have the target event of a redaction, which means we repeatedly attempt to censor the same non-existant event. When we persist non-redacted events we unset the `have_censored` column for any redactions that target said event. --- synapse/storage/events.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ddf7ab6479..5d56ceeab3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1389,6 +1389,22 @@ class EventsStore( ], ) + for event, _ in events_and_contexts: + if not event.internal_metadata.is_redacted(): + # If we're persisting an unredacted event we go and ensure + # that we mark any redactions that reference this event as + # requiring censoring. + self._simple_update_txn( + txn, + table="redactions", + keyvalues={ + "redacts": event.event_id, + }, + updatevalues={ + "have_censored": False, + } + ) + def _store_rejected_events_txn(self, txn, events_and_contexts): """Add rows to the 'rejections' table for received events which were rejected @@ -1589,7 +1605,7 @@ class EventsStore( sql = """ SELECT redact_event.event_id, redacts FROM redactions INNER JOIN events AS redact_event USING (event_id) - INNER JOIN events AS original_event ON ( + LEFT JOIN events AS original_event ON ( redact_event.room_id = original_event.room_id AND redacts = original_event.event_id ) -- cgit 1.4.1 From 898dde981b41a4dfb79b5830f17e6eb9871ef762 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Oct 2019 13:23:34 +0100 Subject: Add received_ts column to redactions. This will allow us to efficiently search for uncensored redactions in the DB before a given time. --- synapse/storage/events.py | 20 +++---- synapse/storage/events_bg_updates.py | 61 ++++++++++++++++++++++ .../storage/schema/delta/56/redaction_censor2.sql | 20 +++++++ 3 files changed, 92 insertions(+), 9 deletions(-) create mode 100644 synapse/storage/schema/delta/56/redaction_censor2.sql (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5d56ceeab3..3104815f1a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1397,12 +1397,8 @@ class EventsStore( self._simple_update_txn( txn, table="redactions", - keyvalues={ - "redacts": event.event_id, - }, - updatevalues={ - "have_censored": False, - } + keyvalues={"redacts": event.event_id}, + updatevalues={"have_censored": False}, ) def _store_rejected_events_txn(self, txn, events_and_contexts): @@ -1568,9 +1564,15 @@ class EventsStore( def _store_redaction(self, txn, event): # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) - txn.execute( - "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts), + + self._simple_insert_txn( + txn, + table="redactions", + values={ + "event_id": event.event_id, + "redacts": event.redacts, + "received_ts": self._clock.time_msec(), + }, ) @defer.inlineCallbacks diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py index 6587f31e2b..5717baf48c 100644 --- a/synapse/storage/events_bg_updates.py +++ b/synapse/storage/events_bg_updates.py @@ -67,6 +67,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update ) + self.register_background_update_handler( + "redactions_received_ts", self._redactions_received_ts + ) + @defer.inlineCallbacks def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] @@ -397,3 +401,60 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): ) return num_handled + + @defer.inlineCallbacks + def _redactions_received_ts(self, progress, batch_size): + """Handles filling out the `received_ts` column in redactions. + """ + last_event_id = progress.get("last_event_id", "") + + def _redactions_received_ts_txn(txn): + # Fetch the set of event IDs that we want to update + sql = """ + SELECT event_id FROM redactions + WHERE event_id > ? + ORDER BY event_id ASC + LIMIT ? + """ + + txn.execute(sql, (last_event_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + upper_event_id, = rows[-1] + + # Update the redactions with the received_ts. + # + # Note: Not all events have an associated received_ts, so we + # fallback to using origin_server_ts. If we for some reason don't + # have an origin_server_ts, lets just use the current timestamp. + # + # We don't want to leave it null, as then we'll never try and + # censor those redactions. + sql = """ + UPDATE redactions + SET received_ts = ( + SELECT COALESCE(received_ts, origin_server_ts, ?) FROM events + WHERE events.event_id = redactions.event_id + ) + WHERE ? <= event_id AND event_id <= ? + """ + + txn.execute(sql, (self._clock.time_msec(), last_event_id, upper_event_id)) + + self._background_update_progress_txn( + txn, "redactions_received_ts", {"last_event_id": upper_event_id} + ) + + return len(rows) + + count = yield self.runInteraction( + "_redactions_received_ts", _redactions_received_ts_txn + ) + + if not count: + yield self._end_background_update("redactions_received_ts") + + return count diff --git a/synapse/storage/schema/delta/56/redaction_censor2.sql b/synapse/storage/schema/delta/56/redaction_censor2.sql new file mode 100644 index 0000000000..77a5eca499 --- /dev/null +++ b/synapse/storage/schema/delta/56/redaction_censor2.sql @@ -0,0 +1,20 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE redactions ADD COLUMN received_ts BIGINT; +CREATE INDEX redactions_have_censored_ts ON redactions(received_ts) WHERE not have_censored; + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('redactions_received_ts', '{}'); -- cgit 1.4.1 From 5e8387af9e771ae42c7c8c4dc186000d862d3787 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 1 Oct 2019 13:28:41 +0100 Subject: Use `received_ts` to find uncensored redacted events Joining against `events` and ordering by `stream_ordering` is inefficient as it forced scanning the entirety of the redactions table. This isn't the case if we use `redactions.received_ts` column as we can then use an index. --- synapse/storage/events.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3104815f1a..2e485c8644 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1589,36 +1589,29 @@ class EventsStore( if self.hs.config.redaction_retention_period is None: return - max_pos = yield self.find_first_stream_ordering_after_ts( - self._clock.time_msec() - self.hs.config.redaction_retention_period - ) + before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period # We fetch all redactions that: # 1. point to an event we have, - # 2. has a stream ordering from before the cut off, and + # 2. has a received_ts from before the cut off, and # 3. we haven't yet censored. # # This is limited to 100 events to ensure that we don't try and do too # much at once. We'll get called again so this should eventually catch # up. - # - # We use the range [-max_pos, max_pos] to handle backfilled events, - # which are given negative stream ordering. sql = """ - SELECT redact_event.event_id, redacts FROM redactions - INNER JOIN events AS redact_event USING (event_id) + SELECT redactions.event_id, redacts FROM redactions LEFT JOIN events AS original_event ON ( - redact_event.room_id = original_event.room_id - AND redacts = original_event.event_id + redacts = original_event.event_id ) WHERE NOT have_censored - AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ? - ORDER BY redact_event.stream_ordering ASC + AND redactions.received_ts <= ? + ORDER BY redactions.received_ts ASC LIMIT ? """ rows = yield self._execute( - "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100 + "_censor_redactions_fetch", None, sql, before_ts, 100 ) updates = [] -- cgit 1.4.1 From ce7a3e7e27ba4215ed86112d2967b0acca4cfb77 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 10:14:01 +0100 Subject: Fix fetching censored redactions from DB Fetching a censored redactions caused an exception due to the code expecting redactions to have a `redact` key, which redacted redactions don't have. --- synapse/storage/events_worker.py | 14 ++++++++++++++ tests/storage/test_redaction.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index c6fa7f82fd..57ce0304e9 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -238,6 +238,20 @@ class EventsWorkerStore(SQLBaseStore): # we have to recheck auth now. if not allow_rejected and entry.event.type == EventTypes.Redaction: + if not hasattr(entry.event, "redacts"): + # A redacted redaction doesn't have a `redacts` key, in + # which case lets just withhold the event. + # + # Note: Most of the time if the redactions has been + # redacted we still have the un-redacted event in the DB + # and so we'll still see the `redacts` key. However, this + # isn't always true e.g. if we have censored the event. + logger.debug( + "Withholding redaction event %s as we don't have redacts key", + event_id, + ) + continue + redacted_event_id = entry.event.redacts event_map = yield self._get_events_from_cache_or_db([redacted_event_id]) original_event_entry = event_map.get(redacted_event_id) diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index deecfad9fb..427d3c49c5 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -118,6 +118,8 @@ class RedactionTestCase(unittest.HomeserverTestCase): self.get_success(self.store.persist_event(event, context)) + return event + def test_redact(self): self.get_success( self.inject_room_member(self.room1, self.u_alice, Membership.JOIN) @@ -361,3 +363,37 @@ class RedactionTestCase(unittest.HomeserverTestCase): ) self.assert_dict({"content": {}}, json.loads(event_json)) + + def test_redact_redaction(self): + """Tests that we can redact a redaction and can fetch it again. + """ + + self.get_success( + self.inject_room_member(self.room1, self.u_alice, Membership.JOIN) + ) + + msg_event = self.get_success(self.inject_message(self.room1, self.u_alice, "t")) + + first_redact_event = self.get_success( + self.inject_redaction( + self.room1, msg_event.event_id, self.u_alice, "Redacting message" + ) + ) + + self.get_success( + self.inject_redaction( + self.room1, + first_redact_event.event_id, + self.u_alice, + "Redacting redaction", + ) + ) + + # Now lets jump to the future where we have censored the redaction event + # in the DB. + self.reactor.advance(60 * 60 * 24 * 31) + + # We just want to check that fetching the event doesn't raise an exception. + self.get_success( + self.store.get_event(first_redact_event.event_id, allow_none=True) + ) -- cgit 1.4.1 From f44f1d2e8374b7250a8a68cf3a49e6d1ac63b0fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 10:36:27 +0100 Subject: Fix errors storing large retry intervals. We have set the max retry interval to a value larger than a postgres or sqlite int can hold, which caused exceptions when updating the destinations table. To fix postgres we need to change the column to a bigint, and for sqlite we lower the max interval to 2**62 (which is still incredibly long). --- .../56/destinations_retry_interval_type.sql.postgres | 18 ++++++++++++++++++ synapse/util/retryutils.py | 2 +- tests/storage/test_transactions.py | 11 +++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres b/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres new file mode 100644 index 0000000000..b9bbb18a91 --- /dev/null +++ b/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres @@ -0,0 +1,18 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- We want to store large retry intervals so we upgrade the column from INT +-- to BIGINT. We don't need to do this on SQLite. +ALTER TABLE destinations ALTER retry_interval SET DATA TYPE BIGINT; diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index a5f2fbef5c..af69587196 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -29,7 +29,7 @@ MIN_RETRY_INTERVAL = 10 * 60 * 1000 RETRY_MULTIPLIER = 5 # a cap on the backoff. (Essentially none) -MAX_RETRY_INTERVAL = 2 ** 63 +MAX_RETRY_INTERVAL = 2 ** 62 class NotRetryingDestination(Exception): diff --git a/tests/storage/test_transactions.py b/tests/storage/test_transactions.py index a771d5af29..8e817e2c7f 100644 --- a/tests/storage/test_transactions.py +++ b/tests/storage/test_transactions.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.util.retryutils import MAX_RETRY_INTERVAL + from tests.unittest import HomeserverTestCase @@ -45,3 +47,12 @@ class TransactionStoreTestCase(HomeserverTestCase): """ d = self.store.set_destination_retry_timings("example.com", 1000, 50, 100) self.get_success(d) + + def test_large_destination_retry(self): + d = self.store.set_destination_retry_timings( + "example.com", MAX_RETRY_INTERVAL, MAX_RETRY_INTERVAL, MAX_RETRY_INTERVAL + ) + self.get_success(d) + + d = self.store.get_destination_retry_timings("example.com") + self.get_success(d) -- cgit 1.4.1 From d69fd53f74f693e0ec756eec479f3d51d93fd2aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 11:21:52 +0100 Subject: Bound find_next_generated_user_id DB query. We can easily bound the set of user IDs we pull out of the DB, so lets do that. --- synapse/storage/registration.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 241a7be51e..1a859352b6 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -493,7 +493,9 @@ class RegistrationWorkerStore(SQLBaseStore): """ def _find_next_generated_user_id(txn): - txn.execute("SELECT name FROM users") + # We bound between '@1' and '@a' to avoid pulling the entire table + # out. + txn.execute("SELECT name FROM users WHERE '@1' <= name AND name < '@a'") regex = re.compile(r"^@(\d+):") -- cgit 1.4.1 From 2a1470cd05558d4a3bc69a0bc5e8969ba8631426 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Wed, 2 Oct 2019 12:04:22 +0100 Subject: Fix yields and copy instead of move push rules on room upgrade (#6144) Copy push rules during a room upgrade from the old room to the new room, instead of deleting them from the old room. For instance, we've defined upgrading of a room multiple times to be possible, and push rules won't be transferred on the second upgrade if they're deleted during the first. Also fix some missing yields that probably broke things quite a bit. --- changelog.d/6144.bugfix | 1 + synapse/handlers/room_member.py | 4 ++-- synapse/storage/push_rule.py | 16 ++++++---------- 3 files changed, 9 insertions(+), 12 deletions(-) create mode 100644 changelog.d/6144.bugfix (limited to 'synapse/storage') diff --git a/changelog.d/6144.bugfix b/changelog.d/6144.bugfix new file mode 100644 index 0000000000..eee63961e4 --- /dev/null +++ b/changelog.d/6144.bugfix @@ -0,0 +1 @@ +Prevent user push rules being deleted from a room when it is upgraded. \ No newline at end of file diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 8abdb1b6e6..95a244d86c 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -216,8 +216,8 @@ class RoomMemberHandler(object): self.copy_room_tags_and_direct_to_room( predecessor["room_id"], room_id, user_id ) - # Move over old push rules - self.store.move_push_rules_from_room_to_room_for_user( + # Copy over push rules + yield self.store.copy_push_rules_from_room_to_room_for_user( predecessor["room_id"], room_id, user_id ) elif event.membership == Membership.LEAVE: diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index a6517c4cf3..c4e24edff2 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -183,8 +183,8 @@ class PushRulesWorkerStore( return results @defer.inlineCallbacks - def move_push_rule_from_room_to_room(self, new_room_id, user_id, rule): - """Move a single push rule from one room to another for a specific user. + def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule): + """Copy a single push rule from one room to another for a specific user. Args: new_room_id (str): ID of the new room. @@ -209,14 +209,11 @@ class PushRulesWorkerStore( actions=rule["actions"], ) - # Delete push rule for the old room - yield self.delete_push_rule(user_id, rule["rule_id"]) - @defer.inlineCallbacks - def move_push_rules_from_room_to_room_for_user( + def copy_push_rules_from_room_to_room_for_user( self, old_room_id, new_room_id, user_id ): - """Move all of the push rules from one room to another for a specific + """Copy all of the push rules from one room to another for a specific user. Args: @@ -227,15 +224,14 @@ class PushRulesWorkerStore( # Retrieve push rules for this user user_push_rules = yield self.get_push_rules_for_user(user_id) - # Get rules relating to the old room, move them to the new room, then - # delete them from the old room + # Get rules relating to the old room and copy them to the new room for rule in user_push_rules: conditions = rule.get("conditions", []) if any( (c.get("key") == "room_id" and c.get("pattern") == old_room_id) for c in conditions ): - self.move_push_rule_from_room_to_room(new_room_id, user_id, rule) + yield self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule) @defer.inlineCallbacks def bulk_get_push_rules_for_room(self, event, context): -- cgit 1.4.1 From a5166e4d5febc0e03ba9da9db99127a797a0bc4d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 14:08:35 +0100 Subject: Land improved room list based on room stats (#6019) Use room_stats and room_state for room directory search --- changelog.d/6019.misc | 1 + synapse/federation/transport/server.py | 8 + synapse/handlers/room_list.py | 323 ++++++--------------- synapse/rest/client/v1/room.py | 8 + synapse/storage/room.py | 228 ++++++++++----- .../schema/delta/56/public_room_list_idx.sql | 16 + tests/handlers/test_roomlist.py | 39 --- 7 files changed, 273 insertions(+), 350 deletions(-) create mode 100644 changelog.d/6019.misc create mode 100644 synapse/storage/schema/delta/56/public_room_list_idx.sql delete mode 100644 tests/handlers/test_roomlist.py (limited to 'synapse/storage') diff --git a/changelog.d/6019.misc b/changelog.d/6019.misc new file mode 100644 index 0000000000..dfee73c28f --- /dev/null +++ b/changelog.d/6019.misc @@ -0,0 +1 @@ +Improve performance of the public room list directory. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 7f8a16e355..0f16f21c2d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -765,6 +765,10 @@ class PublicRoomList(BaseFederationServlet): else: network_tuple = ThirdPartyInstanceID(None, None) + if limit == 0: + # zero is a special value which corresponds to no limit. + limit = None + data = await maybeDeferred( self.handler.get_local_public_room_list, limit, @@ -800,6 +804,10 @@ class PublicRoomList(BaseFederationServlet): if search_filter is None: logger.warning("Nonefilter") + if limit == 0: + # zero is a special value which corresponds to no limit. + limit = None + data = await self.handler.get_local_public_room_list( limit=limit, since_token=since_token, diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index a7e55f00e5..4e1cc5460f 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -16,8 +16,7 @@ import logging from collections import namedtuple -from six import PY3, iteritems -from six.moves import range +from six import iteritems import msgpack from unpaddedbase64 import decode_base64, encode_base64 @@ -27,7 +26,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules from synapse.api.errors import Codes, HttpResponseException from synapse.types import ThirdPartyInstanceID -from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache @@ -37,7 +35,6 @@ logger = logging.getLogger(__name__) REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 - # This is used to indicate we should only return rooms published to the main list. EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) @@ -72,6 +69,8 @@ class RoomListHandler(BaseHandler): This can be (None, None) to indicate the main list, or a particular appservice and network id to use an appservice specific one. Setting to None returns all public rooms across all lists. + from_federation (bool): true iff the request comes from the federation + API """ if not self.enable_room_list_search: return defer.succeed({"chunk": [], "total_room_count_estimate": 0}) @@ -133,239 +132,109 @@ class RoomListHandler(BaseHandler): from_federation (bool): Whether this request originated from a federating server or a client. Used for room filtering. timeout (int|None): Amount of seconds to wait for a response before - timing out. + timing out. TODO """ - if since_token and since_token != "END": - since_token = RoomListNextBatch.from_token(since_token) - else: - since_token = None - rooms_to_order_value = {} - rooms_to_num_joined = {} + # Pagination tokens work by storing the room ID sent in the last batch, + # plus the direction (forwards or backwards). Next batch tokens always + # go forwards, prev batch tokens always go backwards. - newly_visible = [] - newly_unpublished = [] if since_token: - stream_token = since_token.stream_ordering - current_public_id = yield self.store.get_current_public_room_stream_id() - public_room_stream_id = since_token.public_room_stream_id - newly_visible, newly_unpublished = yield self.store.get_public_room_changes( - public_room_stream_id, current_public_id, network_tuple=network_tuple - ) - else: - stream_token = yield self.store.get_room_max_stream_ordering() - public_room_stream_id = yield self.store.get_current_public_room_stream_id() - - room_ids = yield self.store.get_public_room_ids_at_stream_id( - public_room_stream_id, network_tuple=network_tuple - ) - - # We want to return rooms in a particular order: the number of joined - # users. We then arbitrarily use the room_id as a tie breaker. - - @defer.inlineCallbacks - def get_order_for_room(room_id): - # Most of the rooms won't have changed between the since token and - # now (especially if the since token is "now"). So, we can ask what - # the current users are in a room (that will hit a cache) and then - # check if the room has changed since the since token. (We have to - # do it in that order to avoid races). - # If things have changed then fall back to getting the current state - # at the since token. - joined_users = yield self.store.get_users_in_room(room_id) - if self.store.has_room_changed_since(room_id, stream_token): - latest_event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, stream_token - ) - - if not latest_event_ids: - return + batch_token = RoomListNextBatch.from_token(since_token) - joined_users = yield self.state_handler.get_current_users_in_room( - room_id, latest_event_ids - ) - - num_joined_users = len(joined_users) - rooms_to_num_joined[room_id] = num_joined_users + last_room_id = batch_token.last_room_id + forwards = batch_token.direction_is_forward + else: + batch_token = None - if num_joined_users == 0: - return + last_room_id = None + forwards = True - # We want larger rooms to be first, hence negating num_joined_users - rooms_to_order_value[room_id] = (-num_joined_users, room_id) + # we request one more than wanted to see if there are more pages to come + probing_limit = limit + 1 if limit is not None else None - logger.info( - "Getting ordering for %i rooms since %s", len(room_ids), stream_token + results = yield self.store.get_largest_public_rooms( + network_tuple, + search_filter, + probing_limit, + last_room_id=last_room_id, + forwards=forwards, + ignore_non_federatable=from_federation, ) - yield concurrently_execute(get_order_for_room, room_ids, 10) - sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) - sorted_rooms = [room_id for room_id, _ in sorted_entries] + def build_room_entry(room): + entry = { + "room_id": room["room_id"], + "name": room["name"], + "topic": room["topic"], + "canonical_alias": room["canonical_alias"], + "num_joined_members": room["joined_members"], + "avatar_url": room["avatar"], + "world_readable": room["history_visibility"] == "world_readable", + "guest_can_join": room["guest_access"] == "can_join", + } - # `sorted_rooms` should now be a list of all public room ids that is - # stable across pagination. Therefore, we can use indices into this - # list as our pagination tokens. + # Filter out Nones – rather omit the field altogether + return {k: v for k, v in entry.items() if v is not None} - # Filter out rooms that we don't want to return - rooms_to_scan = [ - r - for r in sorted_rooms - if r not in newly_unpublished and rooms_to_num_joined[r] > 0 - ] + results = [build_room_entry(r) for r in results] - total_room_count = len(rooms_to_scan) + response = {} + num_results = len(results) + if limit is not None: + more_to_come = num_results == probing_limit - if since_token: - # Filter out rooms we've already returned previously - # `since_token.current_limit` is the index of the last room we - # sent down, so we exclude it and everything before/after it. - if since_token.direction_is_forward: - rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :] + # Depending on direction we trim either the front or back. + if forwards: + results = results[:limit] else: - rooms_to_scan = rooms_to_scan[: since_token.current_limit] - rooms_to_scan.reverse() - - logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan)) - - # _append_room_entry_to_chunk will append to chunk but will stop if - # len(chunk) > limit - # - # Normally we will generate enough results on the first iteration here, - # but if there is a search filter, _append_room_entry_to_chunk may - # filter some results out, in which case we loop again. - # - # We don't want to scan over the entire range either as that - # would potentially waste a lot of work. - # - # XXX if there is no limit, we may end up DoSing the server with - # calls to get_current_state_ids for every single room on the - # server. Surely we should cap this somehow? - # - if limit: - step = limit + 1 + results = results[-limit:] else: - # step cannot be zero - step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1 - - chunk = [] - for i in range(0, len(rooms_to_scan), step): - if timeout and self.clock.time() > timeout: - raise Exception("Timed out searching room directory") - - batch = rooms_to_scan[i : i + step] - logger.info("Processing %i rooms for result", len(batch)) - yield concurrently_execute( - lambda r: self._append_room_entry_to_chunk( - r, - rooms_to_num_joined[r], - chunk, - limit, - search_filter, - from_federation=from_federation, - ), - batch, - 5, - ) - logger.info("Now %i rooms in result", len(chunk)) - if len(chunk) >= limit + 1: - break - - chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) - - # Work out the new limit of the batch for pagination, or None if we - # know there are no more results that would be returned. - # i.e., [since_token.current_limit..new_limit] is the batch of rooms - # we've returned (or the reverse if we paginated backwards) - # We tried to pull out limit + 1 rooms above, so if we have <= limit - # then we know there are no more results to return - new_limit = None - if chunk and (not limit or len(chunk) > limit): - - if not since_token or since_token.direction_is_forward: - if limit: - chunk = chunk[:limit] - last_room_id = chunk[-1]["room_id"] + more_to_come = False + + if num_results > 0: + final_room_id = results[-1]["room_id"] + initial_room_id = results[0]["room_id"] + + if forwards: + if batch_token: + # If there was a token given then we assume that there + # must be previous results. + response["prev_batch"] = RoomListNextBatch( + last_room_id=initial_room_id, direction_is_forward=False + ).to_token() + + if more_to_come: + response["next_batch"] = RoomListNextBatch( + last_room_id=final_room_id, direction_is_forward=True + ).to_token() else: - if limit: - chunk = chunk[-limit:] - last_room_id = chunk[0]["room_id"] - - new_limit = sorted_rooms.index(last_room_id) - - results = {"chunk": chunk, "total_room_count_estimate": total_room_count} - - if since_token: - results["new_rooms"] = bool(newly_visible) - - if not since_token or since_token.direction_is_forward: - if new_limit is not None: - results["next_batch"] = RoomListNextBatch( - stream_ordering=stream_token, - public_room_stream_id=public_room_stream_id, - current_limit=new_limit, - direction_is_forward=True, - ).to_token() - - if since_token: - results["prev_batch"] = since_token.copy_and_replace( - direction_is_forward=False, - current_limit=since_token.current_limit + 1, - ).to_token() - else: - if new_limit is not None: - results["prev_batch"] = RoomListNextBatch( - stream_ordering=stream_token, - public_room_stream_id=public_room_stream_id, - current_limit=new_limit, - direction_is_forward=False, - ).to_token() - - if since_token: - results["next_batch"] = since_token.copy_and_replace( - direction_is_forward=True, - current_limit=since_token.current_limit - 1, - ).to_token() - - return results - - @defer.inlineCallbacks - def _append_room_entry_to_chunk( - self, - room_id, - num_joined_users, - chunk, - limit, - search_filter, - from_federation=False, - ): - """Generate the entry for a room in the public room list and append it - to the `chunk` if it matches the search filter - - Args: - room_id (str): The ID of the room. - num_joined_users (int): The number of joined users in the room. - chunk (list) - limit (int|None): Maximum amount of rooms to display. Function will - return if length of chunk is greater than limit + 1. - search_filter (dict|None) - from_federation (bool): Whether this request originated from a - federating server or a client. Used for room filtering. - """ - if limit and len(chunk) > limit + 1: - # We've already got enough, so lets just drop it. - return + if batch_token: + response["next_batch"] = RoomListNextBatch( + last_room_id=final_room_id, direction_is_forward=True + ).to_token() + + if more_to_come: + response["prev_batch"] = RoomListNextBatch( + last_room_id=initial_room_id, direction_is_forward=False + ).to_token() + + for room in results: + # populate search result entries with additional fields, namely + # 'aliases' + room_id = room["room_id"] + + aliases = yield self.store.get_aliases_for_room(room_id) + if aliases: + room["aliases"] = aliases - result = yield self.generate_room_entry(room_id, num_joined_users) - if not result: - return + response["chunk"] = results - if from_federation and not result.get("m.federate", True): - # This is a room that other servers cannot join. Do not show them - # this room. - return + response["total_room_count_estimate"] = yield self.store.count_public_rooms( + network_tuple, ignore_non_federatable=from_federation + ) - if _matches_room_entry(result, search_filter): - chunk.append(result) + return response @cachedInlineCallbacks(num_args=1, cache_context=True) def generate_room_entry( @@ -580,32 +449,18 @@ class RoomListNextBatch( namedtuple( "RoomListNextBatch", ( - "stream_ordering", # stream_ordering of the first public room list - "public_room_stream_id", # public room stream id for first public room list - "current_limit", # The number of previous rooms returned + "last_room_id", # The room_id to get rooms after/before "direction_is_forward", # Bool if this is a next_batch, false if prev_batch ), ) ): - - KEY_DICT = { - "stream_ordering": "s", - "public_room_stream_id": "p", - "current_limit": "n", - "direction_is_forward": "d", - } + KEY_DICT = {"last_room_id": "r", "direction_is_forward": "d"} REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()} @classmethod def from_token(cls, token): - if PY3: - # The argument raw=False is only available on new versions of - # msgpack, and only really needed on Python 3. Gate it behind - # a PY3 check to avoid causing issues on Debian-packaged versions. - decoded = msgpack.loads(decode_base64(token), raw=False) - else: - decoded = msgpack.loads(decode_base64(token)) + decoded = msgpack.loads(decode_base64(token), raw=False) return RoomListNextBatch( **{cls.REVERSE_KEY_DICT[key]: val for key, val in decoded.items()} ) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 6bf924dedc..9c1d41421c 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -361,6 +361,10 @@ class PublicRoomListRestServlet(TransactionRestServlet): limit = parse_integer(request, "limit", 0) since_token = parse_string(request, "since", None) + if limit == 0: + # zero is a special value which corresponds to no limit. + limit = None + handler = self.hs.get_room_list_handler() if server: data = yield handler.get_remote_public_room_list( @@ -398,6 +402,10 @@ class PublicRoomListRestServlet(TransactionRestServlet): else: network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id) + if limit == 0: + # zero is a special value which corresponds to no limit. + limit = None + handler = self.hs.get_room_list_handler() if server: data = yield handler.get_remote_public_room_list( diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 08e13f3a3b..c02787a73d 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -63,103 +64,176 @@ class RoomWorkerStore(SQLBaseStore): desc="get_public_room_ids", ) - @cached(num_args=2, max_entries=100) - def get_public_room_ids_at_stream_id(self, stream_id, network_tuple): - """Get pulbic rooms for a particular list, or across all lists. + def count_public_rooms(self, network_tuple, ignore_non_federatable): + """Counts the number of public rooms as tracked in the room_stats_current + and room_stats_state table. Args: - stream_id (int) - network_tuple (ThirdPartyInstanceID): The list to use (None, None) - means the main list, None means all lsits. + network_tuple (ThirdPartyInstanceID|None) + ignore_non_federatable (bool): If true filters out non-federatable rooms """ - return self.runInteraction( - "get_public_room_ids_at_stream_id", - self.get_public_room_ids_at_stream_id_txn, - stream_id, - network_tuple=network_tuple, - ) - - def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, network_tuple): - return { - rm - for rm, vis in self.get_published_at_stream_id_txn( - txn, stream_id, network_tuple=network_tuple - ).items() - if vis - } - def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple): - if network_tuple: - # We want to get from a particular list. No aggregation required. + def _count_public_rooms_txn(txn): + query_args = [] + + if network_tuple: + if network_tuple.appservice_id: + published_sql = """ + SELECT room_id from appservice_room_list + WHERE appservice_id = ? AND network_id = ? + """ + query_args.append(network_tuple.appservice_id) + query_args.append(network_tuple.network_id) + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + """ + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + UNION SELECT room_id from appservice_room_list + """ sql = """ - SELECT room_id, visibility FROM public_room_list_stream - INNER JOIN ( - SELECT room_id, max(stream_id) AS stream_id - FROM public_room_list_stream - WHERE stream_id <= ? %s - GROUP BY room_id - ) grouped USING (room_id, stream_id) - """ + SELECT + COALESCE(COUNT(*), 0) + FROM ( + %(published_sql)s + ) published + INNER JOIN room_stats_state USING (room_id) + INNER JOIN room_stats_current USING (room_id) + WHERE + ( + join_rules = 'public' OR history_visibility = 'world_readable' + ) + AND joined_members > 0 + """ % { + "published_sql": published_sql + } - if network_tuple.appservice_id is not None: - txn.execute( - sql % ("AND appservice_id = ? AND network_id = ?",), - (stream_id, network_tuple.appservice_id, network_tuple.network_id), - ) - else: - txn.execute(sql % ("AND appservice_id IS NULL",), (stream_id,)) - return dict(txn) - else: - # We want to get from all lists, so we need to aggregate the results + txn.execute(sql, query_args) + return txn.fetchone()[0] - logger.info("Executing full list") + return self.runInteraction("count_public_rooms", _count_public_rooms_txn) - sql = """ - SELECT room_id, visibility - FROM public_room_list_stream - INNER JOIN ( - SELECT - room_id, max(stream_id) AS stream_id, appservice_id, - network_id - FROM public_room_list_stream - WHERE stream_id <= ? - GROUP BY room_id, appservice_id, network_id - ) grouped USING (room_id, stream_id) - """ + @defer.inlineCallbacks + def get_largest_public_rooms( + self, + network_tuple, + search_filter, + limit, + last_room_id, + forwards, + ignore_non_federatable=False, + ): + """Gets the largest public rooms (where largest is in terms of joined + members, as tracked in the statistics table). - txn.execute(sql, (stream_id,)) + Args: + network_tuple (ThirdPartyInstanceID|None): + search_filter (dict|None): + limit (int|None): Maxmimum number of rows to return, unlimited otherwise. + last_room_id (str|None): if present, a room ID which bounds the + result set, and is always *excluded* from the result set. + forwards (bool): true iff going forwards, going backwards otherwise + ignore_non_federatable (bool): If true filters out non-federatable rooms. - results = {} - # A room is visible if its visible on any list. - for room_id, visibility in txn: - results[room_id] = bool(visibility) or results.get(room_id, False) + Returns: + Rooms in order: biggest number of joined users first. + We then arbitrarily use the room_id as a tie breaker. - return results + """ - def get_public_room_changes(self, prev_stream_id, new_stream_id, network_tuple): - def get_public_room_changes_txn(txn): - then_rooms = self.get_public_room_ids_at_stream_id_txn( - txn, prev_stream_id, network_tuple - ) + where_clauses = [] + query_args = [] - now_rooms_dict = self.get_published_at_stream_id_txn( - txn, new_stream_id, network_tuple - ) + if last_room_id: + if forwards: + where_clauses.append("room_id < ?") + else: + where_clauses.append("? < room_id") - now_rooms_visible = set(rm for rm, vis in now_rooms_dict.items() if vis) - now_rooms_not_visible = set( - rm for rm, vis in now_rooms_dict.items() if not vis + query_args += [last_room_id] + + if search_filter and search_filter.get("generic_search_term", None): + search_term = "%" + search_filter["generic_search_term"] + "%" + + where_clauses.append( + """ + ( + name LIKE ? + OR topic LIKE ? + OR canonical_alias LIKE ? + ) + """ ) + query_args += [search_term, search_term, search_term] + + if network_tuple: + if network_tuple.appservice_id: + published_sql = """ + SELECT room_id from appservice_room_list + WHERE appservice_id = ? AND network_id = ? + """ + query_args.append(network_tuple.appservice_id) + query_args.append(network_tuple.network_id) + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + """ + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + UNION SELECT room_id from appservice_room_list + """ - newly_visible = now_rooms_visible - then_rooms - newly_unpublished = now_rooms_not_visible & then_rooms + where_clause = "" + if where_clauses: + where_clause = " AND " + " AND ".join(where_clauses) + + sql = """ + SELECT + room_id, name, topic, canonical_alias, joined_members, + avatar, history_visibility, joined_members, guest_access + FROM ( + %(published_sql)s + ) published + INNER JOIN room_stats_state USING (room_id) + INNER JOIN room_stats_current USING (room_id) + WHERE + ( + join_rules = 'public' OR history_visibility = 'world_readable' + ) + AND joined_members > 0 + %(where_clause)s + ORDER BY joined_members %(dir)s, room_id %(dir)s + """ % { + "published_sql": published_sql, + "where_clause": where_clause, + "dir": "DESC" if forwards else "ASC", + } - return newly_visible, newly_unpublished + if limit is not None: + query_args.append(limit) - return self.runInteraction( - "get_public_room_changes", get_public_room_changes_txn + sql += """ + LIMIT ? + """ + + def _get_largest_public_rooms_txn(txn): + txn.execute(sql, query_args) + + results = self.cursor_to_dict(txn) + + if not forwards: + results.reverse() + + return results + + ret_val = yield self.runInteraction( + "get_largest_public_rooms", _get_largest_public_rooms_txn ) + defer.returnValue(ret_val) @cached(max_entries=10000) def is_room_blocked(self, room_id): diff --git a/synapse/storage/schema/delta/56/public_room_list_idx.sql b/synapse/storage/schema/delta/56/public_room_list_idx.sql new file mode 100644 index 0000000000..7be31ffebb --- /dev/null +++ b/synapse/storage/schema/delta/56/public_room_list_idx.sql @@ -0,0 +1,16 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX public_room_list_stream_network ON public_room_list_stream (appservice_id, network_id, room_id); diff --git a/tests/handlers/test_roomlist.py b/tests/handlers/test_roomlist.py deleted file mode 100644 index 61eebb6985..0000000000 --- a/tests/handlers/test_roomlist.py +++ /dev/null @@ -1,39 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.handlers.room_list import RoomListNextBatch - -import tests.unittest -import tests.utils - - -class RoomListTestCase(tests.unittest.TestCase): - """ Tests RoomList's RoomListNextBatch. """ - - def setUp(self): - pass - - def test_check_read_batch_tokens(self): - batch_token = RoomListNextBatch( - stream_ordering="abcdef", - public_room_stream_id="123", - current_limit=20, - direction_is_forward=True, - ).to_token() - next_batch = RoomListNextBatch.from_token(batch_token) - self.assertEquals(next_batch.stream_ordering, "abcdef") - self.assertEquals(next_batch.public_room_stream_id, "123") - self.assertEquals(next_batch.current_limit, 20) - self.assertEquals(next_batch.direction_is_forward, True) -- cgit 1.4.1 From 03cf4385e098ae73730b9c5ef695fa3f16c1806f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 15:09:10 +0100 Subject: Fix public room list pagination. We incorrectly used `room_id` as to bound the result set, even though we order by `joined_members, room_id`, leading to incorrect results after pagination. --- synapse/handlers/room_list.py | 33 +++++++++++++++++++-------- synapse/storage/room.py | 53 +++++++++++++++++++++++++++++-------------- 2 files changed, 59 insertions(+), 27 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 4e1cc5460f..cfed344d4d 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -142,12 +142,12 @@ class RoomListHandler(BaseHandler): if since_token: batch_token = RoomListNextBatch.from_token(since_token) - last_room_id = batch_token.last_room_id + bounds = (batch_token.last_joined_members, batch_token.last_room_id) forwards = batch_token.direction_is_forward else: batch_token = None + bounds = None - last_room_id = None forwards = True # we request one more than wanted to see if there are more pages to come @@ -157,7 +157,7 @@ class RoomListHandler(BaseHandler): network_tuple, search_filter, probing_limit, - last_room_id=last_room_id, + bounds=bounds, forwards=forwards, ignore_non_federatable=from_federation, ) @@ -193,30 +193,38 @@ class RoomListHandler(BaseHandler): more_to_come = False if num_results > 0: - final_room_id = results[-1]["room_id"] - initial_room_id = results[0]["room_id"] + final_entry = results[-1] + initial_entry = results[0] if forwards: if batch_token: # If there was a token given then we assume that there # must be previous results. response["prev_batch"] = RoomListNextBatch( - last_room_id=initial_room_id, direction_is_forward=False + last_joined_members=initial_entry["num_joined_members"], + last_room_id=initial_entry["room_id"], + direction_is_forward=False, ).to_token() if more_to_come: response["next_batch"] = RoomListNextBatch( - last_room_id=final_room_id, direction_is_forward=True + last_joined_members=final_entry["num_joined_members"], + last_room_id=final_entry["room_id"], + direction_is_forward=True, ).to_token() else: if batch_token: response["next_batch"] = RoomListNextBatch( - last_room_id=final_room_id, direction_is_forward=True + last_joined_members=final_entry["num_joined_members"], + last_room_id=final_entry["room_id"], + direction_is_forward=True, ).to_token() if more_to_come: response["prev_batch"] = RoomListNextBatch( - last_room_id=initial_room_id, direction_is_forward=False + last_joined_members=initial_entry["num_joined_members"], + last_room_id=initial_entry["room_id"], + direction_is_forward=False, ).to_token() for room in results: @@ -449,12 +457,17 @@ class RoomListNextBatch( namedtuple( "RoomListNextBatch", ( + "last_joined_members", # The count to get rooms after/before "last_room_id", # The room_id to get rooms after/before "direction_is_forward", # Bool if this is a next_batch, false if prev_batch ), ) ): - KEY_DICT = {"last_room_id": "r", "direction_is_forward": "d"} + KEY_DICT = { + "last_joined_members": "m", + "last_room_id": "r", + "direction_is_forward": "d", + } REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()} diff --git a/synapse/storage/room.py b/synapse/storage/room.py index c02787a73d..9b7e31583c 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -17,6 +17,7 @@ import collections import logging import re +from typing import Optional, Tuple from canonicaljson import json @@ -25,6 +26,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore from synapse.storage.search import SearchStore +from synapse.types import ThirdPartyInstanceID from synapse.util.caches.descriptors import cached, cachedInlineCallbacks logger = logging.getLogger(__name__) @@ -119,24 +121,25 @@ class RoomWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_largest_public_rooms( self, - network_tuple, - search_filter, - limit, - last_room_id, - forwards, - ignore_non_federatable=False, + network_tuple: Optional[ThirdPartyInstanceID], + search_filter: Optional[dict], + limit: Optional[int], + bounds: Optional[Tuple[int, str]], + forwards: bool, + ignore_non_federatable: bool = False, ): """Gets the largest public rooms (where largest is in terms of joined members, as tracked in the statistics table). Args: - network_tuple (ThirdPartyInstanceID|None): - search_filter (dict|None): - limit (int|None): Maxmimum number of rows to return, unlimited otherwise. - last_room_id (str|None): if present, a room ID which bounds the - result set, and is always *excluded* from the result set. - forwards (bool): true iff going forwards, going backwards otherwise - ignore_non_federatable (bool): If true filters out non-federatable rooms. + network_tuple + search_filter + limit: Maxmimum number of rows to return, unlimited otherwise. + bounds: An uppoer or lower bound to apply to result set if given, + consists of a joined member count and room_id (these are + excluded from result set). + forwards: true iff going forwards, going backwards otherwise + ignore_non_federatable: If true filters out non-federatable rooms. Returns: Rooms in order: biggest number of joined users first. @@ -147,13 +150,29 @@ class RoomWorkerStore(SQLBaseStore): where_clauses = [] query_args = [] - if last_room_id: + # Work out the bounds if we're given them, these bounds look slightly + # odd, but are designed to help query planner use indices by pulling + # out a common bound. + if bounds: + last_joined_members, last_room_id = bounds if forwards: - where_clauses.append("room_id < ?") + where_clauses.append( + """ + joined_members <= ? AND ( + joined_members < ? OR room_id < ? + ) + """ + ) else: - where_clauses.append("? < room_id") + where_clauses.append( + """ + joined_members >= ? AND ( + joined_members > ? OR room_id > ? + ) + """ + ) - query_args += [last_room_id] + query_args += [last_joined_members, last_joined_members, last_room_id] if search_filter and search_filter.get("generic_search_term", None): search_term = "%" + search_filter["generic_search_term"] + "%" -- cgit 1.4.1 From 4c4f44930d2153056dc1b992c571f43f2d360d07 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 15:20:36 +0100 Subject: Fix not showing non-federatable rooms to remote room list queries --- synapse/storage/room.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 9b7e31583c..615c0d3f65 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -174,6 +174,9 @@ class RoomWorkerStore(SQLBaseStore): query_args += [last_joined_members, last_joined_members, last_room_id] + if ignore_non_federatable: + where_clauses.append("is_federatable") + if search_filter and search_filter.get("generic_search_term", None): search_term = "%" + search_filter["generic_search_term"] + "%" -- cgit 1.4.1 From 7a5f080f91f42fe011a1ab497acdce481187da5f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Oct 2019 15:47:22 +0100 Subject: Fix appservice room list pagination --- synapse/storage/room.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 9b7e31583c..70bd719521 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -150,6 +150,24 @@ class RoomWorkerStore(SQLBaseStore): where_clauses = [] query_args = [] + if network_tuple: + if network_tuple.appservice_id: + published_sql = """ + SELECT room_id from appservice_room_list + WHERE appservice_id = ? AND network_id = ? + """ + query_args.append(network_tuple.appservice_id) + query_args.append(network_tuple.network_id) + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + """ + else: + published_sql = """ + SELECT room_id FROM rooms WHERE is_public + UNION SELECT room_id from appservice_room_list + """ + # Work out the bounds if we're given them, these bounds look slightly # odd, but are designed to help query planner use indices by pulling # out a common bound. @@ -188,24 +206,6 @@ class RoomWorkerStore(SQLBaseStore): ) query_args += [search_term, search_term, search_term] - if network_tuple: - if network_tuple.appservice_id: - published_sql = """ - SELECT room_id from appservice_room_list - WHERE appservice_id = ? AND network_id = ? - """ - query_args.append(network_tuple.appservice_id) - query_args.append(network_tuple.network_id) - else: - published_sql = """ - SELECT room_id FROM rooms WHERE is_public - """ - else: - published_sql = """ - SELECT room_id FROM rooms WHERE is_public - UNION SELECT room_id from appservice_room_list - """ - where_clause = "" if where_clauses: where_clause = " AND " + " AND ".join(where_clauses) -- cgit 1.4.1 From c8145af8a9446911bbb52fc0114eebf4eebede4b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:11:04 +0100 Subject: Cache room membership lookups in _get_joined_users_from_context --- synapse/storage/roommember.py | 64 ++++++++++++++++++++++++++++++------------- 1 file changed, 45 insertions(+), 19 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4df8ebdacd..f11bbd05f8 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -16,6 +16,7 @@ import logging from collections import namedtuple +from typing import Iterable from six import iteritems, itervalues @@ -32,7 +33,7 @@ from synapse.storage.events_worker import EventsWorkerStore from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from synapse.util.stringutils import to_ascii logger = logging.getLogger(__name__) @@ -567,25 +568,10 @@ class RoomMemberWorkerStore(EventsWorkerStore): missing_member_event_ids.append(event_id) if missing_member_event_ids: - rows = yield self._simple_select_many_batch( - table="room_memberships", - column="event_id", - iterable=missing_member_event_ids, - retcols=("user_id", "display_name", "avatar_url"), - keyvalues={"membership": Membership.JOIN}, - batch_size=500, - desc="_get_joined_users_from_context", - ) - - users_in_room.update( - { - to_ascii(row["user_id"]): ProfileInfo( - avatar_url=to_ascii(row["avatar_url"]), - display_name=to_ascii(row["display_name"]), - ) - for row in rows - } + event_to_memberships = yield self._get_membership_from_event_ids( + missing_member_event_ids ) + users_in_room.update((row for row in event_to_memberships.values() if row)) if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: @@ -597,6 +583,46 @@ class RoomMemberWorkerStore(EventsWorkerStore): return users_in_room + @cached(max_entries=10000) + def _get_membership_from_event_id(self, event_id): + raise NotADirectoryError() + + @cachedList( + cached_method_name="_get_membership_from_event_id", + list_name="event_ids", + inlineCallbacks=True, + ) + def _get_membership_from_event_ids(self, event_ids: Iterable[str]): + """Lookup profile info for set of member event IDs. + + Args: + event_ids: The member event IDs to lookup + + Returns: + Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID + to `user_id` and ProfileInfo (or None if couldn't find event). + """ + + rows = yield self._simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=event_ids, + retcols=("user_id", "display_name", "avatar_url"), + keyvalues={"membership": Membership.JOIN}, + batch_size=500, + desc="_get_membership_from_event_ids", + ) + + return { + row["event_id"]: ( + row["user_id"], + ProfileInfo( + avatar_url=row["avatar_url"], display_name=row["display_name"] + ), + ) + for row in rows + } + @cachedInlineCallbacks(max_entries=10000) def is_host_joined(self, room_id, host): if "%" in host or "_" in host: -- cgit 1.4.1 From d89ebf7c25b4f55d513da41a3ea20b9f8adc62d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:23:11 +0100 Subject: cachedList descriptor doesn't like typing --- synapse/storage/roommember.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index f11bbd05f8..ef6179cbe5 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -592,11 +592,11 @@ class RoomMemberWorkerStore(EventsWorkerStore): list_name="event_ids", inlineCallbacks=True, ) - def _get_membership_from_event_ids(self, event_ids: Iterable[str]): + def _get_membership_from_event_ids(self, event_ids): """Lookup profile info for set of member event IDs. Args: - event_ids: The member event IDs to lookup + event_ids (Iterable[str]): The member event IDs to lookup Returns: Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID -- cgit 1.4.1 From a9610cdf02e04ecd8df52ebe74b1cb1338a5be97 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:26:56 +0100 Subject: Fixup names and comments --- synapse/storage/roommember.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ef6179cbe5..37c0723eb6 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -568,7 +568,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): missing_member_event_ids.append(event_id) if missing_member_event_ids: - event_to_memberships = yield self._get_membership_from_event_ids( + event_to_memberships = yield self._get_joined_profiles_from_event_ids( missing_member_event_ids ) users_in_room.update((row for row in event_to_memberships.values() if row)) @@ -584,23 +584,24 @@ class RoomMemberWorkerStore(EventsWorkerStore): return users_in_room @cached(max_entries=10000) - def _get_membership_from_event_id(self, event_id): + def _get_joined_profile_from_event_id(self, event_id): raise NotADirectoryError() @cachedList( - cached_method_name="_get_membership_from_event_id", + cached_method_name="_get_joined_profile_from_event_id", list_name="event_ids", inlineCallbacks=True, ) - def _get_membership_from_event_ids(self, event_ids): - """Lookup profile info for set of member event IDs. + def _get_joined_profiles_from_event_ids(self, event_ids): + """For given set of member event_ids check if they point to a join + event and if so return the associated user and profile info. Args: event_ids (Iterable[str]): The member event IDs to lookup Returns: Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID - to `user_id` and ProfileInfo (or None if couldn't find event). + to `user_id` and ProfileInfo (or None if not join event). """ rows = yield self._simple_select_many_batch( -- cgit 1.4.1 From 84691da6c3058f265f8b86d9a6592ba8ce90e2ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:27:18 +0100 Subject: pep8 --- synapse/storage/roommember.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 37c0723eb6..ed7d936b3d 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -16,7 +16,6 @@ import logging from collections import namedtuple -from typing import Iterable from six import iteritems, itervalues -- cgit 1.4.1 From 91f61fc6d74e923822eb92933e0d028848285a40 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:28:31 +0100 Subject: Use the right error.... --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ed7d936b3d..1160b98ccc 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -584,7 +584,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(max_entries=10000) def _get_joined_profile_from_event_id(self, event_id): - raise NotADirectoryError() + raise NotImplementedError() @cachedList( cached_method_name="_get_joined_profile_from_event_id", -- cgit 1.4.1 From 66537e10ce77e47fac52e3f27569ac1ef0f1aaa3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 3 Oct 2019 17:47:20 +0100 Subject: add some metrics on the federation sender (#6160) --- changelog.d/6160.misc | 1 + synapse/federation/sender/__init__.py | 11 ++++++----- synapse/state/__init__.py | 24 ++++++++++++++++++------ synapse/storage/roommember.py | 21 +++++++++++++++------ synapse/util/metrics.py | 6 ++++-- 5 files changed, 44 insertions(+), 19 deletions(-) create mode 100644 changelog.d/6160.misc (limited to 'synapse/storage') diff --git a/changelog.d/6160.misc b/changelog.d/6160.misc new file mode 100644 index 0000000000..3d7cce00e1 --- /dev/null +++ b/changelog.d/6160.misc @@ -0,0 +1 @@ +Add some metrics on the federation sender. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index d46f4aaeb1..2b2ee8612a 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -38,7 +38,7 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.metrics import measure_func +from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -183,8 +183,8 @@ class FederationSender(object): # Otherwise if the last member on a server in a room is # banned then it won't receive the event because it won't # be in the room after the ban. - destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=event.prev_event_ids() + destinations = yield self.state.get_hosts_in_room_at_events( + event.room_id, event_ids=event.prev_event_ids() ) except Exception: logger.exception( @@ -207,8 +207,9 @@ class FederationSender(object): @defer.inlineCallbacks def handle_room_events(events): - for event in events: - yield handle_event(event) + with Measure(self.clock, "handle_room_events"): + for event in events: + yield handle_event(event) events_by_room = {} for event in events: diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 2b0f4c79ee..dc9f5a9008 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -33,7 +33,7 @@ from synapse.state import v1, v2 from synapse.util.async_helpers import Linearizer from synapse.util.caches import get_cache_factor_for from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.metrics import Measure +from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -191,11 +191,22 @@ class StateHandler(object): return joined_users @defer.inlineCallbacks - def get_current_hosts_in_room(self, room_id, latest_event_ids=None): - if not latest_event_ids: - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - logger.debug("calling resolve_state_groups from get_current_hosts_in_room") - entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids) + def get_current_hosts_in_room(self, room_id): + event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + return (yield self.get_hosts_in_room_at_events(room_id, event_ids)) + + @defer.inlineCallbacks + def get_hosts_in_room_at_events(self, room_id, event_ids): + """Get the hosts that were in a room at the given event ids + + Args: + room_id (str): + event_ids (list[str]): + + Returns: + Deferred[list[str]]: the hosts in the room at the given events + """ + entry = yield self.resolve_state_groups_for_events(room_id, event_ids) joined_hosts = yield self.store.get_joined_hosts(room_id, entry) return joined_hosts @@ -344,6 +355,7 @@ class StateHandler(object): return context + @measure_func() @defer.inlineCallbacks def resolve_state_groups_for_events(self, room_id, event_ids): """ Given a list of event_ids this method fetches the state at each diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4df8ebdacd..1550d827ba 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -33,6 +33,7 @@ from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.metrics import Measure from synapse.util.stringutils import to_ascii logger = logging.getLogger(__name__) @@ -483,6 +484,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) return result + @defer.inlineCallbacks def get_joined_users_from_state(self, room_id, state_entry): state_group = state_entry.state_group if not state_group: @@ -492,9 +494,12 @@ class RoomMemberWorkerStore(EventsWorkerStore): # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._get_joined_users_from_context( - room_id, state_group, state_entry.state, context=state_entry - ) + with Measure(self._clock, "get_joined_users_from_state"): + return ( + yield self._get_joined_users_from_context( + room_id, state_group, state_entry.state, context=state_entry + ) + ) @cachedInlineCallbacks( num_args=2, cache_context=True, iterable=True, max_entries=100000 @@ -669,6 +674,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): return True + @defer.inlineCallbacks def get_joined_hosts(self, room_id, state_entry): state_group = state_entry.state_group if not state_group: @@ -678,9 +684,12 @@ class RoomMemberWorkerStore(EventsWorkerStore): # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._get_joined_hosts( - room_id, state_group, state_entry.state, state_entry=state_entry - ) + with Measure(self._clock, "get_joined_hosts"): + return ( + yield self._get_joined_hosts( + room_id, state_group, state_entry.state, state_entry=state_entry + ) + ) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) # @defer.inlineCallbacks diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 0910930c21..4b1bcdf23c 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -60,12 +60,14 @@ in_flight = InFlightGauge( ) -def measure_func(name): +def measure_func(name=None): def wrapper(func): + block_name = func.__name__ if name is None else name + @wraps(func) @defer.inlineCallbacks def measured_func(self, *args, **kwargs): - with Measure(self.clock, name): + with Measure(self.clock, block_name): r = yield func(self, *args, **kwargs) return r -- cgit 1.4.1 From 13c4345c844e75b0d1a4ce66e4fb2eb9820cb7f6 Mon Sep 17 00:00:00 2001 From: Alexander Maznev Date: Fri, 4 Oct 2019 04:34:16 -0500 Subject: Update `user_filters` table to have a unique index, and non-null columns (#1172) --- changelog.d/1172.misc | 1 + .../schema/delta/56/unique_user_filter_index.py | 46 ++++++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 changelog.d/1172.misc create mode 100644 synapse/storage/schema/delta/56/unique_user_filter_index.py (limited to 'synapse/storage') diff --git a/changelog.d/1172.misc b/changelog.d/1172.misc new file mode 100644 index 0000000000..30b3e56082 --- /dev/null +++ b/changelog.d/1172.misc @@ -0,0 +1 @@ +Update `user_filters` table to have a unique index, and non-null columns. Thanks to @pik for contributing this. \ No newline at end of file diff --git a/synapse/storage/schema/delta/56/unique_user_filter_index.py b/synapse/storage/schema/delta/56/unique_user_filter_index.py new file mode 100644 index 0000000000..4efc1a586f --- /dev/null +++ b/synapse/storage/schema/delta/56/unique_user_filter_index.py @@ -0,0 +1,46 @@ +import logging + +from synapse.storage.engines import PostgresEngine + +logger = logging.getLogger(__name__) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + select_clause = """ + CREATE TEMPORARY TABLE user_filters_migration AS + SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json + FROM user_filters; + """ + else: + select_clause = """ + CREATE TEMPORARY TABLE user_filters_migration AS + SELECT * FROM user_filters GROUP BY user_id, filter_id; + """ + sql = ( + """ + BEGIN; + %s + DROP INDEX user_filters_by_user_id_filter_id; + DELETE FROM user_filters; + ALTER TABLE user_filters + ALTER COLUMN user_id SET NOT NULL + ALTER COLUMN filter_id SET NOT NULL + ALTER COLUMN filter_json SET NOT NULL; + INSERT INTO user_filters(user_id, filter_id, filter_json) + SELECT * FROM user_filters_migration; + DROP TABLE user_filters_migration; + CREATE UNIQUE INDEX user_filters_by_user_id_filter_id_unique + ON user_filters(user_id, filter_id); + END; + """ + % select_clause + ) + if isinstance(database_engine, PostgresEngine): + cur.execute(sql) + else: + cur.executescript(sql) + + +def run_create(cur, database_engine, *args, **kwargs): + pass -- cgit 1.4.1 From 5119a4cac7f42691beb455eedbefd99a39d64897 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 7 Oct 2019 12:21:17 +0100 Subject: Fix bug where we didn't pull out event ID --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 81a9ab6dc8..59a89fad60 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -612,7 +612,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): table="room_memberships", column="event_id", iterable=event_ids, - retcols=("user_id", "display_name", "avatar_url"), + retcols=("user_id", "display_name", "avatar_url", "event_id"), keyvalues={"membership": Membership.JOIN}, batch_size=500, desc="_get_membership_from_event_ids", -- cgit 1.4.1 From c8e6c308c6358f20b1d0ef1292f8e9e2f36a549e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 7 Oct 2019 13:15:35 +0100 Subject: Fix unique_user_filter_index schema update --- synapse/storage/schema/delta/56/unique_user_filter_index.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/unique_user_filter_index.py b/synapse/storage/schema/delta/56/unique_user_filter_index.py index 4efc1a586f..60031f23ca 100644 --- a/synapse/storage/schema/delta/56/unique_user_filter_index.py +++ b/synapse/storage/schema/delta/56/unique_user_filter_index.py @@ -24,8 +24,8 @@ def run_upgrade(cur, database_engine, *args, **kwargs): DROP INDEX user_filters_by_user_id_filter_id; DELETE FROM user_filters; ALTER TABLE user_filters - ALTER COLUMN user_id SET NOT NULL - ALTER COLUMN filter_id SET NOT NULL + ALTER COLUMN user_id SET NOT NULL, + ALTER COLUMN filter_id SET NOT NULL, ALTER COLUMN filter_json SET NOT NULL; INSERT INTO user_filters(user_id, filter_id, filter_json) SELECT * FROM user_filters_migration; -- cgit 1.4.1 From 276ae5c63eaef656d486e190298f7a5ec99a7a5b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 7 Oct 2019 14:41:39 +0100 Subject: add some logging to the rooms stats updates, to try to track down a flaky test (#6167) --- changelog.d/6167.misc | 1 + synapse/handlers/stats.py | 1 + synapse/storage/stats.py | 3 +++ 3 files changed, 5 insertions(+) create mode 100644 changelog.d/6167.misc (limited to 'synapse/storage') diff --git a/changelog.d/6167.misc b/changelog.d/6167.misc new file mode 100644 index 0000000000..32c96b3681 --- /dev/null +++ b/changelog.d/6167.misc @@ -0,0 +1 @@ +Add some logging to the rooms stats updates, to try to track down a flaky test. diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index cbac7c347a..c62b113115 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -293,6 +293,7 @@ class StatsHandler(StateDeltasHandler): room_state["guest_access"] = event_content.get("guest_access") for room_id, state in room_to_state_updates.items(): + logger.info("Updating room_stats_state for %s: %s", room_id, state) yield self.store.update_room_state(room_id, state) return room_to_stats_deltas, user_to_stats_deltas diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 09190d684e..7c224cd3d9 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -332,6 +332,9 @@ class StatsStore(StateDeltasStore): def _bulk_update_stats_delta_txn(txn): for stats_type, stats_updates in updates.items(): for stats_id, fields in stats_updates.items(): + logger.info( + "Updating %s stats for %s: %s", stats_type, stats_id, fields + ) self._update_stats_delta_txn( txn, ts=ts, -- cgit 1.4.1 From 88957199e7edbd33a66d419224df9ba0eb9e604d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:16:39 +0100 Subject: Move client_ips's bg updates to a dedicated store --- synapse/storage/client_ips.py | 200 ++++++++++++++++++++++-------------------- 1 file changed, 106 insertions(+), 94 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index bb135166ce..1d89b50f57 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -33,14 +33,14 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 -class ClientIpStore(background_updates.BackgroundUpdateStore): +class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore): def __init__(self, db_conn, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR ) - super(ClientIpStore, self).__init__(db_conn, hs) + super(ClientIpBackgroundUpdateStore, self).__init__(db_conn, hs) self.user_ips_max_age = hs.config.user_ips_max_age @@ -92,19 +92,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "devices_last_seen", self._devices_last_seen_update ) - # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen) - self._batch_row_update = {} - - self._client_ip_looper = self._clock.looping_call( - self._update_client_ips_batch, 5 * 1000 - ) - self.hs.get_reactor().addSystemEventTrigger( - "before", "shutdown", self._update_client_ips_batch - ) - - if self.user_ips_max_age: - self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) - @defer.inlineCallbacks def _remove_user_ip_nonunique(self, progress, batch_size): def f(conn): @@ -303,6 +290,110 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): return batch_size + @defer.inlineCallbacks + def _devices_last_seen_update(self, progress, batch_size): + """Background update to insert last seen info into devices table + """ + + last_user_id = progress.get("last_user_id", "") + last_device_id = progress.get("last_device_id", "") + + def _devices_last_seen_update_txn(txn): + # This consists of two queries: + # + # 1. The sub-query searches for the next N devices and joins + # against user_ips to find the max last_seen associated with + # that device. + # 2. The outer query then joins again against user_ips on + # user/device/last_seen. This *should* hopefully only + # return one row, but if it does return more than one then + # we'll just end up updating the same device row multiple + # times, which is fine. + + if self.database_engine.supports_tuple_comparison: + where_clause = "(user_id, device_id) > (?, ?)" + where_args = [last_user_id, last_device_id] + else: + # We explicitly do a `user_id >= ? AND (...)` here to ensure + # that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)` + # makes it hard for query optimiser to tell that it can use the + # index on user_id + where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)" + where_args = [last_user_id, last_user_id, last_device_id] + + sql = """ + SELECT + last_seen, ip, user_agent, user_id, device_id + FROM ( + SELECT + user_id, device_id, MAX(u.last_seen) AS last_seen + FROM devices + INNER JOIN user_ips AS u USING (user_id, device_id) + WHERE %(where_clause)s + GROUP BY user_id, device_id + ORDER BY user_id ASC, device_id ASC + LIMIT ? + ) c + INNER JOIN user_ips AS u USING (user_id, device_id, last_seen) + """ % { + "where_clause": where_clause + } + txn.execute(sql, where_args + [batch_size]) + + rows = txn.fetchall() + if not rows: + return 0 + + sql = """ + UPDATE devices + SET last_seen = ?, ip = ?, user_agent = ? + WHERE user_id = ? AND device_id = ? + """ + txn.execute_batch(sql, rows) + + _, _, _, user_id, device_id = rows[-1] + self._background_update_progress_txn( + txn, + "devices_last_seen", + {"last_user_id": user_id, "last_device_id": device_id}, + ) + + return len(rows) + + updated = yield self.runInteraction( + "_devices_last_seen_update", _devices_last_seen_update_txn + ) + + if not updated: + yield self._end_background_update("devices_last_seen") + + return updated + + +class ClientIpStore(ClientIpBackgroundUpdateStore): + def __init__(self, db_conn, hs): + + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR + ) + + super(ClientIpStore, self).__init__(db_conn, hs) + + self.user_ips_max_age = hs.config.user_ips_max_age + + # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen) + self._batch_row_update = {} + + self._client_ip_looper = self._clock.looping_call( + self._update_client_ips_batch, 5 * 1000 + ) + self.hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", self._update_client_ips_batch + ) + + if self.user_ips_max_age: + self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) + @defer.inlineCallbacks def insert_client_ip( self, user_id, access_token, ip, user_agent, device_id, now=None @@ -454,85 +545,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): for (access_token, ip), (user_agent, last_seen) in iteritems(results) ) - @defer.inlineCallbacks - def _devices_last_seen_update(self, progress, batch_size): - """Background update to insert last seen info into devices table - """ - - last_user_id = progress.get("last_user_id", "") - last_device_id = progress.get("last_device_id", "") - - def _devices_last_seen_update_txn(txn): - # This consists of two queries: - # - # 1. The sub-query searches for the next N devices and joins - # against user_ips to find the max last_seen associated with - # that device. - # 2. The outer query then joins again against user_ips on - # user/device/last_seen. This *should* hopefully only - # return one row, but if it does return more than one then - # we'll just end up updating the same device row multiple - # times, which is fine. - - if self.database_engine.supports_tuple_comparison: - where_clause = "(user_id, device_id) > (?, ?)" - where_args = [last_user_id, last_device_id] - else: - # We explicitly do a `user_id >= ? AND (...)` here to ensure - # that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)` - # makes it hard for query optimiser to tell that it can use the - # index on user_id - where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)" - where_args = [last_user_id, last_user_id, last_device_id] - - sql = """ - SELECT - last_seen, ip, user_agent, user_id, device_id - FROM ( - SELECT - user_id, device_id, MAX(u.last_seen) AS last_seen - FROM devices - INNER JOIN user_ips AS u USING (user_id, device_id) - WHERE %(where_clause)s - GROUP BY user_id, device_id - ORDER BY user_id ASC, device_id ASC - LIMIT ? - ) c - INNER JOIN user_ips AS u USING (user_id, device_id, last_seen) - """ % { - "where_clause": where_clause - } - txn.execute(sql, where_args + [batch_size]) - - rows = txn.fetchall() - if not rows: - return 0 - - sql = """ - UPDATE devices - SET last_seen = ?, ip = ?, user_agent = ? - WHERE user_id = ? AND device_id = ? - """ - txn.execute_batch(sql, rows) - - _, _, _, user_id, device_id = rows[-1] - self._background_update_progress_txn( - txn, - "devices_last_seen", - {"last_user_id": user_id, "last_device_id": device_id}, - ) - - return len(rows) - - updated = yield self.runInteraction( - "_devices_last_seen_update", _devices_last_seen_update_txn - ) - - if not updated: - yield self._end_background_update("devices_last_seen") - - return updated - @wrap_as_background_process("prune_old_user_ips") async def _prune_old_user_ips(self): """Removes entries in user IPs older than the configured period. -- cgit 1.4.1 From 2d3b4f42f029da54132b26119e9ec0d902505eef Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:19:25 +0100 Subject: Move deviceinbox's bg updates to a dedicated store --- synapse/storage/deviceinbox.py | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 6b7458304e..70bc2bb2cc 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -208,11 +208,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) -class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): +class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" def __init__(self, db_conn, hs): - super(DeviceInboxStore, self).__init__(db_conn, hs) + super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_index_update( "device_inbox_stream_index", @@ -225,6 +225,26 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) + @defer.inlineCallbacks + def _background_drop_index_device_inbox(self, progress, batch_size): + def reindex_txn(conn): + txn = conn.cursor() + txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") + txn.close() + + yield self.runWithConnection(reindex_txn) + + yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID) + + return 1 + + +class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): + DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" + + def __init__(self, db_conn, hs): + super(DeviceInboxStore, self).__init__(db_conn, hs) + # Map of (user_id, device_id) to the last stream_id that has been # deleted up to. This is so that we can no op deletions. self._last_device_delete_cache = ExpiringCache( @@ -435,16 +455,3 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn ) - - @defer.inlineCallbacks - def _background_drop_index_device_inbox(self, progress, batch_size): - def reindex_txn(conn): - txn = conn.cursor() - txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") - txn.close() - - yield self.runWithConnection(reindex_txn) - - yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID) - - return 1 -- cgit 1.4.1 From cef9f6753e51c1fb7b24f2122b0d0ada767d6e08 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:24:03 +0100 Subject: Move devices's bg updates to a dedicated store --- synapse/storage/devices.py | 49 +++++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 22 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 79a58df591..111bfb3d64 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -512,17 +512,9 @@ class DeviceWorkerStore(SQLBaseStore): return results -class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): +class DeviceBackgroundUpdateStore(BackgroundUpdateStore): def __init__(self, db_conn, hs): - super(DeviceStore, self).__init__(db_conn, hs) - - # Map of (user_id, device_id) -> bool. If there is an entry that implies - # the device exists. - self.device_id_exists_cache = Cache( - name="device_id_exists", keylen=2, max_entries=10000 - ) - - self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) + super(DeviceBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_index_update( "device_lists_stream_idx", @@ -555,6 +547,31 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): self._drop_device_list_streams_non_unique_indexes, ) + @defer.inlineCallbacks + def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): + def f(conn): + txn = conn.cursor() + txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id") + txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id") + txn.close() + + yield self.runWithConnection(f) + yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES) + return 1 + + +class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): + def __init__(self, db_conn, hs): + super(DeviceStore, self).__init__(db_conn, hs) + + # Map of (user_id, device_id) -> bool. If there is an entry that implies + # the device exists. + self.device_id_exists_cache = Cache( + name="device_id_exists", keylen=2, max_entries=10000 + ) + + self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) + @defer.inlineCallbacks def store_device(self, user_id, device_id, initial_device_display_name): """Ensure the given device is known; add it to the store if not @@ -910,15 +927,3 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): "_prune_old_outbound_device_pokes", _prune_txn, ) - - @defer.inlineCallbacks - def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): - def f(conn): - txn = conn.cursor() - txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id") - txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id") - txn.close() - - yield self.runWithConnection(f) - yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES) - return 1 -- cgit 1.4.1 From 54f87e07342ddbf82e9b8ee7c7ce227624c2f97b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:30:22 +0100 Subject: Move media_repository's bg updates to a dedicated store --- synapse/storage/media_repository.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 6b1238ce4a..2eb2740d07 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -15,11 +15,10 @@ from synapse.storage.background_updates import BackgroundUpdateStore -class MediaRepositoryStore(BackgroundUpdateStore): - """Persistence for attachments and avatars""" +class MediaRepositoryBackgroundUpdateStore(BackgroundUpdateStore): def __init__(self, db_conn, hs): - super(MediaRepositoryStore, self).__init__(db_conn, hs) + super(MediaRepositoryBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_index_update( update_name="local_media_repository_url_idx", @@ -29,6 +28,13 @@ class MediaRepositoryStore(BackgroundUpdateStore): where_clause="url_cache IS NOT NULL", ) + +class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): + """Persistence for attachments and avatars""" + + def __init__(self, db_conn, hs): + super(MediaRepositoryStore, self).__init__(db_conn, hs) + def get_local_media(self, media_id): """Get the metadata for a local piece of media Returns: -- cgit 1.4.1 From 81e6ffb536b19c662a81736f88ef2f243d425532 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:44:26 +0100 Subject: Move registration's bg updates to a dedicated store --- synapse/storage/registration.py | 198 +++++++++++++++++++++------------------- 1 file changed, 103 insertions(+), 95 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 1a859352b6..1f6c93b73b 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -37,7 +37,57 @@ THIRTY_MINUTES_IN_MS = 30 * 60 * 1000 logger = logging.getLogger(__name__) -class RegistrationWorkerStore(SQLBaseStore): +class RegistrationDeactivationStore(SQLBaseStore): + @cachedInlineCallbacks() + def get_user_deactivated_status(self, user_id): + """Retrieve the value for the `deactivated` property for the provided user. + + Args: + user_id (str): The ID of the user to retrieve the status for. + + Returns: + defer.Deferred(bool): The requested value. + """ + + res = yield self._simple_select_one_onecol( + table="users", + keyvalues={"name": user_id}, + retcol="deactivated", + desc="get_user_deactivated_status", + ) + + # Convert the integer into a boolean. + return res == 1 + + @defer.inlineCallbacks + def set_user_deactivated_status(self, user_id, deactivated): + """Set the `deactivated` property for the provided user to the provided value. + + Args: + user_id (str): The ID of the user to set the status for. + deactivated (bool): The value to set for `deactivated`. + """ + + yield self.runInteraction( + "set_user_deactivated_status", + self.set_user_deactivated_status_txn, + user_id, + deactivated, + ) + + def set_user_deactivated_status_txn(self, txn, user_id, deactivated): + self._simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"deactivated": 1 if deactivated else 0}, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_deactivated_status, (user_id,) + ) + + +class RegistrationWorkerStore(RegistrationDeactivationStore): def __init__(self, db_conn, hs): super(RegistrationWorkerStore, self).__init__(db_conn, hs) @@ -673,27 +723,6 @@ class RegistrationWorkerStore(SQLBaseStore): desc="get_id_servers_user_bound", ) - @cachedInlineCallbacks() - def get_user_deactivated_status(self, user_id): - """Retrieve the value for the `deactivated` property for the provided user. - - Args: - user_id (str): The ID of the user to retrieve the status for. - - Returns: - defer.Deferred(bool): The requested value. - """ - - res = yield self._simple_select_one_onecol( - table="users", - keyvalues={"name": user_id}, - retcol="deactivated", - desc="get_user_deactivated_status", - ) - - # Convert the integer into a boolean. - return res == 1 - def get_threepid_validation_session( self, medium, client_secret, address=None, sid=None, validated=True ): @@ -787,13 +816,14 @@ class RegistrationWorkerStore(SQLBaseStore): ) -class RegistrationStore( - RegistrationWorkerStore, background_updates.BackgroundUpdateStore +class RegistrationBackgroundUpdateStore( + RegistrationDeactivationStore, background_updates.BackgroundUpdateStore ): def __init__(self, db_conn, hs): - super(RegistrationStore, self).__init__(db_conn, hs) + super(RegistrationBackgroundUpdateStore, self).__init__(db_conn, hs) self.clock = hs.get_clock() + self.config = hs.config self.register_background_index_update( "access_tokens_device_index", @@ -809,8 +839,6 @@ class RegistrationStore( columns=["creation_ts"], ) - self._account_validity = hs.config.account_validity - # we no longer use refresh tokens, but it's possible that some people # might have a background update queued to build this index. Just # clear the background update. @@ -824,17 +852,6 @@ class RegistrationStore( "users_set_deactivated_flag", self._background_update_set_deactivated_flag ) - # Create a background job for culling expired 3PID validity tokens - def start_cull(): - # run as a background process to make sure that the database transactions - # have a logcontext to report to - return run_as_background_process( - "cull_expired_threepid_validation_tokens", - self.cull_expired_threepid_validation_tokens, - ) - - hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) - @defer.inlineCallbacks def _background_update_set_deactivated_flag(self, progress, batch_size): """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1 @@ -896,6 +913,54 @@ class RegistrationStore( return nb_processed + @defer.inlineCallbacks + def _bg_user_threepids_grandfather(self, progress, batch_size): + """We now track which identity servers a user binds their 3PID to, so + we need to handle the case of existing bindings where we didn't track + this. + + We do this by grandfathering in existing user threepids assuming that + they used one of the server configured trusted identity servers. + """ + id_servers = set(self.config.trusted_third_party_id_servers) + + def _bg_user_threepids_grandfather_txn(txn): + sql = """ + INSERT INTO user_threepid_id_server + (user_id, medium, address, id_server) + SELECT user_id, medium, address, ? + FROM user_threepids + """ + + txn.executemany(sql, [(id_server,) for id_server in id_servers]) + + if id_servers: + yield self.runInteraction( + "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn + ) + + yield self._end_background_update("user_threepids_grandfather") + + return 1 + + +class RegistrationStore(RegistrationWorkerStore, RegistrationBackgroundUpdateStore): + def __init__(self, db_conn, hs): + super(RegistrationStore, self).__init__(db_conn, hs) + + self._account_validity = hs.config.account_validity + + # Create a background job for culling expired 3PID validity tokens + def start_cull(): + # run as a background process to make sure that the database transactions + # have a logcontext to report to + return run_as_background_process( + "cull_expired_threepid_validation_tokens", + self.cull_expired_threepid_validation_tokens, + ) + + hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) + @defer.inlineCallbacks def add_access_token_to_user(self, user_id, token, device_id, valid_until_ms): """Adds an access token for the given user. @@ -1244,36 +1309,6 @@ class RegistrationStore( desc="get_users_pending_deactivation", ) - @defer.inlineCallbacks - def _bg_user_threepids_grandfather(self, progress, batch_size): - """We now track which identity servers a user binds their 3PID to, so - we need to handle the case of existing bindings where we didn't track - this. - - We do this by grandfathering in existing user threepids assuming that - they used one of the server configured trusted identity servers. - """ - id_servers = set(self.config.trusted_third_party_id_servers) - - def _bg_user_threepids_grandfather_txn(txn): - sql = """ - INSERT INTO user_threepid_id_server - (user_id, medium, address, id_server) - SELECT user_id, medium, address, ? - FROM user_threepids - """ - - txn.executemany(sql, [(id_server,) for id_server in id_servers]) - - if id_servers: - yield self.runInteraction( - "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn - ) - - yield self._end_background_update("user_threepids_grandfather") - - return 1 - def validate_threepid_session(self, session_id, client_secret, token, current_ts): """Attempt to validate a threepid session using a token @@ -1464,30 +1499,3 @@ class RegistrationStore( cull_expired_threepid_validation_tokens_txn, self.clock.time_msec(), ) - - def set_user_deactivated_status_txn(self, txn, user_id, deactivated): - self._simple_update_one_txn( - txn=txn, - table="users", - keyvalues={"name": user_id}, - updatevalues={"deactivated": 1 if deactivated else 0}, - ) - self._invalidate_cache_and_stream( - txn, self.get_user_deactivated_status, (user_id,) - ) - - @defer.inlineCallbacks - def set_user_deactivated_status(self, user_id, deactivated): - """Set the `deactivated` property for the provided user to the provided value. - - Args: - user_id (str): The ID of the user to set the status for. - deactivated (bool): The value to set for `deactivated`. - """ - - yield self.runInteraction( - "set_user_deactivated_status", - self.set_user_deactivated_status_txn, - user_id, - deactivated, - ) -- cgit 1.4.1 From 841054ad96b44161d7a990bc1349ecc70fcd736c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:47:42 +0100 Subject: Move search's bg updates to a dedicated store --- synapse/storage/search.py | 56 ++++++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 25 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index df87ab6a6d..9a41e78002 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -36,7 +36,7 @@ SearchEntry = namedtuple( ) -class SearchStore(BackgroundUpdateStore): +class SearchBackgroundUpdateStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" @@ -44,7 +44,7 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin" def __init__(self, db_conn, hs): - super(SearchStore, self).__init__(db_conn, hs) + super(SearchBackgroundUpdateStore, self).__init__(db_conn, hs) if not hs.config.enable_search: return @@ -289,29 +289,6 @@ class SearchStore(BackgroundUpdateStore): return num_rows - def store_event_search_txn(self, txn, event, key, value): - """Add event to the search table - - Args: - txn (cursor): - event (EventBase): - key (str): - value (str): - """ - self.store_search_entries_txn( - txn, - ( - SearchEntry( - key=key, - value=value, - event_id=event.event_id, - room_id=event.room_id, - stream_ordering=event.internal_metadata.stream_ordering, - origin_server_ts=event.origin_server_ts, - ), - ), - ) - def store_search_entries_txn(self, txn, entries): """Add entries to the search table @@ -358,6 +335,35 @@ class SearchStore(BackgroundUpdateStore): # This should be unreachable. raise Exception("Unrecognized database engine") + +class SearchStore(SearchBackgroundUpdateStore): + + def __init__(self, db_conn, hs): + super(SearchStore, self).__init__(db_conn, hs) + + def store_event_search_txn(self, txn, event, key, value): + """Add event to the search table + + Args: + txn (cursor): + event (EventBase): + key (str): + value (str): + """ + self.store_search_entries_txn( + txn, + ( + SearchEntry( + key=key, + value=value, + event_id=event.event_id, + room_id=event.room_id, + stream_ordering=event.internal_metadata.stream_ordering, + origin_server_ts=event.origin_server_ts, + ), + ), + ) + @defer.inlineCallbacks def search_msgs(self, room_ids, search_term, keys): """Performs a full text search over events with given keys. -- cgit 1.4.1 From cfccd2d78a0b8338f33a5631d8f0637d4ed07e7e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:56:16 +0100 Subject: Move state's bg updates to a dedicated store --- synapse/storage/state.py | 394 ++++++++++++++++++++++++----------------------- 1 file changed, 204 insertions(+), 190 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1980a87108..71b533c006 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -353,8 +353,158 @@ class StateFilter(object): return member_filter, non_member_filter +class StateGroupBackgroundUpdateStore(SQLBaseStore): + """Defines functions related to state groups needed to run the state backgroud + updates. + """ + + def _count_state_group_hops_txn(self, txn, state_group): + """Given a state group, count how many hops there are in the tree. + + This is used to ensure the delta chains don't get too long. + """ + if isinstance(self.database_engine, PostgresEngine): + sql = """ + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT count(*) FROM state; + """ + + txn.execute(sql, (state_group,)) + row = txn.fetchone() + if row and row[0]: + return row[0] + else: + return 0 + else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) + next_group = state_group + count = 0 + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + count += 1 + + return count + + def _get_state_groups_from_groups_txn( + self, txn, groups, state_filter=StateFilter.all() + ): + results = {group: {} for group in groups} + + where_clause, where_args = state_filter.make_sql_filter_clause() + + # Unless the filter clause is empty, we're going to append it after an + # existing where clause + if where_clause: + where_clause = " AND (%s)" % (where_clause,) + + if isinstance(self.database_engine, PostgresEngine): + # Temporarily disable sequential scans in this transaction. This is + # a temporary hack until we can add the right indices in + txn.execute("SET LOCAL enable_seqscan=off") + + # The below query walks the state_group tree so that the "state" + # table includes all state_groups in the tree. It then joins + # against `state_groups_state` to fetch the latest state. + # It assumes that previous state groups are always numerically + # lesser. + # The PARTITION is used to get the event_id in the greatest state + # group for the given type, state_key. + # This may return multiple rows per (type, state_key), but last_value + # should be the same. + sql = """ + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT DISTINCT type, state_key, last_value(event_id) OVER ( + PARTITION BY type, state_key ORDER BY state_group ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS event_id FROM state_groups_state + WHERE state_group IN ( + SELECT state_group FROM state + ) + """ + + for group in groups: + args = [group] + args.extend(where_args) + + txn.execute(sql + where_clause, args) + for row in txn: + typ, state_key, event_id = row + key = (typ, state_key) + results[group][key] = event_id + else: + max_entries_returned = state_filter.max_entries_returned() + + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) + for group in groups: + next_group = group + + while next_group: + # We did this before by getting the list of group ids, and + # then passing that list to sqlite to get latest event for + # each (type, state_key). However, that was terribly slow + # without the right indices (which we can't add until + # after we finish deduping state, which requires this func) + args = [next_group] + args.extend(where_args) + + txn.execute( + "SELECT type, state_key, event_id FROM state_groups_state" + " WHERE state_group = ? " + where_clause, + args, + ) + results[group].update( + ((typ, state_key), event_id) + for typ, state_key, event_id in txn + if (typ, state_key) not in results[group] + ) + + # If the number of entries in the (type,state_key)->event_id dict + # matches the number of (type,state_keys) types we were searching + # for, then we must have found them all, so no need to go walk + # further down the tree... UNLESS our types filter contained + # wildcards (i.e. Nones) in which case we have to do an exhaustive + # search + if ( + max_entries_returned is not None + and len(results[group]) == max_entries_returned + ): + break + + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + + return results + + # this inherits from EventsWorkerStore because it calls self.get_events -class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): +class StateGroupWorkerStore( + EventsWorkerStore, StateGroupBackgroundUpdateStore, SQLBaseStore +): """The parts of StateGroupStore that can be called from workers. """ @@ -694,107 +844,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return results - def _get_state_groups_from_groups_txn( - self, txn, groups, state_filter=StateFilter.all() - ): - results = {group: {} for group in groups} - - where_clause, where_args = state_filter.make_sql_filter_clause() - - # Unless the filter clause is empty, we're going to append it after an - # existing where clause - if where_clause: - where_clause = " AND (%s)" % (where_clause,) - - if isinstance(self.database_engine, PostgresEngine): - # Temporarily disable sequential scans in this transaction. This is - # a temporary hack until we can add the right indices in - txn.execute("SET LOCAL enable_seqscan=off") - - # The below query walks the state_group tree so that the "state" - # table includes all state_groups in the tree. It then joins - # against `state_groups_state` to fetch the latest state. - # It assumes that previous state groups are always numerically - # lesser. - # The PARTITION is used to get the event_id in the greatest state - # group for the given type, state_key. - # This may return multiple rows per (type, state_key), but last_value - # should be the same. - sql = """ - WITH RECURSIVE state(state_group) AS ( - VALUES(?::bigint) - UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s - WHERE s.state_group = e.state_group - ) - SELECT DISTINCT type, state_key, last_value(event_id) OVER ( - PARTITION BY type, state_key ORDER BY state_group ASC - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING - ) AS event_id FROM state_groups_state - WHERE state_group IN ( - SELECT state_group FROM state - ) - """ - - for group in groups: - args = [group] - args.extend(where_args) - - txn.execute(sql + where_clause, args) - for row in txn: - typ, state_key, event_id = row - key = (typ, state_key) - results[group][key] = event_id - else: - max_entries_returned = state_filter.max_entries_returned() - - # We don't use WITH RECURSIVE on sqlite3 as there are distributions - # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) - for group in groups: - next_group = group - - while next_group: - # We did this before by getting the list of group ids, and - # then passing that list to sqlite to get latest event for - # each (type, state_key). However, that was terribly slow - # without the right indices (which we can't add until - # after we finish deduping state, which requires this func) - args = [next_group] - args.extend(where_args) - - txn.execute( - "SELECT type, state_key, event_id FROM state_groups_state" - " WHERE state_group = ? " + where_clause, - args, - ) - results[group].update( - ((typ, state_key), event_id) - for typ, state_key, event_id in txn - if (typ, state_key) not in results[group] - ) - - # If the number of entries in the (type,state_key)->event_id dict - # matches the number of (type,state_keys) types we were searching - # for, then we must have found them all, so no need to go walk - # further down the tree... UNLESS our types filter contained - # wildcards (i.e. Nones) in which case we have to do an exhaustive - # search - if ( - max_entries_returned is not None - and len(results[group]) == max_entries_returned - ): - break - - next_group = self._simple_select_one_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"state_group": next_group}, - retcol="prev_state_group", - allow_none=True, - ) - - return results - @defer.inlineCallbacks def get_state_for_events(self, event_ids, state_filter=StateFilter.all()): """Given a list of event_ids and type tuples, return a list of state @@ -1238,66 +1287,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return self.runInteraction("store_state_group", _store_state_group_txn) - def _count_state_group_hops_txn(self, txn, state_group): - """Given a state group, count how many hops there are in the tree. - - This is used to ensure the delta chains don't get too long. - """ - if isinstance(self.database_engine, PostgresEngine): - sql = """ - WITH RECURSIVE state(state_group) AS ( - VALUES(?::bigint) - UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s - WHERE s.state_group = e.state_group - ) - SELECT count(*) FROM state; - """ - - txn.execute(sql, (state_group,)) - row = txn.fetchone() - if row and row[0]: - return row[0] - else: - return 0 - else: - # We don't use WITH RECURSIVE on sqlite3 as there are distributions - # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) - next_group = state_group - count = 0 - - while next_group: - next_group = self._simple_select_one_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"state_group": next_group}, - retcol="prev_state_group", - allow_none=True, - ) - if next_group: - count += 1 - - return count - -class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): - """ Keeps track of the state at a given event. - - This is done by the concept of `state groups`. Every event is a assigned - a state group (identified by an arbitrary string), which references a - collection of state events. The current state of an event is then the - collection of state events referenced by the event's state group. - - Hence, every change in the current state causes a new state group to be - generated. However, if no change happens (e.g., if we get a message event - with only one parent it inherits the state group from its parent.) - - There are three tables: - * `state_groups`: Stores group name, first event with in the group and - room id. - * `event_to_state_groups`: Maps events to state groups. - * `state_groups_state`: Maps state group to state events. - """ +class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore, BackgroundUpdateStore): STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" @@ -1305,7 +1296,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index" def __init__(self, db_conn, hs): - super(StateStore, self).__init__(db_conn, hs) + super(StateBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_update_handler( self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, self._background_deduplicate_state, @@ -1327,34 +1318,6 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): columns=["state_group"], ) - def _store_event_state_mappings_txn(self, txn, events_and_contexts): - state_groups = {} - for event, context in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue - - # if the event was rejected, just give it the same state as its - # predecessor. - if context.rejected: - state_groups[event.event_id] = context.prev_group - continue - - state_groups[event.event_id] = context.state_group - - self._simple_insert_many_txn( - txn, - table="event_to_state_groups", - values=[ - {"state_group": state_group_id, "event_id": event_id} - for event_id, state_group_id in iteritems(state_groups) - ], - ) - - for event_id, state_group_id in iteritems(state_groups): - txn.call_after( - self._get_state_group_for_event.prefill, (event_id,), state_group_id - ) - @defer.inlineCallbacks def _background_deduplicate_state(self, progress, batch_size): """This background update will slowly deduplicate state by reencoding @@ -1527,3 +1490,54 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME) return 1 + + +class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore): + """ Keeps track of the state at a given event. + + This is done by the concept of `state groups`. Every event is a assigned + a state group (identified by an arbitrary string), which references a + collection of state events. The current state of an event is then the + collection of state events referenced by the event's state group. + + Hence, every change in the current state causes a new state group to be + generated. However, if no change happens (e.g., if we get a message event + with only one parent it inherits the state group from its parent.) + + There are three tables: + * `state_groups`: Stores group name, first event with in the group and + room id. + * `event_to_state_groups`: Maps events to state groups. + * `state_groups_state`: Maps state group to state events. + """ + + def __init__(self, db_conn, hs): + super(StateStore, self).__init__(db_conn, hs) + + def _store_event_state_mappings_txn(self, txn, events_and_contexts): + state_groups = {} + for event, context in events_and_contexts: + if event.internal_metadata.is_outlier(): + continue + + # if the event was rejected, just give it the same state as its + # predecessor. + if context.rejected: + state_groups[event.event_id] = context.prev_group + continue + + state_groups[event.event_id] = context.state_group + + self._simple_insert_many_txn( + txn, + table="event_to_state_groups", + values=[ + {"state_group": state_group_id, "event_id": event_id} + for event_id, state_group_id in iteritems(state_groups) + ], + ) + + for event_id, state_group_id in iteritems(state_groups): + txn.call_after( + self._get_state_group_for_event.prefill, (event_id,), state_group_id + ) -- cgit 1.4.1 From e106a0e4db23fa1fedcce1c169985a8c91181c86 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 18:19:08 +0100 Subject: Move user_directory's bg updates to a dedicated store --- synapse/storage/user_directory.py | 178 ++++++++++++++++++++------------------ 1 file changed, 94 insertions(+), 84 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index b5188d9bee..1b1e4751b9 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -32,14 +32,14 @@ logger = logging.getLogger(__name__) TEMP_TABLE = "_temp_populate_user_directory" -class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): +class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore): # How many records do we calculate before sending it to # add_users_who_share_private_rooms? SHARE_PRIVATE_WORKING_SET = 500 def __init__(self, db_conn, hs): - super(UserDirectoryStore, self).__init__(db_conn, hs) + super(UserDirectoryBackgroundUpdateStore, self).__init__(db_conn, hs) self.server_name = hs.hostname @@ -452,55 +452,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): "update_profile_in_user_dir", _update_profile_in_user_dir_txn ) - def remove_from_user_dir(self, user_id): - def _remove_from_user_dir_txn(txn): - self._simple_delete_txn( - txn, table="user_directory", keyvalues={"user_id": user_id} - ) - self._simple_delete_txn( - txn, table="user_directory_search", keyvalues={"user_id": user_id} - ) - self._simple_delete_txn( - txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} - ) - self._simple_delete_txn( - txn, - table="users_who_share_private_rooms", - keyvalues={"user_id": user_id}, - ) - self._simple_delete_txn( - txn, - table="users_who_share_private_rooms", - keyvalues={"other_user_id": user_id}, - ) - txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) - - return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn) - - @defer.inlineCallbacks - def get_users_in_dir_due_to_room(self, room_id): - """Get all user_ids that are in the room directory because they're - in the given room_id - """ - user_ids_share_pub = yield self._simple_select_onecol( - table="users_in_public_rooms", - keyvalues={"room_id": room_id}, - retcol="user_id", - desc="get_users_in_dir_due_to_room", - ) - - user_ids_share_priv = yield self._simple_select_onecol( - table="users_who_share_private_rooms", - keyvalues={"room_id": room_id}, - retcol="other_user_id", - desc="get_users_in_dir_due_to_room", - ) - - user_ids = set(user_ids_share_pub) - user_ids.update(user_ids_share_priv) - - return user_ids - def add_users_who_share_private_room(self, room_id, user_id_tuples): """Insert entries into the users_who_share_private_rooms table. The first user should be a local user. @@ -551,6 +502,98 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): "add_users_in_public_rooms", _add_users_in_public_rooms_txn ) + def delete_all_from_user_dir(self): + """Delete the entire user directory + """ + + def _delete_all_from_user_dir_txn(txn): + txn.execute("DELETE FROM user_directory") + txn.execute("DELETE FROM user_directory_search") + txn.execute("DELETE FROM users_in_public_rooms") + txn.execute("DELETE FROM users_who_share_private_rooms") + txn.call_after(self.get_user_in_directory.invalidate_all) + + return self.runInteraction( + "delete_all_from_user_dir", _delete_all_from_user_dir_txn + ) + + @cached() + def get_user_in_directory(self, user_id): + return self._simple_select_one( + table="user_directory", + keyvalues={"user_id": user_id}, + retcols=("display_name", "avatar_url"), + allow_none=True, + desc="get_user_in_directory", + ) + + def update_user_directory_stream_pos(self, stream_id): + return self._simple_update_one( + table="user_directory_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="update_user_directory_stream_pos", + ) + + +class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): + + # How many records do we calculate before sending it to + # add_users_who_share_private_rooms? + SHARE_PRIVATE_WORKING_SET = 500 + + def __init__(self, db_conn, hs): + super(UserDirectoryStore, self).__init__(db_conn, hs) + + def remove_from_user_dir(self, user_id): + def _remove_from_user_dir_txn(txn): + self._simple_delete_txn( + txn, table="user_directory", keyvalues={"user_id": user_id} + ) + self._simple_delete_txn( + txn, table="user_directory_search", keyvalues={"user_id": user_id} + ) + self._simple_delete_txn( + txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} + ) + self._simple_delete_txn( + txn, + table="users_who_share_private_rooms", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_private_rooms", + keyvalues={"other_user_id": user_id}, + ) + txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) + + return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn) + + @defer.inlineCallbacks + def get_users_in_dir_due_to_room(self, room_id): + """Get all user_ids that are in the room directory because they're + in the given room_id + """ + user_ids_share_pub = yield self._simple_select_onecol( + table="users_in_public_rooms", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + + user_ids_share_priv = yield self._simple_select_onecol( + table="users_who_share_private_rooms", + keyvalues={"room_id": room_id}, + retcol="other_user_id", + desc="get_users_in_dir_due_to_room", + ) + + user_ids = set(user_ids_share_pub) + user_ids.update(user_ids_share_priv) + + return user_ids + def remove_user_who_share_room(self, user_id, room_id): """ Deletes entries in the users_who_share_*_rooms table. The first @@ -637,31 +680,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): return [room_id for room_id, in rows] - def delete_all_from_user_dir(self): - """Delete the entire user directory - """ - - def _delete_all_from_user_dir_txn(txn): - txn.execute("DELETE FROM user_directory") - txn.execute("DELETE FROM user_directory_search") - txn.execute("DELETE FROM users_in_public_rooms") - txn.execute("DELETE FROM users_who_share_private_rooms") - txn.call_after(self.get_user_in_directory.invalidate_all) - - return self.runInteraction( - "delete_all_from_user_dir", _delete_all_from_user_dir_txn - ) - - @cached() - def get_user_in_directory(self, user_id): - return self._simple_select_one( - table="user_directory", - keyvalues={"user_id": user_id}, - retcols=("display_name", "avatar_url"), - allow_none=True, - desc="get_user_in_directory", - ) - def get_user_directory_stream_pos(self): return self._simple_select_one_onecol( table="user_directory_stream_pos", @@ -670,14 +688,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): desc="get_user_directory_stream_pos", ) - def update_user_directory_stream_pos(self, stream_id): - return self._simple_update_one( - table="user_directory_stream_pos", - keyvalues={}, - updatevalues={"stream_id": stream_id}, - desc="update_user_directory_stream_pos", - ) - @defer.inlineCallbacks def search_user_dir(self, user_id, search_term, limit): """Searches for users in directory -- cgit 1.4.1 From 0496eafbf4277523430114cf965a254241b290e7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 19:00:55 +0100 Subject: Move roommember's bg updates to a dedicated store --- synapse/storage/roommember.py | 222 ++++++++++++++++++++++-------------------- 1 file changed, 114 insertions(+), 108 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 59a89fad60..4e606a8380 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -27,6 +27,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import LoggingTransaction +from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import Sqlite3Engine from synapse.storage.events_worker import EventsWorkerStore from synapse.types import get_domain_from_id @@ -820,9 +821,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): return set(room_ids) -class RoomMemberStore(RoomMemberWorkerStore): +class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore): def __init__(self, db_conn, hs): - super(RoomMemberStore, self).__init__(db_conn, hs) + super(RoomMemberBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_update_handler( _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile ) @@ -838,112 +839,6 @@ class RoomMemberStore(RoomMemberWorkerStore): where_clause="forgotten = 1", ) - def _store_room_members_txn(self, txn, events, backfilled): - """Store a room member in the database. - """ - self._simple_insert_many_txn( - txn, - table="room_memberships", - values=[ - { - "event_id": event.event_id, - "user_id": event.state_key, - "sender": event.user_id, - "room_id": event.room_id, - "membership": event.membership, - "display_name": event.content.get("displayname", None), - "avatar_url": event.content.get("avatar_url", None), - } - for event in events - ], - ) - - for event in events: - txn.call_after( - self._membership_stream_cache.entity_has_changed, - event.state_key, - event.internal_metadata.stream_ordering, - ) - txn.call_after( - self.get_invited_rooms_for_user.invalidate, (event.state_key,) - ) - - # We update the local_invites table only if the event is "current", - # i.e., its something that has just happened. If the event is an - # outlier it is only current if its an "out of band membership", - # like a remote invite or a rejection of a remote invite. - is_new_state = not backfilled and ( - not event.internal_metadata.is_outlier() - or event.internal_metadata.is_out_of_band_membership() - ) - is_mine = self.hs.is_mine_id(event.state_key) - if is_new_state and is_mine: - if event.membership == Membership.INVITE: - self._simple_insert_txn( - txn, - table="local_invites", - values={ - "event_id": event.event_id, - "invitee": event.state_key, - "inviter": event.sender, - "room_id": event.room_id, - "stream_id": event.internal_metadata.stream_ordering, - }, - ) - else: - sql = ( - "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - txn.execute( - sql, - ( - event.internal_metadata.stream_ordering, - event.event_id, - event.room_id, - event.state_key, - ), - ) - - @defer.inlineCallbacks - def locally_reject_invite(self, user_id, room_id): - sql = ( - "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - def f(txn, stream_ordering): - txn.execute(sql, (stream_ordering, True, room_id, user_id)) - - with self._stream_id_gen.get_next() as stream_ordering: - yield self.runInteraction("locally_reject_invite", f, stream_ordering) - - def forget(self, user_id, room_id): - """Indicate that user_id wishes to discard history for room_id.""" - - def f(txn): - sql = ( - "UPDATE" - " room_memberships" - " SET" - " forgotten = 1" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - ) - txn.execute(sql, (user_id, room_id)) - - self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) - self._invalidate_cache_and_stream( - txn, self.get_forgotten_rooms_for_user, (user_id,) - ) - - return self.runInteraction("forget_membership", f) - @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( @@ -1078,6 +973,117 @@ class RoomMemberStore(RoomMemberWorkerStore): return row_count +class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): + def __init__(self, db_conn, hs): + super(RoomMemberStore, self).__init__(db_conn, hs) + + def _store_room_members_txn(self, txn, events, backfilled): + """Store a room member in the database. + """ + self._simple_insert_many_txn( + txn, + table="room_memberships", + values=[ + { + "event_id": event.event_id, + "user_id": event.state_key, + "sender": event.user_id, + "room_id": event.room_id, + "membership": event.membership, + "display_name": event.content.get("displayname", None), + "avatar_url": event.content.get("avatar_url", None), + } + for event in events + ], + ) + + for event in events: + txn.call_after( + self._membership_stream_cache.entity_has_changed, + event.state_key, + event.internal_metadata.stream_ordering, + ) + txn.call_after( + self.get_invited_rooms_for_user.invalidate, (event.state_key,) + ) + + # We update the local_invites table only if the event is "current", + # i.e., its something that has just happened. If the event is an + # outlier it is only current if its an "out of band membership", + # like a remote invite or a rejection of a remote invite. + is_new_state = not backfilled and ( + not event.internal_metadata.is_outlier() + or event.internal_metadata.is_out_of_band_membership() + ) + is_mine = self.hs.is_mine_id(event.state_key) + if is_new_state and is_mine: + if event.membership == Membership.INVITE: + self._simple_insert_txn( + txn, + table="local_invites", + values={ + "event_id": event.event_id, + "invitee": event.state_key, + "inviter": event.sender, + "room_id": event.room_id, + "stream_id": event.internal_metadata.stream_ordering, + }, + ) + else: + sql = ( + "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + txn.execute( + sql, + ( + event.internal_metadata.stream_ordering, + event.event_id, + event.room_id, + event.state_key, + ), + ) + + @defer.inlineCallbacks + def locally_reject_invite(self, user_id, room_id): + sql = ( + "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + def f(txn, stream_ordering): + txn.execute(sql, (stream_ordering, True, room_id, user_id)) + + with self._stream_id_gen.get_next() as stream_ordering: + yield self.runInteraction("locally_reject_invite", f, stream_ordering) + + def forget(self, user_id, room_id): + """Indicate that user_id wishes to discard history for room_id.""" + + def f(txn): + sql = ( + "UPDATE" + " room_memberships" + " SET" + " forgotten = 1" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + ) + txn.execute(sql, (user_id, room_id)) + + self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) + self._invalidate_cache_and_stream( + txn, self.get_forgotten_rooms_for_user, (user_id,) + ) + + return self.runInteraction("forget_membership", f) + + class _JoinedHostsCache(object): """Cache for joined hosts in a room that is optimised to handle updates via state deltas. -- cgit 1.4.1 From 66ebea17235d9d3988d56cd1355656bbb508b3be Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 18:23:05 +0100 Subject: Lint --- synapse/storage/media_repository.py | 1 - synapse/storage/search.py | 1 - synapse/storage/state.py | 4 +++- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 2eb2740d07..84b5f3ad5e 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -16,7 +16,6 @@ from synapse.storage.background_updates import BackgroundUpdateStore class MediaRepositoryBackgroundUpdateStore(BackgroundUpdateStore): - def __init__(self, db_conn, hs): super(MediaRepositoryBackgroundUpdateStore, self).__init__(db_conn, hs) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 9a41e78002..6ba4190f1a 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -337,7 +337,6 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore): class SearchStore(SearchBackgroundUpdateStore): - def __init__(self, db_conn, hs): super(SearchStore, self).__init__(db_conn, hs) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 71b533c006..a941a5ae3f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1288,7 +1288,9 @@ class StateGroupWorkerStore( return self.runInteraction("store_state_group", _store_state_group_txn) -class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore, BackgroundUpdateStore): +class StateBackgroundUpdateStore( + StateGroupBackgroundUpdateStore, BackgroundUpdateStore +): STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" -- cgit 1.4.1 From 8f1b385accbf8be15c35b6f06b18eb6d998544e4 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 8 Oct 2019 14:36:33 +0100 Subject: Don't end up with 4 classes in registration --- synapse/storage/registration.py | 102 ++++++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 52 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 1f6c93b73b..524b5eeaba 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -37,57 +37,7 @@ THIRTY_MINUTES_IN_MS = 30 * 60 * 1000 logger = logging.getLogger(__name__) -class RegistrationDeactivationStore(SQLBaseStore): - @cachedInlineCallbacks() - def get_user_deactivated_status(self, user_id): - """Retrieve the value for the `deactivated` property for the provided user. - - Args: - user_id (str): The ID of the user to retrieve the status for. - - Returns: - defer.Deferred(bool): The requested value. - """ - - res = yield self._simple_select_one_onecol( - table="users", - keyvalues={"name": user_id}, - retcol="deactivated", - desc="get_user_deactivated_status", - ) - - # Convert the integer into a boolean. - return res == 1 - - @defer.inlineCallbacks - def set_user_deactivated_status(self, user_id, deactivated): - """Set the `deactivated` property for the provided user to the provided value. - - Args: - user_id (str): The ID of the user to set the status for. - deactivated (bool): The value to set for `deactivated`. - """ - - yield self.runInteraction( - "set_user_deactivated_status", - self.set_user_deactivated_status_txn, - user_id, - deactivated, - ) - - def set_user_deactivated_status_txn(self, txn, user_id, deactivated): - self._simple_update_one_txn( - txn=txn, - table="users", - keyvalues={"name": user_id}, - updatevalues={"deactivated": 1 if deactivated else 0}, - ) - self._invalidate_cache_and_stream( - txn, self.get_user_deactivated_status, (user_id,) - ) - - -class RegistrationWorkerStore(RegistrationDeactivationStore): +class RegistrationWorkerStore(SQLBaseStore): def __init__(self, db_conn, hs): super(RegistrationWorkerStore, self).__init__(db_conn, hs) @@ -723,6 +673,27 @@ class RegistrationWorkerStore(RegistrationDeactivationStore): desc="get_id_servers_user_bound", ) + @cachedInlineCallbacks() + def get_user_deactivated_status(self, user_id): + """Retrieve the value for the `deactivated` property for the provided user. + + Args: + user_id (str): The ID of the user to retrieve the status for. + + Returns: + defer.Deferred(bool): The requested value. + """ + + res = yield self._simple_select_one_onecol( + table="users", + keyvalues={"name": user_id}, + retcol="deactivated", + desc="get_user_deactivated_status", + ) + + # Convert the integer into a boolean. + return res == 1 + def get_threepid_validation_session( self, medium, client_secret, address=None, sid=None, validated=True ): @@ -817,7 +788,7 @@ class RegistrationWorkerStore(RegistrationDeactivationStore): class RegistrationBackgroundUpdateStore( - RegistrationDeactivationStore, background_updates.BackgroundUpdateStore + RegistrationWorkerStore, background_updates.BackgroundUpdateStore ): def __init__(self, db_conn, hs): super(RegistrationBackgroundUpdateStore, self).__init__(db_conn, hs) @@ -1499,3 +1470,30 @@ class RegistrationStore(RegistrationWorkerStore, RegistrationBackgroundUpdateSto cull_expired_threepid_validation_tokens_txn, self.clock.time_msec(), ) + + @defer.inlineCallbacks + def set_user_deactivated_status(self, user_id, deactivated): + """Set the `deactivated` property for the provided user to the provided value. + + Args: + user_id (str): The ID of the user to set the status for. + deactivated (bool): The value to set for `deactivated`. + """ + + yield self.runInteraction( + "set_user_deactivated_status", + self.set_user_deactivated_status_txn, + user_id, + deactivated, + ) + + def set_user_deactivated_status_txn(self, txn, user_id, deactivated): + self._simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"deactivated": 1 if deactivated else 0}, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_deactivated_status, (user_id,) + ) -- cgit 1.4.1 From b1c0a4ceb3c2c5ca51f0b32efcd58d8493fd9b99 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 8 Oct 2019 14:38:14 +0100 Subject: Cleanup client_ips --- synapse/storage/client_ips.py | 7 ------- 1 file changed, 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 1d89b50f57..067820a5da 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -35,15 +35,8 @@ LAST_SEEN_GRANULARITY = 120 * 1000 class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore): def __init__(self, db_conn, hs): - - self.client_ip_last_seen = Cache( - name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR - ) - super(ClientIpBackgroundUpdateStore, self).__init__(db_conn, hs) - self.user_ips_max_age = hs.config.user_ips_max_age - self.register_background_index_update( "user_ips_device_index", index_name="user_ips_device_id", -- cgit 1.4.1 From c69324ffb588f72786c37b864d510abd279e47a2 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 8 Oct 2019 14:48:33 +0100 Subject: Fix RegistrationStore --- synapse/storage/registration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 524b5eeaba..6c5b29288a 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -915,7 +915,7 @@ class RegistrationBackgroundUpdateStore( return 1 -class RegistrationStore(RegistrationWorkerStore, RegistrationBackgroundUpdateStore): +class RegistrationStore(RegistrationBackgroundUpdateStore): def __init__(self, db_conn, hs): super(RegistrationStore, self).__init__(db_conn, hs) -- cgit 1.4.1 From ced4784592ab6b9080ec1d6c7aa2664a42b3a38e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Oct 2019 14:31:43 +0100 Subject: Fix inserting bytes as text --- synapse/storage/events.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2e485c8644..bb6ff0595a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -23,7 +23,7 @@ from functools import wraps from six import iteritems, text_type from six.moves import range -from canonicaljson import encode_canonical_json, json +from canonicaljson import json from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -1632,9 +1632,7 @@ class EventsStore( and original_event.internal_metadata.is_redacted() ): # Redaction was allowed - pruned_json = encode_canonical_json( - prune_event_dict(original_event.get_dict()) - ) + pruned_json = encode_json(prune_event_dict(original_event.get_dict())) else: # Redaction wasn't allowed pruned_json = None -- cgit 1.4.1 From e7631d84e62e05b0dd0087b1b32b1f5f3ac521c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Oct 2019 15:09:05 +0100 Subject: Fix existing hex encoded json values in DB --- .../56/redaction_censor3_fix_update.sql.postgres | 26 ++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres b/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres new file mode 100644 index 0000000000..f7bcc5e2f2 --- /dev/null +++ b/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres @@ -0,0 +1,26 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- There was a bug where we may have updated censored redactions as bytes, +-- which can (somehow) cause json to be inserted hex encoded. This goes and +-- undoes any such hex encoded JSON. +UPDATE event_json SET json = convert_from(json::bytea, 'utf8') +WHERE event_id IN ( + SELECT event_json.event_id + FROM event_json + INNER JOIN redactions ON (event_json.event_id = redacts) + WHERE have_censored AND json NOT LIKE '{%' +); -- cgit 1.4.1