summary refs log tree commit diff
path: root/synapse/storage/events_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events_worker.py')
-rw-r--r--synapse/storage/events_worker.py96
1 files changed, 58 insertions, 38 deletions
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 32d9d00ffb..f28239a808 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,27 +12,29 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from ._base import SQLBaseStore
+import logging
+from collections import namedtuple
 
-from twisted.internet import defer, reactor
+from canonicaljson import json
 
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase  # noqa: F401
 from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
-
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable, run_in_background,
+    LoggingContext,
+    PreserveLoggingContext,
+    make_deferred_yieldable,
+    run_in_background,
 )
 from synapse.util.metrics import Measure
-from synapse.api.errors import SynapseError
-
-from collections import namedtuple
-
-import logging
-import simplejson as json
 
-# these are only included to make the type annotations work
-from synapse.events import EventBase    # noqa: F401
-from synapse.events.snapshot import EventContext   # noqa: F401
+from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
@@ -145,6 +147,9 @@ class EventsWorkerStore(SQLBaseStore):
         missing_events_ids = [e for e in event_ids if e not in event_entry_map]
 
         if missing_events_ids:
+            log_ctx = LoggingContext.current_context()
+            log_ctx.record_event_fetch(len(missing_events_ids))
+
             missing_events = yield self._enqueue_events(
                 missing_events_ids,
                 check_redacted=check_redacted,
@@ -218,32 +223,47 @@ class EventsWorkerStore(SQLBaseStore):
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
         """
-        event_list = []
         i = 0
         while True:
-            try:
-                with self._event_fetch_lock:
-                    event_list = self._event_fetch_list
-                    self._event_fetch_list = []
-
-                    if not event_list:
-                        single_threaded = self.database_engine.single_threaded
-                        if single_threaded or i > EVENT_QUEUE_ITERATIONS:
-                            self._event_fetch_ongoing -= 1
-                            return
-                        else:
-                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
-                            i += 1
-                            continue
-                    i = 0
+            with self._event_fetch_lock:
+                event_list = self._event_fetch_list
+                self._event_fetch_list = []
+
+                if not event_list:
+                    single_threaded = self.database_engine.single_threaded
+                    if single_threaded or i > EVENT_QUEUE_ITERATIONS:
+                        self._event_fetch_ongoing -= 1
+                        return
+                    else:
+                        self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+                        i += 1
+                        continue
+                i = 0
+
+            self._fetch_event_list(conn, event_list)
+
+    def _fetch_event_list(self, conn, event_list):
+        """Handle a load of requests from the _event_fetch_list queue
+
+        Args:
+            conn (twisted.enterprise.adbapi.Connection): database connection
 
+            event_list (list[Tuple[list[str], Deferred]]):
+                The fetch requests. Each entry consists of a list of event
+                ids to be fetched, and a deferred to be completed once the
+                events have been fetched.
+
+        """
+        with Measure(self._clock, "_fetch_event_list"):
+            try:
                 event_id_lists = zip(*event_list)[0]
                 event_ids = [
                     item for sublist in event_id_lists for item in sublist
                 ]
 
                 rows = self._new_transaction(
-                    conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
+                    conn, "do_fetch", [], [],
+                    self._fetch_event_rows, event_ids,
                 )
 
                 row_dict = {
@@ -265,7 +285,7 @@ class EventsWorkerStore(SQLBaseStore):
                             except Exception:
                                 logger.exception("Failed to callback")
                 with PreserveLoggingContext():
-                    reactor.callFromThread(fire, event_list, row_dict)
+                    self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
             except Exception as e:
                 logger.exception("do_fetch")
 
@@ -276,9 +296,8 @@ class EventsWorkerStore(SQLBaseStore):
                             with PreserveLoggingContext():
                                 d.errback(e)
 
-                if event_list:
-                    with PreserveLoggingContext():
-                        reactor.callFromThread(fire, event_list)
+                with PreserveLoggingContext():
+                    self.hs.get_reactor().callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
     def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
@@ -304,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore):
                 should_start = False
 
         if should_start:
-            with PreserveLoggingContext():
-                self.runWithConnection(
-                    self._do_fetch
-                )
+            run_as_background_process(
+                "fetch_events",
+                self.runWithConnection,
+                self._do_fetch,
+            )
 
         logger.debug("Loading %d events", len(events))
         with PreserveLoggingContext():