summary refs log tree commit diff
path: root/synapse/app/generic_worker.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-19 11:36:53 +0000
committerGitHub <noreply@github.com>2020-03-19 11:36:53 +0000
commita319cb1dd179e5d1692ce9fee7d0e09239aa1c26 (patch)
tree556f5cc2797b9caf16188f87aaef3f67df372ddd /synapse/app/generic_worker.py
parentmore changelog (diff)
parentComments from review (diff)
downloadsynapse-a319cb1dd179e5d1692ce9fee7d0e09239aa1c26.tar.xz
Change device list streams to have one row per ID (#7010)
* Add 'device_lists_outbound_pokes' as extra table.

This makes sure we check all the relevant tables to get the current max
stream ID.

Currently not doing so isn't problematic as the max stream ID in
`device_lists_outbound_pokes` is the same as in `device_lists_stream`,
however that will change.

* Change device lists stream to have one row per id.

This will make it possible to process the streams more incrementally,
avoiding having to process large chunks at once.

* Change device list replication to match new semantics.

Instead of sending down batches of user ID/host tuples, send down a row
per entity (user ID or host).

* Newsfile

* Remove handling of multiple rows per ID

* Fix worker handling

* Comments from review
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r--synapse/app/generic_worker.py10
1 files changed, 7 insertions, 3 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index b2c764bfe8..cdc078cf11 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -676,8 +676,9 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
             elif stream_name == "device_lists":
                 all_room_ids = set()
                 for row in rows:
-                    room_ids = await self.store.get_rooms_for_user(row.user_id)
-                    all_room_ids.update(room_ids)
+                    if row.entity.startswith("@"):
+                        room_ids = await self.store.get_rooms_for_user(row.entity)
+                        all_room_ids.update(room_ids)
                 self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
             elif stream_name == "presence":
                 await self.presence_handler.process_replication_rows(token, rows)
@@ -774,7 +775,10 @@ class FederationSenderHandler(object):
 
         # ... as well as device updates and messages
         elif stream_name == DeviceListsStream.NAME:
-            hosts = {row.destination for row in rows}
+            # The entities are either user IDs (starting with '@') whose devices
+            # have changed, or remote servers that we need to tell about
+            # changes.
+            hosts = {row.entity for row in rows if not row.entity.startswith("@")}
             for host in hosts:
                 self.federation_sender.send_device_messages(host)