From d619b113edf2942185a502a91cbf5b51642f6814 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Tue, 22 Jan 2019 16:52:29 +0000 Subject: Fix None guard in config.server.is_threepid_reserved --- synapse/api/auth.py | 4 +++- synapse/config/server.py | 11 ++++++----- synapse/rest/client/v2_alpha/register.py | 4 +++- 3 files changed, 12 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index ba1019b9b2..e37b807c94 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -819,7 +819,9 @@ class Auth(object): elif threepid: # If the user does not exist yet, but is signing up with a # reserved threepid then pass auth check - if is_threepid_reserved(self.hs.config, threepid): + if is_threepid_reserved( + self.hs.config.mau_limits_reserved_threepids, threepid + ): return # Else if there is no room in the MAU bucket, bail current_mau = yield self.store.get_monthly_active_count() diff --git a/synapse/config/server.py b/synapse/config/server.py index fb57791098..927c54ee5b 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -420,19 +420,20 @@ class ServerConfig(Config): " service on the given port.") -def is_threepid_reserved(config, threepid): +def is_threepid_reserved(reserved_threepids, threepid): """Check the threepid against the reserved threepid config Args: - config(ServerConfig) - to access server config attributes + reserved_threepids([dict]) - list of reserved threepids threepid(dict) - The threepid to test for Returns: boolean Is the threepid undertest reserved_user """ + if not threepid: + return False - for tp in config.mau_limits_reserved_threepids: - if (threepid['medium'] == tp['medium'] - and threepid['address'] == tp['address']): + for tp in reserved_threepids: + if (threepid['medium'] == tp['medium'] and threepid['address'] == tp['address']): return True return False diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 14025cd219..3ab253462b 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -416,7 +416,9 @@ class RegisterRestServlet(RestServlet): ) # Necessary due to auth checks prior to the threepid being # written to the db - if is_threepid_reserved(self.hs.config, threepid): + if is_threepid_reserved( + self.hs.config.mau_limits_reserved_threepids, threepid + ): yield self.store.upsert_monthly_active_user(registered_user_id) # remember that we've now registered that user account, and with -- cgit 1.4.1 From c99c2d58d775a7a7bcbcae426fed608d6a0e8ee3 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Tue, 22 Jan 2019 17:47:00 +0000 Subject: move guard out of is_threepid_reserved and into register.py --- changelog.d/4435.bugfix | 2 +- synapse/config/server.py | 2 -- synapse/rest/client/v2_alpha/register.py | 9 +++++---- 3 files changed, 6 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/changelog.d/4435.bugfix b/changelog.d/4435.bugfix index 0e0535f1a3..4ea9a5df02 100644 --- a/changelog.d/4435.bugfix +++ b/changelog.d/4435.bugfix @@ -1 +1 @@ -Fix None guard in config.server.is_threepid_reserved +Fix None guard in calling config.server.is_threepid_reserved diff --git a/synapse/config/server.py b/synapse/config/server.py index 927c54ee5b..a915bb8b64 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -429,8 +429,6 @@ def is_threepid_reserved(reserved_threepids, threepid): Returns: boolean Is the threepid undertest reserved_user """ - if not threepid: - return False for tp in reserved_threepids: if (threepid['medium'] == tp['medium'] and threepid['address'] == tp['address']): diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 3ab253462b..7f812b8209 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -416,10 +416,11 @@ class RegisterRestServlet(RestServlet): ) # Necessary due to auth checks prior to the threepid being # written to the db - if is_threepid_reserved( - self.hs.config.mau_limits_reserved_threepids, threepid - ): - yield self.store.upsert_monthly_active_user(registered_user_id) + if threepid: + if is_threepid_reserved( + self.hs.config.mau_limits_reserved_threepids, threepid + ): + yield self.store.upsert_monthly_active_user(registered_user_id) # remember that we've now registered that user account, and with # what user ID (since the user may not have specified) -- cgit 1.4.1 From 44be7513bff501055cc2e667a5cca3fb87c23f70 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 21 Jan 2019 23:27:57 +0000 Subject: MatrixFederationAgent Pull the magic that is currently in matrix_federation_endpoint and friends into an agent-like thing --- synapse/http/federation/matrix_federation_agent.py | 114 +++++++++++++++++++++ synapse/http/federation/srv_resolver.py | 33 ++++++ 2 files changed, 147 insertions(+) create mode 100644 synapse/http/federation/matrix_federation_agent.py (limited to 'synapse') diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py new file mode 100644 index 0000000000..32bfd68ed1 --- /dev/null +++ b/synapse/http/federation/matrix_federation_agent.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from zope.interface import implementer + +from twisted.internet import defer +from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS +from twisted.web.client import URI, Agent, HTTPConnectionPool +from twisted.web.iweb import IAgent + +from synapse.http.endpoint import parse_server_name +from synapse.http.federation.srv_resolver import pick_server_from_list, resolve_service +from synapse.util.logcontext import make_deferred_yieldable + +logger = logging.getLogger(__name__) + + +@implementer(IAgent) +class MatrixFederationAgent(object): + """An Agent-like thing which provides a `request` method which will look up a matrix + server and send an HTTP request to it. + + Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.) + + Args: + reactor (IReactor): twisted reactor to use for underlying requests + tls_client_options_factory (ClientTLSOptionsFactory|None): + factory to use for fetching client tls options, or none to disable TLS. + """ + + def __init__(self, reactor, tls_client_options_factory): + self._reactor = reactor + self._tls_client_options_factory = tls_client_options_factory + + self._pool = HTTPConnectionPool(reactor) + self._pool.retryAutomatically = False + self._pool.maxPersistentPerHost = 5 + self._pool.cachedConnectionTimeout = 2 * 60 + + @defer.inlineCallbacks + def request(self, method, uri, headers=None, bodyProducer=None): + """ + Args: + method (bytes): HTTP method: GET/POST/etc + + uri (bytes): Absolute URI to be retrieved + + headers (twisted.web.http_headers.Headers|None): + HTTP headers to send with the request, or None to + send no extra headers. + + bodyProducer (twisted.web.iweb.IBodyProducer|None): + An object which can generate bytes to make up the + body of this request (for example, the properly encoded contents of + a file for a file upload). Or None if the request is to have + no body. + + Returns: + Deferred[twisted.web.iweb.IResponse]: + fires when the header of the response has been received (regardless of the + response status code). Fails if there is any problem which prevents that + response from being received (including problems that prevent the request + from being sent). + """ + + parsed_uri = URI.fromBytes(uri) + server_name_bytes = parsed_uri.netloc + host, port = parse_server_name(server_name_bytes.decode("ascii")) + + # XXX disabling TLS is really only supported here for the benefit of the + # unit tests. We should make the UTs cope with TLS rather than having to make + # the code support the unit tests. + if self._tls_client_options_factory is None: + tls_options = None + else: + tls_options = self._tls_client_options_factory.get_options(host) + + if port is not None: + target = (host, port) + else: + server_list = yield resolve_service(server_name_bytes) + if not server_list: + target = (host, 8448) + logger.debug("No SRV record for %s, using %s", host, target) + else: + target = pick_server_from_list(server_list) + + class EndpointFactory(object): + @staticmethod + def endpointForURI(_uri): + logger.info("Connecting to %s:%s", target[0], target[1]) + ep = HostnameEndpoint(self._reactor, host=target[0], port=target[1]) + if tls_options is not None: + ep = wrapClientTLS(tls_options, ep) + return ep + + agent = Agent.usingEndpointFactory(self._reactor, EndpointFactory(), self._pool) + res = yield make_deferred_yieldable( + agent.request(method, uri, headers, bodyProducer) + ) + defer.returnValue(res) diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index c49b82c394..ded0b32832 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +import random import time import attr @@ -51,6 +52,38 @@ class Server(object): expires = attr.ib(default=0) +def pick_server_from_list(server_list): + """Randomly choose a server from the server list + + Args: + server_list (list[Server]): list of candidate servers + + Returns: + Tuple[bytes, int]: (host, port) pair for the chosen server + """ + if not server_list: + raise RuntimeError("pick_server_from_list called with empty list") + + # TODO: currently we only use the lowest-priority servers. We should maintain a + # cache of servers known to be "down" and filter them out + + min_priority = min(s.priority for s in server_list) + eligible_servers = list(s for s in server_list if s.priority == min_priority) + total_weight = sum(s.weight for s in eligible_servers) + target_weight = random.randint(0, total_weight) + + for s in eligible_servers: + target_weight -= s.weight + + if target_weight <= 0: + return s.host, s.port + + # this should be impossible. + raise RuntimeError( + "pick_server_from_list got to end of eligible server list.", + ) + + @defer.inlineCallbacks def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=time): """Look up a SRV record, with caching -- cgit 1.4.1 From 7871146667afd76576939c91986a8f8bacb49446 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 21 Jan 2019 23:29:47 +0000 Subject: Make MatrixFederationClient use MatrixFederationAgent ... instead of the matrix_federation_endpoint --- synapse/http/matrixfederationclient.py | 37 ++++--------- tests/http/test_fedclient.py | 96 ++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 27 deletions(-) (limited to 'synapse') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 250bb1ef91..980e912348 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -32,7 +32,7 @@ from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool +from twisted.web.client import FileBodyProducer from twisted.web.http_headers import Headers import synapse.metrics @@ -44,7 +44,7 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) -from synapse.http.endpoint import matrix_federation_endpoint +from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.util.async_helpers import timeout_deferred from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure @@ -66,20 +66,6 @@ else: MAXINT = sys.maxint -class MatrixFederationEndpointFactory(object): - def __init__(self, hs): - self.reactor = hs.get_reactor() - self.tls_client_options_factory = hs.tls_client_options_factory - - def endpointForURI(self, uri): - destination = uri.netloc.decode('ascii') - - return matrix_federation_endpoint( - self.reactor, destination, timeout=10, - tls_client_options_factory=self.tls_client_options_factory - ) - - _next_id = 1 @@ -187,12 +173,10 @@ class MatrixFederationHttpClient(object): self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname reactor = hs.get_reactor() - pool = HTTPConnectionPool(reactor) - pool.retryAutomatically = False - pool.maxPersistentPerHost = 5 - pool.cachedConnectionTimeout = 2 * 60 - self.agent = Agent.usingEndpointFactory( - reactor, MatrixFederationEndpointFactory(hs), pool=pool + + self.agent = MatrixFederationAgent( + hs.get_reactor(), + hs.tls_client_options_factory, ) self.clock = hs.get_clock() self._store = hs.get_datastore() @@ -316,9 +300,9 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers logger.info( - "{%s} [%s] Sending request: %s %s", + "{%s} [%s] Sending request: %s %s; timeout %fs", request.txn_id, request.destination, request.method, - url_str, + url_str, _sec_timeout, ) try: @@ -338,12 +322,11 @@ class MatrixFederationHttpClient(object): reactor=self.hs.get_reactor(), ) - response = yield make_deferred_yieldable( - request_deferred, - ) + response = yield request_deferred except DNSLookupError as e: raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e) except Exception as e: + logger.info("Failed to send request: %s", e) raise_from(RequestSendFailed(e, can_retry=True), e) logger.info( diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py index 8426eee400..d37f8f9981 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py @@ -15,6 +15,7 @@ from mock import Mock +from twisted.internet import defer from twisted.internet.defer import TimeoutError from twisted.internet.error import ConnectingCancelledError, DNSLookupError from twisted.test.proto_helpers import StringTransport @@ -26,11 +27,20 @@ from synapse.http.matrixfederationclient import ( MatrixFederationHttpClient, MatrixFederationRequest, ) +from synapse.util.logcontext import LoggingContext from tests.server import FakeTransport from tests.unittest import HomeserverTestCase +def check_logcontext(context): + current = LoggingContext.current_context() + if current is not context: + raise AssertionError( + "Expected logcontext %s but was %s" % (context, current), + ) + + class FederationClientTests(HomeserverTestCase): def make_homeserver(self, reactor, clock): @@ -43,6 +53,70 @@ class FederationClientTests(HomeserverTestCase): self.cl = MatrixFederationHttpClient(self.hs) self.reactor.lookups["testserv"] = "1.2.3.4" + def test_client_get(self): + """ + happy-path test of a GET request + """ + @defer.inlineCallbacks + def do_request(): + with LoggingContext("one") as context: + fetch_d = self.cl.get_json("testserv:8008", "foo/bar") + + # Nothing happened yet + self.assertNoResult(fetch_d) + + # should have reset logcontext to the sentinel + check_logcontext(LoggingContext.sentinel) + + try: + fetch_res = yield fetch_d + defer.returnValue(fetch_res) + finally: + check_logcontext(context) + + test_d = do_request() + + self.pump() + + # Nothing happened yet + self.assertNoResult(test_d) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, '1.2.3.4') + self.assertEqual(port, 8008) + + # complete the connection and wire it up to a fake transport + protocol = factory.buildProtocol(None) + transport = StringTransport() + protocol.makeConnection(transport) + + # that should have made it send the request to the transport + self.assertRegex(transport.value(), b"^GET /foo/bar") + + # Deferred is still without a result + self.assertNoResult(test_d) + + # Send it the HTTP response + res_json = '{ "a": 1 }'.encode('ascii') + protocol.dataReceived( + b"HTTP/1.1 200 OK\r\n" + b"Server: Fake\r\n" + b"Content-Type: application/json\r\n" + b"Content-Length: %i\r\n" + b"\r\n" + b"%s" % (len(res_json), res_json) + ) + + self.pump() + + res = self.successResultOf(test_d) + + # check the response is as expected + self.assertEqual(res, {"a": 1}) + def test_dns_error(self): """ If the DNS lookup returns an error, it will bubble up. @@ -54,6 +128,28 @@ class FederationClientTests(HomeserverTestCase): self.assertIsInstance(f.value, RequestSendFailed) self.assertIsInstance(f.value.inner_exception, DNSLookupError) + def test_client_connection_refused(self): + d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000) + + self.pump() + + # Nothing happened yet + self.assertNoResult(d) + + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, '1.2.3.4') + self.assertEqual(port, 8008) + e = Exception("go away") + factory.clientConnectionFailed(None, e) + self.pump(0.5) + + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, RequestSendFailed) + self.assertIs(f.value.inner_exception, e) + def test_client_never_connect(self): """ If the HTTP request is not connected and is timed out, it'll give a -- cgit 1.4.1 From fe212bbe4a6774e34b0a474e1d3c6aee4ae4e3c4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 21 Jan 2019 23:42:22 +0000 Subject: Kill off matrix_federation_endpoint this thing is now redundant. --- synapse/http/endpoint.py | 144 -------------------------------- synapse/http/federation/srv_resolver.py | 1 - 2 files changed, 145 deletions(-) (limited to 'synapse') diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 815f8ff2f7..cd79ebab62 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -13,15 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import random import re -from twisted.internet import defer -from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS -from twisted.internet.error import ConnectError - -from synapse.http.federation.srv_resolver import Server, resolve_service - logger = logging.getLogger(__name__) @@ -88,140 +81,3 @@ def parse_and_validate_server_name(server_name): )) return host, port - - -def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=None, - timeout=None): - """Construct an endpoint for the given matrix destination. - - Args: - reactor: Twisted reactor. - destination (unicode): The name of the server to connect to. - tls_client_options_factory - (synapse.crypto.context_factory.ClientTLSOptionsFactory): - Factory which generates TLS options for client connections. - timeout (int): connection timeout in seconds - """ - - domain, port = parse_server_name(destination) - - endpoint_kw_args = {} - - if timeout is not None: - endpoint_kw_args.update(timeout=timeout) - - if tls_client_options_factory is None: - transport_endpoint = HostnameEndpoint - default_port = 8008 - else: - # the SNI string should be the same as the Host header, minus the port. - # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777, - # the Host header and SNI should therefore be the server_name of the remote - # server. - tls_options = tls_client_options_factory.get_options(domain) - - def transport_endpoint(reactor, host, port, timeout): - return wrapClientTLS( - tls_options, - HostnameEndpoint(reactor, host, port, timeout=timeout), - ) - default_port = 8448 - - if port is None: - return SRVClientEndpoint( - reactor, "matrix", domain, protocol="tcp", - default_port=default_port, endpoint=transport_endpoint, - endpoint_kw_args=endpoint_kw_args - ) - else: - return transport_endpoint( - reactor, domain, port, **endpoint_kw_args - ) - - -class SRVClientEndpoint(object): - """An endpoint which looks up SRV records for a service. - Cycles through the list of servers starting with each call to connect - picking the next server. - Implements twisted.internet.interfaces.IStreamClientEndpoint. - """ - - def __init__(self, reactor, service, domain, protocol="tcp", - default_port=None, endpoint=HostnameEndpoint, - endpoint_kw_args={}): - self.reactor = reactor - self.service_name = "_%s._%s.%s" % (service, protocol, domain) - - if default_port is not None: - self.default_server = Server( - host=domain, - port=default_port, - ) - else: - self.default_server = None - - self.endpoint = endpoint - self.endpoint_kw_args = endpoint_kw_args - - self.servers = None - self.used_servers = None - - @defer.inlineCallbacks - def fetch_servers(self): - self.used_servers = [] - self.servers = yield resolve_service(self.service_name) - - def pick_server(self): - if not self.servers: - if self.used_servers: - self.servers = self.used_servers - self.used_servers = [] - self.servers.sort() - elif self.default_server: - return self.default_server - else: - raise ConnectError( - "No server available for %s" % self.service_name - ) - - # look for all servers with the same priority - min_priority = self.servers[0].priority - weight_indexes = list( - (index, server.weight + 1) - for index, server in enumerate(self.servers) - if server.priority == min_priority - ) - - total_weight = sum(weight for index, weight in weight_indexes) - target_weight = random.randint(0, total_weight) - for index, weight in weight_indexes: - target_weight -= weight - if target_weight <= 0: - server = self.servers[index] - # XXX: this looks totally dubious: - # - # (a) we never reuse a server until we have been through - # all of the servers at the same priority, so if the - # weights are A: 100, B:1, we always do ABABAB instead of - # AAAA...AAAB (approximately). - # - # (b) After using all the servers at the lowest priority, - # we move onto the next priority. We should only use the - # second priority if servers at the top priority are - # unreachable. - # - del self.servers[index] - self.used_servers.append(server) - return server - - @defer.inlineCallbacks - def connect(self, protocolFactory): - if self.servers is None: - yield self.fetch_servers() - server = self.pick_server() - logger.info("Connecting to %s:%s", server.host, server.port) - endpoint = self.endpoint( - self.reactor, server.host, server.port, **self.endpoint_kw_args - ) - connection = yield endpoint.connect(protocolFactory) - defer.returnValue(connection) diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index ded0b32832..a6e92fdf40 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -152,6 +152,5 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t expires=int(clock.time()) + answer.ttl, )) - servers.sort() # FIXME: get rid of this (it's broken by the attrs change) cache[service_name] = list(servers) defer.returnValue(servers) -- cgit 1.4.1 From 53a327b4d5bdcab36651aa86ddf1815ff86e5db2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 22 Jan 2019 17:35:09 +0000 Subject: Require that service_name be a byte string it is only ever a bytes now, so let's enforce that. --- synapse/http/federation/srv_resolver.py | 8 ++++---- tests/http/federation/test_srv_resolver.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index a6e92fdf40..e05f934d0b 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -92,7 +92,7 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t but the cache never gets populated), so we add our own caching layer here. Args: - service_name (unicode|bytes): record to look up + service_name (bytes): record to look up dns_client (twisted.internet.interfaces.IResolver): twisted resolver impl cache (dict): cache object clock (object): clock implementation. must provide a time() method. @@ -100,9 +100,9 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t Returns: Deferred[list[Server]]: a list of the SRV records, or an empty list if none found """ - # TODO: the dns client handles both unicode names (encoding via idna) and pre-encoded - # byteses; however they will obviously end up as separate entries in the cache. We - # should pick one form and stick with it. + if not isinstance(service_name, bytes): + raise TypeError("%r is not a byte string" % (service_name,)) + cache_entry = cache.get(service_name, None) if cache_entry: if all(s.expires > int(clock.time()) for s in cache_entry): diff --git a/tests/http/federation/test_srv_resolver.py b/tests/http/federation/test_srv_resolver.py index 1271a495e1..de4d0089c8 100644 --- a/tests/http/federation/test_srv_resolver.py +++ b/tests/http/federation/test_srv_resolver.py @@ -83,7 +83,7 @@ class SrvResolverTestCase(unittest.TestCase): dns_client_mock = Mock() dns_client_mock.lookupService.return_value = defer.fail(error.DNSServerError()) - service_name = "test_service.example.com" + service_name = b"test_service.example.com" entry = Mock(spec_set=["expires"]) entry.expires = 0 @@ -106,7 +106,7 @@ class SrvResolverTestCase(unittest.TestCase): dns_client_mock = Mock(spec_set=['lookupService']) dns_client_mock.lookupService = Mock(spec_set=[]) - service_name = "test_service.example.com" + service_name = b"test_service.example.com" entry = Mock(spec_set=["expires"]) entry.expires = 999999999 @@ -128,7 +128,7 @@ class SrvResolverTestCase(unittest.TestCase): dns_client_mock.lookupService.return_value = defer.fail(error.DNSServerError()) - service_name = "test_service.example.com" + service_name = b"test_service.example.com" cache = {} @@ -141,7 +141,7 @@ class SrvResolverTestCase(unittest.TestCase): dns_client_mock.lookupService.return_value = defer.fail(error.DNSNameError()) - service_name = "test_service.example.com" + service_name = b"test_service.example.com" cache = {} -- cgit 1.4.1 From 7021784d4612c328ef174963c6d5ca9a37d24bc7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 22 Jan 2019 17:42:26 +0000 Subject: put resolve_service in an object this makes it easier to stub things out for tests. --- synapse/http/federation/matrix_federation_agent.py | 16 ++- synapse/http/federation/srv_resolver.py | 133 +++++++++++---------- tests/http/federation/test_srv_resolver.py | 38 +++--- 3 files changed, 104 insertions(+), 83 deletions(-) (limited to 'synapse') diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 32bfd68ed1..64c780a341 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -22,7 +22,7 @@ from twisted.web.client import URI, Agent, HTTPConnectionPool from twisted.web.iweb import IAgent from synapse.http.endpoint import parse_server_name -from synapse.http.federation.srv_resolver import pick_server_from_list, resolve_service +from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list from synapse.util.logcontext import make_deferred_yieldable logger = logging.getLogger(__name__) @@ -37,13 +37,23 @@ class MatrixFederationAgent(object): Args: reactor (IReactor): twisted reactor to use for underlying requests + tls_client_options_factory (ClientTLSOptionsFactory|None): factory to use for fetching client tls options, or none to disable TLS. + + srv_resolver (SrvResolver|None): + SRVResolver impl to use for looking up SRV records. None to use a default + implementation. """ - def __init__(self, reactor, tls_client_options_factory): + def __init__( + self, reactor, tls_client_options_factory, _srv_resolver=None, + ): self._reactor = reactor self._tls_client_options_factory = tls_client_options_factory + if _srv_resolver is None: + _srv_resolver = SrvResolver() + self._srv_resolver = _srv_resolver self._pool = HTTPConnectionPool(reactor) self._pool.retryAutomatically = False @@ -91,7 +101,7 @@ class MatrixFederationAgent(object): if port is not None: target = (host, port) else: - server_list = yield resolve_service(server_name_bytes) + server_list = yield self._srv_resolver.resolve_service(server_name_bytes) if not server_list: target = (host, 8448) logger.debug("No SRV record for %s, using %s", host, target) diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index e05f934d0b..71830c549d 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -84,73 +84,86 @@ def pick_server_from_list(server_list): ) -@defer.inlineCallbacks -def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=time): - """Look up a SRV record, with caching +class SrvResolver(object): + """Interface to the dns client to do SRV lookups, with result caching. The default resolver in twisted.names doesn't do any caching (it has a CacheResolver, but the cache never gets populated), so we add our own caching layer here. Args: - service_name (bytes): record to look up dns_client (twisted.internet.interfaces.IResolver): twisted resolver impl cache (dict): cache object - clock (object): clock implementation. must provide a time() method. - - Returns: - Deferred[list[Server]]: a list of the SRV records, or an empty list if none found + get_time (callable): clock implementation. Should return seconds since the epoch """ - if not isinstance(service_name, bytes): - raise TypeError("%r is not a byte string" % (service_name,)) - - cache_entry = cache.get(service_name, None) - if cache_entry: - if all(s.expires > int(clock.time()) for s in cache_entry): - servers = list(cache_entry) - defer.returnValue(servers) - - try: - answers, _, _ = yield make_deferred_yieldable( - dns_client.lookupService(service_name), - ) - except DNSNameError: - # TODO: cache this. We can get the SOA out of the exception, and use - # the negative-TTL value. - defer.returnValue([]) - except DomainError as e: - # We failed to resolve the name (other than a NameError) - # Try something in the cache, else rereaise - cache_entry = cache.get(service_name, None) + def __init__(self, dns_client=client, cache=SERVER_CACHE, get_time=time.time): + self._dns_client = dns_client + self._cache = cache + self._get_time = get_time + + @defer.inlineCallbacks + def resolve_service(self, service_name): + """Look up a SRV record + + Args: + service_name (bytes): record to look up + + Returns: + Deferred[list[Server]]: + a list of the SRV records, or an empty list if none found + """ + now = int(self._get_time()) + + if not isinstance(service_name, bytes): + raise TypeError("%r is not a byte string" % (service_name,)) + + cache_entry = self._cache.get(service_name, None) if cache_entry: - logger.warn( - "Failed to resolve %r, falling back to cache. %r", - service_name, e + if all(s.expires > now for s in cache_entry): + servers = list(cache_entry) + defer.returnValue(servers) + + try: + answers, _, _ = yield make_deferred_yieldable( + self._dns_client.lookupService(service_name), ) - defer.returnValue(list(cache_entry)) - else: - raise e - - if (len(answers) == 1 - and answers[0].type == dns.SRV - and answers[0].payload - and answers[0].payload.target == dns.Name(b'.')): - raise ConnectError("Service %s unavailable" % service_name) - - servers = [] - - for answer in answers: - if answer.type != dns.SRV or not answer.payload: - continue - - payload = answer.payload - - servers.append(Server( - host=payload.target.name, - port=payload.port, - priority=payload.priority, - weight=payload.weight, - expires=int(clock.time()) + answer.ttl, - )) - - cache[service_name] = list(servers) - defer.returnValue(servers) + except DNSNameError: + # TODO: cache this. We can get the SOA out of the exception, and use + # the negative-TTL value. + defer.returnValue([]) + except DomainError as e: + # We failed to resolve the name (other than a NameError) + # Try something in the cache, else rereaise + cache_entry = self._cache.get(service_name, None) + if cache_entry: + logger.warn( + "Failed to resolve %r, falling back to cache. %r", + service_name, e + ) + defer.returnValue(list(cache_entry)) + else: + raise e + + if (len(answers) == 1 + and answers[0].type == dns.SRV + and answers[0].payload + and answers[0].payload.target == dns.Name(b'.')): + raise ConnectError("Service %s unavailable" % service_name) + + servers = [] + + for answer in answers: + if answer.type != dns.SRV or not answer.payload: + continue + + payload = answer.payload + + servers.append(Server( + host=payload.target.name, + port=payload.port, + priority=payload.priority, + weight=payload.weight, + expires=now + answer.ttl, + )) + + self._cache[service_name] = list(servers) + defer.returnValue(servers) diff --git a/tests/http/federation/test_srv_resolver.py b/tests/http/federation/test_srv_resolver.py index de4d0089c8..a872e2441e 100644 --- a/tests/http/federation/test_srv_resolver.py +++ b/tests/http/federation/test_srv_resolver.py @@ -21,7 +21,7 @@ from twisted.internet.defer import Deferred from twisted.internet.error import ConnectError from twisted.names import dns, error -from synapse.http.federation.srv_resolver import resolve_service +from synapse.http.federation.srv_resolver import SrvResolver from synapse.util.logcontext import LoggingContext from tests import unittest @@ -43,13 +43,13 @@ class SrvResolverTestCase(unittest.TestCase): dns_client_mock.lookupService.return_value = result_deferred cache = {} + resolver = SrvResolver(dns_client=dns_client_mock, cache=cache) @defer.inlineCallbacks def do_lookup(): + with LoggingContext("one") as ctx: - resolve_d = resolve_service( - service_name, dns_client=dns_client_mock, cache=cache - ) + resolve_d = resolver.resolve_service(service_name) self.assertNoResult(resolve_d) @@ -89,10 +89,9 @@ class SrvResolverTestCase(unittest.TestCase): entry.expires = 0 cache = {service_name: [entry]} + resolver = SrvResolver(dns_client=dns_client_mock, cache=cache) - servers = yield resolve_service( - service_name, dns_client=dns_client_mock, cache=cache - ) + servers = yield resolver.resolve_service(service_name) dns_client_mock.lookupService.assert_called_once_with(service_name) @@ -112,11 +111,12 @@ class SrvResolverTestCase(unittest.TestCase): entry.expires = 999999999 cache = {service_name: [entry]} - - servers = yield resolve_service( - service_name, dns_client=dns_client_mock, cache=cache, clock=clock + resolver = SrvResolver( + dns_client=dns_client_mock, cache=cache, get_time=clock.time, ) + servers = yield resolver.resolve_service(service_name) + self.assertFalse(dns_client_mock.lookupService.called) self.assertEquals(len(servers), 1) @@ -131,9 +131,10 @@ class SrvResolverTestCase(unittest.TestCase): service_name = b"test_service.example.com" cache = {} + resolver = SrvResolver(dns_client=dns_client_mock, cache=cache) with self.assertRaises(error.DNSServerError): - yield resolve_service(service_name, dns_client=dns_client_mock, cache=cache) + yield resolver.resolve_service(service_name) @defer.inlineCallbacks def test_name_error(self): @@ -144,10 +145,9 @@ class SrvResolverTestCase(unittest.TestCase): service_name = b"test_service.example.com" cache = {} + resolver = SrvResolver(dns_client=dns_client_mock, cache=cache) - servers = yield resolve_service( - service_name, dns_client=dns_client_mock, cache=cache - ) + servers = yield resolver.resolve_service(service_name) self.assertEquals(len(servers), 0) self.assertEquals(len(cache), 0) @@ -162,10 +162,9 @@ class SrvResolverTestCase(unittest.TestCase): dns_client_mock = Mock() dns_client_mock.lookupService.return_value = lookup_deferred cache = {} + resolver = SrvResolver(dns_client=dns_client_mock, cache=cache) - resolve_d = resolve_service( - service_name, dns_client=dns_client_mock, cache=cache - ) + resolve_d = resolver.resolve_service(service_name) self.assertNoResult(resolve_d) # returning a single "." should make the lookup fail with a ConenctError @@ -187,10 +186,9 @@ class SrvResolverTestCase(unittest.TestCase): dns_client_mock = Mock() dns_client_mock.lookupService.return_value = lookup_deferred cache = {} + resolver = SrvResolver(dns_client=dns_client_mock, cache=cache) - resolve_d = resolve_service( - service_name, dns_client=dns_client_mock, cache=cache - ) + resolve_d = resolver.resolve_service(service_name) self.assertNoResult(resolve_d) lookup_deferred.callback(( -- cgit 1.4.1 From 6129e52f437c2e03b711453434924e170f3d11bf Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 23 Jan 2019 19:39:06 +1100 Subject: Support ACME for certificate provisioning (#4384) --- changelog.d/4384.feature | 1 + scripts-dev/build_debian_packages | 2 +- synapse/app/homeserver.py | 56 ++++++++++++--- synapse/config/_base.py | 4 +- synapse/config/tls.py | 115 ++++++++++++++++++++++------- synapse/handlers/acme.py | 147 ++++++++++++++++++++++++++++++++++++++ synapse/python_dependencies.py | 4 ++ synapse/server.py | 5 ++ 8 files changed, 298 insertions(+), 36 deletions(-) create mode 100644 changelog.d/4384.feature create mode 100644 synapse/handlers/acme.py (limited to 'synapse') diff --git a/changelog.d/4384.feature b/changelog.d/4384.feature new file mode 100644 index 0000000000..daedcd58c4 --- /dev/null +++ b/changelog.d/4384.feature @@ -0,0 +1 @@ +Synapse can now automatically provision TLS certificates via ACME (the protocol used by CAs like Let's Encrypt). diff --git a/scripts-dev/build_debian_packages b/scripts-dev/build_debian_packages index 577d93e6f6..6b9be99060 100755 --- a/scripts-dev/build_debian_packages +++ b/scripts-dev/build_debian_packages @@ -10,12 +10,12 @@ # can be passed on the commandline for debugging. import argparse -from concurrent.futures import ThreadPoolExecutor import os import signal import subprocess import sys import threading +from concurrent.futures import ThreadPoolExecutor DISTS = ( "debian:stretch", diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index f3ac3d19f0..ffc49d77cc 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -13,10 +13,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import gc import logging import os import sys +import traceback from six import iteritems @@ -324,17 +326,12 @@ def setup(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - tls_server_context_factory = context_factory.ServerContextFactory(config) - tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config) - database_engine = create_engine(config.database_config) config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection hs = SynapseHomeServer( config.server_name, db_config=config.database_config, - tls_server_context_factory=tls_server_context_factory, - tls_client_options_factory=tls_client_options_factory, config=config, version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, @@ -361,12 +358,53 @@ def setup(config_options): logger.info("Database prepared in %s.", config.database_config['name']) hs.setup() - hs.start_listening() + @defer.inlineCallbacks def start(): - hs.get_pusherpool().start() - hs.get_datastore().start_profiling() - hs.get_datastore().start_doing_background_updates() + try: + # Check if the certificate is still valid. + cert_days_remaining = hs.config.is_disk_cert_valid() + + if hs.config.acme_enabled: + # If ACME is enabled, we might need to provision a certificate + # before starting. + acme = hs.get_acme_handler() + + # Start up the webservices which we will respond to ACME + # challenges with. + yield acme.start_listening() + + # We want to reprovision if cert_days_remaining is None (meaning no + # certificate exists), or the days remaining number it returns + # is less than our re-registration threshold. + if (cert_days_remaining is None) or ( + not cert_days_remaining > hs.config.acme_reprovision_threshold + ): + yield acme.provision_certificate() + + # Read the certificate from disk and build the context factories for + # TLS. + hs.config.read_certificate_from_disk() + hs.tls_server_context_factory = context_factory.ServerContextFactory(config) + hs.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( + config + ) + + # It is now safe to start your Synapse. + hs.start_listening() + hs.get_pusherpool().start() + hs.get_datastore().start_profiling() + hs.get_datastore().start_doing_background_updates() + except Exception as e: + # If a DeferredList failed (like in listening on the ACME listener), + # we need to print the subfailure explicitly. + if isinstance(e, defer.FirstError): + e.subFailure.printTraceback(sys.stderr) + sys.exit(1) + + # Something else went wrong when starting. Print it and bail out. + traceback.print_exc(file=sys.stderr) + sys.exit(1) reactor.callWhenRunning(start) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index fd2d6d52ef..5858fb92b4 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -367,7 +367,7 @@ class Config(object): if not keys_directory: keys_directory = os.path.dirname(config_files[-1]) - config_dir_path = os.path.abspath(keys_directory) + self.config_dir_path = os.path.abspath(keys_directory) specified_config = {} for config_file in config_files: @@ -379,7 +379,7 @@ class Config(object): server_name = specified_config["server_name"] config_string = self.generate_config( - config_dir_path=config_dir_path, + config_dir_path=self.config_dir_path, data_dir_path=os.getcwd(), server_name=server_name, generate_secrets=False, diff --git a/synapse/config/tls.py b/synapse/config/tls.py index bb8952c672..a75e233aa0 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -13,60 +13,110 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os +from datetime import datetime from hashlib import sha256 from unpaddedbase64 import encode_base64 from OpenSSL import crypto -from ._base import Config +from synapse.config._base import Config + +logger = logging.getLogger() class TlsConfig(Config): def read_config(self, config): - self.tls_certificate = self.read_tls_certificate( - config.get("tls_certificate_path") - ) - self.tls_certificate_file = config.get("tls_certificate_path") + acme_config = config.get("acme", {}) + self.acme_enabled = acme_config.get("enabled", False) + self.acme_url = acme_config.get( + "url", "https://acme-v01.api.letsencrypt.org/directory" + ) + self.acme_port = acme_config.get("port", 8449) + self.acme_bind_addresses = acme_config.get("bind_addresses", ["127.0.0.1"]) + self.acme_reprovision_threshold = acme_config.get("reprovision_threshold", 30) + + self.tls_certificate_file = os.path.abspath(config.get("tls_certificate_path")) + self.tls_private_key_file = os.path.abspath(config.get("tls_private_key_path")) + self._original_tls_fingerprints = config["tls_fingerprints"] + self.tls_fingerprints = list(self._original_tls_fingerprints) self.no_tls = config.get("no_tls", False) - if self.no_tls: - self.tls_private_key = None - else: - self.tls_private_key = self.read_tls_private_key( - config.get("tls_private_key_path") - ) + # This config option applies to non-federation HTTP clients + # (e.g. for talking to recaptcha, identity servers, and such) + # It should never be used in production, and is intended for + # use only when running tests. + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use" + ) + + self.tls_certificate = None + self.tls_private_key = None + + def is_disk_cert_valid(self): + """ + Is the certificate we have on disk valid, and if so, for how long? + + Returns: + int: Days remaining of certificate validity. + None: No certificate exists. + """ + if not os.path.exists(self.tls_certificate_file): + return None + + try: + with open(self.tls_certificate_file, 'rb') as f: + cert_pem = f.read() + except Exception: + logger.exception("Failed to read existing certificate off disk!") + raise + + try: + tls_certificate = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + except Exception: + logger.exception("Failed to parse existing certificate off disk!") + raise + + # YYYYMMDDhhmmssZ -- in UTC + expires_on = datetime.strptime( + tls_certificate.get_notAfter().decode('ascii'), "%Y%m%d%H%M%SZ" + ) + now = datetime.utcnow() + days_remaining = (expires_on - now).days + return days_remaining - self.tls_fingerprints = config["tls_fingerprints"] + def read_certificate_from_disk(self): + """ + Read the certificates from disk. + """ + self.tls_certificate = self.read_tls_certificate(self.tls_certificate_file) + + if not self.no_tls: + self.tls_private_key = self.read_tls_private_key(self.tls_private_key_file) + + self.tls_fingerprints = list(self._original_tls_fingerprints) # Check that our own certificate is included in the list of fingerprints # and include it if it is not. x509_certificate_bytes = crypto.dump_certificate( - crypto.FILETYPE_ASN1, - self.tls_certificate + crypto.FILETYPE_ASN1, self.tls_certificate ) sha256_fingerprint = encode_base64(sha256(x509_certificate_bytes).digest()) sha256_fingerprints = set(f["sha256"] for f in self.tls_fingerprints) if sha256_fingerprint not in sha256_fingerprints: self.tls_fingerprints.append({u"sha256": sha256_fingerprint}) - # This config option applies to non-federation HTTP clients - # (e.g. for talking to recaptcha, identity servers, and such) - # It should never be used in production, and is intended for - # use only when running tests. - self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( - "use_insecure_ssl_client_just_for_testing_do_not_use" - ) - def default_config(self, config_dir_path, server_name, **kwargs): base_key_name = os.path.join(config_dir_path, server_name) tls_certificate_path = base_key_name + ".tls.crt" tls_private_key_path = base_key_name + ".tls.key" - return """\ + return ( + """\ # PEM encoded X509 certificate for TLS. # You can replace the self-signed certificate that synapse # autogenerates on launch with your own SSL certificate + key pair @@ -107,7 +157,24 @@ class TlsConfig(Config): # tls_fingerprints: [] # tls_fingerprints: [{"sha256": ""}] - """ % locals() + + ## Support for ACME certificate auto-provisioning. + # acme: + # enabled: false + ## ACME path. + ## If you only want to test, use the staging url: + ## https://acme-staging.api.letsencrypt.org/directory + # url: 'https://acme-v01.api.letsencrypt.org/directory' + ## Port number (to listen for the HTTP-01 challenge). + ## Using port 80 requires utilising something like authbind, or proxying to it. + # port: 8449 + ## Hosts to bind to. + # bind_addresses: ['127.0.0.1'] + ## How many days remaining on a certificate before it is renewed. + # reprovision_threshold: 30 + """ + % locals() + ) def read_tls_certificate(self, cert_path): cert_pem = self.read_file(cert_path, "tls_certificate") diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py new file mode 100644 index 0000000000..73ea7ed018 --- /dev/null +++ b/synapse/handlers/acme.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import attr +from zope.interface import implementer + +from twisted.internet import defer +from twisted.internet.endpoints import serverFromString +from twisted.python.filepath import FilePath +from twisted.python.url import URL +from twisted.web import server, static +from twisted.web.resource import Resource + +logger = logging.getLogger(__name__) + +try: + from txacme.interfaces import ICertificateStore + + @attr.s + @implementer(ICertificateStore) + class ErsatzStore(object): + """ + A store that only stores in memory. + """ + + certs = attr.ib(default=attr.Factory(dict)) + + def store(self, server_name, pem_objects): + self.certs[server_name] = [o.as_bytes() for o in pem_objects] + return defer.succeed(None) + + +except ImportError: + # txacme is missing + pass + + +class AcmeHandler(object): + def __init__(self, hs): + self.hs = hs + self.reactor = hs.get_reactor() + + @defer.inlineCallbacks + def start_listening(self): + + # Configure logging for txacme, if you need to debug + # from eliot import add_destinations + # from eliot.twisted import TwistedDestination + # + # add_destinations(TwistedDestination()) + + from txacme.challenges import HTTP01Responder + from txacme.service import AcmeIssuingService + from txacme.endpoint import load_or_create_client_key + from txacme.client import Client + from josepy.jwa import RS256 + + self._store = ErsatzStore() + responder = HTTP01Responder() + + self._issuer = AcmeIssuingService( + cert_store=self._store, + client_creator=( + lambda: Client.from_url( + reactor=self.reactor, + url=URL.from_text(self.hs.config.acme_url), + key=load_or_create_client_key( + FilePath(self.hs.config.config_dir_path) + ), + alg=RS256, + ) + ), + clock=self.reactor, + responders=[responder], + ) + + well_known = Resource() + well_known.putChild(b'acme-challenge', responder.resource) + responder_resource = Resource() + responder_resource.putChild(b'.well-known', well_known) + responder_resource.putChild(b'check', static.Data(b'OK', b'text/plain')) + + srv = server.Site(responder_resource) + + listeners = [] + + for host in self.hs.config.acme_bind_addresses: + logger.info( + "Listening for ACME requests on %s:%s", host, self.hs.config.acme_port + ) + endpoint = serverFromString( + self.reactor, "tcp:%s:interface=%s" % (self.hs.config.acme_port, host) + ) + listeners.append(endpoint.listen(srv)) + + # Make sure we are registered to the ACME server. There's no public API + # for this, it is usually triggered by startService, but since we don't + # want it to control where we save the certificates, we have to reach in + # and trigger the registration machinery ourselves. + self._issuer._registered = False + yield self._issuer._ensure_registered() + + # Return a Deferred that will fire when all the servers have started up. + yield defer.DeferredList(listeners, fireOnOneErrback=True, consumeErrors=True) + + @defer.inlineCallbacks + def provision_certificate(self): + + logger.warning("Reprovisioning %s", self.hs.hostname) + + try: + yield self._issuer.issue_cert(self.hs.hostname) + except Exception: + logger.exception("Fail!") + raise + logger.warning("Reprovisioned %s, saving.", self.hs.hostname) + cert_chain = self._store.certs[self.hs.hostname] + + try: + with open(self.hs.config.tls_private_key_file, "wb") as private_key_file: + for x in cert_chain: + if x.startswith(b"-----BEGIN RSA PRIVATE KEY-----"): + private_key_file.write(x) + + with open(self.hs.config.tls_certificate_file, "wb") as certificate_file: + for x in cert_chain: + if x.startswith(b"-----BEGIN CERTIFICATE-----"): + certificate_file.write(x) + except Exception: + logger.exception("Failed saving!") + raise + + defer.returnValue(True) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 882e844eb1..756721e304 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -79,6 +79,10 @@ CONDITIONAL_REQUIREMENTS = { # ConsentResource uses select_autoescape, which arrived in jinja 2.9 "resources.consent": ["Jinja2>=2.9"], + # ACME support is required to provision TLS certificates from authorities + # that use the protocol, such as Let's Encrypt. + "acme": ["txacme>=0.9.2"], + "saml2": ["pysaml2>=4.5.0"], "url_preview": ["lxml>=3.5.0"], "test": ["mock>=2.0"], diff --git a/synapse/server.py b/synapse/server.py index 9985687b95..c8914302cf 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -46,6 +46,7 @@ from synapse.federation.transport.client import TransportLayerClient from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer from synapse.groups.groups_server import GroupsServerHandler from synapse.handlers import Handlers +from synapse.handlers.acme import AcmeHandler from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler, MacaroonGenerator from synapse.handlers.deactivate_account import DeactivateAccountHandler @@ -129,6 +130,7 @@ class HomeServer(object): 'sync_handler', 'typing_handler', 'room_list_handler', + 'acme_handler', 'auth_handler', 'device_handler', 'e2e_keys_handler', @@ -310,6 +312,9 @@ class HomeServer(object): def build_e2e_room_keys_handler(self): return E2eRoomKeysHandler(self) + def build_acme_handler(self): + return AcmeHandler(self) + def build_application_service_api(self): return ApplicationServiceApi(self) -- cgit 1.4.1 From 90743c9d8910679bb688070b3929f3005e106d00 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Jan 2019 08:45:18 +0000 Subject: Fixup removal of duplicate `user_ips` rows (#4432) * Remove unnecessary ORDER BY clause * Add logging * Newsfile --- changelog.d/4432.misc | 1 + synapse/storage/client_ips.py | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 changelog.d/4432.misc (limited to 'synapse') diff --git a/changelog.d/4432.misc b/changelog.d/4432.misc new file mode 100644 index 0000000000..047061ed3c --- /dev/null +++ b/changelog.d/4432.misc @@ -0,0 +1 @@ +Apply a unique index to the user_ips table, preventing duplicates. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 78721a941a..b228a20ac2 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -143,6 +143,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): # If it returns None, then we're processing the last batch last = end_last_seen is None + logger.info( + "Scanning for duplicate 'user_ips' rows in range: %s <= last_seen < %s", + begin_last_seen, end_last_seen, + ) + def remove(txn): # This works by looking at all entries in the given time span, and # then for each (user_id, access_token, ip) tuple in that range @@ -170,7 +175,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): SELECT user_id, access_token, ip FROM user_ips WHERE {} - ORDER BY last_seen ) c INNER JOIN user_ips USING (user_id, access_token, ip) GROUP BY user_id, access_token, ip -- cgit 1.4.1 From 82a92ba535b424009ef752add2e0d5e198254e04 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 23 Jan 2019 15:01:09 +0000 Subject: Add metric for user dir current event stream position --- synapse/handlers/user_directory.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 3c40999338..120815b09b 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -19,6 +19,7 @@ from six import iteritems from twisted.internet import defer +import synapse.metrics from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.roommember import ProfileInfo @@ -163,6 +164,11 @@ class UserDirectoryHandler(object): yield self._handle_deltas(deltas) self.pos = deltas[-1]["stream_id"] + + # Expose current event processing position to prometheus + synapse.metrics.event_processing_positions.labels( + "user_dir").set(self.pos) + yield self.store.update_user_directory_stream_pos(self.pos) @defer.inlineCallbacks -- cgit 1.4.1 From 6a41d2a187ecef484a3aa67518ec9b4b0638c614 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Jan 2019 17:19:58 +0000 Subject: Add room_version param to get_pdu When we add new event format we'll need to know the event format or room version when parsing events. --- synapse/federation/federation_base.py | 11 +++++--- synapse/federation/federation_client.py | 46 ++++++++++++++++++++++++++++----- synapse/federation/federation_server.py | 4 ++- synapse/handlers/federation.py | 12 +++++++-- 4 files changed, 60 insertions(+), 13 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index b7ad729c63..d749bfdd3a 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -43,8 +43,8 @@ class FederationBase(object): self._clock = hs.get_clock() @defer.inlineCallbacks - def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, - include_none=False): + def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version, + outlier=False, include_none=False): """Takes a list of PDUs and checks the signatures and hashs of each one. If a PDU fails its signature check then we check if we have it in the database and if not then request if from the originating server of @@ -56,8 +56,12 @@ class FederationBase(object): a new list. Args: + origin (str) pdu (list) - outlier (bool) + room_version (str) + outlier (bool): Whether the events are outliers or not + include_none (str): Whether to include None in the returned list + for events that have failed their checks Returns: Deferred : A list of PDUs that have valid signatures and hashes. @@ -84,6 +88,7 @@ class FederationBase(object): res = yield self.get_pdu( destinations=[pdu.origin], event_id=pdu.event_id, + room_version=room_version, outlier=outlier, timeout=10000, ) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d05ed91d64..4e171f9b56 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,7 +25,12 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership +from synapse.api.constants import ( + KNOWN_ROOM_VERSIONS, + EventTypes, + Membership, + RoomVersions, +) from synapse.api.errors import ( CodeMessageException, FederationDeniedError, @@ -202,7 +207,8 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function - def get_pdu(self, destinations, event_id, outlier=False, timeout=None): + def get_pdu(self, destinations, event_id, room_version, outlier=False, + timeout=None): """Requests the PDU with given origin and ID from the remote home servers. @@ -212,6 +218,7 @@ class FederationClient(FederationBase): Args: destinations (list): Which home servers to query event_id (str): event to fetch + room_version (str): version of the room outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` @@ -352,10 +359,13 @@ class FederationClient(FederationBase): ev.event_id for ev in itertools.chain(pdus, auth_chain) ]) + room_version = yield self.store.get_room_version(room_id) + signed_pdus = yield self._check_sigs_and_hash_and_fetch( destination, [p for p in pdus if p.event_id not in seen_events], - outlier=True + outlier=True, + room_version=room_version, ) signed_pdus.extend( seen_events[p.event_id] for p in pdus if p.event_id in seen_events @@ -364,7 +374,8 @@ class FederationClient(FederationBase): signed_auth = yield self._check_sigs_and_hash_and_fetch( destination, [p for p in auth_chain if p.event_id not in seen_events], - outlier=True + outlier=True, + room_version=room_version, ) signed_auth.extend( seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events @@ -411,6 +422,8 @@ class FederationClient(FederationBase): random.shuffle(srvs) return srvs + room_version = yield self.store.get_room_version(room_id) + batch_size = 20 missing_events = list(missing_events) for i in range(0, len(missing_events), batch_size): @@ -421,6 +434,7 @@ class FederationClient(FederationBase): self.get_pdu, destinations=random_server_list(), event_id=e_id, + room_version=room_version, ) for e_id in batch ] @@ -450,8 +464,11 @@ class FederationClient(FederationBase): for p in res["auth_chain"] ] + room_version = yield self.store.get_room_version(room_id) + signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, auth_chain, + outlier=True, room_version=room_version, ) signed_auth.sort(key=lambda e: e.depth) @@ -650,9 +667,20 @@ class FederationClient(FederationBase): for p in itertools.chain(state, auth_chain) } + room_version = None + for e in state: + if (e.type, e.state_key) == (EventTypes.Create, ""): + room_version = e.content.get("room_version", RoomVersions.V1) + break + + if room_version is None: + # We use this error has that is what + raise SynapseError(400, "No create event in state") + valid_pdus = yield self._check_sigs_and_hash_and_fetch( destination, list(pdus.values()), outlier=True, + room_version=room_version, ) valid_pdus_map = { @@ -790,8 +818,10 @@ class FederationClient(FederationBase): for e in content["auth_chain"] ] + room_version = yield self.store.get_room_version(room_id) + signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, auth_chain, outlier=True, room_version=room_version, ) signed_auth.sort(key=lambda e: e.depth) @@ -838,8 +868,10 @@ class FederationClient(FederationBase): for e in content.get("events", []) ] + room_version = yield self.store.get_room_version(room_id) + signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=False + destination, events, outlier=False, room_version=room_version, ) except HttpResponseException as e: if not e.code == 400: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 37d29e7027..cb729c69ea 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -457,8 +457,10 @@ class FederationServer(FederationBase): for e in content["auth_chain"] ] + room_version = yield self.store.get_room_version(room_id) + signed_auth = yield self._check_sigs_and_hash_and_fetch( - origin, auth_chain, outlier=True + origin, auth_chain, outlier=True, room_version=room_version, ) ret = yield self.handler.on_query_auth( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a3bb864bb2..a9dc4a4e4e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -34,6 +34,7 @@ from synapse.api.constants import ( EventTypes, Membership, RejectedReason, + RoomVersions, ) from synapse.api.errors import ( AuthError, @@ -342,6 +343,8 @@ class FederationHandler(BaseHandler): room_id, event_id, p, ) + room_version = yield self.store.get_room_version(room_id) + with logcontext.nested_logging_context(p): # note that if any of the missing prevs share missing state or # auth events, the requests to fetch those events are deduped @@ -355,7 +358,7 @@ class FederationHandler(BaseHandler): # we want the state *after* p; get_state_for_room returns the # state *before* p. remote_event = yield self.federation_client.get_pdu( - [origin], p, outlier=True, + [origin], p, room_version, outlier=True, ) if remote_event is None: @@ -379,7 +382,6 @@ class FederationHandler(BaseHandler): for x in remote_state: event_map[x.event_id] = x - room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_store( room_version, state_maps, event_map, state_res_store=StateResolutionStore(self.store), @@ -655,6 +657,8 @@ class FederationHandler(BaseHandler): if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") + room_version = yield self.store.get_room_version(room_id) + events = yield self.federation_client.backfill( dest, room_id, @@ -748,6 +752,7 @@ class FederationHandler(BaseHandler): self.federation_client.get_pdu, [dest], event_id, + room_version=room_version, outlier=True, timeout=10000, ) @@ -1659,6 +1664,8 @@ class FederationHandler(BaseHandler): create_event = e break + room_version = create_event.content.get("room_version", RoomVersions.V1) + missing_auth_events = set() for e in itertools.chain(auth_events, state, [event]): for e_id in e.auth_event_ids(): @@ -1669,6 +1676,7 @@ class FederationHandler(BaseHandler): m_ev = yield self.federation_client.get_pdu( [origin], e_id, + room_version=room_version, outlier=True, timeout=10000, ) -- cgit 1.4.1 From 886e5acc762b879b606773b511ff92345aef14c6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Jan 2019 15:13:07 +0000 Subject: Store rejected remote invite events as outliers Currently they're stored as non-outliers even though the server isn't in the room, which can be problematic in places where the code assumes it has the state for all non outlier events. In particular, there is an edge case where persisting the leave event triggers a state resolution, which requires looking up the room version from state. Since the server doesn't have the state, this causes an exception to be thrown. --- synapse/federation/federation_client.py | 10 ++++++-- synapse/handlers/federation.py | 44 +++++++++------------------------ synapse/storage/roommember.py | 5 +--- 3 files changed, 21 insertions(+), 38 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d05ed91d64..8fa726759e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,7 +32,6 @@ from synapse.api.errors import ( HttpResponseException, SynapseError, ) -from synapse.events import builder from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache @@ -66,6 +65,8 @@ class FederationClient(FederationBase): self.state = hs.get_state_handler() self.transport_layer = hs.get_federation_transport_client() + self.event_builder_factory = hs.get_event_builder_factory() + self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", clock=self._clock, @@ -571,7 +572,12 @@ class FederationClient(FederationBase): if "prev_state" not in pdu_dict: pdu_dict["prev_state"] = [] - ev = builder.EventBuilder(pdu_dict) + # Strip off the fields that we want to clobber. + pdu_dict.pop("origin", None) + pdu_dict.pop("origin_server_ts", None) + pdu_dict.pop("unsigned", None) + + ev = self.event_builder_factory.new(pdu_dict) defer.returnValue( (destination, ev) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a3bb864bb2..70be87cd3d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -43,10 +43,7 @@ from synapse.api.errors import ( StoreError, SynapseError, ) -from synapse.crypto.event_signing import ( - add_hashes_and_signatures, - compute_event_signature, -) +from synapse.crypto.event_signing import compute_event_signature from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, @@ -58,7 +55,6 @@ from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room -from synapse.util.frozenutils import unfreeze from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination from synapse.visibility import filter_events_for_server @@ -1083,7 +1079,9 @@ class FederationHandler(BaseHandler): handled_events = set() try: - event = self._sign_event(event) + self._sign_event(event) + event.internal_metadata.outlier = False + # Try the host we successfully got a response to /make_join/ # request first. try: @@ -1289,13 +1287,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = True event.internal_metadata.invite_from_remote = True - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) + self._sign_event(event) context = yield self.state_handler.compute_event_context(event) yield self.persist_events_and_notify([(event, context)]) @@ -1313,7 +1305,7 @@ class FederationHandler(BaseHandler): # Mark as outlier as we don't have any state for this event; we're not # even in the room. event.internal_metadata.outlier = True - event = self._sign_event(event) + self._sign_event(event) # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. @@ -1358,26 +1350,14 @@ class FederationHandler(BaseHandler): defer.returnValue((origin, event)) def _sign_event(self, event): - event.internal_metadata.outlier = False - - builder = self.event_builder_factory.new( - unfreeze(event.get_pdu_json()) - ) - - builder.event_id = self.event_builder_factory.create_event_id() - builder.origin = self.hs.hostname - - if not hasattr(event, "signatures"): - builder.signatures = {} - - add_hashes_and_signatures( - builder, - self.hs.hostname, - self.hs.config.signing_key[0], + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - return builder.build() - @defer.inlineCallbacks @log_function def on_make_leave_request(self, room_id, user_id): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0707f9a86a..c7488f4259 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -591,10 +591,7 @@ class RoomMemberStore(RoomMemberWorkerStore): # i.e., its something that has just happened. # The only current event that can also be an outlier is if its an # invite that has come in across federation. - is_new_state = not backfilled and ( - not event.internal_metadata.is_outlier() - or event.internal_metadata.is_invite_from_remote() - ) + is_new_state = not backfilled is_mine = self.hs.is_mine_id(event.state_key) if is_new_state and is_mine: if event.membership == Membership.INVITE: -- cgit 1.4.1 From 07f62da55ac8903f7ea224255b8defd122724ec4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Jan 2019 19:44:37 +0000 Subject: Remove unnecessary '_sign_event' --- synapse/federation/federation_client.py | 9 ++++++++- synapse/handlers/federation.py | 14 -------------- 2 files changed, 8 insertions(+), 15 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8fa726759e..f4adcb556d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,6 +32,7 @@ from synapse.api.errors import ( HttpResponseException, SynapseError, ) +from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache @@ -577,7 +578,13 @@ class FederationClient(FederationBase): pdu_dict.pop("origin_server_ts", None) pdu_dict.pop("unsigned", None) - ev = self.event_builder_factory.new(pdu_dict) + builder = self.event_builder_factory.new(pdu_dict) + add_hashes_and_signatures( + builder, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ev = builder.build() defer.returnValue( (destination, ev) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 70be87cd3d..9a14ba4517 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -43,7 +43,6 @@ from synapse.api.errors import ( StoreError, SynapseError, ) -from synapse.crypto.event_signing import compute_event_signature from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, @@ -1079,7 +1078,6 @@ class FederationHandler(BaseHandler): handled_events = set() try: - self._sign_event(event) event.internal_metadata.outlier = False # Try the host we successfully got a response to /make_join/ @@ -1287,8 +1285,6 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = True event.internal_metadata.invite_from_remote = True - self._sign_event(event) - context = yield self.state_handler.compute_event_context(event) yield self.persist_events_and_notify([(event, context)]) @@ -1305,7 +1301,6 @@ class FederationHandler(BaseHandler): # Mark as outlier as we don't have any state for this event; we're not # even in the room. event.internal_metadata.outlier = True - self._sign_event(event) # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. @@ -1349,15 +1344,6 @@ class FederationHandler(BaseHandler): assert(event.room_id == room_id) defer.returnValue((origin, event)) - def _sign_event(self, event): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - @defer.inlineCallbacks @log_function def on_make_leave_request(self, room_id, user_id): -- cgit 1.4.1 From 7c288c22500e2045d36a29c38d2671fad6484e30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Jan 2019 20:05:44 +0000 Subject: Clarify the invite flows --- synapse/events/__init__.py | 8 ++++++-- synapse/handlers/federation.py | 12 +++++++++++- synapse/storage/roommember.py | 11 +++++++---- 3 files changed, 24 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 84c75495d5..5030636c7e 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -41,8 +41,12 @@ class _EventInternalMetadata(object): def is_outlier(self): return getattr(self, "outlier", False) - def is_invite_from_remote(self): - return getattr(self, "invite_from_remote", False) + def is_new_remote_event(self): + """Whether this is a new remote event, like an invite or an invite + rejection. This is needed as those events are marked as outliers, but + they still need to be processed. + """ + return getattr(self, "new_remote_event", False) def get_send_on_behalf_of(self): """Whether this server should send the event on behalf of another server. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9a14ba4517..e017cab777 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -43,6 +43,7 @@ from synapse.api.errors import ( StoreError, SynapseError, ) +from synapse.crypto.event_signing import compute_event_signature from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, @@ -1283,7 +1284,15 @@ class FederationHandler(BaseHandler): ) event.internal_metadata.outlier = True - event.internal_metadata.invite_from_remote = True + event.internal_metadata.new_remote_event = True + + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) context = yield self.state_handler.compute_event_context(event) yield self.persist_events_and_notify([(event, context)]) @@ -1301,6 +1310,7 @@ class FederationHandler(BaseHandler): # Mark as outlier as we don't have any state for this event; we're not # even in the room. event.internal_metadata.outlier = True + event.internal_metadata.new_remote_event = True # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index c7488f4259..40b13de80b 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -588,10 +588,13 @@ class RoomMemberStore(RoomMemberWorkerStore): ) # We update the local_invites table only if the event is "current", - # i.e., its something that has just happened. - # The only current event that can also be an outlier is if its an - # invite that has come in across federation. - is_new_state = not backfilled + # i.e., its something that has just happened. If the event is an + # outlier it is only current if its a "new remote event", like a + # remote invite or a rejection of a remote invite. + is_new_state = not backfilled and ( + not event.internal_metadata.is_outlier() + or event.internal_metadata.is_new_remote_event() + ) is_mine = self.hs.is_mine_id(event.state_key) if is_new_state and is_mine: if event.membership == Membership.INVITE: -- cgit 1.4.1 From 97fd29c019ae92cd3dc0635de249acfc9c892340 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 24 Jan 2019 09:34:44 +0000 Subject: Don't send IP addresses as SNI (#4452) The problem here is that we have cut-and-pasted an impl from Twisted, and then failed to maintain it. It was fixed in Twisted in https://github.com/twisted/twisted/pull/1047/files; let's do the same here. --- changelog.d/4452.bugfix | 1 + synapse/crypto/context_factory.py | 15 ++++-- .../federation/test_matrix_federation_agent.py | 63 ++++++++++++++++++++-- 3 files changed, 73 insertions(+), 6 deletions(-) create mode 100644 changelog.d/4452.bugfix (limited to 'synapse') diff --git a/changelog.d/4452.bugfix b/changelog.d/4452.bugfix new file mode 100644 index 0000000000..a715ca3788 --- /dev/null +++ b/changelog.d/4452.bugfix @@ -0,0 +1 @@ +Don't send IP addresses as SNI diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py index 6ba3eca7b2..286ad80100 100644 --- a/synapse/crypto/context_factory.py +++ b/synapse/crypto/context_factory.py @@ -17,6 +17,7 @@ from zope.interface import implementer from OpenSSL import SSL, crypto from twisted.internet._sslverify import _defaultCurveName +from twisted.internet.abstract import isIPAddress, isIPv6Address from twisted.internet.interfaces import IOpenSSLClientConnectionCreator from twisted.internet.ssl import CertificateOptions, ContextFactory from twisted.python.failure import Failure @@ -98,8 +99,14 @@ class ClientTLSOptions(object): def __init__(self, hostname, ctx): self._ctx = ctx - self._hostname = hostname - self._hostnameBytes = _idnaBytes(hostname) + + if isIPAddress(hostname) or isIPv6Address(hostname): + self._hostnameBytes = hostname.encode('ascii') + self._sendSNI = False + else: + self._hostnameBytes = _idnaBytes(hostname) + self._sendSNI = True + ctx.set_info_callback( _tolerateErrors(self._identityVerifyingInfoCallback) ) @@ -111,7 +118,9 @@ class ClientTLSOptions(object): return connection def _identityVerifyingInfoCallback(self, connection, where, ret): - if where & SSL.SSL_CB_HANDSHAKE_START: + # Literal IPv4 and IPv6 addresses are not permitted + # as host names according to the RFCs + if where & SSL.SSL_CB_HANDSHAKE_START and self._sendSNI: connection.set_tlsext_host_name(self._hostnameBytes) diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index eb963d80fb..7a3881f558 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -46,7 +46,7 @@ class MatrixFederationAgentTests(TestCase): _srv_resolver=self.mock_resolver, ) - def _make_connection(self, client_factory): + def _make_connection(self, client_factory, expected_sni): """Builds a test server, and completes the outgoing client connection Returns: @@ -69,9 +69,17 @@ class MatrixFederationAgentTests(TestCase): # tell the server tls protocol to send its stuff back to the client, too server_tls_protocol.makeConnection(FakeTransport(client_protocol, self.reactor)) - # finally, give the reactor a pump to get the TLS juices flowing. + # give the reactor a pump to get the TLS juices flowing. self.reactor.pump((0.1,)) + # check the SNI + server_name = server_tls_protocol._tlsConnection.get_servername() + self.assertEqual( + server_name, + expected_sni, + "Expected SNI %s but got %s" % (expected_sni, server_name), + ) + # fish the test server back out of the server-side TLS protocol. return server_tls_protocol.wrappedProtocol @@ -113,7 +121,10 @@ class MatrixFederationAgentTests(TestCase): self.assertEqual(port, 8448) # make a test server, and wire up the client - http_server = self._make_connection(client_factory) + http_server = self._make_connection( + client_factory, + expected_sni=b"testserv", + ) self.assertEqual(len(http_server.requests), 1) request = http_server.requests[0] @@ -150,6 +161,52 @@ class MatrixFederationAgentTests(TestCase): json = self.successResultOf(treq.json_content(response)) self.assertEqual(json, {"a": 1}) + def test_get_ip_address(self): + """ + Test the behaviour when the server name contains an explicit IP (with no port) + """ + + # the SRV lookup will return an empty list (XXX: why do we even do an SRV lookup?) + self.mock_resolver.resolve_service.side_effect = lambda _: [] + + # then there will be a getaddrinfo on the IP + self.reactor.lookups["1.2.3.4"] = "1.2.3.4" + + test_d = self._make_get_request(b"matrix://1.2.3.4/foo/bar") + + # Nothing happened yet + self.assertNoResult(test_d) + + self.mock_resolver.resolve_service.assert_called_once() + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, '1.2.3.4') + self.assertEqual(port, 8448) + + # make a test server, and wire up the client + http_server = self._make_connection( + client_factory, + expected_sni=None, + ) + + self.assertEqual(len(http_server.requests), 1) + request = http_server.requests[0] + self.assertEqual(request.method, b'GET') + self.assertEqual(request.path, b'/foo/bar') + # XXX currently broken + # self.assertEqual( + # request.requestHeaders.getRawHeaders(b'host'), + # [b'1.2.3.4:8448'] + # ) + + # finish the request + request.finish() + self.reactor.pump((0.1,)) + self.successResultOf(test_d) + def _check_logcontext(context): current = LoggingContext.current_context() -- cgit 1.4.1 From 58f6c4818337364dd9c6bf01062e7b0dadcb8a25 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 24 Jan 2019 21:31:54 +1100 Subject: Use native UPSERTs where possible (#4306) --- .coveragerc | 6 +- .gitignore | 6 +- changelog.d/4306.misc | 1 + synapse/storage/_base.py | 148 +++++++++++++++++++++++++++++++++--- synapse/storage/client_ips.py | 5 +- synapse/storage/engines/__init__.py | 2 +- synapse/storage/engines/postgres.py | 14 ++++ synapse/storage/engines/sqlite.py | 96 +++++++++++++++++++++++ synapse/storage/engines/sqlite3.py | 87 --------------------- synapse/storage/pusher.py | 9 ++- synapse/storage/user_directory.py | 55 ++++++++++---- tests/storage/test_base.py | 1 + tests/test_server.py | 12 ++- tests/unittest.py | 12 ++- tox.ini | 1 + 15 files changed, 325 insertions(+), 130 deletions(-) create mode 100644 changelog.d/4306.misc create mode 100644 synapse/storage/engines/sqlite.py delete mode 100644 synapse/storage/engines/sqlite3.py (limited to 'synapse') diff --git a/.coveragerc b/.coveragerc index 9873a30738..e9460a340a 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,11 +1,7 @@ [run] branch = True parallel = True -source = synapse - -[paths] -source= - coverage +include = synapse/* [report] precision = 2 diff --git a/.gitignore b/.gitignore index d739595c3a..1033124f1d 100644 --- a/.gitignore +++ b/.gitignore @@ -25,9 +25,9 @@ homeserver*.pid *.tls.dh *.tls.key -.coverage -.coverage.* -!.coverage.rc +.coverage* +coverage.* +!.coveragerc htmlcov demo/*/*.db diff --git a/changelog.d/4306.misc b/changelog.d/4306.misc new file mode 100644 index 0000000000..58130b6190 --- /dev/null +++ b/changelog.d/4306.misc @@ -0,0 +1 @@ +Synapse will now take advantage of native UPSERT functionality in PostgreSQL 9.5+ and SQLite 3.24+. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 865b5e915a..254fdc04c6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -192,6 +192,41 @@ class SQLBaseStore(object): self.database_engine = hs.database_engine + # A set of tables that are not safe to use native upserts in. + self._unsafe_to_upsert_tables = {"user_ips"} + + if self.database_engine.can_native_upsert: + # Check ASAP (and then later, every 1s) to see if we have finished + # background updates of tables that aren't safe to update. + self._clock.call_later(0.0, self._check_safe_to_upsert) + + @defer.inlineCallbacks + def _check_safe_to_upsert(self): + """ + Is it safe to use native UPSERT? + + If there are background updates, we will need to wait, as they may be + the addition of indexes that set the UNIQUE constraint that we require. + + If the background updates have not completed, wait a second and check again. + """ + updates = yield self._simple_select_list( + "background_updates", + keyvalues=None, + retcols=["update_name"], + desc="check_background_updates", + ) + updates = [x["update_name"] for x in updates] + + # The User IPs table in schema #53 was missing a unique index, which we + # run as a background update. + if "user_ips_device_unique_index" not in updates: + self._unsafe_to_upsert_tables.discard("user_id") + + # If there's any tables left to check, reschedule to run. + if self._unsafe_to_upsert_tables: + self._clock.call_later(1.0, self._check_safe_to_upsert) + def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -494,8 +529,15 @@ class SQLBaseStore(object): txn.executemany(sql, vals) @defer.inlineCallbacks - def _simple_upsert(self, table, keyvalues, values, - insertion_values={}, desc="_simple_upsert", lock=True): + def _simple_upsert( + self, + table, + keyvalues, + values, + insertion_values={}, + desc="_simple_upsert", + lock=True + ): """ `lock` should generally be set to True (the default), but can be set @@ -516,16 +558,21 @@ class SQLBaseStore(object): inserting lock (bool): True to lock the table when doing the upsert. Returns: - Deferred(bool): True if a new entry was created, False if an - existing one was updated. + Deferred(None or bool): Native upserts always return None. Emulated + upserts return True if a new entry was created, False if an existing + one was updated. """ attempts = 0 while True: try: result = yield self.runInteraction( desc, - self._simple_upsert_txn, table, keyvalues, values, insertion_values, - lock=lock + self._simple_upsert_txn, + table, + keyvalues, + values, + insertion_values, + lock=lock, ) defer.returnValue(result) except self.database_engine.module.IntegrityError as e: @@ -537,12 +584,59 @@ class SQLBaseStore(object): # presumably we raced with another transaction: let's retry. logger.warn( - "IntegrityError when upserting into %s; retrying: %s", - table, e + "%s when upserting into %s; retrying: %s", e.__name__, table, e ) - def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}, - lock=True): + def _simple_upsert_txn( + self, + txn, + table, + keyvalues, + values, + insertion_values={}, + lock=True, + ): + """ + Pick the UPSERT method which works best on the platform. Either the + native one (Pg9.5+, recent SQLites), or fall back to an emulated method. + + Args: + txn: The transaction to use. + table (str): The table to upsert into + keyvalues (dict): The unique key tables and their new values + values (dict): The nonunique columns and their new values + insertion_values (dict): additional key/values to use only when + inserting + lock (bool): True to lock the table when doing the upsert. + Returns: + Deferred(None or bool): Native upserts always return None. Emulated + upserts return True if a new entry was created, False if an existing + one was updated. + """ + if ( + self.database_engine.can_native_upsert + and table not in self._unsafe_to_upsert_tables + ): + return self._simple_upsert_txn_native_upsert( + txn, + table, + keyvalues, + values, + insertion_values=insertion_values, + ) + else: + return self._simple_upsert_txn_emulated( + txn, + table, + keyvalues, + values, + insertion_values=insertion_values, + lock=lock, + ) + + def _simple_upsert_txn_emulated( + self, txn, table, keyvalues, values, insertion_values={}, lock=True + ): # We need to lock the table :(, unless we're *really* careful if lock: self.database_engine.lock_table(txn, table) @@ -577,12 +671,44 @@ class SQLBaseStore(object): sql = "INSERT INTO %s (%s) VALUES (%s)" % ( table, ", ".join(k for k in allvalues), - ", ".join("?" for _ in allvalues) + ", ".join("?" for _ in allvalues), ) txn.execute(sql, list(allvalues.values())) # successfully inserted return True + def _simple_upsert_txn_native_upsert( + self, txn, table, keyvalues, values, insertion_values={} + ): + """ + Use the native UPSERT functionality in recent PostgreSQL versions. + + Args: + table (str): The table to upsert into + keyvalues (dict): The unique key tables and their new values + values (dict): The nonunique columns and their new values + insertion_values (dict): additional key/values to use only when + inserting + Returns: + None + """ + allvalues = {} + allvalues.update(keyvalues) + allvalues.update(values) + allvalues.update(insertion_values) + + sql = ( + "INSERT INTO %s (%s) VALUES (%s) " + "ON CONFLICT (%s) DO UPDATE SET %s" + ) % ( + table, + ", ".join(k for k in allvalues), + ", ".join("?" for _ in allvalues), + ", ".join(k for k in keyvalues), + ", ".join(k + "=EXCLUDED." + k for k in values), + ) + txn.execute(sql, list(allvalues.values())) + def _simple_select_one(self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index b228a20ac2..091d7116c5 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -257,7 +257,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): ) def _update_client_ips_batch_txn(self, txn, to_update): - self.database_engine.lock_table(txn, "user_ips") + if "user_ips" in self._unsafe_to_upsert_tables or ( + not self.database_engine.can_native_upsert + ): + self.database_engine.lock_table(txn, "user_ips") for entry in iteritems(to_update): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index e2f9de8451..ff5ef97ca8 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -18,7 +18,7 @@ import platform from ._base import IncorrectDatabaseSetup from .postgres import PostgresEngine -from .sqlite3 import Sqlite3Engine +from .sqlite import Sqlite3Engine SUPPORTED_MODULE = { "sqlite3": Sqlite3Engine, diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 42225f8a2a..4004427c7b 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -38,6 +38,13 @@ class PostgresEngine(object): return sql.replace("?", "%s") def on_new_connection(self, db_conn): + + # Get the version of PostgreSQL that we're using. As per the psycopg2 + # docs: The number is formed by converting the major, minor, and + # revision numbers into two-decimal-digit numbers and appending them + # together. For example, version 8.1.5 will be returned as 80105 + self._version = db_conn.server_version + db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) @@ -54,6 +61,13 @@ class PostgresEngine(object): cursor.close() + @property + def can_native_upsert(self): + """ + Can we use native UPSERTs? This requires PostgreSQL 9.5+. + """ + return self._version >= 90500 + def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): # https://www.postgresql.org/docs/current/static/errcodes-appendix.html diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py new file mode 100644 index 0000000000..c64d73ff21 --- /dev/null +++ b/synapse/storage/engines/sqlite.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import struct +import threading +from sqlite3 import sqlite_version_info + +from synapse.storage.prepare_database import prepare_database + + +class Sqlite3Engine(object): + single_threaded = True + + def __init__(self, database_module, database_config): + self.module = database_module + + # The current max state_group, or None if we haven't looked + # in the DB yet. + self._current_state_group_id = None + self._current_state_group_id_lock = threading.Lock() + + @property + def can_native_upsert(self): + """ + Do we support native UPSERTs? This requires SQLite3 3.24+, plus some + more work we haven't done yet to tell what was inserted vs updated. + """ + return sqlite_version_info >= (3, 24, 0) + + def check_database(self, txn): + pass + + def convert_param_style(self, sql): + return sql + + def on_new_connection(self, db_conn): + prepare_database(db_conn, self, config=None) + db_conn.create_function("rank", 1, _rank) + + def is_deadlock(self, error): + return False + + def is_connection_closed(self, conn): + return False + + def lock_table(self, txn, table): + return + + def get_next_state_group_id(self, txn): + """Returns an int that can be used as a new state_group ID + """ + # We do application locking here since if we're using sqlite then + # we are a single process synapse. + with self._current_state_group_id_lock: + if self._current_state_group_id is None: + txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups") + self._current_state_group_id = txn.fetchone()[0] + + self._current_state_group_id += 1 + return self._current_state_group_id + + +# Following functions taken from: https://github.com/coleifer/peewee + +def _parse_match_info(buf): + bufsize = len(buf) + return [struct.unpack('@I', buf[i:i + 4])[0] for i in range(0, bufsize, 4)] + + +def _rank(raw_match_info): + """Handle match_info called w/default args 'pcx' - based on the example rank + function http://sqlite.org/fts3.html#appendix_a + """ + match_info = _parse_match_info(raw_match_info) + score = 0.0 + p, c = match_info[:2] + for phrase_num in range(p): + phrase_info_idx = 2 + (phrase_num * c * 3) + for col_num in range(c): + col_idx = phrase_info_idx + (col_num * 3) + x1, x2 = match_info[col_idx:col_idx + 2] + if x1 > 0: + score += float(x1) / x2 + return score diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py deleted file mode 100644 index 19949fc474..0000000000 --- a/synapse/storage/engines/sqlite3.py +++ /dev/null @@ -1,87 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import struct -import threading - -from synapse.storage.prepare_database import prepare_database - - -class Sqlite3Engine(object): - single_threaded = True - - def __init__(self, database_module, database_config): - self.module = database_module - - # The current max state_group, or None if we haven't looked - # in the DB yet. - self._current_state_group_id = None - self._current_state_group_id_lock = threading.Lock() - - def check_database(self, txn): - pass - - def convert_param_style(self, sql): - return sql - - def on_new_connection(self, db_conn): - prepare_database(db_conn, self, config=None) - db_conn.create_function("rank", 1, _rank) - - def is_deadlock(self, error): - return False - - def is_connection_closed(self, conn): - return False - - def lock_table(self, txn, table): - return - - def get_next_state_group_id(self, txn): - """Returns an int that can be used as a new state_group ID - """ - # We do application locking here since if we're using sqlite then - # we are a single process synapse. - with self._current_state_group_id_lock: - if self._current_state_group_id is None: - txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups") - self._current_state_group_id = txn.fetchone()[0] - - self._current_state_group_id += 1 - return self._current_state_group_id - - -# Following functions taken from: https://github.com/coleifer/peewee - -def _parse_match_info(buf): - bufsize = len(buf) - return [struct.unpack('@I', buf[i:i + 4])[0] for i in range(0, bufsize, 4)] - - -def _rank(raw_match_info): - """Handle match_info called w/default args 'pcx' - based on the example rank - function http://sqlite.org/fts3.html#appendix_a - """ - match_info = _parse_match_info(raw_match_info) - score = 0.0 - p, c = match_info[:2] - for phrase_num in range(p): - phrase_info_idx = 2 + (phrase_num * c * 3) - for col_num in range(c): - col_idx = phrase_info_idx + (col_num * 3) - x1, x2 = match_info[col_idx:col_idx + 2] - if x1 > 0: - score += float(x1) / x2 - return score diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 2743b52bad..134297e284 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -215,7 +215,7 @@ class PusherStore(PusherWorkerStore): with self._pushers_id_gen.get_next() as stream_id: # no need to lock because `pushers` has a unique key on # (app_id, pushkey, user_name) so _simple_upsert will retry - newly_inserted = yield self._simple_upsert( + yield self._simple_upsert( table="pushers", keyvalues={ "app_id": app_id, @@ -238,7 +238,12 @@ class PusherStore(PusherWorkerStore): lock=False, ) - if newly_inserted: + user_has_pusher = self.get_if_user_has_pusher.cache.get( + (user_id,), None, update_metrics=False + ) + + if user_has_pusher is not True: + # invalidate, since we the user might not have had a pusher before yield self.runInteraction( "add_pusher", self._invalidate_cache_and_stream, diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index a8781b0e5d..ce48212265 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -168,14 +168,14 @@ class UserDirectoryStore(SQLBaseStore): if isinstance(self.database_engine, PostgresEngine): # We weight the localpart most highly, then display name and finally # server name - if new_entry: + if self.database_engine.can_native_upsert: sql = """ INSERT INTO user_directory_search(user_id, vector) VALUES (?, setweight(to_tsvector('english', ?), 'A') || setweight(to_tsvector('english', ?), 'D') || setweight(to_tsvector('english', COALESCE(?, '')), 'B') - ) + ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector """ txn.execute( sql, @@ -185,20 +185,45 @@ class UserDirectoryStore(SQLBaseStore): ) ) else: - sql = """ - UPDATE user_directory_search - SET vector = setweight(to_tsvector('english', ?), 'A') - || setweight(to_tsvector('english', ?), 'D') - || setweight(to_tsvector('english', COALESCE(?, '')), 'B') - WHERE user_id = ? - """ - txn.execute( - sql, - ( - get_localpart_from_id(user_id), get_domain_from_id(user_id), - display_name, user_id, + # TODO: Remove this code after we've bumped the minimum version + # of postgres to always support upserts, so we can get rid of + # `new_entry` usage + if new_entry is True: + sql = """ + INSERT INTO user_directory_search(user_id, vector) + VALUES (?, + setweight(to_tsvector('english', ?), 'A') + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') + ) + """ + txn.execute( + sql, + ( + user_id, get_localpart_from_id(user_id), + get_domain_from_id(user_id), display_name, + ) + ) + elif new_entry is False: + sql = """ + UPDATE user_directory_search + SET vector = setweight(to_tsvector('english', ?), 'A') + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') + WHERE user_id = ? + """ + txn.execute( + sql, + ( + get_localpart_from_id(user_id), + get_domain_from_id(user_id), + display_name, user_id, + ) + ) + else: + raise RuntimeError( + "upsert returned None when 'can_native_upsert' is False" ) - ) elif isinstance(self.database_engine, Sqlite3Engine): value = "%s %s" % (user_id, display_name,) if display_name else user_id self._simple_upsert_txn( diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 829f47d2e8..452d76ddd5 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -49,6 +49,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): self.db_pool.runWithConnection = runWithConnection config = Mock() + config._enable_native_upserts = False config.event_cache_size = 1 config.database_config = {"name": "sqlite3"} hs = TestHomeServer( diff --git a/tests/test_server.py b/tests/test_server.py index 634a8fbca5..08fb3fe02f 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -19,7 +19,7 @@ from six import StringIO from twisted.internet.defer import Deferred from twisted.python.failure import Failure -from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock +from twisted.test.proto_helpers import AccumulatingProtocol from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -30,12 +30,18 @@ from synapse.util import Clock from synapse.util.logcontext import make_deferred_yieldable from tests import unittest -from tests.server import FakeTransport, make_request, render, setup_test_homeserver +from tests.server import ( + FakeTransport, + ThreadedMemoryReactorClock, + make_request, + render, + setup_test_homeserver, +) class JsonResourceTests(unittest.TestCase): def setUp(self): - self.reactor = MemoryReactorClock() + self.reactor = ThreadedMemoryReactorClock() self.hs_clock = Clock(self.reactor) self.homeserver = setup_test_homeserver( self.addCleanup, http_client=None, clock=self.hs_clock, reactor=self.reactor diff --git a/tests/unittest.py b/tests/unittest.py index 78d2f740f9..cda549c783 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -96,7 +96,7 @@ class TestCase(unittest.TestCase): method = getattr(self, methodName) - level = getattr(method, "loglevel", getattr(self, "loglevel", logging.ERROR)) + level = getattr(method, "loglevel", getattr(self, "loglevel", logging.WARNING)) @around(self) def setUp(orig): @@ -333,7 +333,15 @@ class HomeserverTestCase(TestCase): """ kwargs = dict(kwargs) kwargs.update(self._hs_args) - return setup_test_homeserver(self.addCleanup, *args, **kwargs) + hs = setup_test_homeserver(self.addCleanup, *args, **kwargs) + stor = hs.get_datastore() + + # Run the database background updates. + if hasattr(stor, "do_next_background_update"): + while not self.get_success(stor.has_completed_background_updates()): + self.get_success(stor.do_next_background_update(1)) + + return hs def pump(self, by=0.0): """ diff --git a/tox.ini b/tox.ini index a0f5486829..9b2d78ed6d 100644 --- a/tox.ini +++ b/tox.ini @@ -149,4 +149,5 @@ deps = codecov commands = coverage combine + coverage xml codecov -X gcov -- cgit 1.4.1 From 92d8a068adba8ba8b873521878ed4a70c3fcff60 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 24 Jan 2019 10:52:06 +0000 Subject: Clarify docs for public_baseurl This is leading to problems with people upgrading to clients that support MSC1730 because people have this misconfigured, so try to make the docs completely unambiguous. --- synapse/config/server.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/config/server.py b/synapse/config/server.py index fb57791098..bc97b44a5b 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -256,7 +256,11 @@ class ServerConfig(Config): # # web_client_location: "/path/to/web/root" - # The public-facing base URL for the client API (not including _matrix/...) + # The public-facing base URL that clients use to access this HS + # (not including _matrix/...). This is the same URL a user would + # enter into the 'custom HS URL' field on their client. If you + # use synapse with a reverse proxy, this should be the URL to reach + # synapse via the proxy. # public_baseurl: https://example.com:8448/ # Set the soft limit on the number of file descriptors synapse can use -- cgit 1.4.1 From 0e27501ee5454aca019cdfbfe899027aa8ae44a1 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 24 Jan 2019 22:57:41 +1100 Subject: Fix UPSERT check (#4459) --- changelog.d/4459.misc | 1 + synapse/storage/_base.py | 33 ++++++++++++++++++++++++++++----- 2 files changed, 29 insertions(+), 5 deletions(-) create mode 100644 changelog.d/4459.misc (limited to 'synapse') diff --git a/changelog.d/4459.misc b/changelog.d/4459.misc new file mode 100644 index 0000000000..58130b6190 --- /dev/null +++ b/changelog.d/4459.misc @@ -0,0 +1 @@ +Synapse will now take advantage of native UPSERT functionality in PostgreSQL 9.5+ and SQLite 3.24+. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 254fdc04c6..f62f70b9f1 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -26,6 +26,7 @@ from prometheus_client import Histogram from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine from synapse.util.caches.descriptors import Cache from synapse.util.logcontext import LoggingContext, PreserveLoggingContext @@ -198,7 +199,12 @@ class SQLBaseStore(object): if self.database_engine.can_native_upsert: # Check ASAP (and then later, every 1s) to see if we have finished # background updates of tables that aren't safe to update. - self._clock.call_later(0.0, self._check_safe_to_upsert) + self._clock.call_later( + 0.0, + run_as_background_process, + "upsert_safety_check", + self._check_safe_to_upsert + ) @defer.inlineCallbacks def _check_safe_to_upsert(self): @@ -208,7 +214,7 @@ class SQLBaseStore(object): If there are background updates, we will need to wait, as they may be the addition of indexes that set the UNIQUE constraint that we require. - If the background updates have not completed, wait a second and check again. + If the background updates have not completed, wait 15 sec and check again. """ updates = yield self._simple_select_list( "background_updates", @@ -221,11 +227,16 @@ class SQLBaseStore(object): # The User IPs table in schema #53 was missing a unique index, which we # run as a background update. if "user_ips_device_unique_index" not in updates: - self._unsafe_to_upsert_tables.discard("user_id") + self._unsafe_to_upsert_tables.discard("user_ips") # If there's any tables left to check, reschedule to run. if self._unsafe_to_upsert_tables: - self._clock.call_later(1.0, self._check_safe_to_upsert) + self._clock.call_later( + 15.0, + run_as_background_process, + "upsert_safety_check", + self._check_safe_to_upsert + ) def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -609,7 +620,7 @@ class SQLBaseStore(object): inserting lock (bool): True to lock the table when doing the upsert. Returns: - Deferred(None or bool): Native upserts always return None. Emulated + None or bool: Native upserts always return None. Emulated upserts return True if a new entry was created, False if an existing one was updated. """ @@ -637,6 +648,18 @@ class SQLBaseStore(object): def _simple_upsert_txn_emulated( self, txn, table, keyvalues, values, insertion_values={}, lock=True ): + """ + Args: + table (str): The table to upsert into + keyvalues (dict): The unique key tables and their new values + values (dict): The nonunique columns and their new values + insertion_values (dict): additional key/values to use only when + inserting + lock (bool): True to lock the table when doing the upsert. + Returns: + bool: Return True if a new entry was created, False if an existing + one was updated. + """ # We need to lock the table :(, unless we're *really* careful if lock: self.database_engine.lock_table(txn, table) -- cgit 1.4.1 From 068aa1d22840a1154bb8fbdd445a8c36b290db91 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 24 Jan 2019 12:44:27 +0000 Subject: Time out filtered room dir queries after 60s --- synapse/handlers/room_list.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index dc88620885..ea63fb604c 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -31,6 +31,7 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache from ._base import BaseHandler +from datetime import datetime, timedelta logger = logging.getLogger(__name__) @@ -73,8 +74,13 @@ class RoomListHandler(BaseHandler): # We explicitly don't bother caching searches or requests for # appservice specific lists. logger.info("Bypassing cache as search request.") + + # XXX: Quick hack to stop room directory queries taking too long. + # Timeout request after 60s. Probably want a more fundamental + # solution at some point + timeout = datetime.now() + timedelta(seconds=60) return self._get_public_room_list( - limit, since_token, search_filter, network_tuple=network_tuple, + limit, since_token, search_filter, network_tuple=network_tuple, timeout=timeout, ) key = (limit, since_token, network_tuple) @@ -87,7 +93,8 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, search_filter=None, - network_tuple=EMPTY_THIRD_PARTY_ID,): + network_tuple=EMPTY_THIRD_PARTY_ID, + timeout=None,): if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: @@ -202,6 +209,9 @@ class RoomListHandler(BaseHandler): chunk = [] for i in range(0, len(rooms_to_scan), step): + if timeout and datetime.now() > timeout: + raise Exception("Timed out searching room directory") + batch = rooms_to_scan[i:i + step] logger.info("Processing %i rooms for result", len(batch)) yield concurrently_execute( -- cgit 1.4.1 From 5541645e80d2907721f17f648717f0b5a2b6f4fe Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 24 Jan 2019 12:45:32 +0000 Subject: lint --- synapse/handlers/room_list.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index ea63fb604c..a99b6e1460 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -80,7 +80,8 @@ class RoomListHandler(BaseHandler): # solution at some point timeout = datetime.now() + timedelta(seconds=60) return self._get_public_room_list( - limit, since_token, search_filter, network_tuple=network_tuple, timeout=timeout, + limit, since_token, search_filter, + network_tuple=network_tuple, timeout=timeout, ) key = (limit, since_token, network_tuple) -- cgit 1.4.1 From afd69a0920d16bdd9ca0c5cf9238e48986424ecb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 24 Jan 2019 13:29:33 +0000 Subject: Look up the right SRV record --- changelog.d/4464.misc | 1 + synapse/http/federation/matrix_federation_agent.py | 3 ++- tests/http/federation/test_matrix_federation_agent.py | 12 +++++++++--- 3 files changed, 12 insertions(+), 4 deletions(-) create mode 100644 changelog.d/4464.misc (limited to 'synapse') diff --git a/changelog.d/4464.misc b/changelog.d/4464.misc new file mode 100644 index 0000000000..9a51434755 --- /dev/null +++ b/changelog.d/4464.misc @@ -0,0 +1 @@ +Move SRV logic into the Agent layer diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 64c780a341..0ec28c6696 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -101,7 +101,8 @@ class MatrixFederationAgent(object): if port is not None: target = (host, port) else: - server_list = yield self._srv_resolver.resolve_service(server_name_bytes) + service_name = b"_matrix._tcp.%s" % (server_name_bytes, ) + server_list = yield self._srv_resolver.resolve_service(service_name) if not server_list: target = (host, 8448) logger.debug("No SRV record for %s, using %s", host, target) diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index bfae69a978..b32d7566a5 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -174,7 +174,9 @@ class MatrixFederationAgentTests(TestCase): # Nothing happened yet self.assertNoResult(test_d) - self.mock_resolver.resolve_service.assert_called_once() + self.mock_resolver.resolve_service.assert_called_once_with( + b"_matrix._tcp.1.2.3.4", + ) # Make sure treq is trying to connect clients = self.reactor.tcpClients @@ -212,7 +214,9 @@ class MatrixFederationAgentTests(TestCase): # Nothing happened yet self.assertNoResult(test_d) - self.mock_resolver.resolve_service.assert_called_once() + self.mock_resolver.resolve_service.assert_called_once_with( + b"_matrix._tcp.testserv", + ) # Make sure treq is trying to connect clients = self.reactor.tcpClients @@ -251,7 +255,9 @@ class MatrixFederationAgentTests(TestCase): # Nothing happened yet self.assertNoResult(test_d) - self.mock_resolver.resolve_service.assert_called_once() + self.mock_resolver.resolve_service.assert_called_once_with( + b"_matrix._tcp.testserv", + ) # Make sure treq is trying to connect clients = self.reactor.tcpClients -- cgit 1.4.1 From f4697b5ec1071905b1177fd473e20df0b1455a4c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 24 Jan 2019 13:38:29 +0000 Subject: Fix UnboundLocalError in post_urlencoded_get_json (#4460) This could cause exceptions if the id server returned 4xx responses. --- changelog.d/4460.bugfix | 1 + synapse/http/client.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) create mode 100644 changelog.d/4460.bugfix (limited to 'synapse') diff --git a/changelog.d/4460.bugfix b/changelog.d/4460.bugfix new file mode 100644 index 0000000000..8c5d5b4e0e --- /dev/null +++ b/changelog.d/4460.bugfix @@ -0,0 +1 @@ +Fix UnboundLocalError in post_urlencoded_get_json diff --git a/synapse/http/client.py b/synapse/http/client.py index afcf698b29..47a1f82ff0 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -333,9 +333,10 @@ class SimpleHttpClient(object): "POST", uri, headers=Headers(actual_headers), data=query_bytes ) + body = yield make_deferred_yieldable(readBody(response)) + if 200 <= response.code < 300: - body = yield make_deferred_yieldable(treq.json_content(response)) - defer.returnValue(body) + defer.returnValue(json.loads(body)) else: raise HttpResponseException(response.code, response.phrase, body) -- cgit 1.4.1 From a2d85144e54457df2aae2c1a759f1baae910de91 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 24 Jan 2019 14:22:26 +0000 Subject: isort --- synapse/handlers/room_list.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index a99b6e1460..5f7b33473e 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -15,6 +15,7 @@ import logging from collections import namedtuple +from datetime import datetime, timedelta from six import PY3, iteritems from six.moves import range @@ -31,7 +32,6 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache from ._base import BaseHandler -from datetime import datetime, timedelta logger = logging.getLogger(__name__) -- cgit 1.4.1 From 5b1dc940839b6390562475fe1b033ca7fce33c37 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 24 Jan 2019 14:59:50 +0000 Subject: Use self.clock instead of datetime --- synapse/handlers/room_list.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 5f7b33473e..2af520819e 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -15,7 +15,6 @@ import logging from collections import namedtuple -from datetime import datetime, timedelta from six import PY3, iteritems from six.moves import range @@ -78,7 +77,7 @@ class RoomListHandler(BaseHandler): # XXX: Quick hack to stop room directory queries taking too long. # Timeout request after 60s. Probably want a more fundamental # solution at some point - timeout = datetime.now() + timedelta(seconds=60) + timeout = self.clock.time() + 60 return self._get_public_room_list( limit, since_token, search_filter, network_tuple=network_tuple, timeout=timeout, @@ -95,7 +94,7 @@ class RoomListHandler(BaseHandler): def _get_public_room_list(self, limit=None, since_token=None, search_filter=None, network_tuple=EMPTY_THIRD_PARTY_ID, - timeout=None,): + timeout=0,): if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: @@ -210,7 +209,7 @@ class RoomListHandler(BaseHandler): chunk = [] for i in range(0, len(rooms_to_scan), step): - if timeout and datetime.now() > timeout: + if timeout and self.clock.time() > timeout: raise Exception("Timed out searching room directory") batch = rooms_to_scan[i:i + step] -- cgit 1.4.1 From 075ff3ede9aac0bc31d638b45c63007d664d9eee Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 24 Jan 2019 15:10:22 +0000 Subject: Change default timeout value from 0 to None --- synapse/handlers/room_list.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 2af520819e..13e212d669 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -94,7 +94,7 @@ class RoomListHandler(BaseHandler): def _get_public_room_list(self, limit=None, since_token=None, search_filter=None, network_tuple=EMPTY_THIRD_PARTY_ID, - timeout=0,): + timeout=None,): if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: -- cgit 1.4.1 From b8082a54451bb4db30e3b2a4d19dc8cb23330eb7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 24 Jan 2019 17:33:19 +0000 Subject: Use term 'out of band membership' instead --- synapse/events/__init__.py | 9 +++++---- synapse/handlers/federation.py | 4 ++-- synapse/storage/roommember.py | 6 +++--- 3 files changed, 10 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 5030636c7e..48289cad06 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -41,12 +41,13 @@ class _EventInternalMetadata(object): def is_outlier(self): return getattr(self, "outlier", False) - def is_new_remote_event(self): - """Whether this is a new remote event, like an invite or an invite + def is_out_of_band_membership(self): + """Whether this is an out of band membership, like an invite or an invite rejection. This is needed as those events are marked as outliers, but - they still need to be processed. + they still need to be processed as if they're new events (e.g. updating + invite state in the database, relaying to clients, etc). """ - return getattr(self, "new_remote_event", False) + return getattr(self, "out_of_band_membership", False) def get_send_on_behalf_of(self): """Whether this server should send the event on behalf of another server. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e017cab777..242719b7ce 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1284,7 +1284,7 @@ class FederationHandler(BaseHandler): ) event.internal_metadata.outlier = True - event.internal_metadata.new_remote_event = True + event.internal_metadata.out_of_band_membership = True event.signatures.update( compute_event_signature( @@ -1310,7 +1310,7 @@ class FederationHandler(BaseHandler): # Mark as outlier as we don't have any state for this event; we're not # even in the room. event.internal_metadata.outlier = True - event.internal_metadata.new_remote_event = True + event.internal_metadata.out_of_band_membership = True # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 40b13de80b..592c1bcd33 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -589,11 +589,11 @@ class RoomMemberStore(RoomMemberWorkerStore): # We update the local_invites table only if the event is "current", # i.e., its something that has just happened. If the event is an - # outlier it is only current if its a "new remote event", like a - # remote invite or a rejection of a remote invite. + # outlier it is only current if its an "out of band membership", + # like a remote invite or a rejection of a remote invite. is_new_state = not backfilled and ( not event.internal_metadata.is_outlier() - or event.internal_metadata.is_new_remote_event() + or event.internal_metadata.is_out_of_band_membership() ) is_mine = self.hs.is_mine_id(event.state_key) if is_new_state and is_mine: -- cgit 1.4.1 From 9139b87be420fcfce22e70a7c35ba52b2ea32f3a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 24 Jan 2019 18:04:02 +0000 Subject: Remove unecessary setting of outlier bit --- synapse/handlers/federation.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 242719b7ce..d53b716ffb 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1079,8 +1079,6 @@ class FederationHandler(BaseHandler): handled_events = set() try: - event.internal_metadata.outlier = False - # Try the host we successfully got a response to /make_join/ # request first. try: -- cgit 1.4.1 From 5ee1f997a8e7177077e2c5f0750e28725a452791 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 24 Jan 2019 18:08:08 +0000 Subject: Update make_membership_event docs --- synapse/federation/federation_client.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f4adcb556d..df7d18700c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -524,6 +524,8 @@ class FederationClient(FederationBase): Does so by asking one of the already participating servers to create an event with proper context. + Returns a fully signed and hashed event. + Note that this does not append any events to any graphs. Args: @@ -538,8 +540,9 @@ class FederationClient(FederationBase): params (dict[str, str|Iterable[str]]): Query parameters to include in the request. Return: - Deferred: resolves to a tuple of (origin (str), event (object)) - where origin is the remote homeserver which generated the event. + Deferred[tuple[str, FrozenEvent]]: resolves to a tuple of `origin` + and event where origin is the remote homeserver which generated + the event. Fails with a ``SynapseError`` if the chosen remote server returns a 300/400 code. -- cgit 1.4.1 From d148c43050f7a85523a743ff6069683c644a517d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 24 Jan 2019 18:31:23 +0000 Subject: Review comments --- synapse/federation/federation_client.py | 3 ++- synapse/handlers/federation.py | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 4e171f9b56..7fb5736142 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -674,7 +674,8 @@ class FederationClient(FederationBase): break if room_version is None: - # We use this error has that is what + # If the state doesn't have a create event then the room is + # invalid, and it would fail auth checks anyway. raise SynapseError(400, "No create event in state") valid_pdus = yield self._check_sigs_and_hash_and_fetch( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a9dc4a4e4e..5280d88a50 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1664,6 +1664,11 @@ class FederationHandler(BaseHandler): create_event = e break + if create_event is None: + # If the state doesn't have a create event then the room is + # invalid, and it would fail auth checks anyway. + raise SynapseError(400, "No create event in state") + room_version = create_event.content.get("room_version", RoomVersions.V1) missing_auth_events = set() -- cgit 1.4.1