diff options
Diffstat (limited to 'synapse/storage')
33 files changed, 306 insertions, 254 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 0ca6f6121f..6b0ca80087 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -280,7 +280,7 @@ class DataStore( Counts the number of users who used this homeserver in the last 24 hours. """ yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24) - return self.runInteraction("count_daily_users", self._count_users, yesterday,) + return self.runInteraction("count_daily_users", self._count_users, yesterday) def count_monthly_users(self): """ @@ -291,9 +291,7 @@ class DataStore( """ thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) return self.runInteraction( - "count_monthly_users", - self._count_users, - thirty_days_ago, + "count_monthly_users", self._count_users, thirty_days_ago ) def _count_users(self, txn, time_from): @@ -361,7 +359,7 @@ class DataStore( txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) for row in txn: - if row[0] == 'unknown': + if row[0] == "unknown": pass results[row[0]] = row[1] @@ -388,7 +386,7 @@ class DataStore( txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) count, = txn.fetchone() - results['all'] = count + results["all"] = count return results diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 910f6ee9de..cbd6568c41 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -311,9 +311,7 @@ class SQLBaseStore(object): if res: for user in res: self.set_expiration_date_for_user_txn( - txn, - user["name"], - use_delta=True, + txn, user["name"], use_delta=True ) yield self.runInteraction( @@ -1662,7 +1660,7 @@ def db_to_json(db_content): # Decode it to a Unicode string before feeding it to json.loads, so we # consistenty get a Unicode-containing object out. if isinstance(db_content, (bytes, bytearray)): - db_content = db_content.decode('utf8') + db_content = db_content.decode("utf8") try: return json.loads(db_content) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index b8b8273f73..50f913a414 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -169,7 +169,7 @@ class BackgroundUpdateStore(SQLBaseStore): in_flight = set(update["update_name"] for update in updates) for update in updates: if update["depends_on"] not in in_flight: - self._background_update_queue.append(update['update_name']) + self._background_update_queue.append(update["update_name"]) if not self._background_update_queue: # no work left to do diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index d102e07372..3413a46675 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -149,9 +149,7 @@ class DeviceWorkerStore(SQLBaseStore): defer.returnValue((stream_id_cutoff, [])) results = yield self._get_device_update_edus_by_remote( - destination, - from_stream_id, - query_map, + destination, from_stream_id, query_map ) defer.returnValue((now_stream_id, results)) @@ -182,9 +180,7 @@ class DeviceWorkerStore(SQLBaseStore): return list(txn) @defer.inlineCallbacks - def _get_device_update_edus_by_remote( - self, destination, from_stream_id, query_map, - ): + def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_map): """Returns a list of device update EDUs as well as E2EE keys Args: @@ -210,7 +206,7 @@ class DeviceWorkerStore(SQLBaseStore): # The prev_id for the first row is always the last row before # `from_stream_id` prev_id = yield self._get_last_device_update_for_remote_user( - destination, user_id, from_stream_id, + destination, user_id, from_stream_id ) for device_id, device in iteritems(user_devices): stream_id = query_map[(user_id, device_id)] @@ -238,7 +234,7 @@ class DeviceWorkerStore(SQLBaseStore): defer.returnValue(results) def _get_last_device_update_for_remote_user( - self, destination, user_id, from_stream_id, + self, destination, user_id, from_stream_id ): def f(txn): prev_sent_id_sql = """ diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index 521936e3b0..f40ef2ab64 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -87,10 +87,10 @@ class EndToEndRoomKeyStore(SQLBaseStore): }, values={ "version": version, - "first_message_index": room_key['first_message_index'], - "forwarded_count": room_key['forwarded_count'], - "is_verified": room_key['is_verified'], - "session_data": json.dumps(room_key['session_data']), + "first_message_index": room_key["first_message_index"], + "forwarded_count": room_key["forwarded_count"], + "is_verified": room_key["is_verified"], + "session_data": json.dumps(room_key["session_data"]), }, lock=False, ) @@ -118,13 +118,13 @@ class EndToEndRoomKeyStore(SQLBaseStore): try: version = int(version) except ValueError: - defer.returnValue({'rooms': {}}) + defer.returnValue({"rooms": {}}) keyvalues = {"user_id": user_id, "version": version} if room_id: - keyvalues['room_id'] = room_id + keyvalues["room_id"] = room_id if session_id: - keyvalues['session_id'] = session_id + keyvalues["session_id"] = session_id rows = yield self._simple_select_list( table="e2e_room_keys", @@ -141,10 +141,10 @@ class EndToEndRoomKeyStore(SQLBaseStore): desc="get_e2e_room_keys", ) - sessions = {'rooms': {}} + sessions = {"rooms": {}} for row in rows: - room_entry = sessions['rooms'].setdefault(row['room_id'], {"sessions": {}}) - room_entry['sessions'][row['session_id']] = { + room_entry = sessions["rooms"].setdefault(row["room_id"], {"sessions": {}}) + room_entry["sessions"][row["session_id"]] = { "first_message_index": row["first_message_index"], "forwarded_count": row["forwarded_count"], "is_verified": row["is_verified"], @@ -174,9 +174,9 @@ class EndToEndRoomKeyStore(SQLBaseStore): keyvalues = {"user_id": user_id, "version": int(version)} if room_id: - keyvalues['room_id'] = room_id + keyvalues["room_id"] = room_id if session_id: - keyvalues['session_id'] = session_id + keyvalues["session_id"] = session_id yield self._simple_delete( table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys" @@ -191,7 +191,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): ) row = txn.fetchone() if not row: - raise StoreError(404, 'No current backup version') + raise StoreError(404, "No current backup version") return row[0] def get_e2e_room_keys_version_info(self, user_id, version=None): @@ -255,7 +255,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): ) current_version = txn.fetchone()[0] if current_version is None: - current_version = '0' + current_version = "0" new_version = str(int(current_version) + 1) diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 933bcf42c2..e9b9caa49a 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -85,7 +85,7 @@ class Sqlite3Engine(object): def _parse_match_info(buf): bufsize = len(buf) - return [struct.unpack('@I', buf[i : i + 4])[0] for i in range(0, bufsize, 4)] + return [struct.unpack("@I", buf[i : i + 4])[0] for i in range(0, bufsize, 4)] def _rank(raw_match_info): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 09e39c2c28..cb4478342f 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -190,6 +190,34 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas room_id, ) + def get_rooms_with_many_extremities(self, min_count, limit): + """Get the top rooms with at least N extremities. + + Args: + min_count (int): The minimum number of extremities + limit (int): The maximum number of rooms to return. + + Returns: + Deferred[list]: At most `limit` room IDs that have at least + `min_count` extremities, sorted by extremity count. + """ + + def _get_rooms_with_many_extremities_txn(txn): + sql = """ + SELECT room_id FROM event_forward_extremities + GROUP BY room_id + HAVING count(*) > ? + ORDER BY count(*) DESC + LIMIT ? + """ + + txn.execute(sql, (min_count, limit)) + return [room_id for room_id, in txn] + + return self.runInteraction( + "get_rooms_with_many_extremities", _get_rooms_with_many_extremities_txn + ) + @cached(max_entries=5000, iterable=True) def get_latest_event_ids_in_room(self, room_id): return self._simple_select_onecol( diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index a729f3e067..eca77069fd 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -277,7 +277,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): # contain results from the first query, correctly ordered, followed # by results from the second query, but we want them all ordered # by stream_ordering, oldest first. - notifs.sort(key=lambda r: r['stream_ordering']) + notifs.sort(key=lambda r: r["stream_ordering"]) # Take only up to the limit. We have to stop at the limit because # one of the subqueries may have hit the limit. @@ -379,7 +379,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): # contain results from the first query, correctly ordered, followed # by results from the second query, but we want them all ordered # by received_ts (most recent first) - notifs.sort(key=lambda r: -(r['received_ts'] or 0)) + notifs.sort(key=lambda r: -(r["received_ts"] or 0)) # Now return the first `limit` defer.returnValue(notifs[:limit]) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f631fb1733..fefba39ea1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -24,7 +24,7 @@ from six import iteritems, text_type from six.moves import range from canonicaljson import json -from prometheus_client import Counter +from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -74,6 +74,21 @@ state_delta_reuse_delta_counter = Counter( "synapse_storage_events_state_delta_reuse_delta", "" ) +# The number of forward extremities for each new event. +forward_extremities_counter = Histogram( + "synapse_storage_events_forward_extremities_persisted", + "Number of forward extremities for each new event", + buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), +) + +# The number of stale forward extremities for each new event. Stale extremities +# are those that were in the previous set of extremities as well as the new. +stale_forward_extremities_counter = Histogram( + "synapse_storage_events_stale_forward_extremities_persisted", + "Number of unchanged forward extremities for each new event", + buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), +) + def encode_json(json_object): """ @@ -234,7 +249,7 @@ class EventsStore( BucketCollector( "synapse_forward_extremities", lambda: self._current_forward_extremities_amount, - buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"] + buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"], ) # Read the extrems every 60 minutes @@ -541,6 +556,8 @@ class EventsStore( and not event.internal_metadata.is_soft_failed() ] + latest_event_ids = set(latest_event_ids) + # start with the existing forward extremities result = set(latest_event_ids) @@ -564,6 +581,13 @@ class EventsStore( ) result.difference_update(existing_prevs) + # We only update metrics for events that change forward extremities + # (e.g. we ignore backfill/outliers/etc) + if result != latest_event_ids: + forward_extremities_counter.observe(len(result)) + stale = latest_event_ids & result + stale_forward_extremities_counter.observe(len(stale)) + defer.returnValue(result) @defer.inlineCallbacks diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py index 75c1935bf3..1ce21d190c 100644 --- a/synapse/storage/events_bg_updates.py +++ b/synapse/storage/events_bg_updates.py @@ -64,8 +64,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): ) self.register_background_update_handler( - self.DELETE_SOFT_FAILED_EXTREMITIES, - self._cleanup_extremities_bg_update, + self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update ) @defer.inlineCallbacks @@ -269,7 +268,8 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): LEFT JOIN events USING (event_id) LEFT JOIN event_json USING (event_id) LEFT JOIN rejections USING (event_id) - """, (batch_size,) + """, + (batch_size,), ) for prev_event_id, event_id, metadata, rejected, outlier in txn: @@ -364,13 +364,12 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): column="event_id", iterable=to_delete, keyvalues={}, - retcols=("room_id",) + retcols=("room_id",), ) room_ids = set(row["room_id"] for row in rows) for room_id in room_ids: txn.call_after( - self.get_latest_event_ids_in_room.invalidate, - (room_id,) + self.get_latest_event_ids_in_room.invalidate, (room_id,) ) self._simple_delete_many_txn( @@ -384,7 +383,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): return len(original_set) num_handled = yield self.runInteraction( - "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn, + "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn ) if not num_handled: @@ -394,8 +393,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): txn.execute("DROP TABLE _extremities_to_check") yield self.runInteraction( - "_cleanup_extremities_bg_update_drop_table", - _drop_table_txn, + "_cleanup_extremities_bg_update_drop_table", _drop_table_txn ) defer.returnValue(num_handled) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index cc7df5cf14..6d680d405a 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -27,7 +27,6 @@ from synapse.api.constants import EventTypes from synapse.api.errors import NotFoundError from synapse.api.room_versions import EventFormatVersions from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401 -# these are only included to make the type annotations work from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event from synapse.metrics.background_process_metrics import run_as_background_process @@ -111,8 +110,7 @@ class EventsWorkerStore(SQLBaseStore): return ts return self.runInteraction( - "get_approximate_received_ts", - _get_approximate_received_ts_txn, + "get_approximate_received_ts", _get_approximate_received_ts_txn ) @defer.inlineCallbacks @@ -677,7 +675,8 @@ class EventsWorkerStore(SQLBaseStore): """ return self.runInteraction( "get_total_state_event_counts", - self._get_total_state_event_counts_txn, room_id + self._get_total_state_event_counts_txn, + room_id, ) def _get_current_state_event_counts_txn(self, txn, room_id): @@ -701,7 +700,8 @@ class EventsWorkerStore(SQLBaseStore): """ return self.runInteraction( "get_current_state_event_counts", - self._get_current_state_event_counts_txn, room_id + self._get_current_state_event_counts_txn, + room_id, ) @defer.inlineCallbacks diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index dce6a43ac1..73e6fc6de2 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1179,11 +1179,7 @@ class GroupServerStore(SQLBaseStore): for table in tables: self._simple_delete_txn( - txn, - table=table, - keyvalues={"group_id": group_id}, + txn, table=table, keyvalues={"group_id": group_id} ) - return self.runInteraction( - "delete_group", _delete_group_txn - ) + return self.runInteraction("delete_group", _delete_group_txn) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index e3655ad8d7..e72f89e446 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -131,7 +131,7 @@ class KeyStore(SQLBaseStore): def _invalidate(res): f = self._get_server_verify_key.invalidate for i in invalidations: - f((i, )) + f((i,)) return res return self.runInteraction( diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 3ecf47e7a7..6b1238ce4a 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -22,11 +22,11 @@ class MediaRepositoryStore(BackgroundUpdateStore): super(MediaRepositoryStore, self).__init__(db_conn, hs) self.register_background_index_update( - update_name='local_media_repository_url_idx', - index_name='local_media_repository_url_idx', - table='local_media_repository', - columns=['created_ts'], - where_clause='url_cache IS NOT NULL', + update_name="local_media_repository_url_idx", + index_name="local_media_repository_url_idx", + table="local_media_repository", + columns=["created_ts"], + where_clause="url_cache IS NOT NULL", ) def get_local_media(self, media_id): @@ -108,12 +108,12 @@ class MediaRepositoryStore(BackgroundUpdateStore): return dict( zip( ( - 'response_code', - 'etag', - 'expires_ts', - 'og', - 'media_id', - 'download_ts', + "response_code", + "etag", + "expires_ts", + "og", + "media_id", + "download_ts", ), row, ) diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index 8aa8abc470..081564360f 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -86,11 +86,11 @@ class MonthlyActiveUsersStore(SQLBaseStore): if len(self.reserved_users) > 0: # questionmarks is a hack to overcome sqlite not supporting # tuples in 'WHERE IN %s' - questionmarks = '?' * len(self.reserved_users) + questionmarks = "?" * len(self.reserved_users) query_args.extend(self.reserved_users) sql = base_sql + """ AND user_id NOT IN ({})""".format( - ','.join(questionmarks) + ",".join(questionmarks) ) else: sql = base_sql @@ -124,7 +124,7 @@ class MonthlyActiveUsersStore(SQLBaseStore): if len(self.reserved_users) > 0: query_args.extend(self.reserved_users) sql = base_sql + """ AND user_id NOT IN ({})""".format( - ','.join(questionmarks) + ",".join(questionmarks) ) else: sql = base_sql diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index f2c1bed487..7c4e1dc7ec 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -133,7 +133,7 @@ def _setup_new_database(cur, database_engine): if ver <= SCHEMA_VERSION: valid_dirs.append((ver, abs_path)) else: - logger.warn("Unexpected entry in 'full_schemas': %s", filename) + logger.debug("Ignoring entry '%s' in 'full_schemas'", filename) if not valid_dirs: raise PrepareDatabaseException( @@ -146,9 +146,10 @@ def _setup_new_database(cur, database_engine): directory_entries = os.listdir(sql_dir) - for filename in sorted(fnmatch.filter(directory_entries, "*.sql") + fnmatch.filter( - directory_entries, "*.sql." + specific - )): + for filename in sorted( + fnmatch.filter(directory_entries, "*.sql") + + fnmatch.filter(directory_entries, "*.sql." + specific) + ): sql_loc = os.path.join(sql_dir, filename) logger.debug("Applying schema %s", sql_loc) executescript(cur, sql_loc) @@ -313,7 +314,7 @@ def _apply_module_schemas(txn, database_engine, config): application config """ for (mod, _config) in config.password_providers: - if not hasattr(mod, 'get_db_schema_files'): + if not hasattr(mod, "get_db_schema_files"): continue modname = ".".join((mod.__module__, mod.__name__)) _apply_module_schema_files( @@ -343,7 +344,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams) continue root_name, ext = os.path.splitext(name) - if ext != '.sql': + if ext != ".sql": raise PrepareDatabaseException( "only .sql files are currently supported for module schemas" ) @@ -407,7 +408,7 @@ def get_statements(f): def executescript(txn, schema_path): - with open(schema_path, 'r') as f: + with open(schema_path, "r") as f: for statement in get_statements(f): txn.execute(statement) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index aeec2f57c4..0ff392bdb4 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -41,7 +41,7 @@ class ProfileWorkerStore(SQLBaseStore): defer.returnValue( ProfileInfo( - avatar_url=profile['avatar_url'], display_name=profile['displayname'] + avatar_url=profile["avatar_url"], display_name=profile["displayname"] ) ) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 9e406baafa..98cec8c82b 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -46,12 +46,12 @@ def _load_rules(rawrules, enabled_map): rules = list(list_with_base_rules(ruleslist)) for i, rule in enumerate(rules): - rule_id = rule['rule_id'] + rule_id = rule["rule_id"] if rule_id in enabled_map: - if rule.get('enabled', True) != bool(enabled_map[rule_id]): + if rule.get("enabled", True) != bool(enabled_map[rule_id]): # Rules are cached across users. rule = dict(rule) - rule['enabled'] = bool(enabled_map[rule_id]) + rule["enabled"] = bool(enabled_map[rule_id]) rules[i] = rule return rules @@ -126,12 +126,12 @@ class PushRulesWorkerStore( def get_push_rules_enabled_for_user(self, user_id): results = yield self._simple_select_list( table="push_rules_enable", - keyvalues={'user_name': user_id}, + keyvalues={"user_name": user_id}, retcols=("user_name", "rule_id", "enabled"), desc="get_push_rules_enabled_for_user", ) defer.returnValue( - {r['rule_id']: False if r['enabled'] == 0 else True for r in results} + {r["rule_id"]: False if r["enabled"] == 0 else True for r in results} ) def have_push_rules_changed_for_user(self, user_id, last_id): @@ -175,7 +175,7 @@ class PushRulesWorkerStore( rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) for row in rows: - results.setdefault(row['user_name'], []).append(row) + results.setdefault(row["user_name"], []).append(row) enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids) @@ -194,7 +194,7 @@ class PushRulesWorkerStore( rule (Dict): A push rule. """ # Create new rule id - rule_id_scope = '/'.join(rule["rule_id"].split('/')[:-1]) + rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1]) new_rule_id = rule_id_scope + "/" + new_room_id # Change room id in each condition @@ -334,8 +334,8 @@ class PushRulesWorkerStore( desc="bulk_get_push_rules_enabled", ) for row in rows: - enabled = bool(row['enabled']) - results.setdefault(row['user_name'], {})[row['rule_id']] = enabled + enabled = bool(row["enabled"]) + results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled defer.returnValue(results) @@ -568,7 +568,7 @@ class PushRuleStore(PushRulesWorkerStore): def delete_push_rule_txn(txn, stream_id, event_stream_ordering): self._simple_delete_one_txn( - txn, "push_rules", {'user_name': user_id, 'rule_id': rule_id} + txn, "push_rules", {"user_name": user_id, "rule_id": rule_id} ) self._insert_push_rules_update_txn( @@ -605,9 +605,9 @@ class PushRuleStore(PushRulesWorkerStore): self._simple_upsert_txn( txn, "push_rules_enable", - {'user_name': user_id, 'rule_id': rule_id}, - {'enabled': 1 if enabled else 0}, - {'id': new_id}, + {"user_name": user_id, "rule_id": rule_id}, + {"enabled": 1 if enabled else 0}, + {"id": new_id}, ) self._insert_push_rules_update_txn( @@ -645,8 +645,8 @@ class PushRuleStore(PushRulesWorkerStore): self._simple_update_one_txn( txn, "push_rules", - {'user_name': user_id, 'rule_id': rule_id}, - {'actions': actions_json}, + {"user_name": user_id, "rule_id": rule_id}, + {"actions": actions_json}, ) self._insert_push_rules_update_txn( diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 1567e1df48..cfe0a94330 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -37,24 +37,24 @@ else: class PusherWorkerStore(SQLBaseStore): def _decode_pushers_rows(self, rows): for r in rows: - dataJson = r['data'] - r['data'] = None + dataJson = r["data"] + r["data"] = None try: if isinstance(dataJson, db_binary_type): dataJson = str(dataJson).decode("UTF8") - r['data'] = json.loads(dataJson) + r["data"] = json.loads(dataJson) except Exception as e: logger.warn( "Invalid JSON in data for pusher %d: %s, %s", - r['id'], + r["id"], dataJson, e.args[0], ) pass - if isinstance(r['pushkey'], db_binary_type): - r['pushkey'] = str(r['pushkey']).decode("UTF8") + if isinstance(r["pushkey"], db_binary_type): + r["pushkey"] = str(r["pushkey"]).decode("UTF8") return rows @@ -195,15 +195,15 @@ class PusherWorkerStore(SQLBaseStore): ) def get_if_users_have_pushers(self, user_ids): rows = yield self._simple_select_many_batch( - table='pushers', - column='user_name', + table="pushers", + column="user_name", iterable=user_ids, - retcols=['user_name'], - desc='get_if_users_have_pushers', + retcols=["user_name"], + desc="get_if_users_have_pushers", ) result = {user_id: False for user_id in user_ids} - result.update({r['user_name']: True for r in rows}) + result.update({r["user_name"]: True for r in rows}) defer.returnValue(result) @@ -299,8 +299,8 @@ class PusherStore(PusherWorkerStore): ): yield self._simple_update_one( "pushers", - {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_stream_ordering': last_stream_ordering}, + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + {"last_stream_ordering": last_stream_ordering}, desc="update_pusher_last_stream_ordering", ) @@ -310,10 +310,10 @@ class PusherStore(PusherWorkerStore): ): yield self._simple_update_one( "pushers", - {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, { - 'last_stream_ordering': last_stream_ordering, - 'last_success': last_success, + "last_stream_ordering": last_stream_ordering, + "last_success": last_success, }, desc="update_pusher_last_stream_ordering_and_success", ) @@ -322,8 +322,8 @@ class PusherStore(PusherWorkerStore): def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): yield self._simple_update_one( "pushers", - {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'failing_since': failing_since}, + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + {"failing_since": failing_since}, desc="update_pusher_failing_since", ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index a1647e50a1..b477da12b1 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore): @cachedInlineCallbacks() def get_users_with_read_receipts_in_room(self, room_id): receipts = yield self.get_receipts_for_room(room_id, "m.read") - defer.returnValue(set(r['user_id'] for r in receipts)) + defer.returnValue(set(r["user_id"] for r in receipts)) @cached(num_args=2) def get_receipts_for_room(self, room_id, receipt_type): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index d36917e4d6..983ce13291 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -116,8 +116,9 @@ class RegistrationWorkerStore(SQLBaseStore): defer.returnValue(res) @defer.inlineCallbacks - def set_account_validity_for_user(self, user_id, expiration_ts, email_sent, - renewal_token=None): + def set_account_validity_for_user( + self, user_id, expiration_ts, email_sent, renewal_token=None + ): """Updates the account validity properties of the given account, with the given values. @@ -131,6 +132,7 @@ class RegistrationWorkerStore(SQLBaseStore): renewal_token (str): Renewal token the user can use to extend the validity of their account. Defaults to no token. """ + def set_account_validity_for_user_txn(txn): self._simple_update_txn( txn=txn, @@ -143,12 +145,11 @@ class RegistrationWorkerStore(SQLBaseStore): }, ) self._invalidate_cache_and_stream( - txn, self.get_expiration_ts_for_user, (user_id,), + txn, self.get_expiration_ts_for_user, (user_id,) ) yield self.runInteraction( - "set_account_validity_for_user", - set_account_validity_for_user_txn, + "set_account_validity_for_user", set_account_validity_for_user_txn ) @defer.inlineCallbacks @@ -217,6 +218,7 @@ class RegistrationWorkerStore(SQLBaseStore): Returns: Deferred: Resolves to a list[dict[user_id (str), expiration_ts_ms (int)]] """ + def select_users_txn(txn, now_ms, renew_at): sql = ( "SELECT user_id, expiration_ts_ms FROM account_validity" @@ -229,7 +231,8 @@ class RegistrationWorkerStore(SQLBaseStore): res = yield self.runInteraction( "get_users_expiring_soon", select_users_txn, - self.clock.time_msec(), self.config.account_validity.renew_at, + self.clock.time_msec(), + self.config.account_validity.renew_at, ) defer.returnValue(res) @@ -369,7 +372,7 @@ class RegistrationWorkerStore(SQLBaseStore): WHERE creation_ts > ? ) AS t GROUP BY user_type """ - results = {'native': 0, 'guest': 0, 'bridged': 0} + results = {"native": 0, "guest": 0, "bridged": 0} txn.execute(sql, (yesterday,)) for row in txn: results[row[0]] = row[1] @@ -435,7 +438,7 @@ class RegistrationWorkerStore(SQLBaseStore): {"medium": medium, "address": address}, ["guest_access_token"], True, - 'get_3pid_guest_access_token', + "get_3pid_guest_access_token", ) if ret: defer.returnValue(ret["guest_access_token"]) @@ -472,11 +475,11 @@ class RegistrationWorkerStore(SQLBaseStore): txn, "user_threepids", {"medium": medium, "address": address}, - ['user_id'], + ["user_id"], True, ) if ret: - return ret['user_id'] + return ret["user_id"] return None @defer.inlineCallbacks @@ -492,8 +495,8 @@ class RegistrationWorkerStore(SQLBaseStore): ret = yield self._simple_select_list( "user_threepids", {"user_id": user_id}, - ['medium', 'address', 'validated_at', 'added_at'], - 'user_get_threepids', + ["medium", "address", "validated_at", "added_at"], + "user_get_threepids", ) defer.returnValue(ret) @@ -572,11 +575,7 @@ class RegistrationWorkerStore(SQLBaseStore): """ return self._simple_select_onecol( table="user_threepid_id_server", - keyvalues={ - "user_id": user_id, - "medium": medium, - "address": address, - }, + keyvalues={"user_id": user_id, "medium": medium, "address": address}, retcol="id_server", desc="get_id_servers_user_bound", ) @@ -612,16 +611,16 @@ class RegistrationStore( self.register_noop_background_update("refresh_tokens_device_index") self.register_background_update_handler( - "user_threepids_grandfather", self._bg_user_threepids_grandfather, + "user_threepids_grandfather", self._bg_user_threepids_grandfather ) self.register_background_update_handler( - "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag, + "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag ) # Create a background job for culling expired 3PID validity tokens hs.get_clock().looping_call( - self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS, + self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS ) @defer.inlineCallbacks @@ -662,7 +661,7 @@ class RegistrationStore( for user in rows: if not user["count_tokens"] and not user["count_threepids"]: - self.set_user_deactivated_status_txn(txn, user["user_id"], True) + self.set_user_deactivated_status_txn(txn, user["name"], True) rows_processed_nb += 1 logger.info("Marked %d rows as deactivated", rows_processed_nb) @@ -677,8 +676,7 @@ class RegistrationStore( return False end = yield self.runInteraction( - "users_set_deactivated_flag", - _backgroud_update_set_deactivated_flag_txn, + "users_set_deactivated_flag", _backgroud_update_set_deactivated_flag_txn ) if end: @@ -851,7 +849,7 @@ class RegistrationStore( def user_set_password_hash_txn(txn): self._simple_update_one_txn( - txn, 'users', {'name': user_id}, {'password_hash': password_hash} + txn, "users", {"name": user_id}, {"password_hash": password_hash} ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) @@ -872,9 +870,9 @@ class RegistrationStore( def f(txn): self._simple_update_one_txn( txn, - table='users', - keyvalues={'name': user_id}, - updatevalues={'consent_version': consent_version}, + table="users", + keyvalues={"name": user_id}, + updatevalues={"consent_version": consent_version}, ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) @@ -896,9 +894,9 @@ class RegistrationStore( def f(txn): self._simple_update_one_txn( txn, - table='users', - keyvalues={'name': user_id}, - updatevalues={'consent_server_notice_sent': consent_version}, + table="users", + keyvalues={"name": user_id}, + updatevalues={"consent_server_notice_sent": consent_version}, ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) @@ -1068,7 +1066,7 @@ class RegistrationStore( if id_servers: yield self.runInteraction( - "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn, + "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn ) yield self._end_background_update("user_threepids_grandfather") @@ -1076,12 +1074,7 @@ class RegistrationStore( defer.returnValue(1) def get_threepid_validation_session( - self, - medium, - client_secret, - address=None, - sid=None, - validated=True, + self, medium, client_secret, address=None, sid=None, validated=True ): """Gets a session_id and last_send_attempt (if available) for a client_secret/medium/(address|session_id) combo @@ -1101,23 +1094,22 @@ class RegistrationStore( latest session_id and send_attempt count for this 3PID. Otherwise None if there hasn't been a previous attempt """ - keyvalues = { - "medium": medium, - "client_secret": client_secret, - } + keyvalues = {"medium": medium, "client_secret": client_secret} if address: keyvalues["address"] = address if sid: keyvalues["session_id"] = sid - assert(address or sid) + assert address or sid def get_threepid_validation_session_txn(txn): sql = """ SELECT address, session_id, medium, client_secret, last_send_attempt, validated_at FROM threepid_validation_session WHERE %s - """ % (" AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),) + """ % ( + " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)), + ) if validated is not None: sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL") @@ -1132,17 +1124,10 @@ class RegistrationStore( return rows[0] return self.runInteraction( - "get_threepid_validation_session", - get_threepid_validation_session_txn, + "get_threepid_validation_session", get_threepid_validation_session_txn ) - def validate_threepid_session( - self, - session_id, - client_secret, - token, - current_ts, - ): + def validate_threepid_session(self, session_id, client_secret, token, current_ts): """Attempt to validate a threepid session using a token Args: @@ -1174,7 +1159,7 @@ class RegistrationStore( if retrieved_client_secret != client_secret: raise ThreepidValidationError( - 400, "This client_secret does not match the provided session_id", + 400, "This client_secret does not match the provided session_id" ) row = self._simple_select_one_txn( @@ -1187,7 +1172,7 @@ class RegistrationStore( if not row: raise ThreepidValidationError( - 400, "Validation token not found or has expired", + 400, "Validation token not found or has expired" ) expires = row["expires"] next_link = row["next_link"] @@ -1198,7 +1183,7 @@ class RegistrationStore( if expires <= current_ts: raise ThreepidValidationError( - 400, "This token has expired. Please request a new one", + 400, "This token has expired. Please request a new one" ) # Looks good. Validate the session @@ -1213,8 +1198,7 @@ class RegistrationStore( # Return next_link if it exists return self.runInteraction( - "validate_threepid_session_txn", - validate_threepid_session_txn, + "validate_threepid_session_txn", validate_threepid_session_txn ) def upsert_threepid_validation_session( @@ -1281,6 +1265,7 @@ class RegistrationStore( token_expires (int): The timestamp for which after the token will no longer be valid """ + def start_or_continue_validation_session_txn(txn): # Create or update a validation session self._simple_upsert_txn( @@ -1314,6 +1299,7 @@ class RegistrationStore( def cull_expired_threepid_validation_tokens(self): """Remove threepid validation tokens with expiry dates that have passed""" + def cull_expired_threepid_validation_tokens_txn(txn, ts): sql = """ DELETE FROM threepid_validation_token WHERE @@ -1335,6 +1321,7 @@ class RegistrationStore( Args: session_id (str): The ID of the session to delete """ + def delete_threepid_session_txn(txn): self._simple_delete_txn( txn, @@ -1348,8 +1335,7 @@ class RegistrationStore( ) return self.runInteraction( - "delete_threepid_session", - delete_threepid_session_txn, + "delete_threepid_session", delete_threepid_session_txn ) def set_user_deactivated_status_txn(self, txn, user_id, deactivated): @@ -1360,7 +1346,7 @@ class RegistrationStore( updatevalues={"deactivated": 1 if deactivated else 0}, ) self._invalidate_cache_and_stream( - txn, self.get_user_deactivated_status, (user_id,), + txn, self.get_user_deactivated_status, (user_id,) ) @defer.inlineCallbacks @@ -1375,7 +1361,8 @@ class RegistrationStore( yield self.runInteraction( "set_user_deactivated_status", self.set_user_deactivated_status_txn, - user_id, deactivated, + user_id, + deactivated, ) @cachedInlineCallbacks() diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py index 4c83800cca..1b01934c19 100644 --- a/synapse/storage/relations.py +++ b/synapse/storage/relations.py @@ -468,9 +468,5 @@ class RelationsStore(RelationsWorkerStore): """ self._simple_delete_txn( - txn, - table="event_relations", - keyvalues={ - "event_id": redacted_event_id, - } + txn, table="event_relations", keyvalues={"event_id": redacted_event_id} ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7617913326..8004aeb909 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -420,7 +420,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): table="room_memberships", column="event_id", iterable=missing_member_event_ids, - retcols=('user_id', 'display_name', 'avatar_url'), + retcols=("user_id", "display_name", "avatar_url"), keyvalues={"membership": Membership.JOIN}, batch_size=500, desc="_get_joined_users_from_context", @@ -448,7 +448,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cachedInlineCallbacks(max_entries=10000) def is_host_joined(self, room_id, host): - if '%' in host or '_' in host: + if "%" in host or "_" in host: raise Exception("Invalid host name") sql = """ @@ -490,7 +490,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): Deferred: Resolves to True if the host is/was in the room, otherwise False. """ - if '%' in host or '_' in host: + if "%" in host or "_" in host: raise Exception("Invalid host name") sql = """ @@ -723,7 +723,7 @@ class RoomMemberStore(RoomMemberWorkerStore): room_id = row["room_id"] try: event_json = json.loads(row["json"]) - content = event_json['content'] + content = event_json["content"] except Exception: continue diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py index 147496a38b..3edfcfd783 100644 --- a/synapse/storage/schema/delta/20/pushers.py +++ b/synapse/storage/schema/delta/20/pushers.py @@ -29,7 +29,8 @@ logger = logging.getLogger(__name__) def run_create(cur, database_engine, *args, **kwargs): logger.info("Porting pushers table...") - cur.execute(""" + cur.execute( + """ CREATE TABLE IF NOT EXISTS pushers2 ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, @@ -48,27 +49,34 @@ def run_create(cur, database_engine, *args, **kwargs): failing_since BIGINT, UNIQUE (app_id, pushkey, user_name) ) - """) - cur.execute("""SELECT + """ + ) + cur.execute( + """SELECT id, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers - """) + """ + ) count = 0 for row in cur.fetchall(): row = list(row) row[8] = bytes(row[8]).decode("utf-8") row[11] = bytes(row[11]).decode("utf-8") - cur.execute(database_engine.convert_param_style(""" + cur.execute( + database_engine.convert_param_style( + """ INSERT into pushers2 ( id, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since - ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))), - row + ) values (%s)""" + % (",".join(["?" for _ in range(len(row))])) + ), + row, ) count += 1 cur.execute("DROP TABLE pushers") diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py index ef7ec34346..9b95411fb6 100644 --- a/synapse/storage/schema/delta/30/as_users.py +++ b/synapse/storage/schema/delta/30/as_users.py @@ -40,9 +40,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): logger.warning("Could not get app_service_config_files from config") pass - appservices = load_appservices( - config.server_name, config_files - ) + appservices = load_appservices(config.server_name, config_files) owned = {} @@ -53,20 +51,19 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): if user_id in owned.keys(): logger.error( "user_id %s was owned by more than one application" - " service (IDs %s and %s); assigning arbitrarily to %s" % - (user_id, owned[user_id], appservice.id, owned[user_id]) + " service (IDs %s and %s); assigning arbitrarily to %s" + % (user_id, owned[user_id], appservice.id, owned[user_id]) ) owned.setdefault(appservice.id, []).append(user_id) for as_id, user_ids in owned.items(): n = 100 - user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n)) + user_chunks = (user_ids[i : i + 100] for i in range(0, len(user_ids), n)) for chunk in user_chunks: cur.execute( database_engine.convert_param_style( - "UPDATE users SET appservice_id = ? WHERE name IN (%s)" % ( - ",".join("?" for _ in chunk), - ) + "UPDATE users SET appservice_id = ? WHERE name IN (%s)" + % (",".join("?" for _ in chunk),) ), - [as_id] + chunk + [as_id] + chunk, ) diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py index 93367fa09e..9bb504aad5 100644 --- a/synapse/storage/schema/delta/31/pushers.py +++ b/synapse/storage/schema/delta/31/pushers.py @@ -24,12 +24,13 @@ logger = logging.getLogger(__name__) def token_to_stream_ordering(token): - return int(token[1:].split('_')[0]) + return int(token[1:].split("_")[0]) def run_create(cur, database_engine, *args, **kwargs): logger.info("Porting pushers table, delta 31...") - cur.execute(""" + cur.execute( + """ CREATE TABLE IF NOT EXISTS pushers2 ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, @@ -48,26 +49,33 @@ def run_create(cur, database_engine, *args, **kwargs): failing_since BIGINT, UNIQUE (app_id, pushkey, user_name) ) - """) - cur.execute("""SELECT + """ + ) + cur.execute( + """SELECT id, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers - """) + """ + ) count = 0 for row in cur.fetchall(): row = list(row) row[12] = token_to_stream_ordering(row[12]) - cur.execute(database_engine.convert_param_style(""" + cur.execute( + database_engine.convert_param_style( + """ INSERT into pushers2 ( id, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_stream_ordering, last_success, failing_since - ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))), - row + ) values (%s)""" + % (",".join(["?" for _ in range(len(row))])) + ), + row, ) count += 1 cur.execute("DROP TABLE pushers") diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py index 9754d3ccfb..a26057dfb6 100644 --- a/synapse/storage/schema/delta/33/remote_media_ts.py +++ b/synapse/storage/schema/delta/33/remote_media_ts.py @@ -26,5 +26,5 @@ def run_upgrade(cur, database_engine, *args, **kwargs): database_engine.convert_param_style( "UPDATE remote_media_cache SET last_access_ts = ?" ), - (int(time.time() * 1000),) + (int(time.time() * 1000),), ) diff --git a/synapse/storage/schema/delta/47/state_group_seq.py b/synapse/storage/schema/delta/47/state_group_seq.py index f6766501d2..9fd1ccf6f7 100644 --- a/synapse/storage/schema/delta/47/state_group_seq.py +++ b/synapse/storage/schema/delta/47/state_group_seq.py @@ -27,10 +27,7 @@ def run_create(cur, database_engine, *args, **kwargs): else: start_val = row[0] + 1 - cur.execute( - "CREATE SEQUENCE state_group_id_seq START WITH %s", - (start_val, ), - ) + cur.execute("CREATE SEQUENCE state_group_id_seq START WITH %s", (start_val,)) def run_upgrade(*args, **kwargs): diff --git a/synapse/storage/schema/delta/48/group_unique_indexes.py b/synapse/storage/schema/delta/48/group_unique_indexes.py index 2233af87d7..49f5f2c003 100644 --- a/synapse/storage/schema/delta/48/group_unique_indexes.py +++ b/synapse/storage/schema/delta/48/group_unique_indexes.py @@ -38,16 +38,22 @@ def run_create(cur, database_engine, *args, **kwargs): rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" # remove duplicates from group_users & group_invites tables - cur.execute(""" + cur.execute( + """ DELETE FROM group_users WHERE %s NOT IN ( SELECT min(%s) FROM group_users GROUP BY group_id, user_id ); - """ % (rowid, rowid)) - cur.execute(""" + """ + % (rowid, rowid) + ) + cur.execute( + """ DELETE FROM group_invites WHERE %s NOT IN ( SELECT min(%s) FROM group_invites GROUP BY group_id, user_id ); - """ % (rowid, rowid)) + """ + % (rowid, rowid) + ) for statement in get_statements(FIX_INDEXES.splitlines()): cur.execute(statement) diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py index 6dd467b6c5..b1684a8441 100644 --- a/synapse/storage/schema/delta/50/make_event_content_nullable.py +++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py @@ -65,14 +65,18 @@ def run_create(cur, database_engine, *args, **kwargs): def run_upgrade(cur, database_engine, *args, **kwargs): if isinstance(database_engine, PostgresEngine): - cur.execute(""" + cur.execute( + """ ALTER TABLE events ALTER COLUMN content DROP NOT NULL; - """) + """ + ) return # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html - cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'") + cur.execute( + "SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'" + ) (oldsql,) = cur.fetchone() sql = oldsql.replace("content TEXT NOT NULL", "content TEXT") @@ -86,7 +90,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs): cur.execute("PRAGMA writable_schema=ON") cur.execute( "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'", - (sql, ), + (sql,), ) cur.execute("PRAGMA schema_version=%i" % (oldver + 1,)) cur.execute("PRAGMA writable_schema=OFF") diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 10a27c207a..f3b1cec933 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -31,8 +31,8 @@ from .background_updates import BackgroundUpdateStore logger = logging.getLogger(__name__) SearchEntry = namedtuple( - 'SearchEntry', - ['key', 'value', 'event_id', 'room_id', 'stream_ordering', 'origin_server_ts'], + "SearchEntry", + ["key", "value", "event_id", "room_id", "stream_ordering", "origin_server_ts"], ) @@ -216,7 +216,7 @@ class SearchStore(BackgroundUpdateStore): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) - have_added_index = progress['have_added_indexes'] + have_added_index = progress["have_added_indexes"] if not have_added_index: diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index ff266b09b0..1cec84ee2e 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -71,7 +71,8 @@ class StatsStore(StateDeltasStore): # Get all the rooms that we want to process. def _make_staging_area(txn): # Create the temporary tables - stmts = get_statements(""" + stmts = get_statements( + """ -- We just recreate the table, we'll be reinserting the -- correct entries again later anyway. DROP TABLE IF EXISTS {temp}_rooms; @@ -85,7 +86,10 @@ class StatsStore(StateDeltasStore): ON {temp}_rooms(events); CREATE INDEX {temp}_rooms_id ON {temp}_rooms(room_id); - """.format(temp=TEMP_TABLE).splitlines()) + """.format( + temp=TEMP_TABLE + ).splitlines() + ) for statement in stmts: txn.execute(statement) @@ -105,7 +109,9 @@ class StatsStore(StateDeltasStore): LEFT JOIN room_stats_earliest_token AS t USING (room_id) WHERE t.room_id IS NULL GROUP BY c.room_id - """ % (TEMP_TABLE,) + """ % ( + TEMP_TABLE, + ) txn.execute(sql) new_pos = yield self.get_max_stream_id_in_current_state_deltas() @@ -184,7 +190,8 @@ class StatsStore(StateDeltasStore): logger.info( "Processing the next %d rooms of %d remaining", - len(rooms_to_work_on), progress["remaining"], + len(rooms_to_work_on), + progress["remaining"], ) # Number of state events we've processed by going through each room @@ -204,10 +211,17 @@ class StatsStore(StateDeltasStore): avatar_id = current_state_ids.get((EventTypes.RoomAvatar, "")) canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, "")) - state_events = yield self.get_events([ - join_rules_id, history_visibility_id, encryption_id, name_id, - topic_id, avatar_id, canonical_alias_id, - ]) + state_events = yield self.get_events( + [ + join_rules_id, + history_visibility_id, + encryption_id, + name_id, + topic_id, + avatar_id, + canonical_alias_id, + ] + ) def _get_or_none(event_id, arg): event = state_events.get(event_id) @@ -271,7 +285,7 @@ class StatsStore(StateDeltasStore): # We've finished a room. Delete it from the table. self._simple_delete_one_txn( - txn, TEMP_TABLE + "_rooms", {"room_id": room_id}, + txn, TEMP_TABLE + "_rooms", {"room_id": room_id} ) yield self.runInteraction("update_room_stats", _fetch_data) @@ -338,7 +352,7 @@ class StatsStore(StateDeltasStore): "name", "topic", "avatar", - "canonical_alias" + "canonical_alias", ): field = fields.get(col) if field and "\0" in field: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6f7f65d96b..d9482a3843 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -65,7 +65,7 @@ _EventDictReturn = namedtuple( def generate_pagination_where_clause( - direction, column_names, from_token, to_token, engine, + direction, column_names, from_token, to_token, engine ): """Creates an SQL expression to bound the columns by the pagination tokens. @@ -153,7 +153,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine): str """ - assert(bound in (">", "<", ">=", "<=")) + assert bound in (">", "<", ">=", "<=") name1, name2 = column_names val1, val2 = values @@ -169,11 +169,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine): # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we # use the later form when running against postgres. - return "((%d,%d) %s (%s,%s))" % ( - val1, val2, - bound, - name1, name2, - ) + return "((%d,%d) %s (%s,%s))" % (val1, val2, bound, name1, name2) # We want to generate queries of e.g. the form: # @@ -276,7 +272,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_rooms( - self, room_ids, from_key, to_key, limit=0, order='DESC' + self, room_ids, from_key, to_key, limit=0, order="DESC" ): """Get new room events in stream ordering since `from_key`. @@ -346,7 +342,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_room( - self, room_id, from_key, to_key, limit=0, order='DESC' + self, room_id, from_key, to_key, limit=0, order="DESC" ): """Get new room events in stream ordering since `from_key`. @@ -395,8 +391,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = yield self.runInteraction("get_room_events_stream_for_room", f) - ret = yield self.get_events_as_list([ - r.event_id for r in rows], get_prev_content=True, + ret = yield self.get_events_as_list( + [r.event_id for r in rows], get_prev_content=True ) self._set_before_and_after(ret, rows, topo_order=from_id is None) @@ -446,7 +442,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = yield self.runInteraction("get_membership_changes_for_user", f) ret = yield self.get_events_as_list( - [r.event_id for r in rows], get_prev_content=True, + [r.event_id for r in rows], get_prev_content=True ) self._set_before_and_after(ret, rows, topo_order=False) @@ -725,7 +721,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn, room_id, before_token, - direction='b', + direction="b", limit=before_limit, event_filter=event_filter, ) @@ -735,7 +731,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn, room_id, after_token, - direction='f', + direction="f", limit=after_limit, event_filter=event_filter, ) @@ -816,7 +812,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_id, from_token, to_token=None, - direction='b', + direction="b", limit=-1, event_filter=None, ): @@ -846,7 +842,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. args = [False, room_id] - if direction == 'b': + if direction == "b": order = "DESC" else: order = "ASC" @@ -882,7 +878,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if rows: topo = rows[-1].topological_ordering toke = rows[-1].stream_ordering - if direction == 'b': + if direction == "b": # Tokens are positions between events. # This token points *after* the last event in the chunk. # We need it to point to the event before it in the chunk @@ -898,7 +894,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def paginate_room_events( - self, room_id, from_key, to_key=None, direction='b', limit=-1, event_filter=None + self, room_id, from_key, to_key=None, direction="b", limit=-1, event_filter=None ): """Returns list of events before or after a given token. |