diff options
Diffstat (limited to 'synapse')
-rwxr-xr-x | synapse/app/homeserver.py | 13 | ||||
-rw-r--r-- | synapse/groups/attestations.py | 6 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 9 | ||||
-rw-r--r-- | synapse/replication/tcp/client.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 14 | ||||
-rw-r--r-- | synapse/rest/media/v1/media_repository.py | 8 | ||||
-rw-r--r-- | synapse/rest/media/v1/preview_url_resource.py | 8 | ||||
-rw-r--r-- | synapse/storage/_base.py | 6 | ||||
-rw-r--r-- | synapse/storage/devices.py | 8 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 9 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 13 | ||||
-rw-r--r-- | synapse/storage/events.py | 11 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 6 |
13 files changed, 83 insertions, 30 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 2ad1beb8d8..b7e7718290 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -49,6 +49,7 @@ from synapse.http.additional_resource import AdditionalResource from synapse.http.server import RootRedirect from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.module_api import ModuleApi from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements @@ -427,6 +428,9 @@ def run(hs): # currently either 0 or 1 stats_process = [] + def start_phone_stats_home(): + run_as_background_process("phone_stats_home", phone_stats_home) + @defer.inlineCallbacks def phone_stats_home(): logger.info("Gathering stats for reporting") @@ -498,7 +502,10 @@ def run(hs): ) def generate_user_daily_visit_stats(): - hs.get_datastore().generate_user_daily_visits() + run_as_background_process( + "generate_user_daily_visits", + hs.get_datastore().generate_user_daily_visits, + ) # Rather than update on per session basis, batch up the requests. # If you increase the loop period, the accuracy of user_daily_visits @@ -507,7 +514,7 @@ def run(hs): if hs.config.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") - clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000) + clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) # We need to defer this init for the cases that we daemonize # otherwise the process ID we get is that of the non-daemon process @@ -515,7 +522,7 @@ def run(hs): # We wait 5 minutes to send the first set of stats as the server can # be quite busy the first few minutes - clock.call_later(5 * 60, phone_stats_home) + clock.call_later(5 * 60, start_phone_stats_home) if hs.config.daemonize and hs.config.print_pidfile: print (hs.config.pid_file) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 47452700a8..4216af0a27 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -43,6 +43,7 @@ from signedjson.sign import sign_json from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id from synapse.util.logcontext import run_in_background @@ -129,7 +130,7 @@ class GroupAttestionRenewer(object): self.attestations = hs.get_groups_attestation_signing() self._renew_attestations_loop = self.clock.looping_call( - self._renew_attestations, 30 * 60 * 1000, + self._start_renew_attestations, 30 * 60 * 1000, ) @defer.inlineCallbacks @@ -151,6 +152,9 @@ class GroupAttestionRenewer(object): defer.returnValue({}) + def _start_renew_attestations(self): + run_as_background_process("renew_attestations", self._renew_attestations) + @defer.inlineCallbacks def _renew_attestations(self): """Called periodically to check if we need to update any of our attestations diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 859f6d2b2e..43692b83a8 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -18,6 +18,7 @@ import logging from twisted.internet import defer from synapse.api.errors import AuthError, CodeMessageException, SynapseError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler @@ -41,7 +42,7 @@ class ProfileHandler(BaseHandler): if hs.config.worker_app is None: self.clock.looping_call( - self._update_remote_profile_cache, self.PROFILE_UPDATE_MS, + self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS, ) @defer.inlineCallbacks @@ -254,6 +255,12 @@ class ProfileHandler(BaseHandler): room_id, str(e.message) ) + def _start_update_remote_profile_cache(self): + run_as_background_process( + "Update remote profile", self._update_remote_profile_cache, + ) + + @defer.inlineCallbacks def _update_remote_profile_cache(self): """Called periodically to check profiles of remote users we haven't checked in a while. diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e592ab57bf..970e94313e 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -192,7 +192,7 @@ class ReplicationClientHandler(object): """Returns a deferred that is resolved when we receive a SYNC command with given data. - Used by tests. + [Not currently] used by tests. """ return self.awaiting_syncs.setdefault(data, defer.Deferred()) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 611fb66e1d..fd59f1595f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -25,6 +25,7 @@ from twisted.internet import defer from twisted.internet.protocol import Factory from synapse.metrics import LaterGauge +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.metrics import Measure, measure_func from .protocol import ServerReplicationStreamProtocol @@ -117,7 +118,6 @@ class ReplicationStreamer(object): for conn in self.connections: conn.send_error("server shutting down") - @defer.inlineCallbacks def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the connections if there are. @@ -132,14 +132,16 @@ class ReplicationStreamer(object): stream.discard_updates_and_advance() return - # If we're in the process of checking for new updates, mark that fact - # and return + self.pending_updates = True + if self.is_looping: - logger.debug("Noitifier poke loop already running") - self.pending_updates = True + logger.debug("Notifier poke loop already running") return - self.pending_updates = True + run_as_background_process("replication_notifier", self._run_notifier_loop) + + @defer.inlineCallbacks + def _run_notifier_loop(self): self.is_looping = True try: diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 30242c525a..5b13378caa 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -35,6 +35,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.http.matrixfederationclient import MatrixFederationHttpClient +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.async import Linearizer from synapse.util.logcontext import make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination @@ -100,10 +101,15 @@ class MediaRepository(object): ) self.clock.looping_call( - self._update_recently_accessed, + self._start_update_recently_accessed, UPDATE_RECENTLY_ACCESSED_TS, ) + def _start_update_recently_accessed(self): + run_as_background_process( + "update_recently_accessed_media", self._update_recently_accessed, + ) + @defer.inlineCallbacks def _update_recently_accessed(self): remote_media = self.recently_accessed_remotes diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index b70b15c4c2..4efd5339a4 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -41,6 +41,7 @@ from synapse.http.server import ( wrap_json_request_handler, ) from synapse.http.servlet import parse_integer, parse_string +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.async import ObservableDeferred from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -81,7 +82,7 @@ class PreviewUrlResource(Resource): self._cache.start() self._cleaner_loop = self.clock.looping_call( - self._expire_url_cache_data, 10 * 1000 + self._start_expire_url_cache_data, 10 * 1000, ) def render_OPTIONS(self, request): @@ -371,6 +372,11 @@ class PreviewUrlResource(Resource): "etag": headers["ETag"][0] if "ETag" in headers else None, }) + def _start_expire_url_cache_data(self): + run_as_background_process( + "expire_url_cache_data", self._expire_url_cache_data, + ) + @defer.inlineCallbacks def _expire_url_cache_data(self): """Clean up expired url cache content, media and thumbnails. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1d41d8d445..44f37b4c1e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -311,6 +311,12 @@ class SQLBaseStore(object): after_callbacks = [] exception_callbacks = [] + if LoggingContext.current_context() == LoggingContext.sentinel: + logger.warn( + "Starting db txn '%s' from sentinel context", + desc, + ) + try: result = yield self.runWithConnection( self._new_transaction, diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index cc3cdf2ebc..52dccb1507 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from ._base import Cache, SQLBaseStore @@ -711,6 +712,9 @@ class DeviceStore(SQLBaseStore): logger.info("Pruned %d device list outbound pokes", txn.rowcount) - return self.runInteraction( - "_prune_old_outbound_device_pokes", _prune_txn + run_as_background_process( + "prune_old_outbound_device_pokes", + self.runInteraction, + "_prune_old_outbound_device_pokes", + _prune_txn, ) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 8d366d1b91..65f2d19e20 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -23,6 +23,7 @@ from unpaddedbase64 import encode_base64 from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.events import EventsWorkerStore from synapse.storage.signatures import SignatureWorkerStore @@ -446,7 +447,7 @@ class EventFederationStore(EventFederationWorkerStore): ) hs.get_clock().looping_call( - self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + self._delete_old_forward_extrem_cache, 60 * 60 * 1000, ) def _update_min_depth_for_room_txn(self, txn, room_id, depth): @@ -548,9 +549,11 @@ class EventFederationStore(EventFederationWorkerStore): sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) ) - return self.runInteraction( + run_as_background_process( + "delete_old_forward_extrem_cache", + self.runInteraction, "_delete_old_forward_extrem_cache", - _delete_old_forward_extrem_cache_txn + _delete_old_forward_extrem_cache_txn, ) def clean_room_for_join(self, room_id): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 29b511ae5e..4f44b0ad47 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,6 +22,7 @@ from canonicaljson import json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import LoggingTransaction, SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks @@ -458,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore): "Error removing push actions after event persistence failure", ) - @defer.inlineCallbacks def _find_stream_orderings_for_times(self): - yield self.runInteraction( + run_as_background_process( + "event_push_action_stream_orderings", + self.runInteraction, "_find_stream_orderings_for_times", - self._find_stream_orderings_for_times_txn + self._find_stream_orderings_for_times_txn, ) def _find_stream_orderings_for_times_txn(self, txn): @@ -604,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): self._doing_notif_rotation = False self._rotate_notif_loop = self._clock.looping_call( - self._rotate_notifs, 30 * 60 * 1000 + self._start_rotate_notifs, 30 * 60 * 1000, ) def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts, @@ -787,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore): WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? """, (room_id, user_id, stream_ordering)) + def _start_rotate_notifs(self): + run_as_background_process("rotate_notifs", self._rotate_notifs) + @defer.inlineCallbacks def _rotate_notifs(self): if self._doing_notif_rotation or self.stream_ordering_day_ago is None: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d7fea0986b..200f5ec95f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -142,15 +142,14 @@ class _EventPeristenceQueue(object): try: queue = self._get_drainining_queue(room_id) for item in queue: - # handle_queue_loop runs in the sentinel logcontext, so - # there is no need to preserve_fn when running the - # callbacks on the deferred. try: ret = yield per_item_callback(item) + except Exception: + with PreserveLoggingContext(): + item.deferred.errback() + else: with PreserveLoggingContext(): item.deferred.callback(ret) - except Exception: - item.deferred.errback() finally: queue = self._event_persist_queues.pop(room_id, None) if queue: @@ -1139,7 +1138,7 @@ class EventsStore(EventsWorkerStore): ): txn.executemany( "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] + [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts] ) def _store_event_txn(self, txn, events_and_contexts): diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index c3bc94f56d..b4b479d94c 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore @@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore): def __init__(self, db_conn, hs): super(TransactionStore, self).__init__(db_conn, hs) - self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) + self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have @@ -271,6 +272,9 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + def _start_cleanup_transactions(self): + run_as_background_process("cleanup_transactions", self._cleanup_transactions) + def _cleanup_transactions(self): now = self._clock.time_msec() month_ago = now - 30 * 24 * 60 * 60 * 1000 |