diff options
author | Richard van der Hoff <richard@matrix.org> | 2019-06-26 22:34:41 +0100 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2019-06-26 22:34:41 +0100 |
commit | a4daa899ec4cd195fc10936f68df5c78314b366c (patch) | |
tree | 35e88ff388b0f7652773a79930b732aa04f16bde /synapse/storage | |
parent | changelog (diff) | |
parent | Improve docs on choosing server_name (#5558) (diff) | |
download | synapse-a4daa899ec4cd195fc10936f68df5c78314b366c.tar.xz |
Merge branch 'develop' into rav/saml2_client
Diffstat (limited to 'synapse/storage')
35 files changed, 540 insertions, 325 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 71316f7d09..6b0ca80087 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -279,23 +279,35 @@ class DataStore( """ Counts the number of users who used this homeserver in the last 24 hours. """ + yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24) + return self.runInteraction("count_daily_users", self._count_users, yesterday) - def _count_users(txn): - yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24) - - sql = """ - SELECT COALESCE(count(*), 0) FROM ( - SELECT user_id FROM user_ips - WHERE last_seen > ? - GROUP BY user_id - ) u - """ - - txn.execute(sql, (yesterday,)) - count, = txn.fetchone() - return count + def count_monthly_users(self): + """ + Counts the number of users who used this homeserver in the last 30 days. + Note this method is intended for phonehome metrics only and is different + from the mau figure in synapse.storage.monthly_active_users which, + amongst other things, includes a 3 day grace period before a user counts. + """ + thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) + return self.runInteraction( + "count_monthly_users", self._count_users, thirty_days_ago + ) - return self.runInteraction("count_users", _count_users) + def _count_users(self, txn, time_from): + """ + Returns number of users seen in the past time_from period + """ + sql = """ + SELECT COALESCE(count(*), 0) FROM ( + SELECT user_id FROM user_ips + WHERE last_seen > ? + GROUP BY user_id + ) u + """ + txn.execute(sql, (time_from,)) + count, = txn.fetchone() + return count def count_r30_users(self): """ @@ -347,7 +359,7 @@ class DataStore( txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) for row in txn: - if row[0] == 'unknown': + if row[0] == "unknown": pass results[row[0]] = row[1] @@ -374,7 +386,7 @@ class DataStore( txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) count, = txn.fetchone() - results['all'] = count + results["all"] = count return results diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ae891aa332..29589853c6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -38,6 +38,14 @@ from synapse.util.caches.descriptors import Cache from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.stringutils import exception_to_unicode +# import a function which will return a monotonic time, in seconds +try: + # on python 3, use time.monotonic, since time.clock can go backwards + from time import monotonic as monotonic_time +except ImportError: + # ... but python 2 doesn't have it + from time import clock as monotonic_time + logger = logging.getLogger(__name__) try: @@ -167,22 +175,22 @@ class PerformanceCounters(object): self.current_counters = {} self.previous_counters = {} - def update(self, key, start_time, end_time=None): - if end_time is None: - end_time = time.time() - duration = end_time - start_time + def update(self, key, duration_secs): count, cum_time = self.current_counters.get(key, (0, 0)) count += 1 - cum_time += duration + cum_time += duration_secs self.current_counters[key] = (count, cum_time) - return end_time - def interval(self, interval_duration, limit=3): + def interval(self, interval_duration_secs, limit=3): counters = [] for name, (count, cum_time) in iteritems(self.current_counters): prev_count, prev_time = self.previous_counters.get(name, (0, 0)) counters.append( - ((cum_time - prev_time) / interval_duration, count - prev_count, name) + ( + (cum_time - prev_time) / interval_duration_secs, + count - prev_count, + name, + ) ) self.previous_counters = dict(self.current_counters) @@ -213,7 +221,6 @@ class SQLBaseStore(object): # is running in mainline, and we have some nice monitoring frontends # to watch it self._txn_perf_counters = PerformanceCounters() - self._get_event_counters = PerformanceCounters() self._get_event_cache = Cache( "*getEvent*", keylen=3, max_entries=hs.config.event_cache_size @@ -299,12 +306,12 @@ class SQLBaseStore(object): def select_users_with_no_expiration_date_txn(txn): """Retrieves the list of registered users with no expiration date from the - database. + database, filtering out deactivated users. """ sql = ( "SELECT users.name FROM users" " LEFT JOIN account_validity ON (users.name = account_validity.user_id)" - " WHERE account_validity.user_id is NULL;" + " WHERE account_validity.user_id is NULL AND users.deactivated = 0;" ) txn.execute(sql, []) @@ -312,9 +319,7 @@ class SQLBaseStore(object): if res: for user in res: self.set_expiration_date_for_user_txn( - txn, - user["name"], - use_delta=True, + txn, user["name"], use_delta=True ) yield self.runInteraction( @@ -352,32 +357,24 @@ class SQLBaseStore(object): ) def start_profiling(self): - self._previous_loop_ts = self._clock.time_msec() + self._previous_loop_ts = monotonic_time() def loop(): curr = self._current_txn_total_time prev = self._previous_txn_total_time self._previous_txn_total_time = curr - time_now = self._clock.time_msec() + time_now = monotonic_time() time_then = self._previous_loop_ts self._previous_loop_ts = time_now - ratio = (curr - prev) / (time_now - time_then) + duration = time_now - time_then + ratio = (curr - prev) / duration - top_three_counters = self._txn_perf_counters.interval( - time_now - time_then, limit=3 - ) - - top_3_event_counters = self._get_event_counters.interval( - time_now - time_then, limit=3 - ) + top_three_counters = self._txn_perf_counters.interval(duration, limit=3) perf_logger.info( - "Total database time: %.3f%% {%s} {%s}", - ratio * 100, - top_three_counters, - top_3_event_counters, + "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters ) self._clock.looping_call(loop, 10000) @@ -385,7 +382,7 @@ class SQLBaseStore(object): def _new_transaction( self, conn, desc, after_callbacks, exception_callbacks, func, *args, **kwargs ): - start = time.time() + start = monotonic_time() txn_id = self._TXN_ID # We don't really need these to be unique, so lets stop it from @@ -451,7 +448,7 @@ class SQLBaseStore(object): logger.debug("[TXN FAIL] {%s} %s", name, e) raise finally: - end = time.time() + end = monotonic_time() duration = end - start LoggingContext.current_context().add_database_transaction(duration) @@ -459,7 +456,7 @@ class SQLBaseStore(object): transaction_logger.debug("[TXN END] {%s} %f sec", name, duration) self._current_txn_total_time += duration - self._txn_perf_counters.update(desc, start, end) + self._txn_perf_counters.update(desc, duration) sql_txn_timer.labels(desc).observe(duration) @defer.inlineCallbacks @@ -525,11 +522,11 @@ class SQLBaseStore(object): ) parent_context = None - start_time = time.time() + start_time = monotonic_time() def inner_func(conn, *args, **kwargs): with LoggingContext("runWithConnection", parent_context) as context: - sched_duration_sec = time.time() - start_time + sched_duration_sec = monotonic_time() - start_time sql_scheduling_timer.observe(sched_duration_sec) context.add_database_scheduled(sched_duration_sec) @@ -1667,7 +1664,7 @@ def db_to_json(db_content): # Decode it to a Unicode string before feeding it to json.loads, so we # consistenty get a Unicode-containing object out. if isinstance(db_content, (bytes, bytearray)): - db_content = db_content.decode('utf8') + db_content = db_content.decode("utf8") try: return json.loads(db_content) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index b8b8273f73..50f913a414 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -169,7 +169,7 @@ class BackgroundUpdateStore(SQLBaseStore): in_flight = set(update["update_name"] for update in updates) for update in updates: if update["depends_on"] not in in_flight: - self._background_update_queue.append(update['update_name']) + self._background_update_queue.append(update["update_name"]) if not self._background_update_queue: # no work left to do diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index d102e07372..3413a46675 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -149,9 +149,7 @@ class DeviceWorkerStore(SQLBaseStore): defer.returnValue((stream_id_cutoff, [])) results = yield self._get_device_update_edus_by_remote( - destination, - from_stream_id, - query_map, + destination, from_stream_id, query_map ) defer.returnValue((now_stream_id, results)) @@ -182,9 +180,7 @@ class DeviceWorkerStore(SQLBaseStore): return list(txn) @defer.inlineCallbacks - def _get_device_update_edus_by_remote( - self, destination, from_stream_id, query_map, - ): + def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_map): """Returns a list of device update EDUs as well as E2EE keys Args: @@ -210,7 +206,7 @@ class DeviceWorkerStore(SQLBaseStore): # The prev_id for the first row is always the last row before # `from_stream_id` prev_id = yield self._get_last_device_update_for_remote_user( - destination, user_id, from_stream_id, + destination, user_id, from_stream_id ) for device_id, device in iteritems(user_devices): stream_id = query_map[(user_id, device_id)] @@ -238,7 +234,7 @@ class DeviceWorkerStore(SQLBaseStore): defer.returnValue(results) def _get_last_device_update_for_remote_user( - self, destination, user_id, from_stream_id, + self, destination, user_id, from_stream_id ): def f(txn): prev_sent_id_sql = """ diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index 521936e3b0..f40ef2ab64 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -87,10 +87,10 @@ class EndToEndRoomKeyStore(SQLBaseStore): }, values={ "version": version, - "first_message_index": room_key['first_message_index'], - "forwarded_count": room_key['forwarded_count'], - "is_verified": room_key['is_verified'], - "session_data": json.dumps(room_key['session_data']), + "first_message_index": room_key["first_message_index"], + "forwarded_count": room_key["forwarded_count"], + "is_verified": room_key["is_verified"], + "session_data": json.dumps(room_key["session_data"]), }, lock=False, ) @@ -118,13 +118,13 @@ class EndToEndRoomKeyStore(SQLBaseStore): try: version = int(version) except ValueError: - defer.returnValue({'rooms': {}}) + defer.returnValue({"rooms": {}}) keyvalues = {"user_id": user_id, "version": version} if room_id: - keyvalues['room_id'] = room_id + keyvalues["room_id"] = room_id if session_id: - keyvalues['session_id'] = session_id + keyvalues["session_id"] = session_id rows = yield self._simple_select_list( table="e2e_room_keys", @@ -141,10 +141,10 @@ class EndToEndRoomKeyStore(SQLBaseStore): desc="get_e2e_room_keys", ) - sessions = {'rooms': {}} + sessions = {"rooms": {}} for row in rows: - room_entry = sessions['rooms'].setdefault(row['room_id'], {"sessions": {}}) - room_entry['sessions'][row['session_id']] = { + room_entry = sessions["rooms"].setdefault(row["room_id"], {"sessions": {}}) + room_entry["sessions"][row["session_id"]] = { "first_message_index": row["first_message_index"], "forwarded_count": row["forwarded_count"], "is_verified": row["is_verified"], @@ -174,9 +174,9 @@ class EndToEndRoomKeyStore(SQLBaseStore): keyvalues = {"user_id": user_id, "version": int(version)} if room_id: - keyvalues['room_id'] = room_id + keyvalues["room_id"] = room_id if session_id: - keyvalues['session_id'] = session_id + keyvalues["session_id"] = session_id yield self._simple_delete( table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys" @@ -191,7 +191,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): ) row = txn.fetchone() if not row: - raise StoreError(404, 'No current backup version') + raise StoreError(404, "No current backup version") return row[0] def get_e2e_room_keys_version_info(self, user_id, version=None): @@ -255,7 +255,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): ) current_version = txn.fetchone()[0] if current_version is None: - current_version = '0' + current_version = "0" new_version = str(int(current_version) + 1) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 1b97ee74e3..289b6bc281 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -45,6 +45,10 @@ class PostgresEngine(object): # together. For example, version 8.1.5 will be returned as 80105 self._version = db_conn.server_version + # Are we on a supported PostgreSQL version? + if self._version < 90500: + raise RuntimeError("Synapse requires PostgreSQL 9.5+ or above.") + db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) @@ -64,9 +68,9 @@ class PostgresEngine(object): @property def can_native_upsert(self): """ - Can we use native UPSERTs? This requires PostgreSQL 9.5+. + Can we use native UPSERTs? """ - return self._version >= 90500 + return True def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 933bcf42c2..e9b9caa49a 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -85,7 +85,7 @@ class Sqlite3Engine(object): def _parse_match_info(buf): bufsize = len(buf) - return [struct.unpack('@I', buf[i : i + 4])[0] for i in range(0, bufsize, 4)] + return [struct.unpack("@I", buf[i : i + 4])[0] for i in range(0, bufsize, 4)] def _rank(raw_match_info): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 09e39c2c28..cb4478342f 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -190,6 +190,34 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas room_id, ) + def get_rooms_with_many_extremities(self, min_count, limit): + """Get the top rooms with at least N extremities. + + Args: + min_count (int): The minimum number of extremities + limit (int): The maximum number of rooms to return. + + Returns: + Deferred[list]: At most `limit` room IDs that have at least + `min_count` extremities, sorted by extremity count. + """ + + def _get_rooms_with_many_extremities_txn(txn): + sql = """ + SELECT room_id FROM event_forward_extremities + GROUP BY room_id + HAVING count(*) > ? + ORDER BY count(*) DESC + LIMIT ? + """ + + txn.execute(sql, (min_count, limit)) + return [room_id for room_id, in txn] + + return self.runInteraction( + "get_rooms_with_many_extremities", _get_rooms_with_many_extremities_txn + ) + @cached(max_entries=5000, iterable=True) def get_latest_event_ids_in_room(self, room_id): return self._simple_select_onecol( diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index a729f3e067..eca77069fd 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -277,7 +277,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): # contain results from the first query, correctly ordered, followed # by results from the second query, but we want them all ordered # by stream_ordering, oldest first. - notifs.sort(key=lambda r: r['stream_ordering']) + notifs.sort(key=lambda r: r["stream_ordering"]) # Take only up to the limit. We have to stop at the limit because # one of the subqueries may have hit the limit. @@ -379,7 +379,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): # contain results from the first query, correctly ordered, followed # by results from the second query, but we want them all ordered # by received_ts (most recent first) - notifs.sort(key=lambda r: -(r['received_ts'] or 0)) + notifs.sort(key=lambda r: -(r["received_ts"] or 0)) # Now return the first `limit` defer.returnValue(notifs[:limit]) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f9162be9b9..fefba39ea1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -17,14 +17,14 @@ import itertools import logging -from collections import OrderedDict, deque, namedtuple +from collections import Counter as c_counter, OrderedDict, deque, namedtuple from functools import wraps from six import iteritems, text_type from six.moves import range from canonicaljson import json -from prometheus_client import Counter +from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.metrics import BucketCollector from synapse.metrics.background_process_metrics import run_as_background_process from synapse.state import StateResolutionStore from synapse.storage.background_updates import BackgroundUpdateStore @@ -73,6 +74,21 @@ state_delta_reuse_delta_counter = Counter( "synapse_storage_events_state_delta_reuse_delta", "" ) +# The number of forward extremities for each new event. +forward_extremities_counter = Histogram( + "synapse_storage_events_forward_extremities_persisted", + "Number of forward extremities for each new event", + buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), +) + +# The number of stale forward extremities for each new event. Stale extremities +# are those that were in the previous set of extremities as well as the new. +stale_forward_extremities_counter = Histogram( + "synapse_storage_events_stale_forward_extremities_persisted", + "Number of unchanged forward extremities for each new event", + buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), +) + def encode_json(json_object): """ @@ -220,13 +236,39 @@ class EventsStore( EventsWorkerStore, BackgroundUpdateStore, ): - def __init__(self, db_conn, hs): super(EventsStore, self).__init__(db_conn, hs) self._event_persist_queue = _EventPeristenceQueue() self._state_resolution_handler = hs.get_state_resolution_handler() + # Collect metrics on the number of forward extremities that exist. + # Counter of number of extremities to count + self._current_forward_extremities_amount = c_counter() + + BucketCollector( + "synapse_forward_extremities", + lambda: self._current_forward_extremities_amount, + buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"], + ) + + # Read the extrems every 60 minutes + hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000) + + @defer.inlineCallbacks + def _read_forward_extremities(self): + def fetch(txn): + txn.execute( + """ + select count(*) c from event_forward_extremities + group by room_id + """ + ) + return txn.fetchall() + + res = yield self.runInteraction("read_forward_extremities", fetch) + self._current_forward_extremities_amount = c_counter(list(x[0] for x in res)) + @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): """ @@ -514,6 +556,8 @@ class EventsStore( and not event.internal_metadata.is_soft_failed() ] + latest_event_ids = set(latest_event_ids) + # start with the existing forward extremities result = set(latest_event_ids) @@ -537,6 +581,13 @@ class EventsStore( ) result.difference_update(existing_prevs) + # We only update metrics for events that change forward extremities + # (e.g. we ignore backfill/outliers/etc) + if result != latest_event_ids: + forward_extremities_counter.observe(len(result)) + stale = latest_event_ids & result + stale_forward_extremities_counter.observe(len(stale)) + defer.returnValue(result) @defer.inlineCallbacks @@ -568,17 +619,11 @@ class EventsStore( ) txn.execute(sql, batch) - results.extend( - r[0] - for r in txn - if not json.loads(r[1]).get("soft_failed") - ) + results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed")) for chunk in batch_iter(event_ids, 100): yield self.runInteraction( - "_get_events_which_are_prevs", - _get_events_which_are_prevs_txn, - chunk, + "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk ) defer.returnValue(results) @@ -640,9 +685,7 @@ class EventsStore( for chunk in batch_iter(event_ids, 100): yield self.runInteraction( - "_get_prevs_before_rejected", - _get_prevs_before_rejected_txn, - chunk, + "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk ) defer.returnValue(existing_prevs) diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py index 75c1935bf3..1ce21d190c 100644 --- a/synapse/storage/events_bg_updates.py +++ b/synapse/storage/events_bg_updates.py @@ -64,8 +64,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): ) self.register_background_update_handler( - self.DELETE_SOFT_FAILED_EXTREMITIES, - self._cleanup_extremities_bg_update, + self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update ) @defer.inlineCallbacks @@ -269,7 +268,8 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): LEFT JOIN events USING (event_id) LEFT JOIN event_json USING (event_id) LEFT JOIN rejections USING (event_id) - """, (batch_size,) + """, + (batch_size,), ) for prev_event_id, event_id, metadata, rejected, outlier in txn: @@ -364,13 +364,12 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): column="event_id", iterable=to_delete, keyvalues={}, - retcols=("room_id",) + retcols=("room_id",), ) room_ids = set(row["room_id"] for row in rows) for room_id in room_ids: txn.call_after( - self.get_latest_event_ids_in_room.invalidate, - (room_id,) + self.get_latest_event_ids_in_room.invalidate, (room_id,) ) self._simple_delete_many_txn( @@ -384,7 +383,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): return len(original_set) num_handled = yield self.runInteraction( - "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn, + "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn ) if not num_handled: @@ -394,8 +393,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): txn.execute("DROP TABLE _extremities_to_check") yield self.runInteraction( - "_cleanup_extremities_bg_update_drop_table", - _drop_table_txn, + "_cleanup_extremities_bg_update_drop_table", _drop_table_txn ) defer.returnValue(num_handled) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index cc7df5cf14..6d680d405a 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -27,7 +27,6 @@ from synapse.api.constants import EventTypes from synapse.api.errors import NotFoundError from synapse.api.room_versions import EventFormatVersions from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401 -# these are only included to make the type annotations work from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event from synapse.metrics.background_process_metrics import run_as_background_process @@ -111,8 +110,7 @@ class EventsWorkerStore(SQLBaseStore): return ts return self.runInteraction( - "get_approximate_received_ts", - _get_approximate_received_ts_txn, + "get_approximate_received_ts", _get_approximate_received_ts_txn ) @defer.inlineCallbacks @@ -677,7 +675,8 @@ class EventsWorkerStore(SQLBaseStore): """ return self.runInteraction( "get_total_state_event_counts", - self._get_total_state_event_counts_txn, room_id + self._get_total_state_event_counts_txn, + room_id, ) def _get_current_state_event_counts_txn(self, txn, room_id): @@ -701,7 +700,8 @@ class EventsWorkerStore(SQLBaseStore): """ return self.runInteraction( "get_current_state_event_counts", - self._get_current_state_event_counts_txn, room_id + self._get_current_state_event_counts_txn, + room_id, ) @defer.inlineCallbacks diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index dce6a43ac1..73e6fc6de2 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1179,11 +1179,7 @@ class GroupServerStore(SQLBaseStore): for table in tables: self._simple_delete_txn( - txn, - table=table, - keyvalues={"group_id": group_id}, + txn, table=table, keyvalues={"group_id": group_id} ) - return self.runInteraction( - "delete_group", _delete_group_txn - ) + return self.runInteraction("delete_group", _delete_group_txn) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index e3655ad8d7..e72f89e446 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -131,7 +131,7 @@ class KeyStore(SQLBaseStore): def _invalidate(res): f = self._get_server_verify_key.invalidate for i in invalidations: - f((i, )) + f((i,)) return res return self.runInteraction( diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 3ecf47e7a7..6b1238ce4a 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -22,11 +22,11 @@ class MediaRepositoryStore(BackgroundUpdateStore): super(MediaRepositoryStore, self).__init__(db_conn, hs) self.register_background_index_update( - update_name='local_media_repository_url_idx', - index_name='local_media_repository_url_idx', - table='local_media_repository', - columns=['created_ts'], - where_clause='url_cache IS NOT NULL', + update_name="local_media_repository_url_idx", + index_name="local_media_repository_url_idx", + table="local_media_repository", + columns=["created_ts"], + where_clause="url_cache IS NOT NULL", ) def get_local_media(self, media_id): @@ -108,12 +108,12 @@ class MediaRepositoryStore(BackgroundUpdateStore): return dict( zip( ( - 'response_code', - 'etag', - 'expires_ts', - 'og', - 'media_id', - 'download_ts', + "response_code", + "etag", + "expires_ts", + "og", + "media_id", + "download_ts", ), row, ) diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index 8aa8abc470..081564360f 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -86,11 +86,11 @@ class MonthlyActiveUsersStore(SQLBaseStore): if len(self.reserved_users) > 0: # questionmarks is a hack to overcome sqlite not supporting # tuples in 'WHERE IN %s' - questionmarks = '?' * len(self.reserved_users) + questionmarks = "?" * len(self.reserved_users) query_args.extend(self.reserved_users) sql = base_sql + """ AND user_id NOT IN ({})""".format( - ','.join(questionmarks) + ",".join(questionmarks) ) else: sql = base_sql @@ -124,7 +124,7 @@ class MonthlyActiveUsersStore(SQLBaseStore): if len(self.reserved_users) > 0: query_args.extend(self.reserved_users) sql = base_sql + """ AND user_id NOT IN ({})""".format( - ','.join(questionmarks) + ",".join(questionmarks) ) else: sql = base_sql diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index f2c1bed487..7c4e1dc7ec 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -133,7 +133,7 @@ def _setup_new_database(cur, database_engine): if ver <= SCHEMA_VERSION: valid_dirs.append((ver, abs_path)) else: - logger.warn("Unexpected entry in 'full_schemas': %s", filename) + logger.debug("Ignoring entry '%s' in 'full_schemas'", filename) if not valid_dirs: raise PrepareDatabaseException( @@ -146,9 +146,10 @@ def _setup_new_database(cur, database_engine): directory_entries = os.listdir(sql_dir) - for filename in sorted(fnmatch.filter(directory_entries, "*.sql") + fnmatch.filter( - directory_entries, "*.sql." + specific - )): + for filename in sorted( + fnmatch.filter(directory_entries, "*.sql") + + fnmatch.filter(directory_entries, "*.sql." + specific) + ): sql_loc = os.path.join(sql_dir, filename) logger.debug("Applying schema %s", sql_loc) executescript(cur, sql_loc) @@ -313,7 +314,7 @@ def _apply_module_schemas(txn, database_engine, config): application config """ for (mod, _config) in config.password_providers: - if not hasattr(mod, 'get_db_schema_files'): + if not hasattr(mod, "get_db_schema_files"): continue modname = ".".join((mod.__module__, mod.__name__)) _apply_module_schema_files( @@ -343,7 +344,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams) continue root_name, ext = os.path.splitext(name) - if ext != '.sql': + if ext != ".sql": raise PrepareDatabaseException( "only .sql files are currently supported for module schemas" ) @@ -407,7 +408,7 @@ def get_statements(f): def executescript(txn, schema_path): - with open(schema_path, 'r') as f: + with open(schema_path, "r") as f: for statement in get_statements(f): txn.execute(statement) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index aeec2f57c4..0ff392bdb4 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -41,7 +41,7 @@ class ProfileWorkerStore(SQLBaseStore): defer.returnValue( ProfileInfo( - avatar_url=profile['avatar_url'], display_name=profile['displayname'] + avatar_url=profile["avatar_url"], display_name=profile["displayname"] ) ) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 9e406baafa..98cec8c82b 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -46,12 +46,12 @@ def _load_rules(rawrules, enabled_map): rules = list(list_with_base_rules(ruleslist)) for i, rule in enumerate(rules): - rule_id = rule['rule_id'] + rule_id = rule["rule_id"] if rule_id in enabled_map: - if rule.get('enabled', True) != bool(enabled_map[rule_id]): + if rule.get("enabled", True) != bool(enabled_map[rule_id]): # Rules are cached across users. rule = dict(rule) - rule['enabled'] = bool(enabled_map[rule_id]) + rule["enabled"] = bool(enabled_map[rule_id]) rules[i] = rule return rules @@ -126,12 +126,12 @@ class PushRulesWorkerStore( def get_push_rules_enabled_for_user(self, user_id): results = yield self._simple_select_list( table="push_rules_enable", - keyvalues={'user_name': user_id}, + keyvalues={"user_name": user_id}, retcols=("user_name", "rule_id", "enabled"), desc="get_push_rules_enabled_for_user", ) defer.returnValue( - {r['rule_id']: False if r['enabled'] == 0 else True for r in results} + {r["rule_id"]: False if r["enabled"] == 0 else True for r in results} ) def have_push_rules_changed_for_user(self, user_id, last_id): @@ -175,7 +175,7 @@ class PushRulesWorkerStore( rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) for row in rows: - results.setdefault(row['user_name'], []).append(row) + results.setdefault(row["user_name"], []).append(row) enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids) @@ -194,7 +194,7 @@ class PushRulesWorkerStore( rule (Dict): A push rule. """ # Create new rule id - rule_id_scope = '/'.join(rule["rule_id"].split('/')[:-1]) + rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1]) new_rule_id = rule_id_scope + "/" + new_room_id # Change room id in each condition @@ -334,8 +334,8 @@ class PushRulesWorkerStore( desc="bulk_get_push_rules_enabled", ) for row in rows: - enabled = bool(row['enabled']) - results.setdefault(row['user_name'], {})[row['rule_id']] = enabled + enabled = bool(row["enabled"]) + results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled defer.returnValue(results) @@ -568,7 +568,7 @@ class PushRuleStore(PushRulesWorkerStore): def delete_push_rule_txn(txn, stream_id, event_stream_ordering): self._simple_delete_one_txn( - txn, "push_rules", {'user_name': user_id, 'rule_id': rule_id} + txn, "push_rules", {"user_name": user_id, "rule_id": rule_id} ) self._insert_push_rules_update_txn( @@ -605,9 +605,9 @@ class PushRuleStore(PushRulesWorkerStore): self._simple_upsert_txn( txn, "push_rules_enable", - {'user_name': user_id, 'rule_id': rule_id}, - {'enabled': 1 if enabled else 0}, - {'id': new_id}, + {"user_name": user_id, "rule_id": rule_id}, + {"enabled": 1 if enabled else 0}, + {"id": new_id}, ) self._insert_push_rules_update_txn( @@ -645,8 +645,8 @@ class PushRuleStore(PushRulesWorkerStore): self._simple_update_one_txn( txn, "push_rules", - {'user_name': user_id, 'rule_id': rule_id}, - {'actions': actions_json}, + {"user_name": user_id, "rule_id": rule_id}, + {"actions": actions_json}, ) self._insert_push_rules_update_txn( diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 1567e1df48..cfe0a94330 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -37,24 +37,24 @@ else: class PusherWorkerStore(SQLBaseStore): def _decode_pushers_rows(self, rows): for r in rows: - dataJson = r['data'] - r['data'] = None + dataJson = r["data"] + r["data"] = None try: if isinstance(dataJson, db_binary_type): dataJson = str(dataJson).decode("UTF8") - r['data'] = json.loads(dataJson) + r["data"] = json.loads(dataJson) except Exception as e: logger.warn( "Invalid JSON in data for pusher %d: %s, %s", - r['id'], + r["id"], dataJson, e.args[0], ) pass - if isinstance(r['pushkey'], db_binary_type): - r['pushkey'] = str(r['pushkey']).decode("UTF8") + if isinstance(r["pushkey"], db_binary_type): + r["pushkey"] = str(r["pushkey"]).decode("UTF8") return rows @@ -195,15 +195,15 @@ class PusherWorkerStore(SQLBaseStore): ) def get_if_users_have_pushers(self, user_ids): rows = yield self._simple_select_many_batch( - table='pushers', - column='user_name', + table="pushers", + column="user_name", iterable=user_ids, - retcols=['user_name'], - desc='get_if_users_have_pushers', + retcols=["user_name"], + desc="get_if_users_have_pushers", ) result = {user_id: False for user_id in user_ids} - result.update({r['user_name']: True for r in rows}) + result.update({r["user_name"]: True for r in rows}) defer.returnValue(result) @@ -299,8 +299,8 @@ class PusherStore(PusherWorkerStore): ): yield self._simple_update_one( "pushers", - {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'last_stream_ordering': last_stream_ordering}, + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + {"last_stream_ordering": last_stream_ordering}, desc="update_pusher_last_stream_ordering", ) @@ -310,10 +310,10 @@ class PusherStore(PusherWorkerStore): ): yield self._simple_update_one( "pushers", - {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, { - 'last_stream_ordering': last_stream_ordering, - 'last_success': last_success, + "last_stream_ordering": last_stream_ordering, + "last_success": last_success, }, desc="update_pusher_last_stream_ordering_and_success", ) @@ -322,8 +322,8 @@ class PusherStore(PusherWorkerStore): def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): yield self._simple_update_one( "pushers", - {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id}, - {'failing_since': failing_since}, + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + {"failing_since": failing_since}, desc="update_pusher_failing_since", ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index a1647e50a1..b477da12b1 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore): @cachedInlineCallbacks() def get_users_with_read_receipts_in_room(self, room_id): receipts = yield self.get_receipts_for_room(room_id, "m.read") - defer.returnValue(set(r['user_id'] for r in receipts)) + defer.returnValue(set(r["user_id"] for r in receipts)) @cached(num_args=2) def get_receipts_for_room(self, room_id, receipt_type): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 1dd1182e82..983ce13291 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import re from six import iterkeys @@ -31,6 +32,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks THIRTY_MINUTES_IN_MS = 30 * 60 * 1000 +logger = logging.getLogger(__name__) + class RegistrationWorkerStore(SQLBaseStore): def __init__(self, db_conn, hs): @@ -113,8 +116,9 @@ class RegistrationWorkerStore(SQLBaseStore): defer.returnValue(res) @defer.inlineCallbacks - def set_account_validity_for_user(self, user_id, expiration_ts, email_sent, - renewal_token=None): + def set_account_validity_for_user( + self, user_id, expiration_ts, email_sent, renewal_token=None + ): """Updates the account validity properties of the given account, with the given values. @@ -128,6 +132,7 @@ class RegistrationWorkerStore(SQLBaseStore): renewal_token (str): Renewal token the user can use to extend the validity of their account. Defaults to no token. """ + def set_account_validity_for_user_txn(txn): self._simple_update_txn( txn=txn, @@ -140,12 +145,11 @@ class RegistrationWorkerStore(SQLBaseStore): }, ) self._invalidate_cache_and_stream( - txn, self.get_expiration_ts_for_user, (user_id,), + txn, self.get_expiration_ts_for_user, (user_id,) ) yield self.runInteraction( - "set_account_validity_for_user", - set_account_validity_for_user_txn, + "set_account_validity_for_user", set_account_validity_for_user_txn ) @defer.inlineCallbacks @@ -214,6 +218,7 @@ class RegistrationWorkerStore(SQLBaseStore): Returns: Deferred: Resolves to a list[dict[user_id (str), expiration_ts_ms (int)]] """ + def select_users_txn(txn, now_ms, renew_at): sql = ( "SELECT user_id, expiration_ts_ms FROM account_validity" @@ -226,7 +231,8 @@ class RegistrationWorkerStore(SQLBaseStore): res = yield self.runInteraction( "get_users_expiring_soon", select_users_txn, - self.clock.time_msec(), self.config.account_validity.renew_at, + self.clock.time_msec(), + self.config.account_validity.renew_at, ) defer.returnValue(res) @@ -249,6 +255,20 @@ class RegistrationWorkerStore(SQLBaseStore): ) @defer.inlineCallbacks + def delete_account_validity_for_user(self, user_id): + """Deletes the entry for the given user in the account validity table, removing + their expiration date and renewal token. + + Args: + user_id (str): ID of the user to remove from the account validity table. + """ + yield self._simple_delete_one( + table="account_validity", + keyvalues={"user_id": user_id}, + desc="delete_account_validity_for_user", + ) + + @defer.inlineCallbacks def is_server_admin(self, user): res = yield self._simple_select_one_onecol( table="users", @@ -352,7 +372,7 @@ class RegistrationWorkerStore(SQLBaseStore): WHERE creation_ts > ? ) AS t GROUP BY user_type """ - results = {'native': 0, 'guest': 0, 'bridged': 0} + results = {"native": 0, "guest": 0, "bridged": 0} txn.execute(sql, (yesterday,)) for row in txn: results[row[0]] = row[1] @@ -418,7 +438,7 @@ class RegistrationWorkerStore(SQLBaseStore): {"medium": medium, "address": address}, ["guest_access_token"], True, - 'get_3pid_guest_access_token', + "get_3pid_guest_access_token", ) if ret: defer.returnValue(ret["guest_access_token"]) @@ -455,11 +475,11 @@ class RegistrationWorkerStore(SQLBaseStore): txn, "user_threepids", {"medium": medium, "address": address}, - ['user_id'], + ["user_id"], True, ) if ret: - return ret['user_id'] + return ret["user_id"] return None @defer.inlineCallbacks @@ -475,8 +495,8 @@ class RegistrationWorkerStore(SQLBaseStore): ret = yield self._simple_select_list( "user_threepids", {"user_id": user_id}, - ['medium', 'address', 'validated_at', 'added_at'], - 'user_get_threepids', + ["medium", "address", "validated_at", "added_at"], + "user_get_threepids", ) defer.returnValue(ret) @@ -555,11 +575,7 @@ class RegistrationWorkerStore(SQLBaseStore): """ return self._simple_select_onecol( table="user_threepid_id_server", - keyvalues={ - "user_id": user_id, - "medium": medium, - "address": address, - }, + keyvalues={"user_id": user_id, "medium": medium, "address": address}, retcol="id_server", desc="get_id_servers_user_bound", ) @@ -595,15 +611,80 @@ class RegistrationStore( self.register_noop_background_update("refresh_tokens_device_index") self.register_background_update_handler( - "user_threepids_grandfather", self._bg_user_threepids_grandfather, + "user_threepids_grandfather", self._bg_user_threepids_grandfather + ) + + self.register_background_update_handler( + "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag ) # Create a background job for culling expired 3PID validity tokens hs.get_clock().looping_call( - self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS, + self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS ) @defer.inlineCallbacks + def _backgroud_update_set_deactivated_flag(self, progress, batch_size): + """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1 + for each of them. + """ + + last_user = progress.get("user_id", "") + + def _backgroud_update_set_deactivated_flag_txn(txn): + txn.execute( + """ + SELECT + users.name, + COUNT(access_tokens.token) AS count_tokens, + COUNT(user_threepids.address) AS count_threepids + FROM users + LEFT JOIN access_tokens ON (access_tokens.user_id = users.name) + LEFT JOIN user_threepids ON (user_threepids.user_id = users.name) + WHERE (users.password_hash IS NULL OR users.password_hash = '') + AND (users.appservice_id IS NULL OR users.appservice_id = '') + AND users.is_guest = 0 + AND users.name > ? + GROUP BY users.name + ORDER BY users.name ASC + LIMIT ?; + """, + (last_user, batch_size), + ) + + rows = self.cursor_to_dict(txn) + + if not rows: + return True + + rows_processed_nb = 0 + + for user in rows: + if not user["count_tokens"] and not user["count_threepids"]: + self.set_user_deactivated_status_txn(txn, user["name"], True) + rows_processed_nb += 1 + + logger.info("Marked %d rows as deactivated", rows_processed_nb) + + self._background_update_progress_txn( + txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]} + ) + + if batch_size > len(rows): + return True + else: + return False + + end = yield self.runInteraction( + "users_set_deactivated_flag", _backgroud_update_set_deactivated_flag_txn + ) + + if end: + yield self._end_background_update("users_set_deactivated_flag") + + defer.returnValue(batch_size) + + @defer.inlineCallbacks def add_access_token_to_user(self, user_id, token, device_id=None): """Adds an access token for the given user. @@ -768,7 +849,7 @@ class RegistrationStore( def user_set_password_hash_txn(txn): self._simple_update_one_txn( - txn, 'users', {'name': user_id}, {'password_hash': password_hash} + txn, "users", {"name": user_id}, {"password_hash": password_hash} ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) @@ -789,9 +870,9 @@ class RegistrationStore( def f(txn): self._simple_update_one_txn( txn, - table='users', - keyvalues={'name': user_id}, - updatevalues={'consent_version': consent_version}, + table="users", + keyvalues={"name": user_id}, + updatevalues={"consent_version": consent_version}, ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) @@ -813,9 +894,9 @@ class RegistrationStore( def f(txn): self._simple_update_one_txn( txn, - table='users', - keyvalues={'name': user_id}, - updatevalues={'consent_server_notice_sent': consent_version}, + table="users", + keyvalues={"name": user_id}, + updatevalues={"consent_server_notice_sent": consent_version}, ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) @@ -985,7 +1066,7 @@ class RegistrationStore( if id_servers: yield self.runInteraction( - "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn, + "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn ) yield self._end_background_update("user_threepids_grandfather") @@ -993,12 +1074,7 @@ class RegistrationStore( defer.returnValue(1) def get_threepid_validation_session( - self, - medium, - client_secret, - address=None, - sid=None, - validated=True, + self, medium, client_secret, address=None, sid=None, validated=True ): """Gets a session_id and last_send_attempt (if available) for a client_secret/medium/(address|session_id) combo @@ -1018,23 +1094,22 @@ class RegistrationStore( latest session_id and send_attempt count for this 3PID. Otherwise None if there hasn't been a previous attempt """ - keyvalues = { - "medium": medium, - "client_secret": client_secret, - } + keyvalues = {"medium": medium, "client_secret": client_secret} if address: keyvalues["address"] = address if sid: keyvalues["session_id"] = sid - assert(address or sid) + assert address or sid def get_threepid_validation_session_txn(txn): sql = """ SELECT address, session_id, medium, client_secret, last_send_attempt, validated_at FROM threepid_validation_session WHERE %s - """ % (" AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),) + """ % ( + " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)), + ) if validated is not None: sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL") @@ -1049,17 +1124,10 @@ class RegistrationStore( return rows[0] return self.runInteraction( - "get_threepid_validation_session", - get_threepid_validation_session_txn, + "get_threepid_validation_session", get_threepid_validation_session_txn ) - def validate_threepid_session( - self, - session_id, - client_secret, - token, - current_ts, - ): + def validate_threepid_session(self, session_id, client_secret, token, current_ts): """Attempt to validate a threepid session using a token Args: @@ -1091,7 +1159,7 @@ class RegistrationStore( if retrieved_client_secret != client_secret: raise ThreepidValidationError( - 400, "This client_secret does not match the provided session_id", + 400, "This client_secret does not match the provided session_id" ) row = self._simple_select_one_txn( @@ -1104,7 +1172,7 @@ class RegistrationStore( if not row: raise ThreepidValidationError( - 400, "Validation token not found or has expired", + 400, "Validation token not found or has expired" ) expires = row["expires"] next_link = row["next_link"] @@ -1115,7 +1183,7 @@ class RegistrationStore( if expires <= current_ts: raise ThreepidValidationError( - 400, "This token has expired. Please request a new one", + 400, "This token has expired. Please request a new one" ) # Looks good. Validate the session @@ -1130,8 +1198,7 @@ class RegistrationStore( # Return next_link if it exists return self.runInteraction( - "validate_threepid_session_txn", - validate_threepid_session_txn, + "validate_threepid_session_txn", validate_threepid_session_txn ) def upsert_threepid_validation_session( @@ -1198,6 +1265,7 @@ class RegistrationStore( token_expires (int): The timestamp for which after the token will no longer be valid """ + def start_or_continue_validation_session_txn(txn): # Create or update a validation session self._simple_upsert_txn( @@ -1231,6 +1299,7 @@ class RegistrationStore( def cull_expired_threepid_validation_tokens(self): """Remove threepid validation tokens with expiry dates that have passed""" + def cull_expired_threepid_validation_tokens_txn(txn, ts): sql = """ DELETE FROM threepid_validation_token WHERE @@ -1252,6 +1321,7 @@ class RegistrationStore( Args: session_id (str): The ID of the session to delete """ + def delete_threepid_session_txn(txn): self._simple_delete_txn( txn, @@ -1265,6 +1335,53 @@ class RegistrationStore( ) return self.runInteraction( - "delete_threepid_session", - delete_threepid_session_txn, + "delete_threepid_session", delete_threepid_session_txn + ) + + def set_user_deactivated_status_txn(self, txn, user_id, deactivated): + self._simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"deactivated": 1 if deactivated else 0}, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_deactivated_status, (user_id,) ) + + @defer.inlineCallbacks + def set_user_deactivated_status(self, user_id, deactivated): + """Set the `deactivated` property for the provided user to the provided value. + + Args: + user_id (str): The ID of the user to set the status for. + deactivated (bool): The value to set for `deactivated`. + """ + + yield self.runInteraction( + "set_user_deactivated_status", + self.set_user_deactivated_status_txn, + user_id, + deactivated, + ) + + @cachedInlineCallbacks() + def get_user_deactivated_status(self, user_id): + """Retrieve the value for the `deactivated` property for the provided user. + + Args: + user_id (str): The ID of the user to retrieve the status for. + + Returns: + defer.Deferred(bool): The requested value. + """ + + res = yield self._simple_select_one_onecol( + table="users", + keyvalues={"name": user_id}, + retcol="deactivated", + desc="get_user_deactivated_status", + ) + + # Convert the integer into a boolean. + defer.returnValue(res == 1) diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py index 4c83800cca..1b01934c19 100644 --- a/synapse/storage/relations.py +++ b/synapse/storage/relations.py @@ -468,9 +468,5 @@ class RelationsStore(RelationsWorkerStore): """ self._simple_delete_txn( - txn, - table="event_relations", - keyvalues={ - "event_id": redacted_event_id, - } + txn, table="event_relations", keyvalues={"event_id": redacted_event_id} ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7617913326..8004aeb909 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -420,7 +420,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): table="room_memberships", column="event_id", iterable=missing_member_event_ids, - retcols=('user_id', 'display_name', 'avatar_url'), + retcols=("user_id", "display_name", "avatar_url"), keyvalues={"membership": Membership.JOIN}, batch_size=500, desc="_get_joined_users_from_context", @@ -448,7 +448,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cachedInlineCallbacks(max_entries=10000) def is_host_joined(self, room_id, host): - if '%' in host or '_' in host: + if "%" in host or "_" in host: raise Exception("Invalid host name") sql = """ @@ -490,7 +490,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): Deferred: Resolves to True if the host is/was in the room, otherwise False. """ - if '%' in host or '_' in host: + if "%" in host or "_" in host: raise Exception("Invalid host name") sql = """ @@ -723,7 +723,7 @@ class RoomMemberStore(RoomMemberWorkerStore): room_id = row["room_id"] try: event_json = json.loads(row["json"]) - content = event_json['content'] + content = event_json["content"] except Exception: continue diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py index 147496a38b..3edfcfd783 100644 --- a/synapse/storage/schema/delta/20/pushers.py +++ b/synapse/storage/schema/delta/20/pushers.py @@ -29,7 +29,8 @@ logger = logging.getLogger(__name__) def run_create(cur, database_engine, *args, **kwargs): logger.info("Porting pushers table...") - cur.execute(""" + cur.execute( + """ CREATE TABLE IF NOT EXISTS pushers2 ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, @@ -48,27 +49,34 @@ def run_create(cur, database_engine, *args, **kwargs): failing_since BIGINT, UNIQUE (app_id, pushkey, user_name) ) - """) - cur.execute("""SELECT + """ + ) + cur.execute( + """SELECT id, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers - """) + """ + ) count = 0 for row in cur.fetchall(): row = list(row) row[8] = bytes(row[8]).decode("utf-8") row[11] = bytes(row[11]).decode("utf-8") - cur.execute(database_engine.convert_param_style(""" + cur.execute( + database_engine.convert_param_style( + """ INSERT into pushers2 ( id, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since - ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))), - row + ) values (%s)""" + % (",".join(["?" for _ in range(len(row))])) + ), + row, ) count += 1 cur.execute("DROP TABLE pushers") diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py index ef7ec34346..9b95411fb6 100644 --- a/synapse/storage/schema/delta/30/as_users.py +++ b/synapse/storage/schema/delta/30/as_users.py @@ -40,9 +40,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): logger.warning("Could not get app_service_config_files from config") pass - appservices = load_appservices( - config.server_name, config_files - ) + appservices = load_appservices(config.server_name, config_files) owned = {} @@ -53,20 +51,19 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): if user_id in owned.keys(): logger.error( "user_id %s was owned by more than one application" - " service (IDs %s and %s); assigning arbitrarily to %s" % - (user_id, owned[user_id], appservice.id, owned[user_id]) + " service (IDs %s and %s); assigning arbitrarily to %s" + % (user_id, owned[user_id], appservice.id, owned[user_id]) ) owned.setdefault(appservice.id, []).append(user_id) for as_id, user_ids in owned.items(): n = 100 - user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n)) + user_chunks = (user_ids[i : i + 100] for i in range(0, len(user_ids), n)) for chunk in user_chunks: cur.execute( database_engine.convert_param_style( - "UPDATE users SET appservice_id = ? WHERE name IN (%s)" % ( - ",".join("?" for _ in chunk), - ) + "UPDATE users SET appservice_id = ? WHERE name IN (%s)" + % (",".join("?" for _ in chunk),) ), - [as_id] + chunk + [as_id] + chunk, ) diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py index 93367fa09e..9bb504aad5 100644 --- a/synapse/storage/schema/delta/31/pushers.py +++ b/synapse/storage/schema/delta/31/pushers.py @@ -24,12 +24,13 @@ logger = logging.getLogger(__name__) def token_to_stream_ordering(token): - return int(token[1:].split('_')[0]) + return int(token[1:].split("_")[0]) def run_create(cur, database_engine, *args, **kwargs): logger.info("Porting pushers table, delta 31...") - cur.execute(""" + cur.execute( + """ CREATE TABLE IF NOT EXISTS pushers2 ( id BIGINT PRIMARY KEY, user_name TEXT NOT NULL, @@ -48,26 +49,33 @@ def run_create(cur, database_engine, *args, **kwargs): failing_since BIGINT, UNIQUE (app_id, pushkey, user_name) ) - """) - cur.execute("""SELECT + """ + ) + cur.execute( + """SELECT id, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers - """) + """ + ) count = 0 for row in cur.fetchall(): row = list(row) row[12] = token_to_stream_ordering(row[12]) - cur.execute(database_engine.convert_param_style(""" + cur.execute( + database_engine.convert_param_style( + """ INSERT into pushers2 ( id, user_name, access_token, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_stream_ordering, last_success, failing_since - ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))), - row + ) values (%s)""" + % (",".join(["?" for _ in range(len(row))])) + ), + row, ) count += 1 cur.execute("DROP TABLE pushers") diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py index 9754d3ccfb..a26057dfb6 100644 --- a/synapse/storage/schema/delta/33/remote_media_ts.py +++ b/synapse/storage/schema/delta/33/remote_media_ts.py @@ -26,5 +26,5 @@ def run_upgrade(cur, database_engine, *args, **kwargs): database_engine.convert_param_style( "UPDATE remote_media_cache SET last_access_ts = ?" ), - (int(time.time() * 1000),) + (int(time.time() * 1000),), ) diff --git a/synapse/storage/schema/delta/47/state_group_seq.py b/synapse/storage/schema/delta/47/state_group_seq.py index f6766501d2..9fd1ccf6f7 100644 --- a/synapse/storage/schema/delta/47/state_group_seq.py +++ b/synapse/storage/schema/delta/47/state_group_seq.py @@ -27,10 +27,7 @@ def run_create(cur, database_engine, *args, **kwargs): else: start_val = row[0] + 1 - cur.execute( - "CREATE SEQUENCE state_group_id_seq START WITH %s", - (start_val, ), - ) + cur.execute("CREATE SEQUENCE state_group_id_seq START WITH %s", (start_val,)) def run_upgrade(*args, **kwargs): diff --git a/synapse/storage/schema/delta/48/group_unique_indexes.py b/synapse/storage/schema/delta/48/group_unique_indexes.py index 2233af87d7..49f5f2c003 100644 --- a/synapse/storage/schema/delta/48/group_unique_indexes.py +++ b/synapse/storage/schema/delta/48/group_unique_indexes.py @@ -38,16 +38,22 @@ def run_create(cur, database_engine, *args, **kwargs): rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" # remove duplicates from group_users & group_invites tables - cur.execute(""" + cur.execute( + """ DELETE FROM group_users WHERE %s NOT IN ( SELECT min(%s) FROM group_users GROUP BY group_id, user_id ); - """ % (rowid, rowid)) - cur.execute(""" + """ + % (rowid, rowid) + ) + cur.execute( + """ DELETE FROM group_invites WHERE %s NOT IN ( SELECT min(%s) FROM group_invites GROUP BY group_id, user_id ); - """ % (rowid, rowid)) + """ + % (rowid, rowid) + ) for statement in get_statements(FIX_INDEXES.splitlines()): cur.execute(statement) diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py index 6dd467b6c5..b1684a8441 100644 --- a/synapse/storage/schema/delta/50/make_event_content_nullable.py +++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py @@ -65,14 +65,18 @@ def run_create(cur, database_engine, *args, **kwargs): def run_upgrade(cur, database_engine, *args, **kwargs): if isinstance(database_engine, PostgresEngine): - cur.execute(""" + cur.execute( + """ ALTER TABLE events ALTER COLUMN content DROP NOT NULL; - """) + """ + ) return # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html - cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'") + cur.execute( + "SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'" + ) (oldsql,) = cur.fetchone() sql = oldsql.replace("content TEXT NOT NULL", "content TEXT") @@ -86,7 +90,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs): cur.execute("PRAGMA writable_schema=ON") cur.execute( "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'", - (sql, ), + (sql,), ) cur.execute("PRAGMA schema_version=%i" % (oldver + 1,)) cur.execute("PRAGMA writable_schema=OFF") diff --git a/synapse/storage/schema/delta/55/users_alter_deactivated.sql b/synapse/storage/schema/delta/55/users_alter_deactivated.sql new file mode 100644 index 0000000000..dabdde489b --- /dev/null +++ b/synapse/storage/schema/delta/55/users_alter_deactivated.sql @@ -0,0 +1,19 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE users ADD deactivated SMALLINT DEFAULT 0 NOT NULL; + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('users_set_deactivated_flag', '{}'); diff --git a/synapse/storage/search.py b/synapse/storage/search.py index ff49eaae02..f3b1cec933 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -31,8 +31,8 @@ from .background_updates import BackgroundUpdateStore logger = logging.getLogger(__name__) SearchEntry = namedtuple( - 'SearchEntry', - ['key', 'value', 'event_id', 'room_id', 'stream_ordering', 'origin_server_ts'], + "SearchEntry", + ["key", "value", "event_id", "room_id", "stream_ordering", "origin_server_ts"], ) @@ -216,7 +216,7 @@ class SearchStore(BackgroundUpdateStore): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) - have_added_index = progress['have_added_indexes'] + have_added_index = progress["have_added_indexes"] if not have_added_index: @@ -341,29 +341,7 @@ class SearchStore(BackgroundUpdateStore): for entry in entries ) - # inserts to a GIN index are normally batched up into a pending - # list, and then all committed together once the list gets to a - # certain size. The trouble with that is that postgres (pre-9.5) - # uses work_mem to determine the length of the list, and work_mem - # is typically very large. - # - # We therefore reduce work_mem while we do the insert. - # - # (postgres 9.5 uses the separate gin_pending_list_limit setting, - # so doesn't suffer the same problem, but changing work_mem will - # be harmless) - # - # Note that we don't need to worry about restoring it on - # exception, because exceptions will cause the transaction to be - # rolled back, including the effects of the SET command. - # - # Also: we use SET rather than SET LOCAL because there's lots of - # other stuff going on in this transaction, which want to have the - # normal work_mem setting. - - txn.execute("SET work_mem='256kB'") txn.executemany(sql, args) - txn.execute("RESET work_mem") elif isinstance(self.database_engine, Sqlite3Engine): sql = ( diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index ff266b09b0..1cec84ee2e 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -71,7 +71,8 @@ class StatsStore(StateDeltasStore): # Get all the rooms that we want to process. def _make_staging_area(txn): # Create the temporary tables - stmts = get_statements(""" + stmts = get_statements( + """ -- We just recreate the table, we'll be reinserting the -- correct entries again later anyway. DROP TABLE IF EXISTS {temp}_rooms; @@ -85,7 +86,10 @@ class StatsStore(StateDeltasStore): ON {temp}_rooms(events); CREATE INDEX {temp}_rooms_id ON {temp}_rooms(room_id); - """.format(temp=TEMP_TABLE).splitlines()) + """.format( + temp=TEMP_TABLE + ).splitlines() + ) for statement in stmts: txn.execute(statement) @@ -105,7 +109,9 @@ class StatsStore(StateDeltasStore): LEFT JOIN room_stats_earliest_token AS t USING (room_id) WHERE t.room_id IS NULL GROUP BY c.room_id - """ % (TEMP_TABLE,) + """ % ( + TEMP_TABLE, + ) txn.execute(sql) new_pos = yield self.get_max_stream_id_in_current_state_deltas() @@ -184,7 +190,8 @@ class StatsStore(StateDeltasStore): logger.info( "Processing the next %d rooms of %d remaining", - len(rooms_to_work_on), progress["remaining"], + len(rooms_to_work_on), + progress["remaining"], ) # Number of state events we've processed by going through each room @@ -204,10 +211,17 @@ class StatsStore(StateDeltasStore): avatar_id = current_state_ids.get((EventTypes.RoomAvatar, "")) canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, "")) - state_events = yield self.get_events([ - join_rules_id, history_visibility_id, encryption_id, name_id, - topic_id, avatar_id, canonical_alias_id, - ]) + state_events = yield self.get_events( + [ + join_rules_id, + history_visibility_id, + encryption_id, + name_id, + topic_id, + avatar_id, + canonical_alias_id, + ] + ) def _get_or_none(event_id, arg): event = state_events.get(event_id) @@ -271,7 +285,7 @@ class StatsStore(StateDeltasStore): # We've finished a room. Delete it from the table. self._simple_delete_one_txn( - txn, TEMP_TABLE + "_rooms", {"room_id": room_id}, + txn, TEMP_TABLE + "_rooms", {"room_id": room_id} ) yield self.runInteraction("update_room_stats", _fetch_data) @@ -338,7 +352,7 @@ class StatsStore(StateDeltasStore): "name", "topic", "avatar", - "canonical_alias" + "canonical_alias", ): field = fields.get(col) if field and "\0" in field: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6f7f65d96b..d9482a3843 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -65,7 +65,7 @@ _EventDictReturn = namedtuple( def generate_pagination_where_clause( - direction, column_names, from_token, to_token, engine, + direction, column_names, from_token, to_token, engine ): """Creates an SQL expression to bound the columns by the pagination tokens. @@ -153,7 +153,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine): str """ - assert(bound in (">", "<", ">=", "<=")) + assert bound in (">", "<", ">=", "<=") name1, name2 = column_names val1, val2 = values @@ -169,11 +169,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine): # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we # use the later form when running against postgres. - return "((%d,%d) %s (%s,%s))" % ( - val1, val2, - bound, - name1, name2, - ) + return "((%d,%d) %s (%s,%s))" % (val1, val2, bound, name1, name2) # We want to generate queries of e.g. the form: # @@ -276,7 +272,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_rooms( - self, room_ids, from_key, to_key, limit=0, order='DESC' + self, room_ids, from_key, to_key, limit=0, order="DESC" ): """Get new room events in stream ordering since `from_key`. @@ -346,7 +342,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_room( - self, room_id, from_key, to_key, limit=0, order='DESC' + self, room_id, from_key, to_key, limit=0, order="DESC" ): """Get new room events in stream ordering since `from_key`. @@ -395,8 +391,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = yield self.runInteraction("get_room_events_stream_for_room", f) - ret = yield self.get_events_as_list([ - r.event_id for r in rows], get_prev_content=True, + ret = yield self.get_events_as_list( + [r.event_id for r in rows], get_prev_content=True ) self._set_before_and_after(ret, rows, topo_order=from_id is None) @@ -446,7 +442,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = yield self.runInteraction("get_membership_changes_for_user", f) ret = yield self.get_events_as_list( - [r.event_id for r in rows], get_prev_content=True, + [r.event_id for r in rows], get_prev_content=True ) self._set_before_and_after(ret, rows, topo_order=False) @@ -725,7 +721,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn, room_id, before_token, - direction='b', + direction="b", limit=before_limit, event_filter=event_filter, ) @@ -735,7 +731,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn, room_id, after_token, - direction='f', + direction="f", limit=after_limit, event_filter=event_filter, ) @@ -816,7 +812,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_id, from_token, to_token=None, - direction='b', + direction="b", limit=-1, event_filter=None, ): @@ -846,7 +842,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. args = [False, room_id] - if direction == 'b': + if direction == "b": order = "DESC" else: order = "ASC" @@ -882,7 +878,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if rows: topo = rows[-1].topological_ordering toke = rows[-1].stream_ordering - if direction == 'b': + if direction == "b": # Tokens are positions between events. # This token points *after* the last event in the chunk. # We need it to point to the event before it in the chunk @@ -898,7 +894,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def paginate_room_events( - self, room_id, from_key, to_key=None, direction='b', limit=-1, event_filter=None + self, room_id, from_key, to_key=None, direction="b", limit=-1, event_filter=None ): """Returns list of events before or after a given token. |