summary refs log tree commit diff
path: root/synapse/app/synchrotron.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-01-27 13:36:39 +0000
committerErik Johnston <erik@matrix.org>2017-01-27 14:31:35 +0000
commit252b503fc8626078141dc6b82eeff63607874347 (patch)
tree72667c5deb13c6f3cbcbc802ecf9c898a8c6d004 /synapse/app/synchrotron.py
parentComment (diff)
downloadsynapse-252b503fc8626078141dc6b82eeff63607874347.tar.xz
Hook device list updates to replication
Diffstat (limited to 'synapse/app/synchrotron.py')
-rw-r--r--synapse/app/synchrotron.py27
1 files changed, 26 insertions, 1 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 4dfc2dc648..9d250502e0 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -39,6 +39,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.server import HomeServer
 from synapse.storage.client_ips import ClientIpStore
@@ -77,6 +78,7 @@ class SynchrotronSlavedStore(
     SlavedFilteringStore,
     SlavedPresenceStore,
     SlavedDeviceInboxStore,
+    SlavedDeviceStore,
     RoomStore,
     BaseSlavedStore,
     ClientIpStore,  # After BaseSlavedStore because the constructor is different
@@ -380,6 +382,28 @@ class SynchrotronServer(HomeServer):
                         stream_key, position, users=users, rooms=rooms
                     )
 
+        @defer.inlineCallbacks
+        def notify_device_list_update(result):
+            stream = result.get("device_lists")
+            if not stream:
+                return
+
+            position_index = stream["field_names"].index("position")
+            user_index = stream["field_names"].index("user_id")
+
+            for row in stream["rows"]:
+                logger.info("Handling device list row: %r", row)
+                position = row[position_index]
+                user_id = row[user_index]
+
+                rooms = yield store.get_rooms_for_user(user_id)
+                room_ids = [r.room_id for r in rooms]
+
+                notifier.on_new_event(
+                    "device_list_key", position, rooms=room_ids,
+                )
+
+        @defer.inlineCallbacks
         def notify(result):
             stream = result.get("events")
             if stream:
@@ -417,6 +441,7 @@ class SynchrotronServer(HomeServer):
             notify_from_stream(
                 result, "to_device", "to_device_key", user="user_id"
             )
+            yield notify_device_list_update(result)
 
         while True:
             try:
@@ -427,7 +452,7 @@ class SynchrotronServer(HomeServer):
                 yield store.process_replication(result)
                 typing_handler.process_replication(result)
                 yield presence_handler.process_replication(result)
-                notify(result)
+                yield notify(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
                 yield sleep(5)