From 05f5dabc10f9d7a4403c9571c12371b2b6dd93f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Jul 2018 17:21:17 +0100 Subject: Use stream cache in get_linearized_receipts_for_room This avoids us from uncessarily hitting the database when there has been no change for the room --- synapse/storage/receipts.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 3738901ea4..401400d927 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -151,7 +151,6 @@ class ReceiptsWorkerStore(SQLBaseStore): defer.returnValue([ev for res in results.values() for ev in res]) - @cachedInlineCallbacks(num_args=3, tree=True) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. @@ -164,6 +163,16 @@ class ReceiptsWorkerStore(SQLBaseStore): Returns: list: A list of receipts. """ + if from_key: + if not self._receipts_stream_cache.has_entity_changed(room_id, from_key): + defer.succeed([]) + + return self._get_linearized_receipts_for_room(room_id, to_key, from_key) + + @cachedInlineCallbacks(num_args=3, tree=True) + def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): + """See get_linearized_receipts_for_room + """ def f(txn): if from_key: sql = ( @@ -211,7 +220,7 @@ class ReceiptsWorkerStore(SQLBaseStore): "content": content, }]) - @cachedList(cached_method_name="get_linearized_receipts_for_room", + @cachedList(cached_method_name="_get_linearized_receipts_for_room", list_name="room_ids", num_args=3, inlineCallbacks=True) def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): if not room_ids: @@ -373,7 +382,7 @@ class ReceiptsStore(ReceiptsWorkerStore): self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) + txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,)) txn.call_after( self._receipts_stream_cache.entity_has_changed, @@ -493,7 +502,7 @@ class ReceiptsStore(ReceiptsWorkerStore): self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) + txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,)) self._simple_delete_txn( txn, -- cgit 1.4.1 From 6ccefef07a0eb366d64076bf6052b94409262981 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Jul 2018 18:12:39 +0100 Subject: Use 'is not None' and add comments --- synapse/storage/receipts.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 401400d927..33fc2b9b31 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -140,7 +140,9 @@ class ReceiptsWorkerStore(SQLBaseStore): """ room_ids = set(room_ids) - if from_key: + if from_key is not None: + # Only ask the database about rooms where there have been new + # receipts added since `from_key` room_ids = yield self._receipts_stream_cache.get_entities_changed( room_ids, from_key ) @@ -163,7 +165,9 @@ class ReceiptsWorkerStore(SQLBaseStore): Returns: list: A list of receipts. """ - if from_key: + if from_key is not None: + # Check the cache first to see if any new receipts have been added + # since`from_key`. If not we can no-op. if not self._receipts_stream_cache.has_entity_changed(room_id, from_key): defer.succeed([]) -- cgit 1.4.1 From aff1dfdf3decdd3ed60d7d8de8d2b07904f39d2b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Jul 2018 09:45:37 +0100 Subject: Update return value docstring --- synapse/storage/receipts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 33fc2b9b31..0ac665e967 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -163,7 +163,7 @@ class ReceiptsWorkerStore(SQLBaseStore): from the start. Returns: - list: A list of receipts. + Deferred[list]: A list of receipts. """ if from_key is not None: # Check the cache first to see if any new receipts have been added -- cgit 1.4.1 From 667fba68f3ca808f48143a2a739a54665b0162c6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 18 Jul 2018 14:35:24 +0100 Subject: Run things as background processes This fixes #3518, and ensures that we get useful logs and metrics for lots of things that happen in the background. (There are certainly more things that happen in the background; these are just the common ones I've found running a single-process synapse locally). --- synapse/federation/transaction_queue.py | 15 ++++++--------- synapse/storage/background_updates.py | 10 ++++++++-- synapse/storage/client_ips.py | 15 +++++++++++---- synapse/storage/events.py | 10 ++++------ synapse/storage/events_worker.py | 10 ++++++---- synapse/util/caches/expiringcache.py | 6 +++++- synapse/util/distributor.py | 4 ++++ 7 files changed, 44 insertions(+), 26 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5c5a73b73c..6996d6b695 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -168,7 +168,7 @@ class TransactionQueue(object): # fire off a processing loop in the background run_as_background_process( - "process_transaction_queue", + "process_event_queue_for_federation", self._process_event_queue_loop, ) @@ -434,14 +434,11 @@ class TransactionQueue(object): logger.debug("TX [%s] Starting transaction loop", destination) - # Drop the logcontext before starting the transaction. It doesn't - # really make sense to log all the outbound transactions against - # whatever path led us to this point: that's pretty arbitrary really. - # - # (this also means we can fire off _perform_transaction without - # yielding) - with logcontext.PreserveLoggingContext(): - self._transaction_transmission_loop(destination) + run_as_background_process( + "federation_transaction_transmission_loop", + self._transaction_transmission_loop, + destination, + ) @defer.inlineCallbacks def _transaction_transmission_loop(self, destination): diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index dc9eca7d15..5fe1ca2de7 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -19,6 +19,8 @@ from canonicaljson import json from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process + from . import engines from ._base import SQLBaseStore @@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_handlers = {} self._all_done = False - @defer.inlineCallbacks def start_doing_background_updates(self): - logger.info("Starting background schema updates") + run_as_background_process( + "background_updates", self._run_background_updates, + ) + @defer.inlineCallbacks + def _run_background_updates(self): + logger.info("Starting background schema updates") while True: yield self.hs.get_clock().sleep( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index b78eda3413..77ae10da3d 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -19,6 +19,7 @@ from six import iteritems from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import CACHE_SIZE_FACTOR from . import background_updates @@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): self._batch_row_update[key] = (user_agent, device_id, now) def _update_client_ips_batch(self): - to_update = self._batch_row_update - self._batch_row_update = {} - return self.runInteraction( - "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update + def update(): + to_update = self._batch_row_update + self._batch_row_update = {} + return self.runInteraction( + "_update_client_ips_batch", self._update_client_ips_batch_txn, + to_update, + ) + + run_as_background_process( + "update_client_ips", update, ) def _update_client_ips_batch_txn(self, txn, to_update): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2aaab0d02c..4ff0fdc4ab 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -33,12 +33,13 @@ from synapse.api.errors import SynapseError # these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.events_worker import EventsWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util.async import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.frozenutils import frozendict_json_encoder -from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -155,11 +156,8 @@ class _EventPeristenceQueue(object): self._event_persist_queues[room_id] = queue self._currently_persisting_rooms.discard(room_id) - # set handle_queue_loop off on the background. We don't want to - # attribute work done in it to the current request, so we drop the - # logcontext altogether. - with PreserveLoggingContext(): - handle_queue_loop() + # set handle_queue_loop off in the background + run_as_background_process("persist_events", handle_queue_loop) def _get_drainining_queue(self, room_id): queue = self._event_persist_queues.setdefault(room_id, deque()) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 67433606c6..f28239a808 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -25,6 +25,7 @@ from synapse.events import EventBase # noqa: F401 from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import ( LoggingContext, PreserveLoggingContext, @@ -322,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore): should_start = False if should_start: - with PreserveLoggingContext(): - self.runWithConnection( - self._do_fetch - ) + run_as_background_process( + "fetch_events", + self.runWithConnection, + self._do_fetch, + ) logger.debug("Loading %d events", len(events)) with PreserveLoggingContext(): diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 4abca91f6d..465adc54a8 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -16,6 +16,7 @@ import logging from collections import OrderedDict +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import register_cache logger = logging.getLogger(__name__) @@ -63,7 +64,10 @@ class ExpiringCache(object): return def f(): - self._prune_cache() + run_as_background_process( + "prune_cache_%s" % self._cache_name, + self._prune_cache, + ) self._clock.looping_call(f, self._expiry_ms / 2) diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 734331caaa..d91ae400eb 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -75,6 +75,10 @@ class Distributor(object): self.pre_registration[name].append(observer) def fire(self, name, *args, **kwargs): + """Dispatches the given signal to the registered observers. + + Runs the observers as a background process. Does not return a deferred. + """ if name not in self.signals: raise KeyError("%r does not have a signal named %s" % (self, name)) -- cgit 1.4.1 From 00bc9791379109690fb5e465adf0e0e8e76167aa Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 19 Jul 2018 10:51:15 +0100 Subject: Disable logcontext warning Temporary workaround to #3518 while we release 0.33.0. --- synapse/storage/_base.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 98dde77431..a6a0e6ec9f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -343,9 +343,10 @@ class SQLBaseStore(object): """ parent_context = LoggingContext.current_context() if parent_context == LoggingContext.sentinel: - logger.warn( - "Running db txn from sentinel context: metrics will be lost", - ) + # warning disabled for 0.33.0 release; proper fixes will land imminently. + # logger.warn( + # "Running db txn from sentinel context: metrics will be lost", + # ) parent_context = None start_time = time.time() -- cgit 1.4.1 From f1a15ea20692e9f4b347cdddf6d714912356ba64 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 19 Jul 2018 11:14:20 +0100 Subject: revert 00bc979 ... we've fixed the things that caused the warnings, so we should reinstate the warning. --- synapse/storage/_base.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a6a0e6ec9f..1d41d8d445 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -343,10 +343,9 @@ class SQLBaseStore(object): """ parent_context = LoggingContext.current_context() if parent_context == LoggingContext.sentinel: - # warning disabled for 0.33.0 release; proper fixes will land imminently. - # logger.warn( - # "Running db txn from sentinel context: metrics will be lost", - # ) + logger.warn( + "Starting db connection from sentinel context: metrics will be lost", + ) parent_context = None start_time = time.time() -- cgit 1.4.1