diff options
Diffstat (limited to 'synapse/replication/tcp/streams')
-rw-r--r-- | synapse/replication/tcp/streams/__init__.py | 3 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 74 |
2 files changed, 52 insertions, 25 deletions
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index a7eadfa3c9..9c67f661a3 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -37,7 +37,6 @@ from synapse.replication.tcp.streams._base import ( Stream, ToDeviceStream, TypingStream, - UserSignatureStream, ) from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.federation import FederationStream @@ -62,7 +61,6 @@ STREAMS_MAP = { ToDeviceStream, FederationStream, AccountDataStream, - UserSignatureStream, UnPartialStatedRoomStream, UnPartialStatedEventStream, ) @@ -82,7 +80,6 @@ __all__ = [ "DeviceListsStream", "ToDeviceStream", "AccountDataStream", - "UserSignatureStream", "UnPartialStatedRoomStream", "UnPartialStatedEventStream", ] diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index fbf78da9c2..a4bdb48c0c 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -463,18 +463,67 @@ class DeviceListsStream(Stream): @attr.s(slots=True, frozen=True, auto_attribs=True) class DeviceListsStreamRow: entity: str + # Indicates that a user has signed their own device with their user-signing key + is_signature: bool NAME = "device_lists" ROW_TYPE = DeviceListsStreamRow def __init__(self, hs: "HomeServer"): - store = hs.get_datastores().main + self.store = hs.get_datastores().main super().__init__( hs.get_instance_name(), - current_token_without_instance(store.get_device_stream_token), - store.get_all_device_list_changes_for_remotes, + current_token_without_instance(self.store.get_device_stream_token), + self._update_function, + ) + + async def _update_function( + self, + instance_name: str, + from_token: Token, + current_token: Token, + target_row_count: int, + ) -> StreamUpdateResult: + ( + device_updates, + devices_to_token, + devices_limited, + ) = await self.store.get_all_device_list_changes_for_remotes( + instance_name, from_token, current_token, target_row_count ) + ( + signatures_updates, + signatures_to_token, + signatures_limited, + ) = await self.store.get_all_user_signature_changes_for_remotes( + instance_name, from_token, current_token, target_row_count + ) + + upper_limit_token = current_token + if devices_limited: + upper_limit_token = min(upper_limit_token, devices_to_token) + if signatures_limited: + upper_limit_token = min(upper_limit_token, signatures_to_token) + + device_updates = [ + (stream_id, (entity, False)) + for stream_id, (entity,) in device_updates + if stream_id <= upper_limit_token + ] + + signatures_updates = [ + (stream_id, (entity, True)) + for stream_id, (entity,) in signatures_updates + if stream_id <= upper_limit_token + ] + + updates = list( + heapq.merge(device_updates, signatures_updates, key=lambda row: row[0]) + ) + + return updates, upper_limit_token, devices_limited or signatures_limited + class ToDeviceStream(Stream): """New to_device messages for a client""" @@ -583,22 +632,3 @@ class AccountDataStream(Stream): heapq.merge(room_rows, global_rows, tag_rows, key=lambda row: row[0]) ) return updates, to_token, limited - - -class UserSignatureStream(Stream): - """A user has signed their own device with their user-signing key""" - - @attr.s(slots=True, frozen=True, auto_attribs=True) - class UserSignatureStreamRow: - user_id: str - - NAME = "user_signature" - ROW_TYPE = UserSignatureStreamRow - - def __init__(self, hs: "HomeServer"): - store = hs.get_datastores().main - super().__init__( - hs.get_instance_name(), - current_token_without_instance(store.get_device_stream_token), - store.get_all_user_signature_changes_for_remotes, - ) |