diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 896225aab9..67433606c6 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,29 +12,28 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+import logging
+from collections import namedtuple
+
+from canonicaljson import json
from twisted.internet import defer
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
-
from synapse.util.logcontext import (
- PreserveLoggingContext, make_deferred_yieldable, run_in_background,
LoggingContext,
+ PreserveLoggingContext,
+ make_deferred_yieldable,
+ run_in_background,
)
from synapse.util.metrics import Measure
-from synapse.api.errors import SynapseError
-
-from collections import namedtuple
-
-import logging
-
-from canonicaljson import json
-# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
-from synapse.events.snapshot import EventContext # noqa: F401
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -223,32 +222,47 @@ class EventsWorkerStore(SQLBaseStore):
"""Takes a database connection and waits for requests for events from
the _event_fetch_list queue.
"""
- event_list = []
i = 0
while True:
- try:
- with self._event_fetch_lock:
- event_list = self._event_fetch_list
- self._event_fetch_list = []
-
- if not event_list:
- single_threaded = self.database_engine.single_threaded
- if single_threaded or i > EVENT_QUEUE_ITERATIONS:
- self._event_fetch_ongoing -= 1
- return
- else:
- self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
- i += 1
- continue
- i = 0
+ with self._event_fetch_lock:
+ event_list = self._event_fetch_list
+ self._event_fetch_list = []
+
+ if not event_list:
+ single_threaded = self.database_engine.single_threaded
+ if single_threaded or i > EVENT_QUEUE_ITERATIONS:
+ self._event_fetch_ongoing -= 1
+ return
+ else:
+ self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+ i += 1
+ continue
+ i = 0
+
+ self._fetch_event_list(conn, event_list)
+
+ def _fetch_event_list(self, conn, event_list):
+    """Handle a batch of fetch requests from the _event_fetch_list queue.
+ Args:
+ conn (twisted.enterprise.adbapi.Connection): database connection
+
+ event_list (list[Tuple[list[str], Deferred]]):
+ The fetch requests. Each entry consists of a list of event
+ ids to be fetched, and a deferred to be completed once the
+ events have been fetched.
+
+ """
+ with Measure(self._clock, "_fetch_event_list"):
+ try:
event_id_lists = zip(*event_list)[0]
event_ids = [
item for sublist in event_id_lists for item in sublist
]
rows = self._new_transaction(
- conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
+ conn, "do_fetch", [], [],
+ self._fetch_event_rows, event_ids,
)
row_dict = {
@@ -281,9 +295,8 @@ class EventsWorkerStore(SQLBaseStore):
with PreserveLoggingContext():
d.errback(e)
- if event_list:
- with PreserveLoggingContext():
- self.hs.get_reactor().callFromThread(fire, event_list)
+ with PreserveLoggingContext():
+ self.hs.get_reactor().callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
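The flattening at the top of _fetch_event_list relies on each event_list entry pairing a list of event ids with a deferred: zip(*event_list)[0] collects the per-request id lists and the comprehension merges them into one flat list of ids. A minimal sketch of just that step, with placeholder strings standing in for the real deferreds (and with zip() wrapped in list() so the indexing also works on Python 3, where zip() returns an iterator):

    # Each entry: (list of event ids to fetch, deferred to fire once fetched).
    # Plain strings stand in for the deferreds in this sketch.
    event_list = [
        (["$a:example.com", "$b:example.com"], "deferred_1"),
        (["$c:example.com"], "deferred_2"),
    ]

    # zip(*event_list) transposes the entries; element 0 is the tuple of id lists.
    event_id_lists = list(zip(*event_list))[0]

    # Flatten the id lists into a single list of event ids, as _fetch_event_list does.
    event_ids = [item for sublist in event_id_lists for item in sublist]

    assert event_ids == ["$a:example.com", "$b:example.com", "$c:example.com"]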