From aa7806486960f501d72917f1a90a36cdc8035a05 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Thu, 8 Jul 2021 14:27:12 +0100 Subject: Minor changes to `user_daily_visits` (#10324) * Use fake time in tests in _get_start_of_day. * Change the inequality of last_seen in user_daily_visits Co-authored-by: Erik Johnston --- synapse/storage/databases/main/metrics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index c3f551d377..e3a544d9b2 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -320,7 +320,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): """ Returns millisecond unixtime for start of UTC day. """ - now = time.gmtime() + now = time.gmtime(self._clock.time()) today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0)) return today_start * 1000 @@ -352,7 +352,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): ) udv ON u.user_id = udv.user_id AND u.device_id=udv.device_id INNER JOIN users ON users.name=u.user_id - WHERE last_seen > ? AND last_seen <= ? + WHERE ? <= last_seen AND last_seen < ? AND udv.timestamp IS NULL AND users.is_guest=0 AND users.appservice_id IS NULL GROUP BY u.user_id, u.device_id -- cgit 1.5.1 From f6767abc054f3461cd9a70ba096fcf9a8e640edb Mon Sep 17 00:00:00 2001 From: Cristina Date: Thu, 8 Jul 2021 10:57:13 -0500 Subject: Remove functionality associated with unused historical stats tables (#9721) Fixes #9602 --- changelog.d/9721.removal | 1 + docs/room_and_user_statistics.md | 50 +---- docs/sample_config.yaml | 5 - synapse/config/stats.py | 9 - synapse/handlers/stats.py | 27 --- synapse/storage/databases/main/purge_events.py | 1 - synapse/storage/databases/main/stats.py | 291 +------------------------ synapse/storage/schema/__init__.py | 6 +- tests/handlers/test_stats.py | 203 +---------------- tests/rest/admin/test_room.py | 1 - 10 files changed, 22 insertions(+), 572 deletions(-) create mode 100644 changelog.d/9721.removal (limited to 'synapse/storage') diff --git a/changelog.d/9721.removal b/changelog.d/9721.removal new file mode 100644 index 0000000000..da2ba48c84 --- /dev/null +++ b/changelog.d/9721.removal @@ -0,0 +1 @@ +Remove functionality associated with the unused `room_stats_historical` and `user_stats_historical` tables. Contributed by @xmunoz. diff --git a/docs/room_and_user_statistics.md b/docs/room_and_user_statistics.md index e1facb38d4..cc38c890bb 100644 --- a/docs/room_and_user_statistics.md +++ b/docs/room_and_user_statistics.md @@ -1,9 +1,9 @@ Room and User Statistics ======================== -Synapse maintains room and user statistics (as well as a cache of room state), -in various tables. These can be used for administrative purposes but are also -used when generating the public room directory. +Synapse maintains room and user statistics in various tables. These can be used +for administrative purposes but are also used when generating the public room +directory. # Synapse Developer Documentation @@ -15,48 +15,8 @@ used when generating the public room directory. * **subject**: Something we are tracking stats about – currently a room or user. * **current row**: An entry for a subject in the appropriate current statistics table. Each subject can have only one. -* **historical row**: An entry for a subject in the appropriate historical - statistics table. Each subject can have any number of these. ### Overview -Stats are maintained as time series. There are two kinds of column: - -* absolute columns – where the value is correct for the time given by `end_ts` - in the stats row. (Imagine a line graph for these values) - * They can also be thought of as 'gauges' in Prometheus, if you are familiar. -* per-slice columns – where the value corresponds to how many of the occurrences - occurred within the time slice given by `(end_ts − bucket_size)…end_ts` - or `start_ts…end_ts`. (Imagine a histogram for these values) - -Stats are maintained in two tables (for each type): current and historical. - -Current stats correspond to the present values. Each subject can only have one -entry. - -Historical stats correspond to values in the past. Subjects may have multiple -entries. - -## Concepts around the management of stats - -### Current rows - -Current rows contain the most up-to-date statistics for a room. -They only contain absolute columns - -### Historical rows - -Historical rows can always be considered to be valid for the time slice and -end time specified. - -* historical rows will not exist for every time slice – they will be omitted - if there were no changes. In this case, the following assumptions can be - made to interpolate/recreate missing rows: - - absolute fields have the same values as in the preceding row - - per-slice fields are zero (`0`) -* historical rows will not be retained forever – rows older than a configurable - time will be purged. - -#### Purge - -The purging of historical rows is not yet implemented. +Stats correspond to the present values. Current rows contain the most up-to-date +statistics for a room. Each subject can only have one entry. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 71463168e3..cbbe7d58d9 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -2652,11 +2652,6 @@ stats: # #enabled: false - # The size of each timeslice in the room_stats_historical and - # user_stats_historical tables, as a time period. Defaults to "1d". - # - #bucket_size: 1h - # Server Notices room configuration # diff --git a/synapse/config/stats.py b/synapse/config/stats.py index 78f61fe9da..6f253e00c0 100644 --- a/synapse/config/stats.py +++ b/synapse/config/stats.py @@ -38,13 +38,9 @@ class StatsConfig(Config): def read_config(self, config, **kwargs): self.stats_enabled = True - self.stats_bucket_size = 86400 * 1000 stats_config = config.get("stats", None) if stats_config: self.stats_enabled = stats_config.get("enabled", self.stats_enabled) - self.stats_bucket_size = self.parse_duration( - stats_config.get("bucket_size", "1d") - ) if not self.stats_enabled: logger.warning(ROOM_STATS_DISABLED_WARN) @@ -59,9 +55,4 @@ class StatsConfig(Config): # correctly. # #enabled: false - - # The size of each timeslice in the room_stats_historical and - # user_stats_historical tables, as a time period. Defaults to "1d". - # - #bucket_size: 1h """ diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 4e45d1da57..814d08efcb 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -45,7 +45,6 @@ class StatsHandler: self.clock = hs.get_clock() self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id - self.stats_bucket_size = hs.config.stats_bucket_size self.stats_enabled = hs.config.stats_enabled @@ -106,20 +105,6 @@ class StatsHandler: room_deltas = {} user_deltas = {} - # Then count deltas for total_events and total_event_bytes. - ( - room_count, - user_count, - ) = await self.store.get_changes_room_total_events_and_bytes( - self.pos, max_pos - ) - - for room_id, fields in room_count.items(): - room_deltas.setdefault(room_id, Counter()).update(fields) - - for user_id, fields in user_count.items(): - user_deltas.setdefault(user_id, Counter()).update(fields) - logger.debug("room_deltas: %s", room_deltas) logger.debug("user_deltas: %s", user_deltas) @@ -181,12 +166,10 @@ class StatsHandler: event_content = {} # type: JsonDict - sender = None if event_id is not None: event = await self.store.get_event(event_id, allow_none=True) if event: event_content = event.content or {} - sender = event.sender # All the values in this dict are deltas (RELATIVE changes) room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter()) @@ -244,12 +227,6 @@ class StatsHandler: room_stats_delta["joined_members"] += 1 elif membership == Membership.INVITE: room_stats_delta["invited_members"] += 1 - - if sender and self.is_mine_id(sender): - user_to_stats_deltas.setdefault(sender, Counter())[ - "invites_sent" - ] += 1 - elif membership == Membership.LEAVE: room_stats_delta["left_members"] += 1 elif membership == Membership.BAN: @@ -279,10 +256,6 @@ class StatsHandler: room_state["is_federatable"] = ( event_content.get("m.federate", True) is True ) - if sender and self.is_mine_id(sender): - user_to_stats_deltas.setdefault(sender, Counter())[ - "rooms_created" - ] += 1 elif typ == EventTypes.JoinRules: room_state["join_rules"] = event_content.get("join_rule") elif typ == EventTypes.RoomHistoryVisibility: diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 7fb7780d0f..ec6b1eb5d4 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -392,7 +392,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): "room_memberships", "room_stats_state", "room_stats_current", - "room_stats_historical", "room_stats_earliest_token", "rooms", "stream_ordering_to_exterm", diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index 82a1833509..b10bee6daf 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -26,7 +26,6 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import StoreError from synapse.storage.database import DatabasePool from synapse.storage.databases.main.state_deltas import StateDeltasStore -from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict from synapse.util.caches.descriptors import cached @@ -49,14 +48,6 @@ ABSOLUTE_STATS_FIELDS = { "user": ("joined_rooms",), } -# these fields are per-timeslice and so should be reset to 0 upon a new slice -# You can draw these stats on a histogram. -# Example: number of events sent locally during a time slice -PER_SLICE_FIELDS = { - "room": ("total_events", "total_event_bytes"), - "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"), -} - TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} # these are the tables (& ID columns) which contain our actual subjects @@ -106,7 +97,6 @@ class StatsStore(StateDeltasStore): self.server_name = hs.hostname self.clock = self.hs.get_clock() self.stats_enabled = hs.config.stats_enabled - self.stats_bucket_size = hs.config.stats_bucket_size self.stats_delta_processing_lock = DeferredLock() @@ -122,22 +112,6 @@ class StatsStore(StateDeltasStore): self.db_pool.updates.register_noop_background_update("populate_stats_cleanup") self.db_pool.updates.register_noop_background_update("populate_stats_prepare") - def quantise_stats_time(self, ts): - """ - Quantises a timestamp to be a multiple of the bucket size. - - Args: - ts (int): the timestamp to quantise, in milliseconds since the Unix - Epoch - - Returns: - int: a timestamp which - - is divisible by the bucket size; - - is no later than `ts`; and - - is the largest such timestamp. - """ - return (ts // self.stats_bucket_size) * self.stats_bucket_size - async def _populate_stats_process_users(self, progress, batch_size): """ This is a background update which regenerates statistics for users. @@ -288,56 +262,6 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) - async def get_statistics_for_subject( - self, stats_type: str, stats_id: str, start: str, size: int = 100 - ) -> List[dict]: - """ - Get statistics for a given subject. - - Args: - stats_type: The type of subject - stats_id: The ID of the subject (e.g. room_id or user_id) - start: Pagination start. Number of entries, not timestamp. - size: How many entries to return. - - Returns: - A list of dicts, where the dict has the keys of - ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts". - """ - return await self.db_pool.runInteraction( - "get_statistics_for_subject", - self._get_statistics_for_subject_txn, - stats_type, - stats_id, - start, - size, - ) - - def _get_statistics_for_subject_txn( - self, txn, stats_type, stats_id, start, size=100 - ): - """ - Transaction-bound version of L{get_statistics_for_subject}. - """ - - table, id_col = TYPE_TO_TABLE[stats_type] - selected_columns = list( - ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type] - ) - - slice_list = self.db_pool.simple_select_list_paginate_txn( - txn, - table + "_historical", - "end_ts", - start, - size, - retcols=selected_columns + ["bucket_size", "end_ts"], - keyvalues={id_col: stats_id}, - order_direction="DESC", - ) - - return slice_list - @cached() async def get_earliest_token_for_stats( self, stats_type: str, id: str @@ -451,14 +375,10 @@ class StatsStore(StateDeltasStore): table, id_col = TYPE_TO_TABLE[stats_type] - quantised_ts = self.quantise_stats_time(int(ts)) - end_ts = quantised_ts + self.stats_bucket_size - # Lets be paranoid and check that all the given field names are known abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] - slice_field_names = PER_SLICE_FIELDS[stats_type] for field in chain(fields.keys(), absolute_field_overrides.keys()): - if field not in abs_field_names and field not in slice_field_names: + if field not in abs_field_names: # guard against potential SQL injection dodginess raise ValueError( "%s is not a recognised field" @@ -491,20 +411,6 @@ class StatsStore(StateDeltasStore): additive_relatives=deltas_of_absolute_fields, ) - per_slice_additive_relatives = { - key: fields.get(key, 0) for key in slice_field_names - } - self._upsert_copy_from_table_with_additive_relatives_txn( - txn=txn, - into_table=table + "_historical", - keyvalues={id_col: stats_id}, - extra_dst_insvalues={"bucket_size": self.stats_bucket_size}, - extra_dst_keyvalues={"end_ts": end_ts}, - additive_relatives=per_slice_additive_relatives, - src_table=table + "_current", - copy_columns=abs_field_names, - ) - def _upsert_with_additive_relatives_txn( self, txn, table, keyvalues, absolutes, additive_relatives ): @@ -572,201 +478,6 @@ class StatsStore(StateDeltasStore): current_row.update(absolutes) self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row) - def _upsert_copy_from_table_with_additive_relatives_txn( - self, - txn, - into_table, - keyvalues, - extra_dst_keyvalues, - extra_dst_insvalues, - additive_relatives, - src_table, - copy_columns, - ): - """Updates the historic stats table with latest updates. - - This involves copying "absolute" fields from the `_current` table, and - adding relative fields to any existing values. - - Args: - txn: Transaction - into_table (str): The destination table to UPSERT the row into - keyvalues (dict[str, any]): Row-identifying key values - extra_dst_keyvalues (dict[str, any]): Additional keyvalues - for `into_table`. - extra_dst_insvalues (dict[str, any]): Additional values to insert - on new row creation for `into_table`. - additive_relatives (dict[str, any]): Fields that will be added onto - if existing row present. (Must be disjoint from copy_columns.) - src_table (str): The source table to copy from - copy_columns (iterable[str]): The list of columns to copy - """ - if self.database_engine.can_native_upsert: - ins_columns = chain( - keyvalues, - copy_columns, - additive_relatives, - extra_dst_keyvalues, - extra_dst_insvalues, - ) - sel_exprs = chain( - keyvalues, - copy_columns, - ( - "?" - for _ in chain( - additive_relatives, extra_dst_keyvalues, extra_dst_insvalues - ) - ), - ) - keyvalues_where = ("%s = ?" % f for f in keyvalues) - - sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) - sets_ar = ( - "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) - for f in additive_relatives - ) - - sql = """ - INSERT INTO %(into_table)s (%(ins_columns)s) - SELECT %(sel_exprs)s - FROM %(src_table)s - WHERE %(keyvalues_where)s - ON CONFLICT (%(keyvalues)s) - DO UPDATE SET %(sets)s - """ % { - "into_table": into_table, - "ins_columns": ", ".join(ins_columns), - "sel_exprs": ", ".join(sel_exprs), - "keyvalues_where": " AND ".join(keyvalues_where), - "src_table": src_table, - "keyvalues": ", ".join( - chain(keyvalues.keys(), extra_dst_keyvalues.keys()) - ), - "sets": ", ".join(chain(sets_cc, sets_ar)), - } - - qargs = list( - chain( - additive_relatives.values(), - extra_dst_keyvalues.values(), - extra_dst_insvalues.values(), - keyvalues.values(), - ) - ) - txn.execute(sql, qargs) - else: - self.database_engine.lock_table(txn, into_table) - src_row = self.db_pool.simple_select_one_txn( - txn, src_table, keyvalues, copy_columns - ) - all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues} - dest_current_row = self.db_pool.simple_select_one_txn( - txn, - into_table, - keyvalues=all_dest_keyvalues, - retcols=list(chain(additive_relatives.keys(), copy_columns)), - allow_none=True, - ) - - if dest_current_row is None: - merged_dict = { - **keyvalues, - **extra_dst_keyvalues, - **extra_dst_insvalues, - **src_row, - **additive_relatives, - } - self.db_pool.simple_insert_txn(txn, into_table, merged_dict) - else: - for (key, val) in additive_relatives.items(): - src_row[key] = dest_current_row[key] + val - self.db_pool.simple_update_txn( - txn, into_table, all_dest_keyvalues, src_row - ) - - async def get_changes_room_total_events_and_bytes( - self, min_pos: int, max_pos: int - ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]: - """Fetches the counts of events in the given range of stream IDs. - - Args: - min_pos - max_pos - - Returns: - Mapping of room ID to field changes. - """ - - return await self.db_pool.runInteraction( - "stats_incremental_total_events_and_bytes", - self.get_changes_room_total_events_and_bytes_txn, - min_pos, - max_pos, - ) - - def get_changes_room_total_events_and_bytes_txn( - self, txn, low_pos: int, high_pos: int - ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]: - """Gets the total_events and total_event_bytes counts for rooms and - senders, in a range of stream_orderings (including backfilled events). - - Args: - txn - low_pos: Low stream ordering - high_pos: High stream ordering - - Returns: - The room and user deltas for total_events/total_event_bytes in the - format of `stats_id` -> fields - """ - - if low_pos >= high_pos: - # nothing to do here. - return {}, {} - - if isinstance(self.database_engine, PostgresEngine): - new_bytes_expression = "OCTET_LENGTH(json)" - else: - new_bytes_expression = "LENGTH(CAST(json AS BLOB))" - - sql = """ - SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes - FROM events INNER JOIN event_json USING (event_id) - WHERE (? < stream_ordering AND stream_ordering <= ?) - OR (? <= stream_ordering AND stream_ordering <= ?) - GROUP BY events.room_id - """ % ( - new_bytes_expression, - ) - - txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos)) - - room_deltas = { - room_id: {"total_events": new_events, "total_event_bytes": new_bytes} - for room_id, new_events, new_bytes in txn - } - - sql = """ - SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes - FROM events INNER JOIN event_json USING (event_id) - WHERE (? < stream_ordering AND stream_ordering <= ?) - OR (? <= stream_ordering AND stream_ordering <= ?) - GROUP BY events.sender - """ % ( - new_bytes_expression, - ) - - txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos)) - - user_deltas = { - user_id: {"total_events": new_events, "total_event_bytes": new_bytes} - for user_id, new_events, new_bytes in txn - if self.hs.is_mine_id(user_id) - } - - return room_deltas, user_deltas - async def _calculate_and_set_initial_state_for_room( self, room_id: str ) -> Tuple[dict, dict, int]: diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 0a53b73ccc..36340a652a 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 60 +SCHEMA_VERSION = 61 """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -21,6 +21,10 @@ older versions of Synapse). See `README.md `_ for more information on how this works. + +Changes in SCHEMA_VERSION = 61: + - The `user_stats_historical` and `room_stats_historical` tables are not written and + are not read (previously, they were written but not read). """ diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py index c9d4fd9336..e4059acda3 100644 --- a/tests/handlers/test_stats.py +++ b/tests/handlers/test_stats.py @@ -88,16 +88,12 @@ class StatsRoomTests(unittest.HomeserverTestCase): def _get_current_stats(self, stats_type, stat_id): table, id_col = stats.TYPE_TO_TABLE[stats_type] - cols = list(stats.ABSOLUTE_STATS_FIELDS[stats_type]) + list( - stats.PER_SLICE_FIELDS[stats_type] - ) - - end_ts = self.store.quantise_stats_time(self.reactor.seconds() * 1000) + cols = list(stats.ABSOLUTE_STATS_FIELDS[stats_type]) return self.get_success( self.store.db_pool.simple_select_one( - table + "_historical", - {id_col: stat_id, end_ts: end_ts}, + table + "_current", + {id_col: stat_id}, cols, allow_none=True, ) @@ -156,115 +152,6 @@ class StatsRoomTests(unittest.HomeserverTestCase): self.assertEqual(len(r), 1) self.assertEqual(r[0]["topic"], "foo") - def test_initial_earliest_token(self): - """ - Ingestion via notify_new_event will ignore tokens that the background - update have already processed. - """ - - self.reactor.advance(86401) - - self.hs.config.stats_enabled = False - self.handler.stats_enabled = False - - u1 = self.register_user("u1", "pass") - u1_token = self.login("u1", "pass") - - u2 = self.register_user("u2", "pass") - u2_token = self.login("u2", "pass") - - u3 = self.register_user("u3", "pass") - u3_token = self.login("u3", "pass") - - room_1 = self.helper.create_room_as(u1, tok=u1_token) - self.helper.send_state( - room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token - ) - - # Begin the ingestion by creating the temp tables. This will also store - # the position that the deltas should begin at, once they take over. - self.hs.config.stats_enabled = True - self.handler.stats_enabled = True - self.store.db_pool.updates._all_done = False - self.get_success( - self.store.db_pool.simple_update_one( - table="stats_incremental_position", - keyvalues={}, - updatevalues={"stream_id": 0}, - ) - ) - - self.get_success( - self.store.db_pool.simple_insert( - "background_updates", - {"update_name": "populate_stats_prepare", "progress_json": "{}"}, - ) - ) - - while not self.get_success( - self.store.db_pool.updates.has_completed_background_updates() - ): - self.get_success( - self.store.db_pool.updates.do_next_background_update(100), by=0.1 - ) - - # Now, before the table is actually ingested, add some more events. - self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token) - self.helper.join(room=room_1, user=u2, tok=u2_token) - - # orig_delta_processor = self.store. - - # Now do the initial ingestion. - self.get_success( - self.store.db_pool.simple_insert( - "background_updates", - {"update_name": "populate_stats_process_rooms", "progress_json": "{}"}, - ) - ) - self.get_success( - self.store.db_pool.simple_insert( - "background_updates", - { - "update_name": "populate_stats_cleanup", - "progress_json": "{}", - "depends_on": "populate_stats_process_rooms", - }, - ) - ) - - self.store.db_pool.updates._all_done = False - while not self.get_success( - self.store.db_pool.updates.has_completed_background_updates() - ): - self.get_success( - self.store.db_pool.updates.do_next_background_update(100), by=0.1 - ) - - self.reactor.advance(86401) - - # Now add some more events, triggering ingestion. Because of the stream - # position being set to before the events sent in the middle, a simpler - # implementation would reprocess those events, and say there were four - # users, not three. - self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token) - self.helper.join(room=room_1, user=u3, tok=u3_token) - - # self.handler.notify_new_event() - - # We need to let the delta processor advance… - self.reactor.advance(10 * 60) - - # Get the slices! There should be two -- day 1, and day 2. - r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0)) - - self.assertEqual(len(r), 2) - - # The oldest has 2 joined members - self.assertEqual(r[-1]["joined_members"], 2) - - # The newest has 3 - self.assertEqual(r[0]["joined_members"], 3) - def test_create_user(self): """ When we create a user, it should have statistics already ready. @@ -296,22 +183,6 @@ class StatsRoomTests(unittest.HomeserverTestCase): self.assertIsNotNone(r1stats) self.assertIsNotNone(r2stats) - # contains the default things you'd expect in a fresh room - self.assertEqual( - r1stats["total_events"], - EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM, - "Wrong number of total_events in new room's stats!" - " You may need to update this if more state events are added to" - " the room creation process.", - ) - self.assertEqual( - r2stats["total_events"], - EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM, - "Wrong number of total_events in new room's stats!" - " You may need to update this if more state events are added to" - " the room creation process.", - ) - self.assertEqual( r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM ) @@ -327,24 +198,6 @@ class StatsRoomTests(unittest.HomeserverTestCase): self.assertEqual(r2stats["invited_members"], 0) self.assertEqual(r2stats["banned_members"], 0) - def test_send_message_increments_total_events(self): - """ - When we send a message, it increments total_events. - """ - - self._perform_background_initial_update() - - u1 = self.register_user("u1", "pass") - u1token = self.login("u1", "pass") - r1 = self.helper.create_room_as(u1, tok=u1token) - r1stats_ante = self._get_current_stats("room", r1) - - self.helper.send(r1, "hiss", tok=u1token) - - r1stats_post = self._get_current_stats("room", r1) - - self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) - def test_updating_profile_information_does_not_increase_joined_members_count(self): """ Check that the joined_members count does not increase when a user changes their @@ -378,7 +231,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): def test_send_state_event_nonoverwriting(self): """ - When we send a non-overwriting state event, it increments total_events AND current_state_events + When we send a non-overwriting state event, it increments current_state_events """ self._perform_background_initial_update() @@ -399,44 +252,14 @@ class StatsRoomTests(unittest.HomeserverTestCase): r1stats_post = self._get_current_stats("room", r1) - self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) self.assertEqual( r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], 1, ) - def test_send_state_event_overwriting(self): - """ - When we send an overwriting state event, it increments total_events ONLY - """ - - self._perform_background_initial_update() - - u1 = self.register_user("u1", "pass") - u1token = self.login("u1", "pass") - r1 = self.helper.create_room_as(u1, tok=u1token) - - self.helper.send_state( - r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby" - ) - - r1stats_ante = self._get_current_stats("room", r1) - - self.helper.send_state( - r1, "cat.hissing", {"value": False}, tok=u1token, state_key="tabby" - ) - - r1stats_post = self._get_current_stats("room", r1) - - self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) - self.assertEqual( - r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], - 0, - ) - def test_join_first_time(self): """ - When a user joins a room for the first time, total_events, current_state_events and + When a user joins a room for the first time, current_state_events and joined_members should increase by exactly 1. """ @@ -455,7 +278,6 @@ class StatsRoomTests(unittest.HomeserverTestCase): r1stats_post = self._get_current_stats("room", r1) - self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) self.assertEqual( r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], 1, @@ -466,7 +288,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): def test_join_after_leave(self): """ - When a user joins a room after being previously left, total_events and + When a user joins a room after being previously left, joined_members should increase by exactly 1. current_state_events should not increase. left_members should decrease by exactly 1. @@ -490,7 +312,6 @@ class StatsRoomTests(unittest.HomeserverTestCase): r1stats_post = self._get_current_stats("room", r1) - self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) self.assertEqual( r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], 0, @@ -504,7 +325,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): def test_invited(self): """ - When a user invites another user, current_state_events, total_events and + When a user invites another user, current_state_events and invited_members should increase by exactly 1. """ @@ -522,7 +343,6 @@ class StatsRoomTests(unittest.HomeserverTestCase): r1stats_post = self._get_current_stats("room", r1) - self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) self.assertEqual( r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], 1, @@ -533,7 +353,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): def test_join_after_invite(self): """ - When a user joins a room after being invited, total_events and + When a user joins a room after being invited and joined_members should increase by exactly 1. current_state_events should not increase. invited_members should decrease by exactly 1. @@ -556,7 +376,6 @@ class StatsRoomTests(unittest.HomeserverTestCase): r1stats_post = self._get_current_stats("room", r1) - self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) self.assertEqual( r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], 0, @@ -570,7 +389,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): def test_left(self): """ - When a user leaves a room after joining, total_events and + When a user leaves a room after joining and left_members should increase by exactly 1. current_state_events should not increase. joined_members should decrease by exactly 1. @@ -593,7 +412,6 @@ class StatsRoomTests(unittest.HomeserverTestCase): r1stats_post = self._get_current_stats("room", r1) - self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) self.assertEqual( r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], 0, @@ -607,7 +425,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): def test_banned(self): """ - When a user is banned from a room after joining, total_events and + When a user is banned from a room after joining and left_members should increase by exactly 1. current_state_events should not increase. banned_members should decrease by exactly 1. @@ -630,7 +448,6 @@ class StatsRoomTests(unittest.HomeserverTestCase): r1stats_post = self._get_current_stats("room", r1) - self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1) self.assertEqual( r1stats_post["current_state_events"] - r1stats_ante["current_state_events"], 0, diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index ee071c2477..959d3cea77 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -1753,7 +1753,6 @@ PURGE_TABLES = [ "room_memberships", "room_stats_state", "room_stats_current", - "room_stats_historical", "room_stats_earliest_token", "rooms", "stream_ordering_to_exterm", -- cgit 1.5.1 From e3e73e181b2f399f3acc9fd3138d1857f0492fa9 Mon Sep 17 00:00:00 2001 From: Andreas Rammhold Date: Fri, 9 Jul 2021 12:03:02 +0200 Subject: Upsert redactions in case they already exists (#10343) * Upsert redactions in case they already exists Occasionally, in combination with retention, redactions aren't deleted from the database whenever they are due for deletion. The server will eventually try to backfill the deleted events and trip over the already existing redaction events. Switching to an UPSERT for those events allows us to recover from there situations. The retention code still needs fixing but that is outside of my current comfort zone on this code base. This is related to #8707 where the error was discussed already. Signed-off-by: Andreas Rammhold * Also purge redactions when purging events Previously redacints where left behind leading to backfilling issues when the server stumbled across the already existing yet to be backfilled redactions. This issues has been discussed in #8707. Signed-off-by: Andreas Rammhold --- changelog.d/10343.bugfix | 1 + synapse/storage/databases/main/events.py | 4 ++-- synapse/storage/databases/main/purge_events.py | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) create mode 100644 changelog.d/10343.bugfix (limited to 'synapse/storage') diff --git a/changelog.d/10343.bugfix b/changelog.d/10343.bugfix new file mode 100644 index 0000000000..53ccf79a81 --- /dev/null +++ b/changelog.d/10343.bugfix @@ -0,0 +1 @@ +Fix errors during backfill caused by previously purged redaction events. Contributed by Andreas Rammhold (@andir). diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 897fa06639..08c580b0dc 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1580,11 +1580,11 @@ class PersistEventsStore: # invalidate the cache for the redacted event txn.call_after(self.store._invalidate_get_event_cache, event.redacts) - self.db_pool.simple_insert_txn( + self.db_pool.simple_upsert_txn( txn, table="redactions", + keyvalues={"event_id": event.event_id}, values={ - "event_id": event.event_id, "redacts": event.redacts, "received_ts": self._clock.time_msec(), }, diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index ec6b1eb5d4..eb4841830d 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -215,6 +215,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): "event_relations", "event_search", "rejections", + "redactions", ): logger.info("[purge] removing events from %s", table) -- cgit 1.5.1 From 751372fa61e28f06715d086fe5cc58d174ca1a17 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 9 Jul 2021 13:01:11 +0100 Subject: Switch `application_services_txns.txn_id` to BIGINT (#10349) --- changelog.d/10349.misc | 1 + .../61/01change_appservices_txns.sql.postgres | 23 ++++++++++++++++++++++ 2 files changed, 24 insertions(+) create mode 100644 changelog.d/10349.misc create mode 100644 synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres (limited to 'synapse/storage') diff --git a/changelog.d/10349.misc b/changelog.d/10349.misc new file mode 100644 index 0000000000..5b014e7416 --- /dev/null +++ b/changelog.d/10349.misc @@ -0,0 +1 @@ +Switch `application_services_txns.txn_id` database column to `BIGINT`. diff --git a/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres b/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres new file mode 100644 index 0000000000..c8aec78e60 --- /dev/null +++ b/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres @@ -0,0 +1,23 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- we use bigint elsewhere in the database for appservice txn ids (notably, +-- application_services_state.last_txn), and generally we use bigints everywhere else +-- we have monotonic counters, so let's bring this one in line. +-- +-- assuming there aren't thousands of rows for decommisioned/non-functional ASes, this +-- table should be pretty small, so safe to do a synchronous ALTER TABLE. + +ALTER TABLE application_services_txns ALTER COLUMN txn_id SET DATA TYPE BIGINT; -- cgit 1.5.1 From ca9dface8c63ee164979fbed68693a2511c455f7 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Fri, 9 Jul 2021 14:12:47 +0100 Subject: Fix the user directory becoming broken (and noisy errors being logged) when knocking and room statistics are in use. (#10344) Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/10344.bugfix | 1 + synapse/storage/databases/main/stats.py | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 changelog.d/10344.bugfix (limited to 'synapse/storage') diff --git a/changelog.d/10344.bugfix b/changelog.d/10344.bugfix new file mode 100644 index 0000000000..ab6eb4999f --- /dev/null +++ b/changelog.d/10344.bugfix @@ -0,0 +1 @@ +Fix the user directory becoming broken (and noisy errors being logged) when knocking and room statistics are in use. diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index b10bee6daf..59d67c255b 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -434,7 +434,7 @@ class StatsStore(StateDeltasStore): ] relative_updates = [ - "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" + "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)" % {"table": table, "field": field} for field in additive_relatives.keys() ] @@ -474,7 +474,10 @@ class StatsStore(StateDeltasStore): self.db_pool.simple_insert_txn(txn, table, merged_dict) else: for (key, val) in additive_relatives.items(): - current_row[key] += val + if current_row[key] is None: + current_row[key] = val + else: + current_row[key] += val current_row.update(absolutes) self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row) @@ -604,6 +607,7 @@ class StatsStore(StateDeltasStore): "invited_members": membership_counts.get(Membership.INVITE, 0), "left_members": membership_counts.get(Membership.LEAVE, 0), "banned_members": membership_counts.get(Membership.BAN, 0), + "knocked_members": membership_counts.get(Membership.KNOCK, 0), "local_users_in_room": len(local_users_in_room), }, ) -- cgit 1.5.1 From c2c364f27f61bece85dc7fd17cdedc4b60b9f7af Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 12 Jul 2021 17:22:54 +0100 Subject: Replace `room_depth.min_depth` with a BIGINT (#10289) while I'm dealing with INTEGERs and BIGINTs, let's replace room_depth.min_depth with a BIGINT. --- changelog.d/10289.misc | 1 + synapse/storage/databases/main/room.py | 104 +++++++++++++++++++-- .../delta/61/02drop_redundant_room_depth_index.sql | 18 ++++ .../schema/main/delta/61/03recreate_min_depth.py | 70 ++++++++++++++ 4 files changed, 186 insertions(+), 7 deletions(-) create mode 100644 changelog.d/10289.misc create mode 100644 synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql create mode 100644 synapse/storage/schema/main/delta/61/03recreate_min_depth.py (limited to 'synapse/storage') diff --git a/changelog.d/10289.misc b/changelog.d/10289.misc new file mode 100644 index 0000000000..2df30e7a7a --- /dev/null +++ b/changelog.d/10289.misc @@ -0,0 +1 @@ +Convert `room_depth.min_depth` column to a `BIGINT`. diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 9f0d64a325..6ddafe5434 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -25,6 +25,7 @@ from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.search import SearchStore +from synapse.storage.types import Cursor from synapse.types import JsonDict, ThirdPartyInstanceID from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -1022,10 +1023,22 @@ class RoomWorkerStore(SQLBaseStore): ) -class RoomBackgroundUpdateStore(SQLBaseStore): +class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column" + POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2" + REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth" + + +_REPLACE_ROOM_DEPTH_SQL_COMMANDS = ( + "DROP TRIGGER populate_min_depth2_trigger ON room_depth", + "DROP FUNCTION populate_min_depth2()", + "ALTER TABLE room_depth DROP COLUMN min_depth", + "ALTER TABLE room_depth RENAME COLUMN min_depth2 TO min_depth", +) + +class RoomBackgroundUpdateStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) @@ -1037,15 +1050,25 @@ class RoomBackgroundUpdateStore(SQLBaseStore): ) self.db_pool.updates.register_background_update_handler( - self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, + _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, self._remove_tombstoned_rooms_from_directory, ) self.db_pool.updates.register_background_update_handler( - self.ADD_ROOMS_ROOM_VERSION_COLUMN, + _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN, self._background_add_rooms_room_version_column, ) + # BG updates to change the type of room_depth.min_depth + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2, + self._background_populate_room_depth_min_depth2, + ) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH, + self._background_replace_room_depth_min_depth, + ) + async def _background_insert_retention(self, progress, batch_size): """Retrieves a list of all rooms within a range and inserts an entry for each of them into the room_retention table. @@ -1164,7 +1187,9 @@ class RoomBackgroundUpdateStore(SQLBaseStore): new_last_room_id = room_id self.db_pool.updates._background_update_progress_txn( - txn, self.ADD_ROOMS_ROOM_VERSION_COLUMN, {"room_id": new_last_room_id} + txn, + _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN, + {"room_id": new_last_room_id}, ) return False @@ -1176,7 +1201,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): if end: await self.db_pool.updates._end_background_update( - self.ADD_ROOMS_ROOM_VERSION_COLUMN + _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN ) return batch_size @@ -1215,7 +1240,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): if not rooms: await self.db_pool.updates._end_background_update( - self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE + _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE ) return 0 @@ -1224,7 +1249,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): await self.set_room_is_public(room_id, False) await self.db_pool.updates._background_update_progress( - self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]} + _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]} ) return len(rooms) @@ -1268,6 +1293,71 @@ class RoomBackgroundUpdateStore(SQLBaseStore): return max_ordering is None + async def _background_populate_room_depth_min_depth2( + self, progress: JsonDict, batch_size: int + ) -> int: + """Populate room_depth.min_depth2 + + This is to deal with the fact that min_depth was initially created as a + 32-bit integer field. + """ + + def process(txn: Cursor) -> int: + last_room = progress.get("last_room", "") + txn.execute( + """ + UPDATE room_depth SET min_depth2=min_depth + WHERE room_id IN ( + SELECT room_id FROM room_depth WHERE room_id > ? + ORDER BY room_id LIMIT ? + ) + RETURNING room_id; + """, + (last_room, batch_size), + ) + row_count = txn.rowcount + if row_count == 0: + return 0 + last_room = max(row[0] for row in txn) + logger.info("populated room_depth up to %s", last_room) + + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2, + {"last_room": last_room}, + ) + return row_count + + result = await self.db_pool.runInteraction( + "_background_populate_min_depth2", process + ) + + if result != 0: + return result + + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2 + ) + return 0 + + async def _background_replace_room_depth_min_depth( + self, progress: JsonDict, batch_size: int + ) -> int: + """Drop the old 'min_depth' column and rename 'min_depth2' into its place.""" + + def process(txn: Cursor) -> None: + for sql in _REPLACE_ROOM_DEPTH_SQL_COMMANDS: + logger.info("completing room_depth migration: %s", sql) + txn.execute(sql) + + await self.db_pool.runInteraction("_background_replace_room_depth", process) + + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH, + ) + + return 0 + class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): def __init__(self, database: DatabasePool, db_conn, hs): diff --git a/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql b/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql new file mode 100644 index 0000000000..35ca7a40c0 --- /dev/null +++ b/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- this index is redundant; there is another UNIQUE index on this table. +DROP INDEX IF EXISTS room_depth_room; + diff --git a/synapse/storage/schema/main/delta/61/03recreate_min_depth.py b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py new file mode 100644 index 0000000000..f8d7db9f2e --- /dev/null +++ b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py @@ -0,0 +1,70 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This migration handles the process of changing the type of `room_depth.min_depth` to +a BIGINT. +""" +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine +from synapse.storage.types import Cursor + + +def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): + if not isinstance(database_engine, PostgresEngine): + # this only applies to postgres - sqlite does not distinguish between big and + # little ints. + return + + # First add a new column to contain the bigger min_depth + cur.execute("ALTER TABLE room_depth ADD COLUMN min_depth2 BIGINT") + + # Create a trigger which will keep it populated. + cur.execute( + """ + CREATE OR REPLACE FUNCTION populate_min_depth2() RETURNS trigger AS $BODY$ + BEGIN + new.min_depth2 := new.min_depth; + RETURN NEW; + END; + $BODY$ LANGUAGE plpgsql + """ + ) + + cur.execute( + """ + CREATE TRIGGER populate_min_depth2_trigger BEFORE INSERT OR UPDATE ON room_depth + FOR EACH ROW + EXECUTE PROCEDURE populate_min_depth2() + """ + ) + + # Start a bg process to populate it for old rooms + cur.execute( + """ + INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6103, 'populate_room_depth_min_depth2', '{}') + """ + ) + + # and another to switch them over once it completes. + cur.execute( + """ + INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES + (6103, 'replace_room_depth_min_depth', '{}', 'populate_room_depth2') + """ + ) + + +def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): + pass -- cgit 1.5.1 From 879d8c1ee1703a0f612b7f442409d2fcded587d6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Jul 2021 11:33:15 +0100 Subject: Fix federation inbound age metric. (#10355) We should be reporting the age rather than absolute timestamp. --- changelog.d/10355.bugfix | 1 + synapse/storage/databases/main/event_federation.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 changelog.d/10355.bugfix (limited to 'synapse/storage') diff --git a/changelog.d/10355.bugfix b/changelog.d/10355.bugfix new file mode 100644 index 0000000000..92df612011 --- /dev/null +++ b/changelog.d/10355.bugfix @@ -0,0 +1 @@ +Fix newly added `synapse_federation_server_oldest_inbound_pdu_in_staging` prometheus metric to measure age rather than timestamp. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index c4474df975..4e06938849 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1230,7 +1230,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging" ) - (age,) = txn.fetchone() + (received_ts,) = txn.fetchone() + + age = self._clock.time_msec() - received_ts return count, age -- cgit 1.5.1 From 2d16e69b4bf09b5274a8fa15c8ca4719db8366c1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 13 Jul 2021 08:59:27 -0400 Subject: Show all joinable rooms in the spaces summary. (#10298) Previously only world-readable rooms were shown. This means that rooms which are public, knockable, or invite-only with a pending invitation, are included in a space summary. It also applies the same logic to the experimental room version from MSC3083 -- if a user has access to the proper allowed rooms then it is shown in the spaces summary. This change is made per MSC3173 allowing stripped state of a room to be shown to any potential room joiner. --- changelog.d/10298.feature | 1 + changelog.d/10305.feature | 1 + changelog.d/10305.misc | 1 - synapse/handlers/space_summary.py | 68 +++++++--- synapse/storage/databases/main/roommember.py | 13 +- tests/handlers/test_space_summary.py | 191 ++++++++++++++++++++++++--- 6 files changed, 237 insertions(+), 38 deletions(-) create mode 100644 changelog.d/10298.feature create mode 100644 changelog.d/10305.feature delete mode 100644 changelog.d/10305.misc (limited to 'synapse/storage') diff --git a/changelog.d/10298.feature b/changelog.d/10298.feature new file mode 100644 index 0000000000..7059db5075 --- /dev/null +++ b/changelog.d/10298.feature @@ -0,0 +1 @@ +The spaces summary API now returns any joinable rooms, not only rooms which are world-readable. diff --git a/changelog.d/10305.feature b/changelog.d/10305.feature new file mode 100644 index 0000000000..7059db5075 --- /dev/null +++ b/changelog.d/10305.feature @@ -0,0 +1 @@ +The spaces summary API now returns any joinable rooms, not only rooms which are world-readable. diff --git a/changelog.d/10305.misc b/changelog.d/10305.misc deleted file mode 100644 index 8488d47f6f..0000000000 --- a/changelog.d/10305.misc +++ /dev/null @@ -1 +0,0 @@ -Additional unit tests for the spaces summary API. diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py index b585057ec3..366e6211e5 100644 --- a/synapse/handlers/space_summary.py +++ b/synapse/handlers/space_summary.py @@ -24,6 +24,7 @@ from synapse.api.constants import ( EventContentFields, EventTypes, HistoryVisibility, + JoinRules, Membership, RoomTypes, ) @@ -150,14 +151,21 @@ class SpaceSummaryHandler: # The room should only be included in the summary if: # a. the user is in the room; # b. the room is world readable; or - # c. the user is in a space that has been granted access to - # the room. + # c. the user could join the room, e.g. the join rules + # are set to public or the user is in a space that + # has been granted access to the room. # # Note that we know the user is not in the root room (which is # why the remote call was made in the first place), but the user # could be in one of the children rooms and we just didn't know # about the link. - include_room = room.get("world_readable") is True + + # The API doesn't return the room version so assume that a + # join rule of knock is valid. + include_room = ( + room.get("join_rules") in (JoinRules.PUBLIC, JoinRules.KNOCK) + or room.get("world_readable") is True + ) # Check if the user is a member of any of the allowed spaces # from the response. @@ -420,9 +428,8 @@ class SpaceSummaryHandler: It should be included if: - * The requester is joined or invited to the room. - * The requester can join without an invite (per MSC3083). - * The origin server has any user that is joined or invited to the room. + * The requester is joined or can join the room (per MSC3173). + * The origin server has any user that is joined or can join the room. * The history visibility is set to world readable. Args: @@ -441,13 +448,39 @@ class SpaceSummaryHandler: # If there's no state for the room, it isn't known. if not state_ids: + # The user might have a pending invite for the room. + if requester and await self._store.get_invite_for_local_user_in_room( + requester, room_id + ): + return True + logger.info("room %s is unknown, omitting from summary", room_id) return False room_version = await self._store.get_room_version(room_id) - # if we have an authenticated requesting user, first check if they are able to view - # stripped state in the room. + # Include the room if it has join rules of public or knock. + join_rules_event_id = state_ids.get((EventTypes.JoinRules, "")) + if join_rules_event_id: + join_rules_event = await self._store.get_event(join_rules_event_id) + join_rule = join_rules_event.content.get("join_rule") + if join_rule == JoinRules.PUBLIC or ( + room_version.msc2403_knocking and join_rule == JoinRules.KNOCK + ): + return True + + # Include the room if it is peekable. + hist_vis_event_id = state_ids.get((EventTypes.RoomHistoryVisibility, "")) + if hist_vis_event_id: + hist_vis_ev = await self._store.get_event(hist_vis_event_id) + hist_vis = hist_vis_ev.content.get("history_visibility") + if hist_vis == HistoryVisibility.WORLD_READABLE: + return True + + # Otherwise we need to check information specific to the user or server. + + # If we have an authenticated requesting user, check if they are a member + # of the room (or can join the room). if requester: member_event_id = state_ids.get((EventTypes.Member, requester), None) @@ -470,9 +503,11 @@ class SpaceSummaryHandler: return True # If this is a request over federation, check if the host is in the room or - # is in one of the spaces specified via the join rules. + # has a user who could join the room. elif origin: - if await self._event_auth_handler.check_host_in_room(room_id, origin): + if await self._event_auth_handler.check_host_in_room( + room_id, origin + ) or await self._store.is_host_invited(room_id, origin): return True # Alternately, if the host has a user in any of the spaces specified @@ -490,18 +525,10 @@ class SpaceSummaryHandler: ): return True - # otherwise, check if the room is peekable - hist_vis_event_id = state_ids.get((EventTypes.RoomHistoryVisibility, ""), None) - if hist_vis_event_id: - hist_vis_ev = await self._store.get_event(hist_vis_event_id) - hist_vis = hist_vis_ev.content.get("history_visibility") - if hist_vis == HistoryVisibility.WORLD_READABLE: - return True - logger.info( - "room %s is unpeekable and user %s is not a member / not allowed to join, omitting from summary", + "room %s is unpeekable and requester %s is not a member / not allowed to join, omitting from summary", room_id, - requester, + requester or origin, ) return False @@ -535,6 +562,7 @@ class SpaceSummaryHandler: "canonical_alias": stats["canonical_alias"], "num_joined_members": stats["joined_members"], "avatar_url": stats["avatar"], + "join_rules": stats["join_rules"], "world_readable": ( stats["history_visibility"] == HistoryVisibility.WORLD_READABLE ), diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 2796354a1f..4d82c4c26d 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -703,13 +703,22 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(max_entries=10000) async def is_host_joined(self, room_id: str, host: str) -> bool: + return await self._check_host_room_membership(room_id, host, Membership.JOIN) + + @cached(max_entries=10000) + async def is_host_invited(self, room_id: str, host: str) -> bool: + return await self._check_host_room_membership(room_id, host, Membership.INVITE) + + async def _check_host_room_membership( + self, room_id: str, host: str, membership: str + ) -> bool: if "%" in host or "_" in host: raise Exception("Invalid host name") sql = """ SELECT state_key FROM current_state_events AS c INNER JOIN room_memberships AS m USING (event_id) - WHERE m.membership = 'join' + WHERE m.membership = ? AND type = 'm.room.member' AND c.room_id = ? AND state_key LIKE ? @@ -722,7 +731,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): like_clause = "%:" + host rows = await self.db_pool.execute( - "is_host_joined", None, sql, room_id, like_clause + "is_host_joined", None, sql, membership, room_id, like_clause ) if not rows: diff --git a/tests/handlers/test_space_summary.py b/tests/handlers/test_space_summary.py index faed1f1a18..3f73ad7f94 100644 --- a/tests/handlers/test_space_summary.py +++ b/tests/handlers/test_space_summary.py @@ -14,8 +14,18 @@ from typing import Any, Iterable, Optional, Tuple from unittest import mock -from synapse.api.constants import EventContentFields, JoinRules, RoomTypes +from synapse.api.constants import ( + EventContentFields, + EventTypes, + HistoryVisibility, + JoinRules, + Membership, + RestrictedJoinRuleTypes, + RoomTypes, +) from synapse.api.errors import AuthError +from synapse.api.room_versions import RoomVersions +from synapse.events import make_event_from_dict from synapse.handlers.space_summary import _child_events_comparison_key from synapse.rest import admin from synapse.rest.client.v1 import login, room @@ -117,7 +127,7 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): """Add a child room to a space.""" self.helper.send_state( space_id, - event_type="m.space.child", + event_type=EventTypes.SpaceChild, body={"via": [self.hs.hostname]}, tok=token, state_key=room_id, @@ -155,29 +165,129 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): # The user cannot see the space. self.get_failure(self.handler.get_space_summary(user2, self.space), AuthError) - # Joining the room causes it to be visible. - self.helper.join(self.space, user2, tok=token2) + # If the space is made world-readable it should return a result. + self.helper.send_state( + self.space, + event_type=EventTypes.RoomHistoryVisibility, + body={"history_visibility": HistoryVisibility.WORLD_READABLE}, + tok=self.token, + ) result = self.get_success(self.handler.get_space_summary(user2, self.space)) - - # The result should only have the space, but includes the link to the room. - self._assert_rooms(result, [self.space]) + self._assert_rooms(result, [self.space, self.room]) self._assert_events(result, [(self.space, self.room)]) - def test_world_readable(self): - """A world-readable room is visible to everyone.""" + # Make it not world-readable again and confirm it results in an error. self.helper.send_state( self.space, - event_type="m.room.history_visibility", - body={"history_visibility": "world_readable"}, + event_type=EventTypes.RoomHistoryVisibility, + body={"history_visibility": HistoryVisibility.JOINED}, tok=self.token, ) + self.get_failure(self.handler.get_space_summary(user2, self.space), AuthError) + + # Join the space and results should be returned. + self.helper.join(self.space, user2, tok=token2) + result = self.get_success(self.handler.get_space_summary(user2, self.space)) + self._assert_rooms(result, [self.space, self.room]) + self._assert_events(result, [(self.space, self.room)]) + def _create_room_with_join_rule( + self, join_rule: str, room_version: Optional[str] = None, **extra_content + ) -> str: + """Create a room with the given join rule and add it to the space.""" + room_id = self.helper.create_room_as( + self.user, + room_version=room_version, + tok=self.token, + extra_content={ + "initial_state": [ + { + "type": EventTypes.JoinRules, + "state_key": "", + "content": { + "join_rule": join_rule, + **extra_content, + }, + } + ] + }, + ) + self._add_child(self.space, room_id, self.token) + return room_id + + def test_filtering(self): + """ + Rooms should be properly filtered to only include rooms the user has access to. + """ user2 = self.register_user("user2", "pass") + token2 = self.login("user2", "pass") - # The space should be visible, as well as the link to the room. + # Create a few rooms which will have different properties. + public_room = self._create_room_with_join_rule(JoinRules.PUBLIC) + knock_room = self._create_room_with_join_rule( + JoinRules.KNOCK, room_version=RoomVersions.V7.identifier + ) + not_invited_room = self._create_room_with_join_rule(JoinRules.INVITE) + invited_room = self._create_room_with_join_rule(JoinRules.INVITE) + self.helper.invite(invited_room, targ=user2, tok=self.token) + restricted_room = self._create_room_with_join_rule( + JoinRules.MSC3083_RESTRICTED, + room_version=RoomVersions.MSC3083.identifier, + allow=[], + ) + restricted_accessible_room = self._create_room_with_join_rule( + JoinRules.MSC3083_RESTRICTED, + room_version=RoomVersions.MSC3083.identifier, + allow=[ + { + "type": RestrictedJoinRuleTypes.ROOM_MEMBERSHIP, + "room_id": self.space, + "via": [self.hs.hostname], + } + ], + ) + world_readable_room = self._create_room_with_join_rule(JoinRules.INVITE) + self.helper.send_state( + world_readable_room, + event_type=EventTypes.RoomHistoryVisibility, + body={"history_visibility": HistoryVisibility.WORLD_READABLE}, + tok=self.token, + ) + joined_room = self._create_room_with_join_rule(JoinRules.INVITE) + self.helper.invite(joined_room, targ=user2, tok=self.token) + self.helper.join(joined_room, user2, tok=token2) + + # Join the space. + self.helper.join(self.space, user2, tok=token2) result = self.get_success(self.handler.get_space_summary(user2, self.space)) - self._assert_rooms(result, [self.space]) - self._assert_events(result, [(self.space, self.room)]) + + self._assert_rooms( + result, + [ + self.space, + self.room, + public_room, + knock_room, + invited_room, + restricted_accessible_room, + world_readable_room, + joined_room, + ], + ) + self._assert_events( + result, + [ + (self.space, self.room), + (self.space, public_room), + (self.space, knock_room), + (self.space, not_invited_room), + (self.space, invited_room), + (self.space, restricted_room), + (self.space, restricted_accessible_room), + (self.space, world_readable_room), + (self.space, joined_room), + ], + ) def test_complex_space(self): """ @@ -186,7 +296,7 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): # Create an inaccessible room. user2 = self.register_user("user2", "pass") token2 = self.login("user2", "pass") - room2 = self.helper.create_room_as(user2, tok=token2) + room2 = self.helper.create_room_as(user2, is_public=False, tok=token2) # This is a bit odd as "user" is adding a room they don't know about, but # it works for the tests. self._add_child(self.space, room2, self.token) @@ -292,16 +402,60 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): subspace = "#subspace:" + fed_hostname # Create a few rooms which will have different properties. + public_room = "#public:" + fed_hostname + knock_room = "#knock:" + fed_hostname + not_invited_room = "#not_invited:" + fed_hostname + invited_room = "#invited:" + fed_hostname restricted_room = "#restricted:" + fed_hostname restricted_accessible_room = "#restricted_accessible:" + fed_hostname world_readable_room = "#world_readable:" + fed_hostname joined_room = self.helper.create_room_as(self.user, tok=self.token) + # Poke an invite over federation into the database. + fed_handler = self.hs.get_federation_handler() + event = make_event_from_dict( + { + "room_id": invited_room, + "event_id": "!abcd:" + fed_hostname, + "type": EventTypes.Member, + "sender": "@remote:" + fed_hostname, + "state_key": self.user, + "content": {"membership": Membership.INVITE}, + "prev_events": [], + "auth_events": [], + "depth": 1, + "origin_server_ts": 1234, + } + ) + self.get_success( + fed_handler.on_invite_request(fed_hostname, event, RoomVersions.V6) + ) + async def summarize_remote_room( _self, room, suggested_only, max_children, exclude_rooms ): # Note that these entries are brief, but should contain enough info. rooms = [ + { + "room_id": public_room, + "world_readable": False, + "join_rules": JoinRules.PUBLIC, + }, + { + "room_id": knock_room, + "world_readable": False, + "join_rules": JoinRules.KNOCK, + }, + { + "room_id": not_invited_room, + "world_readable": False, + "join_rules": JoinRules.INVITE, + }, + { + "room_id": invited_room, + "world_readable": False, + "join_rules": JoinRules.INVITE, + }, { "room_id": restricted_room, "world_readable": False, @@ -364,6 +518,9 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): self.space, self.room, subspace, + public_room, + knock_room, + invited_room, restricted_accessible_room, world_readable_room, joined_room, @@ -374,6 +531,10 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): [ (self.space, self.room), (self.space, subspace), + (subspace, public_room), + (subspace, knock_room), + (subspace, not_invited_room), + (subspace, invited_room), (subspace, restricted_room), (subspace, restricted_accessible_room), (subspace, world_readable_room), -- cgit 1.5.1 From 3acf85c85f62655077f8c4b466389de4a4183604 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jul 2021 16:02:12 +0100 Subject: Reduce likelihood of Postgres table scanning `state_groups_state`. (#10359) The postgres statistics collector sometimes massively underestimates the number of distinct state groups are in the `state_groups_state`, which can cause postgres to use table scans for queries for multiple state groups. We fix this by manually setting `n_distinct` on the column. --- changelog.d/10359.bugfix | 1 + .../02state_groups_state_n_distinct.sql.postgres | 34 ++++++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 changelog.d/10359.bugfix create mode 100644 synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres (limited to 'synapse/storage') diff --git a/changelog.d/10359.bugfix b/changelog.d/10359.bugfix new file mode 100644 index 0000000000..d318f8fa08 --- /dev/null +++ b/changelog.d/10359.bugfix @@ -0,0 +1 @@ +Fix PostgreSQL sometimes using table scans for queries against `state_groups_state` table, taking a long time and a large amount of IO. diff --git a/synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres b/synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres new file mode 100644 index 0000000000..35a153da7b --- /dev/null +++ b/synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres @@ -0,0 +1,34 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- By default the postgres statistics collector massively underestimates the +-- number of distinct state groups are in the `state_groups_state`, which can +-- cause postgres to use table scans for queries for multiple state groups. +-- +-- To work around this we can manually tell postgres the number of distinct state +-- groups there are by setting `n_distinct` (a negative value here is the number +-- of distinct values divided by the number of rows, so -0.02 means on average +-- there are 50 rows per distinct value). We don't need a particularly +-- accurate number here, as a) we just want it to always use index scans and b) +-- our estimate is going to be better than the one made by the statistics +-- collector. + +ALTER TABLE state_groups_state ALTER COLUMN state_group SET (n_distinct = -0.02); + +-- Ideally we'd do an `ANALYZE state_groups_state (state_group)` here so that +-- the above gets picked up immediately, but that can take a bit of time so we +-- rely on the autovacuum eventually getting run and doing that in the +-- background for us. -- cgit 1.5.1 From bdfde6dca11a9468372b3c9b327ad3327cbdbe4a Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Thu, 15 Jul 2021 18:46:54 +0200 Subject: Use inline type hints in `http/federation/`, `storage/` and `util/` (#10381) --- changelog.d/10381.misc | 1 + synapse/http/federation/well_known_resolver.py | 13 ++++---- synapse/storage/background_updates.py | 16 ++++----- synapse/storage/database.py | 14 ++++---- synapse/storage/databases/main/appservice.py | 4 +-- synapse/storage/databases/main/end_to_end_keys.py | 2 +- synapse/storage/databases/main/event_federation.py | 26 +++++++-------- .../storage/databases/main/event_push_actions.py | 2 +- synapse/storage/databases/main/events.py | 38 ++++++++++------------ .../storage/databases/main/events_bg_updates.py | 8 ++--- synapse/storage/databases/main/events_worker.py | 6 ++-- synapse/storage/databases/main/purge_events.py | 2 +- synapse/storage/databases/main/push_rule.py | 6 ++-- synapse/storage/databases/main/registration.py | 2 +- synapse/storage/databases/main/stream.py | 6 ++-- synapse/storage/databases/main/tags.py | 2 +- synapse/storage/databases/main/ui_auth.py | 4 +-- synapse/storage/persist_events.py | 16 ++++----- synapse/storage/prepare_database.py | 6 ++-- synapse/storage/state.py | 4 +-- synapse/storage/util/id_generators.py | 12 +++---- synapse/storage/util/sequence.py | 6 ++-- synapse/util/async_helpers.py | 8 ++--- synapse/util/batching_queue.py | 8 ++--- synapse/util/caches/__init__.py | 4 +-- synapse/util/caches/cached_call.py | 6 ++-- synapse/util/caches/deferred_cache.py | 12 +++---- synapse/util/caches/descriptors.py | 36 ++++++++++---------- synapse/util/caches/dictionary_cache.py | 6 ++-- synapse/util/caches/expiringcache.py | 4 +-- synapse/util/caches/lrucache.py | 8 ++--- synapse/util/caches/response_cache.py | 2 +- synapse/util/caches/stream_change_cache.py | 6 ++-- synapse/util/caches/ttlcache.py | 6 ++-- synapse/util/iterutils.py | 2 +- synapse/util/macaroons.py | 2 +- synapse/util/metrics.py | 2 +- synapse/util/patch_inline_callbacks.py | 4 +-- 38 files changed, 150 insertions(+), 162 deletions(-) create mode 100644 changelog.d/10381.misc (limited to 'synapse/storage') diff --git a/changelog.d/10381.misc b/changelog.d/10381.misc new file mode 100644 index 0000000000..eed2d8552a --- /dev/null +++ b/changelog.d/10381.misc @@ -0,0 +1 @@ +Convert internal type variable syntax to reflect wider ecosystem use. \ No newline at end of file diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index 20d39a4ea6..43f2140429 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -70,10 +70,8 @@ WELL_KNOWN_RETRY_ATTEMPTS = 3 logger = logging.getLogger(__name__) -_well_known_cache = TTLCache("well-known") # type: TTLCache[bytes, Optional[bytes]] -_had_valid_well_known_cache = TTLCache( - "had-valid-well-known" -) # type: TTLCache[bytes, bool] +_well_known_cache: TTLCache[bytes, Optional[bytes]] = TTLCache("well-known") +_had_valid_well_known_cache: TTLCache[bytes, bool] = TTLCache("had-valid-well-known") @attr.s(slots=True, frozen=True) @@ -130,9 +128,10 @@ class WellKnownResolver: # requests for the same server in parallel? try: with Measure(self._clock, "get_well_known"): - result, cache_period = await self._fetch_well_known( - server_name - ) # type: Optional[bytes], float + result: Optional[bytes] + cache_period: float + + result, cache_period = await self._fetch_well_known(server_name) except _FetchWellKnownFailure as e: if prev_result and e.temporary: diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 142787fdfd..82b31d24f1 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -92,14 +92,12 @@ class BackgroundUpdater: self.db_pool = database # if a background update is currently running, its name. - self._current_background_update = None # type: Optional[str] - - self._background_update_performance = ( - {} - ) # type: Dict[str, BackgroundUpdatePerformance] - self._background_update_handlers = ( - {} - ) # type: Dict[str, Callable[[JsonDict, int], Awaitable[int]]] + self._current_background_update: Optional[str] = None + + self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {} + self._background_update_handlers: Dict[ + str, Callable[[JsonDict, int], Awaitable[int]] + ] = {} self._all_done = False def start_doing_background_updates(self) -> None: @@ -411,7 +409,7 @@ class BackgroundUpdater: c.execute(sql) if isinstance(self.db_pool.engine, engines.PostgresEngine): - runner = create_index_psql # type: Optional[Callable[[Connection], None]] + runner: Optional[Callable[[Connection], None]] = create_index_psql elif psql_only: runner = None else: diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 33c42cf95a..f80d822c12 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -670,8 +670,8 @@ class DatabasePool: Returns: The result of func """ - after_callbacks = [] # type: List[_CallbackListEntry] - exception_callbacks = [] # type: List[_CallbackListEntry] + after_callbacks: List[_CallbackListEntry] = [] + exception_callbacks: List[_CallbackListEntry] = [] if not current_context(): logger.warning("Starting db txn '%s' from sentinel context", desc) @@ -1090,7 +1090,7 @@ class DatabasePool: return False # We didn't find any existing rows, so insert a new one - allvalues = {} # type: Dict[str, Any] + allvalues: Dict[str, Any] = {} allvalues.update(keyvalues) allvalues.update(values) allvalues.update(insertion_values) @@ -1121,7 +1121,7 @@ class DatabasePool: values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting """ - allvalues = {} # type: Dict[str, Any] + allvalues: Dict[str, Any] = {} allvalues.update(keyvalues) allvalues.update(insertion_values or {}) @@ -1257,7 +1257,7 @@ class DatabasePool: value_values: A list of each row's value column values. Ignored if value_names is empty. """ - allnames = [] # type: List[str] + allnames: List[str] = [] allnames.extend(key_names) allnames.extend(value_names) @@ -1566,7 +1566,7 @@ class DatabasePool: """ keyvalues = keyvalues or {} - results = [] # type: List[Dict[str, Any]] + results: List[Dict[str, Any]] = [] if not iterable: return results @@ -1978,7 +1978,7 @@ class DatabasePool: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else "" - arg_list = [] # type: List[Any] + arg_list: List[Any] = [] if filters: where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters) arg_list += list(filters.values()) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 9f182c2a89..e2d1b758bd 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -48,9 +48,7 @@ def _make_exclusive_regex( ] if exclusive_user_regexes: exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes) - exclusive_user_pattern = re.compile( - exclusive_user_regex - ) # type: Optional[Pattern] + exclusive_user_pattern: Optional[Pattern] = re.compile(exclusive_user_regex) else: # We handle this case specially otherwise the constructed regex # will always match diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 0e3dd4e9ca..78ae68ec68 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -247,7 +247,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore): txn.execute(sql, query_params) - result = {} # type: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] + result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {} for (user_id, device_id, display_name, key_json) in txn: if include_deleted_devices: deleted_devices.remove((user_id, device_id)) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 4e06938849..d39368c20e 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -62,9 +62,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas ) # Cache of event ID to list of auth event IDs and their depths. - self._event_auth_cache = LruCache( + self._event_auth_cache: LruCache[str, List[Tuple[str, int]]] = LruCache( 500000, "_event_auth_cache", size_callback=len - ) # type: LruCache[str, List[Tuple[str, int]]] + ) self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000) @@ -137,10 +137,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas initial_events = set(event_ids) # All the events that we've found that are reachable from the events. - seen_events = set() # type: Set[str] + seen_events: Set[str] = set() # A map from chain ID to max sequence number of the given events. - event_chains = {} # type: Dict[int, int] + event_chains: Dict[int, int] = {} sql = """ SELECT event_id, chain_id, sequence_number @@ -182,7 +182,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas """ # A map from chain ID to max sequence number *reachable* from any event ID. - chains = {} # type: Dict[int, int] + chains: Dict[int, int] = {} # Add all linked chains reachable from initial set of chains. for batch in batch_iter(event_chains, 1000): @@ -353,14 +353,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas initial_events = set(state_sets[0]).union(*state_sets[1:]) # Map from event_id -> (chain ID, seq no) - chain_info = {} # type: Dict[str, Tuple[int, int]] + chain_info: Dict[str, Tuple[int, int]] = {} # Map from chain ID -> seq no -> event Id - chain_to_event = {} # type: Dict[int, Dict[int, str]] + chain_to_event: Dict[int, Dict[int, str]] = {} # All the chains that we've found that are reachable from the state # sets. - seen_chains = set() # type: Set[int] + seen_chains: Set[int] = set() sql = """ SELECT event_id, chain_id, sequence_number @@ -392,9 +392,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # Corresponds to `state_sets`, except as a map from chain ID to max # sequence number reachable from the state set. - set_to_chain = [] # type: List[Dict[int, int]] + set_to_chain: List[Dict[int, int]] = [] for state_set in state_sets: - chains = {} # type: Dict[int, int] + chains: Dict[int, int] = {} set_to_chain.append(chains) for event_id in state_set: @@ -446,7 +446,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # Mapping from chain ID to the range of sequence numbers that should be # pulled from the database. - chain_to_gap = {} # type: Dict[int, Tuple[int, int]] + chain_to_gap: Dict[int, Tuple[int, int]] = {} for chain_id in seen_chains: min_seq_no = min(chains.get(chain_id, 0) for chains in set_to_chain) @@ -555,7 +555,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas } # The sorted list of events whose auth chains we should walk. - search = [] # type: List[Tuple[int, str]] + search: List[Tuple[int, str]] = [] # We need to get the depth of the initial events for sorting purposes. sql = """ @@ -578,7 +578,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas search.sort() # Map from event to its auth events - event_to_auth_events = {} # type: Dict[str, Set[str]] + event_to_auth_events: Dict[str, Set[str]] = {} base_sql = """ SELECT a.event_id, auth_id, depth diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index d1237c65cc..55caa6bbe7 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -759,7 +759,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): # object because we might not have the same amount of rows in each of them. To do # this, we use a dict indexed on the user ID and room ID to make it easier to # populate. - summaries = {} # type: Dict[Tuple[str, str], _EventPushSummary] + summaries: Dict[Tuple[str, str], _EventPushSummary] = {} for row in txn: summaries[(row[0], row[1])] = _EventPushSummary( unread_count=row[2], diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 08c580b0dc..ec8579b9ad 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -109,10 +109,8 @@ class PersistEventsStore: # Ideally we'd move these ID gens here, unfortunately some other ID # generators are chained off them so doing so is a bit of a PITA. - self._backfill_id_gen = ( - self.store._backfill_id_gen - ) # type: MultiWriterIdGenerator - self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator + self._backfill_id_gen: MultiWriterIdGenerator = self.store._backfill_id_gen + self._stream_id_gen: MultiWriterIdGenerator = self.store._stream_id_gen # This should only exist on instances that are configured to write assert ( @@ -221,7 +219,7 @@ class PersistEventsStore: Returns: Filtered event ids """ - results = [] # type: List[str] + results: List[str] = [] def _get_events_which_are_prevs_txn(txn, batch): sql = """ @@ -508,7 +506,7 @@ class PersistEventsStore: """ # Map from event ID to chain ID/sequence number. - chain_map = {} # type: Dict[str, Tuple[int, int]] + chain_map: Dict[str, Tuple[int, int]] = {} # Set of event IDs to calculate chain ID/seq numbers for. events_to_calc_chain_id_for = set(event_to_room_id) @@ -817,8 +815,8 @@ class PersistEventsStore: # new chain if the sequence number has already been allocated. # - existing_chains = set() # type: Set[int] - tree = [] # type: List[Tuple[str, Optional[str]]] + existing_chains: Set[int] = set() + tree: List[Tuple[str, Optional[str]]] = [] # We need to do this in a topologically sorted order as we want to # generate chain IDs/sequence numbers of an event's auth events before @@ -848,7 +846,7 @@ class PersistEventsStore: ) txn.execute(sql % (clause,), args) - chain_to_max_seq_no = {row[0]: row[1] for row in txn} # type: Dict[Any, int] + chain_to_max_seq_no: Dict[Any, int] = {row[0]: row[1] for row in txn} # Allocate the new events chain ID/sequence numbers. # @@ -858,8 +856,8 @@ class PersistEventsStore: # number of new chain IDs in one call, replacing all temporary # objects with real allocated chain IDs. - unallocated_chain_ids = set() # type: Set[object] - new_chain_tuples = {} # type: Dict[str, Tuple[Any, int]] + unallocated_chain_ids: Set[object] = set() + new_chain_tuples: Dict[str, Tuple[Any, int]] = {} for event_id, auth_event_id in tree: # If we reference an auth_event_id we fetch the allocated chain ID, # either from the existing `chain_map` or the newly generated @@ -870,7 +868,7 @@ class PersistEventsStore: if not existing_chain_id: existing_chain_id = chain_map[auth_event_id] - new_chain_tuple = None # type: Optional[Tuple[Any, int]] + new_chain_tuple: Optional[Tuple[Any, int]] = None if existing_chain_id: # We found a chain ID/sequence number candidate, check its # not already taken. @@ -897,9 +895,9 @@ class PersistEventsStore: ) # Map from potentially temporary chain ID to real chain ID - chain_id_to_allocated_map = dict( + chain_id_to_allocated_map: Dict[Any, int] = dict( zip(unallocated_chain_ids, newly_allocated_chain_ids) - ) # type: Dict[Any, int] + ) chain_id_to_allocated_map.update((c, c) for c in existing_chains) return { @@ -1175,9 +1173,9 @@ class PersistEventsStore: Returns: list[(EventBase, EventContext)]: filtered list """ - new_events_and_contexts = ( - OrderedDict() - ) # type: OrderedDict[str, Tuple[EventBase, EventContext]] + new_events_and_contexts: OrderedDict[ + str, Tuple[EventBase, EventContext] + ] = OrderedDict() for event, context in events_and_contexts: prev_event_context = new_events_and_contexts.get(event.event_id) if prev_event_context: @@ -1205,7 +1203,7 @@ class PersistEventsStore: we are persisting backfilled (bool): True if the events were backfilled """ - depth_updates = {} # type: Dict[str, int] + depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids txn.call_after(self.store._invalidate_get_event_cache, event.event_id) @@ -1885,7 +1883,7 @@ class PersistEventsStore: ), ) - room_to_event_ids = {} # type: Dict[str, List[str]] + room_to_event_ids: Dict[str, List[str]] = {} for e, _ in events_and_contexts: room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) @@ -2012,7 +2010,7 @@ class PersistEventsStore: Forward extremities are handled when we first start persisting the events. """ - events_by_room = {} # type: Dict[str, List[EventBase]] + events_by_room: Dict[str, List[EventBase]] = {} for ev in events: events_by_room.setdefault(ev.room_id, []).append(ev) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 29f33bac55..6fcb2b8353 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -960,9 +960,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): event_to_types = {row[0]: (row[1], row[2]) for row in rows} # Calculate the new last position we've processed up to. - new_last_depth = rows[-1][3] if rows else last_depth # type: int - new_last_stream = rows[-1][4] if rows else last_stream # type: int - new_last_room_id = rows[-1][5] if rows else "" # type: str + new_last_depth: int = rows[-1][3] if rows else last_depth + new_last_stream: int = rows[-1][4] if rows else last_stream + new_last_room_id: str = rows[-1][5] if rows else "" # Map from room_id to last depth/stream_ordering processed for the room, # excluding the last room (which we're likely still processing). We also @@ -989,7 +989,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): retcols=("event_id", "auth_id"), ) - event_to_auth_chain = {} # type: Dict[str, List[str]] + event_to_auth_chain: Dict[str, List[str]] = {} for row in auth_events: event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"]) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 403a5ddaba..3c86adab56 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1365,10 +1365,10 @@ class EventsWorkerStore(SQLBaseStore): # we need to make sure that, for every stream id in the results, we get *all* # the rows with that stream id. - rows = await self.db_pool.runInteraction( + rows: List[Tuple] = await self.db_pool.runInteraction( "get_all_updated_current_state_deltas", get_all_updated_current_state_deltas_txn, - ) # type: List[Tuple] + ) # if we've got fewer rows than the limit, we're good if len(rows) < target_row_count: @@ -1469,7 +1469,7 @@ class EventsWorkerStore(SQLBaseStore): """ mapping = {} - txn_id_to_event = {} # type: Dict[Tuple[str, int, str], str] + txn_id_to_event: Dict[Tuple[str, int, str], str] = {} for event in events: token_id = getattr(event.internal_metadata, "token_id", None) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index eb4841830d..664c65dac5 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -115,7 +115,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): logger.info("[purge] looking for events to delete") should_delete_expr = "state_key IS NULL" - should_delete_params = () # type: Tuple[Any, ...] + should_delete_params: Tuple[Any, ...] = () if not delete_local_events: should_delete_expr += " AND event_id NOT LIKE ?" diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index db52176337..a7fb8cd848 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -79,9 +79,9 @@ class PushRulesWorkerStore( super().__init__(database, db_conn, hs) if hs.config.worker.worker_app is None: - self._push_rules_stream_id_gen = StreamIdGenerator( - db_conn, "push_rules_stream", "stream_id" - ) # type: Union[StreamIdGenerator, SlavedIdTracker] + self._push_rules_stream_id_gen: Union[ + StreamIdGenerator, SlavedIdTracker + ] = StreamIdGenerator(db_conn, "push_rules_stream", "stream_id") else: self._push_rules_stream_id_gen = SlavedIdTracker( db_conn, "push_rules_stream", "stream_id" diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index e31c5864ac..6ad1a0cf7f 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1744,7 +1744,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): items = keyvalues.items() where_clause = " AND ".join(k + " = ?" for k, _ in items) - values = [v for _, v in items] # type: List[Union[str, int]] + values: List[Union[str, int]] = [v for _, v in items] # Conveniently, refresh_tokens and access_tokens both use the user_id and device_id fields. Only caveat # is the `except_token_id` param that is tricky to get right, so for now we're just using the same where # clause and values before we handle that. This seems to be only used in the "set password" handler. diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 7581c7d3ff..959f13de47 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1085,9 +1085,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): # stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and # then filtering the results. if from_token.topological is not None: - from_bound = ( - from_token.as_historical_tuple() - ) # type: Tuple[Optional[int], int] + from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple() elif direction == "b": from_bound = ( None, @@ -1099,7 +1097,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): from_token.stream, ) - to_bound = None # type: Optional[Tuple[Optional[int], int]] + to_bound: Optional[Tuple[Optional[int], int]] = None if to_token: if to_token.topological is not None: to_bound = to_token.as_historical_tuple() diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index 1d62c6140f..f93ff0a545 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -42,7 +42,7 @@ class TagsWorkerStore(AccountDataWorkerStore): "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"] ) - tags_by_room = {} # type: Dict[str, Dict[str, JsonDict]] + tags_by_room: Dict[str, Dict[str, JsonDict]] = {} for row in rows: room_tags = tags_by_room.setdefault(row["room_id"], {}) room_tags[row["tag"]] = db_to_json(row["content"]) diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py index 22c05cdde7..38bfdf5dad 100644 --- a/synapse/storage/databases/main/ui_auth.py +++ b/synapse/storage/databases/main/ui_auth.py @@ -224,12 +224,12 @@ class UIAuthWorkerStore(SQLBaseStore): self, txn: LoggingTransaction, session_id: str, key: str, value: Any ): # Get the current value. - result = self.db_pool.simple_select_one_txn( + result: Dict[str, Any] = self.db_pool.simple_select_one_txn( # type: ignore txn, table="ui_auth_sessions", keyvalues={"session_id": session_id}, retcols=("serverdict",), - ) # type: Dict[str, Any] # type: ignore + ) # Update it and add it back to the database. serverdict = db_to_json(result["serverdict"]) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 051095fea9..a39877f0d5 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -307,7 +307,7 @@ class EventsPersistenceStorage: matched the transcation ID; the existing event is returned in such a case. """ - partitioned = {} # type: Dict[str, List[Tuple[EventBase, EventContext]]] + partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) @@ -384,7 +384,7 @@ class EventsPersistenceStorage: A dictionary of event ID to event ID we didn't persist as we already had another event persisted with the same TXN ID. """ - replaced_events = {} # type: Dict[str, str] + replaced_events: Dict[str, str] = {} if not events_and_contexts: return replaced_events @@ -440,16 +440,14 @@ class EventsPersistenceStorage: # Set of remote users which were in rooms the server has left. We # should check if we still share any rooms and if not we mark their # device lists as stale. - potentially_left_users = set() # type: Set[str] + potentially_left_users: Set[str] = set() if not backfilled: with Measure(self._clock, "_calculate_state_and_extrem"): # Work out the new "current state" for each room. # We do this by working out what the new extremities are and then # calculating the state from that. - events_by_room = ( - {} - ) # type: Dict[str, List[Tuple[EventBase, EventContext]]] + events_by_room: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, context in chunk: events_by_room.setdefault(event.room_id, []).append( (event, context) @@ -622,9 +620,9 @@ class EventsPersistenceStorage: ) # Remove any events which are prev_events of any existing events. - existing_prevs = await self.persist_events_store._get_events_which_are_prevs( - result - ) # type: Collection[str] + existing_prevs: Collection[ + str + ] = await self.persist_events_store._get_events_which_are_prevs(result) result.difference_update(existing_prevs) # Finally handle the case where the new events have soft-failed prev diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 683e5e3b90..82a7686df0 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -256,7 +256,7 @@ def _setup_new_database( for database in databases ) - directory_entries = [] # type: List[_DirectoryListing] + directory_entries: List[_DirectoryListing] = [] for directory in directories: directory_entries.extend( _DirectoryListing(file_name, os.path.join(directory, file_name)) @@ -424,10 +424,10 @@ def _upgrade_existing_database( directories.append(os.path.join(schema_path, database, "delta", str(v))) # Used to check if we have any duplicate file names - file_name_counter = Counter() # type: CounterType[str] + file_name_counter: CounterType[str] = Counter() # Now find which directories have anything of interest. - directory_entries = [] # type: List[_DirectoryListing] + directory_entries: List[_DirectoryListing] = [] for directory in directories: logger.debug("Looking for schema deltas in %s", directory) try: diff --git a/synapse/storage/state.py b/synapse/storage/state.py index c9dce726cb..f8fbba9d38 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -91,7 +91,7 @@ class StateFilter: Returns: The new state filter. """ - type_dict = {} # type: Dict[str, Optional[Set[str]]] + type_dict: Dict[str, Optional[Set[str]]] = {} for typ, s in types: if typ in type_dict: if type_dict[typ] is None: @@ -194,7 +194,7 @@ class StateFilter: """ where_clause = "" - where_args = [] # type: List[str] + where_args: List[str] = [] if self.is_full(): return where_clause, where_args diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index f1e62f9e85..c768fdea56 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -112,7 +112,7 @@ class StreamIdGenerator: # insertion ordering will ensure its in the correct ordering. # # The key and values are the same, but we never look at the values. - self._unfinished_ids = OrderedDict() # type: OrderedDict[int, int] + self._unfinished_ids: OrderedDict[int, int] = OrderedDict() def get_next(self): """ @@ -236,15 +236,15 @@ class MultiWriterIdGenerator: # Note: If we are a negative stream then we still store all the IDs as # positive to make life easier for us, and simply negate the IDs when we # return them. - self._current_positions = {} # type: Dict[str, int] + self._current_positions: Dict[str, int] = {} # Set of local IDs that we're still processing. The current position # should be less than the minimum of this set (if not empty). - self._unfinished_ids = set() # type: Set[int] + self._unfinished_ids: Set[int] = set() # Set of local IDs that we've processed that are larger than the current # position, due to there being smaller unpersisted IDs. - self._finished_ids = set() # type: Set[int] + self._finished_ids: Set[int] = set() # We track the max position where we know everything before has been # persisted. This is done by a) looking at the min across all instances @@ -265,7 +265,7 @@ class MultiWriterIdGenerator: self._persisted_upto_position = ( min(self._current_positions.values()) if self._current_positions else 1 ) - self._known_persisted_positions = [] # type: List[int] + self._known_persisted_positions: List[int] = [] self._sequence_gen = PostgresSequenceGenerator(sequence_name) @@ -465,7 +465,7 @@ class MultiWriterIdGenerator: self._unfinished_ids.discard(next_id) self._finished_ids.add(next_id) - new_cur = None # type: Optional[int] + new_cur: Optional[int] = None if self._unfinished_ids: # If there are unfinished IDs then the new position will be the diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index 30b6b8e0ca..bb33e04fb1 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -208,10 +208,10 @@ class LocalSequenceGenerator(SequenceGenerator): get_next_id_txn; should return the curreent maximum id """ # the callback. this is cleared after it is called, so that it can be GCed. - self._callback = get_first_callback # type: Optional[GetFirstCallbackType] + self._callback: Optional[GetFirstCallbackType] = get_first_callback # The current max value, or None if we haven't looked in the DB yet. - self._current_max_id = None # type: Optional[int] + self._current_max_id: Optional[int] = None self._lock = threading.Lock() def get_next_id_txn(self, txn: Cursor) -> int: @@ -274,7 +274,7 @@ def build_sequence_generator( `check_consistency` details. """ if isinstance(database_engine, PostgresEngine): - seq = PostgresSequenceGenerator(sequence_name) # type: SequenceGenerator + seq: SequenceGenerator = PostgresSequenceGenerator(sequence_name) else: seq = LocalSequenceGenerator(get_first_callback) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 061102c3c8..014db1355b 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -257,7 +257,7 @@ class Linearizer: max_count: The maximum number of concurrent accesses """ if name is None: - self.name = id(self) # type: Union[str, int] + self.name: Union[str, int] = id(self) else: self.name = name @@ -269,7 +269,7 @@ class Linearizer: self.max_count = max_count # key_to_defer is a map from the key to a _LinearizerEntry. - self.key_to_defer = {} # type: Dict[Hashable, _LinearizerEntry] + self.key_to_defer: Dict[Hashable, _LinearizerEntry] = {} def is_queued(self, key: Hashable) -> bool: """Checks whether there is a process queued up waiting""" @@ -409,10 +409,10 @@ class ReadWriteLock: def __init__(self): # Latest readers queued - self.key_to_current_readers = {} # type: Dict[str, Set[defer.Deferred]] + self.key_to_current_readers: Dict[str, Set[defer.Deferred]] = {} # Latest writer queued - self.key_to_current_writer = {} # type: Dict[str, defer.Deferred] + self.key_to_current_writer: Dict[str, defer.Deferred] = {} async def read(self, key: str) -> ContextManager: new_defer = defer.Deferred() diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py index 8fd5bfb69b..274cea7eb7 100644 --- a/synapse/util/batching_queue.py +++ b/synapse/util/batching_queue.py @@ -93,11 +93,11 @@ class BatchingQueue(Generic[V, R]): self._clock = clock # The set of keys currently being processed. - self._processing_keys = set() # type: Set[Hashable] + self._processing_keys: Set[Hashable] = set() # The currently pending batch of values by key, with a Deferred to call # with the result of the corresponding `_process_batch_callback` call. - self._next_values = {} # type: Dict[Hashable, List[Tuple[V, defer.Deferred]]] + self._next_values: Dict[Hashable, List[Tuple[V, defer.Deferred]]] = {} # The function to call with batches of values. self._process_batch_callback = process_batch_callback @@ -108,9 +108,7 @@ class BatchingQueue(Generic[V, R]): number_of_keys.labels(self._name).set_function(lambda: len(self._next_values)) - self._number_in_flight_metric = number_in_flight.labels( - self._name - ) # type: Gauge + self._number_in_flight_metric: Gauge = number_in_flight.labels(self._name) async def add_to_queue(self, value: V, key: Hashable = ()) -> R: """Adds the value to the queue with the given key, returning the result diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index ca36f07c20..9012034b7a 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -29,8 +29,8 @@ logger = logging.getLogger(__name__) TRACK_MEMORY_USAGE = False -caches_by_name = {} # type: Dict[str, Sized] -collectors_by_name = {} # type: Dict[str, CacheMetric] +caches_by_name: Dict[str, Sized] = {} +collectors_by_name: Dict[str, "CacheMetric"] = {} cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"]) cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"]) diff --git a/synapse/util/caches/cached_call.py b/synapse/util/caches/cached_call.py index a301c9e89b..891bee0b33 100644 --- a/synapse/util/caches/cached_call.py +++ b/synapse/util/caches/cached_call.py @@ -63,9 +63,9 @@ class CachedCall(Generic[TV]): f: The underlying function. Only one call to this function will be alive at once (per instance of CachedCall) """ - self._callable = f # type: Optional[Callable[[], Awaitable[TV]]] - self._deferred = None # type: Optional[Deferred] - self._result = None # type: Union[None, Failure, TV] + self._callable: Optional[Callable[[], Awaitable[TV]]] = f + self._deferred: Optional[Deferred] = None + self._result: Union[None, Failure, TV] = None async def get(self) -> TV: """Kick off the call if necessary, and return the result""" diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 1044139119..8c6fafc677 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -80,25 +80,25 @@ class DeferredCache(Generic[KT, VT]): cache_type = TreeCache if tree else dict # _pending_deferred_cache maps from the key value to a `CacheEntry` object. - self._pending_deferred_cache = ( - cache_type() - ) # type: Union[TreeCache, MutableMapping[KT, CacheEntry]] + self._pending_deferred_cache: Union[ + TreeCache, "MutableMapping[KT, CacheEntry]" + ] = cache_type() def metrics_cb(): cache_pending_metric.labels(name).set(len(self._pending_deferred_cache)) # cache is used for completed results and maps to the result itself, rather than # a Deferred. - self.cache = LruCache( + self.cache: LruCache[KT, VT] = LruCache( max_size=max_entries, cache_name=name, cache_type=cache_type, size_callback=(lambda d: len(d) or 1) if iterable else None, metrics_collection_callback=metrics_cb, apply_cache_factor_from_config=apply_cache_factor_from_config, - ) # type: LruCache[KT, VT] + ) - self.thread = None # type: Optional[threading.Thread] + self.thread: Optional[threading.Thread] = None @property def max_entries(self): diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index d77e8edeea..1e8e6b1d01 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -46,17 +46,17 @@ F = TypeVar("F", bound=Callable[..., Any]) class _CachedFunction(Generic[F]): - invalidate = None # type: Any - invalidate_all = None # type: Any - prefill = None # type: Any - cache = None # type: Any - num_args = None # type: Any + invalidate: Any = None + invalidate_all: Any = None + prefill: Any = None + cache: Any = None + num_args: Any = None - __name__ = None # type: str + __name__: str # Note: This function signature is actually fiddled with by the synapse mypy # plugin to a) make it a bound method, and b) remove any `cache_context` arg. - __call__ = None # type: F + __call__: F class _CacheDescriptorBase: @@ -115,8 +115,8 @@ class _CacheDescriptorBase: class _LruCachedFunction(Generic[F]): - cache = None # type: LruCache[CacheKey, Any] - __call__ = None # type: F + cache: LruCache[CacheKey, Any] + __call__: F def lru_cache( @@ -180,10 +180,10 @@ class LruCacheDescriptor(_CacheDescriptorBase): self.max_entries = max_entries def __get__(self, obj, owner): - cache = LruCache( + cache: LruCache[CacheKey, Any] = LruCache( cache_name=self.orig.__name__, max_size=self.max_entries, - ) # type: LruCache[CacheKey, Any] + ) get_cache_key = self.cache_key_builder sentinel = LruCacheDescriptor._Sentinel.sentinel @@ -271,12 +271,12 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): self.iterable = iterable def __get__(self, obj, owner): - cache = DeferredCache( + cache: DeferredCache[CacheKey, Any] = DeferredCache( name=self.orig.__name__, max_entries=self.max_entries, tree=self.tree, iterable=self.iterable, - ) # type: DeferredCache[CacheKey, Any] + ) get_cache_key = self.cache_key_builder @@ -359,7 +359,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): def __get__(self, obj, objtype=None): cached_method = getattr(obj, self.cached_method_name) - cache = cached_method.cache # type: DeferredCache[CacheKey, Any] + cache: DeferredCache[CacheKey, Any] = cached_method.cache num_args = cached_method.num_args @functools.wraps(self.orig) @@ -472,15 +472,15 @@ class _CacheContext: Cache = Union[DeferredCache, LruCache] - _cache_context_objects = ( - WeakValueDictionary() - ) # type: WeakValueDictionary[Tuple[_CacheContext.Cache, CacheKey], _CacheContext] + _cache_context_objects: """WeakValueDictionary[ + Tuple["_CacheContext.Cache", CacheKey], "_CacheContext" + ]""" = WeakValueDictionary() def __init__(self, cache: "_CacheContext.Cache", cache_key: CacheKey) -> None: self._cache = cache self._cache_key = cache_key - def invalidate(self): # type: () -> None + def invalidate(self) -> None: """Invalidates the cache entry referred to by the context.""" self._cache.invalidate(self._cache_key) diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 56d94d96ce..3f852edd7f 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -62,13 +62,13 @@ class DictionaryCache(Generic[KT, DKT]): """ def __init__(self, name: str, max_entries: int = 1000): - self.cache = LruCache( + self.cache: LruCache[KT, DictionaryEntry] = LruCache( max_size=max_entries, cache_name=name, size_callback=len - ) # type: LruCache[KT, DictionaryEntry] + ) self.name = name self.sequence = 0 - self.thread = None # type: Optional[threading.Thread] + self.thread: Optional[threading.Thread] = None def check_thread(self) -> None: expected_thread = self.thread diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index ac47a31cd7..bde16b8577 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -27,7 +27,7 @@ from synapse.util.caches import register_cache logger = logging.getLogger(__name__) -SENTINEL = object() # type: Any +SENTINEL: Any = object() T = TypeVar("T") @@ -71,7 +71,7 @@ class ExpiringCache(Generic[KT, VT]): self._expiry_ms = expiry_ms self._reset_expiry_on_get = reset_expiry_on_get - self._cache = OrderedDict() # type: OrderedDict[KT, _CacheEntry] + self._cache: OrderedDict[KT, _CacheEntry] = OrderedDict() self.iterable = iterable diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 4b9d0433ff..efeba0cb96 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -226,7 +226,7 @@ class _Node: # footprint down. Storing `None` is free as its a singleton, while empty # lists are 56 bytes (and empty sets are 216 bytes, if we did the naive # thing and used sets). - self.callbacks = None # type: Optional[List[Callable[[], None]]] + self.callbacks: Optional[List[Callable[[], None]]] = None self.add_callbacks(callbacks) @@ -362,15 +362,15 @@ class LruCache(Generic[KT, VT]): # register_cache might call our "set_cache_factor" callback; there's nothing to # do yet when we get resized. - self._on_resize = None # type: Optional[Callable[[],None]] + self._on_resize: Optional[Callable[[], None]] = None if cache_name is not None: - metrics = register_cache( + metrics: Optional[CacheMetric] = register_cache( "lru_cache", cache_name, self, collect_callback=metrics_collection_callback, - ) # type: Optional[CacheMetric] + ) else: metrics = None diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 34c662c4db..ed7204336f 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -66,7 +66,7 @@ class ResponseCache(Generic[KV]): # This is poorly-named: it includes both complete and incomplete results. # We keep complete results rather than switching to absolute values because # that makes it easier to cache Failure results. - self.pending_result_cache = {} # type: Dict[KV, ObservableDeferred] + self.pending_result_cache: Dict[KV, ObservableDeferred] = {} self.clock = clock self.timeout_sec = timeout_ms / 1000.0 diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index e81e468899..3a41a8baa6 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -45,10 +45,10 @@ class StreamChangeCache: ): self._original_max_size = max_size self._max_size = math.floor(max_size) - self._entity_to_key = {} # type: Dict[EntityType, int] + self._entity_to_key: Dict[EntityType, int] = {} # map from stream id to the a set of entities which changed at that stream id. - self._cache = SortedDict() # type: SortedDict[int, Set[EntityType]] + self._cache: SortedDict[int, Set[EntityType]] = SortedDict() # the earliest stream_pos for which we can reliably answer # get_all_entities_changed. In other words, one less than the earliest @@ -155,7 +155,7 @@ class StreamChangeCache: if stream_pos < self._earliest_known_stream_pos: return None - changed_entities = [] # type: List[EntityType] + changed_entities: List[EntityType] = [] for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)): changed_entities.extend(self._cache[k]) diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py index c276107d56..46afe3f934 100644 --- a/synapse/util/caches/ttlcache.py +++ b/synapse/util/caches/ttlcache.py @@ -23,7 +23,7 @@ from synapse.util.caches import register_cache logger = logging.getLogger(__name__) -SENTINEL = object() # type: Any +SENTINEL: Any = object() T = TypeVar("T") KT = TypeVar("KT") @@ -35,10 +35,10 @@ class TTLCache(Generic[KT, VT]): def __init__(self, cache_name: str, timer: Callable[[], float] = time.time): # map from key to _CacheEntry - self._data = {} # type: Dict[KT, _CacheEntry] + self._data: Dict[KT, _CacheEntry] = {} # the _CacheEntries, sorted by expiry time - self._expiry_list = SortedList() # type: SortedList[_CacheEntry] + self._expiry_list: SortedList[_CacheEntry] = SortedList() self._timer = timer diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index 886afa9d19..8ac3eab2f5 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -68,7 +68,7 @@ def sorted_topologically( # This is implemented by Kahn's algorithm. degree_map = {node: 0 for node in nodes} - reverse_graph = {} # type: Dict[T, Set[T]] + reverse_graph: Dict[T, Set[T]] = {} for node, edges in graph.items(): if node not in degree_map: diff --git a/synapse/util/macaroons.py b/synapse/util/macaroons.py index f6ebfd7e7d..d1f76e3dc5 100644 --- a/synapse/util/macaroons.py +++ b/synapse/util/macaroons.py @@ -39,7 +39,7 @@ def get_value_from_macaroon(macaroon: pymacaroons.Macaroon, key: str) -> str: caveat in the macaroon, or if the caveat was not found in the macaroon. """ prefix = key + " = " - result = None # type: Optional[str] + result: Optional[str] = None for caveat in macaroon.caveats: if not caveat.caveat_id.startswith(prefix): continue diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 45353d41c5..1b82dca81b 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -124,7 +124,7 @@ class Measure: assert isinstance(curr_context, LoggingContext) parent_context = curr_context self._logging_context = LoggingContext(str(curr_context), parent_context) - self.start = None # type: Optional[int] + self.start: Optional[int] = None def __enter__(self) -> "Measure": if self.start is not None: diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index eed0291cae..99f01e325c 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -41,7 +41,7 @@ def do_patch(): @functools.wraps(f) def wrapped(*args, **kwargs): start_context = current_context() - changes = [] # type: List[str] + changes: List[str] = [] orig = orig_inline_callbacks(_check_yield_points(f, changes)) try: @@ -131,7 +131,7 @@ def _check_yield_points(f: Callable, changes: List[str]): gen = f(*args, **kwargs) last_yield_line_no = gen.gi_frame.f_lineno - result = None # type: Any + result: Any = None while True: expected_context = current_context() -- cgit 1.5.1 From 7387d6f6249277482964e588462619c8b23a9d82 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 19 Jul 2021 04:16:46 -0500 Subject: Remove unused `events_by_room` (#10421) It looks like it was first used and introduced in https://github.com/matrix-org/synapse/commit/5130d80d79fe1f95ce03b8f1cfd4fbf0a32f5ac8#diff-8a4a36a7728107b2ccaff2cb405dbab229a1100fe50653a63d1aa9ac10ae45e8R305 but the But the usage was removed in https://github.com/matrix-org/synapse/commit/4c6a31cd6efa25be4c9f1b357e8f92065fac63eb#diff-8a4a36a7728107b2ccaff2cb405dbab229a1100fe50653a63d1aa9ac10ae45e8 --- changelog.d/10421.misc | 1 + synapse/storage/databases/main/events.py | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) create mode 100644 changelog.d/10421.misc (limited to 'synapse/storage') diff --git a/changelog.d/10421.misc b/changelog.d/10421.misc new file mode 100644 index 0000000000..385cbe07af --- /dev/null +++ b/changelog.d/10421.misc @@ -0,0 +1 @@ +Remove unused `events_by_room` code (tech debt). diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index ec8579b9ad..a396a201d4 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2010,10 +2010,6 @@ class PersistEventsStore: Forward extremities are handled when we first start persisting the events. """ - events_by_room: Dict[str, List[EventBase]] = {} - for ev in events: - events_by_room.setdefault(ev.room_id, []).append(ev) - query = ( "INSERT INTO event_backward_extremities (event_id, room_id)" " SELECT ?, ? WHERE NOT EXISTS (" -- cgit 1.5.1 From 95e47b2e782b5e7afa5fd2afd1d0ea7745eaac36 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Mon, 19 Jul 2021 16:28:05 +0200 Subject: [pyupgrade] `synapse/` (#10348) This PR is tantamount to running ``` pyupgrade --py36-plus --keep-percent-format `find synapse/ -type f -name "*.py"` ``` Part of #9744 --- changelog.d/10348.misc | 1 + synapse/app/generic_worker.py | 6 ++-- synapse/app/homeserver.py | 6 ++-- synapse/config/appservice.py | 2 +- synapse/config/tls.py | 6 ++-- synapse/handlers/cas.py | 2 +- synapse/handlers/federation.py | 2 +- synapse/handlers/identity.py | 4 +-- synapse/handlers/oidc.py | 38 ++++++++++++++------------ synapse/handlers/register.py | 15 ++++------ synapse/handlers/saml.py | 2 +- synapse/handlers/sync.py | 2 +- synapse/http/proxyagent.py | 2 +- synapse/http/site.py | 2 +- synapse/logging/opentracing.py | 2 +- synapse/metrics/_exposition.py | 26 ++++++++---------- synapse/metrics/background_process_metrics.py | 3 +- synapse/rest/client/v1/login.py | 25 ++++++----------- synapse/rest/media/v1/__init__.py | 4 +-- synapse/storage/database.py | 2 +- synapse/storage/databases/main/deviceinbox.py | 4 +-- synapse/storage/databases/main/group_server.py | 6 +++- synapse/storage/databases/main/roommember.py | 2 +- synapse/storage/prepare_database.py | 2 +- synapse/types.py | 4 +-- synapse/util/caches/lrucache.py | 3 +- synapse/util/caches/treecache.py | 3 +- synapse/util/daemonize.py | 8 +++--- synapse/visibility.py | 4 +-- 29 files changed, 86 insertions(+), 102 deletions(-) create mode 100644 changelog.d/10348.misc (limited to 'synapse/storage') diff --git a/changelog.d/10348.misc b/changelog.d/10348.misc new file mode 100644 index 0000000000..b2275a1350 --- /dev/null +++ b/changelog.d/10348.misc @@ -0,0 +1 @@ +Run `pyupgrade` on the codebase. \ No newline at end of file diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index b43d858f59..c3d4992518 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -395,10 +395,8 @@ class GenericWorkerServer(HomeServer): elif listener.type == "metrics": if not self.config.enable_metrics: logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) + "Metrics listener configured, but " + "enable_metrics is not True!" ) else: _base.listen_metrics(listener.bind_addresses, listener.port) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 7af56ac136..920b34d97b 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -305,10 +305,8 @@ class SynapseHomeServer(HomeServer): elif listener.type == "metrics": if not self.config.enable_metrics: logger.warning( - ( - "Metrics listener configured, but " - "enable_metrics is not True!" - ) + "Metrics listener configured, but " + "enable_metrics is not True!" ) else: _base.listen_metrics(listener.bind_addresses, listener.port) diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index a39d457c56..1ebea88db2 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -64,7 +64,7 @@ def load_appservices(hostname, config_files): for config_file in config_files: try: - with open(config_file, "r") as f: + with open(config_file) as f: appservice = _load_appservice(hostname, yaml.safe_load(f), config_file) if appservice.id in seen_ids: raise ConfigError( diff --git a/synapse/config/tls.py b/synapse/config/tls.py index fed05ac7be..5679f05e42 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -66,10 +66,8 @@ class TlsConfig(Config): if self.federation_client_minimum_tls_version == "1.3": if getattr(SSL, "OP_NO_TLSv1_3", None) is None: raise ConfigError( - ( - "federation_client_minimum_tls_version cannot be 1.3, " - "your OpenSSL does not support it" - ) + "federation_client_minimum_tls_version cannot be 1.3, " + "your OpenSSL does not support it" ) # Whitelist of domains to not verify certificates for diff --git a/synapse/handlers/cas.py b/synapse/handlers/cas.py index b681d208bc..0325f86e20 100644 --- a/synapse/handlers/cas.py +++ b/synapse/handlers/cas.py @@ -40,7 +40,7 @@ class CasError(Exception): def __str__(self): if self.error_description: - return "{}: {}".format(self.error, self.error_description) + return f"{self.error}: {self.error_description}" return self.error diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5c4463583e..cf389be3e4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -735,7 +735,7 @@ class FederationHandler(BaseHandler): # we need to make sure we re-load from the database to get the rejected # state correct. fetched_events.update( - (await self.store.get_events(missing_desired_events, allow_rejected=True)) + await self.store.get_events(missing_desired_events, allow_rejected=True) ) # check for events which were in the wrong room. diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 33d16fbf9c..0961dec5ab 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -302,7 +302,7 @@ class IdentityHandler(BaseHandler): ) url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,) - url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii") + url_bytes = b"/_matrix/identity/api/v1/3pid/unbind" content = { "mxid": mxid, @@ -695,7 +695,7 @@ class IdentityHandler(BaseHandler): return data["mxid"] except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") - except IOError as e: + except OSError as e: logger.warning("Error from v1 identity server lookup: %s" % (e,)) return None diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py index a330c48fa7..eca8f16040 100644 --- a/synapse/handlers/oidc.py +++ b/synapse/handlers/oidc.py @@ -72,26 +72,26 @@ _SESSION_COOKIES = [ (b"oidc_session_no_samesite", b"HttpOnly"), ] + #: A token exchanged from the token endpoint, as per RFC6749 sec 5.1. and #: OpenID.Core sec 3.1.3.3. -Token = TypedDict( - "Token", - { - "access_token": str, - "token_type": str, - "id_token": Optional[str], - "refresh_token": Optional[str], - "expires_in": int, - "scope": Optional[str], - }, -) +class Token(TypedDict): + access_token: str + token_type: str + id_token: Optional[str] + refresh_token: Optional[str] + expires_in: int + scope: Optional[str] + #: A JWK, as per RFC7517 sec 4. The type could be more precise than that, but #: there is no real point of doing this in our case. JWK = Dict[str, str] + #: A JWK Set, as per RFC7517 sec 5. -JWKS = TypedDict("JWKS", {"keys": List[JWK]}) +class JWKS(TypedDict): + keys: List[JWK] class OidcHandler: @@ -255,7 +255,7 @@ class OidcError(Exception): def __str__(self): if self.error_description: - return "{}: {}".format(self.error, self.error_description) + return f"{self.error}: {self.error_description}" return self.error @@ -639,7 +639,7 @@ class OidcProvider: ) logger.warning(description) # Body was still valid JSON. Might be useful to log it for debugging. - logger.warning("Code exchange response: {resp!r}".format(resp=resp)) + logger.warning("Code exchange response: %r", resp) raise OidcError("server_error", description) return resp @@ -1217,10 +1217,12 @@ class OidcSessionData: ui_auth_session_id = attr.ib(type=str) -UserAttributeDict = TypedDict( - "UserAttributeDict", - {"localpart": Optional[str], "display_name": Optional[str], "emails": List[str]}, -) +class UserAttributeDict(TypedDict): + localpart: Optional[str] + display_name: Optional[str] + emails: List[str] + + C = TypeVar("C") diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 056fe5e89f..8cf614136e 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -55,15 +55,12 @@ login_counter = Counter( ["guest", "auth_provider"], ) -LoginDict = TypedDict( - "LoginDict", - { - "device_id": str, - "access_token": str, - "valid_until_ms": Optional[int], - "refresh_token": Optional[str], - }, -) + +class LoginDict(TypedDict): + device_id: str + access_token: str + valid_until_ms: Optional[int] + refresh_token: Optional[str] class RegistrationHandler(BaseHandler): diff --git a/synapse/handlers/saml.py b/synapse/handlers/saml.py index 72f54c9403..e6e71e9729 100644 --- a/synapse/handlers/saml.py +++ b/synapse/handlers/saml.py @@ -372,7 +372,7 @@ class SamlHandler(BaseHandler): DOT_REPLACE_PATTERN = re.compile( - ("[^%s]" % (re.escape("".join(mxid_localpart_allowed_characters)),)) + "[^%s]" % (re.escape("".join(mxid_localpart_allowed_characters)),) ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 722c4ae670..150a4f291e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1601,7 +1601,7 @@ class SyncHandler: logger.debug( "Membership changes in %s: [%s]", room_id, - ", ".join(("%s (%s)" % (e.event_id, e.membership) for e in events)), + ", ".join("%s (%s)" % (e.event_id, e.membership) for e in events), ) non_joins = [e for e in events if e.membership != Membership.JOIN] diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 7a6a1717de..f7193e60bd 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -172,7 +172,7 @@ class ProxyAgent(_AgentBase): """ uri = uri.strip() if not _VALID_URI.match(uri): - raise ValueError("Invalid URI {!r}".format(uri)) + raise ValueError(f"Invalid URI {uri!r}") parsed_uri = URI.fromBytes(uri) pool_key = (parsed_uri.scheme, parsed_uri.host, parsed_uri.port) diff --git a/synapse/http/site.py b/synapse/http/site.py index 3b0a38124e..190084e8aa 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -384,7 +384,7 @@ class SynapseRequest(Request): # authenticated (e.g. and admin is puppetting a user) then we log both. requester, authenticated_entity = self.get_authenticated_entity() if authenticated_entity: - requester = "{}.{}".format(authenticated_entity, requester) + requester = f"{authenticated_entity}.{requester}" self.site.access_logger.log( log_level, diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 185844f188..ecd51f1b4a 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -374,7 +374,7 @@ def init_tracer(hs: "HomeServer"): config = JaegerConfig( config=hs.config.jaeger_config, - service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()), + service_name=f"{hs.config.server_name} {hs.get_instance_name()}", scope_manager=LogContextScopeManager(hs.config), metrics_factory=PrometheusMetricsFactory(), ) diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index 7e49d0d02c..bb9bcb5592 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -34,7 +34,7 @@ from twisted.web.resource import Resource from synapse.util import caches -CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8") +CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8" INF = float("inf") @@ -55,8 +55,8 @@ def floatToGoString(d): # Go switches to exponents sooner than Python. # We only need to care about positive values for le/quantile. if d > 0 and dot > 6: - mantissa = "{0}.{1}{2}".format(s[0], s[1:dot], s[dot + 1 :]).rstrip("0.") - return "{0}e+0{1}".format(mantissa, dot - 1) + mantissa = f"{s[0]}.{s[1:dot]}{s[dot + 1 :]}".rstrip("0.") + return f"{mantissa}e+0{dot - 1}" return s @@ -65,7 +65,7 @@ def sample_line(line, name): labelstr = "{{{0}}}".format( ",".join( [ - '{0}="{1}"'.format( + '{}="{}"'.format( k, v.replace("\\", r"\\").replace("\n", r"\n").replace('"', r"\""), ) @@ -78,10 +78,8 @@ def sample_line(line, name): timestamp = "" if line.timestamp is not None: # Convert to milliseconds. - timestamp = " {0:d}".format(int(float(line.timestamp) * 1000)) - return "{0}{1} {2}{3}\n".format( - name, labelstr, floatToGoString(line.value), timestamp - ) + timestamp = f" {int(float(line.timestamp) * 1000):d}" + return "{}{} {}{}\n".format(name, labelstr, floatToGoString(line.value), timestamp) def generate_latest(registry, emit_help=False): @@ -118,12 +116,12 @@ def generate_latest(registry, emit_help=False): # Output in the old format for compatibility. if emit_help: output.append( - "# HELP {0} {1}\n".format( + "# HELP {} {}\n".format( mname, metric.documentation.replace("\\", r"\\").replace("\n", r"\n"), ) ) - output.append("# TYPE {0} {1}\n".format(mname, mtype)) + output.append(f"# TYPE {mname} {mtype}\n") om_samples: Dict[str, List[str]] = {} for s in metric.samples: @@ -143,13 +141,13 @@ def generate_latest(registry, emit_help=False): for suffix, lines in sorted(om_samples.items()): if emit_help: output.append( - "# HELP {0}{1} {2}\n".format( + "# HELP {}{} {}\n".format( metric.name, suffix, metric.documentation.replace("\\", r"\\").replace("\n", r"\n"), ) ) - output.append("# TYPE {0}{1} gauge\n".format(metric.name, suffix)) + output.append(f"# TYPE {metric.name}{suffix} gauge\n") output.extend(lines) # Get rid of the weird colon things while we're at it @@ -163,12 +161,12 @@ def generate_latest(registry, emit_help=False): # Also output in the new format, if it's different. if emit_help: output.append( - "# HELP {0} {1}\n".format( + "# HELP {} {}\n".format( mnewname, metric.documentation.replace("\\", r"\\").replace("\n", r"\n"), ) ) - output.append("# TYPE {0} {1}\n".format(mnewname, mtype)) + output.append(f"# TYPE {mnewname} {mtype}\n") for s in metric.samples: # Get rid of the OpenMetrics specific samples (we should already have diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 4455fa71a8..3a14260752 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -137,8 +137,7 @@ class _Collector: _background_process_db_txn_duration, _background_process_db_sched_duration, ): - for r in m.collect(): - yield r + yield from m.collect() REGISTRY.register(_Collector()) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 99d02cb355..11567bf32c 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -44,19 +44,14 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -LoginResponse = TypedDict( - "LoginResponse", - { - "user_id": str, - "access_token": str, - "home_server": str, - "expires_in_ms": Optional[int], - "refresh_token": Optional[str], - "device_id": str, - "well_known": Optional[Dict[str, Any]], - }, - total=False, -) +class LoginResponse(TypedDict, total=False): + user_id: str + access_token: str + home_server: str + expires_in_ms: Optional[int] + refresh_token: Optional[str] + device_id: str + well_known: Optional[Dict[str, Any]] class LoginRestServlet(RestServlet): @@ -150,9 +145,7 @@ class LoginRestServlet(RestServlet): # login flow types returned. flows.append({"type": LoginRestServlet.TOKEN_TYPE}) - flows.extend( - ({"type": t} for t in self.auth_handler.get_supported_login_types()) - ) + flows.extend({"type": t} for t in self.auth_handler.get_supported_login_types()) flows.append({"type": LoginRestServlet.APPSERVICE_TYPE}) diff --git a/synapse/rest/media/v1/__init__.py b/synapse/rest/media/v1/__init__.py index d20186bbd0..3dd16d4bb5 100644 --- a/synapse/rest/media/v1/__init__.py +++ b/synapse/rest/media/v1/__init__.py @@ -17,7 +17,7 @@ import PIL.Image # check for JPEG support. try: PIL.Image._getdecoder("rgb", "jpeg", None) -except IOError as e: +except OSError as e: if str(e).startswith("decoder jpeg not available"): raise Exception( "FATAL: jpeg codec not supported. Install pillow correctly! " @@ -32,7 +32,7 @@ except Exception: # check for PNG support. try: PIL.Image._getdecoder("rgb", "zip", None) -except IOError as e: +except OSError as e: if str(e).startswith("decoder zip not available"): raise Exception( "FATAL: zip codec not supported. Install pillow correctly! " diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f80d822c12..ccf9ac51ef 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -907,7 +907,7 @@ class DatabasePool: # The sort is to ensure that we don't rely on dictionary iteration # order. keys, vals = zip( - *[zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i] + *(zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i) ) for k in keys: diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 50e7ddd735..c55508867d 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -203,9 +203,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): "delete_messages_for_device", delete_messages_for_device_txn ) - log_kv( - {"message": "deleted {} messages for device".format(count), "count": count} - ) + log_kv({"message": f"deleted {count} messages for device", "count": count}) # Update the cache, ensuring that we only ever increase the value last_deleted_stream_id = self._last_device_delete_cache.get( diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py index 66ad363bfb..e70d3649ff 100644 --- a/synapse/storage/databases/main/group_server.py +++ b/synapse/storage/databases/main/group_server.py @@ -27,8 +27,11 @@ from synapse.util import json_encoder _DEFAULT_CATEGORY_ID = "" _DEFAULT_ROLE_ID = "" + # A room in a group. -_RoomInGroup = TypedDict("_RoomInGroup", {"room_id": str, "is_public": bool}) +class _RoomInGroup(TypedDict): + room_id: str + is_public: bool class GroupServerWorkerStore(SQLBaseStore): @@ -92,6 +95,7 @@ class GroupServerWorkerStore(SQLBaseStore): "is_public": False # Whether this is a public room or not } """ + # TODO: Pagination def _get_rooms_in_group_txn(txn): diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 4d82c4c26d..68f1b40ea6 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -649,7 +649,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): event_to_memberships = await self._get_joined_profiles_from_event_ids( missing_member_event_ids ) - users_in_room.update((row for row in event_to_memberships.values() if row)) + users_in_room.update(row for row in event_to_memberships.values() if row) if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 82a7686df0..61392b9639 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -639,7 +639,7 @@ def get_statements(f: Iterable[str]) -> Generator[str, None, None]: def executescript(txn: Cursor, schema_path: str) -> None: - with open(schema_path, "r") as f: + with open(schema_path) as f: execute_statements_from_stream(txn, f) diff --git a/synapse/types.py b/synapse/types.py index fad23c8700..429bb013d2 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -577,10 +577,10 @@ class RoomStreamToken: entries = [] for name, pos in self.instance_map.items(): instance_id = await store.get_id_for_instance(name) - entries.append("{}.{}".format(instance_id, pos)) + entries.append(f"{instance_id}.{pos}") encoded_map = "~".join(entries) - return "m{}~{}".format(self.stream, encoded_map) + return f"m{self.stream}~{encoded_map}" else: return "s%d" % (self.stream,) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index efeba0cb96..5c65d187b6 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -90,8 +90,7 @@ def enumerate_leaves(node, depth): yield node else: for n in node.values(): - for m in enumerate_leaves(n, depth - 1): - yield m + yield from enumerate_leaves(n, depth - 1) P = TypeVar("P") diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index a6df81ebff..4138931e7b 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -138,7 +138,6 @@ def iterate_tree_cache_entry(d): """ if isinstance(d, TreeCacheNode): for value_d in d.values(): - for value in iterate_tree_cache_entry(value_d): - yield value + yield from iterate_tree_cache_entry(value_d) else: yield d diff --git a/synapse/util/daemonize.py b/synapse/util/daemonize.py index 31b24dd188..d8532411c2 100644 --- a/synapse/util/daemonize.py +++ b/synapse/util/daemonize.py @@ -31,13 +31,13 @@ def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") - # If pidfile already exists, we should read pid from there; to overwrite it, if # locking will fail, because locking attempt somehow purges the file contents. if os.path.isfile(pid_file): - with open(pid_file, "r") as pid_fh: + with open(pid_file) as pid_fh: old_pid = pid_fh.read() # Create a lockfile so that only one instance of this daemon is running at any time. try: lock_fh = open(pid_file, "w") - except IOError: + except OSError: print("Unable to create the pidfile.") sys.exit(1) @@ -45,7 +45,7 @@ def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") - # Try to get an exclusive lock on the file. This will fail if another process # has the file locked. fcntl.flock(lock_fh, fcntl.LOCK_EX | fcntl.LOCK_NB) - except IOError: + except OSError: print("Unable to lock on the pidfile.") # We need to overwrite the pidfile if we got here. # @@ -113,7 +113,7 @@ def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") - try: lock_fh.write("%s" % (os.getpid())) lock_fh.flush() - except IOError: + except OSError: logger.error("Unable to write pid to the pidfile.") print("Unable to write pid to the pidfile.") sys.exit(1) diff --git a/synapse/visibility.py b/synapse/visibility.py index 1dc6b90275..17532059e9 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -96,7 +96,7 @@ async def filter_events_for_client( if isinstance(ignored_users_dict, dict): ignore_list = frozenset(ignored_users_dict.keys()) - erased_senders = await storage.main.are_users_erased((e.sender for e in events)) + erased_senders = await storage.main.are_users_erased(e.sender for e in events) if filter_send_to_client: room_ids = {e.room_id for e in events} @@ -353,7 +353,7 @@ async def filter_events_for_server( ) if not check_history_visibility_only: - erased_senders = await storage.main.are_users_erased((e.sender for e in events)) + erased_senders = await storage.main.are_users_erased(e.sender for e in events) else: # We don't want to check whether users are erased, which is equivalent # to no users having been erased. -- cgit 1.5.1 From 4e340412c020f685cb402a735b983f6e332e206b Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Mon, 19 Jul 2021 16:11:34 +0100 Subject: Add a new version of the R30 phone-home metric, which removes a false impression of retention given by the old R30 metric (#10332) Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/10332.feature | 1 + synapse/app/phone_stats_home.py | 4 + synapse/storage/databases/main/metrics.py | 129 ++++++++++++++++ tests/app/test_phone_stats_home.py | 242 ++++++++++++++++++++++++++++++ tests/rest/client/v1/utils.py | 30 +++- tests/unittest.py | 15 +- 6 files changed, 416 insertions(+), 5 deletions(-) create mode 100644 changelog.d/10332.feature (limited to 'synapse/storage') diff --git a/changelog.d/10332.feature b/changelog.d/10332.feature new file mode 100644 index 0000000000..091947ff22 --- /dev/null +++ b/changelog.d/10332.feature @@ -0,0 +1 @@ +Add a new version of the R30 phone-home metric, which removes a false impression of retention given by the old R30 metric. diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index 8f86cecb76..7904c246df 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -107,6 +107,10 @@ async def phone_stats_home(hs, stats, stats_process=_stats_process): for name, count in r30_results.items(): stats["r30_users_" + name] = count + r30v2_results = await hs.get_datastore().count_r30_users() + for name, count in r30v2_results.items(): + stats["r30v2_users_" + name] = count + stats["cache_factor"] = hs.config.caches.global_factor stats["event_cache_size"] = hs.config.caches.event_cache_size diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index e3a544d9b2..dc0bbc56ac 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -316,6 +316,135 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): return await self.db_pool.runInteraction("count_r30_users", _count_r30_users) + async def count_r30v2_users(self) -> Dict[str, int]: + """ + Counts the number of 30 day retained users, defined as users that: + - Appear more than once in the past 60 days + - Have more than 30 days between the most and least recent appearances that + occurred in the past 60 days. + + (This is the second version of this metric, hence R30'v2') + + Returns: + A mapping from client type to the number of 30-day retained users for that client. + + The dict keys are: + - "all" (a combined number of users across any and all clients) + - "android" (Element Android) + - "ios" (Element iOS) + - "electron" (Element Desktop) + - "web" (any web application -- it's not possible to distinguish Element Web here) + """ + + def _count_r30v2_users(txn): + thirty_days_in_secs = 86400 * 30 + now = int(self._clock.time()) + sixty_days_ago_in_secs = now - 2 * thirty_days_in_secs + one_day_from_now_in_secs = now + 86400 + + # This is the 'per-platform' count. + sql = """ + SELECT + client_type, + count(client_type) + FROM + ( + SELECT + user_id, + CASE + WHEN + LOWER(user_agent) LIKE '%%riot%%' OR + LOWER(user_agent) LIKE '%%element%%' + THEN CASE + WHEN + LOWER(user_agent) LIKE '%%electron%%' + THEN 'electron' + WHEN + LOWER(user_agent) LIKE '%%android%%' + THEN 'android' + WHEN + LOWER(user_agent) LIKE '%%ios%%' + THEN 'ios' + ELSE 'unknown' + END + WHEN + LOWER(user_agent) LIKE '%%mozilla%%' OR + LOWER(user_agent) LIKE '%%gecko%%' + THEN 'web' + ELSE 'unknown' + END as client_type + FROM + user_daily_visits + WHERE + timestamp > ? + AND + timestamp < ? + GROUP BY + user_id, + client_type + HAVING + max(timestamp) - min(timestamp) > ? + ) AS temp + GROUP BY + client_type + ; + """ + + # We initialise all the client types to zero, so we get an explicit + # zero if they don't appear in the query results + results = {"ios": 0, "android": 0, "web": 0, "electron": 0} + txn.execute( + sql, + ( + sixty_days_ago_in_secs * 1000, + one_day_from_now_in_secs * 1000, + thirty_days_in_secs * 1000, + ), + ) + + for row in txn: + if row[0] == "unknown": + continue + results[row[0]] = row[1] + + # This is the 'all users' count. + sql = """ + SELECT COUNT(*) FROM ( + SELECT + 1 + FROM + user_daily_visits + WHERE + timestamp > ? + AND + timestamp < ? + GROUP BY + user_id + HAVING + max(timestamp) - min(timestamp) > ? + ) AS r30_users + """ + + txn.execute( + sql, + ( + sixty_days_ago_in_secs * 1000, + one_day_from_now_in_secs * 1000, + thirty_days_in_secs * 1000, + ), + ) + row = txn.fetchone() + if row is None: + results["all"] = 0 + else: + results["all"] = row[0] + + return results + + return await self.db_pool.runInteraction( + "count_r30v2_users", _count_r30v2_users + ) + def _get_start_of_day(self): """ Returns millisecond unixtime for start of UTC day. diff --git a/tests/app/test_phone_stats_home.py b/tests/app/test_phone_stats_home.py index 2da6ba4dde..5527e278db 100644 --- a/tests/app/test_phone_stats_home.py +++ b/tests/app/test_phone_stats_home.py @@ -1,9 +1,11 @@ import synapse +from synapse.app.phone_stats_home import start_phone_stats_home from synapse.rest.client.v1 import login, room from tests import unittest from tests.unittest import HomeserverTestCase +FIVE_MINUTES_IN_SECONDS = 300 ONE_DAY_IN_SECONDS = 86400 @@ -151,3 +153,243 @@ class PhoneHomeTestCase(HomeserverTestCase): # *Now* the user appears in R30. r30_results = self.get_success(self.hs.get_datastore().count_r30_users()) self.assertEqual(r30_results, {"all": 1, "unknown": 1}) + + +class PhoneHomeR30V2TestCase(HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + ] + + def _advance_to(self, desired_time_secs: float): + now = self.hs.get_clock().time() + assert now < desired_time_secs + self.reactor.advance(desired_time_secs - now) + + def make_homeserver(self, reactor, clock): + hs = super(PhoneHomeR30V2TestCase, self).make_homeserver(reactor, clock) + + # We don't want our tests to actually report statistics, so check + # that it's not enabled + assert not hs.config.report_stats + + # This starts the needed data collection that we rely on to calculate + # R30v2 metrics. + start_phone_stats_home(hs) + return hs + + def test_r30v2_minimum_usage(self): + """ + Tests the minimum amount of interaction necessary for the R30v2 metric + to consider a user 'retained'. + """ + + # Register a user, log it in, create a room and send a message + user_id = self.register_user("u1", "secret!") + access_token = self.login("u1", "secret!") + room_id = self.helper.create_room_as(room_creator=user_id, tok=access_token) + self.helper.send(room_id, "message", tok=access_token) + first_post_at = self.hs.get_clock().time() + + # Give time for user_daily_visits table to be updated. + # (user_daily_visits is updated every 5 minutes using a looping call.) + self.reactor.advance(FIVE_MINUTES_IN_SECONDS) + + store = self.hs.get_datastore() + + # Check the R30 results do not count that user. + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 0, "android": 0, "electron": 0, "ios": 0, "web": 0} + ) + + # Advance 31 days. + # (R30v2 includes users with **more** than 30 days between the two visits, + # and user_daily_visits records the timestamp as the start of the day.) + self.reactor.advance(31 * ONE_DAY_IN_SECONDS) + # Also advance 5 minutes to let another user_daily_visits update occur + self.reactor.advance(FIVE_MINUTES_IN_SECONDS) + + # (Make sure the user isn't somehow counted by this point.) + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 0, "android": 0, "electron": 0, "ios": 0, "web": 0} + ) + + # Send a message (this counts as activity) + self.helper.send(room_id, "message2", tok=access_token) + + # We have to wait a few minutes for the user_daily_visits table to + # be updated by a background process. + self.reactor.advance(FIVE_MINUTES_IN_SECONDS) + + # *Now* the user is counted. + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 1, "android": 0, "electron": 0, "ios": 0, "web": 0} + ) + + # Advance to JUST under 60 days after the user's first post + self._advance_to(first_post_at + 60 * ONE_DAY_IN_SECONDS - 5) + + # Check the user is still counted. + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 1, "android": 0, "electron": 0, "ios": 0, "web": 0} + ) + + # Advance into the next day. The user's first activity is now more than 60 days old. + self._advance_to(first_post_at + 60 * ONE_DAY_IN_SECONDS + 5) + + # Check the user is now no longer counted in R30. + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 0, "android": 0, "electron": 0, "ios": 0, "web": 0} + ) + + def test_r30v2_user_must_be_retained_for_at_least_a_month(self): + """ + Tests that a newly-registered user must be retained for a whole month + before appearing in the R30v2 statistic, even if they post every day + during that time! + """ + + # set a custom user-agent to impersonate Element/Android. + headers = ( + ( + "User-Agent", + "Element/1.1 (Linux; U; Android 9; MatrixAndroidSDK_X 0.0.1)", + ), + ) + + # Register a user and send a message + user_id = self.register_user("u1", "secret!") + access_token = self.login("u1", "secret!", custom_headers=headers) + room_id = self.helper.create_room_as( + room_creator=user_id, tok=access_token, custom_headers=headers + ) + self.helper.send(room_id, "message", tok=access_token, custom_headers=headers) + + # Give time for user_daily_visits table to be updated. + # (user_daily_visits is updated every 5 minutes using a looping call.) + self.reactor.advance(FIVE_MINUTES_IN_SECONDS) + + store = self.hs.get_datastore() + + # Check the user does not contribute to R30 yet. + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 0, "android": 0, "electron": 0, "ios": 0, "web": 0} + ) + + for _ in range(30): + # This loop posts a message every day for 30 days + self.reactor.advance(ONE_DAY_IN_SECONDS - FIVE_MINUTES_IN_SECONDS) + self.helper.send( + room_id, "I'm still here", tok=access_token, custom_headers=headers + ) + + # give time for user_daily_visits to update + self.reactor.advance(FIVE_MINUTES_IN_SECONDS) + + # Notice that the user *still* does not contribute to R30! + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 0, "android": 0, "electron": 0, "ios": 0, "web": 0} + ) + + # advance yet another day with more activity + self.reactor.advance(ONE_DAY_IN_SECONDS) + self.helper.send( + room_id, "Still here!", tok=access_token, custom_headers=headers + ) + + # give time for user_daily_visits to update + self.reactor.advance(FIVE_MINUTES_IN_SECONDS) + + # *Now* the user appears in R30. + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 1, "android": 1, "electron": 0, "ios": 0, "web": 0} + ) + + def test_r30v2_returning_dormant_users_not_counted(self): + """ + Tests that dormant users (users inactive for a long time) do not + contribute to R30v2 when they return for just a single day. + This is a key difference between R30 and R30v2. + """ + + # set a custom user-agent to impersonate Element/iOS. + headers = ( + ( + "User-Agent", + "Riot/1.4 (iPhone; iOS 13; Scale/4.00)", + ), + ) + + # Register a user and send a message + user_id = self.register_user("u1", "secret!") + access_token = self.login("u1", "secret!", custom_headers=headers) + room_id = self.helper.create_room_as( + room_creator=user_id, tok=access_token, custom_headers=headers + ) + self.helper.send(room_id, "message", tok=access_token, custom_headers=headers) + + # the user goes inactive for 2 months + self.reactor.advance(60 * ONE_DAY_IN_SECONDS) + + # the user returns for one day, perhaps just to check out a new feature + self.helper.send(room_id, "message", tok=access_token, custom_headers=headers) + + # Give time for user_daily_visits table to be updated. + # (user_daily_visits is updated every 5 minutes using a looping call.) + self.reactor.advance(FIVE_MINUTES_IN_SECONDS) + + store = self.hs.get_datastore() + + # Check that the user does not contribute to R30v2, even though it's been + # more than 30 days since registration. + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 0, "android": 0, "electron": 0, "ios": 0, "web": 0} + ) + + # Check that this is a situation where old R30 differs: + # old R30 DOES count this as 'retained'. + r30_results = self.get_success(store.count_r30_users()) + self.assertEqual(r30_results, {"all": 1, "ios": 1}) + + # Now we want to check that the user will still be able to appear in + # R30v2 as long as the user performs some other activity between + # 30 and 60 days later. + self.reactor.advance(32 * ONE_DAY_IN_SECONDS) + self.helper.send(room_id, "message", tok=access_token, custom_headers=headers) + + # (give time for tables to update) + self.reactor.advance(FIVE_MINUTES_IN_SECONDS) + + # Check the user now satisfies the requirements to appear in R30v2. + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 1, "ios": 1, "android": 0, "electron": 0, "web": 0} + ) + + # Advance to 59.5 days after the user's first R30v2-eligible activity. + self.reactor.advance(27.5 * ONE_DAY_IN_SECONDS) + + # Check the user still appears in R30v2. + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 1, "ios": 1, "android": 0, "electron": 0, "web": 0} + ) + + # Advance to 60.5 days after the user's first R30v2-eligible activity. + self.reactor.advance(ONE_DAY_IN_SECONDS) + + # Check the user no longer appears in R30v2. + r30_results = self.get_success(store.count_r30v2_users()) + self.assertEqual( + r30_results, {"all": 0, "android": 0, "electron": 0, "ios": 0, "web": 0} + ) diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index 69798e95c3..fc2d35596e 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -19,7 +19,7 @@ import json import re import time import urllib.parse -from typing import Any, Dict, Mapping, MutableMapping, Optional +from typing import Any, Dict, Iterable, Mapping, MutableMapping, Optional, Tuple, Union from unittest.mock import patch import attr @@ -53,6 +53,9 @@ class RestHelper: tok: str = None, expect_code: int = 200, extra_content: Optional[Dict] = None, + custom_headers: Optional[ + Iterable[Tuple[Union[bytes, str], Union[bytes, str]]] + ] = None, ) -> str: """ Create a room. @@ -87,6 +90,7 @@ class RestHelper: "POST", path, json.dumps(content).encode("utf8"), + custom_headers=custom_headers, ) assert channel.result["code"] == b"%d" % expect_code, channel.result @@ -175,14 +179,30 @@ class RestHelper: self.auth_user_id = temp_id - def send(self, room_id, body=None, txn_id=None, tok=None, expect_code=200): + def send( + self, + room_id, + body=None, + txn_id=None, + tok=None, + expect_code=200, + custom_headers: Optional[ + Iterable[Tuple[Union[bytes, str], Union[bytes, str]]] + ] = None, + ): if body is None: body = "body_text_here" content = {"msgtype": "m.text", "body": body} return self.send_event( - room_id, "m.room.message", content, txn_id, tok, expect_code + room_id, + "m.room.message", + content, + txn_id, + tok, + expect_code, + custom_headers=custom_headers, ) def send_event( @@ -193,6 +213,9 @@ class RestHelper: txn_id=None, tok=None, expect_code=200, + custom_headers: Optional[ + Iterable[Tuple[Union[bytes, str], Union[bytes, str]]] + ] = None, ): if txn_id is None: txn_id = "m%s" % (str(time.time())) @@ -207,6 +230,7 @@ class RestHelper: "PUT", path, json.dumps(content or {}).encode("utf8"), + custom_headers=custom_headers, ) assert ( diff --git a/tests/unittest.py b/tests/unittest.py index c6d9064423..3eec9c4d5b 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -594,7 +594,15 @@ class HomeserverTestCase(TestCase): user_id = channel.json_body["user_id"] return user_id - def login(self, username, password, device_id=None): + def login( + self, + username, + password, + device_id=None, + custom_headers: Optional[ + Iterable[Tuple[Union[bytes, str], Union[bytes, str]]] + ] = None, + ): """ Log in a user, and get an access token. Requires the Login API be registered. @@ -605,7 +613,10 @@ class HomeserverTestCase(TestCase): body["device_id"] = device_id channel = self.make_request( - "POST", "/_matrix/client/r0/login", json.dumps(body).encode("utf8") + "POST", + "/_matrix/client/r0/login", + json.dumps(body).encode("utf8"), + custom_headers=custom_headers, ) self.assertEqual(channel.code, 200, channel.result) -- cgit 1.5.1 From 54389d5697622f1beffaeda96d9c6da7ef7d93a9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Jul 2021 14:24:25 +0100 Subject: Fix dropping locks on shut down (#10433) --- changelog.d/10433.bugfix | 1 + synapse/storage/databases/main/lock.py | 6 +++++- tests/storage/databases/main/test_lock.py | 13 +++++++++++++ 3 files changed, 19 insertions(+), 1 deletion(-) create mode 100644 changelog.d/10433.bugfix (limited to 'synapse/storage') diff --git a/changelog.d/10433.bugfix b/changelog.d/10433.bugfix new file mode 100644 index 0000000000..ad85a83f96 --- /dev/null +++ b/changelog.d/10433.bugfix @@ -0,0 +1 @@ +Fix error while dropping locks on shutdown. Introduced in v1.38.0. diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 774861074c..3d1dff660b 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -78,7 +78,11 @@ class LockStore(SQLBaseStore): """Called when the server is shutting down""" logger.info("Dropping held locks due to shutdown") - for (lock_name, lock_key), token in self._live_tokens.items(): + # We need to take a copy of the tokens dict as dropping the locks will + # cause the dictionary to change. + tokens = dict(self._live_tokens) + + for (lock_name, lock_key), token in tokens.items(): await self._drop_lock(lock_name, lock_key, token) logger.info("Dropped locks due to shutdown") diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py index 9ca70e7367..d326a1d6a6 100644 --- a/tests/storage/databases/main/test_lock.py +++ b/tests/storage/databases/main/test_lock.py @@ -98,3 +98,16 @@ class LockTestCase(unittest.HomeserverTestCase): lock2 = self.get_success(self.store.try_acquire_lock("name", "key")) self.assertIsNotNone(lock2) + + def test_shutdown(self): + """Test that shutting down Synapse releases the locks""" + # Acquire two locks + lock = self.get_success(self.store.try_acquire_lock("name", "key1")) + self.assertIsNotNone(lock) + lock2 = self.get_success(self.store.try_acquire_lock("name", "key2")) + self.assertIsNotNone(lock2) + + # Now call the shutdown code + self.get_success(self.store._on_shutdown()) + + self.assertEqual(self.store._live_tokens, {}) -- cgit 1.5.1 From 74d09a43d9e0f65f1292aa51f58ea676e4aefc7f Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 27 Jul 2021 14:36:38 +0100 Subject: Always communicate device OTK counts to clients (#10485) Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/10485.bugfix | 1 + synapse/api/constants.py | 8 ++++++++ synapse/handlers/sync.py | 4 ++++ synapse/storage/databases/main/end_to_end_keys.py | 9 ++++++++- tests/handlers/test_e2e_keys.py | 20 +++++++++++++++----- 5 files changed, 36 insertions(+), 6 deletions(-) create mode 100644 changelog.d/10485.bugfix (limited to 'synapse/storage') diff --git a/changelog.d/10485.bugfix b/changelog.d/10485.bugfix new file mode 100644 index 0000000000..9b44006dc0 --- /dev/null +++ b/changelog.d/10485.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse would not inform clients that a device had exhausted its one-time-key pool, potentially causing problems decrypting events. diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 8363c2bb0f..8c7ad2a407 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -127,6 +127,14 @@ class ToDeviceEventTypes: RoomKeyRequest = "m.room_key_request" +class DeviceKeyAlgorithms: + """Spec'd algorithms for the generation of per-device keys""" + + ED25519 = "ed25519" + CURVE25519 = "curve25519" + SIGNED_CURVE25519 = "signed_curve25519" + + class EduTypes: Presence = "m.presence" diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 150a4f291e..f30bfcc93c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1093,6 +1093,10 @@ class SyncHandler: one_time_key_counts: JsonDict = {} unused_fallback_key_types: List[str] = [] if device_id: + # TODO: We should have a way to let clients differentiate between the states of: + # * no change in OTK count since the provided since token + # * the server has zero OTKs left for this device + # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298 one_time_key_counts = await self.store.count_e2e_one_time_keys( user_id, device_id ) diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 78ae68ec68..1edc96042b 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -21,6 +21,7 @@ from canonicaljson import encode_canonical_json from twisted.enterprise.adbapi import Connection +from synapse.api.constants import DeviceKeyAlgorithms from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool, make_in_list_sql_clause @@ -381,9 +382,15 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore): " GROUP BY algorithm" ) txn.execute(sql, (user_id, device_id)) - result = {} + + # Initially set the key count to 0. This ensures that the client will always + # receive *some count*, even if it's 0. + result = {DeviceKeyAlgorithms.SIGNED_CURVE25519: 0} + + # Override entries with the count of any keys we pulled from the database for algorithm, key_count in txn: result[algorithm] = key_count + return result return await self.db_pool.runInteraction( diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index e0a24824cc..39e7b1ab25 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -47,12 +47,16 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): "alg2:k3": {"key": "key3"}, } + # Note that "signed_curve25519" is always returned in key count responses. This is necessary until + # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed. res = self.get_success( self.handler.upload_keys_for_user( local_user, device_id, {"one_time_keys": keys} ) ) - self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1, "alg2": 2}}) + self.assertDictEqual( + res, {"one_time_key_counts": {"alg1": 1, "alg2": 2, "signed_curve25519": 0}} + ) # we should be able to change the signature without a problem keys["alg2:k2"]["signatures"]["k1"] = "sig2" @@ -61,7 +65,9 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): local_user, device_id, {"one_time_keys": keys} ) ) - self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1, "alg2": 2}}) + self.assertDictEqual( + res, {"one_time_key_counts": {"alg1": 1, "alg2": 2, "signed_curve25519": 0}} + ) def test_change_one_time_keys(self): """attempts to change one-time-keys should be rejected""" @@ -79,7 +85,9 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): local_user, device_id, {"one_time_keys": keys} ) ) - self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1, "alg2": 2}}) + self.assertDictEqual( + res, {"one_time_key_counts": {"alg1": 1, "alg2": 2, "signed_curve25519": 0}} + ) # Error when changing string key self.get_failure( @@ -89,7 +97,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): SynapseError, ) - # Error when replacing dict key with strin + # Error when replacing dict key with string self.get_failure( self.handler.upload_keys_for_user( local_user, device_id, {"one_time_keys": {"alg2:k3": "key2"}} @@ -131,7 +139,9 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): local_user, device_id, {"one_time_keys": keys} ) ) - self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1}}) + self.assertDictEqual( + res, {"one_time_key_counts": {"alg1": 1, "signed_curve25519": 0}} + ) res2 = self.get_success( self.handler.claim_one_time_keys( -- cgit 1.5.1