diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 32d9d00ffb..896225aab9 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -14,13 +14,14 @@
# limitations under the License.
from ._base import SQLBaseStore
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.util.logcontext import (
PreserveLoggingContext, make_deferred_yieldable, run_in_background,
+ LoggingContext,
)
from synapse.util.metrics import Measure
from synapse.api.errors import SynapseError
@@ -28,7 +29,8 @@ from synapse.api.errors import SynapseError
from collections import namedtuple
import logging
-import simplejson as json
+
+from canonicaljson import json
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
@@ -145,6 +147,9 @@ class EventsWorkerStore(SQLBaseStore):
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
+ log_ctx = LoggingContext.current_context()
+ log_ctx.record_event_fetch(len(missing_events_ids))
+
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
@@ -265,7 +270,7 @@ class EventsWorkerStore(SQLBaseStore):
except Exception:
logger.exception("Failed to callback")
with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list, row_dict)
+ self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
except Exception as e:
logger.exception("do_fetch")
@@ -278,7 +283,7 @@ class EventsWorkerStore(SQLBaseStore):
if event_list:
with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list)
+ self.hs.get_reactor().callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
|