summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py2
-rw-r--r--synapse/config/oidc_config.py13
-rw-r--r--synapse/config/stats.py45
-rw-r--r--synapse/events/spamcheck.py29
-rw-r--r--synapse/federation/federation_server.py152
-rw-r--r--synapse/federation/sender/per_destination_queue.py287
-rw-r--r--synapse/federation/sender/transaction_manager.py48
-rw-r--r--synapse/handlers/auth.py5
-rw-r--r--synapse/handlers/cas_handler.py1
-rw-r--r--synapse/handlers/federation.py220
-rw-r--r--synapse/handlers/oidc_handler.py15
-rw-r--r--synapse/handlers/register.py50
-rw-r--r--synapse/handlers/saml_handler.py1
-rw-r--r--synapse/handlers/sso.py5
-rw-r--r--synapse/http/client.py47
-rw-r--r--synapse/logging/_remote.py23
-rw-r--r--synapse/push/emailpusher.py4
-rw-r--r--synapse/replication/http/login.py4
-rw-r--r--synapse/replication/tcp/handler.py48
-rw-r--r--synapse/replication/tcp/protocol.py33
-rw-r--r--synapse/replication/tcp/redis.py45
-rw-r--r--synapse/rest/admin/_base.py15
-rw-r--r--synapse/rest/admin/media.py29
-rw-r--r--synapse/rest/admin/rooms.py5
-rw-r--r--synapse/rest/admin/users.py5
-rw-r--r--synapse/rest/client/v1/login.py39
-rw-r--r--synapse/rest/client/v1/room.py5
-rw-r--r--synapse/rest/client/v2_alpha/groups.py105
-rw-r--r--synapse/rest/media/v1/config_resource.py3
-rw-r--r--synapse/rest/media/v1/media_repository.py3
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py3
-rw-r--r--synapse/rest/media/v1/upload_resource.py3
-rw-r--r--synapse/rest/synapse/client/saml2/response_resource.py10
-rw-r--r--synapse/server.py8
-rw-r--r--synapse/storage/databases/main/event_federation.py148
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py79
-rw-r--r--synapse/storage/databases/main/events_worker.py12
-rw-r--r--synapse/storage/databases/main/purge_events.py8
-rw-r--r--synapse/storage/databases/main/registration.py6
-rw-r--r--synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql17
-rw-r--r--synapse/storage/databases/main/transactions.py10
-rw-r--r--synapse/types.py57
42 files changed, 1118 insertions, 529 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py

index 968cf6f174..e10e33fd23 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py
@@ -164,7 +164,7 @@ class Auth: async def get_user_by_req( self, - request: Request, + request: SynapseRequest, allow_guest: bool = False, rights: str = "access", allow_expired: bool = False, diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py
index 7f5e449eb2..2bfb537c15 100644 --- a/synapse/config/oidc_config.py +++ b/synapse/config/oidc_config.py
@@ -237,7 +237,7 @@ class OIDCConfig(Config): # #- idp_id: github # idp_name: Github - # idp_brand: org.matrix.github + # idp_brand: github # discover: false # issuer: "https://github.com/" # client_id: "your-client-id" # TO BE FILLED @@ -272,7 +272,12 @@ OIDC_PROVIDER_CONFIG_SCHEMA = { "idp_icon": {"type": "string"}, "idp_brand": { "type": "string", - # MSC2758-style namespaced identifier + "minLength": 1, + "maxLength": 255, + "pattern": "^[a-z][a-z0-9_.-]*$", + }, + "idp_unstable_brand": { + "type": "string", "minLength": 1, "maxLength": 255, "pattern": "^[a-z][a-z0-9_.-]*$", @@ -466,6 +471,7 @@ def _parse_oidc_config_dict( idp_name=oidc_config.get("idp_name", "OIDC"), idp_icon=idp_icon, idp_brand=oidc_config.get("idp_brand"), + unstable_idp_brand=oidc_config.get("unstable_idp_brand"), discover=oidc_config.get("discover", True), issuer=oidc_config["issuer"], client_id=oidc_config["client_id"], @@ -512,6 +518,9 @@ class OidcProviderConfig: # Optional brand identifier for this IdP. idp_brand = attr.ib(type=Optional[str]) + # Optional brand identifier for the unstable API (see MSC2858). + unstable_idp_brand = attr.ib(type=Optional[str]) + # whether the OIDC discovery mechanism is used to discover endpoints discover = attr.ib(type=bool) diff --git a/synapse/config/stats.py b/synapse/config/stats.py
index b559bfa411..2258329a52 100644 --- a/synapse/config/stats.py +++ b/synapse/config/stats.py
@@ -13,10 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys +import logging from ._base import Config +ROOM_STATS_DISABLED_WARN = """\ +WARNING: room/user statistics have been disabled via the stats.enabled +configuration setting. This means that certain features (such as the room +directory) will not operate correctly. Future versions of Synapse may ignore +this setting. + +To fix this warning, remove the stats.enabled setting from your configuration +file. +--------------------------------------------------------------------------------""" + +logger = logging.getLogger(__name__) + class StatsConfig(Config): """Stats Configuration @@ -28,30 +40,29 @@ class StatsConfig(Config): def read_config(self, config, **kwargs): self.stats_enabled = True self.stats_bucket_size = 86400 * 1000 - self.stats_retention = sys.maxsize stats_config = config.get("stats", None) if stats_config: self.stats_enabled = stats_config.get("enabled", self.stats_enabled) self.stats_bucket_size = self.parse_duration( stats_config.get("bucket_size", "1d") ) - self.stats_retention = self.parse_duration( - stats_config.get("retention", "%ds" % (sys.maxsize,)) - ) + if not self.stats_enabled: + logger.warning(ROOM_STATS_DISABLED_WARN) def generate_config_section(self, config_dir_path, server_name, **kwargs): return """ - # Local statistics collection. Used in populating the room directory. + # Settings for local room and user statistics collection. See + # docs/room_and_user_statistics.md. # - # 'bucket_size' controls how large each statistics timeslice is. It can - # be defined in a human readable short form -- e.g. "1d", "1y". - # - # 'retention' controls how long historical statistics will be kept for. - # It can be defined in a human readable short form -- e.g. "1d", "1y". - # - # - #stats: - # enabled: true - # bucket_size: 1d - # retention: 1y + stats: + # Uncomment the following to disable room and user statistics. Note that doing + # so may cause certain features (such as the room directory) not to work + # correctly. + # + #enabled: false + + # The size of each timeslice in the room_stats_historical and + # user_stats_historical tables, as a time period. Defaults to "1d". + # + #bucket_size: 1h """ diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
index 8cfc0bb3cb..a9185987a2 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py
@@ -15,6 +15,7 @@ # limitations under the License. import inspect +import logging from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from synapse.rest.media.v1._base import FileInfo @@ -27,6 +28,8 @@ if TYPE_CHECKING: import synapse.events import synapse.server +logger = logging.getLogger(__name__) + class SpamChecker: def __init__(self, hs: "synapse.server.HomeServer"): @@ -190,6 +193,7 @@ class SpamChecker: email_threepid: Optional[dict], username: Optional[str], request_info: Collection[Tuple[str, str]], + auth_provider_id: Optional[str] = None, ) -> RegistrationBehaviour: """Checks if we should allow the given registration request. @@ -198,6 +202,9 @@ class SpamChecker: username: The request user name, if any request_info: List of tuples of user agent and IP that were used during the registration process. + auth_provider_id: The SSO IdP the user used, e.g "oidc", "saml", + "cas". If any. Note this does not include users registered + via a password provider. Returns: Enum for how the request should be handled @@ -208,9 +215,25 @@ class SpamChecker: # spam checker checker = getattr(spam_checker, "check_registration_for_spam", None) if checker: - behaviour = await maybe_awaitable( - checker(email_threepid, username, request_info) - ) + # Provide auth_provider_id if the function supports it + checker_args = inspect.signature(checker) + if len(checker_args.parameters) == 4: + d = checker( + email_threepid, + username, + request_info, + auth_provider_id, + ) + elif len(checker_args.parameters) == 3: + d = checker(email_threepid, username, request_info) + else: + logger.error( + "Invalid signature for %s.check_registration_for_spam. Denying registration", + spam_checker.__module__, + ) + return RegistrationBehaviour.DENY + + behaviour = await maybe_awaitable(d) assert isinstance(behaviour, RegistrationBehaviour) if behaviour != RegistrationBehaviour.ALLOW: return behaviour diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f8e368f81b..98caf2a1a4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -112,10 +112,11 @@ class FederationServer(FederationBase): # with FederationHandlerRegistry. hs.get_directory_handler() - self._federation_ratelimiter = hs.get_federation_ratelimiter() - self._server_linearizer = Linearizer("fed_server") - self._transaction_linearizer = Linearizer("fed_txn_handler") + + # origins that we are currently processing a transaction from. + # a dict from origin to txn id. + self._active_transactions = {} # type: Dict[str, str] # We cache results for transaction with the same ID self._transaction_resp_cache = ResponseCache( @@ -169,6 +170,33 @@ class FederationServer(FederationBase): logger.debug("[%s] Got transaction", transaction_id) + # Reject malformed transactions early: reject if too many PDUs/EDUs + if len(transaction.pdus) > 50 or ( # type: ignore + hasattr(transaction, "edus") and len(transaction.edus) > 100 # type: ignore + ): + logger.info("Transaction PDU or EDU count too large. Returning 400") + return 400, {} + + # we only process one transaction from each origin at a time. We need to do + # this check here, rather than in _on_incoming_transaction_inner so that we + # don't cache the rejection in _transaction_resp_cache (so that if the txn + # arrives again later, we can process it). + current_transaction = self._active_transactions.get(origin) + if current_transaction and current_transaction != transaction_id: + logger.warning( + "Received another txn %s from %s while still processing %s", + transaction_id, + origin, + current_transaction, + ) + return 429, { + "errcode": Codes.UNKNOWN, + "error": "Too many concurrent transactions", + } + + # CRITICAL SECTION: we must now not await until we populate _active_transactions + # in _on_incoming_transaction_inner. + # We wrap in a ResponseCache so that we de-duplicate retried # transactions. return await self._transaction_resp_cache.wrap( @@ -182,26 +210,18 @@ class FederationServer(FederationBase): async def _on_incoming_transaction_inner( self, origin: str, transaction: Transaction, request_time: int ) -> Tuple[int, Dict[str, Any]]: - # Use a linearizer to ensure that transactions from a remote are - # processed in order. - with await self._transaction_linearizer.queue(origin): - # We rate limit here *after* we've queued up the incoming requests, - # so that we don't fill up the ratelimiter with blocked requests. - # - # This is important as the ratelimiter allows N concurrent requests - # at a time, and only starts ratelimiting if there are more requests - # than that being processed at a time. If we queued up requests in - # the linearizer/response cache *after* the ratelimiting then those - # queued up requests would count as part of the allowed limit of N - # concurrent requests. - with self._federation_ratelimiter.ratelimit(origin) as d: - await d - - result = await self._handle_incoming_transaction( - origin, transaction, request_time - ) + # CRITICAL SECTION: the first thing we must do (before awaiting) is + # add an entry to _active_transactions. + assert origin not in self._active_transactions + self._active_transactions[origin] = transaction.transaction_id # type: ignore - return result + try: + result = await self._handle_incoming_transaction( + origin, transaction, request_time + ) + return result + finally: + del self._active_transactions[origin] async def _handle_incoming_transaction( self, origin: str, transaction: Transaction, request_time: int @@ -227,19 +247,6 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) # type: ignore - # Reject if PDU count > 50 or EDU count > 100 - if len(transaction.pdus) > 50 or ( # type: ignore - hasattr(transaction, "edus") and len(transaction.edus) > 100 # type: ignore - ): - - logger.info("Transaction PDU or EDU count too large. Returning 400") - - response = {} - await self.transaction_actions.set_response( - origin, transaction, 400, response - ) - return 400, response - # We process PDUs and EDUs in parallel. This is important as we don't # want to block things like to device messages from reaching clients # behind the potentially expensive handling of PDUs. @@ -335,34 +342,41 @@ class FederationServer(FederationBase): # impose a limit to avoid going too crazy with ram/cpu. async def process_pdus_for_room(room_id: str): - logger.debug("Processing PDUs for %s", room_id) - try: - await self.check_server_matches_acl(origin_host, room_id) - except AuthError as e: - logger.warning("Ignoring PDUs for room %s from banned server", room_id) - for pdu in pdus_by_room[room_id]: - event_id = pdu.event_id - pdu_results[event_id] = e.error_dict() - return + with nested_logging_context(room_id): + logger.debug("Processing PDUs for %s", room_id) - for pdu in pdus_by_room[room_id]: - event_id = pdu.event_id - with pdu_process_time.time(): - with nested_logging_context(event_id): - try: - await self._handle_received_pdu(origin, pdu) - pdu_results[event_id] = {} - except FederationError as e: - logger.warning("Error handling PDU %s: %s", event_id, e) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - f = failure.Failure() - pdu_results[event_id] = {"error": str(e)} - logger.error( - "Failed to handle PDU %s", - event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore - ) + try: + await self.check_server_matches_acl(origin_host, room_id) + except AuthError as e: + logger.warning( + "Ignoring PDUs for room %s from banned server", room_id + ) + for pdu in pdus_by_room[room_id]: + event_id = pdu.event_id + pdu_results[event_id] = e.error_dict() + return + + for pdu in pdus_by_room[room_id]: + pdu_results[pdu.event_id] = await process_pdu(pdu) + + async def process_pdu(pdu: EventBase) -> JsonDict: + event_id = pdu.event_id + with pdu_process_time.time(): + with nested_logging_context(event_id): + try: + await self._handle_received_pdu(origin, pdu) + return {} + except FederationError as e: + logger.warning("Error handling PDU %s: %s", event_id, e) + return {"error": str(e)} + except Exception as e: + f = failure.Failure() + logger.error( + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore + ) + return {"error": str(e)} await concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT @@ -447,7 +461,7 @@ class FederationServer(FederationBase): async def _on_state_ids_request_compute(self, room_id, event_id): state_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id) - auth_chain_ids = await self.store.get_auth_chain_ids(state_ids) + auth_chain_ids = await self.store.get_auth_chain_ids(room_id, state_ids) return {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids} async def _on_context_state_request_compute( @@ -460,7 +474,9 @@ class FederationServer(FederationBase): else: pdus = (await self.state.get_current_state(room_id)).values() - auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus]) + auth_chain = await self.store.get_auth_chain( + room_id, [pdu.event_id for pdu in pdus] + ) return { "pdus": [pdu.get_pdu_json() for pdu in pdus], @@ -864,7 +880,9 @@ class FederationHandlerRegistry: self.edu_handlers = ( {} ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]] - self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]] + self.query_handlers = ( + {} + ) # type: Dict[str, Callable[[dict], Awaitable[JsonDict]]] # Map from type to instance names that we should route EDU handling to. # We randomly choose one instance from the list to route to for each new @@ -898,7 +916,7 @@ class FederationHandlerRegistry: self.edu_handlers[edu_type] = handler def register_query_handler( - self, query_type: str, handler: Callable[[dict], defer.Deferred] + self, query_type: str, handler: Callable[[dict], Awaitable[JsonDict]] ): """Sets the handler callable that will be used to handle an incoming federation query of the given type. @@ -975,7 +993,7 @@ class FederationHandlerRegistry: # Oh well, let's just log and move on. logger.warning("No handler registered for EDU type %s", edu_type) - async def on_query(self, query_type: str, args: dict): + async def on_query(self, query_type: str, args: dict) -> JsonDict: handler = self.query_handlers.get(query_type) if handler: return await handler(args) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index deb519f3ef..cc0d765e5f 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -17,6 +17,7 @@ import datetime import logging from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast +import attr from prometheus_client import Counter from synapse.api.errors import ( @@ -93,6 +94,10 @@ class PerDestinationQueue: self._destination = destination self.transmission_loop_running = False + # Flag to signal to any running transmission loop that there is new data + # queued up to be sent. + self._new_data_to_send = False + # True whilst we are sending events that the remote homeserver missed # because it was unreachable. We start in this state so we can perform # catch-up at startup. @@ -108,7 +113,7 @@ class PerDestinationQueue: # destination (we are the only updater so this is safe) self._last_successful_stream_ordering = None # type: Optional[int] - # a list of pending PDUs + # a queue of pending PDUs self._pending_pdus = [] # type: List[EventBase] # XXX this is never actually used: see @@ -208,6 +213,10 @@ class PerDestinationQueue: transaction in the background. """ + # Mark that we (may) have new things to send, so that any running + # transmission loop will recheck whether there is stuff to send. + self._new_data_to_send = True + if self.transmission_loop_running: # XXX: this can get stuck on by a never-ending # request at which point pending_pdus just keeps growing. @@ -250,125 +259,41 @@ class PerDestinationQueue: pending_pdus = [] while True: - # We have to keep 2 free slots for presence and rr_edus - limit = MAX_EDUS_PER_TRANSACTION - 2 - - device_update_edus, dev_list_id = await self._get_device_update_edus( - limit - ) - - limit -= len(device_update_edus) - - ( - to_device_edus, - device_stream_id, - ) = await self._get_to_device_message_edus(limit) - - pending_edus = device_update_edus + to_device_edus - - # BEGIN CRITICAL SECTION - # - # In order to avoid a race condition, we need to make sure that - # the following code (from popping the queues up to the point - # where we decide if we actually have any pending messages) is - # atomic - otherwise new PDUs or EDUs might arrive in the - # meantime, but not get sent because we hold the - # transmission_loop_running flag. - - pending_pdus = self._pending_pdus + self._new_data_to_send = False - # We can only include at most 50 PDUs per transactions - pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:] + async with _TransactionQueueManager(self) as ( + pending_pdus, + pending_edus, + ): + if not pending_pdus and not pending_edus: + logger.debug("TX [%s] Nothing to send", self._destination) + + # If we've gotten told about new things to send during + # checking for things to send, we try looking again. + # Otherwise new PDUs or EDUs might arrive in the meantime, + # but not get sent because we hold the + # `transmission_loop_running` flag. + if self._new_data_to_send: + continue + else: + return - pending_edus.extend(self._get_rr_edus(force_flush=False)) - pending_presence = self._pending_presence - self._pending_presence = {} - if pending_presence: - pending_edus.append( - Edu( - origin=self._server_name, - destination=self._destination, - edu_type="m.presence", - content={ - "push": [ - format_user_presence_state( - presence, self._clock.time_msec() - ) - for presence in pending_presence.values() - ] - }, + if pending_pdus: + logger.debug( + "TX [%s] len(pending_pdus_by_dest[dest]) = %d", + self._destination, + len(pending_pdus), ) - ) - pending_edus.extend( - self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) - ) - while ( - len(pending_edus) < MAX_EDUS_PER_TRANSACTION - and self._pending_edus_keyed - ): - _, val = self._pending_edus_keyed.popitem() - pending_edus.append(val) - - if pending_pdus: - logger.debug( - "TX [%s] len(pending_pdus_by_dest[dest]) = %d", - self._destination, - len(pending_pdus), + await self._transaction_manager.send_new_transaction( + self._destination, pending_pdus, pending_edus ) - if not pending_pdus and not pending_edus: - logger.debug("TX [%s] Nothing to send", self._destination) - self._last_device_stream_id = device_stream_id - return - - # if we've decided to send a transaction anyway, and we have room, we - # may as well send any pending RRs - if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: - pending_edus.extend(self._get_rr_edus(force_flush=True)) - - # END CRITICAL SECTION - - success = await self._transaction_manager.send_new_transaction( - self._destination, pending_pdus, pending_edus - ) - if success: sent_transactions_counter.inc() sent_edus_counter.inc(len(pending_edus)) for edu in pending_edus: sent_edus_by_type.labels(edu.edu_type).inc() - # Remove the acknowledged device messages from the database - # Only bother if we actually sent some device messages - if to_device_edus: - await self._store.delete_device_msgs_for_remote( - self._destination, device_stream_id - ) - # also mark the device updates as sent - if device_update_edus: - logger.info( - "Marking as sent %r %r", self._destination, dev_list_id - ) - await self._store.mark_as_sent_devices_by_remote( - self._destination, dev_list_id - ) - - self._last_device_stream_id = device_stream_id - self._last_device_list_stream_id = dev_list_id - - if pending_pdus: - # we sent some PDUs and it was successful, so update our - # last_successful_stream_ordering in the destinations table. - final_pdu = pending_pdus[-1] - last_successful_stream_ordering = ( - final_pdu.internal_metadata.stream_ordering - ) - assert last_successful_stream_ordering - await self._store.set_destination_last_successful_stream_ordering( - self._destination, last_successful_stream_ordering - ) - else: - break except NotRetryingDestination as e: logger.debug( "TX [%s] not ready for retry yet (next retry at %s) - " @@ -401,7 +326,7 @@ class PerDestinationQueue: self._pending_presence = {} self._pending_rrs = {} - self._start_catching_up() + self._start_catching_up() except FederationDeniedError as e: logger.info(e) except HttpResponseException as e: @@ -412,7 +337,6 @@ class PerDestinationQueue: e, ) - self._start_catching_up() except RequestSendFailed as e: logger.warning( "TX [%s] Failed to send transaction: %s", self._destination, e @@ -422,16 +346,12 @@ class PerDestinationQueue: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) - - self._start_catching_up() except Exception: logger.exception("TX [%s] Failed to send transaction", self._destination) for p in pending_pdus: logger.info( "Failed to send event %s to %s", p.event_id, self._destination ) - - self._start_catching_up() finally: # We want to be *very* sure we clear this after we stop processing self.transmission_loop_running = False @@ -499,13 +419,10 @@ class PerDestinationQueue: rooms = [p.room_id for p in catchup_pdus] logger.info("Catching up rooms to %s: %r", self._destination, rooms) - success = await self._transaction_manager.send_new_transaction( + await self._transaction_manager.send_new_transaction( self._destination, catchup_pdus, [] ) - if not success: - return - sent_transactions_counter.inc() final_pdu = catchup_pdus[-1] self._last_successful_stream_ordering = cast( @@ -584,3 +501,135 @@ class PerDestinationQueue: """ self._catching_up = True self._pending_pdus = [] + + +@attr.s(slots=True) +class _TransactionQueueManager: + """A helper async context manager for pulling stuff off the queues and + tracking what was last successfully sent, etc. + """ + + queue = attr.ib(type=PerDestinationQueue) + + _device_stream_id = attr.ib(type=Optional[int], default=None) + _device_list_id = attr.ib(type=Optional[int], default=None) + _last_stream_ordering = attr.ib(type=Optional[int], default=None) + _pdus = attr.ib(type=List[EventBase], factory=list) + + async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: + # First we calculate the EDUs we want to send, if any. + + # We start by fetching device related EDUs, i.e device updates and to + # device messages. We have to keep 2 free slots for presence and rr_edus. + limit = MAX_EDUS_PER_TRANSACTION - 2 + + device_update_edus, dev_list_id = await self.queue._get_device_update_edus( + limit + ) + + if device_update_edus: + self._device_list_id = dev_list_id + else: + self.queue._last_device_list_stream_id = dev_list_id + + limit -= len(device_update_edus) + + ( + to_device_edus, + device_stream_id, + ) = await self.queue._get_to_device_message_edus(limit) + + if to_device_edus: + self._device_stream_id = device_stream_id + else: + self.queue._last_device_stream_id = device_stream_id + + pending_edus = device_update_edus + to_device_edus + + # Now add the read receipt EDU. + pending_edus.extend(self.queue._get_rr_edus(force_flush=False)) + + # And presence EDU. + if self.queue._pending_presence: + pending_edus.append( + Edu( + origin=self.queue._server_name, + destination=self.queue._destination, + edu_type="m.presence", + content={ + "push": [ + format_user_presence_state( + presence, self.queue._clock.time_msec() + ) + for presence in self.queue._pending_presence.values() + ] + }, + ) + ) + self.queue._pending_presence = {} + + # Finally add any other types of EDUs if there is room. + pending_edus.extend( + self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) + ) + while ( + len(pending_edus) < MAX_EDUS_PER_TRANSACTION + and self.queue._pending_edus_keyed + ): + _, val = self.queue._pending_edus_keyed.popitem() + pending_edus.append(val) + + # Now we look for any PDUs to send, by getting up to 50 PDUs from the + # queue + self._pdus = self.queue._pending_pdus[:50] + + if not self._pdus and not pending_edus: + return [], [] + + # if we've decided to send a transaction anyway, and we have room, we + # may as well send any pending RRs + if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: + pending_edus.extend(self.queue._get_rr_edus(force_flush=True)) + + if self._pdus: + self._last_stream_ordering = self._pdus[ + -1 + ].internal_metadata.stream_ordering + assert self._last_stream_ordering + + return self._pdus, pending_edus + + async def __aexit__(self, exc_type, exc, tb): + if exc_type is not None: + # Failed to send transaction, so we bail out. + return + + # Successfully sent transactions, so we remove pending PDUs from the queue + if self._pdus: + self.queue._pending_pdus = self.queue._pending_pdus[len(self._pdus) :] + + # Succeeded to send the transaction so we record where we have sent up + # to in the various streams + + if self._device_stream_id: + await self.queue._store.delete_device_msgs_for_remote( + self.queue._destination, self._device_stream_id + ) + self.queue._last_device_stream_id = self._device_stream_id + + # also mark the device updates as sent + if self._device_list_id: + logger.info( + "Marking as sent %r %r", self.queue._destination, self._device_list_id + ) + await self.queue._store.mark_as_sent_devices_by_remote( + self.queue._destination, self._device_list_id + ) + self.queue._last_device_list_stream_id = self._device_list_id + + if self._last_stream_ordering: + # we sent some PDUs and it was successful, so update our + # last_successful_stream_ordering in the destinations table. + await self.queue._store.set_destination_last_successful_stream_ordering( + self.queue._destination, self._last_stream_ordering + ) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 2a9cd063c4..07b740c2f2 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py
@@ -69,15 +69,12 @@ class TransactionManager: destination: str, pdus: List[EventBase], edus: List[Edu], - ) -> bool: + ) -> None: """ Args: destination: The destination to send to (e.g. 'example.org') pdus: In-order list of PDUs to send edus: List of EDUs to send - - Returns: - True iff the transaction was successful """ # Make a transaction-sending opentracing span. This span follows on from @@ -96,8 +93,6 @@ class TransactionManager: edu.strip_context() with start_active_span_follows_from("send_transaction", span_contexts): - success = True - logger.debug("TX [%s] _attempt_new_transaction", destination) txn_id = str(self._next_txn_id) @@ -152,44 +147,29 @@ class TransactionManager: response = await self._transport_layer.send_transaction( transaction, json_data_cb ) - code = 200 except HttpResponseException as e: code = e.code response = e.response - if e.code in (401, 404, 429) or 500 <= e.code: - logger.info( - "TX [%s] {%s} got %d response", destination, txn_id, code - ) - raise e - - logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) - - if code == 200: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warning( - "TX [%s] {%s} Remote returned error for %s: %s", - destination, - txn_id, - e_id, - r, - ) - else: - for p in pdus: + set_tag(tags.ERROR, True) + + logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) + raise + + logger.info("TX [%s] {%s} got 200 response", destination, txn_id) + + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: logger.warning( - "TX [%s] {%s} Failed to send event %s", + "TX [%s] {%s} Remote returned error for %s: %s", destination, txn_id, - p.event_id, + e_id, + r, ) - success = False - if success and pdus and destination in self._federation_metrics_domains: + if pdus and destination in self._federation_metrics_domains: last_pdu = pdus[-1] last_pdu_ts_metric.labels(server_name=destination).set( last_pdu.origin_server_ts / 1000 ) - - set_tag(tags.ERROR, not success) - return success diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index bec0c615d4..fb5f8118f0 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py
@@ -337,7 +337,8 @@ class AuthHandler(BaseHandler): user is too high to proceed """ - + if not requester.access_token_id: + raise ValueError("Cannot validate a user without an access token") if self._ui_auth_session_timeout: last_validated = await self.store.get_access_token_last_validated( requester.access_token_id @@ -1213,7 +1214,7 @@ class AuthHandler(BaseHandler): async def delete_access_tokens_for_user( self, user_id: str, - except_token_id: Optional[str] = None, + except_token_id: Optional[int] = None, device_id: Optional[str] = None, ): """Invalidate access tokens belonging to a user diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index 04972f9cf0..cb67589f7d 100644 --- a/synapse/handlers/cas_handler.py +++ b/synapse/handlers/cas_handler.py
@@ -83,6 +83,7 @@ class CasHandler: # the SsoIdentityProvider protocol type. self.idp_icon = None self.idp_brand = None + self.unstable_idp_brand = None self._sso_handler = hs.get_sso_handler() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2ead626a4d..598a66f74c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -201,7 +201,7 @@ class FederationHandler(BaseHandler): or pdu.internal_metadata.is_outlier() ) if already_seen: - logger.debug("[%s %s]: Already seen pdu", room_id, event_id) + logger.debug("Already seen pdu") return # do some initial sanity-checking of the event. In particular, make @@ -210,18 +210,14 @@ class FederationHandler(BaseHandler): try: self._sanity_check_event(pdu) except SynapseError as err: - logger.warning( - "[%s %s] Received event failed sanity checks", room_id, event_id - ) + logger.warning("Received event failed sanity checks") raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id) # If we are currently in the process of joining this room, then we # queue up events for later processing. if room_id in self.room_queues: logger.info( - "[%s %s] Queuing PDU from %s for now: join in progress", - room_id, - event_id, + "Queuing PDU from %s for now: join in progress", origin, ) self.room_queues[room_id].append((pdu, origin)) @@ -236,9 +232,7 @@ class FederationHandler(BaseHandler): is_in_room = await self.auth.check_host_in_room(room_id, self.server_name) if not is_in_room: logger.info( - "[%s %s] Ignoring PDU from %s as we're not in the room", - room_id, - event_id, + "Ignoring PDU from %s as we're not in the room", origin, ) return None @@ -250,7 +244,7 @@ class FederationHandler(BaseHandler): # We only backfill backwards to the min depth. min_depth = await self.get_min_depth_for_context(pdu.room_id) - logger.debug("[%s %s] min_depth: %d", room_id, event_id, min_depth) + logger.debug("min_depth: %d", min_depth) prevs = set(pdu.prev_event_ids()) seen = await self.store.have_events_in_timeline(prevs) @@ -267,17 +261,13 @@ class FederationHandler(BaseHandler): # If we're missing stuff, ensure we only fetch stuff one # at a time. logger.info( - "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s", - room_id, - event_id, + "Acquiring room lock to fetch %d missing prev_events: %s", len(missing_prevs), shortstr(missing_prevs), ) with (await self._room_pdu_linearizer.queue(pdu.room_id)): logger.info( - "[%s %s] Acquired room lock to fetch %d missing prev_events", - room_id, - event_id, + "Acquired room lock to fetch %d missing prev_events", len(missing_prevs), ) @@ -297,9 +287,7 @@ class FederationHandler(BaseHandler): if not prevs - seen: logger.info( - "[%s %s] Found all missing prev_events", - room_id, - event_id, + "Found all missing prev_events", ) if prevs - seen: @@ -329,9 +317,7 @@ class FederationHandler(BaseHandler): if sent_to_us_directly: logger.warning( - "[%s %s] Rejecting: failed to fetch %d prev events: %s", - room_id, - event_id, + "Rejecting: failed to fetch %d prev events: %s", len(prevs - seen), shortstr(prevs - seen), ) @@ -367,17 +353,16 @@ class FederationHandler(BaseHandler): # Ask the remote server for the states we don't # know about for p in prevs - seen: - logger.info( - "Requesting state at missing prev_event %s", - event_id, - ) + logger.info("Requesting state after missing prev_event %s", p) with nested_logging_context(p): # note that if any of the missing prevs share missing state or # auth events, the requests to fetch those events are deduped # by the get_pdu_cache in federation_client. - (remote_state, _,) = await self._get_state_for_room( - origin, room_id, p, include_event_in_state=True + remote_state = ( + await self._get_state_after_missing_prev_event( + origin, room_id, p + ) ) remote_state_map = { @@ -414,10 +399,7 @@ class FederationHandler(BaseHandler): state = [event_map[e] for e in state_map.values()] except Exception: logger.warning( - "[%s %s] Error attempting to resolve state at missing " - "prev_events", - room_id, - event_id, + "Error attempting to resolve state at missing " "prev_events", exc_info=True, ) raise FederationError( @@ -454,9 +436,7 @@ class FederationHandler(BaseHandler): latest |= seen logger.info( - "[%s %s]: Requesting missing events between %s and %s", - room_id, - event_id, + "Requesting missing events between %s and %s", shortstr(latest), event_id, ) @@ -523,15 +503,11 @@ class FederationHandler(BaseHandler): # We failed to get the missing events, but since we need to handle # the case of `get_missing_events` not returning the necessary # events anyway, it is safe to simply log the error and continue. - logger.warning( - "[%s %s]: Failed to get prev_events: %s", room_id, event_id, e - ) + logger.warning("Failed to get prev_events: %s", e) return logger.info( - "[%s %s]: Got %d prev_events: %s", - room_id, - event_id, + "Got %d prev_events: %s", len(missing_events), shortstr(missing_events), ) @@ -542,9 +518,7 @@ class FederationHandler(BaseHandler): for ev in missing_events: logger.info( - "[%s %s] Handling received prev_event %s", - room_id, - event_id, + "Handling received prev_event %s", ev.event_id, ) with nested_logging_context(ev.event_id): @@ -553,9 +527,7 @@ class FederationHandler(BaseHandler): except FederationError as e: if e.code == 403: logger.warning( - "[%s %s] Received prev_event %s failed history check.", - room_id, - event_id, + "Received prev_event %s failed history check.", ev.event_id, ) else: @@ -566,7 +538,6 @@ class FederationHandler(BaseHandler): destination: str, room_id: str, event_id: str, - include_event_in_state: bool = False, ) -> Tuple[List[EventBase], List[EventBase]]: """Requests all of the room state at a given event from a remote homeserver. @@ -574,11 +545,9 @@ class FederationHandler(BaseHandler): destination: The remote homeserver to query for the state. room_id: The id of the room we're interested in. event_id: The id of the event we want the state at. - include_event_in_state: if true, the event itself will be included in the - returned state event list. Returns: - A list of events in the state, possibly including the event itself, and + A list of events in the state, not including the event itself, and a list of events in the auth chain for the given event. """ ( @@ -590,9 +559,6 @@ class FederationHandler(BaseHandler): desired_events = set(state_event_ids + auth_event_ids) - if include_event_in_state: - desired_events.add(event_id) - event_map = await self._get_events_from_store_or_dest( destination, room_id, desired_events ) @@ -609,13 +575,6 @@ class FederationHandler(BaseHandler): event_map[e_id] for e_id in state_event_ids if e_id in event_map ] - if include_event_in_state: - remote_event = event_map.get(event_id) - if not remote_event: - raise Exception("Unable to get missing prev_event %s" % (event_id,)) - if remote_event.is_state() and remote_event.rejected_reason is None: - remote_state.append(remote_event) - auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] auth_chain.sort(key=lambda e: e.depth) @@ -689,6 +648,131 @@ class FederationHandler(BaseHandler): return fetched_events + async def _get_state_after_missing_prev_event( + self, + destination: str, + room_id: str, + event_id: str, + ) -> List[EventBase]: + """Requests all of the room state at a given event from a remote homeserver. + + Args: + destination: The remote homeserver to query for the state. + room_id: The id of the room we're interested in. + event_id: The id of the event we want the state at. + + Returns: + A list of events in the state, including the event itself + """ + # TODO: This function is basically the same as _get_state_for_room. Can + # we make backfill() use it, rather than having two code paths? I think the + # only difference is that backfill() persists the prev events separately. + + ( + state_event_ids, + auth_event_ids, + ) = await self.federation_client.get_room_state_ids( + destination, room_id, event_id=event_id + ) + + logger.debug( + "state_ids returned %i state events, %i auth events", + len(state_event_ids), + len(auth_event_ids), + ) + + # start by just trying to fetch the events from the store + desired_events = set(state_event_ids) + desired_events.add(event_id) + logger.debug("Fetching %i events from cache/store", len(desired_events)) + fetched_events = await self.store.get_events( + desired_events, allow_rejected=True + ) + + missing_desired_events = desired_events - fetched_events.keys() + logger.debug( + "We are missing %i events (got %i)", + len(missing_desired_events), + len(fetched_events), + ) + + # We probably won't need most of the auth events, so let's just check which + # we have for now, rather than thrashing the event cache with them all + # unnecessarily. + + # TODO: we probably won't actually need all of the auth events, since we + # already have a bunch of the state events. It would be nice if the + # federation api gave us a way of finding out which we actually need. + + missing_auth_events = set(auth_event_ids) - fetched_events.keys() + missing_auth_events.difference_update( + await self.store.have_seen_events(missing_auth_events) + ) + logger.debug("We are also missing %i auth events", len(missing_auth_events)) + + missing_events = missing_desired_events | missing_auth_events + logger.debug("Fetching %i events from remote", len(missing_events)) + await self._get_events_and_persist( + destination=destination, room_id=room_id, events=missing_events + ) + + # we need to make sure we re-load from the database to get the rejected + # state correct. + fetched_events.update( + (await self.store.get_events(missing_desired_events, allow_rejected=True)) + ) + + # check for events which were in the wrong room. + # + # this can happen if a remote server claims that the state or + # auth_events at an event in room A are actually events in room B + + bad_events = [ + (event_id, event.room_id) + for event_id, event in fetched_events.items() + if event.room_id != room_id + ] + + for bad_event_id, bad_room_id in bad_events: + # This is a bogus situation, but since we may only discover it a long time + # after it happened, we try our best to carry on, by just omitting the + # bad events from the returned state set. + logger.warning( + "Remote server %s claims event %s in room %s is an auth/state " + "event in room %s", + destination, + bad_event_id, + bad_room_id, + room_id, + ) + + del fetched_events[bad_event_id] + + # if we couldn't get the prev event in question, that's a problem. + remote_event = fetched_events.get(event_id) + if not remote_event: + raise Exception("Unable to get missing prev_event %s" % (event_id,)) + + # missing state at that event is a warning, not a blocker + # XXX: this doesn't sound right? it means that we'll end up with incomplete + # state. + failed_to_fetch = desired_events - fetched_events.keys() + if failed_to_fetch: + logger.warning( + "Failed to fetch missing state events for %s %s", + event_id, + failed_to_fetch, + ) + + remote_state = [ + fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events + ] + + if remote_event.is_state() and remote_event.rejected_reason is None: + remote_state.append(remote_event) + + return remote_state + async def _process_received_pdu( self, origin: str, @@ -707,10 +791,7 @@ class FederationHandler(BaseHandler): (ie, we are missing one or more prev_events), the resolved state at the event """ - room_id = event.room_id - event_id = event.event_id - - logger.debug("[%s %s] Processing event: %s", room_id, event_id, event) + logger.debug("Processing event: %s", event) try: await self._handle_new_event(origin, event, state=state) @@ -871,7 +952,6 @@ class FederationHandler(BaseHandler): destination=dest, room_id=room_id, event_id=e_id, - include_event_in_state=False, ) auth_events.update({a.event_id: a for a in auth}) auth_events.update({s.event_id: s for s in state}) @@ -1317,7 +1397,7 @@ class FederationHandler(BaseHandler): async def on_event_auth(self, event_id: str) -> List[EventBase]: event = await self.store.get_event(event_id) auth = await self.store.get_auth_chain( - list(event.auth_event_ids()), include_given=True + event.room_id, list(event.auth_event_ids()), include_given=True ) return list(auth) @@ -1580,7 +1660,7 @@ class FederationHandler(BaseHandler): prev_state_ids = await context.get_prev_state_ids() state_ids = list(prev_state_ids.values()) - auth_chain = await self.store.get_auth_chain(state_ids) + auth_chain = await self.store.get_auth_chain(event.room_id, state_ids) state = await self.store.get_events(list(prev_state_ids.values())) @@ -2219,7 +2299,7 @@ class FederationHandler(BaseHandler): # Now get the current auth_chain for the event. local_auth_chain = await self.store.get_auth_chain( - list(event.auth_event_ids()), include_given=True + room_id, list(event.auth_event_ids()), include_given=True ) # TODO: Check if we would now reject event_id. If so we need to tell diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py
index 825fadb76f..6d8551a6d6 100644 --- a/synapse/handlers/oidc_handler.py +++ b/synapse/handlers/oidc_handler.py
@@ -29,11 +29,13 @@ from authlib.oidc.discovery import OpenIDProviderMetadata, get_well_known_url from jinja2 import Environment, Template from pymacaroons.exceptions import ( MacaroonDeserializationException, + MacaroonInitException, MacaroonInvalidSignatureException, ) from typing_extensions import TypedDict from twisted.web.client import readBody +from twisted.web.http_headers import Headers from synapse.config import ConfigError from synapse.config.oidc_config import ( @@ -216,7 +218,7 @@ class OidcHandler: session_data = self._token_generator.verify_oidc_session_token( session, state ) - except (MacaroonDeserializationException, KeyError) as e: + except (MacaroonInitException, MacaroonDeserializationException, KeyError) as e: logger.exception("Invalid session for OIDC callback") self._sso_handler.render_error(request, "invalid_session", str(e)) return @@ -329,6 +331,9 @@ class OidcProvider: # optional brand identifier for this auth provider self.idp_brand = provider.idp_brand + # Optional brand identifier for the unstable API (see MSC2858). + self.unstable_idp_brand = provider.unstable_idp_brand + self._sso_handler = hs.get_sso_handler() self._sso_handler.register_identity_provider(self) @@ -538,7 +543,7 @@ class OidcProvider: """ metadata = await self.load_metadata() token_endpoint = metadata.get("token_endpoint") - headers = { + raw_headers = { "Content-Type": "application/x-www-form-urlencoded", "User-Agent": self._http_client.user_agent, "Accept": "application/json", @@ -552,10 +557,10 @@ class OidcProvider: body = urlencode(args, True) # Fill the body/headers with credentials - uri, headers, body = self._client_auth.prepare( - method="POST", uri=token_endpoint, headers=headers, body=body + uri, raw_headers, body = self._client_auth.prepare( + method="POST", uri=token_endpoint, headers=raw_headers, body=body ) - headers = {k: [v] for (k, v) in headers.items()} + headers = Headers({k: [v] for (k, v) in raw_headers.items()}) # Do the actual request # We're not using the SimpleHttpClient util methods as we don't want to diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index b66f8756b8..1abc8875cb 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py
@@ -16,7 +16,7 @@ """Contains functions for registering clients.""" import logging -from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple from prometheus_client import Counter @@ -82,6 +82,7 @@ class RegistrationHandler(BaseHandler): ) else: self.device_handler = hs.get_device_handler() + self._register_device_client = self.register_device_inner self.pusher_pool = hs.get_pusherpool() self.session_lifetime = hs.config.session_lifetime @@ -197,8 +198,7 @@ class RegistrationHandler(BaseHandler): admin api, otherwise False. user_agent_ips: Tuples of IP addresses and user-agents used during the registration process. - auth_provider_id: The SSO IdP the user used, if any (just used for the - prometheus metrics). + auth_provider_id: The SSO IdP the user used, if any. Returns: The registered user_id. Raises: @@ -210,6 +210,7 @@ class RegistrationHandler(BaseHandler): threepid, localpart, user_agent_ips or [], + auth_provider_id=auth_provider_id, ) if result == RegistrationBehaviour.DENY: @@ -678,17 +679,35 @@ class RegistrationHandler(BaseHandler): Returns: Tuple of device ID and access token """ + res = await self._register_device_client( + user_id=user_id, + device_id=device_id, + initial_display_name=initial_display_name, + is_guest=is_guest, + is_appservice_ghost=is_appservice_ghost, + ) - if self.hs.config.worker_app: - r = await self._register_device_client( - user_id=user_id, - device_id=device_id, - initial_display_name=initial_display_name, - is_guest=is_guest, - is_appservice_ghost=is_appservice_ghost, - ) - return r["device_id"], r["access_token"] + login_counter.labels( + guest=is_guest, + auth_provider=(auth_provider_id or ""), + ).inc() + + return res["device_id"], res["access_token"] + async def register_device_inner( + self, + user_id: str, + device_id: Optional[str], + initial_display_name: Optional[str], + is_guest: bool = False, + is_appservice_ghost: bool = False, + ) -> Dict[str, str]: + """Helper for register_device + + Does the bits that need doing on the main process. Not for use outside this + class and RegisterDeviceReplicationServlet. + """ + assert not self.hs.config.worker_app valid_until_ms = None if self.session_lifetime is not None: if is_guest: @@ -713,12 +732,7 @@ class RegistrationHandler(BaseHandler): is_appservice_ghost=is_appservice_ghost, ) - login_counter.labels( - guest=is_guest, - auth_provider=(auth_provider_id or ""), - ).inc() - - return (registered_device_id, access_token) + return {"device_id": registered_device_id, "access_token": access_token} async def post_registration_actions( self, user_id: str, auth_result: dict, access_token: Optional[str] diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py
index a9645b77d8..ec2ba11c75 100644 --- a/synapse/handlers/saml_handler.py +++ b/synapse/handlers/saml_handler.py
@@ -81,6 +81,7 @@ class SamlHandler(BaseHandler): # the SsoIdentityProvider protocol type. self.idp_icon = None self.idp_brand = None + self.unstable_idp_brand = None # a map from saml session id to Saml2SessionData object self._outstanding_requests_dict = {} # type: Dict[str, Saml2SessionData] diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index 6ef459acff..415b1c2d17 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py
@@ -98,6 +98,11 @@ class SsoIdentityProvider(Protocol): """Optional branding identifier""" return None + @property + def unstable_idp_brand(self) -> Optional[str]: + """Optional brand identifier for the unstable API (see MSC2858).""" + return None + @abc.abstractmethod async def handle_redirect_request( self, diff --git a/synapse/http/client.py b/synapse/http/client.py
index af34d583ad..1e01e0a9f2 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py
@@ -39,12 +39,15 @@ from zope.interface import implementer, provider from OpenSSL import SSL from OpenSSL.SSL import VERIFY_NONE from twisted.internet import defer, error as twisted_error, protocol, ssl +from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.interfaces import ( IAddress, IHostResolution, IReactorPluggableNameResolver, IResolutionReceiver, + ITCPTransport, ) +from twisted.internet.protocol import connectionDone from twisted.internet.task import Cooperator from twisted.python.failure import Failure from twisted.web._newclient import ResponseDone @@ -56,7 +59,13 @@ from twisted.web.client import ( ) from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers -from twisted.web.iweb import UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse +from twisted.web.iweb import ( + UNKNOWN_LENGTH, + IAgent, + IBodyProducer, + IPolicyForHTTPS, + IResponse, +) from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri @@ -151,16 +160,17 @@ class _IPBlacklistingResolver: def resolveHostName( self, recv: IResolutionReceiver, hostname: str, portNumber: int = 0 ) -> IResolutionReceiver: - - r = recv() addresses = [] # type: List[IAddress] def _callback() -> None: - r.resolutionBegan(None) - has_bad_ip = False - for i in addresses: - ip_address = IPAddress(i.host) + for address in addresses: + # We only expect IPv4 and IPv6 addresses since only A/AAAA lookups + # should go through this path. + if not isinstance(address, (IPv4Address, IPv6Address)): + continue + + ip_address = IPAddress(address.host) if check_against_blacklist( ip_address, self._ip_whitelist, self._ip_blacklist @@ -175,15 +185,15 @@ class _IPBlacklistingResolver: # request, but all we can really do from here is claim that there were no # valid results. if not has_bad_ip: - for i in addresses: - r.addressResolved(i) - r.resolutionComplete() + for address in addresses: + recv.addressResolved(address) + recv.resolutionComplete() @provider(IResolutionReceiver) class EndpointReceiver: @staticmethod def resolutionBegan(resolutionInProgress: IHostResolution) -> None: - pass + recv.resolutionBegan(resolutionInProgress) @staticmethod def addressResolved(address: IAddress) -> None: @@ -197,7 +207,7 @@ class _IPBlacklistingResolver: EndpointReceiver, hostname, portNumber=portNumber ) - return r + return recv @implementer(ISynapseReactor) @@ -346,7 +356,7 @@ class SimpleHttpClient: contextFactory=self.hs.get_http_client_context_factory(), pool=pool, use_proxy=use_proxy, - ) + ) # type: IAgent if self._ip_blacklist: # If we have an IP blacklist, we then install the blacklisting Agent @@ -752,6 +762,8 @@ class BodyExceededMaxSize(Exception): class _DiscardBodyWithMaxSizeProtocol(protocol.Protocol): """A protocol which immediately errors upon receiving data.""" + transport = None # type: Optional[ITCPTransport] + def __init__(self, deferred: defer.Deferred): self.deferred = deferred @@ -763,18 +775,21 @@ class _DiscardBodyWithMaxSizeProtocol(protocol.Protocol): self.deferred.errback(BodyExceededMaxSize()) # Close the connection (forcefully) since all the data will get # discarded anyway. + assert self.transport is not None self.transport.abortConnection() def dataReceived(self, data: bytes) -> None: self._maybe_fail() - def connectionLost(self, reason: Failure) -> None: + def connectionLost(self, reason: Failure = connectionDone) -> None: self._maybe_fail() class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): """A protocol which reads body to a stream, erroring if the body exceeds a maximum size.""" + transport = None # type: Optional[ITCPTransport] + def __init__( self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int] ): @@ -797,9 +812,10 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): self.deferred.errback(BodyExceededMaxSize()) # Close the connection (forcefully) since all the data will get # discarded anyway. + assert self.transport is not None self.transport.abortConnection() - def connectionLost(self, reason: Failure) -> None: + def connectionLost(self, reason: Failure = connectionDone) -> None: # If the maximum size was already exceeded, there's nothing to do. if self.deferred.called: return @@ -868,6 +884,7 @@ def encode_query_args(args: Optional[Mapping[str, Union[str, List[str]]]]) -> by return query_str.encode("utf8") +@implementer(IPolicyForHTTPS) class InsecureInterceptableContextFactory(ssl.ContextFactory): """ Factory for PyOpenSSL SSL contexts which accepts any certificate for any domain. diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index 174ca7be5a..643492ceaf 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py
@@ -32,8 +32,9 @@ from twisted.internet.endpoints import ( TCP4ClientEndpoint, TCP6ClientEndpoint, ) -from twisted.internet.interfaces import IPushProducer, IStreamClientEndpoint, ITransport +from twisted.internet.interfaces import IPushProducer, IStreamClientEndpoint from twisted.internet.protocol import Factory, Protocol +from twisted.internet.tcp import Connection from twisted.python.failure import Failure logger = logging.getLogger(__name__) @@ -52,7 +53,9 @@ class LogProducer: format: A callable to format the log record to a string. """ - transport = attr.ib(type=ITransport) + # This is essentially ITCPTransport, but that is missing certain fields + # (connected and registerProducer) which are part of the implementation. + transport = attr.ib(type=Connection) _format = attr.ib(type=Callable[[logging.LogRecord], str]) _buffer = attr.ib(type=deque) _paused = attr.ib(default=False, type=bool, init=False) @@ -149,8 +152,6 @@ class RemoteHandler(logging.Handler): if self._connection_waiter: return - self._connection_waiter = self._service.whenConnected(failAfterFailures=1) - def fail(failure: Failure) -> None: # If the Deferred was cancelled (e.g. during shutdown) do not try to # reconnect (this will cause an infinite loop of errors). @@ -163,9 +164,13 @@ class RemoteHandler(logging.Handler): self._connect() def writer(result: Protocol) -> None: + # Force recognising transport as a Connection and not the more + # generic ITransport. + transport = result.transport # type: Connection # type: ignore + # We have a connection. If we already have a producer, and its # transport is the same, just trigger a resumeProducing. - if self._producer and result.transport is self._producer.transport: + if self._producer and transport is self._producer.transport: self._producer.resumeProducing() self._connection_waiter = None return @@ -177,14 +182,16 @@ class RemoteHandler(logging.Handler): # Make a new producer and start it. self._producer = LogProducer( buffer=self._buffer, - transport=result.transport, + transport=transport, format=self.format, ) - result.transport.registerProducer(self._producer, True) + transport.registerProducer(self._producer, True) self._producer.resumeProducing() self._connection_waiter = None - self._connection_waiter.addCallbacks(writer, fail) + deferred = self._service.whenConnected(failAfterFailures=1) # type: Deferred + deferred.addCallbacks(writer, fail) + self._connection_waiter = deferred def _handle_pressure(self) -> None: """ diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 5fec2aaf5d..3dc06a79e8 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py
@@ -16,8 +16,8 @@ import logging from typing import TYPE_CHECKING, Dict, List, Optional -from twisted.internet.base import DelayedCall from twisted.internet.error import AlreadyCalled, AlreadyCancelled +from twisted.internet.interfaces import IDelayedCall from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import Pusher, PusherConfig, ThrottleParams @@ -66,7 +66,7 @@ class EmailPusher(Pusher): self.store = self.hs.get_datastore() self.email = pusher_config.pushkey - self.timed_call = None # type: Optional[DelayedCall] + self.timed_call = None # type: Optional[IDelayedCall] self.throttle_params = {} # type: Dict[str, ThrottleParams] self._inited = False diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index 36071feb36..4ec1bfa6ea 100644 --- a/synapse/replication/http/login.py +++ b/synapse/replication/http/login.py
@@ -61,7 +61,7 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint): is_guest = content["is_guest"] is_appservice_ghost = content["is_appservice_ghost"] - device_id, access_token = await self.registration_handler.register_device( + res = await self.registration_handler.register_device_inner( user_id, device_id, initial_display_name, @@ -69,7 +69,7 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint): is_appservice_ghost=is_appservice_ghost, ) - return 200, {"device_id": device_id, "access_token": access_token} + return 200, res def register_servlets(hs, http_server): diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index a7245da152..a8894beadf 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -48,7 +48,7 @@ from synapse.replication.tcp.commands import ( UserIpCommand, UserSyncCommand, ) -from synapse.replication.tcp.protocol import AbstractConnection +from synapse.replication.tcp.protocol import IReplicationConnection from synapse.replication.tcp.streams import ( STREAMS_MAP, AccountDataStream, @@ -82,7 +82,7 @@ user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache" # the type of the entries in _command_queues_by_stream _StreamCommandQueue = Deque[ - Tuple[Union[RdataCommand, PositionCommand], AbstractConnection] + Tuple[Union[RdataCommand, PositionCommand], IReplicationConnection] ] @@ -174,7 +174,7 @@ class ReplicationCommandHandler: # The currently connected connections. (The list of places we need to send # outgoing replication commands to.) - self._connections = [] # type: List[AbstractConnection] + self._connections = [] # type: List[IReplicationConnection] LaterGauge( "synapse_replication_tcp_resource_total_connections", @@ -197,7 +197,7 @@ class ReplicationCommandHandler: # For each connection, the incoming stream names that have received a POSITION # from that connection. - self._streams_by_connection = {} # type: Dict[AbstractConnection, Set[str]] + self._streams_by_connection = {} # type: Dict[IReplicationConnection, Set[str]] LaterGauge( "synapse_replication_tcp_command_queue", @@ -220,7 +220,7 @@ class ReplicationCommandHandler: self._server_notices_sender = hs.get_server_notices_sender() def _add_command_to_stream_queue( - self, conn: AbstractConnection, cmd: Union[RdataCommand, PositionCommand] + self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand] ) -> None: """Queue the given received command for processing @@ -267,7 +267,7 @@ class ReplicationCommandHandler: async def _process_command( self, cmd: Union[PositionCommand, RdataCommand], - conn: AbstractConnection, + conn: IReplicationConnection, stream_name: str, ) -> None: if isinstance(cmd, PositionCommand): @@ -302,7 +302,7 @@ class ReplicationCommandHandler: hs, outbound_redis_connection ) hs.get_reactor().connectTCP( - hs.config.redis.redis_host, + hs.config.redis.redis_host.encode(), hs.config.redis.redis_port, self._factory, ) @@ -311,7 +311,7 @@ class ReplicationCommandHandler: self._factory = DirectTcpReplicationClientFactory(hs, client_name, self) host = hs.config.worker_replication_host port = hs.config.worker_replication_port - hs.get_reactor().connectTCP(host, port, self._factory) + hs.get_reactor().connectTCP(host.encode(), port, self._factory) def get_streams(self) -> Dict[str, Stream]: """Get a map from stream name to all streams.""" @@ -321,10 +321,10 @@ class ReplicationCommandHandler: """Get a list of streams that this instances replicates.""" return self._streams_to_replicate - def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand): + def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand): self.send_positions_to_connection(conn) - def send_positions_to_connection(self, conn: AbstractConnection): + def send_positions_to_connection(self, conn: IReplicationConnection): """Send current position of all streams this process is source of to the connection. """ @@ -347,7 +347,7 @@ class ReplicationCommandHandler: ) def on_USER_SYNC( - self, conn: AbstractConnection, cmd: UserSyncCommand + self, conn: IReplicationConnection, cmd: UserSyncCommand ) -> Optional[Awaitable[None]]: user_sync_counter.inc() @@ -359,21 +359,23 @@ class ReplicationCommandHandler: return None def on_CLEAR_USER_SYNC( - self, conn: AbstractConnection, cmd: ClearUserSyncsCommand + self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand ) -> Optional[Awaitable[None]]: if self._is_master: return self._presence_handler.update_external_syncs_clear(cmd.instance_id) else: return None - def on_FEDERATION_ACK(self, conn: AbstractConnection, cmd: FederationAckCommand): + def on_FEDERATION_ACK( + self, conn: IReplicationConnection, cmd: FederationAckCommand + ): federation_ack_counter.inc() if self._federation_sender: self._federation_sender.federation_ack(cmd.instance_name, cmd.token) def on_USER_IP( - self, conn: AbstractConnection, cmd: UserIpCommand + self, conn: IReplicationConnection, cmd: UserIpCommand ) -> Optional[Awaitable[None]]: user_ip_cache_counter.inc() @@ -395,7 +397,7 @@ class ReplicationCommandHandler: assert self._server_notices_sender is not None await self._server_notices_sender.on_user_ip(cmd.user_id) - def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand): + def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand): if cmd.instance_name == self._instance_name: # Ignore RDATA that are just our own echoes return @@ -412,7 +414,7 @@ class ReplicationCommandHandler: self._add_command_to_stream_queue(conn, cmd) async def _process_rdata( - self, stream_name: str, conn: AbstractConnection, cmd: RdataCommand + self, stream_name: str, conn: IReplicationConnection, cmd: RdataCommand ) -> None: """Process an RDATA command @@ -486,7 +488,7 @@ class ReplicationCommandHandler: stream_name, instance_name, token, rows ) - def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand): + def on_POSITION(self, conn: IReplicationConnection, cmd: PositionCommand): if cmd.instance_name == self._instance_name: # Ignore POSITION that are just our own echoes return @@ -496,7 +498,7 @@ class ReplicationCommandHandler: self._add_command_to_stream_queue(conn, cmd) async def _process_position( - self, stream_name: str, conn: AbstractConnection, cmd: PositionCommand + self, stream_name: str, conn: IReplicationConnection, cmd: PositionCommand ) -> None: """Process a POSITION command @@ -553,7 +555,9 @@ class ReplicationCommandHandler: self._streams_by_connection.setdefault(conn, set()).add(stream_name) - def on_REMOTE_SERVER_UP(self, conn: AbstractConnection, cmd: RemoteServerUpCommand): + def on_REMOTE_SERVER_UP( + self, conn: IReplicationConnection, cmd: RemoteServerUpCommand + ): """"Called when get a new REMOTE_SERVER_UP command.""" self._replication_data_handler.on_remote_server_up(cmd.data) @@ -576,7 +580,7 @@ class ReplicationCommandHandler: # between two instances, but that is not currently supported). self.send_command(cmd, ignore_conn=conn) - def new_connection(self, connection: AbstractConnection): + def new_connection(self, connection: IReplicationConnection): """Called when we have a new connection.""" self._connections.append(connection) @@ -603,7 +607,7 @@ class ReplicationCommandHandler: UserSyncCommand(self._instance_id, user_id, True, now) ) - def lost_connection(self, connection: AbstractConnection): + def lost_connection(self, connection: IReplicationConnection): """Called when a connection is closed/lost.""" # we no longer need _streams_by_connection for this connection. streams = self._streams_by_connection.pop(connection, None) @@ -624,7 +628,7 @@ class ReplicationCommandHandler: return bool(self._connections) def send_command( - self, cmd: Command, ignore_conn: Optional[AbstractConnection] = None + self, cmd: Command, ignore_conn: Optional[IReplicationConnection] = None ): """Send a command to all connected connections. diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index e0b4ad314d..825900f64c 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py
@@ -46,7 +46,6 @@ indicate which side is sending, these are *not* included on the wire:: > ERROR server stopping * connection closed by server * """ -import abc import fcntl import logging import struct @@ -54,8 +53,10 @@ from inspect import isawaitable from typing import TYPE_CHECKING, List, Optional from prometheus_client import Counter +from zope.interface import Interface, implementer from twisted.internet import task +from twisted.internet.tcp import Connection from twisted.protocols.basic import LineOnlyReceiver from twisted.python.failure import Failure @@ -121,6 +122,14 @@ class ConnectionStates: CLOSED = "closed" +class IReplicationConnection(Interface): + """An interface for replication connections.""" + + def send_command(cmd: Command): + """Send the command down the connection""" + + +@implementer(IReplicationConnection) class BaseReplicationStreamProtocol(LineOnlyReceiver): """Base replication protocol shared between client and server. @@ -137,6 +146,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): (if they send a `PING` command) """ + # The transport is going to be an ITCPTransport, but that doesn't have the + # (un)registerProducer methods, those are only on the implementation. + transport = None # type: Connection + delimiter = b"\n" # Valid commands we expect to receive @@ -181,6 +194,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): connected_connections.append(self) # Register connection for metrics + assert self.transport is not None self.transport.registerProducer(self, True) # For the *Producing callbacks self._send_pending_commands() @@ -205,6 +219,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): logger.info( "[%s] Failed to close connection gracefully, aborting", self.id() ) + assert self.transport is not None self.transport.abortConnection() else: if now - self.last_sent_command >= PING_TIME: @@ -294,6 +309,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): def close(self): logger.warning("[%s] Closing connection", self.id()) self.time_we_closed = self.clock.time_msec() + assert self.transport is not None self.transport.loseConnection() self.on_connection_closed() @@ -391,6 +407,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): def connectionLost(self, reason): logger.info("[%s] Replication connection closed: %r", self.id(), reason) if isinstance(reason, Failure): + assert reason.type is not None connection_close_counter.labels(reason.type.__name__).inc() else: connection_close_counter.labels(reason.__class__.__name__).inc() @@ -495,20 +512,6 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.send_command(ReplicateCommand()) -class AbstractConnection(abc.ABC): - """An interface for replication connections.""" - - @abc.abstractmethod - def send_command(self, cmd: Command): - """Send the command down the connection""" - pass - - -# This tells python that `BaseReplicationStreamProtocol` implements the -# interface. -AbstractConnection.register(BaseReplicationStreamProtocol) - - # The following simply registers metrics for the replication connections pending_commands = LaterGauge( diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 7560706b4b..2f4d407f94 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py
@@ -19,6 +19,11 @@ from typing import TYPE_CHECKING, Generic, Optional, Type, TypeVar, cast import attr import txredisapi +from zope.interface import implementer + +from twisted.internet.address import IPv4Address, IPv6Address +from twisted.internet.interfaces import IAddress, IConnector +from twisted.python.failure import Failure from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.metrics.background_process_metrics import ( @@ -32,7 +37,7 @@ from synapse.replication.tcp.commands import ( parse_command_from_line, ) from synapse.replication.tcp.protocol import ( - AbstractConnection, + IReplicationConnection, tcp_inbound_commands_counter, tcp_outbound_commands_counter, ) @@ -62,7 +67,8 @@ class ConstantProperty(Generic[T, V]): pass -class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): +@implementer(IReplicationConnection) +class RedisSubscriber(txredisapi.SubscriberProtocol): """Connection to redis subscribed to replication stream. This class fulfils two functions: @@ -71,7 +77,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): connection, parsing *incoming* messages into replication commands, and passing them to `ReplicationCommandHandler` - (b) it implements the AbstractConnection API, where it sends *outgoing* commands + (b) it implements the IReplicationConnection API, where it sends *outgoing* commands onto outbound_redis_connection. Due to the vagaries of `txredisapi` we don't want to have a custom @@ -253,6 +259,37 @@ class SynapseRedisFactory(txredisapi.RedisFactory): except Exception: logger.warning("Failed to send ping to a redis connection") + # ReconnectingClientFactory has some logging (if you enable `self.noisy`), but + # it's rubbish. We add our own here. + + def startedConnecting(self, connector: IConnector): + logger.info( + "Connecting to redis server %s", format_address(connector.getDestination()) + ) + super().startedConnecting(connector) + + def clientConnectionFailed(self, connector: IConnector, reason: Failure): + logger.info( + "Connection to redis server %s failed: %s", + format_address(connector.getDestination()), + reason.value, + ) + super().clientConnectionFailed(connector, reason) + + def clientConnectionLost(self, connector: IConnector, reason: Failure): + logger.info( + "Connection to redis server %s lost: %s", + format_address(connector.getDestination()), + reason.value, + ) + super().clientConnectionLost(connector, reason) + + +def format_address(address: IAddress) -> str: + if isinstance(address, (IPv4Address, IPv6Address)): + return "%s:%i" % (address.host, address.port) + return str(address) + class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory): """This is a reconnecting factory that connects to redis and immediately @@ -328,6 +365,6 @@ def lazyConnection( factory.continueTrying = reconnect reactor = hs.get_reactor() - reactor.connectTCP(host, port, factory, timeout=30, bindAddress=None) + reactor.connectTCP(host.encode(), port, factory, timeout=30, bindAddress=None) return factory.handler diff --git a/synapse/rest/admin/_base.py b/synapse/rest/admin/_base.py
index e09234c644..7681e55b58 100644 --- a/synapse/rest/admin/_base.py +++ b/synapse/rest/admin/_base.py
@@ -15,10 +15,9 @@ import re -import twisted.web.server - -import synapse.api.auth +from synapse.api.auth import Auth from synapse.api.errors import AuthError +from synapse.http.site import SynapseRequest from synapse.types import UserID @@ -37,13 +36,11 @@ def admin_patterns(path_regex: str, version: str = "v1"): return patterns -async def assert_requester_is_admin( - auth: synapse.api.auth.Auth, request: twisted.web.server.Request -) -> None: +async def assert_requester_is_admin(auth: Auth, request: SynapseRequest) -> None: """Verify that the requester is an admin user Args: - auth: api.auth.Auth singleton + auth: Auth singleton request: incoming request Raises: @@ -53,11 +50,11 @@ async def assert_requester_is_admin( await assert_user_is_admin(auth, requester.user) -async def assert_user_is_admin(auth: synapse.api.auth.Auth, user_id: UserID) -> None: +async def assert_user_is_admin(auth: Auth, user_id: UserID) -> None: """Verify that the given user is an admin user Args: - auth: api.auth.Auth singleton + auth: Auth singleton user_id: user to check Raises: diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py
index 511c859f64..7fcc48a9d7 100644 --- a/synapse/rest/admin/media.py +++ b/synapse/rest/admin/media.py
@@ -17,10 +17,9 @@ import logging from typing import TYPE_CHECKING, Tuple -from twisted.web.server import Request - from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.http.servlet import RestServlet, parse_boolean, parse_integer +from synapse.http.site import SynapseRequest from synapse.rest.admin._base import ( admin_patterns, assert_requester_is_admin, @@ -50,7 +49,9 @@ class QuarantineMediaInRoom(RestServlet): self.store = hs.get_datastore() self.auth = hs.get_auth() - async def on_POST(self, request: Request, room_id: str) -> Tuple[int, JsonDict]: + async def on_POST( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) await assert_user_is_admin(self.auth, requester.user) @@ -75,7 +76,9 @@ class QuarantineMediaByUser(RestServlet): self.store = hs.get_datastore() self.auth = hs.get_auth() - async def on_POST(self, request: Request, user_id: str) -> Tuple[int, JsonDict]: + async def on_POST( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) await assert_user_is_admin(self.auth, requester.user) @@ -103,7 +106,7 @@ class QuarantineMediaByID(RestServlet): self.auth = hs.get_auth() async def on_POST( - self, request: Request, server_name: str, media_id: str + self, request: SynapseRequest, server_name: str, media_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) await assert_user_is_admin(self.auth, requester.user) @@ -127,7 +130,9 @@ class ProtectMediaByID(RestServlet): self.store = hs.get_datastore() self.auth = hs.get_auth() - async def on_POST(self, request: Request, media_id: str) -> Tuple[int, JsonDict]: + async def on_POST( + self, request: SynapseRequest, media_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) await assert_user_is_admin(self.auth, requester.user) @@ -148,7 +153,9 @@ class ListMediaInRoom(RestServlet): self.store = hs.get_datastore() self.auth = hs.get_auth() - async def on_GET(self, request: Request, room_id: str) -> Tuple[int, JsonDict]: + async def on_GET( + self, request: SynapseRequest, room_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) is_admin = await self.auth.is_server_admin(requester.user) if not is_admin: @@ -166,7 +173,7 @@ class PurgeMediaCacheRestServlet(RestServlet): self.media_repository = hs.get_media_repository() self.auth = hs.get_auth() - async def on_POST(self, request: Request) -> Tuple[int, JsonDict]: + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) before_ts = parse_integer(request, "before_ts", required=True) @@ -189,7 +196,7 @@ class DeleteMediaByID(RestServlet): self.media_repository = hs.get_media_repository() async def on_DELETE( - self, request: Request, server_name: str, media_id: str + self, request: SynapseRequest, server_name: str, media_id: str ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) @@ -218,7 +225,9 @@ class DeleteMediaByDateSize(RestServlet): self.server_name = hs.hostname self.media_repository = hs.get_media_repository() - async def on_POST(self, request: Request, server_name: str) -> Tuple[int, JsonDict]: + async def on_POST( + self, request: SynapseRequest, server_name: str + ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self.auth, request) before_ts = parse_integer(request, "before_ts", required=True) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index f2c42a0f30..263d8ec076 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py
@@ -685,7 +685,10 @@ class RoomEventContextServlet(RestServlet): results["events_after"], time_now ) results["state"] = await self._event_serializer.serialize_events( - results["state"], time_now + results["state"], + time_now, + # No need to bundle aggregations for state events + bundle_aggregations=False, ) return 200, results diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 267a993430..2c89b62e25 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py
@@ -269,7 +269,10 @@ class UserRestServletV2(RestServlet): target_user.to_string(), False, requester, by_admin=True ) elif not deactivate and user["deactivated"]: - if "password" not in body: + if ( + "password" not in body + and self.hs.config.password_localdb_enabled + ): raise SynapseError( 400, "Must provide a password to re-activate an account." ) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 34bc1bd49b..e4c352f572 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py
@@ -14,10 +14,12 @@ # limitations under the License. import logging +import re from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Optional from synapse.api.errors import Codes, LoginError, SynapseError from synapse.api.ratelimiting import Ratelimiter +from synapse.api.urls import CLIENT_API_PREFIX from synapse.appservice import ApplicationService from synapse.handlers.sso import SsoIdentityProvider from synapse.http import get_request_uri @@ -94,11 +96,21 @@ class LoginRestServlet(RestServlet): flows.append({"type": LoginRestServlet.CAS_TYPE}) if self.cas_enabled or self.saml2_enabled or self.oidc_enabled: - sso_flow = {"type": LoginRestServlet.SSO_TYPE} # type: JsonDict + sso_flow = { + "type": LoginRestServlet.SSO_TYPE, + "identity_providers": [ + _get_auth_flow_dict_for_idp( + idp, + ) + for idp in self._sso_handler.get_identity_providers().values() + ], + } # type: JsonDict if self._msc2858_enabled: + # backwards-compatibility support for clients which don't + # support the stable API yet sso_flow["org.matrix.msc2858.identity_providers"] = [ - _get_auth_flow_dict_for_idp(idp) + _get_auth_flow_dict_for_idp(idp, use_unstable_brands=True) for idp in self._sso_handler.get_identity_providers().values() ] @@ -331,22 +343,38 @@ class LoginRestServlet(RestServlet): return result -def _get_auth_flow_dict_for_idp(idp: SsoIdentityProvider) -> JsonDict: +def _get_auth_flow_dict_for_idp( + idp: SsoIdentityProvider, use_unstable_brands: bool = False +) -> JsonDict: """Return an entry for the login flow dict Returns an entry suitable for inclusion in "identity_providers" in the response to GET /_matrix/client/r0/login + + Args: + idp: the identity provider to describe + use_unstable_brands: whether we should use brand identifiers suitable + for the unstable API """ e = {"id": idp.idp_id, "name": idp.idp_name} # type: JsonDict if idp.idp_icon: e["icon"] = idp.idp_icon if idp.idp_brand: e["brand"] = idp.idp_brand + # use the stable brand identifier if the unstable identifier isn't defined. + if use_unstable_brands and idp.unstable_idp_brand: + e["brand"] = idp.unstable_idp_brand return e class SsoRedirectServlet(RestServlet): - PATTERNS = client_patterns("/login/(cas|sso)/redirect$", v1=True) + PATTERNS = list(client_patterns("/login/(cas|sso)/redirect$", v1=True)) + [ + re.compile( + "^" + + CLIENT_API_PREFIX + + "/r0/login/sso/redirect/(?P<idp_id>[A-Za-z0-9_.~-]+)$" + ) + ] def __init__(self, hs: "HomeServer"): # make sure that the relevant handlers are instantiated, so that they @@ -364,7 +392,8 @@ class SsoRedirectServlet(RestServlet): def register(self, http_server: HttpServer) -> None: super().register(http_server) if self._msc2858_enabled: - # expose additional endpoint for MSC2858 support + # expose additional endpoint for MSC2858 support: backwards-compat support + # for clients which don't yet support the stable endpoints. http_server.register_paths( "GET", client_patterns( diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 9a1df30c29..5884daea6d 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py
@@ -671,7 +671,10 @@ class RoomEventContextServlet(RestServlet): results["events_after"], time_now ) results["state"] = await self._event_serializer.serialize_events( - results["state"], time_now + results["state"], + time_now, + # No need to bundle aggregations for state events + bundle_aggregations=False, ) return 200, results diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py
index 7aea4cebf5..5901432fad 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py
@@ -32,6 +32,7 @@ from synapse.http.servlet import ( assert_params_in_dict, parse_json_object_from_request, ) +from synapse.http.site import SynapseRequest from synapse.types import GroupID, JsonDict from ._base import client_patterns @@ -70,7 +71,9 @@ class GroupServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_GET(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_GET( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() @@ -81,7 +84,9 @@ class GroupServlet(RestServlet): return 200, group_description @_validate_group_id - async def on_POST(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_POST( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -111,7 +116,9 @@ class GroupSummaryServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_GET(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_GET( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() @@ -144,7 +151,11 @@ class GroupSummaryRoomsCatServlet(RestServlet): @_validate_group_id async def on_PUT( - self, request: Request, group_id: str, category_id: Optional[str], room_id: str + self, + request: SynapseRequest, + group_id: str, + category_id: Optional[str], + room_id: str, ): requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -176,7 +187,7 @@ class GroupSummaryRoomsCatServlet(RestServlet): @_validate_group_id async def on_DELETE( - self, request: Request, group_id: str, category_id: str, room_id: str + self, request: SynapseRequest, group_id: str, category_id: str, room_id: str ): requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -206,7 +217,7 @@ class GroupCategoryServlet(RestServlet): @_validate_group_id async def on_GET( - self, request: Request, group_id: str, category_id: str + self, request: SynapseRequest, group_id: str, category_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() @@ -219,7 +230,7 @@ class GroupCategoryServlet(RestServlet): @_validate_group_id async def on_PUT( - self, request: Request, group_id: str, category_id: str + self, request: SynapseRequest, group_id: str, category_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -247,7 +258,7 @@ class GroupCategoryServlet(RestServlet): @_validate_group_id async def on_DELETE( - self, request: Request, group_id: str, category_id: str + self, request: SynapseRequest, group_id: str, category_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -274,7 +285,9 @@ class GroupCategoriesServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_GET(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_GET( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() @@ -298,7 +311,7 @@ class GroupRoleServlet(RestServlet): @_validate_group_id async def on_GET( - self, request: Request, group_id: str, role_id: str + self, request: SynapseRequest, group_id: str, role_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() @@ -311,7 +324,7 @@ class GroupRoleServlet(RestServlet): @_validate_group_id async def on_PUT( - self, request: Request, group_id: str, role_id: str + self, request: SynapseRequest, group_id: str, role_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -339,7 +352,7 @@ class GroupRoleServlet(RestServlet): @_validate_group_id async def on_DELETE( - self, request: Request, group_id: str, role_id: str + self, request: SynapseRequest, group_id: str, role_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -366,7 +379,9 @@ class GroupRolesServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_GET(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_GET( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() @@ -399,7 +414,11 @@ class GroupSummaryUsersRoleServlet(RestServlet): @_validate_group_id async def on_PUT( - self, request: Request, group_id: str, role_id: Optional[str], user_id: str + self, + request: SynapseRequest, + group_id: str, + role_id: Optional[str], + user_id: str, ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -431,7 +450,7 @@ class GroupSummaryUsersRoleServlet(RestServlet): @_validate_group_id async def on_DELETE( - self, request: Request, group_id: str, role_id: str, user_id: str + self, request: SynapseRequest, group_id: str, role_id: str, user_id: str ): requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -458,7 +477,9 @@ class GroupRoomServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_GET(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_GET( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() @@ -481,7 +502,9 @@ class GroupUsersServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_GET(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_GET( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() @@ -504,7 +527,9 @@ class GroupInvitedUsersServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_GET(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_GET( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -526,7 +551,9 @@ class GroupSettingJoinPolicyServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_PUT(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_PUT( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -554,7 +581,7 @@ class GroupCreateServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() self.server_name = hs.hostname - async def on_POST(self, request: Request) -> Tuple[int, JsonDict]: + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -598,7 +625,7 @@ class GroupAdminRoomsServlet(RestServlet): @_validate_group_id async def on_PUT( - self, request: Request, group_id: str, room_id: str + self, request: SynapseRequest, group_id: str, room_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -615,7 +642,7 @@ class GroupAdminRoomsServlet(RestServlet): @_validate_group_id async def on_DELETE( - self, request: Request, group_id: str, room_id: str + self, request: SynapseRequest, group_id: str, room_id: str ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -646,7 +673,7 @@ class GroupAdminRoomsConfigServlet(RestServlet): @_validate_group_id async def on_PUT( - self, request: Request, group_id: str, room_id: str, config_key: str + self, request: SynapseRequest, group_id: str, room_id: str, config_key: str ): requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -678,7 +705,9 @@ class GroupAdminUsersInviteServlet(RestServlet): self.is_mine_id = hs.is_mine_id @_validate_group_id - async def on_PUT(self, request: Request, group_id, user_id) -> Tuple[int, JsonDict]: + async def on_PUT( + self, request: SynapseRequest, group_id, user_id + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -708,7 +737,9 @@ class GroupAdminUsersKickServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_PUT(self, request: Request, group_id, user_id) -> Tuple[int, JsonDict]: + async def on_PUT( + self, request: SynapseRequest, group_id, user_id + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -735,7 +766,9 @@ class GroupSelfLeaveServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_PUT(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_PUT( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -762,7 +795,9 @@ class GroupSelfJoinServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_PUT(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_PUT( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -789,7 +824,9 @@ class GroupSelfAcceptInviteServlet(RestServlet): self.groups_handler = hs.get_groups_local_handler() @_validate_group_id - async def on_PUT(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_PUT( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -816,7 +853,9 @@ class GroupSelfUpdatePublicityServlet(RestServlet): self.store = hs.get_datastore() @_validate_group_id - async def on_PUT(self, request: Request, group_id: str) -> Tuple[int, JsonDict]: + async def on_PUT( + self, request: SynapseRequest, group_id: str + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) requester_user_id = requester.user.to_string() @@ -839,7 +878,9 @@ class PublicisedGroupsForUserServlet(RestServlet): self.store = hs.get_datastore() self.groups_handler = hs.get_groups_local_handler() - async def on_GET(self, request: Request, user_id: str) -> Tuple[int, JsonDict]: + async def on_GET( + self, request: SynapseRequest, user_id: str + ) -> Tuple[int, JsonDict]: await self.auth.get_user_by_req(request, allow_guest=True) result = await self.groups_handler.get_publicised_groups_for_user(user_id) @@ -859,7 +900,7 @@ class PublicisedGroupsForUsersServlet(RestServlet): self.store = hs.get_datastore() self.groups_handler = hs.get_groups_local_handler() - async def on_POST(self, request: Request) -> Tuple[int, JsonDict]: + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: await self.auth.get_user_by_req(request, allow_guest=True) content = parse_json_object_from_request(request) @@ -881,7 +922,7 @@ class GroupsForUserServlet(RestServlet): self.clock = hs.get_clock() self.groups_handler = hs.get_groups_local_handler() - async def on_GET(self, request: Request) -> Tuple[int, JsonDict]: + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) requester_user_id = requester.user.to_string() diff --git a/synapse/rest/media/v1/config_resource.py b/synapse/rest/media/v1/config_resource.py
index 9039662f7e..1eff98ef14 100644 --- a/synapse/rest/media/v1/config_resource.py +++ b/synapse/rest/media/v1/config_resource.py
@@ -20,6 +20,7 @@ from typing import TYPE_CHECKING from twisted.web.server import Request from synapse.http.server import DirectServeJsonResource, respond_with_json +from synapse.http.site import SynapseRequest if TYPE_CHECKING: from synapse.app.homeserver import HomeServer @@ -35,7 +36,7 @@ class MediaConfigResource(DirectServeJsonResource): self.auth = hs.get_auth() self.limits_dict = {"m.upload.size": config.max_upload_size} - async def _async_render_GET(self, request: Request) -> None: + async def _async_render_GET(self, request: SynapseRequest) -> None: await self.auth.get_user_by_req(request) respond_with_json(request, 200, self.limits_dict, send_cors=True) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 0641924f18..8b4841ed5d 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py
@@ -35,6 +35,7 @@ from synapse.api.errors import ( from synapse.config._base import ConfigError from synapse.logging.context import defer_to_thread from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import UserID from synapse.util.async_helpers import Linearizer from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import random_string @@ -145,7 +146,7 @@ class MediaRepository: upload_name: Optional[str], content: IO, content_length: int, - auth_user: str, + auth_user: UserID, ) -> str: """Store uploaded content for a local user and return the mxc URL diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index a074e807dc..b8895aeaa9 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -39,6 +39,7 @@ from synapse.http.server import ( respond_with_json_bytes, ) from synapse.http.servlet import parse_integer, parse_string +from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.media.v1._base import get_filename_from_headers @@ -185,7 +186,7 @@ class PreviewUrlResource(DirectServeJsonResource): request.setHeader(b"Allow", b"OPTIONS, GET") respond_with_json(request, 200, {}, send_cors=True) - async def _async_render_GET(self, request: Request) -> None: + async def _async_render_GET(self, request: SynapseRequest) -> None: # XXX: if get_user_by_req fails, what should we do in an async render? requester = await self.auth.get_user_by_req(request) diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index 5e104fac40..ae5aef2f7f 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py
@@ -22,6 +22,7 @@ from twisted.web.server import Request from synapse.api.errors import Codes, SynapseError from synapse.http.server import DirectServeJsonResource, respond_with_json from synapse.http.servlet import parse_string +from synapse.http.site import SynapseRequest from synapse.rest.media.v1.media_storage import SpamMediaException if TYPE_CHECKING: @@ -49,7 +50,7 @@ class UploadResource(DirectServeJsonResource): async def _async_render_OPTIONS(self, request: Request) -> None: respond_with_json(request, 200, {}, send_cors=True) - async def _async_render_POST(self, request: Request) -> None: + async def _async_render_POST(self, request: SynapseRequest) -> None: requester = await self.auth.get_user_by_req(request) # TODO: The checks here are a bit late. The content will have # already been uploaded to a tmp file at this point diff --git a/synapse/rest/synapse/client/saml2/response_resource.py b/synapse/rest/synapse/client/saml2/response_resource.py
index f6668fb5e3..4dfadf1bfb 100644 --- a/synapse/rest/synapse/client/saml2/response_resource.py +++ b/synapse/rest/synapse/client/saml2/response_resource.py
@@ -14,24 +14,30 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING + from synapse.http.server import DirectServeHtmlResource +if TYPE_CHECKING: + from synapse.server import HomeServer + class SAML2ResponseResource(DirectServeHtmlResource): """A Twisted web resource which handles the SAML response""" isLeaf = 1 - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__() self._saml_handler = hs.get_saml_handler() + self._sso_handler = hs.get_sso_handler() async def _async_render_GET(self, request): # We're not expecting any GET request on that resource if everything goes right, # but some IdPs sometimes end up responding with a 302 redirect on this endpoint. # In this case, just tell the user that something went wrong and they should # try to authenticate again. - self._saml_handler._render_error( + self._sso_handler.render_error( request, "unexpected_get", "Unexpected GET request on /saml2/authn_response" ) diff --git a/synapse/server.py b/synapse/server.py
index 369cc88026..48ac87a124 100644 --- a/synapse/server.py +++ b/synapse/server.py
@@ -351,11 +351,9 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_http_client_context_factory(self) -> IPolicyForHTTPS: - return ( - InsecureInterceptableContextFactory() - if self.config.use_insecure_ssl_client_just_for_testing_do_not_use - else RegularPolicyForHTTPS() - ) + if self.config.use_insecure_ssl_client_just_for_testing_do_not_use: + return InsecureInterceptableContextFactory() + return RegularPolicyForHTTPS() @cache_in_self def get_simple_http_client(self) -> SimpleHttpClient: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 18ddb92fcc..332193ad1c 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -54,11 +54,12 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas ) # type: LruCache[str, List[Tuple[str, int]]] async def get_auth_chain( - self, event_ids: Collection[str], include_given: bool = False + self, room_id: str, event_ids: Collection[str], include_given: bool = False ) -> List[EventBase]: """Get auth events for given event_ids. The events *must* be state events. Args: + room_id: The room the event is in. event_ids: state events include_given: include the given events in result @@ -66,24 +67,44 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas list of events """ event_ids = await self.get_auth_chain_ids( - event_ids, include_given=include_given + room_id, event_ids, include_given=include_given ) return await self.get_events_as_list(event_ids) async def get_auth_chain_ids( self, + room_id: str, event_ids: Collection[str], include_given: bool = False, ) -> List[str]: """Get auth events for given event_ids. The events *must* be state events. Args: + room_id: The room the event is in. event_ids: state events include_given: include the given events in result Returns: - An awaitable which resolve to a list of event_ids + list of event_ids """ + + # Check if we have indexed the room so we can use the chain cover + # algorithm. + room = await self.get_room(room_id) + if room["has_auth_chain_index"]: + try: + return await self.db_pool.runInteraction( + "get_auth_chain_ids_chains", + self._get_auth_chain_ids_using_cover_index_txn, + room_id, + event_ids, + include_given, + ) + except _NoChainCoverIndex: + # For whatever reason we don't actually have a chain cover index + # for the events in question, so we fall back to the old method. + pass + return await self.db_pool.runInteraction( "get_auth_chain_ids", self._get_auth_chain_ids_txn, @@ -91,9 +112,130 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas include_given, ) + def _get_auth_chain_ids_using_cover_index_txn( + self, txn: Cursor, room_id: str, event_ids: Collection[str], include_given: bool + ) -> List[str]: + """Calculates the auth chain IDs using the chain index.""" + + # First we look up the chain ID/sequence numbers for the given events. + + initial_events = set(event_ids) + + # All the events that we've found that are reachable from the events. + seen_events = set() # type: Set[str] + + # A map from chain ID to max sequence number of the given events. + event_chains = {} # type: Dict[int, int] + + sql = """ + SELECT event_id, chain_id, sequence_number + FROM event_auth_chains + WHERE %s + """ + for batch in batch_iter(initial_events, 1000): + clause, args = make_in_list_sql_clause( + txn.database_engine, "event_id", batch + ) + txn.execute(sql % (clause,), args) + + for event_id, chain_id, sequence_number in txn: + seen_events.add(event_id) + event_chains[chain_id] = max( + sequence_number, event_chains.get(chain_id, 0) + ) + + # Check that we actually have a chain ID for all the events. + events_missing_chain_info = initial_events.difference(seen_events) + if events_missing_chain_info: + # This can happen due to e.g. downgrade/upgrade of the server. We + # raise an exception and fall back to the previous algorithm. + logger.info( + "Unexpectedly found that events don't have chain IDs in room %s: %s", + room_id, + events_missing_chain_info, + ) + raise _NoChainCoverIndex(room_id) + + # Now we look up all links for the chains we have, adding chains that + # are reachable from any event. + sql = """ + SELECT + origin_chain_id, origin_sequence_number, + target_chain_id, target_sequence_number + FROM event_auth_chain_links + WHERE %s + """ + + # A map from chain ID to max sequence number *reachable* from any event ID. + chains = {} # type: Dict[int, int] + + # Add all linked chains reachable from initial set of chains. + for batch in batch_iter(event_chains, 1000): + clause, args = make_in_list_sql_clause( + txn.database_engine, "origin_chain_id", batch + ) + txn.execute(sql % (clause,), args) + + for ( + origin_chain_id, + origin_sequence_number, + target_chain_id, + target_sequence_number, + ) in txn: + # chains are only reachable if the origin sequence number of + # the link is less than the max sequence number in the + # origin chain. + if origin_sequence_number <= event_chains.get(origin_chain_id, 0): + chains[target_chain_id] = max( + target_sequence_number, + chains.get(target_chain_id, 0), + ) + + # Add the initial set of chains, excluding the sequence corresponding to + # initial event. + for chain_id, seq_no in event_chains.items(): + chains[chain_id] = max(seq_no - 1, chains.get(chain_id, 0)) + + # Now for each chain we figure out the maximum sequence number reachable + # from *any* event ID. Events with a sequence less than that are in the + # auth chain. + if include_given: + results = initial_events + else: + results = set() + + if isinstance(self.database_engine, PostgresEngine): + # We can use `execute_values` to efficiently fetch the gaps when + # using postgres. + sql = """ + SELECT event_id + FROM event_auth_chains AS c, (VALUES ?) AS l(chain_id, max_seq) + WHERE + c.chain_id = l.chain_id + AND sequence_number <= max_seq + """ + + rows = txn.execute_values(sql, chains.items()) + results.update(r for r, in rows) + else: + # For SQLite we just fall back to doing a noddy for loop. + sql = """ + SELECT event_id FROM event_auth_chains + WHERE chain_id = ? AND sequence_number <= ? + """ + for chain_id, max_no in chains.items(): + txn.execute(sql, (chain_id, max_no)) + results.update(r for r, in txn) + + return list(results) + def _get_auth_chain_ids_txn( self, txn: LoggingTransaction, event_ids: Collection[str], include_given: bool ) -> List[str]: + """Calculates the auth chain IDs. + + This is used when we don't have a cover index for the room. + """ if include_given: results = set(event_ids) else: diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index cb6b1f8a0c..78367ea58d 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -135,6 +135,11 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._chain_cover_index, ) + self.db_pool.updates.register_background_update_handler( + "purged_chain_cover", + self._purged_chain_cover_index, + ) + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -932,3 +937,77 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): processed_count=count, finished_room_map=finished_rooms, ) + + async def _purged_chain_cover_index(self, progress: dict, batch_size: int) -> int: + """ + A background updates that iterates over the chain cover and deletes the + chain cover for events that have been purged. + + This may be due to fully purging a room or via setting a retention policy. + """ + current_event_id = progress.get("current_event_id", "") + + def purged_chain_cover_txn(txn) -> int: + # The event ID from events will be null if the chain ID / sequence + # number points to a purged event. + sql = """ + SELECT event_id, chain_id, sequence_number, e.event_id IS NOT NULL + FROM event_auth_chains + LEFT JOIN events AS e USING (event_id) + WHERE event_id > ? ORDER BY event_auth_chains.event_id ASC LIMIT ? + """ + txn.execute(sql, (current_event_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + # The event IDs and chain IDs / sequence numbers where the event has + # been purged. + unreferenced_event_ids = [] + unreferenced_chain_id_tuples = [] + event_id = "" + for event_id, chain_id, sequence_number, has_event in rows: + if not has_event: + unreferenced_event_ids.append((event_id,)) + unreferenced_chain_id_tuples.append((chain_id, sequence_number)) + + # Delete the unreferenced auth chains from event_auth_chain_links and + # event_auth_chains. + txn.executemany( + """ + DELETE FROM event_auth_chains WHERE event_id = ? + """, + unreferenced_event_ids, + ) + # We should also delete matching target_*, but there is no index on + # target_chain_id. Hopefully any purged events are due to a room + # being fully purged and they will be removed from the origin_* + # searches. + txn.executemany( + """ + DELETE FROM event_auth_chain_links WHERE + origin_chain_id = ? AND origin_sequence_number = ? + """, + unreferenced_chain_id_tuples, + ) + + progress = { + "current_event_id": event_id, + } + + self.db_pool.updates._background_update_progress_txn( + txn, "purged_chain_cover", progress + ) + + return len(rows) + + result = await self.db_pool.runInteraction( + "_purged_chain_cover_index", + purged_chain_cover_txn, + ) + + if not result: + await self.db_pool.updates._end_background_update("purged_chain_cover") + + return result diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index edbe42f2bf..c04e162ccc 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import itertools + import logging import threading from collections import namedtuple @@ -1044,7 +1044,8 @@ class EventsWorkerStore(SQLBaseStore): Returns: set[str]: The events we have already seen. """ - results = set() + # if the event cache contains the event, obviously we've seen it. + results = {x for x in event_ids if self._get_event_cache.contains(x)} def have_seen_events_txn(txn, chunk): sql = "SELECT event_id FROM events as e WHERE " @@ -1052,12 +1053,9 @@ class EventsWorkerStore(SQLBaseStore): txn.database_engine, "e.event_id", chunk ) txn.execute(sql + clause, args) - for (event_id,) in txn: - results.add(event_id) + results.update(row[0] for row in txn) - # break the input up into chunks of 100 - input_iterator = iter(event_ids) - for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), []): + for chunk in batch_iter((x for x in event_ids if x not in results), 100): await self.db_pool.runInteraction( "have_seen_events", have_seen_events_txn, chunk ) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 0836e4af49..41f4fe7f95 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py
@@ -331,13 +331,9 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): txn.executemany( """ DELETE FROM event_auth_chain_links WHERE - (origin_chain_id = ? AND origin_sequence_number = ?) OR - (target_chain_id = ? AND target_sequence_number = ?) + origin_chain_id = ? AND origin_sequence_number = ? """, - ( - (chain_id, seq_num, chain_id, seq_num) - for (chain_id, seq_num) in referenced_chain_id_tuples - ), + referenced_chain_id_tuples, ) # Now we delete tables which lack an index on room_id but have one on event_id diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 61a7556e56..eba66ff352 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -16,7 +16,7 @@ # limitations under the License. import logging import re -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union import attr @@ -1510,7 +1510,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): async def user_delete_access_tokens( self, user_id: str, - except_token_id: Optional[str] = None, + except_token_id: Optional[int] = None, device_id: Optional[str] = None, ) -> List[Tuple[str, int, Optional[str]]]: """ @@ -1533,7 +1533,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): items = keyvalues.items() where_clause = " AND ".join(k + " = ?" for k, _ in items) - values = [v for _, v in items] + values = [v for _, v in items] # type: List[Union[str, int]] if except_token_id: where_clause += " AND id != ?" values.append(except_token_id) diff --git a/synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql b/synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql new file mode 100644
index 0000000000..87cb1f3cfd --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql
@@ -0,0 +1,17 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (5910, 'purged_chain_cover', '{}'); diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index b921d63d30..0309661841 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -350,11 +350,11 @@ class TransactionStore(TransactionWorkerStore): self.db_pool.simple_upsert_many_txn( txn, - "destination_rooms", - ["destination", "room_id"], - rows, - ["stream_ordering"], - [(stream_ordering,)] * len(rows), + table="destination_rooms", + key_names=("destination", "room_id"), + key_values=rows, + value_names=["stream_ordering"], + value_values=[(stream_ordering,)] * len(rows), ) async def get_destination_last_successful_stream_ordering( diff --git a/synapse/types.py b/synapse/types.py
index 0216d213c7..b08ce90140 100644 --- a/synapse/types.py +++ b/synapse/types.py
@@ -83,33 +83,32 @@ class ISynapseReactor( """The interfaces necessary for Synapse to function.""" -class Requester( - namedtuple( - "Requester", - [ - "user", - "access_token_id", - "is_guest", - "shadow_banned", - "device_id", - "app_service", - "authenticated_entity", - ], - ) -): +@attr.s(frozen=True, slots=True) +class Requester: """ Represents the user making a request Attributes: - user (UserID): id of the user making the request - access_token_id (int|None): *ID* of the access token used for this + user: id of the user making the request + access_token_id: *ID* of the access token used for this request, or None if it came via the appservice API or similar - is_guest (bool): True if the user making this request is a guest user - shadow_banned (bool): True if the user making this request has been shadow-banned. - device_id (str|None): device_id which was set at authentication time - app_service (ApplicationService|None): the AS requesting on behalf of the user + is_guest: True if the user making this request is a guest user + shadow_banned: True if the user making this request has been shadow-banned. + device_id: device_id which was set at authentication time + app_service: the AS requesting on behalf of the user + authenticated_entity: The entity that authenticated when making the request. + This is different to the user_id when an admin user or the server is + "puppeting" the user. """ + user = attr.ib(type="UserID") + access_token_id = attr.ib(type=Optional[int]) + is_guest = attr.ib(type=bool) + shadow_banned = attr.ib(type=bool) + device_id = attr.ib(type=Optional[str]) + app_service = attr.ib(type=Optional["ApplicationService"]) + authenticated_entity = attr.ib(type=str) + def serialize(self): """Converts self to a type that can be serialized as JSON, and then deserialized by `deserialize` @@ -157,23 +156,23 @@ class Requester( def create_requester( user_id: Union[str, "UserID"], access_token_id: Optional[int] = None, - is_guest: Optional[bool] = False, - shadow_banned: Optional[bool] = False, + is_guest: bool = False, + shadow_banned: bool = False, device_id: Optional[str] = None, app_service: Optional["ApplicationService"] = None, authenticated_entity: Optional[str] = None, -): +) -> Requester: """ Create a new ``Requester`` object Args: - user_id (str|UserID): id of the user making the request - access_token_id (int|None): *ID* of the access token used for this + user_id: id of the user making the request + access_token_id: *ID* of the access token used for this request, or None if it came via the appservice API or similar - is_guest (bool): True if the user making this request is a guest user - shadow_banned (bool): True if the user making this request is shadow-banned. - device_id (str|None): device_id which was set at authentication time - app_service (ApplicationService|None): the AS requesting on behalf of the user + is_guest: True if the user making this request is a guest user + shadow_banned: True if the user making this request is shadow-banned. + device_id: device_id which was set at authentication time + app_service: the AS requesting on behalf of the user authenticated_entity: The entity that authenticated when making the request. This is different to the user_id when an admin user or the server is "puppeting" the user.