From 683d6f75af0e941e9ab3bc0a985aa6ed5cc7a238 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 20 Apr 2021 14:55:20 -0400 Subject: Rename handler and config modules which end in handler/config. (#9816) --- synapse/server.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/server.py') diff --git a/synapse/server.py b/synapse/server.py index 42d2fad8e8..59ae91b503 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -70,7 +70,7 @@ from synapse.handlers.acme import AcmeHandler from synapse.handlers.admin import AdminHandler from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler, MacaroonGenerator -from synapse.handlers.cas_handler import CasHandler +from synapse.handlers.cas import CasHandler from synapse.handlers.deactivate_account import DeactivateAccountHandler from synapse.handlers.device import DeviceHandler, DeviceWorkerHandler from synapse.handlers.devicemessage import DeviceMessageHandler @@ -145,8 +145,8 @@ logger = logging.getLogger(__name__) if TYPE_CHECKING: from txredisapi import RedisProtocol - from synapse.handlers.oidc_handler import OidcHandler - from synapse.handlers.saml_handler import SamlHandler + from synapse.handlers.oidc import OidcHandler + from synapse.handlers.saml import SamlHandler T = TypeVar("T", bound=Callable[..., Any]) @@ -696,13 +696,13 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_saml_handler(self) -> "SamlHandler": - from synapse.handlers.saml_handler import SamlHandler + from synapse.handlers.saml import SamlHandler return SamlHandler(self) @cache_in_self def get_oidc_handler(self) -> "OidcHandler": - from synapse.handlers.oidc_handler import OidcHandler + from synapse.handlers.oidc import OidcHandler return OidcHandler(self) -- cgit 1.5.1 From d924827da1db5d210eb06db2247a1403ed4c8b9a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 23 Apr 2021 07:05:51 -0400 Subject: Check for space membership during a remote join of a restricted room (#9814) When receiving a /send_join request for a room with join rules set to 'restricted', check if the user is a member of the spaces defined in the 'allow' key of the join rules. This only applies to an experimental room version, as defined in MSC3083. --- changelog.d/9814.feature | 1 + synapse/api/auth.py | 1 + synapse/handlers/event_auth.py | 86 +++++++++++++++++++++++++++++++++++++++++ synapse/handlers/federation.py | 44 ++++++++++++++++----- synapse/handlers/room_member.py | 62 ++--------------------------- synapse/server.py | 5 +++ 6 files changed, 131 insertions(+), 68 deletions(-) create mode 100644 changelog.d/9814.feature create mode 100644 synapse/handlers/event_auth.py (limited to 'synapse/server.py') diff --git a/changelog.d/9814.feature b/changelog.d/9814.feature new file mode 100644 index 0000000000..9404ad2fc0 --- /dev/null +++ b/changelog.d/9814.feature @@ -0,0 +1 @@ +Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 872fd100cd..2d845d0d5c 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -65,6 +65,7 @@ class Auth: """ FIXME: This class contains a mix of functions for authenticating users of our client-server API and authenticating events added to room graphs. + The latter should be moved to synapse.handlers.event_auth.EventAuthHandler. """ def __init__(self, hs): diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py new file mode 100644 index 0000000000..eff639f407 --- /dev/null +++ b/synapse/handlers/event_auth.py @@ -0,0 +1,86 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import TYPE_CHECKING + +from synapse.api.constants import EventTypes, JoinRules +from synapse.api.room_versions import RoomVersion +from synapse.types import StateMap + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +class EventAuthHandler: + """ + This class contains methods for authenticating events added to room graphs. + """ + + def __init__(self, hs: "HomeServer"): + self._store = hs.get_datastore() + + async def can_join_without_invite( + self, state_ids: StateMap[str], room_version: RoomVersion, user_id: str + ) -> bool: + """ + Check whether a user can join a room without an invite. + + When joining a room with restricted joined rules (as defined in MSC3083), + the membership of spaces must be checked during join. + + Args: + state_ids: The state of the room as it currently is. + room_version: The room version of the room being joined. + user_id: The user joining the room. + + Returns: + True if the user can join the room, false otherwise. + """ + # This only applies to room versions which support the new join rule. + if not room_version.msc3083_join_rules: + return True + + # If there's no join rule, then it defaults to invite (so this doesn't apply). + join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None) + if not join_rules_event_id: + return True + + # If the join rule is not restricted, this doesn't apply. + join_rules_event = await self._store.get_event(join_rules_event_id) + if join_rules_event.content.get("join_rule") != JoinRules.MSC3083_RESTRICTED: + return True + + # If allowed is of the wrong form, then only allow invited users. + allowed_spaces = join_rules_event.content.get("allow", []) + if not isinstance(allowed_spaces, list): + return False + + # Get the list of joined rooms and see if there's an overlap. + joined_rooms = await self._store.get_rooms_for_user(user_id) + + # Pull out the other room IDs, invalid data gets filtered. + for space in allowed_spaces: + if not isinstance(space, dict): + continue + + space_id = space.get("space") + if not isinstance(space_id, str): + continue + + # The user was joined to one of the spaces specified, they can join + # this room! + if space_id in joined_rooms: + return True + + # The user was not in any of the required spaces. + return False diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index dbdd7d2db3..9d867aaf4d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -146,6 +146,7 @@ class FederationHandler(BaseHandler): self.is_mine_id = hs.is_mine_id self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() + self._event_auth_handler = hs.get_event_auth_handler() self._message_handler = hs.get_message_handler() self._server_notices_mxid = hs.config.server_notices_mxid self.config = hs.config @@ -1673,8 +1674,40 @@ class FederationHandler(BaseHandler): # would introduce the danger of backwards-compatibility problems. event.internal_metadata.send_on_behalf_of = origin + # Calculate the event context. context = await self.state_handler.compute_event_context(event) - context = await self._auth_and_persist_event(origin, event, context) + + # Get the state before the new event. + prev_state_ids = await context.get_prev_state_ids() + + # Check if the user is already in the room or invited to the room. + user_id = event.state_key + prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) + newly_joined = True + user_is_invited = False + if prev_member_event_id: + prev_member_event = await self.store.get_event(prev_member_event_id) + newly_joined = prev_member_event.membership != Membership.JOIN + user_is_invited = prev_member_event.membership == Membership.INVITE + + # If the member is not already in the room, and not invited, check if + # they should be allowed access via membership in a space. + if ( + newly_joined + and not user_is_invited + and not await self._event_auth_handler.can_join_without_invite( + prev_state_ids, + event.room_version, + user_id, + ) + ): + raise AuthError( + 403, + "You do not belong to any of the required spaces to join this room.", + ) + + # Persist the event. + await self._auth_and_persist_event(origin, event, context) logger.debug( "on_send_join_request: After _auth_and_persist_event: %s, sigs: %s", @@ -1682,8 +1715,6 @@ class FederationHandler(BaseHandler): event.signatures, ) - prev_state_ids = await context.get_prev_state_ids() - state_ids = list(prev_state_ids.values()) auth_chain = await self.store.get_auth_chain(event.room_id, state_ids) @@ -2006,7 +2037,7 @@ class FederationHandler(BaseHandler): state: Optional[Iterable[EventBase]] = None, auth_events: Optional[MutableStateMap[EventBase]] = None, backfilled: bool = False, - ) -> EventContext: + ) -> None: """ Process an event by performing auth checks and then persisting to the database. @@ -2028,9 +2059,6 @@ class FederationHandler(BaseHandler): event is an outlier), may be the auth events claimed by the remote server. backfilled: True if the event was backfilled. - - Returns: - The event context. """ context = await self._check_event_auth( origin, @@ -2060,8 +2088,6 @@ class FederationHandler(BaseHandler): ) raise - return context - async def _auth_and_persist_events( self, origin: str, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 2bbfac6471..2c5bada1d8 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -19,7 +19,7 @@ from http import HTTPStatus from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple from synapse import types -from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership +from synapse.api.constants import AccountDataTypes, EventTypes, Membership from synapse.api.errors import ( AuthError, Codes, @@ -28,7 +28,6 @@ from synapse.api.errors import ( SynapseError, ) from synapse.api.ratelimiting import Ratelimiter -from synapse.api.room_versions import RoomVersion from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID @@ -64,6 +63,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): self.profile_handler = hs.get_profile_handler() self.event_creation_handler = hs.get_event_creation_handler() self.account_data_handler = hs.get_account_data_handler() + self.event_auth_handler = hs.get_event_auth_handler() self.member_linearizer = Linearizer(name="member") @@ -178,62 +178,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): await self._invites_per_user_limiter.ratelimit(requester, invitee_user_id) - async def _can_join_without_invite( - self, state_ids: StateMap[str], room_version: RoomVersion, user_id: str - ) -> bool: - """ - Check whether a user can join a room without an invite. - - When joining a room with restricted joined rules (as defined in MSC3083), - the membership of spaces must be checked during join. - - Args: - state_ids: The state of the room as it currently is. - room_version: The room version of the room being joined. - user_id: The user joining the room. - - Returns: - True if the user can join the room, false otherwise. - """ - # This only applies to room versions which support the new join rule. - if not room_version.msc3083_join_rules: - return True - - # If there's no join rule, then it defaults to public (so this doesn't apply). - join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None) - if not join_rules_event_id: - return True - - # If the join rule is not restricted, this doesn't apply. - join_rules_event = await self.store.get_event(join_rules_event_id) - if join_rules_event.content.get("join_rule") != JoinRules.MSC3083_RESTRICTED: - return True - - # If allowed is of the wrong form, then only allow invited users. - allowed_spaces = join_rules_event.content.get("allow", []) - if not isinstance(allowed_spaces, list): - return False - - # Get the list of joined rooms and see if there's an overlap. - joined_rooms = await self.store.get_rooms_for_user(user_id) - - # Pull out the other room IDs, invalid data gets filtered. - for space in allowed_spaces: - if not isinstance(space, dict): - continue - - space_id = space.get("space") - if not isinstance(space_id, str): - continue - - # The user was joined to one of the spaces specified, they can join - # this room! - if space_id in joined_rooms: - return True - - # The user was not in any of the required spaces. - return False - async def _local_membership_update( self, requester: Requester, @@ -302,7 +246,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if ( newly_joined and not user_is_invited - and not await self._can_join_without_invite( + and not await self.event_auth_handler.can_join_without_invite( prev_state_ids, event.room_version, user_id ) ): diff --git a/synapse/server.py b/synapse/server.py index 59ae91b503..67598fffe3 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -77,6 +77,7 @@ from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.directory import DirectoryHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler +from synapse.handlers.event_auth import EventAuthHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.federation import FederationHandler from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler @@ -746,6 +747,10 @@ class HomeServer(metaclass=abc.ABCMeta): def get_space_summary_handler(self) -> SpaceSummaryHandler: return SpaceSummaryHandler(self) + @cache_in_self + def get_event_auth_handler(self) -> EventAuthHandler: + return EventAuthHandler(self) + @cache_in_self def get_external_cache(self) -> ExternalCache: return ExternalCache(self) -- cgit 1.5.1 From 9d25a0ae65ce8728d0fda1eebaf0b469316f84d7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Apr 2021 12:21:55 +0100 Subject: Split presence out of master (#9820) --- changelog.d/9820.feature | 1 + scripts/synapse_port_db | 7 +- synapse/app/generic_worker.py | 31 +------- synapse/config/workers.py | 27 ++++++- synapse/handlers/presence.py | 56 ++++++++----- synapse/replication/http/_base.py | 5 +- synapse/replication/slave/storage/presence.py | 50 ------------ synapse/replication/tcp/handler.py | 18 ++++- synapse/replication/tcp/streams/_base.py | 17 ++-- synapse/rest/client/v1/presence.py | 7 +- synapse/server.py | 6 +- synapse/storage/databases/main/__init__.py | 47 +---------- synapse/storage/databases/main/presence.py | 92 +++++++++++++++++++++- .../schema/delta/59/12presence_stream_instance.sql | 18 +++++ .../59/12presence_stream_instance_seq.sql.postgres | 20 +++++ tests/app/test_frontend_proxy.py | 83 ------------------- tests/rest/client/v1/test_presence.py | 5 +- 17 files changed, 245 insertions(+), 245 deletions(-) create mode 100644 changelog.d/9820.feature delete mode 100644 synapse/replication/slave/storage/presence.py create mode 100644 synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql create mode 100644 synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres delete mode 100644 tests/app/test_frontend_proxy.py (limited to 'synapse/server.py') diff --git a/changelog.d/9820.feature b/changelog.d/9820.feature new file mode 100644 index 0000000000..f56b0bb3bd --- /dev/null +++ b/changelog.d/9820.feature @@ -0,0 +1 @@ +Add experimental support for handling presence on a worker. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index b7c1ffc956..f0c93d5226 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -634,8 +634,11 @@ class Porter(object): "device_inbox_sequence", ("device_inbox", "device_federation_outbox") ) await self._setup_sequence( - "account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data")) - await self._setup_sequence("receipts_sequence", ("receipts_linearized", )) + "account_data_sequence", + ("room_account_data", "room_tags_revisions", "account_data"), + ) + await self._setup_sequence("receipts_sequence", ("receipts_linearized",)) + await self._setup_sequence("presence_stream_sequence", ("presence_stream",)) await self._setup_auth_chain_sequence() # Step 3. Get tables. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 26c458dbb6..7b2ac3ca64 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -55,7 +55,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.slave.storage.keys import SlavedKeyStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.pushers import SlavedPusherStore @@ -64,7 +63,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.rest.admin import register_servlets_for_media_repo -from synapse.rest.client.v1 import events, login, room +from synapse.rest.client.v1 import events, login, presence, room from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.profile import ( ProfileAvatarURLRestServlet, @@ -110,6 +109,7 @@ from synapse.storage.databases.main.metrics import ServerMetricsStore from synapse.storage.databases.main.monthly_active_users import ( MonthlyActiveUsersWorkerStore, ) +from synapse.storage.databases.main.presence import PresenceStore from synapse.storage.databases.main.search import SearchWorkerStore from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.transactions import TransactionWorkerStore @@ -121,26 +121,6 @@ from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.generic_worker") -class PresenceStatusStubServlet(RestServlet): - """If presence is disabled this servlet can be used to stub out setting - presence status. - """ - - PATTERNS = client_patterns("/presence/(?P[^/]*)/status") - - def __init__(self, hs): - super().__init__() - self.auth = hs.get_auth() - - async def on_GET(self, request, user_id): - await self.auth.get_user_by_req(request) - return 200, {"presence": "offline"} - - async def on_PUT(self, request, user_id): - await self.auth.get_user_by_req(request) - return 200, {} - - class KeyUploadServlet(RestServlet): """An implementation of the `KeyUploadServlet` that responds to read only requests, but otherwise proxies through to the master instance. @@ -241,6 +221,7 @@ class GenericWorkerSlavedStore( StatsStore, UIAuthWorkerStore, EndToEndRoomKeyStore, + PresenceStore, SlavedDeviceInboxStore, SlavedDeviceStore, SlavedReceiptsStore, @@ -259,7 +240,6 @@ class GenericWorkerSlavedStore( SlavedTransactionStore, SlavedProfileStore, SlavedClientIpStore, - SlavedPresenceStore, SlavedFilteringStore, MonthlyActiveUsersWorkerStore, MediaRepositoryStore, @@ -327,10 +307,7 @@ class GenericWorkerServer(HomeServer): user_directory.register_servlets(self, resource) - # If presence is disabled, use the stub servlet that does - # not allow sending presence - if not self.config.use_presence: - PresenceStatusStubServlet(self).register(resource) + presence.register_servlets(self, resource) groups.register_servlets(self, resource) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index b2540163d1..462630201d 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -64,6 +64,14 @@ class WriterLocations: Attributes: events: The instances that write to the event and backfill streams. typing: The instance that writes to the typing stream. + to_device: The instances that write to the to_device stream. Currently + can only be a single instance. + account_data: The instances that write to the account data streams. Currently + can only be a single instance. + receipts: The instances that write to the receipts stream. Currently + can only be a single instance. + presence: The instances that write to the presence stream. Currently + can only be a single instance. """ events = attr.ib( @@ -85,6 +93,11 @@ class WriterLocations: type=List[str], converter=_instance_to_list_converter, ) + presence = attr.ib( + default=["master"], + type=List[str], + converter=_instance_to_list_converter, + ) class WorkerConfig(Config): @@ -188,7 +201,14 @@ class WorkerConfig(Config): # Check that the configured writers for events and typing also appears in # `instance_map`. - for stream in ("events", "typing", "to_device", "account_data", "receipts"): + for stream in ( + "events", + "typing", + "to_device", + "account_data", + "receipts", + "presence", + ): instances = _instance_to_list_converter(getattr(self.writers, stream)) for instance in instances: if instance != "master" and instance not in self.instance_map: @@ -215,6 +235,11 @@ class WorkerConfig(Config): if len(self.writers.events) == 0: raise ConfigError("Must specify at least one instance to handle `events`.") + if len(self.writers.presence) != 1: + raise ConfigError( + "Must only specify one instance to handle `presence` messages." + ) + self.events_shard_config = RoutableShardedWorkerHandlingConfig( self.writers.events ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7fd28ffa54..9938be3821 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -122,7 +122,8 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER class BasePresenceHandler(abc.ABC): - """Parts of the PresenceHandler that are shared between workers and master""" + """Parts of the PresenceHandler that are shared between workers and presence + writer""" def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() @@ -309,8 +310,16 @@ class WorkerPresenceHandler(BasePresenceHandler): super().__init__(hs) self.hs = hs + self._presence_writer_instance = hs.config.worker.writers.presence[0] + self._presence_enabled = hs.config.use_presence + # Route presence EDUs to the right worker + hs.get_federation_registry().register_instances_for_edu( + "m.presence", + hs.config.worker.writers.presence, + ) + # The number of ongoing syncs on this process, by user id. # Empty if _presence_enabled is false. self._user_to_num_current_syncs = {} # type: Dict[str, int] @@ -318,8 +327,8 @@ class WorkerPresenceHandler(BasePresenceHandler): self.notifier = hs.get_notifier() self.instance_id = hs.get_instance_id() - # user_id -> last_sync_ms. Lists the users that have stopped syncing - # but we haven't notified the master of that yet + # user_id -> last_sync_ms. Lists the users that have stopped syncing but + # we haven't notified the presence writer of that yet self.users_going_offline = {} self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) @@ -352,22 +361,23 @@ class WorkerPresenceHandler(BasePresenceHandler): ) def mark_as_coming_online(self, user_id): - """A user has started syncing. Send a UserSync to the master, unless they - had recently stopped syncing. + """A user has started syncing. Send a UserSync to the presence writer, + unless they had recently stopped syncing. Args: user_id (str) """ going_offline = self.users_going_offline.pop(user_id, None) if not going_offline: - # Safe to skip because we haven't yet told the master they were offline + # Safe to skip because we haven't yet told the presence writer they + # were offline self.send_user_sync(user_id, True, self.clock.time_msec()) def mark_as_going_offline(self, user_id): - """A user has stopped syncing. We wait before notifying the master as - its likely they'll come back soon. This allows us to avoid sending - a stopped syncing immediately followed by a started syncing notification - to the master + """A user has stopped syncing. We wait before notifying the presence + writer as its likely they'll come back soon. This allows us to avoid + sending a stopped syncing immediately followed by a started syncing + notification to the presence writer Args: user_id (str) @@ -375,8 +385,8 @@ class WorkerPresenceHandler(BasePresenceHandler): self.users_going_offline[user_id] = self.clock.time_msec() def send_stop_syncing(self): - """Check if there are any users who have stopped syncing a while ago - and haven't come back yet. If there are poke the master about them. + """Check if there are any users who have stopped syncing a while ago and + haven't come back yet. If there are poke the presence writer about them. """ now = self.clock.time_msec() for user_id, last_sync_ms in list(self.users_going_offline.items()): @@ -492,9 +502,12 @@ class WorkerPresenceHandler(BasePresenceHandler): if not self.hs.config.use_presence: return - # Proxy request to master + # Proxy request to instance that writes presence await self._set_state_client( - user_id=user_id, state=state, ignore_status_msg=ignore_status_msg + instance_name=self._presence_writer_instance, + user_id=user_id, + state=state, + ignore_status_msg=ignore_status_msg, ) async def bump_presence_active_time(self, user): @@ -505,9 +518,11 @@ class WorkerPresenceHandler(BasePresenceHandler): if not self.hs.config.use_presence: return - # Proxy request to master + # Proxy request to instance that writes presence user_id = user.to_string() - await self._bump_active_client(user_id=user_id) + await self._bump_active_client( + instance_name=self._presence_writer_instance, user_id=user_id + ) class PresenceHandler(BasePresenceHandler): @@ -1909,7 +1924,7 @@ class PresenceFederationQueue: self._queue_presence_updates = True # Whether this instance is a presence writer. - self._presence_writer = hs.config.worker.worker_app is None + self._presence_writer = self._instance_name in hs.config.worker.writers.presence # The FederationSender instance, if this process sends federation traffic directly. self._federation = None @@ -1957,7 +1972,7 @@ class PresenceFederationQueue: Will forward to the local federation sender (if there is one) and queue to send over replication (if there are other federation sender instances.). - Must only be called on the master process. + Must only be called on the presence writer process. """ # This should only be called on a presence writer. @@ -2003,10 +2018,11 @@ class PresenceFederationQueue: We return rows in the form of `(destination, user_id)` to keep the size of each row bounded (rather than returning the sets in a row). - On workers this will query the master process via HTTP replication. + On workers this will query the presence writer process via HTTP replication. """ if instance_name != self._instance_name: - # If not local we query over http replication from the master + # If not local we query over http replication from the presence + # writer result = await self._repl_client( instance_name=instance_name, stream_name=PresenceFederationStream.NAME, diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index ece03467b5..5685cf2121 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -158,7 +158,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): def make_client(cls, hs): """Create a client that makes requests. - Returns a callable that accepts the same parameters as `_serialize_payload`. + Returns a callable that accepts the same parameters as + `_serialize_payload`, and also accepts an optional `instance_name` + parameter to specify which instance to hit (the instance must be in + the `instance_map` config). """ clock = hs.get_clock() client = hs.get_simple_http_client() diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py deleted file mode 100644 index 57327d910d..0000000000 --- a/synapse/replication/slave/storage/presence.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.replication.tcp.streams import PresenceStream -from synapse.storage import DataStore -from synapse.storage.database import DatabasePool -from synapse.storage.databases.main.presence import PresenceStore -from synapse.util.caches.stream_change_cache import StreamChangeCache - -from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker - - -class SlavedPresenceStore(BaseSlavedStore): - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - self._presence_id_gen = SlavedIdTracker(db_conn, "presence_stream", "stream_id") - - self._presence_on_startup = self._get_active_presence(db_conn) # type: ignore - - self.presence_stream_cache = StreamChangeCache( - "PresenceStreamChangeCache", self._presence_id_gen.get_current_token() - ) - - _get_active_presence = DataStore._get_active_presence - take_presence_startup_info = DataStore.take_presence_startup_info - _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] - get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] - - def get_current_presence_token(self): - return self._presence_id_gen.get_current_token() - - def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == PresenceStream.NAME: - self._presence_id_gen.advance(instance_name, token) - for row in rows: - self.presence_stream_cache.entity_has_changed(row.user_id, token) - self._get_presence_for_user.invalidate((row.user_id,)) - return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 2ce1b9f222..7ced4c543c 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -55,6 +55,8 @@ from synapse.replication.tcp.streams import ( CachesStream, EventsStream, FederationStream, + PresenceFederationStream, + PresenceStream, ReceiptsStream, Stream, TagAccountDataStream, @@ -99,6 +101,10 @@ class ReplicationCommandHandler: self._instance_id = hs.get_instance_id() self._instance_name = hs.get_instance_name() + self._is_presence_writer = ( + hs.get_instance_name() in hs.config.worker.writers.presence + ) + self._streams = { stream.NAME: stream(hs) for stream in STREAMS_MAP.values() } # type: Dict[str, Stream] @@ -153,6 +159,14 @@ class ReplicationCommandHandler: continue + if isinstance(stream, (PresenceStream, PresenceFederationStream)): + # Only add PresenceStream as a source on the instance in charge + # of presence. + if self._is_presence_writer: + self._streams_to_replicate.append(stream) + + continue + # Only add any other streams if we're on master. if hs.config.worker_app is not None: continue @@ -350,7 +364,7 @@ class ReplicationCommandHandler: ) -> Optional[Awaitable[None]]: user_sync_counter.inc() - if self._is_master: + if self._is_presence_writer: return self._presence_handler.update_external_syncs_row( cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms ) @@ -360,7 +374,7 @@ class ReplicationCommandHandler: def on_CLEAR_USER_SYNC( self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand ) -> Optional[Awaitable[None]]: - if self._is_master: + if self._is_presence_writer: return self._presence_handler.update_external_syncs_clear(cmd.instance_id) else: return None diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 9d75a89f1c..b03824925a 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -272,15 +272,22 @@ class PresenceStream(Stream): NAME = "presence" ROW_TYPE = PresenceStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() - if hs.config.worker_app is None: - # on the master, query the presence handler + if hs.get_instance_name() in hs.config.worker.writers.presence: + # on the presence writer, query the presence handler presence_handler = hs.get_presence_handler() - update_function = presence_handler.get_all_presence_updates + + from synapse.handlers.presence import PresenceHandler + + assert isinstance(presence_handler, PresenceHandler) + + update_function = ( + presence_handler.get_all_presence_updates + ) # type: UpdateFunction else: - # Query master process + # Query presence writer process update_function = make_http_update_function(hs, self.NAME) super().__init__( diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index c232484f29..2b24fe5aa6 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -35,10 +35,15 @@ class PresenceStatusRestServlet(RestServlet): self.clock = hs.get_clock() self.auth = hs.get_auth() + self._use_presence = hs.config.server.use_presence + async def on_GET(self, request, user_id): requester = await self.auth.get_user_by_req(request) user = UserID.from_string(user_id) + if not self._use_presence: + return 200, {"presence": "offline"} + if requester.user != user: allowed = await self.presence_handler.is_visible( observed_user=user, observer_user=requester.user @@ -80,7 +85,7 @@ class PresenceStatusRestServlet(RestServlet): except Exception: raise SynapseError(400, "Unable to parse state") - if self.hs.config.use_presence: + if self._use_presence: await self.presence_handler.set_state(user, state) return 200, {} diff --git a/synapse/server.py b/synapse/server.py index 67598fffe3..8c147be2b3 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -418,10 +418,10 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_presence_handler(self) -> BasePresenceHandler: - if self.config.worker_app: - return WorkerPresenceHandler(self) - else: + if self.get_instance_name() in self.config.worker.writers.presence: return PresenceHandler(self) + else: + return WorkerPresenceHandler(self) @cache_in_self def get_typing_writer_handler(self) -> TypingWriterHandler: diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 5c50f5f950..49c7606d51 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -17,7 +17,6 @@ import logging from typing import List, Optional, Tuple -from synapse.api.constants import PresenceState from synapse.config.homeserver import HomeServerConfig from synapse.storage.database import DatabasePool from synapse.storage.databases.main.stats import UserSortOrder @@ -51,7 +50,7 @@ from .media_repository import MediaRepositoryStore from .metrics import ServerMetricsStore from .monthly_active_users import MonthlyActiveUsersStore from .openid import OpenIdStore -from .presence import PresenceStore, UserPresenceState +from .presence import PresenceStore from .profile import ProfileStore from .purge_events import PurgeEventsStore from .push_rule import PushRuleStore @@ -126,9 +125,6 @@ class DataStore( self._clock = hs.get_clock() self.database_engine = database.engine - self._presence_id_gen = StreamIdGenerator( - db_conn, "presence_stream", "stream_id" - ) self._public_room_id_gen = StreamIdGenerator( db_conn, "public_room_list_stream", "stream_id" ) @@ -177,21 +173,6 @@ class DataStore( super().__init__(database, db_conn, hs) - self._presence_on_startup = self._get_active_presence(db_conn) - - presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( - db_conn, - "presence_stream", - entity_column="user_id", - stream_column="stream_id", - max_value=self._presence_id_gen.get_current_token(), - ) - self.presence_stream_cache = StreamChangeCache( - "PresenceStreamChangeCache", - min_presence_val, - prefilled_cache=presence_cache_prefill, - ) - device_list_max = self._device_list_id_gen.get_current_token() self._device_list_stream_cache = StreamChangeCache( "DeviceListStreamChangeCache", device_list_max @@ -238,32 +219,6 @@ class DataStore( def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() - def take_presence_startup_info(self): - active_on_startup = self._presence_on_startup - self._presence_on_startup = None - return active_on_startup - - def _get_active_presence(self, db_conn): - """Fetch non-offline presence from the database so that we can register - the appropriate time outs. - """ - - sql = ( - "SELECT user_id, state, last_active_ts, last_federation_update_ts," - " last_user_sync_ts, status_msg, currently_active FROM presence_stream" - " WHERE state != ?" - ) - - txn = db_conn.cursor() - txn.execute(sql, (PresenceState.OFFLINE,)) - rows = self.db_pool.cursor_to_dict(txn) - txn.close() - - for row in rows: - row["currently_active"] = bool(row["currently_active"]) - - return [UserPresenceState(**row) for row in rows] - async def get_users(self) -> List[JsonDict]: """Function to retrieve a list of users in users table. diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index c207d917b1..db22fab23e 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -12,16 +12,69 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, List, Tuple -from synapse.api.presence import UserPresenceState +from synapse.api.presence import PresenceState, UserPresenceState +from synapse.replication.tcp.streams import PresenceStream from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause +from synapse.storage.database import DatabasePool +from synapse.storage.engines import PostgresEngine +from synapse.storage.types import Connection +from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.iterutils import batch_iter +if TYPE_CHECKING: + from synapse.server import HomeServer + class PresenceStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: Connection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self._can_persist_presence = ( + hs.get_instance_name() in hs.config.worker.writers.presence + ) + + if isinstance(database.engine, PostgresEngine): + self._presence_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="presence_stream", + instance_name=self._instance_name, + tables=[("presence_stream", "instance_name", "stream_id")], + sequence_name="presence_stream_sequence", + writers=hs.config.worker.writers.to_device, + ) + else: + self._presence_id_gen = StreamIdGenerator( + db_conn, "presence_stream", "stream_id" + ) + + self._presence_on_startup = self._get_active_presence(db_conn) + + presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( + db_conn, + "presence_stream", + entity_column="user_id", + stream_column="stream_id", + max_value=self._presence_id_gen.get_current_token(), + ) + self.presence_stream_cache = StreamChangeCache( + "PresenceStreamChangeCache", + min_presence_val, + prefilled_cache=presence_cache_prefill, + ) + async def update_presence(self, presence_states): + assert self._can_persist_presence + stream_ordering_manager = self._presence_id_gen.get_next_mult( len(presence_states) ) @@ -57,6 +110,7 @@ class PresenceStore(SQLBaseStore): "last_user_sync_ts": state.last_user_sync_ts, "status_msg": state.status_msg, "currently_active": state.currently_active, + "instance_name": self._instance_name, } for stream_id, state in zip(stream_orderings, presence_states) ], @@ -216,3 +270,37 @@ class PresenceStore(SQLBaseStore): def get_current_presence_token(self): return self._presence_id_gen.get_current_token() + + def _get_active_presence(self, db_conn: Connection): + """Fetch non-offline presence from the database so that we can register + the appropriate time outs. + """ + + sql = ( + "SELECT user_id, state, last_active_ts, last_federation_update_ts," + " last_user_sync_ts, status_msg, currently_active FROM presence_stream" + " WHERE state != ?" + ) + + txn = db_conn.cursor() + txn.execute(sql, (PresenceState.OFFLINE,)) + rows = self.db_pool.cursor_to_dict(txn) + txn.close() + + for row in rows: + row["currently_active"] = bool(row["currently_active"]) + + return [UserPresenceState(**row) for row in rows] + + def take_presence_startup_info(self): + active_on_startup = self._presence_on_startup + self._presence_on_startup = None + return active_on_startup + + def process_replication_rows(self, stream_name, instance_name, token, rows): + if stream_name == PresenceStream.NAME: + self._presence_id_gen.advance(instance_name, token) + for row in rows: + self.presence_stream_cache.entity_has_changed(row.user_id, token) + self._get_presence_for_user.invalidate((row.user_id,)) + return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql new file mode 100644 index 0000000000..b6ba0bda1a --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a column to specify which instance wrote the row. Historic rows have +-- `NULL`, which indicates that the master instance wrote them. +ALTER TABLE presence_stream ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres new file mode 100644 index 0000000000..02b182adf9 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres @@ -0,0 +1,20 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS presence_stream_sequence; + +SELECT setval('presence_stream_sequence', ( + SELECT COALESCE(MAX(stream_id), 1) FROM presence_stream +)); diff --git a/tests/app/test_frontend_proxy.py b/tests/app/test_frontend_proxy.py deleted file mode 100644 index 3d45da38ab..0000000000 --- a/tests/app/test_frontend_proxy.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.app.generic_worker import GenericWorkerServer - -from tests.server import make_request -from tests.unittest import HomeserverTestCase - - -class FrontendProxyTests(HomeserverTestCase): - def make_homeserver(self, reactor, clock): - - hs = self.setup_test_homeserver( - federation_http_client=None, homeserver_to_use=GenericWorkerServer - ) - - return hs - - def default_config(self): - c = super().default_config() - c["worker_app"] = "synapse.app.frontend_proxy" - - c["worker_listeners"] = [ - { - "type": "http", - "port": 8080, - "bind_addresses": ["0.0.0.0"], - "resources": [{"names": ["client"]}], - } - ] - - return c - - def test_listen_http_with_presence_enabled(self): - """ - When presence is on, the stub servlet will not register. - """ - # Presence is on - self.hs.config.use_presence = True - - # Listen with the config - self.hs._listen_http(self.hs.config.worker.worker_listeners[0]) - - # Grab the resource from the site that was told to listen - self.assertEqual(len(self.reactor.tcpServers), 1) - site = self.reactor.tcpServers[0][1] - - channel = make_request(self.reactor, site, "PUT", "presence/a/status") - - # 400 + unrecognised, because nothing is registered - self.assertEqual(channel.code, 400) - self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED") - - def test_listen_http_with_presence_disabled(self): - """ - When presence is off, the stub servlet will register. - """ - # Presence is off - self.hs.config.use_presence = False - - # Listen with the config - self.hs._listen_http(self.hs.config.worker.worker_listeners[0]) - - # Grab the resource from the site that was told to listen - self.assertEqual(len(self.reactor.tcpServers), 1) - site = self.reactor.tcpServers[0][1] - - channel = make_request(self.reactor, site, "PUT", "presence/a/status") - - # 401, because the stub servlet still checks authentication - self.assertEqual(channel.code, 401) - self.assertEqual(channel.json_body["errcode"], "M_MISSING_TOKEN") diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 3a050659ca..409f3949dc 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -16,6 +16,7 @@ from unittest.mock import Mock from twisted.internet import defer +from synapse.handlers.presence import PresenceHandler from synapse.rest.client.v1 import presence from synapse.types import UserID @@ -32,7 +33,7 @@ class PresenceTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor, clock): - presence_handler = Mock() + presence_handler = Mock(spec=PresenceHandler) presence_handler.set_state.return_value = defer.succeed(None) hs = self.setup_test_homeserver( @@ -59,12 +60,12 @@ class PresenceTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200) self.assertEqual(self.hs.get_presence_handler().set_state.call_count, 1) + @unittest.override_config({"use_presence": False}) def test_put_presence_disabled(self): """ PUT to the status endpoint with use_presence disabled will NOT call set_state on the presence handler. """ - self.hs.config.use_presence = False body = {"presence": "here", "status_msg": "beep boop"} channel = self.make_request( -- cgit 1.5.1 From 3ff225175462dde8376aa584e3a47c43b1f0e790 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 23 Apr 2021 19:20:44 +0100 Subject: Improved validation for received requests (#9817) * Simplify `start_listening` callpath * Correctly check the size of uploaded files --- changelog.d/9817.misc | 1 + synapse/api/constants.py | 3 ++ synapse/app/_base.py | 30 ++++++++++-- synapse/app/admin_cmd.py | 8 +-- synapse/app/generic_worker.py | 11 +++-- synapse/app/homeserver.py | 17 +++++-- synapse/config/logger.py | 3 +- synapse/event_auth.py | 4 +- synapse/http/site.py | 32 ++++++++++-- synapse/rest/media/v1/upload_resource.py | 2 - synapse/server.py | 8 +++ tests/http/test_site.py | 83 ++++++++++++++++++++++++++++++++ tests/replication/_base.py | 1 + tests/test_server.py | 1 + tests/unittest.py | 1 + 15 files changed, 174 insertions(+), 31 deletions(-) create mode 100644 changelog.d/9817.misc create mode 100644 tests/http/test_site.py (limited to 'synapse/server.py') diff --git a/changelog.d/9817.misc b/changelog.d/9817.misc new file mode 100644 index 0000000000..8aa8895f05 --- /dev/null +++ b/changelog.d/9817.misc @@ -0,0 +1 @@ +Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 31a59bceec..936b6534b4 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -17,6 +17,9 @@ """Contains constants from the specification.""" +# the max size of a (canonical-json-encoded) event +MAX_PDU_SIZE = 65536 + # the "depth" field on events is limited to 2**63 - 1 MAX_DEPTH = 2 ** 63 - 1 diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 2113c4f370..638e01c1b2 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -30,9 +30,10 @@ from twisted.internet import defer, error, reactor from twisted.protocols.tls import TLSMemoryBIOFactory import synapse +from synapse.api.constants import MAX_PDU_SIZE from synapse.app import check_bind_error from synapse.app.phone_stats_home import start_phone_stats_home -from synapse.config.server import ListenerConfig +from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.logging.context import PreserveLoggingContext from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -288,7 +289,7 @@ def refresh_certificate(hs): logger.info("Context factories updated.") -async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): +async def start(hs: "synapse.server.HomeServer"): """ Start a Synapse server or worker. @@ -300,7 +301,6 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon Args: hs: homeserver instance - listeners: Listener configuration ('listeners' in homeserver.yaml) """ # Set up the SIGHUP machinery. if hasattr(signal, "SIGHUP"): @@ -336,7 +336,7 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa # It is now safe to start your Synapse. - hs.start_listening(listeners) + hs.start_listening() hs.get_datastore().db_pool.start_profiling() hs.get_pusherpool().start() @@ -530,3 +530,25 @@ def sdnotify(state): # this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET # unless systemd is expecting us to notify it. logger.warning("Unable to send notification to systemd: %s", e) + + +def max_request_body_size(config: HomeServerConfig) -> int: + """Get a suitable maximum size for incoming HTTP requests""" + + # Other than media uploads, the biggest request we expect to see is a fully-loaded + # /federation/v1/send request. + # + # The main thing in such a request is up to 50 PDUs, and up to 100 EDUs. PDUs are + # limited to 65536 bytes (possibly slightly more if the sender didn't use canonical + # json encoding); there is no specced limit to EDUs (see + # https://github.com/matrix-org/matrix-doc/issues/3121). + # + # in short, we somewhat arbitrarily limit requests to 200 * 64K (about 12.5M) + # + max_request_size = 200 * MAX_PDU_SIZE + + # if we have a media repo enabled, we may need to allow larger uploads than that + if config.media.can_load_media_repo: + max_request_size = max(max_request_size, config.media.max_upload_size) + + return max_request_size diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index eb256db749..68ae19c977 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -70,12 +70,6 @@ class AdminCmdSlavedStore( class AdminCmdServer(HomeServer): DATASTORE_CLASS = AdminCmdSlavedStore - def _listen_http(self, listener_config): - pass - - def start_listening(self, listeners): - pass - async def export_data_command(hs, args): """Export data for a user. @@ -232,7 +226,7 @@ def start(config_options): async def run(): with LoggingContext("command"): - _base.start(ss, []) + _base.start(ss) await args.func(ss, args) _base.start_worker_reactor( diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 70e07d0574..1a15ceee81 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -15,7 +15,7 @@ # limitations under the License. import logging import sys -from typing import Dict, Iterable, Optional +from typing import Dict, Optional from twisted.internet import address from twisted.web.resource import IResource @@ -32,7 +32,7 @@ from synapse.api.urls import ( SERVER_KEY_V2_PREFIX, ) from synapse.app import _base -from synapse.app._base import register_start +from synapse.app._base import max_request_body_size, register_start from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging @@ -367,6 +367,7 @@ class GenericWorkerServer(HomeServer): listener_config, root_resource, self.version_string, + max_request_body_size=max_request_body_size(self.config), reactor=self.get_reactor(), ), reactor=self.get_reactor(), @@ -374,8 +375,8 @@ class GenericWorkerServer(HomeServer): logger.info("Synapse worker now listening on port %d", port) - def start_listening(self, listeners: Iterable[ListenerConfig]): - for listener in listeners: + def start_listening(self): + for listener in self.config.worker_listeners: if listener.type == "http": self._listen_http(listener) elif listener.type == "manhole": @@ -468,7 +469,7 @@ def start(config_options): # streams. Will no-op if no streams can be written to by this worker. hs.get_replication_streamer() - register_start(_base.start, hs, config.worker_listeners) + register_start(_base.start, hs) _base.start_worker_reactor("synapse-generic-worker", config) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 140f6bcdee..8e78134bbe 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -17,7 +17,7 @@ import logging import os import sys -from typing import Iterable, Iterator +from typing import Iterator from twisted.internet import reactor from twisted.web.resource import EncodingResourceWrapper, IResource @@ -36,7 +36,13 @@ from synapse.api.urls import ( WEB_CLIENT_PREFIX, ) from synapse.app import _base -from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start +from synapse.app._base import ( + listen_ssl, + listen_tcp, + max_request_body_size, + quit_with_error, + register_start, +) from synapse.config._base import ConfigError from synapse.config.emailconfig import ThreepidBehaviour from synapse.config.homeserver import HomeServerConfig @@ -132,6 +138,7 @@ class SynapseHomeServer(HomeServer): listener_config, create_resource_tree(resources, root_resource), self.version_string, + max_request_body_size=max_request_body_size(self.config), reactor=self.get_reactor(), ) @@ -268,14 +275,14 @@ class SynapseHomeServer(HomeServer): return resources - def start_listening(self, listeners: Iterable[ListenerConfig]): + def start_listening(self): if self.config.redis_enabled: # If redis is enabled we connect via the replication command handler # in the same way as the workers (since we're effectively a client # rather than a server). self.get_tcp_replication().start_replication(self) - for listener in listeners: + for listener in self.config.server.listeners: if listener.type == "http": self._listening_services.extend( self._listener_http(self.config, listener) @@ -407,7 +414,7 @@ def setup(config_options): # Loading the provider metadata also ensures the provider config is valid. await oidc.load_metadata() - await _base.start(hs, config.listeners) + await _base.start(hs) hs.get_datastore().db_pool.updates.start_doing_background_updates() diff --git a/synapse/config/logger.py b/synapse/config/logger.py index b174e0df6d..813076dfe2 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -31,7 +31,6 @@ from twisted.logger import ( ) import synapse -from synapse.app import _base as appbase from synapse.logging._structured import setup_structured_logging from synapse.logging.context import LoggingContextFilter from synapse.logging.filter import MetadataFilter @@ -318,6 +317,8 @@ def setup_logging( # Perform one-time logging configuration. _setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner) # Add a SIGHUP handler to reload the logging configuration, if one is available. + from synapse.app import _base as appbase + appbase.register_sighup(_reload_logging_config, log_config_path) # Log immediately so we can grep backwards. diff --git a/synapse/event_auth.py b/synapse/event_auth.py index afc2bc8267..70c556566e 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -21,7 +21,7 @@ from signedjson.key import decode_verify_key_bytes from signedjson.sign import SignatureVerifyException, verify_signed_json from unpaddedbase64 import decode_base64 -from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.api.constants import MAX_PDU_SIZE, EventTypes, JoinRules, Membership from synapse.api.errors import AuthError, EventSizeError, SynapseError from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, @@ -205,7 +205,7 @@ def _check_size_limits(event: EventBase) -> None: too_big("type") if len(event.event_id) > 255: too_big("event_id") - if len(encode_canonical_json(event.get_pdu_json())) > 65536: + if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE: too_big("event") diff --git a/synapse/http/site.py b/synapse/http/site.py index e911ee4809..671fd3fbcc 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -14,7 +14,7 @@ import contextlib import logging import time -from typing import Optional, Tuple, Type, Union +from typing import Optional, Tuple, Union import attr from zope.interface import implementer @@ -50,6 +50,7 @@ class SynapseRequest(Request): * Redaction of access_token query-params in __repr__ * Logging at start and end * Metrics to record CPU, wallclock and DB time by endpoint. + * A limit to the size of request which will be accepted It also provides a method `processing`, which returns a context manager. If this method is called, the request won't be logged until the context manager is closed; @@ -60,8 +61,9 @@ class SynapseRequest(Request): logcontext: the log context for this request """ - def __init__(self, channel, *args, **kw): + def __init__(self, channel, *args, max_request_body_size=1024, **kw): Request.__init__(self, channel, *args, **kw) + self._max_request_body_size = max_request_body_size self.site = channel.site # type: SynapseSite self._channel = channel # this is used by the tests self.start_time = 0.0 @@ -98,6 +100,18 @@ class SynapseRequest(Request): self.site.site_tag, ) + def handleContentChunk(self, data): + # we should have a `content` by now. + assert self.content, "handleContentChunk() called before gotLength()" + if self.content.tell() + len(data) > self._max_request_body_size: + logger.warning( + "Aborting connection from %s because the request exceeds maximum size", + self.client, + ) + self.transport.abortConnection() + return + super().handleContentChunk(data) + @property def requester(self) -> Optional[Union[Requester, str]]: return self._requester @@ -505,6 +519,7 @@ class SynapseSite(Site): config: ListenerConfig, resource: IResource, server_version_string, + max_request_body_size: int, reactor: IReactorTime, ): """ @@ -516,6 +531,8 @@ class SynapseSite(Site): resource: The base of the resource tree to be used for serving requests on this site server_version_string: A string to present for the Server header + max_request_body_size: Maximum request body length to allow before + dropping the connection reactor: reactor to be used to manage connection timeouts """ Site.__init__(self, resource, reactor=reactor) @@ -524,9 +541,14 @@ class SynapseSite(Site): assert config.http_options is not None proxied = config.http_options.x_forwarded - self.requestFactory = ( - XForwardedForRequest if proxied else SynapseRequest - ) # type: Type[Request] + request_class = XForwardedForRequest if proxied else SynapseRequest + + def request_factory(channel, queued) -> Request: + return request_class( + channel, max_request_body_size=max_request_body_size, queued=queued + ) + + self.requestFactory = request_factory # type: ignore self.access_logger = logging.getLogger(logger_name) self.server_version_string = server_version_string.encode("ascii") diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 80f017a4dd..024a105bf2 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -51,8 +51,6 @@ class UploadResource(DirectServeJsonResource): async def _async_render_POST(self, request: SynapseRequest) -> None: requester = await self.auth.get_user_by_req(request) - # TODO: The checks here are a bit late. The content will have - # already been uploaded to a tmp file at this point content_length = request.getHeader("Content-Length") if content_length is None: raise SynapseError(msg="Request must specify a Content-Length", code=400) diff --git a/synapse/server.py b/synapse/server.py index 8c147be2b3..06570bb1ce 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -287,6 +287,14 @@ class HomeServer(metaclass=abc.ABCMeta): if self.config.run_background_tasks: self.setup_background_tasks() + def start_listening(self) -> None: + """Start the HTTP, manhole, metrics, etc listeners + + Does nothing in this base class; overridden in derived classes to start the + appropriate listeners. + """ + pass + def setup_background_tasks(self) -> None: """ Some handlers have side effects on instantiation (like registering diff --git a/tests/http/test_site.py b/tests/http/test_site.py new file mode 100644 index 0000000000..8c13b4f693 --- /dev/null +++ b/tests/http/test_site.py @@ -0,0 +1,83 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet.address import IPv6Address +from twisted.test.proto_helpers import StringTransport + +from synapse.app.homeserver import SynapseHomeServer + +from tests.unittest import HomeserverTestCase + + +class SynapseRequestTestCase(HomeserverTestCase): + def make_homeserver(self, reactor, clock): + return self.setup_test_homeserver(homeserver_to_use=SynapseHomeServer) + + def test_large_request(self): + """overlarge HTTP requests should be rejected""" + self.hs.start_listening() + + # find the HTTP server which is configured to listen on port 0 + (port, factory, _backlog, interface) = self.reactor.tcpServers[0] + self.assertEqual(interface, "::") + self.assertEqual(port, 0) + + # as a control case, first send a regular request. + + # complete the connection and wire it up to a fake transport + client_address = IPv6Address("TCP", "::1", "2345") + protocol = factory.buildProtocol(client_address) + transport = StringTransport() + protocol.makeConnection(transport) + + protocol.dataReceived( + b"POST / HTTP/1.1\r\n" + b"Connection: close\r\n" + b"Transfer-Encoding: chunked\r\n" + b"\r\n" + b"0\r\n" + b"\r\n" + ) + + while not transport.disconnecting: + self.reactor.advance(1) + + # we should get a 404 + self.assertRegex(transport.value().decode(), r"^HTTP/1\.1 404 ") + + # now send an oversized request + protocol = factory.buildProtocol(client_address) + transport = StringTransport() + protocol.makeConnection(transport) + + protocol.dataReceived( + b"POST / HTTP/1.1\r\n" + b"Connection: close\r\n" + b"Transfer-Encoding: chunked\r\n" + b"\r\n" + ) + + # we deliberately send all the data in one big chunk, to ensure that + # twisted isn't buffering the data in the chunked transfer decoder. + # we start with the chunk size, in hex. (We won't actually send this much) + protocol.dataReceived(b"10000000\r\n") + sent = 0 + while not transport.disconnected: + self.assertLess(sent, 0x10000000, "connection did not drop") + protocol.dataReceived(b"\0" * 1024) + sent += 1024 + + # default max upload size is 50M, so it should drop on the next buffer after + # that. + self.assertEqual(sent, 50 * 1024 * 1024 + 1024) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index dc3519ea13..624bd1b927 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -359,6 +359,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): config=worker_hs.config.server.listeners[0], resource=resource, server_version_string="1", + max_request_body_size=4096, reactor=self.reactor, ) diff --git a/tests/test_server.py b/tests/test_server.py index 45400be367..407e172e41 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -202,6 +202,7 @@ class OptionsResourceTests(unittest.TestCase): parse_listener_def({"type": "http", "port": 0}), self.resource, "1.0", + max_request_body_size=1234, reactor=self.reactor, ) diff --git a/tests/unittest.py b/tests/unittest.py index 5353e75c7c..9bd02bd9c4 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -247,6 +247,7 @@ class HomeserverTestCase(TestCase): config=self.hs.config.server.listeners[0], resource=self.resource, server_version_string="1", + max_request_body_size=1234, reactor=self.reactor, ) -- cgit 1.5.1 From fe604a022a7142157da7e90a40330beb2a11af7a Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 27 Apr 2021 13:13:07 +0100 Subject: Remove various bits of compatibility code for Python <3.6 (#9879) I went through and removed a bunch of cruft that was lying around for compatibility with old Python versions. This PR also will now prevent Synapse from starting unless you're running Python 3.6+. --- changelog.d/9879.misc | 1 + mypy.ini | 1 - synapse/__init__.py | 4 +-- synapse/python_dependencies.py | 9 ++----- synapse/rest/admin/users.py | 3 ++- synapse/rest/consent/consent_resource.py | 10 +------- synapse/rest/media/v1/filepath.py | 2 +- synapse/secrets.py | 44 -------------------------------- synapse/server.py | 5 ---- synapse/storage/_base.py | 2 +- synapse/storage/database.py | 15 +++++------ synapse/util/caches/response_cache.py | 2 +- tests/rest/admin/test_user.py | 15 +++++------ tests/storage/test__base.py | 3 ++- tests/unittest.py | 2 +- tox.ini | 9 +++---- 16 files changed, 29 insertions(+), 98 deletions(-) create mode 100644 changelog.d/9879.misc delete mode 100644 synapse/secrets.py (limited to 'synapse/server.py') diff --git a/changelog.d/9879.misc b/changelog.d/9879.misc new file mode 100644 index 0000000000..c9ca37cf48 --- /dev/null +++ b/changelog.d/9879.misc @@ -0,0 +1 @@ +Remove backwards-compatibility code for Python versions < 3.6. \ No newline at end of file diff --git a/mypy.ini b/mypy.ini index 32e6197409..a40f705b76 100644 --- a/mypy.ini +++ b/mypy.ini @@ -41,7 +41,6 @@ files = synapse/push, synapse/replication, synapse/rest, - synapse/secrets.py, synapse/server.py, synapse/server_notices, synapse/spam_checker_api, diff --git a/synapse/__init__.py b/synapse/__init__.py index 837e938f56..fbd49a93e1 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -21,8 +21,8 @@ import os import sys # Check that we're not running on an unsupported Python version. -if sys.version_info < (3, 5): - print("Synapse requires Python 3.5 or above.") +if sys.version_info < (3, 6): + print("Synapse requires Python 3.6 or above.") sys.exit(1) # Twisted and canonicaljson will fail to import when this file is executed to diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 2a1c925ee8..2de946f464 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -85,7 +85,7 @@ REQUIREMENTS = [ "typing-extensions>=3.7.4", # We enforce that we have a `cryptography` version that bundles an `openssl` # with the latest security patches. - "cryptography>=3.4.7;python_version>='3.6'", + "cryptography>=3.4.7", ] CONDITIONAL_REQUIREMENTS = { @@ -100,14 +100,9 @@ CONDITIONAL_REQUIREMENTS = { # that use the protocol, such as Let's Encrypt. "acme": [ "txacme>=0.9.2", - # txacme depends on eliot. Eliot 1.8.0 is incompatible with - # python 3.5.2, as per https://github.com/itamarst/eliot/issues/418 - "eliot<1.8.0;python_version<'3.5.3'", ], "saml2": [ - # pysaml2 6.4.0 is incompatible with Python 3.5 (see https://github.com/IdentityPython/pysaml2/issues/749) - "pysaml2>=4.5.0,<6.4.0;python_version<'3.6'", - "pysaml2>=4.5.0;python_version>='3.6'", + "pysaml2>=4.5.0", ], "oidc": ["authlib>=0.14.0"], # systemd-python is necessary for logging to the systemd journal via diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index edda7861fa..8c9d21d3ea 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -14,6 +14,7 @@ import hashlib import hmac import logging +import secrets from http import HTTPStatus from typing import TYPE_CHECKING, Dict, List, Optional, Tuple @@ -375,7 +376,7 @@ class UserRegisterServlet(RestServlet): """ self._clear_old_nonces() - nonce = self.hs.get_secrets().token_hex(64) + nonce = secrets.token_hex(64) self.nonces[nonce] = int(self.reactor.seconds()) return 200, {"nonce": nonce} diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py index c4550d3cf0..b19cd8afc5 100644 --- a/synapse/rest/consent/consent_resource.py +++ b/synapse/rest/consent/consent_resource.py @@ -32,14 +32,6 @@ TEMPLATE_LANGUAGE = "en" logger = logging.getLogger(__name__) -# use hmac.compare_digest if we have it (python 2.7.7), else just use equality -if hasattr(hmac, "compare_digest"): - compare_digest = hmac.compare_digest -else: - - def compare_digest(a, b): - return a == b - class ConsentResource(DirectServeHtmlResource): """A twisted Resource to display a privacy policy and gather consent to it @@ -209,5 +201,5 @@ class ConsentResource(DirectServeHtmlResource): .encode("ascii") ) - if not compare_digest(want_mac, userhmac): + if not hmac.compare_digest(want_mac, userhmac): raise SynapseError(HTTPStatus.FORBIDDEN, "HMAC incorrect") diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index 4088e7a059..09531ebf54 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -21,7 +21,7 @@ from typing import Callable, List NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d") -def _wrap_in_base_path(func: "Callable[..., str]") -> "Callable[..., str]": +def _wrap_in_base_path(func: Callable[..., str]) -> Callable[..., str]: """Takes a function that returns a relative path and turns it into an absolute path based on the location of the primary media store """ diff --git a/synapse/secrets.py b/synapse/secrets.py deleted file mode 100644 index bf829251fd..0000000000 --- a/synapse/secrets.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Injectable secrets module for Synapse. - -See https://docs.python.org/3/library/secrets.html#module-secrets for the API -used in Python 3.6, and the API emulated in Python 2.7. -""" -import sys - -# secrets is available since python 3.6 -if sys.version_info[0:2] >= (3, 6): - import secrets - - class Secrets: - def token_bytes(self, nbytes: int = 32) -> bytes: - return secrets.token_bytes(nbytes) - - def token_hex(self, nbytes: int = 32) -> str: - return secrets.token_hex(nbytes) - - -else: - import binascii - import os - - class Secrets: - def token_bytes(self, nbytes: int = 32) -> bytes: - return os.urandom(nbytes) - - def token_hex(self, nbytes: int = 32) -> str: - return binascii.hexlify(self.token_bytes(nbytes)).decode("ascii") diff --git a/synapse/server.py b/synapse/server.py index 06570bb1ce..2337d2d9b4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -126,7 +126,6 @@ from synapse.rest.media.v1.media_repository import ( MediaRepository, MediaRepositoryResource, ) -from synapse.secrets import Secrets from synapse.server_notices.server_notices_manager import ServerNoticesManager from synapse.server_notices.server_notices_sender import ServerNoticesSender from synapse.server_notices.worker_server_notices_sender import ( @@ -641,10 +640,6 @@ class HomeServer(metaclass=abc.ABCMeta): def get_groups_attestation_renewer(self) -> GroupAttestionRenewer: return GroupAttestionRenewer(self) - @cache_in_self - def get_secrets(self) -> Secrets: - return Secrets() - @cache_in_self def get_stats_handler(self) -> StatsHandler: return StatsHandler(self) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d472676acf..6b68d8720c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -114,7 +114,7 @@ def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any: db_content = db_content.tobytes() # Decode it to a Unicode string before feeding it to the JSON decoder, since - # Python 3.5 does not support deserializing bytes. + # it only supports handling strings if isinstance(db_content, (bytes, bytearray)): db_content = db_content.decode("utf8") diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 9452368bf0..bd39c095af 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -171,10 +171,7 @@ class LoggingDatabaseConnection: # The type of entry which goes on our after_callbacks and exception_callbacks lists. -# -# Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so -# that mypy sees the type but the runtime python doesn't. -_CallbackListEntry = Tuple["Callable[..., None]", Iterable[Any], Dict[str, Any]] +_CallbackListEntry = Tuple[Callable[..., None], Iterable[Any], Dict[str, Any]] R = TypeVar("R") @@ -221,7 +218,7 @@ class LoggingTransaction: self.after_callbacks = after_callbacks self.exception_callbacks = exception_callbacks - def call_after(self, callback: "Callable[..., None]", *args: Any, **kwargs: Any): + def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. @@ -233,7 +230,7 @@ class LoggingTransaction: self.after_callbacks.append((callback, args, kwargs)) def call_on_exception( - self, callback: "Callable[..., None]", *args: Any, **kwargs: Any + self, callback: Callable[..., None], *args: Any, **kwargs: Any ): # if self.exception_callbacks is None, that means that whatever constructed the # LoggingTransaction isn't expecting there to be any callbacks; assert that @@ -485,7 +482,7 @@ class DatabasePool: desc: str, after_callbacks: List[_CallbackListEntry], exception_callbacks: List[_CallbackListEntry], - func: "Callable[..., R]", + func: Callable[..., R], *args: Any, **kwargs: Any, ) -> R: @@ -618,7 +615,7 @@ class DatabasePool: async def runInteraction( self, desc: str, - func: "Callable[..., R]", + func: Callable[..., R], *args: Any, db_autocommit: bool = False, **kwargs: Any, @@ -678,7 +675,7 @@ class DatabasePool: async def runWithConnection( self, - func: "Callable[..., R]", + func: Callable[..., R], *args: Any, db_autocommit: bool = False, **kwargs: Any, diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 2529845c9e..25ea1bcc91 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -110,7 +110,7 @@ class ResponseCache(Generic[T]): return result.observe() def wrap( - self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any + self, key: T, callback: Callable[..., Any], *args: Any, **kwargs: Any ) -> defer.Deferred: """Wrap together a *get* and *set* call, taking care of logcontexts diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index b3afd51522..d599a4c984 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -18,7 +18,7 @@ import json import urllib.parse from binascii import unhexlify from typing import List, Optional -from unittest.mock import Mock +from unittest.mock import Mock, patch import synapse.rest.admin from synapse.api.constants import UserTypes @@ -54,8 +54,6 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): self.datastore = Mock(return_value=Mock()) self.datastore.get_current_state_deltas = Mock(return_value=(0, [])) - self.secrets = Mock() - self.hs = self.setup_test_homeserver() self.hs.config.registration_shared_secret = "shared" @@ -84,14 +82,13 @@ class UserRegisterTestCase(unittest.HomeserverTestCase): Calling GET on the endpoint will return a randomised nonce, using the homeserver's secrets provider. """ - secrets = Mock() - secrets.token_hex = Mock(return_value="abcd") - - self.hs.get_secrets = Mock(return_value=secrets) + with patch("secrets.token_hex") as token_hex: + # Patch secrets.token_hex for the duration of this context + token_hex.return_value = "abcd" - channel = self.make_request("GET", self.url) + channel = self.make_request("GET", self.url) - self.assertEqual(channel.json_body, {"nonce": "abcd"}) + self.assertEqual(channel.json_body, {"nonce": "abcd"}) def test_expired_nonce(self): """ diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 6339a43f0c..200b9198f9 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import secrets from tests import unittest @@ -21,7 +22,7 @@ class UpsertManyTests(unittest.HomeserverTestCase): def prepare(self, reactor, clock, hs): self.storage = hs.get_datastore() - self.table_name = "table_" + hs.get_secrets().token_hex(6) + self.table_name = "table_" + secrets.token_hex(6) self.get_success( self.storage.db_pool.runInteraction( "create", diff --git a/tests/unittest.py b/tests/unittest.py index 9bd02bd9c4..74db7c08f1 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -18,6 +18,7 @@ import hashlib import hmac import inspect import logging +import secrets import time from typing import Callable, Dict, Iterable, Optional, Tuple, Type, TypeVar, Union from unittest.mock import Mock, patch @@ -626,7 +627,6 @@ class HomeserverTestCase(TestCase): str: The new event's ID. """ event_creator = self.hs.get_event_creation_handler() - secrets = self.hs.get_secrets() requester = create_requester(user) event, context = self.get_success( diff --git a/tox.ini b/tox.ini index 998b04b224..ecd609271d 100644 --- a/tox.ini +++ b/tox.ini @@ -21,13 +21,11 @@ deps = # installed on that). # # anyway, make sure that we have a recent enough setuptools. - setuptools>=18.5 ; python_version >= '3.6' - setuptools>=18.5,<51.0.0 ; python_version < '3.6' + setuptools>=18.5 # we also need a semi-recent version of pip, because old ones fail to # install the "enum34" dependency of cryptography. - pip>=10 ; python_version >= '3.6' - pip>=10,<21.0 ; python_version < '3.6' + pip>=10 # directories/files we run the linters on. # if you update this list, make sure to do the same in scripts-dev/lint.sh @@ -168,8 +166,7 @@ skip_install = true usedevelop = false deps = coverage - pip>=10 ; python_version >= '3.6' - pip>=10,<21.0 ; python_version < '3.6' + pip>=10 commands= coverage combine coverage report -- cgit 1.5.1