summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams/_base.py')
-rw-r--r--synapse/replication/tcp/streams/_base.py96
1 files changed, 24 insertions, 72 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py

index 4acefc8a96..682d47f402 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -79,7 +79,7 @@ StreamUpdateResult = Tuple[List[Tuple[Token, StreamRow]], Token, bool] UpdateFunction = Callable[[str, Token, Token, int], Awaitable[StreamUpdateResult]] -class Stream(object): +class Stream: """Base class for the streams. Provides a `get_updates()` function that returns new updates since the last @@ -198,26 +198,6 @@ def current_token_without_instance( return lambda instance_name: current_token() -def db_query_to_update_function( - query_function: Callable[[Token, Token, int], Awaitable[List[tuple]]] -) -> UpdateFunction: - """Wraps a db query function which returns a list of rows to make it - suitable for use as an `update_function` for the Stream class - """ - - async def update_function(instance_name, from_token, upto_token, limit): - rows = await query_function(from_token, upto_token, limit) - updates = [(row[0], row[1:]) for row in rows] - limited = False - if len(updates) >= limit: - upto_token = updates[-1][0] - limited = True - - return updates, upto_token, limited - - return update_function - - def make_http_update_function(hs, stream_name: str) -> UpdateFunction: """Makes a suitable function for use as an `update_function` that queries the master process for updates. @@ -264,7 +244,7 @@ class BackfillStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_current_backfill_token), - db_query_to_update_function(store.get_all_new_backfill_event_rows), + store.get_all_new_backfill_event_rows, ) @@ -291,9 +271,7 @@ class PresenceStream(Stream): if hs.config.worker_app is None: # on the master, query the presence handler presence_handler = hs.get_presence_handler() - update_function = db_query_to_update_function( - presence_handler.get_all_presence_updates - ) + update_function = presence_handler.get_all_presence_updates else: # Query master process update_function = make_http_update_function(hs, self.NAME) @@ -316,13 +294,12 @@ class TypingStream(Stream): def __init__(self, hs): typing_handler = hs.get_typing_handler() - if hs.config.worker_app is None: - # on the master, query the typing handler - update_function = db_query_to_update_function( - typing_handler.get_all_typing_updates - ) + writer_instance = hs.config.worker.writers.typing + if writer_instance == hs.get_instance_name(): + # On the writer, query the typing handler + update_function = typing_handler.get_all_typing_updates else: - # Query master process + # Query the typing writer process update_function = make_http_update_function(hs, self.NAME) super().__init__( @@ -352,7 +329,7 @@ class ReceiptsStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_max_receipt_stream_id), - db_query_to_update_function(store.get_all_updated_receipts), + store.get_all_updated_receipts, ) @@ -367,26 +344,17 @@ class PushRulesStream(Stream): def __init__(self, hs): self.store = hs.get_datastore() + super(PushRulesStream, self).__init__( - hs.get_instance_name(), self._current_token, self._update_function + hs.get_instance_name(), + self._current_token, + self.store.get_all_push_rule_updates, ) def _current_token(self, instance_name: str) -> int: - push_rules_token, _ = self.store.get_push_rules_stream_token() + push_rules_token = self.store.get_max_push_rules_stream_id() return push_rules_token - async def _update_function( - self, instance_name: str, from_token: Token, to_token: Token, limit: int - ): - rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit) - - limited = False - if len(rows) == limit: - to_token = rows[-1][0] - limited = True - - return [(row[0], (row[2],)) for row in rows], to_token, limited - class PushersStream(Stream): """A user has added/changed/removed a pusher @@ -406,7 +374,7 @@ class PushersStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_pushers_stream_token), - db_query_to_update_function(store.get_all_updated_pushers_rows), + store.get_all_updated_pushers_rows, ) @@ -434,27 +402,13 @@ class CachesStream(Stream): ROW_TYPE = CachesStreamRow def __init__(self, hs): - self.store = hs.get_datastore() + store = hs.get_datastore() super().__init__( hs.get_instance_name(), - self.store.get_cache_stream_token, - self._update_function, + store.get_cache_stream_token_for_writer, + store.get_all_updated_caches, ) - async def _update_function( - self, instance_name: str, from_token: int, upto_token: int, limit: int - ): - rows = await self.store.get_all_updated_caches( - instance_name, from_token, upto_token, limit - ) - updates = [(row[0], row[1:]) for row in rows] - limited = False - if len(updates) >= limit: - upto_token = updates[-1][0] - limited = True - - return updates, upto_token, limited - class PublicRoomsStream(Stream): """The public rooms list changed @@ -478,7 +432,7 @@ class PublicRoomsStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_current_public_room_stream_id), - db_query_to_update_function(store.get_all_new_public_rooms), + store.get_all_new_public_rooms, ) @@ -499,7 +453,7 @@ class DeviceListsStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_device_stream_token), - db_query_to_update_function(store.get_all_device_list_changes_for_remotes), + store.get_all_device_list_changes_for_remotes, ) @@ -517,7 +471,7 @@ class ToDeviceStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_to_device_stream_token), - db_query_to_update_function(store.get_all_new_device_messages), + store.get_all_new_device_messages, ) @@ -537,7 +491,7 @@ class TagAccountDataStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_max_account_data_stream_id), - db_query_to_update_function(store.get_all_updated_tags), + store.get_all_updated_tags, ) @@ -625,7 +579,7 @@ class GroupServerStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_group_stream_token), - db_query_to_update_function(store.get_all_groups_changes), + store.get_all_groups_changes, ) @@ -643,7 +597,5 @@ class UserSignatureStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_device_stream_token), - db_query_to_update_function( - store.get_all_user_signature_changes_for_remotes - ), + store.get_all_user_signature_changes_for_remotes, )