diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/admin.py | 22 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 91 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 4 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 2 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 211 | ||||
-rw-r--r-- | synapse/handlers/federation_event.py | 6 | ||||
-rw-r--r-- | synapse/handlers/identity.py | 6 | ||||
-rw-r--r-- | synapse/handlers/message.py | 23 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 2 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 2 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 4 | ||||
-rw-r--r-- | synapse/handlers/room.py | 4 | ||||
-rw-r--r-- | synapse/handlers/room_batch.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 4 | ||||
-rw-r--r-- | synapse/handlers/search.py | 12 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 6 |
16 files changed, 262 insertions, 139 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index a53cd62d3c..be3203ac80 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -90,6 +90,7 @@ class AdminHandler: Membership.LEAVE, Membership.BAN, Membership.INVITE, + Membership.KNOCK, ), ) @@ -122,6 +123,13 @@ class AdminHandler: invited_state = invite.unsigned["invite_room_state"] writer.write_invite(room_id, invite, invited_state) + if room.membership == Membership.KNOCK: + event_id = room.event_id + knock = await self.store.get_event(event_id, allow_none=True) + if knock: + knock_state = knock.unsigned["knock_room_state"] + writer.write_knock(room_id, knock, knock_state) + continue # We only want to bother fetching events up to the last time they @@ -239,6 +247,20 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod + def write_knock( + self, room_id: str, event: EventBase, state: StateMap[dict] + ) -> None: + """Write a knock for the room, with associated knock state. + + Args: + room_id: The room ID the knock is for. + event: The knock event. + state: A subset of the state at the knock, with a subset of the + event keys (type, state_key content and sender). + """ + raise NotImplementedError() + + @abc.abstractmethod def finished(self) -> Any: """Called when all data has successfully been exported and written. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 36c206dae6..ddc9105ee9 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.storage.databases.main.directory import RoomAliasMapping from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID +from synapse.util.async_helpers import Linearizer from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -58,6 +59,10 @@ class ApplicationServicesHandler: self.current_max = 0 self.is_processing = False + self._ephemeral_events_linearizer = Linearizer( + name="appservice_ephemeral_events" + ) + def notify_interested_services(self, max_token: RoomStreamToken) -> None: """Notifies (pushes) all application services interested in this event. @@ -182,7 +187,7 @@ class ApplicationServicesHandler: def notify_interested_services_ephemeral( self, stream_key: str, - new_token: Optional[int], + new_token: Union[int, RoomStreamToken], users: Optional[Collection[Union[str, UserID]]] = None, ) -> None: """ @@ -203,7 +208,7 @@ class ApplicationServicesHandler: Appservices will only receive ephemeral events that fall within their registered user and room namespaces. - new_token: The latest stream token. + new_token: The stream token of the event. users: The users that should be informed of the new event, if any. """ if not self.notify_appservices: @@ -212,6 +217,19 @@ class ApplicationServicesHandler: if stream_key not in ("typing_key", "receipt_key", "presence_key"): return + # Assert that new_token is an integer (and not a RoomStreamToken). + # All of the supported streams that this function handles use an + # integer to track progress (rather than a RoomStreamToken - a + # vector clock implementation) as they don't support multiple + # stream writers. + # + # As a result, we simply assert that new_token is an integer. + # If we do end up needing to pass a RoomStreamToken down here + # in the future, using RoomStreamToken.stream (the minimum stream + # position) to convert to an ascending integer value should work. + # Additional context: https://github.com/matrix-org/synapse/pull/11137 + assert isinstance(new_token, int) + services = [ service for service in self.store.get_app_services() @@ -231,14 +249,13 @@ class ApplicationServicesHandler: self, services: List[ApplicationService], stream_key: str, - new_token: Optional[int], + new_token: int, users: Collection[Union[str, UserID]], ) -> None: logger.debug("Checking interested services for %s" % (stream_key)) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: - # Only handle typing if we have the latest token - if stream_key == "typing_key" and new_token is not None: + if stream_key == "typing_key": # Note that we don't persist the token (via set_type_stream_id_for_appservice) # for typing_key due to performance reasons and due to their highly # ephemeral nature. @@ -248,26 +265,37 @@ class ApplicationServicesHandler: events = await self._handle_typing(service, new_token) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) + continue - elif stream_key == "receipt_key": - events = await self._handle_receipts(service) - if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) - - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "read_receipt", new_token + # Since we read/update the stream position for this AS/stream + with ( + await self._ephemeral_events_linearizer.queue( + (service.id, stream_key) ) + ): + if stream_key == "receipt_key": + events = await self._handle_receipts(service, new_token) + if events: + self.scheduler.submit_ephemeral_events_for_as( + service, events + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) - elif stream_key == "presence_key": - events = await self._handle_presence(service, users) - if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) + elif stream_key == "presence_key": + events = await self._handle_presence(service, users, new_token) + if events: + self.scheduler.submit_ephemeral_events_for_as( + service, events + ) - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "presence", new_token - ) + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) async def _handle_typing( self, service: ApplicationService, new_token: int @@ -304,7 +332,9 @@ class ApplicationServicesHandler: ) return typing - async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: + async def _handle_receipts( + self, service: ApplicationService, new_token: Optional[int] + ) -> List[JsonDict]: """ Return the latest read receipts that the given application service should receive. @@ -323,6 +353,12 @@ class ApplicationServicesHandler: from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) + if new_token is not None and new_token <= from_key: + logger.debug( + "Rejecting token lower than or equal to stored: %s" % (new_token,) + ) + return [] + receipts_source = self.event_sources.sources.receipt receipts, _ = await receipts_source.get_new_events_as( service=service, from_key=from_key @@ -330,7 +366,10 @@ class ApplicationServicesHandler: return receipts async def _handle_presence( - self, service: ApplicationService, users: Collection[Union[str, UserID]] + self, + service: ApplicationService, + users: Collection[Union[str, UserID]], + new_token: Optional[int], ) -> List[JsonDict]: """ Return the latest presence updates that the given application service should receive. @@ -353,6 +392,12 @@ class ApplicationServicesHandler: from_key = await self.store.get_type_stream_id_for_appservice( service, "presence" ) + if new_token is not None and new_token <= from_key: + logger.debug( + "Rejecting token lower than or equal to stored: %s" % (new_token,) + ) + return [] + for user in users: if isinstance(user, str): user = UserID.from_string(user) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d508d7d32a..60e59d11a0 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1989,7 +1989,9 @@ class PasswordAuthProvider: self, check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None, on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None, - auth_checkers: Optional[Dict[Tuple[str, Tuple], CHECK_AUTH_CALLBACK]] = None, + auth_checkers: Optional[ + Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK] + ] = None, ) -> None: # Register check_3pid_auth callback if check_3pid_auth is not None: diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 8567cb0e00..8ca5f60b1c 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -245,7 +245,7 @@ class DirectoryHandler: servers = result.servers else: try: - fed_result = await self.federation.make_query( + fed_result: Optional[JsonDict] = await self.federation.make_query( destination=room_alias.domain, query_type="directory", args={"room_alias": room_alias.to_string()}, diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index d0fb2fc7dc..60c11e3d21 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -201,95 +201,19 @@ class E2eKeysHandler: r[user_id] = remote_queries[user_id] # Now fetch any devices that we don't have in our cache - @trace - async def do_remote_query(destination: str) -> None: - """This is called when we are querying the device list of a user on - a remote homeserver and their device list is not in the device list - cache. If we share a room with this user and we're not querying for - specific user we will update the cache with their device list. - """ - - destination_query = remote_queries_not_in_cache[destination] - - # We first consider whether we wish to update the device list cache with - # the users device list. We want to track a user's devices when the - # authenticated user shares a room with the queried user and the query - # has not specified a particular device. - # If we update the cache for the queried user we remove them from further - # queries. We use the more efficient batched query_client_keys for all - # remaining users - user_ids_updated = [] - for (user_id, device_list) in destination_query.items(): - if user_id in user_ids_updated: - continue - - if device_list: - continue - - room_ids = await self.store.get_rooms_for_user(user_id) - if not room_ids: - continue - - # We've decided we're sharing a room with this user and should - # probably be tracking their device lists. However, we haven't - # done an initial sync on the device list so we do it now. - try: - if self._is_master: - user_devices = await self.device_handler.device_list_updater.user_device_resync( - user_id - ) - else: - user_devices = await self._user_device_resync_client( - user_id=user_id - ) - - user_devices = user_devices["devices"] - user_results = results.setdefault(user_id, {}) - for device in user_devices: - user_results[device["device_id"]] = device["keys"] - user_ids_updated.append(user_id) - except Exception as e: - failures[destination] = _exception_to_failure(e) - - if len(destination_query) == len(user_ids_updated): - # We've updated all the users in the query and we do not need to - # make any further remote calls. - return - - # Remove all the users from the query which we have updated - for user_id in user_ids_updated: - destination_query.pop(user_id) - - try: - remote_result = await self.federation.query_client_keys( - destination, {"device_keys": destination_query}, timeout=timeout - ) - - for user_id, keys in remote_result["device_keys"].items(): - if user_id in destination_query: - results[user_id] = keys - - if "master_keys" in remote_result: - for user_id, key in remote_result["master_keys"].items(): - if user_id in destination_query: - cross_signing_keys["master_keys"][user_id] = key - - if "self_signing_keys" in remote_result: - for user_id, key in remote_result["self_signing_keys"].items(): - if user_id in destination_query: - cross_signing_keys["self_signing_keys"][user_id] = key - - except Exception as e: - failure = _exception_to_failure(e) - failures[destination] = failure - set_tag("error", True) - set_tag("reason", failure) - await make_deferred_yieldable( defer.gatherResults( [ - run_in_background(do_remote_query, destination) - for destination in remote_queries_not_in_cache + run_in_background( + self._query_devices_for_destination, + results, + cross_signing_keys, + failures, + destination, + queries, + timeout, + ) + for destination, queries in remote_queries_not_in_cache.items() ], consumeErrors=True, ).addErrback(unwrapFirstError) @@ -301,6 +225,121 @@ class E2eKeysHandler: return ret + @trace + async def _query_devices_for_destination( + self, + results: JsonDict, + cross_signing_keys: JsonDict, + failures: Dict[str, JsonDict], + destination: str, + destination_query: Dict[str, Iterable[str]], + timeout: int, + ) -> None: + """This is called when we are querying the device list of a user on + a remote homeserver and their device list is not in the device list + cache. If we share a room with this user and we're not querying for + specific user we will update the cache with their device list. + + Args: + results: A map from user ID to their device keys, which gets + updated with the newly fetched keys. + cross_signing_keys: Map from user ID to their cross signing keys, + which gets updated with the newly fetched keys. + failures: Map of destinations to failures that have occurred while + attempting to fetch keys. + destination: The remote server to query + destination_query: The query dict of devices to query the remote + server for. + timeout: The timeout for remote HTTP requests. + """ + + # We first consider whether we wish to update the device list cache with + # the users device list. We want to track a user's devices when the + # authenticated user shares a room with the queried user and the query + # has not specified a particular device. + # If we update the cache for the queried user we remove them from further + # queries. We use the more efficient batched query_client_keys for all + # remaining users + user_ids_updated = [] + for (user_id, device_list) in destination_query.items(): + if user_id in user_ids_updated: + continue + + if device_list: + continue + + room_ids = await self.store.get_rooms_for_user(user_id) + if not room_ids: + continue + + # We've decided we're sharing a room with this user and should + # probably be tracking their device lists. However, we haven't + # done an initial sync on the device list so we do it now. + try: + if self._is_master: + resync_results = await self.device_handler.device_list_updater.user_device_resync( + user_id + ) + else: + resync_results = await self._user_device_resync_client( + user_id=user_id + ) + + # Add the device keys to the results. + user_devices = resync_results["devices"] + user_results = results.setdefault(user_id, {}) + for device in user_devices: + user_results[device["device_id"]] = device["keys"] + user_ids_updated.append(user_id) + + # Add any cross signing keys to the results. + master_key = resync_results.get("master_key") + self_signing_key = resync_results.get("self_signing_key") + + if master_key: + cross_signing_keys["master_keys"][user_id] = master_key + + if self_signing_key: + cross_signing_keys["self_signing_keys"][user_id] = self_signing_key + except Exception as e: + failures[destination] = _exception_to_failure(e) + + if len(destination_query) == len(user_ids_updated): + # We've updated all the users in the query and we do not need to + # make any further remote calls. + return + + # Remove all the users from the query which we have updated + for user_id in user_ids_updated: + destination_query.pop(user_id) + + try: + remote_result = await self.federation.query_client_keys( + destination, {"device_keys": destination_query}, timeout=timeout + ) + + for user_id, keys in remote_result["device_keys"].items(): + if user_id in destination_query: + results[user_id] = keys + + if "master_keys" in remote_result: + for user_id, key in remote_result["master_keys"].items(): + if user_id in destination_query: + cross_signing_keys["master_keys"][user_id] = key + + if "self_signing_keys" in remote_result: + for user_id, key in remote_result["self_signing_keys"].items(): + if user_id in destination_query: + cross_signing_keys["self_signing_keys"][user_id] = key + + except Exception as e: + failure = _exception_to_failure(e) + failures[destination] = failure + set_tag("error", True) + set_tag("reason", failure) + + return + async def get_cross_signing_keys_from_cache( self, query: Iterable[str], from_user_id: Optional[str] ) -> Dict[str, Dict[str, dict]]: diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9584d5bd46..1a1cd93b1a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -477,7 +477,7 @@ class FederationEventHandler: @log_function async def backfill( - self, dest: str, room_id: str, limit: int, extremities: Iterable[str] + self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> None: """Trigger a backfill request to `dest` for the given `room_id` @@ -1643,7 +1643,7 @@ class FederationEventHandler: event: the event whose auth_events we want Returns: - all of the events in `event.auth_events`, after deduplication + all of the events listed in `event.auth_events_ids`, after deduplication Raises: AuthError if we were unable to fetch the auth_events for any reason. @@ -1916,7 +1916,7 @@ class FederationEventHandler: event_pos = PersistedEventPosition( self._instance_name, event.internal_metadata.stream_ordering ) - self._notifier.on_new_room_event( + await self._notifier.on_new_room_event( event, event_pos, max_stream_token, extra_users=extra_users ) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 7ef8698a5e..3dbe611f95 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -537,10 +537,6 @@ class IdentityHandler: except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") - # It is already checked that public_baseurl is configured since this code - # should only be used if account_threepid_delegate_msisdn is true. - assert self.hs.config.server.public_baseurl - # we need to tell the client to send the token back to us, since it doesn't # otherwise know where to send it, so add submit_url response parameter # (see also MSC2078) @@ -879,6 +875,8 @@ class IdentityHandler: } if room_type is not None: + invite_config["room_type"] = room_type + # TODO The unstable field is deprecated and should be removed in the future. invite_config["org.matrix.msc3288.room_type"] = room_type # If a custom web client location is available, include it in the request. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 2e024b551f..d4c2a6ab7a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1318,6 +1318,8 @@ class EventCreationHandler: # user is actually admin or not). is_admin_redaction = False if event.type == EventTypes.Redaction: + assert event.redacts is not None + original_event = await self.store.get_event( event.redacts, redact_behaviour=EventRedactBehaviour.AS_IS, @@ -1413,6 +1415,8 @@ class EventCreationHandler: ) if event.type == EventTypes.Redaction: + assert event.redacts is not None + original_event = await self.store.get_event( event.redacts, redact_behaviour=EventRedactBehaviour.AS_IS, @@ -1500,11 +1504,13 @@ class EventCreationHandler: next_batch_id = event.content.get( EventContentFields.MSC2716_NEXT_BATCH_ID ) - conflicting_insertion_event_id = ( - await self.store.get_insertion_event_by_batch_id( - event.room_id, next_batch_id + conflicting_insertion_event_id = None + if next_batch_id: + conflicting_insertion_event_id = ( + await self.store.get_insertion_event_id_by_batch_id( + event.room_id, next_batch_id + ) ) - ) if conflicting_insertion_event_id is not None: # The current insertion event that we're processing is invalid # because an insertion event already exists in the room with the @@ -1537,13 +1543,16 @@ class EventCreationHandler: # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) - def _notify() -> None: + async def _notify() -> None: try: - self.notifier.on_new_room_event( + await self.notifier.on_new_room_event( event, event_pos, max_stream_token, extra_users=extra_users ) except Exception: - logger.exception("Error notifying about new room event") + logger.exception( + "Error notifying about new room event %s", + event.event_id, + ) run_in_background(_notify) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 60ff896386..abfe7be0e3 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -438,7 +438,7 @@ class PaginationHandler: } state = None - if event_filter and event_filter.lazy_load_members() and len(events) > 0: + if event_filter and event_filter.lazy_load_members and len(events) > 0: # TODO: remove redundant members # FIXME: we also care about invite targets etc. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fdab50da37..3df872c578 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -52,6 +52,7 @@ import synapse.metrics from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError from synapse.api.presence import UserPresenceState +from synapse.appservice import ApplicationService from synapse.events.presence_router import PresenceRouter from synapse.logging.context import run_in_background from synapse.logging.utils import log_function @@ -1551,6 +1552,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): is_guest: bool = False, explicit_room_id: Optional[str] = None, include_offline: bool = True, + service: Optional[ApplicationService] = None, ) -> Tuple[List[UserPresenceState], int]: # The process for getting presence events are: # 1. Get the rooms the user is in. diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index e6c3cf585b..6b5a6ded8b 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -456,7 +456,11 @@ class ProfileHandler: continue new_name = profile.get("displayname") + if not isinstance(new_name, str): + new_name = None new_avatar = profile.get("avatar_url") + if not isinstance(new_avatar, str): + new_avatar = None # We always hit update to update the last_check timestamp await self.store.update_remote_profile_cache(user_id, new_name, new_avatar) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cf01d58ea1..969eb3b9b0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -525,7 +525,7 @@ class RoomCreationHandler: ): await self.room_member_handler.update_membership( requester, - UserID.from_string(old_event["state_key"]), + UserID.from_string(old_event.state_key), new_room_id, "ban", ratelimit=False, @@ -1173,7 +1173,7 @@ class RoomContextHandler: else: last_event_id = event_id - if event_filter and event_filter.lazy_load_members(): + if event_filter and event_filter.lazy_load_members: state_filter = StateFilter.from_lazy_load_member_list( ev.sender for ev in itertools.chain( diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index 2f5a3e4d19..0723286383 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -355,7 +355,7 @@ class RoomBatchHandler: for (event, context) in reversed(events_to_persist): await self.event_creation_handler.handle_new_client_event( await self.create_requester_for_user_id_from_app_service( - event["sender"], app_service_requester.app_service + event.sender, app_service_requester.app_service ), event=event, context=context, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 74e6c7eca6..08244b690d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -1669,7 +1669,9 @@ class RoomMemberMasterHandler(RoomMemberHandler): # # the prev_events consist solely of the previous membership event. prev_event_ids = [previous_membership_event.event_id] - auth_event_ids = previous_membership_event.auth_event_ids() + prev_event_ids + auth_event_ids = ( + list(previous_membership_event.auth_event_ids()) + prev_event_ids + ) event, context = await self.event_creation_handler.create_event( requester, diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index a3ffa26be8..6e4dff8056 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -249,7 +249,7 @@ class SearchHandler: ) events.sort(key=lambda e: -rank_map[e.event_id]) - allowed_events = events[: search_filter.limit()] + allowed_events = events[: search_filter.limit] for e in allowed_events: rm = room_groups.setdefault( @@ -271,13 +271,13 @@ class SearchHandler: # We keep looping and we keep filtering until we reach the limit # or we run out of things. # But only go around 5 times since otherwise synapse will be sad. - while len(room_events) < search_filter.limit() and i < 5: + while len(room_events) < search_filter.limit and i < 5: i += 1 search_result = await self.store.search_rooms( room_ids, search_term, keys, - search_filter.limit() * 2, + search_filter.limit * 2, pagination_token=pagination_token, ) @@ -299,9 +299,9 @@ class SearchHandler: ) room_events.extend(events) - room_events = room_events[: search_filter.limit()] + room_events = room_events[: search_filter.limit] - if len(results) < search_filter.limit() * 2: + if len(results) < search_filter.limit * 2: pagination_token = None break else: @@ -311,7 +311,7 @@ class SearchHandler: group = room_groups.setdefault(event.room_id, {"results": []}) group["results"].append(event.event_id) - if room_events and len(room_events) >= search_filter.limit(): + if room_events and len(room_events) >= search_filter.limit: last_event_id = room_events[-1].event_id pagination_token = results_map[last_event_id]["pagination_token"] diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c411d69924..22c6174821 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -62,8 +62,8 @@ class FollowerTypingHandler: if hs.should_send_federation(): self.federation = hs.get_federation_sender() - if hs.config.worker.writers.typing != hs.get_instance_name(): - hs.get_federation_registry().register_instance_for_edu( + if hs.get_instance_name() not in hs.config.worker.writers.typing: + hs.get_federation_registry().register_instances_for_edu( "m.typing", hs.config.worker.writers.typing, ) @@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - assert hs.config.worker.writers.typing == hs.get_instance_name() + assert hs.get_instance_name() in hs.config.worker.writers.typing self.auth = hs.get_auth() self.notifier = hs.get_notifier() |