diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 3f29595256..34e34e5580 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, PresenceState
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
-from synapse.events import FrozenEvent
from synapse.handlers.presence import PresenceHandler
from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
@@ -48,7 +47,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+ PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
@@ -399,8 +399,7 @@ class SynchrotronServer(HomeServer):
position = row[position_index]
user_id = row[user_index]
- rooms = yield store.get_rooms_for_user(user_id)
- room_ids = [r.room_id for r in rooms]
+ room_ids = yield store.get_rooms_for_user(user_id)
notifier.on_new_event(
"device_list_key", position, rooms=room_ids,
@@ -411,11 +410,16 @@ class SynchrotronServer(HomeServer):
stream = result.get("events")
if stream:
max_position = stream["position"]
+
+ event_map = yield store.get_events([row[1] for row in stream["rows"]])
+
for row in stream["rows"]:
position = row[0]
- internal = json.loads(row[1])
- event_json = json.loads(row[2])
- event = FrozenEvent(event_json, internal_metadata_dict=internal)
+ event_id = row[1]
+ event = event_map.get(event_id, None)
+ if not event:
+ continue
+
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
@@ -478,7 +482,7 @@ def start(config_options):
assert config.worker_app == "synapse.app.synchrotron"
- setup_logging(config.worker_log_config, config.worker_log_file)
+ setup_logging(config, use_worker_options=True)
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
@@ -497,7 +501,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def run():
- with LoggingContext("run"):
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
|