Diffstat (limited to 'synapse/app')
-rw-r--r--  synapse/app/generic_worker.py | 47
1 file changed, 31 insertions(+), 16 deletions(-)
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 5363642d64..bd1733573b 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -65,12 +65,23 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams._base import (
+from synapse.replication.tcp.streams import (
+    AccountDataStream,
     DeviceListsStream,
+    GroupServerStream,
+    PresenceStream,
+    PushersStream,
+    PushRulesStream,
     ReceiptsStream,
+    TagAccountDataStream,
     ToDeviceStream,
+    TypingStream,
+)
+from synapse.replication.tcp.streams.events import (
+    EventsStream,
+    EventsStreamEventRow,
+    EventsStreamRow,
 )
-from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
 from synapse.rest.admin import register_servlets_for_media_repo
 from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -626,7 +637,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
             if self.send_handler:
                 self.send_handler.process_replication_rows(stream_name, token, rows)
 
-            if stream_name == "events":
+            if stream_name == EventsStream.NAME:
                 # We shouldn't get multiple rows per token for events stream, so
                 # we don't need to optimise this for multiple rows.
                 for row in rows:
@@ -649,43 +660,44 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
                     )
 
                 await self.pusher_pool.on_new_notifications(token, token)
-            elif stream_name == "push_rules":
+            elif stream_name == PushRulesStream.NAME:
                 self.notifier.on_new_event(
                     "push_rules_key", token, users=[row.user_id for row in rows]
                 )
-            elif stream_name in ("account_data", "tag_account_data"):
+            elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
                 self.notifier.on_new_event(
                     "account_data_key", token, users=[row.user_id for row in rows]
                 )
-            elif stream_name == "receipts":
+            elif stream_name == ReceiptsStream.NAME:
                 self.notifier.on_new_event(
                     "receipt_key", token, rooms=[row.room_id for row in rows]
                 )
                 await self.pusher_pool.on_new_receipts(
                     token, token, {row.room_id for row in rows}
                 )
-            elif stream_name == "typing":
+            elif stream_name == TypingStream.NAME:
                 self.typing_handler.process_replication_rows(token, rows)
                 self.notifier.on_new_event(
                     "typing_key", token, rooms=[row.room_id for row in rows]
                 )
-            elif stream_name == "to_device":
+            elif stream_name == ToDeviceStream.NAME:
                 entities = [row.entity for row in rows if row.entity.startswith("@")]
                 if entities:
                     self.notifier.on_new_event("to_device_key", token, users=entities)
-            elif stream_name == "device_lists":
+            elif stream_name == DeviceListsStream.NAME:
                 all_room_ids = set()
                 for row in rows:
-                    room_ids = await self.store.get_rooms_for_user(row.user_id)
-                    all_room_ids.update(room_ids)
+                    if row.entity.startswith("@"):
+                        room_ids = await self.store.get_rooms_for_user(row.entity)
+                        all_room_ids.update(room_ids)
                 self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
-            elif stream_name == "presence":
+            elif stream_name == PresenceStream.NAME:
                 await self.presence_handler.process_replication_rows(token, rows)
-            elif stream_name == "receipts":
+            elif stream_name == GroupServerStream.NAME:
                 self.notifier.on_new_event(
                     "groups_key", token, users=[row.user_id for row in rows]
                 )
-            elif stream_name == "pushers":
+            elif stream_name == PushersStream.NAME:
                 for row in rows:
                     if row.deleted:
                         self.stop_pusher(row.user_id, row.app_id, row.pushkey)
@@ -774,7 +786,10 @@ class FederationSenderHandler(object):
 
         # ... as well as device updates and messages
         elif stream_name == DeviceListsStream.NAME:
-            hosts = {row.destination for row in rows}
+            # The entities are either user IDs (starting with '@') whose devices
+            # have changed, or remote servers that we need to tell about
+            # changes.
+            hosts = {row.entity for row in rows if not row.entity.startswith("@")}
             for host in hosts:
                 self.federation_sender.send_device_messages(host)
 
@@ -789,7 +804,7 @@ class FederationSenderHandler(object):
     async def _on_new_receipts(self, rows):
         """
         Args:
-            rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
+            rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]):
                 new receipts to be processed
         """
         for receipt in rows:
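
The change above replaces hard-coded stream-name strings with each stream class's NAME constant: a mistyped attribute raises AttributeError when the check runs, whereas a mistyped string literal just never matches, as with the second "receipts" branch above that was evidently meant for the group server stream. A rough, self-contained sketch of the pattern follows; the classes and handler are hypothetical stand-ins, not Synapse's real stream implementations, and only the NAME-constant convention and the '@'-prefix check on device list entities mirror the diff.

    # Hypothetical sketch: dispatch replication rows on Stream.NAME constants
    # rather than on string literals.  Stand-in classes only.
    class ReceiptsStream:
        NAME = "receipts"

    class DeviceListsStream:
        NAME = "device_lists"

    def process_replication_rows(stream_name, token, rows):
        if stream_name == ReceiptsStream.NAME:
            print("receipts advanced to token", token)
        elif stream_name == DeviceListsStream.NAME:
            # As the new comment in FederationSenderHandler notes, a device
            # lists row carries either a user ID (starts with '@') or a remote
            # server name, and the two kinds are handled differently.
            users = [entity for entity in rows if entity.startswith("@")]
            hosts = [entity for entity in rows if not entity.startswith("@")]
            print("changed users:", users, "destination hosts:", hosts)

    process_replication_rows(
        DeviceListsStream.NAME, 5, ["@alice:example.org", "matrix.example.com"]
    )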