summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py32
-rw-r--r--synapse/storage/events_worker.py54
-rw-r--r--synapse/storage/user_directory.py4
3 files changed, 51 insertions, 39 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1fd5d8f162..98dde77431 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -220,7 +220,7 @@ class SQLBaseStore(object):
         self._clock.looping_call(loop, 10000)
 
     def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
-                         logging_context, func, *args, **kwargs):
+                         func, *args, **kwargs):
         start = time.time()
         txn_id = self._TXN_ID
 
@@ -284,8 +284,7 @@ class SQLBaseStore(object):
             end = time.time()
             duration = end - start
 
-            if logging_context is not None:
-                logging_context.add_database_transaction(duration)
+            LoggingContext.current_context().add_database_transaction(duration)
 
             transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
 
@@ -309,19 +308,15 @@ class SQLBaseStore(object):
         Returns:
             Deferred: The result of func
         """
-        current_context = LoggingContext.current_context()
-
         after_callbacks = []
         exception_callbacks = []
 
-        def inner_func(conn, *args, **kwargs):
-            return self._new_transaction(
-                conn, desc, after_callbacks, exception_callbacks, current_context,
-                func, *args, **kwargs
-            )
-
         try:
-            result = yield self.runWithConnection(inner_func, *args, **kwargs)
+            result = yield self.runWithConnection(
+                self._new_transaction,
+                desc, after_callbacks, exception_callbacks, func,
+                *args, **kwargs
+            )
 
             for after_callback, after_args, after_kwargs in after_callbacks:
                 after_callback(*after_args, **after_kwargs)
@@ -346,22 +341,25 @@ class SQLBaseStore(object):
         Returns:
             Deferred: The result of func
         """
-        current_context = LoggingContext.current_context()
+        parent_context = LoggingContext.current_context()
+        if parent_context == LoggingContext.sentinel:
+            logger.warn(
+                "Running db txn from sentinel context: metrics will be lost",
+            )
+            parent_context = None
 
         start_time = time.time()
 
         def inner_func(conn, *args, **kwargs):
-            with LoggingContext("runWithConnection") as context:
+            with LoggingContext("runWithConnection", parent_context) as context:
                 sched_duration_sec = time.time() - start_time
                 sql_scheduling_timer.observe(sched_duration_sec)
-                current_context.add_database_scheduled(sched_duration_sec)
+                context.add_database_scheduled(sched_duration_sec)
 
                 if self.database_engine.is_connection_closed(conn):
                     logger.debug("Reconnecting closed database connection")
                     conn.reconnect()
 
-                current_context.copy_to(context)
-
                 return func(conn, *args, **kwargs)
 
         with PreserveLoggingContext():
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 5fe1fd13e5..67433606c6 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -222,32 +222,47 @@ class EventsWorkerStore(SQLBaseStore):
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
         """
-        event_list = []
         i = 0
         while True:
-            try:
-                with self._event_fetch_lock:
-                    event_list = self._event_fetch_list
-                    self._event_fetch_list = []
-
-                    if not event_list:
-                        single_threaded = self.database_engine.single_threaded
-                        if single_threaded or i > EVENT_QUEUE_ITERATIONS:
-                            self._event_fetch_ongoing -= 1
-                            return
-                        else:
-                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
-                            i += 1
-                            continue
-                    i = 0
+            with self._event_fetch_lock:
+                event_list = self._event_fetch_list
+                self._event_fetch_list = []
+
+                if not event_list:
+                    single_threaded = self.database_engine.single_threaded
+                    if single_threaded or i > EVENT_QUEUE_ITERATIONS:
+                        self._event_fetch_ongoing -= 1
+                        return
+                    else:
+                        self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+                        i += 1
+                        continue
+                i = 0
+
+            self._fetch_event_list(conn, event_list)
+
+    def _fetch_event_list(self, conn, event_list):
+        """Handle a load of requests from the _event_fetch_list queue
+
+        Args:
+            conn (twisted.enterprise.adbapi.Connection): database connection
+
+            event_list (list[Tuple[list[str], Deferred]]):
+                The fetch requests. Each entry consists of a list of event
+                ids to be fetched, and a deferred to be completed once the
+                events have been fetched.
 
+        """
+        with Measure(self._clock, "_fetch_event_list"):
+            try:
                 event_id_lists = zip(*event_list)[0]
                 event_ids = [
                     item for sublist in event_id_lists for item in sublist
                 ]
 
                 rows = self._new_transaction(
-                    conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
+                    conn, "do_fetch", [], [],
+                    self._fetch_event_rows, event_ids,
                 )
 
                 row_dict = {
@@ -280,9 +295,8 @@ class EventsWorkerStore(SQLBaseStore):
                             with PreserveLoggingContext():
                                 d.errback(e)
 
-                if event_list:
-                    with PreserveLoggingContext():
-                        self.hs.get_reactor().callFromThread(fire, event_list)
+                with PreserveLoggingContext():
+                    self.hs.get_reactor().callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
     def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index ce59e70d0e..a8781b0e5d 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -265,7 +265,7 @@ class UserDirectoryStore(SQLBaseStore):
         self.get_user_in_public_room.invalidate((user_id,))
 
     def get_users_in_public_due_to_room(self, room_id):
-        """Get all user_ids that are in the room directory becuase they're
+        """Get all user_ids that are in the room directory because they're
         in the given room_id
         """
         return self._simple_select_onecol(
@@ -277,7 +277,7 @@ class UserDirectoryStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_users_in_dir_due_to_room(self, room_id):
-        """Get all user_ids that are in the room directory becuase they're
+        """Get all user_ids that are in the room directory because they're
         in the given room_id
         """
         user_ids_dir = yield self._simple_select_onecol(