diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/auth.py | 2 | ||||
-rw-r--r-- | synapse/handlers/deactivate_account.py | 4 | ||||
-rw-r--r-- | synapse/handlers/device.py | 28 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 6 | ||||
-rw-r--r-- | synapse/handlers/event_auth.py | 5 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 9 | ||||
-rw-r--r-- | synapse/handlers/federation_event.py | 7 | ||||
-rw-r--r-- | synapse/handlers/message.py | 7 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 10 | ||||
-rw-r--r-- | synapse/handlers/push_rules.py | 18 | ||||
-rw-r--r-- | synapse/handlers/read_marker.py | 5 | ||||
-rw-r--r-- | synapse/handlers/register.py | 27 | ||||
-rw-r--r-- | synapse/handlers/room.py | 10 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 179 | ||||
-rw-r--r-- | synapse/handlers/room_member_worker.py | 3 | ||||
-rw-r--r-- | synapse/handlers/sso.py | 9 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 20 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 3 |
18 files changed, 269 insertions, 83 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 1e89447044..59e340974d 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -212,7 +212,7 @@ class AuthHandler: self._password_enabled_for_login = hs.config.auth.password_enabled_for_login self._password_enabled_for_reauth = hs.config.auth.password_enabled_for_reauth self._password_localdb_enabled = hs.config.auth.password_localdb_enabled - self._third_party_rules = hs.get_third_party_event_rules() + self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules # Ratelimiter for failed auth during UIA. Uses same ratelimit config # as per `rc_login.failed_attempts`. diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index bd5867491b..f299b89a1b 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -39,11 +39,11 @@ class DeactivateAccountHandler: self._profile_handler = hs.get_profile_handler() self.user_directory_handler = hs.get_user_directory_handler() self._server_name = hs.hostname - self._third_party_rules = hs.get_third_party_event_rules() + self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules # Flag that indicates whether the process to part users from rooms is running self._user_parter_running = False - self._third_party_rules = hs.get_third_party_event_rules() + self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules # Start the user parter loop so it can resume parting users from rooms where # it left off (if it has work left to do). diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index b9d3b7fbc6..5d12a39e26 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -75,10 +75,14 @@ class DeviceWorkerHandler: self.store = hs.get_datastores().main self.notifier = hs.get_notifier() self.state = hs.get_state_handler() + self._appservice_handler = hs.get_application_service_handler() self._state_storage = hs.get_storage_controllers().state self._auth_handler = hs.get_auth_handler() self.server_name = hs.hostname self._msc3852_enabled = hs.config.experimental.msc3852_enabled + self._query_appservices_for_keys = ( + hs.config.experimental.msc3984_appservice_key_query + ) self.device_list_updater = DeviceListWorkerUpdater(hs) @@ -328,6 +332,30 @@ class DeviceWorkerHandler: user_id, "self_signing" ) + # Check if the application services have any results. + if self._query_appservices_for_keys: + # Query the appservice for all devices for this user. + query: Dict[str, Optional[List[str]]] = {user_id: None} + + # Query the appservices for any keys. + appservice_results = await self._appservice_handler.query_keys(query) + + # Merge results, overriding anything from the database. + appservice_devices = appservice_results.get("device_keys", {}).get( + user_id, {} + ) + + # Filter the database results to only those devices that the appservice has + # *not* responded with. + devices = [d for d in devices if d["device_id"] not in appservice_devices] + # Append the appservice response by wrapping each result in another dictionary. + devices.extend( + {"device_id": device_id, "keys": device} + for device_id, device in appservice_devices.items() + ) + + # TODO Handle cross-signing keys. + return { "user_id": user_id, "stream_id": stream_id, diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 5e8316e2e5..1e0623c7f8 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -52,7 +52,9 @@ class DirectoryHandler: self.config = hs.config self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search self.require_membership = hs.config.server.require_membership_for_aliases - self.third_party_event_rules = hs.get_third_party_event_rules() + self._third_party_event_rules = ( + hs.get_module_api_callbacks().third_party_event_rules + ) self.server_name = hs.hostname self.federation = hs.get_federation_client() @@ -503,7 +505,7 @@ class DirectoryHandler: # Check if publishing is blocked by a third party module allowed_by_third_party_rules = ( await ( - self.third_party_event_rules.check_visibility_can_be_modified( + self._third_party_event_rules.check_visibility_can_be_modified( room_id, visibility ) ) diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py index 0db0bd7304..3e37c0cbe2 100644 --- a/synapse/handlers/event_auth.py +++ b/synapse/handlers/event_auth.py @@ -29,7 +29,7 @@ from synapse.event_auth import ( ) from synapse.events import EventBase from synapse.events.builder import EventBuilder -from synapse.types import StateMap, StrCollection, get_domain_from_id +from synapse.types import StateMap, StrCollection if TYPE_CHECKING: from synapse.server import HomeServer @@ -47,6 +47,7 @@ class EventAuthHandler: self._store = hs.get_datastores().main self._state_storage_controller = hs.get_storage_controllers().state self._server_name = hs.hostname + self._is_mine_id = hs.is_mine_id async def check_auth_rules_from_context( self, @@ -247,7 +248,7 @@ class EventAuthHandler: if not await self.is_user_in_rooms(allowed_rooms, user_id): # If this is a remote request, the user might be in an allowed room # that we do not know about. - if get_domain_from_id(user_id) != self._server_name: + if not self._is_mine_id(user_id): for room_id in allowed_rooms: if not await self._store.is_host_joined(room_id, self._server_name): raise SynapseError( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d1a88cc604..19dec4812f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -141,6 +141,7 @@ class FederationHandler: self.server_name = hs.hostname self.keyring = hs.get_keyring() self.is_mine_id = hs.is_mine_id + self.is_mine_server_name = hs.is_mine_server_name self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker self.event_creation_handler = hs.get_event_creation_handler() self.event_builder_factory = hs.get_event_builder_factory() @@ -169,7 +170,9 @@ class FederationHandler: self._room_backfill = Linearizer("room_backfill") - self.third_party_event_rules = hs.get_third_party_event_rules() + self._third_party_event_rules = ( + hs.get_module_api_callbacks().third_party_event_rules + ) # Tracks running partial state syncs by room ID. # Partial state syncs currently only run on the main process, so it's okay to @@ -451,7 +454,7 @@ class FederationHandler: for dom in domains: # We don't want to ask our own server for information we don't have - if dom == self.server_name: + if self.is_mine_server_name(dom): continue try: @@ -1253,7 +1256,7 @@ class FederationHandler: unpersisted_context, ) = await self.event_creation_handler.create_new_client_event(builder=builder) - event_allowed, _ = await self.third_party_event_rules.check_event_allowed( + event_allowed, _ = await self._third_party_event_rules.check_event_allowed( event, unpersisted_context ) if not event_allowed: diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 06609fab93..06343d40e4 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -157,10 +157,13 @@ class FederationEventHandler: self._get_room_member_handler = hs.get_room_member_handler self._federation_client = hs.get_federation_client() - self._third_party_event_rules = hs.get_third_party_event_rules() + self._third_party_event_rules = ( + hs.get_module_api_callbacks().third_party_event_rules + ) self._notifier = hs.get_notifier() self._is_mine_id = hs.is_mine_id + self._is_mine_server_name = hs.is_mine_server_name self._server_name = hs.hostname self._instance_name = hs.get_instance_name() @@ -686,7 +689,7 @@ class FederationEventHandler: server from invalid events (there is probably no point in trying to re-fetch invalid events from every other HS in the room.) """ - if dest == self._server_name: + if self._is_mine_server_name(dest): raise SynapseError(400, "Can't backfill from self.") events = await self._federation_client.backfill( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ac1932a7f9..0b61c2272b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -77,7 +77,6 @@ from synapse.util.metrics import measure_func from synapse.visibility import get_effective_room_visibility_from_state if TYPE_CHECKING: - from synapse.events.third_party_rules import ThirdPartyEventRules from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -509,8 +508,8 @@ class EventCreationHandler: self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker - self.third_party_event_rules: "ThirdPartyEventRules" = ( - self.hs.get_third_party_event_rules() + self._third_party_event_rules = ( + self.hs.get_module_api_callbacks().third_party_event_rules ) self._block_events_without_consent_error = ( @@ -1314,7 +1313,7 @@ class EventCreationHandler: if requester: context.app_service = requester.app_service - res, new_content = await self.third_party_event_rules.check_event_allowed( + res, new_content = await self._third_party_event_rules.check_event_allowed( event, context ) if res is False: diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 440d3f4acd..a9160c87e3 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -59,9 +59,9 @@ class ProfileHandler: self.max_avatar_size = hs.config.server.max_avatar_size self.allowed_avatar_mimetypes = hs.config.server.allowed_avatar_mimetypes - self.server_name = hs.config.server.server_name + self._is_mine_server_name = hs.is_mine_server_name - self._third_party_rules = hs.get_third_party_event_rules() + self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict: target_user = UserID.from_string(user_id) @@ -170,8 +170,8 @@ class ProfileHandler: displayname_to_set = None # If the admin changes the display name of a user, the requesting user cannot send - # the join event to update the displayname in the rooms. - # This must be done by the target user himself. + # the join event to update the display name in the rooms. + # This must be done by the target user themselves. if by_admin: requester = create_requester( target_user, @@ -309,7 +309,7 @@ class ProfileHandler: else: server_name = host - if server_name == self.server_name: + if self._is_mine_server_name(server_name): media_info = await self.store.get_local_media(media_id) else: media_info = await self.store.get_cached_remote_media(server_name, media_id) diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py index 1219672a59..7ed88a3611 100644 --- a/synapse/handlers/push_rules.py +++ b/synapse/handlers/push_rules.py @@ -11,14 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, List, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union import attr from synapse.api.errors import SynapseError, UnrecognizedRequestError +from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.push_rule import RuleNotFoundException from synapse.synapse_rust.push import get_base_rule_ids -from synapse.types import JsonDict +from synapse.types import JsonDict, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -115,6 +116,17 @@ class PushRulesHandler: stream_id = self._main_store.get_max_push_rules_stream_id() self._notifier.on_new_event("push_rules_key", stream_id, users=[user_id]) + async def push_rules_for_user( + self, user: UserID + ) -> Dict[str, Dict[str, List[Dict[str, Any]]]]: + """ + Push rules aren't really account data, but get formatted as such for /sync. + """ + user_id = user.to_string() + rules_raw = await self._main_store.get_push_rules_for_user(user_id) + rules = format_push_rules_for_user(user, rules_raw) + return rules + def check_actions(actions: List[Union[str, JsonDict]]) -> None: """Check if the given actions are spec compliant. @@ -129,6 +141,8 @@ def check_actions(actions: List[Union[str, JsonDict]]) -> None: raise InvalidRuleException("No actions found") for a in actions: + # "dont_notify" and "coalesce" are legacy actions. They are allowed, but + # ignored (resulting in no action from the pusher). if a in ["notify", "dont_notify", "coalesce"]: pass elif isinstance(a, dict) and "set_tweak" in a: diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 05122fd5a6..6d35e61880 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -15,6 +15,7 @@ import logging from typing import TYPE_CHECKING +from synapse.api.constants import ReceiptTypes from synapse.util.async_helpers import Linearizer if TYPE_CHECKING: @@ -42,7 +43,7 @@ class ReadMarkerHandler: async with self.read_marker_linearizer.queue((room_id, user_id)): existing_read_marker = await self.store.get_account_data_for_room_and_type( - user_id, room_id, "m.fully_read" + user_id, room_id, ReceiptTypes.FULLY_READ ) should_update = True @@ -56,5 +57,5 @@ class ReadMarkerHandler: if should_update: content = {"event_id": event_id} await self.account_data_handler.add_account_data_to_room( - user_id, room_id, "m.fully_read", content + user_id, room_id, ReceiptTypes.FULLY_READ, content ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 61c4b833bd..c80946c2e9 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -46,7 +46,7 @@ from synapse.replication.http.register import ( ReplicationRegisterServlet, ) from synapse.spam_checker_api import RegistrationBehaviour -from synapse.types import RoomAlias, UserID, create_requester +from synapse.types import GUEST_USER_ID_PATTERN, RoomAlias, UserID, create_requester from synapse.types.state import StateFilter if TYPE_CHECKING: @@ -143,10 +143,15 @@ class RegistrationHandler: assigned_user_id: Optional[str] = None, inhibit_user_in_use_error: bool = False, ) -> None: - if types.contains_invalid_mxid_characters(localpart): + if types.contains_invalid_mxid_characters( + localpart, self.hs.config.experimental.msc4009_e164_mxids + ): + extra_chars = ( + "=_-./+" if self.hs.config.experimental.msc4009_e164_mxids else "=_-./" + ) raise SynapseError( 400, - "User ID can only contain characters a-z, 0-9, or '=_-./'", + f"User ID can only contain characters a-z, 0-9, or '{extra_chars}'", Codes.INVALID_USERNAME, ) @@ -195,16 +200,12 @@ class RegistrationHandler: errcode=Codes.FORBIDDEN, ) - if guest_access_token is None: - try: - int(localpart) - raise SynapseError( - 400, - "Numeric user IDs are reserved for guest users.", - errcode=Codes.INVALID_USERNAME, - ) - except ValueError: - pass + if guest_access_token is None and GUEST_USER_ID_PATTERN.fullmatch(localpart): + raise SynapseError( + 400, + "Numeric user IDs are reserved for guest users.", + errcode=Codes.INVALID_USERNAME, + ) async def register_user( self, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index efd9612d90..5e1702d78a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -160,7 +160,9 @@ class RoomCreationHandler: ) self._server_notices_mxid = hs.config.servernotices.server_notices_mxid - self.third_party_event_rules = hs.get_third_party_event_rules() + self._third_party_event_rules = ( + hs.get_module_api_callbacks().third_party_event_rules + ) async def upgrade_room( self, requester: Requester, old_room_id: str, new_version: RoomVersion @@ -742,7 +744,7 @@ class RoomCreationHandler: # Let the third party rules modify the room creation config if needed, or abort # the room creation entirely with an exception. - await self.third_party_event_rules.on_create_room( + await self._third_party_event_rules.on_create_room( requester, config, is_requester_admin=is_requester_admin ) @@ -879,7 +881,7 @@ class RoomCreationHandler: # Check whether this visibility value is blocked by a third party module allowed_by_third_party_rules = ( await ( - self.third_party_event_rules.check_visibility_can_be_modified( + self._third_party_event_rules.check_visibility_can_be_modified( room_id, visibility ) ) @@ -1731,7 +1733,7 @@ class RoomShutdownHandler: self.room_member_handler = hs.get_room_member_handler() self._room_creation_handler = hs.get_room_creation_handler() self._replication = hs.get_replication_data_handler() - self._third_party_rules = hs.get_third_party_event_rules() + self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules self.event_creation_handler = hs.get_event_creation_handler() self.store = hs.get_datastores().main diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed805d6ec8..af0ca5c26d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -16,7 +16,7 @@ import abc import logging import random from http import HTTPStatus -from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple from synapse import types from synapse.api.constants import ( @@ -38,7 +38,10 @@ from synapse.event_auth import get_named_level, get_power_level_event from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN +from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.logging import opentracing +from synapse.metrics import event_processing_positions +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.types import ( JsonDict, @@ -97,7 +100,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): self.clock = hs.get_clock() self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker - self.third_party_event_rules = hs.get_third_party_event_rules() + self._third_party_event_rules = ( + hs.get_module_api_callbacks().third_party_event_rules + ) self._server_notices_mxid = self.config.servernotices.server_notices_mxid self._enable_lookup = hs.config.registration.enable_3pid_lookup self.allow_per_room_profiles = self.config.server.allow_per_room_profiles @@ -280,9 +285,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): """ raise NotImplementedError() - @abc.abstractmethod async def forget(self, user: UserID, room_id: str) -> None: - raise NotImplementedError() + user_id = user.to_string() + + member = await self._storage_controllers.state.get_current_state_event( + room_id=room_id, event_type=EventTypes.Member, state_key=user_id + ) + membership = member.membership if member else None + + if membership is not None and membership not in [ + Membership.LEAVE, + Membership.BAN, + ]: + raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) + + # In normal case this call is only required if `membership` is not `None`. + # But: After the last member had left the room, the background update + # `_background_remove_left_rooms` is deleting rows related to this room from + # the table `current_state_events` and `get_current_state_events` is `None`. + await self.store.forget(user_id, room_id) async def ratelimit_multiple_invites( self, @@ -1541,7 +1562,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # can't just rely on the standard ratelimiting of events. await self._third_party_invite_limiter.ratelimit(requester) - can_invite = await self.third_party_event_rules.check_threepid_can_be_invited( + can_invite = await self._third_party_event_rules.check_threepid_can_be_invited( medium, address, room_id ) if not can_invite: @@ -2046,25 +2067,141 @@ class RoomMemberMasterHandler(RoomMemberHandler): """Implements RoomMemberHandler._user_left_room""" user_left_room(self.distributor, target, room_id) - async def forget(self, user: UserID, room_id: str) -> None: - user_id = user.to_string() - member = await self._storage_controllers.state.get_current_state_event( - room_id=room_id, event_type=EventTypes.Member, state_key=user_id - ) - membership = member.membership if member else None +class RoomForgetterHandler(StateDeltasHandler): + """Forgets rooms when they are left, when enabled in the homeserver config. - if membership is not None and membership not in [ - Membership.LEAVE, - Membership.BAN, - ]: - raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) + For the purposes of this feature, kicks, bans and "leaves" via state resolution + weirdness are all considered to be leaves. - # In normal case this call is only required if `membership` is not `None`. - # But: After the last member had left the room, the background update - # `_background_remove_left_rooms` is deleting rows related to this room from - # the table `current_state_events` and `get_current_state_events` is `None`. - await self.store.forget(user_id, room_id) + Derived from `StatsHandler` and `UserDirectoryHandler`. + """ + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._hs = hs + self._store = hs.get_datastores().main + self._storage_controllers = hs.get_storage_controllers() + self._clock = hs.get_clock() + self._notifier = hs.get_notifier() + self._room_member_handler = hs.get_room_member_handler() + + # The current position in the current_state_delta stream + self.pos: Optional[int] = None + + # Guard to ensure we only process deltas one at a time + self._is_processing = False + + if hs.config.worker.run_background_tasks: + self._notifier.add_replication_callback(self.notify_new_event) + + # We kick this off to pick up outstanding work from before the last restart. + self._clock.call_later(0, self.notify_new_event) + + def notify_new_event(self) -> None: + """Called when there may be more deltas to process""" + if self._is_processing: + return + + self._is_processing = True + + async def process() -> None: + try: + await self._unsafe_process() + finally: + self._is_processing = False + + run_as_background_process("room_forgetter.notify_new_event", process) + + async def _unsafe_process(self) -> None: + # If self.pos is None then means we haven't fetched it from DB + if self.pos is None: + self.pos = await self._store.get_room_forgetter_stream_pos() + room_max_stream_ordering = self._store.get_room_max_stream_ordering() + if self.pos > room_max_stream_ordering: + # apparently, we've processed more events than exist in the database! + # this can happen if events are removed with history purge or similar. + logger.warning( + "Event stream ordering appears to have gone backwards (%i -> %i): " + "rewinding room forgetter processor", + self.pos, + room_max_stream_ordering, + ) + self.pos = room_max_stream_ordering + + if not self._hs.config.room.forget_on_leave: + # Update the processing position, so that if the server admin turns the + # feature on at a later date, we don't decide to forget every room that + # has ever been left in the past. + self.pos = self._store.get_room_max_stream_ordering() + await self._store.update_room_forgetter_stream_pos(self.pos) + return + + # Loop round handling deltas until we're up to date + + while True: + # Be sure to read the max stream_ordering *before* checking if there are any outstanding + # deltas, since there is otherwise a chance that we could miss updates which arrive + # after we check the deltas. + room_max_stream_ordering = self._store.get_room_max_stream_ordering() + if self.pos == room_max_stream_ordering: + break + + logger.debug( + "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering + ) + ( + max_pos, + deltas, + ) = await self._storage_controllers.state.get_current_state_deltas( + self.pos, room_max_stream_ordering + ) + + logger.debug("Handling %d state deltas", len(deltas)) + await self._handle_deltas(deltas) + + self.pos = max_pos + + # Expose current event processing position to prometheus + event_processing_positions.labels("room_forgetter").set(max_pos) + + await self._store.update_room_forgetter_stream_pos(max_pos) + + async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None: + """Called with the state deltas to process""" + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + if typ != EventTypes.Member: + continue + + if not self._hs.is_mine_id(state_key): + continue + + change = await self._get_key_change( + prev_event_id, + event_id, + key_name="membership", + public_value=Membership.JOIN, + ) + is_leave = change is MatchChange.now_false + + if is_leave: + try: + await self._room_member_handler.forget( + UserID.from_string(state_key), room_id + ) + except SynapseError as e: + if e.code == 400: + # The user is back in the room. + pass + else: + raise def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]: diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 76e36b8a6d..e8ff1ad063 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -137,6 +137,3 @@ class RoomMemberWorkerHandler(RoomMemberHandler): await self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="left" ) - - async def forget(self, target: UserID, room_id: str) -> None: - raise RuntimeError("Cannot forget rooms on workers.") diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index c28325323c..25fd2eb3a1 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -194,6 +194,7 @@ class SsoHandler: self._clock = hs.get_clock() self._store = hs.get_datastores().main self._server_name = hs.hostname + self._is_mine_server_name = hs.is_mine_server_name self._registration_handler = hs.get_registration_handler() self._auth_handler = hs.get_auth_handler() self._device_handler = hs.get_device_handler() @@ -224,6 +225,8 @@ class SsoHandler: self._consent_at_registration = hs.config.consent.user_consent_at_registration + self._e164_mxids = hs.config.experimental.msc4009_e164_mxids + def register_identity_provider(self, p: SsoIdentityProvider) -> None: p_id = p.idp_id assert p_id not in self._identity_providers @@ -710,7 +713,7 @@ class SsoHandler: # Since the localpart is provided via a potentially untrusted module, # ensure the MXID is valid before registering. if not attributes.localpart or contains_invalid_mxid_characters( - attributes.localpart + attributes.localpart, self._e164_mxids ): raise MappingException("localpart is invalid: %s" % (attributes.localpart,)) @@ -802,7 +805,7 @@ class SsoHandler: if profile["avatar_url"] is not None: server_name = profile["avatar_url"].split("/")[-2] media_id = profile["avatar_url"].split("/")[-1] - if server_name == self._server_name: + if self._is_mine_server_name(server_name): media = await self._media_repo.store.get_local_media(media_id) if media is not None and upload_name == media["upload_name"]: logger.info("skipping saving the user avatar") @@ -943,7 +946,7 @@ class SsoHandler: localpart, ) - if contains_invalid_mxid_characters(localpart): + if contains_invalid_mxid_characters(localpart, self._e164_mxids): raise SynapseError(400, "localpart is invalid: %s" % (localpart,)) user_id = UserID(localpart, self._server_name).to_string() user_infos = await self._store.get_users_by_id_case_insensitive(user_id) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 64d298408d..c010405be6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -50,7 +50,6 @@ from synapse.logging.opentracing import ( start_active_span, trace, ) -from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.databases.main.event_push_actions import RoomNotifCounts from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.roommember import MemberSummary @@ -261,6 +260,7 @@ class SyncHandler: self.notifier = hs.get_notifier() self.presence_handler = hs.get_presence_handler() self._relations_handler = hs.get_relations_handler() + self._push_rules_handler = hs.get_push_rules_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() self.state = hs.get_state_handler() @@ -428,12 +428,6 @@ class SyncHandler: set_tag(SynapseTags.SYNC_RESULT, bool(sync_result)) return sync_result - async def push_rules_for_user(self, user: UserID) -> Dict[str, Dict[str, list]]: - user_id = user.to_string() - rules_raw = await self.store.get_push_rules_for_user(user_id) - rules = format_push_rules_for_user(user, rules_raw) - return rules - async def ephemeral_by_room( self, sync_result_builder: "SyncResultBuilder", @@ -1777,18 +1771,18 @@ class SyncHandler: if push_rules_changed: global_account_data = dict(global_account_data) - global_account_data["m.push_rules"] = await self.push_rules_for_user( - sync_config.user - ) + global_account_data[ + AccountDataTypes.PUSH_RULES + ] = await self._push_rules_handler.push_rules_for_user(sync_config.user) else: all_global_account_data = await self.store.get_global_account_data_for_user( user_id ) global_account_data = dict(all_global_account_data) - global_account_data["m.push_rules"] = await self.push_rules_for_user( - sync_config.user - ) + global_account_data[ + AccountDataTypes.PUSH_RULES + ] = await self._push_rules_handler.push_rules_for_user(sync_config.user) account_data_for_user = ( await sync_config.filter_collection.filter_global_account_data( diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 39ae44ea95..7aeae5319c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -68,6 +68,7 @@ class FollowerTypingHandler: self.server_name = hs.config.server.server_name self.clock = hs.get_clock() self.is_mine_id = hs.is_mine_id + self.is_mine_server_name = hs.is_mine_server_name self.federation = None if hs.should_send_federation(): @@ -153,7 +154,7 @@ class FollowerTypingHandler: member.room_id ) for domain in hosts: - if domain != self.server_name: + if not self.is_mine_server_name(domain): logger.debug("sending typing update to %s", domain) self.federation.build_and_send_edu( destination=domain, |