diff options
author | Erik Johnston <erik@matrix.org> | 2020-03-19 11:36:53 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-19 11:36:53 +0000 |
commit | a319cb1dd179e5d1692ce9fee7d0e09239aa1c26 (patch) | |
tree | 556f5cc2797b9caf16188f87aaef3f67df372ddd /synapse/app/generic_worker.py | |
parent | more changelog (diff) | |
parent | Comments from review (diff) | |
download | synapse-a319cb1dd179e5d1692ce9fee7d0e09239aa1c26.tar.xz |
Change device list streams to have one row per ID (#7010)
* Add 'device_lists_outbound_pokes' as extra table. This makes sure we check all the relevant tables to get the current max stream ID. Currently not doing so isn't problematic as the max stream ID in `device_lists_outbound_pokes` is the same as in `device_lists_stream`, however that will change. * Change device lists stream to have one row per id. This will make it possible to process the streams more incrementally, avoiding having to process large chunks at once. * Change device list replication to match new semantics. Instead of sending down batches of user ID/host tuples, send down a row per entity (user ID or host). * Newsfile * Remove handling of multiple rows per ID * Fix worker handling * Comments from review
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r-- | synapse/app/generic_worker.py | 10 |
1 files changed, 7 insertions, 3 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index b2c764bfe8..cdc078cf11 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -676,8 +676,9 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler): elif stream_name == "device_lists": all_room_ids = set() for row in rows: - room_ids = await self.store.get_rooms_for_user(row.user_id) - all_room_ids.update(room_ids) + if row.entity.startswith("@"): + room_ids = await self.store.get_rooms_for_user(row.entity) + all_room_ids.update(room_ids) self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) elif stream_name == "presence": await self.presence_handler.process_replication_rows(token, rows) @@ -774,7 +775,10 @@ class FederationSenderHandler(object): # ... as well as device updates and messages elif stream_name == DeviceListsStream.NAME: - hosts = {row.destination for row in rows} + # The entities are either user IDs (starting with '@') whose devices + # have changed, or remote servers that we need to tell about + # changes. + hosts = {row.entity for row in rows if not row.entity.startswith("@")} for host in hosts: self.federation_sender.send_device_messages(host) |