summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/__init__.py2
-rw-r--r--synapse/handlers/deactivate_account.py7
-rw-r--r--synapse/handlers/sync.py3
-rw-r--r--synapse/http/__init__.py37
-rw-r--r--synapse/http/site.py85
-rw-r--r--synapse/push/pusherpool.py6
-rw-r--r--synapse/rest/client/v1/login.py28
-rw-r--r--synapse/storage/databases/main/pusher.py43
-rw-r--r--synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql21
-rw-r--r--synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql19
-rw-r--r--synapse/util/caches/response_cache.py34
11 files changed, 264 insertions, 21 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py

index a01bac2997..4a9b0129c3 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py
@@ -17,8 +17,6 @@ import sys from synapse import python_dependencies # noqa: E402 -sys.dont_write_bytecode = True - logger = logging.getLogger(__name__) try: diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 7911d126f5..7809c15589 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py
@@ -120,6 +120,13 @@ class DeactivateAccountHandler(BaseHandler): await self.store.user_set_password_hash(user_id, None) + # Most of the pushers will have been deleted when we logged out the + # associated devices above, but we still need to delete pushers not + # associated with devices, e.g. email pushers. + await self.store.delete_all_pushers_for_user(user_id) + + # Set the user as no longer active. This will prevent their profile + # from being replicated. user = UserID.from_string(user_id) await self._profile_handler.set_active([user], False, False) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9059382246..b45b179fed 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -291,8 +291,9 @@ class SyncHandler: user_id = sync_config.user.to_string() await self.auth.check_auth_blocking(requester=requester) - res = await self.response_cache.wrap( + res = await self.response_cache.wrap_conditional( sync_config.request_key, + lambda result: since_token != result.next_batch, self._wait_for_sync_for_user, sync_config, since_token, diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index c658862fe6..142b007d01 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py
@@ -14,8 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import re +from typing import Union -from twisted.internet import task +from twisted.internet import address, task from twisted.web.client import FileBodyProducer from twisted.web.iweb import IRequest @@ -53,6 +54,40 @@ class QuieterFileBodyProducer(FileBodyProducer): pass +def get_request_uri(request: IRequest) -> bytes: + """Return the full URI that was requested by the client""" + return b"%s://%s%s" % ( + b"https" if request.isSecure() else b"http", + _get_requested_host(request), + # despite its name, "request.uri" is only the path and query-string. + request.uri, + ) + + +def _get_requested_host(request: IRequest) -> bytes: + hostname = request.getHeader(b"host") + if hostname: + return hostname + + # no Host header, use the address/port that the request arrived on + host = request.getHost() # type: Union[address.IPv4Address, address.IPv6Address] + + hostname = host.host.encode("ascii") + + if request.isSecure() and host.port == 443: + # default port for https + return hostname + + if not request.isSecure() and host.port == 80: + # default port for http + return hostname + + return b"%s:%i" % ( + hostname, + host.port, + ) + + def get_request_user_agent(request: IRequest, default: str = "") -> str: """Return the last User-Agent header, or the given default.""" # There could be raw utf-8 bytes in the User-Agent header. diff --git a/synapse/http/site.py b/synapse/http/site.py
index 4a4fb5ef26..30153237e3 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py
@@ -16,6 +16,10 @@ import logging import time from typing import Optional, Union +import attr +from zope.interface import implementer + +from twisted.internet.interfaces import IAddress from twisted.python.failure import Failure from twisted.web.server import Request, Site @@ -333,26 +337,77 @@ class SynapseRequest(Request): class XForwardedForRequest(SynapseRequest): - def __init__(self, *args, **kw): - SynapseRequest.__init__(self, *args, **kw) + """Request object which honours proxy headers + Extends SynapseRequest to replace getClientIP, getClientAddress, and isSecure with + information from request headers. """ - Add a layer on top of another request that only uses the value of an - X-Forwarded-For header as the result of C{getClientIP}. - """ - def getClientIP(self): + # the client IP and ssl flag, as extracted from the headers. + _forwarded_for = None # type: Optional[_XForwardedForAddress] + _forwarded_https = False # type: bool + + def requestReceived(self, command, path, version): + # this method is called by the Channel once the full request has been + # received, to dispatch the request to a resource. + # We can use it to set the IP address and protocol according to the + # headers. + self._process_forwarded_headers() + return super().requestReceived(command, path, version) + + def _process_forwarded_headers(self): + headers = self.requestHeaders.getRawHeaders(b"x-forwarded-for") + if not headers: + return + + # for now, we just use the first x-forwarded-for header. Really, we ought + # to start from the client IP address, and check whether it is trusted; if it + # is, work backwards through the headers until we find an untrusted address. 
+ # see https://github.com/matrix-org/synapse/issues/9471 + self._forwarded_for = _XForwardedForAddress( + headers[0].split(b",")[0].strip().decode("ascii") + ) + + # if we got an x-forwarded-for header, also look for an x-forwarded-proto header + header = self.getHeader(b"x-forwarded-proto") + if header is not None: + self._forwarded_https = header.lower() == b"https" + else: + # this is done largely for backwards-compatibility so that people that + # haven't set an x-forwarded-proto header don't get a redirect loop. + logger.warning( + "forwarded request lacks an x-forwarded-proto header: assuming https" + ) + self._forwarded_https = True + + def isSecure(self): + if self._forwarded_https: + return True + return super().isSecure() + + def getClientIP(self) -> str: """ - @return: The client address (the first address) in the value of the - I{X-Forwarded-For header}. If the header is not present, return - C{b"-"}. + Return the IP address of the client who submitted this request. + + This method is deprecated. Use getClientAddress() instead. """ - return ( - self.requestHeaders.getRawHeaders(b"x-forwarded-for", [b"-"])[0] - .split(b",")[0] - .strip() - .decode("ascii") - ) + if self._forwarded_for is not None: + return self._forwarded_for.host + return super().getClientIP() + + def getClientAddress(self) -> IAddress: + """ + Return the address of the client who submitted this request. + """ + if self._forwarded_for is not None: + return self._forwarded_for + return super().getClientAddress() + + +@implementer(IAddress) +@attr.s(frozen=True, slots=True) +class _XForwardedForAddress: + host = attr.ib(type=str) class SynapseSite(Site): diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 1679a6d211..2534ecb1d4 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Dict, Iterable, Optional from prometheus_client import Gauge +from synapse.api.errors import Codes, SynapseError from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, @@ -113,6 +114,11 @@ class PusherPool: The newly created pusher. """ + if kind == "email": + email_owner = await self.store.get_user_id_by_threepid("email", pushkey) + if email_owner != user_id: + raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND) + time_now_msec = self.clock.time_msec() # create the pusher setting last_stream_ordering to the current maximum diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 6e2fbedd99..925edfc402 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py
@@ -20,6 +20,7 @@ from synapse.api.errors import Codes, LoginError, SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.appservice import ApplicationService from synapse.handlers.sso import SsoIdentityProvider +from synapse.http import get_request_uri from synapse.http.server import HttpServer, finish_request from synapse.http.servlet import ( RestServlet, @@ -354,6 +355,7 @@ class SsoRedirectServlet(RestServlet): hs.get_oidc_handler() self._sso_handler = hs.get_sso_handler() self._msc2858_enabled = hs.config.experimental.msc2858_enabled + self._public_baseurl = hs.config.public_baseurl def register(self, http_server: HttpServer) -> None: super().register(http_server) @@ -373,6 +375,32 @@ class SsoRedirectServlet(RestServlet): async def on_GET( self, request: SynapseRequest, idp_id: Optional[str] = None ) -> None: + if not self._public_baseurl: + raise SynapseError(400, "SSO requires a valid public_baseurl") + + # if this isn't the expected hostname, redirect to the right one, so that we + # get our cookies back. + requested_uri = get_request_uri(request) + baseurl_bytes = self._public_baseurl.encode("utf-8") + if not requested_uri.startswith(baseurl_bytes): + # swap out the incorrect base URL for the right one. + # + # The idea here is to redirect from + # https://foo.bar/whatever/_matrix/... + # to + # https://public.baseurl/_matrix/... + # + i = requested_uri.index(b"/_matrix") + new_uri = baseurl_bytes[:-1] + requested_uri[i:] + logger.info( + "Requested URI %s is not canonical: redirecting to %s", + requested_uri.decode("utf-8", errors="replace"), + new_uri.decode("utf-8", errors="replace"), + ) + request.redirect(new_uri) + finish_request(request) + return + client_redirect_url = parse_string( request, "redirectUrl", required=True, encoding=None ) diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 7cb69dd6bd..74219cb05e 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py
@@ -373,3 +373,46 @@ class PusherStore(PusherWorkerStore): await self.db_pool.runInteraction( "delete_pusher", delete_pusher_txn, stream_id ) + + async def delete_all_pushers_for_user(self, user_id: str) -> None: + """Delete all pushers associated with an account.""" + + # We want to generate a row in `deleted_pushers` for each pusher we're + # deleting, so we fetch the list now so we can generate the appropriate + # number of stream IDs. + # + # Note: technically there could be a race here between adding/deleting + # pushers, but a) the worst case if we don't stop a pusher until the + # next restart and b) this is only called when we're deactivating an + # account. + pushers = list(await self.get_pushers_by_user_id(user_id)) + + def delete_pushers_txn(txn, stream_ids): + self._invalidate_cache_and_stream( # type: ignore + txn, self.get_if_user_has_pusher, (user_id,) + ) + + self.db_pool.simple_delete_txn( + txn, + table="pushers", + keyvalues={"user_name": user_id}, + ) + + self.db_pool.simple_insert_many_txn( + txn, + table="deleted_pushers", + values=[ + { + "stream_id": stream_id, + "app_id": pusher.app_id, + "pushkey": pusher.pushkey, + "user_id": user_id, + } + for stream_id, pusher in zip(stream_ids, pushers) + ], + ) + + async with self._pushers_id_gen.get_next_mult(len(pushers)) as stream_ids: + await self.db_pool.runInteraction( + "delete_all_pushers_for_user", delete_pushers_txn, stream_ids + ) diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql new file mode 100644
index 0000000000..20ba4abca3 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
@@ -0,0 +1,21 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- We may not have deleted all pushers for deactivated accounts. Do so now. +-- +-- Note: We don't bother updating the `deleted_pushers` table as it's just used +-- to stop pushers on workers, and that will happen when they get next restarted. +DELETE FROM pushers WHERE user_name IN (SELECT name FROM users WHERE deactivated = 1); diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql new file mode 100644
index 0000000000..2442eea6bc --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
@@ -0,0 +1,19 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- Delete all pushers associated with deleted devices. This is to clear up after +-- a bug where they weren't correctly deleted when using workers. +DELETE FROM pushers WHERE access_token NOT IN (SELECT id FROM access_tokens); diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 32228f42ee..53f85195a7 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py
@@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, Set, TypeVar from twisted.internet import defer @@ -40,6 +40,7 @@ class ResponseCache(Generic[T]): def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0): # Requests that haven't finished yet. self.pending_result_cache = {} # type: Dict[T, ObservableDeferred] + self.pending_conditionals = {} # type: Dict[T, Set[Callable[[Any], bool]]] self.clock = hs.get_clock() self.timeout_sec = timeout_ms / 1000.0 @@ -101,7 +102,11 @@ class ResponseCache(Generic[T]): self.pending_result_cache[key] = result def remove(r): - if self.timeout_sec: + should_cache = all( + func(r) for func in self.pending_conditionals.pop(key, []) + ) + + if self.timeout_sec and should_cache: self.clock.call_later( self.timeout_sec, self.pending_result_cache.pop, key, None ) @@ -112,6 +117,31 @@ class ResponseCache(Generic[T]): result.addBoth(remove) return result.observe() + def add_conditional(self, key: T, conditional: Callable[[Any], bool]): + self.pending_conditionals.setdefault(key, set()).add(conditional) + + def wrap_conditional( + self, + key: T, + should_cache: Callable[[Any], bool], + callback: "Callable[..., Any]", + *args: Any, + **kwargs: Any + ) -> defer.Deferred: + """The same as wrap(), but adds a conditional to the final execution. + + When the final execution completes, *all* conditionals need to return True for it to properly cache, + else it'll not be cached in a timed fashion. + """ + + # See if there's already a result on this key that hasn't yet completed. Due to the single-threaded nature of + # python, adding a key immediately in the same execution thread will not cause a race condition. 
+ result = self.get(key) + if not result or isinstance(result, defer.Deferred) and not result.called: + self.add_conditional(key, should_cache) + + return self.wrap(key, callback, *args, **kwargs) + def wrap( self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any ) -> defer.Deferred: