From 1ce59d7ba002a869ee94fbe375898cc79c6eb4d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Jul 2024 12:39:49 +0100 Subject: Fix sync waiting for an invalid token from the "future" (#17386) Fixes https://github.com/element-hq/synapse/issues/17274, hopefully. Basically, old versions of Synapse could advance streams without persisting anything in the DB (fixed in #17229). On restart those updates would get lost, and so the position of the stream would revert to an older position. If this happened across an upgrade to a later Synapse version which included #17215, then sync could get blocked indefinitely (until the stream advanced to the position in the token). We fix this by bounding the stream positions we'll wait for to the maximum position of the underlying stream ID generator. --- synapse/notifier.py | 7 +++ synapse/storage/databases/main/account_data.py | 10 ++-- synapse/storage/databases/main/deviceinbox.py | 10 ++-- synapse/storage/databases/main/devices.py | 3 ++ synapse/storage/databases/main/events_worker.py | 4 +- synapse/storage/databases/main/presence.py | 10 ++-- synapse/storage/databases/main/push_rule.py | 3 ++ synapse/storage/databases/main/receipts.py | 10 ++-- synapse/storage/databases/main/room.py | 11 ++--- synapse/storage/databases/main/stream.py | 3 ++ synapse/storage/util/id_generators.py | 5 ++ synapse/storage/util/sequence.py | 24 ++++++++++ synapse/streams/events.py | 64 ++++++++++++++++++++++++- synapse/types/__init__.py | 18 +++++++ 14 files changed, 153 insertions(+), 29 deletions(-) (limited to 'synapse') diff --git a/synapse/notifier.py b/synapse/notifier.py index c87eb748c0..c3ecf86ec4 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -764,6 +764,13 @@ class Notifier: async def wait_for_stream_token(self, stream_token: StreamToken) -> bool: """Wait for this worker to catch up with the given stream token.""" + current_token = self.event_sources.get_current_token() + if stream_token.is_before_or_eq(current_token): + return True + + # Work around a bug where older Synapse versions gave out tokens "from + # the future", i.e. that are ahead of the tokens persisted in the DB. 
+ stream_token = await self.event_sources.bound_future_token(stream_token) start = self.clock.time_msec() while True: diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 9611a84932..966393869b 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -43,10 +43,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.push_rule import PushRulesWorkerStore -from synapse.storage.util.id_generators import ( - AbstractStreamIdGenerator, - MultiWriterIdGenerator, -) +from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import JsonDict, JsonMapping from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -71,7 +68,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) self._instance_name in hs.config.worker.writers.account_data ) - self._account_data_id_gen: AbstractStreamIdGenerator + self._account_data_id_gen: MultiWriterIdGenerator self._account_data_id_gen = MultiWriterIdGenerator( db_conn=db_conn, @@ -113,6 +110,9 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) """ return self._account_data_id_gen.get_current_token() + def get_account_data_id_generator(self) -> MultiWriterIdGenerator: + return self._account_data_id_gen + @cached() async def get_global_account_data_for_user( self, user_id: str diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 07333efff8..304ac42411 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -50,10 +50,7 @@ from synapse.storage.database import ( LoggingTransaction, make_in_list_sql_clause, ) -from synapse.storage.util.id_generators import ( - AbstractStreamIdGenerator, - MultiWriterIdGenerator, -) +from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache @@ -92,7 +89,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): self._instance_name in hs.config.worker.writers.to_device ) - self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator( + self._to_device_msg_id_gen: MultiWriterIdGenerator = MultiWriterIdGenerator( db_conn=db_conn, db=database, notifier=hs.get_replication_notifier(), @@ -169,6 +166,9 @@ class DeviceInboxWorkerStore(SQLBaseStore): def get_to_device_stream_token(self) -> int: return self._to_device_msg_id_gen.get_current_token() + def get_to_device_id_generator(self) -> MultiWriterIdGenerator: + return self._to_device_msg_id_gen + async def get_messages_for_user_devices( self, user_ids: Collection[str], diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 59a035dd62..53024bddc3 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -243,6 +243,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() + def get_device_stream_id_generator(self) -> MultiWriterIdGenerator: + return self._device_list_id_gen + async def count_devices_by_users( self, user_ids: Optional[Collection[str]] = None ) -> int: diff 
--git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index e264d36f02..198e65cfa5 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -192,8 +192,8 @@ class EventsWorkerStore(SQLBaseStore): ): super().__init__(database, db_conn, hs) - self._stream_id_gen: AbstractStreamIdGenerator - self._backfill_id_gen: AbstractStreamIdGenerator + self._stream_id_gen: MultiWriterIdGenerator + self._backfill_id_gen: MultiWriterIdGenerator self._stream_id_gen = MultiWriterIdGenerator( db_conn=db_conn, diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 923e764491..065c885603 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -42,10 +42,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.engines._base import IsolationLevel from synapse.storage.types import Connection -from synapse.storage.util.id_generators import ( - AbstractStreamIdGenerator, - MultiWriterIdGenerator, -) +from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.iterutils import batch_iter @@ -83,7 +80,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) super().__init__(database, db_conn, hs) self._instance_name = hs.get_instance_name() - self._presence_id_gen: AbstractStreamIdGenerator + self._presence_id_gen: MultiWriterIdGenerator self._can_persist_presence = ( self._instance_name in hs.config.worker.writers.presence @@ -455,6 +452,9 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) def get_current_presence_token(self) -> int: return self._presence_id_gen.get_current_token() + def get_presence_stream_id_gen(self) -> MultiWriterIdGenerator: + return self._presence_id_gen + def _get_active_presence(self, db_conn: Connection) -> List[UserPresenceState]: """Fetch non-offline presence from the database so that we can register the appropriate time outs. 
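The hunks above repeat one pattern per stream: each store's ID generator annotation is narrowed from AbstractStreamIdGenerator to the concrete MultiWriterIdGenerator, and a getter is added so that the token-bounding code introduced later in this patch can ask the generator for the maximum position actually allocated in the database. Below is a minimal, self-contained sketch of the resulting flow; the class and function here are stand-ins that operate on plain ints, not Synapse's real stores or tokens, and only get_max_allocated_token() mirrors the new API:

    import asyncio

    class StubIdGenerator:
        """Stand-in for MultiWriterIdGenerator: holds the in-memory stream
        position plus the maximum ID actually allocated in the database."""

        def __init__(self, current: int, max_allocated: int) -> None:
            self._current = current
            self._max_allocated = max_allocated

        def get_current_token(self) -> int:
            return self._current

        async def get_max_allocated_token(self) -> int:
            # The real generator runs a DB query here; a constant will do
            # for the sketch.
            return self._max_allocated

    async def bound_future_token(token: int, id_gen: StubIdGenerator) -> int:
        """Clamp a token from "the future" to what is actually persisted."""
        if token <= id_gen.get_current_token():
            return token  # Not ahead of this worker; nothing to bound.
        max_token = await id_gen.get_max_allocated_token()
        return min(token, max_token)

    # Before a restart the stream handed out position 15 without persisting
    # it; after the restart the persisted maximum is only 12. Waiting for 15
    # could block forever, so the token is clamped to 12 instead.
    print(asyncio.run(bound_future_token(15, StubIdGenerator(10, 12))))  # 12

The real EventSources.bound_future_token, added further down in this patch, applies this per stream key, reaching each stream's generator through the getters added above.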
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 2a39dc9f90..bbdde17711 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -178,6 +178,9 @@ class PushRulesWorkerStore( """ return self._push_rules_stream_id_gen.get_current_token() + def get_push_rules_stream_id_gen(self) -> MultiWriterIdGenerator: + return self._push_rules_stream_id_gen + def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 8432560a89..3bde0ae0d4 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -45,10 +45,7 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.engines._base import IsolationLevel -from synapse.storage.util.id_generators import ( - AbstractStreamIdGenerator, - MultiWriterIdGenerator, -) +from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import ( JsonDict, JsonMapping, @@ -76,7 +73,7 @@ class ReceiptsWorkerStore(SQLBaseStore): # In the worker store this is an ID tracker which we overwrite in the non-worker # class below that is used on the main process. - self._receipts_id_gen: AbstractStreamIdGenerator + self._receipts_id_gen: MultiWriterIdGenerator self._can_write_to_receipts = ( self._instance_name in hs.config.worker.writers.receipts @@ -136,6 +133,9 @@ class ReceiptsWorkerStore(SQLBaseStore): def get_receipt_stream_id_for_instance(self, instance_name: str) -> int: return self._receipts_id_gen.get_current_token_for_writer(instance_name) + def get_receipts_stream_id_gen(self) -> MultiWriterIdGenerator: + return self._receipts_id_gen + def get_last_unthreaded_receipt_for_user_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index d5627b1d6e..80a4bf95f2 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -59,11 +59,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.types import Cursor -from synapse.storage.util.id_generators import ( - AbstractStreamIdGenerator, - IdGenerator, - MultiWriterIdGenerator, -) +from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList @@ -151,7 +147,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): self.config: HomeServerConfig = hs.config - self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator + self._un_partial_stated_rooms_stream_id_gen: MultiWriterIdGenerator self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator( db_conn=db_conn, @@ -1409,6 +1405,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): instance_name ) + def get_un_partial_stated_rooms_id_generator(self) -> MultiWriterIdGenerator: + return self._un_partial_stated_rooms_stream_id_gen + async def get_un_partial_stated_rooms_between( self, last_id: int, current_id: int, room_ids: Collection[str] ) -> Set[str]: diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index ff0d723684..b7eb3116ae 
100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -577,6 +577,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions)) + def get_events_stream_id_generator(self) -> MultiWriterIdGenerator: + return self._stream_id_gen + async def get_room_events_stream_for_rooms( self, room_ids: Collection[str], diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 48f88a6f8a..e8588f33cf 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -812,6 +812,11 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): pos = self.get_current_token_for_writer(self._instance_name) txn.execute(sql, (self._stream_name, self._instance_name, pos)) + async def get_max_allocated_token(self) -> int: + return await self._db.runInteraction( + "get_max_allocated_token", self._sequence_gen.get_max_allocated + ) + @attr.s(frozen=True, auto_attribs=True) class _AsyncCtxManagerWrapper(Generic[T]): diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index c4c0602b28..cac3eba1a5 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -88,6 +88,10 @@ class SequenceGenerator(metaclass=abc.ABCMeta): """ ... + @abc.abstractmethod + def get_max_allocated(self, txn: Cursor) -> int: + """Get the maximum ID that we have allocated""" + class PostgresSequenceGenerator(SequenceGenerator): """An implementation of SequenceGenerator which uses a postgres sequence""" @@ -190,6 +194,17 @@ class PostgresSequenceGenerator(SequenceGenerator): % {"seq": self._sequence_name, "stream_name": stream_name} ) + def get_max_allocated(self, txn: Cursor) -> int: + # We just read from the sequence what the last value we fetched was. + txn.execute(f"SELECT last_value, is_called FROM {self._sequence_name}") + row = txn.fetchone() + assert row is not None + + last_value, is_called = row + if not is_called: + last_value -= 1 + return last_value + GetFirstCallbackType = Callable[[Cursor], int] @@ -248,6 +263,15 @@ class LocalSequenceGenerator(SequenceGenerator): # There is nothing to do for in memory sequences pass + def get_max_allocated(self, txn: Cursor) -> int: + with self._lock: + if self._current_max_id is None: + assert self._callback is not None + self._current_max_id = self._callback(txn) + self._callback = None + + return self._current_max_id + def build_sequence_generator( db_conn: "LoggingDatabaseConnection", diff --git a/synapse/streams/events.py b/synapse/streams/events.py index dd7401ac8e..93d5ae1a55 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -30,7 +30,12 @@ from synapse.handlers.room import RoomEventSource from synapse.handlers.typing import TypingNotificationEventSource from synapse.logging.opentracing import trace from synapse.streams import EventSource -from synapse.types import MultiWriterStreamToken, StreamKeyType, StreamToken +from synapse.types import ( + AbstractMultiWriterStreamToken, + MultiWriterStreamToken, + StreamKeyType, + StreamToken, +) if TYPE_CHECKING: from synapse.server import HomeServer @@ -91,6 +96,63 @@ class EventSources: ) return token + async def bound_future_token(self, token: StreamToken) -> StreamToken: + """Bound a token that is ahead of the current token to the maximum + persisted values. + + This ensures that if we wait for the given token we know the stream will + eventually advance to that point. 
+ + This works around a bug where older Synapse versions will give out + tokens for streams, and then after a restart will give back tokens where + the stream has "gone backwards". + """ + + current_token = self.get_current_token() + + stream_key_to_id_gen = { + StreamKeyType.ROOM: self.store.get_events_stream_id_generator(), + StreamKeyType.PRESENCE: self.store.get_presence_stream_id_gen(), + StreamKeyType.RECEIPT: self.store.get_receipts_stream_id_gen(), + StreamKeyType.ACCOUNT_DATA: self.store.get_account_data_id_generator(), + StreamKeyType.PUSH_RULES: self.store.get_push_rules_stream_id_gen(), + StreamKeyType.TO_DEVICE: self.store.get_to_device_id_generator(), + StreamKeyType.DEVICE_LIST: self.store.get_device_stream_id_generator(), + StreamKeyType.UN_PARTIAL_STATED_ROOMS: self.store.get_un_partial_stated_rooms_id_generator(), + } + + for _, key in StreamKeyType.__members__.items(): + if key == StreamKeyType.TYPING: + # Typing stream is allowed to "reset", and so comparisons don't + # really make sense as is. + # TODO: Figure out a better way of tracking resets. + continue + + token_value = token.get_field(key) + current_value = current_token.get_field(key) + + if isinstance(token_value, AbstractMultiWriterStreamToken): + assert type(current_value) is type(token_value) + + if not token_value.is_before_or_eq(current_value): # type: ignore[arg-type] + max_token = await stream_key_to_id_gen[ + key + ].get_max_allocated_token() + + token = token.copy_and_replace( + key, token.room_key.bound_stream_token(max_token) + ) + else: + assert isinstance(current_value, int) + if current_value < token_value: + max_token = await stream_key_to_id_gen[ + key + ].get_max_allocated_token() + + token = token.copy_and_replace(key, min(token_value, max_token)) + + return token + @trace async def get_start_token_for_pagination(self, room_id: str) -> StreamToken: """Get the start token for a given room to be used to paginate diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 151658df53..8ab9f90238 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -536,6 +536,16 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): return True + def bound_stream_token(self, max_stream: int) -> "Self": + """Bound the stream positions to a maximum value""" + + return type(self)( + stream=min(self.stream, max_stream), + instance_map=immutabledict( + {k: min(s, max_stream) for k, s in self.instance_map.items()} + ), + ) + @attr.s(frozen=True, slots=True, order=False) class RoomStreamToken(AbstractMultiWriterStreamToken): @@ -722,6 +732,14 @@ class RoomStreamToken(AbstractMultiWriterStreamToken): else: return "s%d" % (self.stream,) + def bound_stream_token(self, max_stream: int) -> "RoomStreamToken": + """See super class""" + + # This only makes sense for stream tokens. + assert self.topological is None + + return super().bound_stream_token(max_stream) + @attr.s(frozen=True, slots=True, order=False) class MultiWriterStreamToken(AbstractMultiWriterStreamToken): -- cgit 1.4.1 From b905ae27caac4bb27262d9d7ac6e834de5694f10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Jul 2024 14:06:36 +0100 Subject: Fix regression when bounding future tokens (#17391) Fix bug added in #17386, where we accidentally used `room_key` for the receipts stream. See first commit. 
Reviewable commit-by-commit --- changelog.d/17391.bugfix | 1 + synapse/streams/events.py | 26 ++++++++++++++++++++++---- tests/handlers/test_sync.py | 37 +++++++++++++++++++++++++++++++------ 3 files changed, 54 insertions(+), 10 deletions(-) create mode 100644 changelog.d/17391.bugfix (limited to 'synapse') diff --git a/changelog.d/17391.bugfix b/changelog.d/17391.bugfix new file mode 100644 index 0000000000..9686b5c276 --- /dev/null +++ b/changelog.d/17391.bugfix @@ -0,0 +1 @@ +Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0. diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 93d5ae1a55..856f646795 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -19,6 +19,7 @@ # # +import logging from typing import TYPE_CHECKING, Sequence, Tuple import attr @@ -41,6 +42,9 @@ if TYPE_CHECKING: from synapse.server import HomeServer +logger = logging.getLogger(__name__) + + @attr.s(frozen=True, slots=True, auto_attribs=True) class _EventSourcesInner: room: RoomEventSource @@ -139,9 +143,16 @@ class EventSources: key ].get_max_allocated_token() - token = token.copy_and_replace( - key, token.room_key.bound_stream_token(max_token) - ) + if max_token < token_value.get_max_stream_pos(): + logger.error( + "Bounding token from the future '%s': token: %s, bound: %s", + key, + token_value, + max_token, + ) + token = token.copy_and_replace( + key, token_value.bound_stream_token(max_token) + ) else: assert isinstance(current_value, int) if current_value < token_value: @@ -149,7 +160,14 @@ class EventSources: key ].get_max_allocated_token() - token = token.copy_and_replace(key, min(token_value, max_token)) + if max_token < token_value: + logger.error( + "Bounding token from the future '%s': token: %s, bound: %s", + key, + token_value, + max_token, + ) + token = token.copy_and_replace(key, max_token) return token diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 5319928c28..674dd4fb54 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -36,7 +36,14 @@ from synapse.handlers.sync import SyncConfig, SyncRequestKey, SyncResult, SyncVe from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer -from synapse.types import JsonDict, StreamKeyType, UserID, create_requester +from synapse.types import ( + JsonDict, + MultiWriterStreamToken, + RoomStreamToken, + StreamKeyType, + UserID, + create_requester, +) from synapse.util import Clock import tests.unittest @@ -999,7 +1006,13 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): self.get_success(sync_d, by=1.0) - def test_wait_for_invalid_future_sync_token(self) -> None: + @parameterized.expand( + [(key,) for key in StreamKeyType.__members__.values()], + name_func=lambda func, _, param: f"{func.__name__}_{param.args[0].name}", + ) + def test_wait_for_invalid_future_sync_token( + self, stream_key: StreamKeyType + ) -> None: """Like the previous test, except we give a token that has a stream position ahead of what is in the DB, i.e. it's invalid and we shouldn't wait for the stream to advance (as it may never do so). @@ -1010,11 +1023,23 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): """ user = self.register_user("alice", "password") - # Create a token and arbitrarily advance one of the streams. + # Create a token and advance one of the streams. 
current_token = self.hs.get_event_sources().get_current_token() - since_token = current_token.copy_and_advance( - StreamKeyType.PRESENCE, current_token.presence_key + 1 - ) + token_value = current_token.get_field(stream_key) + + # How we advance the streams depends on the type. + if isinstance(token_value, int): + since_token = current_token.copy_and_advance(stream_key, token_value + 1) + elif isinstance(token_value, MultiWriterStreamToken): + since_token = current_token.copy_and_advance( + stream_key, MultiWriterStreamToken(stream=token_value.stream + 1) + ) + elif isinstance(token_value, RoomStreamToken): + since_token = current_token.copy_and_advance( + stream_key, RoomStreamToken(stream=token_value.stream + 1) + ) + else: + raise Exception("Unreachable") sync_d = defer.ensureDeferred( self.sync_handler.wait_for_sync_for_user( -- cgit 1.4.1 From 1609855ff8322e3d4d91f8aea322f9750ac24ba2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 1 Jul 2024 12:48:36 +0100 Subject: Limit size of presence EDUs (#17371) Otherwise they are unbounded. --------- Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- changelog.d/17371.misc | 1 + synapse/federation/sender/per_destination_queue.py | 31 ++++-- tests/federation/test_federation_sender.py | 119 +++++++++++++++++++++ 3 files changed, 140 insertions(+), 11 deletions(-) create mode 100644 changelog.d/17371.misc (limited to 'synapse') diff --git a/changelog.d/17371.misc b/changelog.d/17371.misc new file mode 100644 index 0000000000..0fbf19f4fb --- /dev/null +++ b/changelog.d/17371.misc @@ -0,0 +1 @@ +Limit size of presence EDUs to 50 entries. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index d9f2f017ed..9f1c2fe22a 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -21,6 +21,7 @@ # import datetime import logging +from collections import OrderedDict from types import TracebackType from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type @@ -68,6 +69,10 @@ sent_edus_by_type = Counter( # If the retry interval is larger than this then we enter "catchup" mode CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000 +# Limit how many presence states we add to each presence EDU, to ensure that +# they are bounded in size. +MAX_PRESENCE_STATES_PER_EDU = 50 + class PerDestinationQueue: """ @@ -144,7 +149,7 @@ class PerDestinationQueue: # Map of user_id -> UserPresenceState of pending presence to be sent to this # destination - self._pending_presence: Dict[str, UserPresenceState] = {} + self._pending_presence: OrderedDict[str, UserPresenceState] = OrderedDict() # List of room_id -> receipt_type -> user_id -> receipt_dict, # @@ -399,7 +404,7 @@ class PerDestinationQueue: # through another mechanism, because this is all volatile! self._pending_edus = [] self._pending_edus_keyed = {} - self._pending_presence = {} + self._pending_presence.clear() self._pending_receipt_edus = [] self._start_catching_up() @@ -721,22 +726,26 @@ class _TransactionQueueManager: # Add presence EDU. if self.queue._pending_presence: + # Only send max 50 presence entries in the EDU, to bound the amount + # of data we're sending. 
+ presence_to_add: List[JsonDict] = [] + while ( + self.queue._pending_presence + and len(presence_to_add) < MAX_PRESENCE_STATES_PER_EDU + ): + _, presence = self.queue._pending_presence.popitem(last=False) + presence_to_add.append( + format_user_presence_state(presence, self.queue._clock.time_msec()) + ) + pending_edus.append( Edu( origin=self.queue._server_name, destination=self.queue._destination, edu_type=EduTypes.PRESENCE, - content={ - "push": [ - format_user_presence_state( - presence, self.queue._clock.time_msec() - ) - for presence in self.queue._pending_presence.values() - ] - }, + content={"push": presence_to_add}, ) ) - self.queue._pending_presence = {} # Add read receipt EDUs. pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5)) diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 9073afc70e..6a8887fe74 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -27,6 +27,8 @@ from twisted.internet import defer from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EduTypes, RoomEncryptionAlgorithms +from synapse.api.presence import UserPresenceState +from synapse.federation.sender.per_destination_queue import MAX_PRESENCE_STATES_PER_EDU from synapse.federation.units import Transaction from synapse.handlers.device import DeviceHandler from synapse.rest import admin @@ -266,6 +268,123 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): ) +class FederationSenderPresenceTestCases(HomeserverTestCase): + """ + Test federation sending for presence updates. + """ + + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: + self.federation_transport_client = Mock(spec=["send_transaction"]) + self.federation_transport_client.send_transaction = AsyncMock() + hs = self.setup_test_homeserver( + federation_transport_client=self.federation_transport_client, + ) + + return hs + + def default_config(self) -> JsonDict: + config = super().default_config() + config["federation_sender_instances"] = None + return config + + def test_presence_simple(self) -> None: + "Test that sending a single presence update works" + + mock_send_transaction: AsyncMock = ( + self.federation_transport_client.send_transaction + ) + mock_send_transaction.return_value = {} + + sender = self.hs.get_federation_sender() + self.get_success( + sender.send_presence_to_destinations( + [UserPresenceState.default("@user:test")], + ["server"], + ) + ) + + self.pump() + + # expect a call to send_transaction + mock_send_transaction.assert_awaited_once() + + json_cb = mock_send_transaction.call_args[0][1] + data = json_cb() + self.assertEqual( + data["edus"], + [ + { + "edu_type": EduTypes.PRESENCE, + "content": { + "push": [ + { + "presence": "offline", + "user_id": "@user:test", + } + ] + }, + } + ], + ) + + def test_presence_batched(self) -> None: + """Test that lots of presence updates sent to a destination are + batched, rather than having them all sent in one EDU.""" + + mock_send_transaction: AsyncMock = ( + self.federation_transport_client.send_transaction + ) + mock_send_transaction.return_value = {} + + sender = self.hs.get_federation_sender() + + # We now send lots of presence updates to force the federation sender to + # batch them up. 
+ number_presence_updates_to_send = MAX_PRESENCE_STATES_PER_EDU * 2 + self.get_success( + sender.send_presence_to_destinations( + [ + UserPresenceState.default(f"@user{i}:test") + for i in range(number_presence_updates_to_send) + ], + ["server"], + ) + ) + + self.pump() + + # We should have seen at least one transaction sent by now. + mock_send_transaction.assert_called() + + # We don't want to specify exactly how the presence EDUs get sent out; + # it could be one per transaction or multiple per transaction. We just want + # to assert that a) each presence EDU has a bounded number of updates, and + # b) that all updates get sent out. + presence_edus = [] + for transaction_call in mock_send_transaction.call_args_list: + json_cb = transaction_call[0][1] + data = json_cb() + + for edu in data["edus"]: + self.assertEqual(edu.get("edu_type"), EduTypes.PRESENCE) + presence_edus.append(edu) + + # A set of all users whose presence we see; this should end up matching + # the number we sent out above. + seen_users: Set[str] = set() + + for edu in presence_edus: + presence_states = edu["content"]["push"] + + # This is where we actually check that the number of presence + # updates is bounded. + self.assertLessEqual(len(presence_states), MAX_PRESENCE_STATES_PER_EDU) + + seen_users.update(p["user_id"] for p in presence_states) + + self.assertEqual(len(seen_users), number_presence_updates_to_send) + + class FederationSenderDevicesTestCases(HomeserverTestCase): """ Test federation sending to update devices. -- cgit 1.4.1
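To round off the last commit, here is its batching technique in isolation: pending presence lives in an OrderedDict keyed by user ID (so a newer update for a user overwrites the older one while keeping its queue position), and popitem(last=False) drains it oldest-first, taking at most MAX_PRESENCE_STATES_PER_EDU states per EDU. The following is a self-contained sketch with plain dicts standing in for UserPresenceState; note that the real _TransactionQueueManager takes only one such batch per transaction and leaves the rest queued, whereas this sketch drains everything at once purely to show the bounding:

    from collections import OrderedDict
    from typing import Dict, List

    MAX_PRESENCE_STATES_PER_EDU = 50

    def drain_presence(pending: "OrderedDict[str, Dict]") -> List[List[Dict]]:
        """Drain the pending-presence map into EDU-sized batches, FIFO."""
        edus: List[List[Dict]] = []
        while pending:
            batch: List[Dict] = []
            while pending and len(batch) < MAX_PRESENCE_STATES_PER_EDU:
                # popitem(last=False) pops the *oldest* entry, so states go
                # out in the order they were queued.
                _, state = pending.popitem(last=False)
                batch.append(state)
            edus.append(batch)
        return edus

    pending: "OrderedDict[str, Dict]" = OrderedDict(
        (f"@user{i}:test", {"user_id": f"@user{i}:test", "presence": "offline"})
        for i in range(120)
    )
    print([len(batch) for batch in drain_presence(pending)])  # [50, 50, 20]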