diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index a01bac2997..4a9b0129c3 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -17,8 +17,6 @@ import sys
from synapse import python_dependencies # noqa: E402
-sys.dont_write_bytecode = True
-
logger = logging.getLogger(__name__)
try:
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 7911d126f5..7809c15589 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -120,6 +120,13 @@ class DeactivateAccountHandler(BaseHandler):
await self.store.user_set_password_hash(user_id, None)
+ # Most of the pushers will have been deleted when we logged out the
+ # associated devices above, but we still need to delete pushers not
+ # associated with devices, e.g. email pushers.
+ await self.store.delete_all_pushers_for_user(user_id)
+
+ # Set the user as no longer active. This will prevent their profile
+ # from being replicated.
user = UserID.from_string(user_id)
await self._profile_handler.set_active([user], False, False)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9059382246..b45b179fed 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -291,8 +291,9 @@ class SyncHandler:
user_id = sync_config.user.to_string()
await self.auth.check_auth_blocking(requester=requester)
- res = await self.response_cache.wrap(
+ res = await self.response_cache.wrap_conditional(
sync_config.request_key,
+ lambda result: since_token != result.next_batch,
self._wait_for_sync_for_user,
sync_config,
since_token,
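
The predicate handed to `wrap_conditional` above stops Synapse from caching a sync response whose `next_batch` token equals the `since_token` it was given, i.e. a timed-out sync that carried no new events. A minimal sketch of the predicate's behaviour, using made-up token values:

```python
# Illustrative sketch with made-up tokens: an "empty" sync hands back the
# token it was given, so caching that response would only replay emptiness.
from collections import namedtuple

SyncResult = namedtuple("SyncResult", ["next_batch"])

since_token = "s72594_4483_1934"
should_cache = lambda result: since_token != result.next_batch

assert not should_cache(SyncResult(next_batch="s72594_4483_1934"))  # timed out: skip cache
assert should_cache(SyncResult(next_batch="s72601_4490_1940"))      # new data: cache
```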
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index c658862fe6..142b007d01 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -14,8 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import re
+from typing import Union
-from twisted.internet import task
+from twisted.internet import address, task
from twisted.web.client import FileBodyProducer
from twisted.web.iweb import IRequest
@@ -53,6 +54,40 @@ class QuieterFileBodyProducer(FileBodyProducer):
pass
+def get_request_uri(request: IRequest) -> bytes:
+ """Return the full URI that was requested by the client"""
+ return b"%s://%s%s" % (
+ b"https" if request.isSecure() else b"http",
+ _get_requested_host(request),
+ # despite its name, "request.uri" is only the path and query-string.
+ request.uri,
+ )
+
+
+def _get_requested_host(request: IRequest) -> bytes:
+ hostname = request.getHeader(b"host")
+ if hostname:
+ return hostname
+
+ # no Host header, use the address/port that the request arrived on
+ host = request.getHost() # type: Union[address.IPv4Address, address.IPv6Address]
+
+ hostname = host.host.encode("ascii")
+
+ if request.isSecure() and host.port == 443:
+ # default port for https
+ return hostname
+
+ if not request.isSecure() and host.port == 80:
+ # default port for http
+ return hostname
+
+ return b"%s:%i" % (
+ hostname,
+ host.port,
+ )
+
+
def get_request_user_agent(request: IRequest, default: str = "") -> str:
"""Return the last User-Agent header, or the given default."""
# There could be raw utf-8 bytes in the User-Agent header.
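
A usage sketch for the helpers above, with a stubbed request object standing in for a real twisted.web `IRequest`: a `Host` header wins outright, and when it is absent the default port (80 for http, 443 for https) is elided from the reconstructed URI.

```python
# Stubbed request (hypothetical values) exercising get_request_uri; a real
# caller would pass a twisted.web.iweb.IRequest instead.
class FakeRequest:
    uri = b"/_matrix/client/r0/login?foo=bar"

    def isSecure(self):
        return True

    def getHeader(self, name):
        return b"example.com" if name == b"host" else None

assert (
    get_request_uri(FakeRequest())
    == b"https://example.com/_matrix/client/r0/login?foo=bar"
)
```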
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 4a4fb5ef26..30153237e3 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -16,6 +16,10 @@ import logging
import time
from typing import Optional, Union
+import attr
+from zope.interface import implementer
+
+from twisted.internet.interfaces import IAddress
from twisted.python.failure import Failure
from twisted.web.server import Request, Site
@@ -333,26 +337,77 @@ class SynapseRequest(Request):
class XForwardedForRequest(SynapseRequest):
- def __init__(self, *args, **kw):
- SynapseRequest.__init__(self, *args, **kw)
+ """Request object which honours proxy headers
+ Extends SynapseRequest to replace getClientIP, getClientAddress, and isSecure with
+ information from request headers.
"""
- Add a layer on top of another request that only uses the value of an
- X-Forwarded-For header as the result of C{getClientIP}.
- """
- def getClientIP(self):
+ # the client IP and ssl flag, as extracted from the headers.
+ _forwarded_for = None # type: Optional[_XForwardedForAddress]
+ _forwarded_https = False # type: bool
+
+ def requestReceived(self, command, path, version):
+ # this method is called by the Channel once the full request has been
+ # received, to dispatch the request to a resource.
+ # We can use it to set the IP address and protocol according to the
+ # headers.
+ self._process_forwarded_headers()
+ return super().requestReceived(command, path, version)
+
+ def _process_forwarded_headers(self):
+ headers = self.requestHeaders.getRawHeaders(b"x-forwarded-for")
+ if not headers:
+ return
+
+ # for now, we just use the first x-forwarded-for header. Really, we ought
+ # to start from the client IP address, and check whether it is trusted; if it
+ # is, work backwards through the headers until we find an untrusted address.
+ # see https://github.com/matrix-org/synapse/issues/9471
+ self._forwarded_for = _XForwardedForAddress(
+ headers[0].split(b",")[0].strip().decode("ascii")
+ )
+
+ # if we got an x-forwarded-for header, also look for an x-forwarded-proto header
+ header = self.getHeader(b"x-forwarded-proto")
+ if header is not None:
+ self._forwarded_https = header.lower() == b"https"
+ else:
+ # this is done largely for backwards-compatibility so that people who
+ # haven't set an x-forwarded-proto header don't get a redirect loop.
+ logger.warning(
+ "forwarded request lacks an x-forwarded-proto header: assuming https"
+ )
+ self._forwarded_https = True
+
+ def isSecure(self):
+ if self._forwarded_https:
+ return True
+ return super().isSecure()
+
+ def getClientIP(self) -> str:
"""
- @return: The client address (the first address) in the value of the
- I{X-Forwarded-For header}. If the header is not present, return
- C{b"-"}.
+ Return the IP address of the client who submitted this request.
+
+ This method is deprecated. Use getClientAddress() instead.
"""
- return (
- self.requestHeaders.getRawHeaders(b"x-forwarded-for", [b"-"])[0]
- .split(b",")[0]
- .strip()
- .decode("ascii")
- )
+ if self._forwarded_for is not None:
+ return self._forwarded_for.host
+ return super().getClientIP()
+
+ def getClientAddress(self) -> IAddress:
+ """
+ Return the address of the client who submitted this request.
+ """
+ if self._forwarded_for is not None:
+ return self._forwarded_for
+ return super().getClientAddress()
+
+
+@implementer(IAddress)
+@attr.s(frozen=True, slots=True)
+class _XForwardedForAddress:
+ host = attr.ib(type=str)
class SynapseSite(Site):
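
The header handling above reduces to two small rules, sketched here with documentation-range addresses: only the first (left-most) entry of `X-Forwarded-For` is taken, and the `X-Forwarded-Proto` comparison is case-insensitive. As the inline comment notes, walking back through *trusted* proxies is deferred to issue #9471.

```python
# Sketch with made-up header values: the left-most X-Forwarded-For entry is
# the address the first proxy saw, which is what _process_forwarded_headers
# extracts above (without yet validating the intermediate proxies).
raw = b"203.0.113.9, 10.0.0.2, 10.0.0.1"
client_ip = raw.split(b",")[0].strip().decode("ascii")
assert client_ip == "203.0.113.9"

# The scheme check tolerates any capitalisation of the header value.
for value in (b"https", b"HTTPS", b"Https"):
    assert value.lower() == b"https"
```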
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 1679a6d211..2534ecb1d4 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Dict, Iterable, Optional
from prometheus_client import Gauge
+from synapse.api.errors import Codes, SynapseError
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -113,6 +114,11 @@ class PusherPool:
The newly created pusher.
"""
+ if kind == "email":
+ email_owner = await self.store.get_user_id_by_threepid("email", pushkey)
+ if email_owner != user_id:
+ raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND)
+
time_now_msec = self.clock.time_msec()
# create the pusher setting last_stream_ordering to the current maximum
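
The new guard above only admits an email pusher when the pushkey is a third-party identifier already bound to the requesting account; anything else is rejected with `M_THREEPID_NOT_FOUND` before a pusher row is ever created. A compressed sketch of the rule, with an in-memory stand-in for the threepid store:

```python
# Sketch (hypothetical store contents): the pushkey of an email pusher must
# resolve to the same user that is trying to create it.
threepids = {("email", "alice@example.com"): "@alice:example.com"}

def may_add_email_pusher(user_id, pushkey):
    return threepids.get(("email", pushkey)) == user_id

assert may_add_email_pusher("@alice:example.com", "alice@example.com")
assert not may_add_email_pusher("@mallory:example.com", "alice@example.com")
```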
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 6e2fbedd99..925edfc402 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -20,6 +20,7 @@ from synapse.api.errors import Codes, LoginError, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.appservice import ApplicationService
from synapse.handlers.sso import SsoIdentityProvider
+from synapse.http import get_request_uri
from synapse.http.server import HttpServer, finish_request
from synapse.http.servlet import (
RestServlet,
@@ -354,6 +355,7 @@ class SsoRedirectServlet(RestServlet):
hs.get_oidc_handler()
self._sso_handler = hs.get_sso_handler()
self._msc2858_enabled = hs.config.experimental.msc2858_enabled
+ self._public_baseurl = hs.config.public_baseurl
def register(self, http_server: HttpServer) -> None:
super().register(http_server)
@@ -373,6 +375,32 @@ class SsoRedirectServlet(RestServlet):
async def on_GET(
self, request: SynapseRequest, idp_id: Optional[str] = None
) -> None:
+ if not self._public_baseurl:
+ raise SynapseError(400, "SSO requires a valid public_baseurl")
+
+ # if this isn't the expected hostname, redirect to the right one, so that we
+ # get our cookies back.
+ requested_uri = get_request_uri(request)
+ baseurl_bytes = self._public_baseurl.encode("utf-8")
+ if not requested_uri.startswith(baseurl_bytes):
+ # swap out the incorrect base URL for the right one.
+ #
+ # The idea here is to redirect from
+ # https://foo.bar/whatever/_matrix/...
+ # to
+ # https://public.baseurl/_matrix/...
+ #
+ i = requested_uri.index(b"/_matrix")
+ new_uri = baseurl_bytes[:-1] + requested_uri[i:]
+ logger.info(
+ "Requested URI %s is not canonical: redirecting to %s",
+ requested_uri.decode("utf-8", errors="replace"),
+ new_uri.decode("utf-8", errors="replace"),
+ )
+ request.redirect(new_uri)
+ finish_request(request)
+ return
+
client_redirect_url = parse_string(
request, "redirectUrl", required=True, encoding=None
)
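
A worked example (hypothetical URLs) of the string surgery above. The `[:-1]` slice drops the trailing slash that Synapse's config parsing guarantees on `public_baseurl`, so it can be glued directly to the path starting at `/_matrix`:

```python
# Hypothetical request arriving on a non-canonical hostname.
public_baseurl = b"https://public.baseurl/"  # always ends in "/" after config parsing
requested_uri = b"https://foo.bar/whatever/_matrix/client/r0/login/sso/redirect"

i = requested_uri.index(b"/_matrix")
new_uri = public_baseurl[:-1] + requested_uri[i:]
assert new_uri == b"https://public.baseurl/_matrix/client/r0/login/sso/redirect"
```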
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 7cb69dd6bd..74219cb05e 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -373,3 +373,46 @@ class PusherStore(PusherWorkerStore):
await self.db_pool.runInteraction(
"delete_pusher", delete_pusher_txn, stream_id
)
+
+ async def delete_all_pushers_for_user(self, user_id: str) -> None:
+ """Delete all pushers associated with an account."""
+
+ # We want to generate a row in `deleted_pushers` for each pusher we're
+ # deleting, so we fetch the list now so we can generate the appropriate
+ # number of stream IDs.
+ #
+ # Note: technically there could be a race here between adding/deleting
+ # pushers, but a) the worst case is that we don't stop a pusher until
+ # the next restart and b) this is only called when we're deactivating
+ # an account.
+ pushers = list(await self.get_pushers_by_user_id(user_id))
+
+ def delete_pushers_txn(txn, stream_ids):
+ self._invalidate_cache_and_stream( # type: ignore
+ txn, self.get_if_user_has_pusher, (user_id,)
+ )
+
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="pushers",
+ keyvalues={"user_name": user_id},
+ )
+
+ self.db_pool.simple_insert_many_txn(
+ txn,
+ table="deleted_pushers",
+ values=[
+ {
+ "stream_id": stream_id,
+ "app_id": pusher.app_id,
+ "pushkey": pusher.pushkey,
+ "user_id": user_id,
+ }
+ for stream_id, pusher in zip(stream_ids, pushers)
+ ],
+ )
+
+ async with self._pushers_id_gen.get_next_mult(len(pushers)) as stream_ids:
+ await self.db_pool.runInteraction(
+ "delete_all_pushers_for_user", delete_pushers_txn, stream_ids
+ )
diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
new file mode 100644
index 0000000000..20ba4abca3
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
@@ -0,0 +1,21 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- We may not have deleted all pushers for deactivated accounts. Do so now.
+--
+-- Note: We don't bother updating the `deleted_pushers` table as it's only used
+-- to stop pushers on workers, and that will happen when they next get restarted.
+DELETE FROM pushers WHERE user_name IN (SELECT name FROM users WHERE deactivated = 1);
diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
new file mode 100644
index 0000000000..2442eea6bc
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
@@ -0,0 +1,19 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Delete all pushers associated with deleted devices. This is to clear up after
+-- a bug where they weren't correctly deleted when using workers.
+DELETE FROM pushers WHERE access_token NOT IN (SELECT id FROM access_tokens);
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 32228f42ee..53f85195a7 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar
+from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, Set, TypeVar
from twisted.internet import defer
@@ -40,6 +40,7 @@ class ResponseCache(Generic[T]):
def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0):
# Requests that haven't finished yet.
self.pending_result_cache = {} # type: Dict[T, ObservableDeferred]
+ self.pending_conditionals = {} # type: Dict[T, Set[Callable[[Any], bool]]]
self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000.0
@@ -101,7 +102,11 @@ class ResponseCache(Generic[T]):
self.pending_result_cache[key] = result
def remove(r):
- if self.timeout_sec:
+ should_cache = all(
+ func(r) for func in self.pending_conditionals.pop(key, [])
+ )
+
+ if self.timeout_sec and should_cache:
self.clock.call_later(
self.timeout_sec, self.pending_result_cache.pop, key, None
)
@@ -112,6 +117,31 @@ class ResponseCache(Generic[T]):
result.addBoth(remove)
return result.observe()
+ def add_conditional(self, key: T, conditional: Callable[[Any], bool]):
+ self.pending_conditionals.setdefault(key, set()).add(conditional)
+
+ def wrap_conditional(
+ self,
+ key: T,
+ should_cache: Callable[[Any], bool],
+ callback: "Callable[..., Any]",
+ *args: Any,
+ **kwargs: Any
+ ) -> defer.Deferred:
+ """The same as wrap(), but adds a conditional to the final execution.
+
+ When the final execution completes, *all* conditionals need to return True for it to properly cache,
+ else it'll not be cached in a timed fashion.
+ """
+
+ # See if there's already a result for this key that hasn't yet completed.
+ # Because the reactor is single-threaded, registering the conditional here,
+ # before handing off to wrap(), cannot race with the result completing.
+ result = self.get(key)
+ if not result or (isinstance(result, defer.Deferred) and not result.called):
+ self.add_conditional(key, should_cache)
+
+ return self.wrap(key, callback, *args, **kwargs)
+
def wrap(
self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any
) -> defer.Deferred:
|