summary refs log tree commit diff
path: root/synapse/app/synchrotron.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/synchrotron.py')
-rw-r--r--synapse/app/synchrotron.py52
1 files changed, 44 insertions, 8 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 4dfc2dc648..34e34e5580 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, PresenceState
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.events import FrozenEvent
 from synapse.handlers.presence import PresenceHandler
 from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
@@ -39,6 +38,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.server import HomeServer
 from synapse.storage.client_ips import ClientIpStore
@@ -47,7 +47,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+    PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
@@ -77,6 +78,7 @@ class SynchrotronSlavedStore(
     SlavedFilteringStore,
     SlavedPresenceStore,
     SlavedDeviceInboxStore,
+    SlavedDeviceStore,
     RoomStore,
     BaseSlavedStore,
     ClientIpStore,  # After BaseSlavedStore because the constructor is different
@@ -85,6 +87,10 @@ class SynchrotronSlavedStore(
         RoomMemberStore.__dict__["who_forgot_in_room"]
     )
 
+    did_forget = (
+        RoomMemberStore.__dict__["did_forget"]
+    )
+
     # XXX: This is a bit broken because we don't persist the accepted list in a
     # way that can be replicated. This means that we don't have a way to
     # invalidate the cache correctly.
@@ -380,15 +386,40 @@ class SynchrotronServer(HomeServer):
                         stream_key, position, users=users, rooms=rooms
                     )
 
+        @defer.inlineCallbacks
+        def notify_device_list_update(result):
+            stream = result.get("device_lists")
+            if not stream:
+                return
+
+            position_index = stream["field_names"].index("position")
+            user_index = stream["field_names"].index("user_id")
+
+            for row in stream["rows"]:
+                position = row[position_index]
+                user_id = row[user_index]
+
+                room_ids = yield store.get_rooms_for_user(user_id)
+
+                notifier.on_new_event(
+                    "device_list_key", position, rooms=room_ids,
+                )
+
+        @defer.inlineCallbacks
         def notify(result):
             stream = result.get("events")
             if stream:
                 max_position = stream["position"]
+
+                event_map = yield store.get_events([row[1] for row in stream["rows"]])
+
                 for row in stream["rows"]:
                     position = row[0]
-                    internal = json.loads(row[1])
-                    event_json = json.loads(row[2])
-                    event = FrozenEvent(event_json, internal_metadata_dict=internal)
+                    event_id = row[1]
+                    event = event_map.get(event_id, None)
+                    if not event:
+                        continue
+
                     extra_users = ()
                     if event.type == EventTypes.Member:
                         extra_users = (event.state_key,)
@@ -417,6 +448,7 @@ class SynchrotronServer(HomeServer):
             notify_from_stream(
                 result, "to_device", "to_device_key", user="user_id"
             )
+            yield notify_device_list_update(result)
 
         while True:
             try:
@@ -427,7 +459,7 @@ class SynchrotronServer(HomeServer):
                 yield store.process_replication(result)
                 typing_handler.process_replication(result)
                 yield presence_handler.process_replication(result)
-                notify(result)
+                yield notify(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
                 yield sleep(5)
@@ -450,7 +482,7 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.synchrotron"
 
-    setup_logging(config.worker_log_config, config.worker_log_file)
+    setup_logging(config, use_worker_options=True)
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
@@ -469,7 +501,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds: