From fe0f4a3591302176c7eea48a54f6ed83d9eb4aa9 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 9 Oct 2020 07:37:51 -0400 Subject: Move additional tasks to the background worker, part 3 (#8489) --- changelog.d/8489.feature | 1 + synapse/app/phone_stats_home.py | 14 +- synapse/storage/databases/main/censor_events.py | 15 +- synapse/storage/databases/main/devices.py | 196 ++++++++-------- synapse/storage/databases/main/event_federation.py | 60 ++--- .../storage/databases/main/event_push_actions.py | 259 ++++++++++----------- .../storage/databases/main/monthly_active_users.py | 2 + synapse/storage/databases/main/registration.py | 11 +- 8 files changed, 276 insertions(+), 282 deletions(-) create mode 100644 changelog.d/8489.feature diff --git a/changelog.d/8489.feature b/changelog.d/8489.feature new file mode 100644 index 0000000000..22591870a4 --- /dev/null +++ b/changelog.d/8489.feature @@ -0,0 +1 @@ + Allow running background tasks in a separate worker process. diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index 8a69104a04..c38cf8231f 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -18,10 +18,7 @@ import sys from prometheus_client import Gauge -from synapse.metrics.background_process_metrics import ( - run_as_background_process, - wrap_as_background_process, -) +from synapse.metrics.background_process_metrics import wrap_as_background_process logger = logging.getLogger("synapse.app.homeserver") @@ -152,13 +149,8 @@ def start_phone_stats_home(hs): clock.looping_call(hs.get_datastore().generate_user_daily_visits, 5 * 60 * 1000) # monthly active user limiting functionality - def reap_monthly_active_users(): - return run_as_background_process( - "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users - ) - - clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) - reap_monthly_active_users() + clock.looping_call(hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60) + hs.get_datastore().reap_monthly_active_users() @wrap_as_background_process("generate_monthly_active_users") async def generate_monthly_active_users(): diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 4bb2b9c28c..849bd5ba7a 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -17,7 +17,7 @@ import logging from typing import TYPE_CHECKING from synapse.events.utils import prune_event_dict -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore @@ -35,14 +35,13 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) - def _censor_redactions(): - return run_as_background_process( - "_censor_redactions", self._censor_redactions - ) - - if self.hs.config.redaction_retention_period is not None: - hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) + if ( + hs.config.run_background_tasks + and self.hs.config.redaction_retention_period is not None + ): + hs.get_clock().looping_call(self._censor_redactions, 5 * 60 * 1000) + @wrap_as_background_process("_censor_redactions") async def _censor_redactions(self): """Censors all redactions older than the configured period that haven't been censored yet. diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 2d0a6408b5..88fd97e1df 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -25,7 +25,7 @@ from synapse.logging.opentracing import ( trace, whitelisted_homeserver, ) -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -48,6 +48,14 @@ BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes" class DeviceWorkerStore(SQLBaseStore): + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + if hs.config.run_background_tasks: + self._clock.looping_call( + self._prune_old_outbound_device_pokes, 60 * 60 * 1000 + ) + async def get_device(self, user_id: str, device_id: str) -> Dict[str, Any]: """Retrieve a device. Only returns devices that are not marked as hidden. @@ -772,6 +780,98 @@ class DeviceWorkerStore(SQLBaseStore): ) return count >= 1 + @wrap_as_background_process("prune_old_outbound_device_pokes") + async def _prune_old_outbound_device_pokes( + self, prune_age: int = 24 * 60 * 60 * 1000 + ) -> None: + """Delete old entries out of the device_lists_outbound_pokes to ensure + that we don't fill up due to dead servers. + + Normally, we try to send device updates as a delta since a previous known point: + this is done by setting the prev_id in the m.device_list_update EDU. However, + for that to work, we have to have a complete record of each change to + each device, which can add up to quite a lot of data. + + An alternative mechanism is that, if the remote server sees that it has missed + an entry in the stream_id sequence for a given user, it will request a full + list of that user's devices. Hence, we can reduce the amount of data we have to + store (and transmit in some future transaction), by clearing almost everything + for a given destination out of the database, and having the remote server + resync. + + All we need to do is make sure we keep at least one row for each + (user, destination) pair, to remind us to send a m.device_list_update EDU for + that user when the destination comes back. It doesn't matter which device + we keep. + """ + yesterday = self._clock.time_msec() - prune_age + + def _prune_txn(txn): + # look for (user, destination) pairs which have an update older than + # the cutoff. + # + # For each pair, we also need to know the most recent stream_id, and + # an arbitrary device_id at that stream_id. + select_sql = """ + SELECT + dlop1.destination, + dlop1.user_id, + MAX(dlop1.stream_id) AS stream_id, + (SELECT MIN(dlop2.device_id) AS device_id FROM + device_lists_outbound_pokes dlop2 + WHERE dlop2.destination = dlop1.destination AND + dlop2.user_id=dlop1.user_id AND + dlop2.stream_id=MAX(dlop1.stream_id) + ) + FROM device_lists_outbound_pokes dlop1 + GROUP BY destination, user_id + HAVING min(ts) < ? AND count(*) > 1 + """ + + txn.execute(select_sql, (yesterday,)) + rows = txn.fetchall() + + if not rows: + return + + logger.info( + "Pruning old outbound device list updates for %i users/destinations: %s", + len(rows), + shortstr((row[0], row[1]) for row in rows), + ) + + # we want to keep the update with the highest stream_id for each user. + # + # there might be more than one update (with different device_ids) with the + # same stream_id, so we also delete all but one rows with the max stream id. + delete_sql = """ + DELETE FROM device_lists_outbound_pokes + WHERE destination = ? AND user_id = ? AND ( + stream_id < ? OR + (stream_id = ? AND device_id != ?) + ) + """ + count = 0 + for (destination, user_id, stream_id, device_id) in rows: + txn.execute( + delete_sql, (destination, user_id, stream_id, stream_id, device_id) + ) + count += txn.rowcount + + # Since we've deleted unsent deltas, we need to remove the entry + # of last successful sent so that the prev_ids are correctly set. + sql = """ + DELETE FROM device_lists_outbound_last_success + WHERE destination = ? AND user_id = ? + """ + txn.executemany(sql, ((row[0], row[1]) for row in rows)) + + logger.info("Pruned %d device list outbound pokes", count) + + await self.db_pool.runInteraction( + "_prune_old_outbound_device_pokes", _prune_txn, + ) + class DeviceBackgroundUpdateStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): @@ -908,8 +1008,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): name="device_id_exists", keylen=2, max_entries=10000 ) - self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) - async def store_device( self, user_id: str, device_id: str, initial_device_display_name: Optional[str] ) -> bool: @@ -1267,95 +1365,3 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): for device_id in device_ids ], ) - - def _prune_old_outbound_device_pokes(self, prune_age: int = 24 * 60 * 60 * 1000): - """Delete old entries out of the device_lists_outbound_pokes to ensure - that we don't fill up due to dead servers. - - Normally, we try to send device updates as a delta since a previous known point: - this is done by setting the prev_id in the m.device_list_update EDU. However, - for that to work, we have to have a complete record of each change to - each device, which can add up to quite a lot of data. - - An alternative mechanism is that, if the remote server sees that it has missed - an entry in the stream_id sequence for a given user, it will request a full - list of that user's devices. Hence, we can reduce the amount of data we have to - store (and transmit in some future transaction), by clearing almost everything - for a given destination out of the database, and having the remote server - resync. - - All we need to do is make sure we keep at least one row for each - (user, destination) pair, to remind us to send a m.device_list_update EDU for - that user when the destination comes back. It doesn't matter which device - we keep. - """ - yesterday = self._clock.time_msec() - prune_age - - def _prune_txn(txn): - # look for (user, destination) pairs which have an update older than - # the cutoff. - # - # For each pair, we also need to know the most recent stream_id, and - # an arbitrary device_id at that stream_id. - select_sql = """ - SELECT - dlop1.destination, - dlop1.user_id, - MAX(dlop1.stream_id) AS stream_id, - (SELECT MIN(dlop2.device_id) AS device_id FROM - device_lists_outbound_pokes dlop2 - WHERE dlop2.destination = dlop1.destination AND - dlop2.user_id=dlop1.user_id AND - dlop2.stream_id=MAX(dlop1.stream_id) - ) - FROM device_lists_outbound_pokes dlop1 - GROUP BY destination, user_id - HAVING min(ts) < ? AND count(*) > 1 - """ - - txn.execute(select_sql, (yesterday,)) - rows = txn.fetchall() - - if not rows: - return - - logger.info( - "Pruning old outbound device list updates for %i users/destinations: %s", - len(rows), - shortstr((row[0], row[1]) for row in rows), - ) - - # we want to keep the update with the highest stream_id for each user. - # - # there might be more than one update (with different device_ids) with the - # same stream_id, so we also delete all but one rows with the max stream id. - delete_sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE destination = ? AND user_id = ? AND ( - stream_id < ? OR - (stream_id = ? AND device_id != ?) - ) - """ - count = 0 - for (destination, user_id, stream_id, device_id) in rows: - txn.execute( - delete_sql, (destination, user_id, stream_id, stream_id, device_id) - ) - count += txn.rowcount - - # Since we've deleted unsent deltas, we need to remove the entry - # of last successful sent so that the prev_ids are correctly set. - sql = """ - DELETE FROM device_lists_outbound_last_success - WHERE destination = ? AND user_id = ? - """ - txn.executemany(sql, ((row[0], row[1]) for row in rows)) - - logger.info("Pruned %d device list outbound pokes", count) - - return run_as_background_process( - "prune_old_outbound_device_pokes", - self.db_pool.runInteraction, - "_prune_old_outbound_device_pokes", - _prune_txn, - ) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 6d3689c09e..a6279a6c13 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -19,7 +19,7 @@ from typing import Dict, Iterable, List, Set, Tuple from synapse.api.errors import StoreError from synapse.events import EventBase -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.events_worker import EventsWorkerStore @@ -32,6 +32,14 @@ logger = logging.getLogger(__name__) class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore): + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + if hs.config.run_background_tasks: + hs.get_clock().looping_call( + self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + ) + async def get_auth_chain( self, event_ids: Collection[str], include_given: bool = False ) -> List[EventBase]: @@ -586,6 +594,28 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return [row["event_id"] for row in rows] + @wrap_as_background_process("delete_old_forward_extrem_cache") + async def _delete_old_forward_extrem_cache(self) -> None: + def _delete_old_forward_extrem_cache_txn(txn): + # Delete entries older than a month, while making sure we don't delete + # the only entries for a room. + sql = """ + DELETE FROM stream_ordering_to_exterm + WHERE + room_id IN ( + SELECT room_id + FROM stream_ordering_to_exterm + WHERE stream_ordering > ? + ) AND stream_ordering < ? + """ + txn.execute( + sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) + ) + + await self.db_pool.runInteraction( + "_delete_old_forward_extrem_cache", _delete_old_forward_extrem_cache_txn, + ) + class EventFederationStore(EventFederationWorkerStore): """ Responsible for storing and serving up the various graphs associated @@ -606,34 +636,6 @@ class EventFederationStore(EventFederationWorkerStore): self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth ) - hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 - ) - - def _delete_old_forward_extrem_cache(self): - def _delete_old_forward_extrem_cache_txn(txn): - # Delete entries older than a month, while making sure we don't delete - # the only entries for a room. - sql = """ - DELETE FROM stream_ordering_to_exterm - WHERE - room_id IN ( - SELECT room_id - FROM stream_ordering_to_exterm - WHERE stream_ordering > ? - ) AND stream_ordering < ? - """ - txn.execute( - sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) - ) - - return run_as_background_process( - "delete_old_forward_extrem_cache", - self.db_pool.runInteraction, - "_delete_old_forward_extrem_cache", - _delete_old_forward_extrem_cache_txn, - ) - async def clean_room_for_join(self, room_id): return await self.db_pool.runInteraction( "clean_room_for_join", self._clean_room_for_join_txn, room_id diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 80f3b4d740..2e56dfaf31 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -13,15 +13,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from typing import Dict, List, Optional, Tuple, Union import attr -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.storage.database import DatabasePool +from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -81,8 +80,14 @@ class EventPushActionsWorkerStore(SQLBaseStore): self.find_stream_orderings_looping_call = self._clock.looping_call( self._find_stream_orderings_for_times, 10 * 60 * 1000 ) + self._rotate_delay = 3 self._rotate_count = 10000 + self._doing_notif_rotation = False + if hs.config.run_background_tasks: + self._rotate_notif_loop = self._clock.looping_call( + self._rotate_notifs, 30 * 60 * 1000 + ) @cached(num_args=3, tree=True, max_entries=5000) async def get_unread_event_push_actions_by_room_for_user( @@ -514,15 +519,14 @@ class EventPushActionsWorkerStore(SQLBaseStore): "Error removing push actions after event persistence failure" ) - def _find_stream_orderings_for_times(self): - return run_as_background_process( - "event_push_action_stream_orderings", - self.db_pool.runInteraction, + @wrap_as_background_process("event_push_action_stream_orderings") + async def _find_stream_orderings_for_times(self) -> None: + await self.db_pool.runInteraction( "_find_stream_orderings_for_times", self._find_stream_orderings_for_times_txn, ) - def _find_stream_orderings_for_times_txn(self, txn): + def _find_stream_orderings_for_times_txn(self, txn: LoggingTransaction) -> None: logger.info("Searching for stream ordering 1 month ago") self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 @@ -652,129 +656,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): ) return result[0] if result else None - -class EventPushActionsStore(EventPushActionsWorkerStore): - EPA_HIGHLIGHT_INDEX = "epa_highlight_index" - - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - - self.db_pool.updates.register_background_index_update( - self.EPA_HIGHLIGHT_INDEX, - index_name="event_push_actions_u_highlight", - table="event_push_actions", - columns=["user_id", "stream_ordering"], - ) - - self.db_pool.updates.register_background_index_update( - "event_push_actions_highlights_index", - index_name="event_push_actions_highlights_index", - table="event_push_actions", - columns=["user_id", "room_id", "topological_ordering", "stream_ordering"], - where_clause="highlight=1", - ) - - self._doing_notif_rotation = False - self._rotate_notif_loop = self._clock.looping_call( - self._start_rotate_notifs, 30 * 60 * 1000 - ) - - async def get_push_actions_for_user( - self, user_id, before=None, limit=50, only_highlight=False - ): - def f(txn): - before_clause = "" - if before: - before_clause = "AND epa.stream_ordering < ?" - args = [user_id, before, limit] - else: - args = [user_id, limit] - - if only_highlight: - if len(before_clause) > 0: - before_clause += " " - before_clause += "AND epa.highlight = 1" - - # NB. This assumes event_ids are globally unique since - # it makes the query easier to index - sql = ( - "SELECT epa.event_id, epa.room_id," - " epa.stream_ordering, epa.topological_ordering," - " epa.actions, epa.highlight, epa.profile_tag, e.received_ts" - " FROM event_push_actions epa, events e" - " WHERE epa.event_id = e.event_id" - " AND epa.user_id = ? %s" - " AND epa.notif = 1" - " ORDER BY epa.stream_ordering DESC" - " LIMIT ?" % (before_clause,) - ) - txn.execute(sql, args) - return self.db_pool.cursor_to_dict(txn) - - push_actions = await self.db_pool.runInteraction("get_push_actions_for_user", f) - for pa in push_actions: - pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"]) - return push_actions - - async def get_latest_push_action_stream_ordering(self): - def f(txn): - txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") - return txn.fetchone() - - result = await self.db_pool.runInteraction( - "get_latest_push_action_stream_ordering", f - ) - return result[0] or 0 - - def _remove_old_push_actions_before_txn( - self, txn, room_id, user_id, stream_ordering - ): - """ - Purges old push actions for a user and room before a given - stream_ordering. - - We however keep a months worth of highlighted notifications, so that - users can still get a list of recent highlights. - - Args: - txn: The transcation - room_id: Room ID to delete from - user_id: user ID to delete for - stream_ordering: The lowest stream ordering which will - not be deleted. - """ - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (room_id, user_id), - ) - - # We need to join on the events table to get the received_ts for - # event_push_actions and sqlite won't let us use a join in a delete so - # we can't just delete where received_ts < x. Furthermore we can - # only identify event_push_actions by a tuple of room_id, event_id - # we we can't use a subquery. - # Instead, we look up the stream ordering for the last event in that - # room received before the threshold time and delete event_push_actions - # in the room with a stream_odering before that. - txn.execute( - "DELETE FROM event_push_actions " - " WHERE user_id = ? AND room_id = ? AND " - " stream_ordering <= ?" - " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", - (user_id, room_id, stream_ordering, self.stream_ordering_month_ago), - ) - - txn.execute( - """ - DELETE FROM event_push_summary - WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? - """, - (room_id, user_id, stream_ordering), - ) - - def _start_rotate_notifs(self): - return run_as_background_process("rotate_notifs", self._rotate_notifs) - + @wrap_as_background_process("rotate_notifs") async def _rotate_notifs(self): if self._doing_notif_rotation or self.stream_ordering_day_ago is None: return @@ -954,6 +836,121 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ) +class EventPushActionsStore(EventPushActionsWorkerStore): + EPA_HIGHLIGHT_INDEX = "epa_highlight_index" + + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_index_update( + self.EPA_HIGHLIGHT_INDEX, + index_name="event_push_actions_u_highlight", + table="event_push_actions", + columns=["user_id", "stream_ordering"], + ) + + self.db_pool.updates.register_background_index_update( + "event_push_actions_highlights_index", + index_name="event_push_actions_highlights_index", + table="event_push_actions", + columns=["user_id", "room_id", "topological_ordering", "stream_ordering"], + where_clause="highlight=1", + ) + + async def get_push_actions_for_user( + self, user_id, before=None, limit=50, only_highlight=False + ): + def f(txn): + before_clause = "" + if before: + before_clause = "AND epa.stream_ordering < ?" + args = [user_id, before, limit] + else: + args = [user_id, limit] + + if only_highlight: + if len(before_clause) > 0: + before_clause += " " + before_clause += "AND epa.highlight = 1" + + # NB. This assumes event_ids are globally unique since + # it makes the query easier to index + sql = ( + "SELECT epa.event_id, epa.room_id," + " epa.stream_ordering, epa.topological_ordering," + " epa.actions, epa.highlight, epa.profile_tag, e.received_ts" + " FROM event_push_actions epa, events e" + " WHERE epa.event_id = e.event_id" + " AND epa.user_id = ? %s" + " AND epa.notif = 1" + " ORDER BY epa.stream_ordering DESC" + " LIMIT ?" % (before_clause,) + ) + txn.execute(sql, args) + return self.db_pool.cursor_to_dict(txn) + + push_actions = await self.db_pool.runInteraction("get_push_actions_for_user", f) + for pa in push_actions: + pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"]) + return push_actions + + async def get_latest_push_action_stream_ordering(self): + def f(txn): + txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") + return txn.fetchone() + + result = await self.db_pool.runInteraction( + "get_latest_push_action_stream_ordering", f + ) + return result[0] or 0 + + def _remove_old_push_actions_before_txn( + self, txn, room_id, user_id, stream_ordering + ): + """ + Purges old push actions for a user and room before a given + stream_ordering. + + We however keep a months worth of highlighted notifications, so that + users can still get a list of recent highlights. + + Args: + txn: The transcation + room_id: Room ID to delete from + user_id: user ID to delete for + stream_ordering: The lowest stream ordering which will + not be deleted. + """ + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id, user_id), + ) + + # We need to join on the events table to get the received_ts for + # event_push_actions and sqlite won't let us use a join in a delete so + # we can't just delete where received_ts < x. Furthermore we can + # only identify event_push_actions by a tuple of room_id, event_id + # we we can't use a subquery. + # Instead, we look up the stream ordering for the last event in that + # room received before the threshold time and delete event_push_actions + # in the room with a stream_odering before that. + txn.execute( + "DELETE FROM event_push_actions " + " WHERE user_id = ? AND room_id = ? AND " + " stream_ordering <= ?" + " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", + (user_id, room_id, stream_ordering, self.stream_ordering_month_ago), + ) + + txn.execute( + """ + DELETE FROM event_push_summary + WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? + """, + (room_id, user_id, stream_ordering), + ) + + def _action_has_highlight(actions): for action in actions: try: diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index c66f558567..d788dc0fc6 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -15,6 +15,7 @@ import logging from typing import Dict, List +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool, make_in_list_sql_clause from synapse.util.caches.descriptors import cached @@ -127,6 +128,7 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore): desc="user_last_seen_monthly_active", ) + @wrap_as_background_process("reap_monthly_active_users") async def reap_monthly_active_users(self): """Cleans out monthly active user table to ensure that no stale entries exist. diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 7fd7b0b952..236d3cdbe3 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -20,10 +20,7 @@ from typing import Any, Dict, List, Optional, Tuple from synapse.api.constants import UserTypes from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError -from synapse.metrics.background_process_metrics import ( - run_as_background_process, - wrap_as_background_process, -) +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool from synapse.storage.types import Cursor @@ -53,10 +50,7 @@ class RegistrationWorkerStore(SQLBaseStore): self._account_validity = hs.config.account_validity if hs.config.run_background_tasks and self._account_validity.enabled: self._clock.call_later( - 0.0, - run_as_background_process, - "account_validity_set_expiration_dates", - self._set_expiration_date_when_missing, + 0.0, self._set_expiration_date_when_missing, ) # Create a background job for culling expired 3PID validity tokens @@ -812,6 +806,7 @@ class RegistrationWorkerStore(SQLBaseStore): self.clock.time_msec(), ) + @wrap_as_background_process("account_validity_set_expiration_dates") async def _set_expiration_date_when_missing(self): """ Retrieves the list of registered users that don't have an expiration date, and -- cgit 1.4.1