diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2019-01-25 11:09:53 +0000 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2019-01-25 11:09:53 +0000 |
commit | 821b65aeb51e744055e0eaaf9b528587fa494b20 (patch) | |
tree | c4231f142f51d319c4b86c2dcc0e4fa691a3bed0 /synapse | |
parent | Move tag and direct state copying into separate function (diff) | |
parent | Merge pull request #4415 from matrix-org/anoa/full_search_upgraded_rooms (diff) | |
download | synapse-821b65aeb51e744055e0eaaf9b528587fa494b20.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/dm_room_upgrade
Diffstat (limited to 'synapse')
38 files changed, 1140 insertions, 409 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index ba1019b9b2..e37b807c94 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -819,7 +819,9 @@ class Auth(object): elif threepid: # If the user does not exist yet, but is signing up with a # reserved threepid then pass auth check - if is_threepid_reserved(self.hs.config, threepid): + if is_threepid_reserved( + self.hs.config.mau_limits_reserved_threepids, threepid + ): return # Else if there is no room in the MAU bucket, bail current_mau = yield self.store.get_monthly_active_count() diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 46c4b4b9dc..51ee078bc3 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -120,6 +120,19 @@ KNOWN_ROOM_VERSIONS = { RoomVersions.STATE_V2_TEST, } + +class EventFormatVersions(object): + """This is an internal enum for tracking the version of the event format, + independently from the room version. + """ + V1 = 1 + + +KNOWN_EVENT_FORMAT_VERSIONS = { + EventFormatVersions.V1, +} + + ServerNoticeMsgType = "m.server_notice" ServerNoticeLimitReached = "m.server_notice.usage_limit_reached" diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 16ad654864..3906475403 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -444,6 +444,20 @@ class Filter(object): def include_redundant_members(self): return self.filter_json.get("include_redundant_members", False) + def with_room_ids(self, room_ids): + """Returns a new filter with the given room IDs appended. + + Args: + room_ids (iterable[unicode]): The room_ids to add + + Returns: + filter: A new filter including the given rooms and the old + filter's rooms. + """ + newFilter = Filter(self.filter_json) + newFilter.rooms += room_ids + return newFilter + def _matches_wildcard(actual_value, filter_value): if filter_value.endswith("*"): diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index f3ac3d19f0..ffc49d77cc 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -13,10 +13,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import gc import logging import os import sys +import traceback from six import iteritems @@ -324,17 +326,12 @@ def setup(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts - tls_server_context_factory = context_factory.ServerContextFactory(config) - tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config) - database_engine = create_engine(config.database_config) config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection hs = SynapseHomeServer( config.server_name, db_config=config.database_config, - tls_server_context_factory=tls_server_context_factory, - tls_client_options_factory=tls_client_options_factory, config=config, version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, @@ -361,12 +358,53 @@ def setup(config_options): logger.info("Database prepared in %s.", config.database_config['name']) hs.setup() - hs.start_listening() + @defer.inlineCallbacks def start(): - hs.get_pusherpool().start() - hs.get_datastore().start_profiling() - hs.get_datastore().start_doing_background_updates() + try: + # Check if the certificate is still valid. + cert_days_remaining = hs.config.is_disk_cert_valid() + + if hs.config.acme_enabled: + # If ACME is enabled, we might need to provision a certificate + # before starting. + acme = hs.get_acme_handler() + + # Start up the webservices which we will respond to ACME + # challenges with. + yield acme.start_listening() + + # We want to reprovision if cert_days_remaining is None (meaning no + # certificate exists), or the days remaining number it returns + # is less than our re-registration threshold. + if (cert_days_remaining is None) or ( + not cert_days_remaining > hs.config.acme_reprovision_threshold + ): + yield acme.provision_certificate() + + # Read the certificate from disk and build the context factories for + # TLS. + hs.config.read_certificate_from_disk() + hs.tls_server_context_factory = context_factory.ServerContextFactory(config) + hs.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( + config + ) + + # It is now safe to start your Synapse. + hs.start_listening() + hs.get_pusherpool().start() + hs.get_datastore().start_profiling() + hs.get_datastore().start_doing_background_updates() + except Exception as e: + # If a DeferredList failed (like in listening on the ACME listener), + # we need to print the subfailure explicitly. + if isinstance(e, defer.FirstError): + e.subFailure.printTraceback(sys.stderr) + sys.exit(1) + + # Something else went wrong when starting. Print it and bail out. + traceback.print_exc(file=sys.stderr) + sys.exit(1) reactor.callWhenRunning(start) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index fd2d6d52ef..5858fb92b4 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -367,7 +367,7 @@ class Config(object): if not keys_directory: keys_directory = os.path.dirname(config_files[-1]) - config_dir_path = os.path.abspath(keys_directory) + self.config_dir_path = os.path.abspath(keys_directory) specified_config = {} for config_file in config_files: @@ -379,7 +379,7 @@ class Config(object): server_name = specified_config["server_name"] config_string = self.generate_config( - config_dir_path=config_dir_path, + config_dir_path=self.config_dir_path, data_dir_path=os.getcwd(), server_name=server_name, generate_secrets=False, diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 6c2b543b8c..fe520d6855 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -50,6 +50,10 @@ class RegistrationConfig(Config): raise ConfigError('Invalid auto_join_rooms entry %s' % (room_alias,)) self.autocreate_auto_join_rooms = config.get("autocreate_auto_join_rooms", True) + self.disable_msisdn_registration = ( + config.get("disable_msisdn_registration", False) + ) + def default_config(self, generate_secrets=False, **kwargs): if generate_secrets: registration_shared_secret = 'registration_shared_secret: "%s"' % ( @@ -70,6 +74,11 @@ class RegistrationConfig(Config): # - email # - msisdn + # Explicitly disable asking for MSISDNs from the registration + # flow (overrides registrations_require_3pid if MSISDNs are set as required) + # + # disable_msisdn_registration = True + # Mandate that users are only allowed to associate certain formats of # 3PIDs with accounts on this server. # diff --git a/synapse/config/server.py b/synapse/config/server.py index fb57791098..22dcc87d8a 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -256,7 +256,11 @@ class ServerConfig(Config): # # web_client_location: "/path/to/web/root" - # The public-facing base URL for the client API (not including _matrix/...) + # The public-facing base URL that clients use to access this HS + # (not including _matrix/...). This is the same URL a user would + # enter into the 'custom HS URL' field on their client. If you + # use synapse with a reverse proxy, this should be the URL to reach + # synapse via the proxy. # public_baseurl: https://example.com:8448/ # Set the soft limit on the number of file descriptors synapse can use @@ -420,19 +424,18 @@ class ServerConfig(Config): " service on the given port.") -def is_threepid_reserved(config, threepid): +def is_threepid_reserved(reserved_threepids, threepid): """Check the threepid against the reserved threepid config Args: - config(ServerConfig) - to access server config attributes + reserved_threepids([dict]) - list of reserved threepids threepid(dict) - The threepid to test for Returns: boolean Is the threepid undertest reserved_user """ - for tp in config.mau_limits_reserved_threepids: - if (threepid['medium'] == tp['medium'] - and threepid['address'] == tp['address']): + for tp in reserved_threepids: + if (threepid['medium'] == tp['medium'] and threepid['address'] == tp['address']): return True return False diff --git a/synapse/config/tls.py b/synapse/config/tls.py index bb8952c672..a75e233aa0 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -13,60 +13,110 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os +from datetime import datetime from hashlib import sha256 from unpaddedbase64 import encode_base64 from OpenSSL import crypto -from ._base import Config +from synapse.config._base import Config + +logger = logging.getLogger() class TlsConfig(Config): def read_config(self, config): - self.tls_certificate = self.read_tls_certificate( - config.get("tls_certificate_path") - ) - self.tls_certificate_file = config.get("tls_certificate_path") + acme_config = config.get("acme", {}) + self.acme_enabled = acme_config.get("enabled", False) + self.acme_url = acme_config.get( + "url", "https://acme-v01.api.letsencrypt.org/directory" + ) + self.acme_port = acme_config.get("port", 8449) + self.acme_bind_addresses = acme_config.get("bind_addresses", ["127.0.0.1"]) + self.acme_reprovision_threshold = acme_config.get("reprovision_threshold", 30) + + self.tls_certificate_file = os.path.abspath(config.get("tls_certificate_path")) + self.tls_private_key_file = os.path.abspath(config.get("tls_private_key_path")) + self._original_tls_fingerprints = config["tls_fingerprints"] + self.tls_fingerprints = list(self._original_tls_fingerprints) self.no_tls = config.get("no_tls", False) - if self.no_tls: - self.tls_private_key = None - else: - self.tls_private_key = self.read_tls_private_key( - config.get("tls_private_key_path") - ) + # This config option applies to non-federation HTTP clients + # (e.g. for talking to recaptcha, identity servers, and such) + # It should never be used in production, and is intended for + # use only when running tests. + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use" + ) + + self.tls_certificate = None + self.tls_private_key = None + + def is_disk_cert_valid(self): + """ + Is the certificate we have on disk valid, and if so, for how long? + + Returns: + int: Days remaining of certificate validity. + None: No certificate exists. + """ + if not os.path.exists(self.tls_certificate_file): + return None + + try: + with open(self.tls_certificate_file, 'rb') as f: + cert_pem = f.read() + except Exception: + logger.exception("Failed to read existing certificate off disk!") + raise + + try: + tls_certificate = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + except Exception: + logger.exception("Failed to parse existing certificate off disk!") + raise + + # YYYYMMDDhhmmssZ -- in UTC + expires_on = datetime.strptime( + tls_certificate.get_notAfter().decode('ascii'), "%Y%m%d%H%M%SZ" + ) + now = datetime.utcnow() + days_remaining = (expires_on - now).days + return days_remaining - self.tls_fingerprints = config["tls_fingerprints"] + def read_certificate_from_disk(self): + """ + Read the certificates from disk. + """ + self.tls_certificate = self.read_tls_certificate(self.tls_certificate_file) + + if not self.no_tls: + self.tls_private_key = self.read_tls_private_key(self.tls_private_key_file) + + self.tls_fingerprints = list(self._original_tls_fingerprints) # Check that our own certificate is included in the list of fingerprints # and include it if it is not. x509_certificate_bytes = crypto.dump_certificate( - crypto.FILETYPE_ASN1, - self.tls_certificate + crypto.FILETYPE_ASN1, self.tls_certificate ) sha256_fingerprint = encode_base64(sha256(x509_certificate_bytes).digest()) sha256_fingerprints = set(f["sha256"] for f in self.tls_fingerprints) if sha256_fingerprint not in sha256_fingerprints: self.tls_fingerprints.append({u"sha256": sha256_fingerprint}) - # This config option applies to non-federation HTTP clients - # (e.g. for talking to recaptcha, identity servers, and such) - # It should never be used in production, and is intended for - # use only when running tests. - self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( - "use_insecure_ssl_client_just_for_testing_do_not_use" - ) - def default_config(self, config_dir_path, server_name, **kwargs): base_key_name = os.path.join(config_dir_path, server_name) tls_certificate_path = base_key_name + ".tls.crt" tls_private_key_path = base_key_name + ".tls.key" - return """\ + return ( + """\ # PEM encoded X509 certificate for TLS. # You can replace the self-signed certificate that synapse # autogenerates on launch with your own SSL certificate + key pair @@ -107,7 +157,24 @@ class TlsConfig(Config): # tls_fingerprints: [] # tls_fingerprints: [{"sha256": "<base64_encoded_sha256_fingerprint>"}] - """ % locals() + + ## Support for ACME certificate auto-provisioning. + # acme: + # enabled: false + ## ACME path. + ## If you only want to test, use the staging url: + ## https://acme-staging.api.letsencrypt.org/directory + # url: 'https://acme-v01.api.letsencrypt.org/directory' + ## Port number (to listen for the HTTP-01 challenge). + ## Using port 80 requires utilising something like authbind, or proxying to it. + # port: 8449 + ## Hosts to bind to. + # bind_addresses: ['127.0.0.1'] + ## How many days remaining on a certificate before it is renewed. + # reprovision_threshold: 30 + """ + % locals() + ) def read_tls_certificate(self, cert_path): cert_pem = self.read_file(cert_path, "tls_certificate") diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py index 6ba3eca7b2..286ad80100 100644 --- a/synapse/crypto/context_factory.py +++ b/synapse/crypto/context_factory.py @@ -17,6 +17,7 @@ from zope.interface import implementer from OpenSSL import SSL, crypto from twisted.internet._sslverify import _defaultCurveName +from twisted.internet.abstract import isIPAddress, isIPv6Address from twisted.internet.interfaces import IOpenSSLClientConnectionCreator from twisted.internet.ssl import CertificateOptions, ContextFactory from twisted.python.failure import Failure @@ -98,8 +99,14 @@ class ClientTLSOptions(object): def __init__(self, hostname, ctx): self._ctx = ctx - self._hostname = hostname - self._hostnameBytes = _idnaBytes(hostname) + + if isIPAddress(hostname) or isIPv6Address(hostname): + self._hostnameBytes = hostname.encode('ascii') + self._sendSNI = False + else: + self._hostnameBytes = _idnaBytes(hostname) + self._sendSNI = True + ctx.set_info_callback( _tolerateErrors(self._identityVerifyingInfoCallback) ) @@ -111,7 +118,9 @@ class ClientTLSOptions(object): return connection def _identityVerifyingInfoCallback(self, connection, where, ret): - if where & SSL.SSL_CB_HANDSHAKE_START: + # Literal IPv4 and IPv6 addresses are not permitted + # as host names according to the RFCs + if where & SSL.SSL_CB_HANDSHAKE_START and self._sendSNI: connection.set_tlsext_host_name(self._hostnameBytes) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 84c75495d5..38470ad176 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -18,6 +18,7 @@ from distutils.util import strtobool import six +from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventFormatVersions from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze @@ -41,8 +42,13 @@ class _EventInternalMetadata(object): def is_outlier(self): return getattr(self, "outlier", False) - def is_invite_from_remote(self): - return getattr(self, "invite_from_remote", False) + def is_out_of_band_membership(self): + """Whether this is an out of band membership, like an invite or an invite + rejection. This is needed as those events are marked as outliers, but + they still need to be processed as if they're new events (e.g. updating + invite state in the database, relaying to clients, etc). + """ + return getattr(self, "out_of_band_membership", False) def get_send_on_behalf_of(self): """Whether this server should send the event on behalf of another server. @@ -179,6 +185,8 @@ class EventBase(object): class FrozenEvent(EventBase): + format_version = EventFormatVersions.V1 # All events of this type are V1 + def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None): event_dict = dict(event_dict) @@ -232,3 +240,19 @@ class FrozenEvent(EventBase): self.get("type", None), self.get("state_key", None), ) + + +def room_version_to_event_format(room_version): + """Converts a room version string to the event format + + Args: + room_version (str) + + Returns: + int + """ + if room_version not in KNOWN_ROOM_VERSIONS: + # We should have already checked version, so this should not happen + raise RuntimeError("Unrecognized room version %s" % (room_version,)) + + return EventFormatVersions.V1 diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index b7ad729c63..d749bfdd3a 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -43,8 +43,8 @@ class FederationBase(object): self._clock = hs.get_clock() @defer.inlineCallbacks - def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, - include_none=False): + def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version, + outlier=False, include_none=False): """Takes a list of PDUs and checks the signatures and hashs of each one. If a PDU fails its signature check then we check if we have it in the database and if not then request if from the originating server of @@ -56,8 +56,12 @@ class FederationBase(object): a new list. Args: + origin (str) pdu (list) - outlier (bool) + room_version (str) + outlier (bool): Whether the events are outliers or not + include_none (str): Whether to include None in the returned list + for events that have failed their checks Returns: Deferred : A list of PDUs that have valid signatures and hashes. @@ -84,6 +88,7 @@ class FederationBase(object): res = yield self.get_pdu( destinations=[pdu.origin], event_id=pdu.event_id, + room_version=room_version, outlier=outlier, timeout=10000, ) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d05ed91d64..33ecabca29 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,14 +25,20 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership +from synapse.api.constants import ( + KNOWN_ROOM_VERSIONS, + EventTypes, + Membership, + RoomVersions, +) from synapse.api.errors import ( CodeMessageException, FederationDeniedError, HttpResponseException, SynapseError, ) -from synapse.events import builder +from synapse.crypto.event_signing import add_hashes_and_signatures +from synapse.events import room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache @@ -66,6 +72,8 @@ class FederationClient(FederationBase): self.state = hs.get_state_handler() self.transport_layer = hs.get_federation_transport_client() + self.event_builder_factory = hs.get_event_builder_factory() + self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", clock=self._clock, @@ -202,7 +210,8 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function - def get_pdu(self, destinations, event_id, outlier=False, timeout=None): + def get_pdu(self, destinations, event_id, room_version, outlier=False, + timeout=None): """Requests the PDU with given origin and ID from the remote home servers. @@ -212,6 +221,7 @@ class FederationClient(FederationBase): Args: destinations (list): Which home servers to query event_id (str): event to fetch + room_version (str): version of the room outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` @@ -352,10 +362,13 @@ class FederationClient(FederationBase): ev.event_id for ev in itertools.chain(pdus, auth_chain) ]) + room_version = yield self.store.get_room_version(room_id) + signed_pdus = yield self._check_sigs_and_hash_and_fetch( destination, [p for p in pdus if p.event_id not in seen_events], - outlier=True + outlier=True, + room_version=room_version, ) signed_pdus.extend( seen_events[p.event_id] for p in pdus if p.event_id in seen_events @@ -364,7 +377,8 @@ class FederationClient(FederationBase): signed_auth = yield self._check_sigs_and_hash_and_fetch( destination, [p for p in auth_chain if p.event_id not in seen_events], - outlier=True + outlier=True, + room_version=room_version, ) signed_auth.extend( seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events @@ -411,6 +425,8 @@ class FederationClient(FederationBase): random.shuffle(srvs) return srvs + room_version = yield self.store.get_room_version(room_id) + batch_size = 20 missing_events = list(missing_events) for i in range(0, len(missing_events), batch_size): @@ -421,6 +437,7 @@ class FederationClient(FederationBase): self.get_pdu, destinations=random_server_list(), event_id=e_id, + room_version=room_version, ) for e_id in batch ] @@ -450,8 +467,11 @@ class FederationClient(FederationBase): for p in res["auth_chain"] ] + room_version = yield self.store.get_room_version(room_id) + signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, auth_chain, + outlier=True, room_version=room_version, ) signed_auth.sort(key=lambda e: e.depth) @@ -522,6 +542,8 @@ class FederationClient(FederationBase): Does so by asking one of the already participating servers to create an event with proper context. + Returns a fully signed and hashed event. + Note that this does not append any events to any graphs. Args: @@ -536,8 +558,10 @@ class FederationClient(FederationBase): params (dict[str, str|Iterable[str]]): Query parameters to include in the request. Return: - Deferred: resolves to a tuple of (origin (str), event (object)) - where origin is the remote homeserver which generated the event. + Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of + `(origin, event, event_format)` where origin is the remote + homeserver which generated the event, and event_format is one of + `synapse.api.constants.EventFormatVersions`. Fails with a ``SynapseError`` if the chosen remote server returns a 300/400 code. @@ -557,6 +581,11 @@ class FederationClient(FederationBase): destination, room_id, user_id, membership, params, ) + # Note: If not supplied, the room version may be either v1 or v2, + # however either way the event format version will be v1. + room_version = ret.get("room_version", RoomVersions.V1) + event_format = room_version_to_event_format(room_version) + pdu_dict = ret.get("event", None) if not isinstance(pdu_dict, dict): raise InvalidResponseError("Bad 'event' field in response") @@ -571,10 +600,21 @@ class FederationClient(FederationBase): if "prev_state" not in pdu_dict: pdu_dict["prev_state"] = [] - ev = builder.EventBuilder(pdu_dict) + # Strip off the fields that we want to clobber. + pdu_dict.pop("origin", None) + pdu_dict.pop("origin_server_ts", None) + pdu_dict.pop("unsigned", None) + + builder = self.event_builder_factory.new(pdu_dict) + add_hashes_and_signatures( + builder, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ev = builder.build() defer.returnValue( - (destination, ev) + (destination, ev, event_format) ) return self._try_destination_list( @@ -650,9 +690,21 @@ class FederationClient(FederationBase): for p in itertools.chain(state, auth_chain) } + room_version = None + for e in state: + if (e.type, e.state_key) == (EventTypes.Create, ""): + room_version = e.content.get("room_version", RoomVersions.V1) + break + + if room_version is None: + # If the state doesn't have a create event then the room is + # invalid, and it would fail auth checks anyway. + raise SynapseError(400, "No create event in state") + valid_pdus = yield self._check_sigs_and_hash_and_fetch( destination, list(pdus.values()), outlier=True, + room_version=room_version, ) valid_pdus_map = { @@ -790,8 +842,10 @@ class FederationClient(FederationBase): for e in content["auth_chain"] ] + room_version = yield self.store.get_room_version(room_id) + signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, auth_chain, outlier=True, room_version=room_version, ) signed_auth.sort(key=lambda e: e.depth) @@ -838,8 +892,10 @@ class FederationClient(FederationBase): for e in content.get("events", []) ] + room_version = yield self.store.get_room_version(room_id) + signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=False + destination, events, outlier=False, room_version=room_version, ) except HttpResponseException as e: if not e.code == 400: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 37d29e7027..dde166e295 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -400,8 +400,14 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, room_id) pdu = yield self.handler.on_make_leave_request(room_id, user_id) + + room_version = yield self.store.get_room_version(room_id) + time_now = self._clock.time_msec() - defer.returnValue({"event": pdu.get_pdu_json(time_now)}) + defer.returnValue({ + "event": pdu.get_pdu_json(time_now), + "room_version": room_version, + }) @defer.inlineCallbacks def on_send_leave_request(self, origin, content): @@ -457,8 +463,10 @@ class FederationServer(FederationBase): for e in content["auth_chain"] ] + room_version = yield self.store.get_room_version(room_id) + signed_auth = yield self._check_sigs_and_hash_and_fetch( - origin, auth_chain, outlier=True + origin, auth_chain, outlier=True, room_version=room_version, ) ret = yield self.handler.on_query_auth( diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py new file mode 100644 index 0000000000..73ea7ed018 --- /dev/null +++ b/synapse/handlers/acme.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import attr +from zope.interface import implementer + +from twisted.internet import defer +from twisted.internet.endpoints import serverFromString +from twisted.python.filepath import FilePath +from twisted.python.url import URL +from twisted.web import server, static +from twisted.web.resource import Resource + +logger = logging.getLogger(__name__) + +try: + from txacme.interfaces import ICertificateStore + + @attr.s + @implementer(ICertificateStore) + class ErsatzStore(object): + """ + A store that only stores in memory. + """ + + certs = attr.ib(default=attr.Factory(dict)) + + def store(self, server_name, pem_objects): + self.certs[server_name] = [o.as_bytes() for o in pem_objects] + return defer.succeed(None) + + +except ImportError: + # txacme is missing + pass + + +class AcmeHandler(object): + def __init__(self, hs): + self.hs = hs + self.reactor = hs.get_reactor() + + @defer.inlineCallbacks + def start_listening(self): + + # Configure logging for txacme, if you need to debug + # from eliot import add_destinations + # from eliot.twisted import TwistedDestination + # + # add_destinations(TwistedDestination()) + + from txacme.challenges import HTTP01Responder + from txacme.service import AcmeIssuingService + from txacme.endpoint import load_or_create_client_key + from txacme.client import Client + from josepy.jwa import RS256 + + self._store = ErsatzStore() + responder = HTTP01Responder() + + self._issuer = AcmeIssuingService( + cert_store=self._store, + client_creator=( + lambda: Client.from_url( + reactor=self.reactor, + url=URL.from_text(self.hs.config.acme_url), + key=load_or_create_client_key( + FilePath(self.hs.config.config_dir_path) + ), + alg=RS256, + ) + ), + clock=self.reactor, + responders=[responder], + ) + + well_known = Resource() + well_known.putChild(b'acme-challenge', responder.resource) + responder_resource = Resource() + responder_resource.putChild(b'.well-known', well_known) + responder_resource.putChild(b'check', static.Data(b'OK', b'text/plain')) + + srv = server.Site(responder_resource) + + listeners = [] + + for host in self.hs.config.acme_bind_addresses: + logger.info( + "Listening for ACME requests on %s:%s", host, self.hs.config.acme_port + ) + endpoint = serverFromString( + self.reactor, "tcp:%s:interface=%s" % (self.hs.config.acme_port, host) + ) + listeners.append(endpoint.listen(srv)) + + # Make sure we are registered to the ACME server. There's no public API + # for this, it is usually triggered by startService, but since we don't + # want it to control where we save the certificates, we have to reach in + # and trigger the registration machinery ourselves. + self._issuer._registered = False + yield self._issuer._ensure_registered() + + # Return a Deferred that will fire when all the servers have started up. + yield defer.DeferredList(listeners, fireOnOneErrback=True, consumeErrors=True) + + @defer.inlineCallbacks + def provision_certificate(self): + + logger.warning("Reprovisioning %s", self.hs.hostname) + + try: + yield self._issuer.issue_cert(self.hs.hostname) + except Exception: + logger.exception("Fail!") + raise + logger.warning("Reprovisioned %s, saving.", self.hs.hostname) + cert_chain = self._store.certs[self.hs.hostname] + + try: + with open(self.hs.config.tls_private_key_file, "wb") as private_key_file: + for x in cert_chain: + if x.startswith(b"-----BEGIN RSA PRIVATE KEY-----"): + private_key_file.write(x) + + with open(self.hs.config.tls_certificate_file, "wb") as certificate_file: + for x in cert_chain: + if x.startswith(b"-----BEGIN CERTIFICATE-----"): + certificate_file.write(x) + except Exception: + logger.exception("Failed saving!") + raise + + defer.returnValue(True) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a3bb864bb2..c52dca1b81 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -34,6 +34,7 @@ from synapse.api.constants import ( EventTypes, Membership, RejectedReason, + RoomVersions, ) from synapse.api.errors import ( AuthError, @@ -43,10 +44,7 @@ from synapse.api.errors import ( StoreError, SynapseError, ) -from synapse.crypto.event_signing import ( - add_hashes_and_signatures, - compute_event_signature, -) +from synapse.crypto.event_signing import compute_event_signature from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, @@ -58,7 +56,6 @@ from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room -from synapse.util.frozenutils import unfreeze from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination from synapse.visibility import filter_events_for_server @@ -342,6 +339,8 @@ class FederationHandler(BaseHandler): room_id, event_id, p, ) + room_version = yield self.store.get_room_version(room_id) + with logcontext.nested_logging_context(p): # note that if any of the missing prevs share missing state or # auth events, the requests to fetch those events are deduped @@ -355,7 +354,7 @@ class FederationHandler(BaseHandler): # we want the state *after* p; get_state_for_room returns the # state *before* p. remote_event = yield self.federation_client.get_pdu( - [origin], p, outlier=True, + [origin], p, room_version, outlier=True, ) if remote_event is None: @@ -379,7 +378,6 @@ class FederationHandler(BaseHandler): for x in remote_state: event_map[x.event_id] = x - room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_store( room_version, state_maps, event_map, state_res_store=StateResolutionStore(self.store), @@ -655,6 +653,8 @@ class FederationHandler(BaseHandler): if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") + room_version = yield self.store.get_room_version(room_id) + events = yield self.federation_client.backfill( dest, room_id, @@ -748,6 +748,7 @@ class FederationHandler(BaseHandler): self.federation_client.get_pdu, [dest], event_id, + room_version=room_version, outlier=True, timeout=10000, ) @@ -1083,7 +1084,6 @@ class FederationHandler(BaseHandler): handled_events = set() try: - event = self._sign_event(event) # Try the host we successfully got a response to /make_join/ # request first. try: @@ -1287,7 +1287,7 @@ class FederationHandler(BaseHandler): ) event.internal_metadata.outlier = True - event.internal_metadata.invite_from_remote = True + event.internal_metadata.out_of_band_membership = True event.signatures.update( compute_event_signature( @@ -1313,7 +1313,7 @@ class FederationHandler(BaseHandler): # Mark as outlier as we don't have any state for this event; we're not # even in the room. event.internal_metadata.outlier = True - event = self._sign_event(event) + event.internal_metadata.out_of_band_membership = True # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. @@ -1336,7 +1336,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, content={}, params=None): - origin, pdu = yield self.federation_client.make_membership_event( + origin, pdu, _ = yield self.federation_client.make_membership_event( target_hosts, room_id, user_id, @@ -1357,27 +1357,6 @@ class FederationHandler(BaseHandler): assert(event.room_id == room_id) defer.returnValue((origin, event)) - def _sign_event(self, event): - event.internal_metadata.outlier = False - - builder = self.event_builder_factory.new( - unfreeze(event.get_pdu_json()) - ) - - builder.event_id = self.event_builder_factory.create_event_id() - builder.origin = self.hs.hostname - - if not hasattr(event, "signatures"): - builder.signatures = {} - - add_hashes_and_signatures( - builder, - self.hs.hostname, - self.hs.config.signing_key[0], - ) - - return builder.build() - @defer.inlineCallbacks @log_function def on_make_leave_request(self, room_id, user_id): @@ -1659,6 +1638,13 @@ class FederationHandler(BaseHandler): create_event = e break + if create_event is None: + # If the state doesn't have a create event then the room is + # invalid, and it would fail auth checks anyway. + raise SynapseError(400, "No create event in state") + + room_version = create_event.content.get("room_version", RoomVersions.V1) + missing_auth_events = set() for e in itertools.chain(auth_events, state, [event]): for e_id in e.auth_event_ids(): @@ -1669,6 +1655,7 @@ class FederationHandler(BaseHandler): m_ev = yield self.federation_client.get_pdu( [origin], e_id, + room_version=room_version, outlier=True, timeout=10000, ) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index dc88620885..13e212d669 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -73,8 +73,14 @@ class RoomListHandler(BaseHandler): # We explicitly don't bother caching searches or requests for # appservice specific lists. logger.info("Bypassing cache as search request.") + + # XXX: Quick hack to stop room directory queries taking too long. + # Timeout request after 60s. Probably want a more fundamental + # solution at some point + timeout = self.clock.time() + 60 return self._get_public_room_list( - limit, since_token, search_filter, network_tuple=network_tuple, + limit, since_token, search_filter, + network_tuple=network_tuple, timeout=timeout, ) key = (limit, since_token, network_tuple) @@ -87,7 +93,8 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, search_filter=None, - network_tuple=EMPTY_THIRD_PARTY_ID,): + network_tuple=EMPTY_THIRD_PARTY_ID, + timeout=None,): if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: @@ -202,6 +209,9 @@ class RoomListHandler(BaseHandler): chunk = [] for i in range(0, len(rooms_to_scan), step): + if timeout and self.clock.time() > timeout: + raise Exception("Timed out searching room directory") + batch = rooms_to_scan[i:i + step] logger.info("Processing %i rooms for result", len(batch)) yield concurrently_execute( diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index ec936bbb4e..49c439313e 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -38,6 +38,41 @@ class SearchHandler(BaseHandler): super(SearchHandler, self).__init__(hs) @defer.inlineCallbacks + def get_old_rooms_from_upgraded_room(self, room_id): + """Retrieves room IDs of old rooms in the history of an upgraded room. + + We do so by checking the m.room.create event of the room for a + `predecessor` key. If it exists, we add the room ID to our return + list and then check that room for a m.room.create event and so on + until we can no longer find any more previous rooms. + + The full list of all found rooms in then returned. + + Args: + room_id (str): id of the room to search through. + + Returns: + Deferred[iterable[unicode]]: predecessor room ids + """ + + historical_room_ids = [] + + while True: + predecessor = yield self.store.get_room_predecessor(room_id) + + # If no predecessor, assume we've hit a dead end + if not predecessor: + break + + # Add predecessor's room ID + historical_room_ids.append(predecessor["room_id"]) + + # Scan through the old room for further predecessors + room_id = predecessor["room_id"] + + defer.returnValue(historical_room_ids) + + @defer.inlineCallbacks def search(self, user, content, batch=None): """Performs a full text search for a user. @@ -137,6 +172,18 @@ class SearchHandler(BaseHandler): ) room_ids = set(r.room_id for r in rooms) + # If doing a subset of all rooms seearch, check if any of the rooms + # are from an upgraded room, and search their contents as well + if search_filter.rooms: + historical_room_ids = [] + for room_id in search_filter.rooms: + # Add any previous rooms to the search if they exist + ids = yield self.get_old_rooms_from_upgraded_room(room_id) + historical_room_ids += ids + + # Prevent any historical events from being filtered + search_filter = search_filter.with_room_ids(historical_room_ids) + room_ids = search_filter.filter_rooms(room_ids) if batch_group == "room_id": diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 3c40999338..120815b09b 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -19,6 +19,7 @@ from six import iteritems from twisted.internet import defer +import synapse.metrics from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.roommember import ProfileInfo @@ -163,6 +164,11 @@ class UserDirectoryHandler(object): yield self._handle_deltas(deltas) self.pos = deltas[-1]["stream_id"] + + # Expose current event processing position to prometheus + synapse.metrics.event_processing_positions.labels( + "user_dir").set(self.pos) + yield self.store.update_user_directory_stream_pos(self.pos) @defer.inlineCallbacks diff --git a/synapse/http/client.py b/synapse/http/client.py index afcf698b29..47a1f82ff0 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -333,9 +333,10 @@ class SimpleHttpClient(object): "POST", uri, headers=Headers(actual_headers), data=query_bytes ) + body = yield make_deferred_yieldable(readBody(response)) + if 200 <= response.code < 300: - body = yield make_deferred_yieldable(treq.json_content(response)) - defer.returnValue(body) + defer.returnValue(json.loads(body)) else: raise HttpResponseException(response.code, response.phrase, body) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 815f8ff2f7..cd79ebab62 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -13,15 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import random import re -from twisted.internet import defer -from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS -from twisted.internet.error import ConnectError - -from synapse.http.federation.srv_resolver import Server, resolve_service - logger = logging.getLogger(__name__) @@ -88,140 +81,3 @@ def parse_and_validate_server_name(server_name): )) return host, port - - -def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=None, - timeout=None): - """Construct an endpoint for the given matrix destination. - - Args: - reactor: Twisted reactor. - destination (unicode): The name of the server to connect to. - tls_client_options_factory - (synapse.crypto.context_factory.ClientTLSOptionsFactory): - Factory which generates TLS options for client connections. - timeout (int): connection timeout in seconds - """ - - domain, port = parse_server_name(destination) - - endpoint_kw_args = {} - - if timeout is not None: - endpoint_kw_args.update(timeout=timeout) - - if tls_client_options_factory is None: - transport_endpoint = HostnameEndpoint - default_port = 8008 - else: - # the SNI string should be the same as the Host header, minus the port. - # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777, - # the Host header and SNI should therefore be the server_name of the remote - # server. - tls_options = tls_client_options_factory.get_options(domain) - - def transport_endpoint(reactor, host, port, timeout): - return wrapClientTLS( - tls_options, - HostnameEndpoint(reactor, host, port, timeout=timeout), - ) - default_port = 8448 - - if port is None: - return SRVClientEndpoint( - reactor, "matrix", domain, protocol="tcp", - default_port=default_port, endpoint=transport_endpoint, - endpoint_kw_args=endpoint_kw_args - ) - else: - return transport_endpoint( - reactor, domain, port, **endpoint_kw_args - ) - - -class SRVClientEndpoint(object): - """An endpoint which looks up SRV records for a service. - Cycles through the list of servers starting with each call to connect - picking the next server. - Implements twisted.internet.interfaces.IStreamClientEndpoint. - """ - - def __init__(self, reactor, service, domain, protocol="tcp", - default_port=None, endpoint=HostnameEndpoint, - endpoint_kw_args={}): - self.reactor = reactor - self.service_name = "_%s._%s.%s" % (service, protocol, domain) - - if default_port is not None: - self.default_server = Server( - host=domain, - port=default_port, - ) - else: - self.default_server = None - - self.endpoint = endpoint - self.endpoint_kw_args = endpoint_kw_args - - self.servers = None - self.used_servers = None - - @defer.inlineCallbacks - def fetch_servers(self): - self.used_servers = [] - self.servers = yield resolve_service(self.service_name) - - def pick_server(self): - if not self.servers: - if self.used_servers: - self.servers = self.used_servers - self.used_servers = [] - self.servers.sort() - elif self.default_server: - return self.default_server - else: - raise ConnectError( - "No server available for %s" % self.service_name - ) - - # look for all servers with the same priority - min_priority = self.servers[0].priority - weight_indexes = list( - (index, server.weight + 1) - for index, server in enumerate(self.servers) - if server.priority == min_priority - ) - - total_weight = sum(weight for index, weight in weight_indexes) - target_weight = random.randint(0, total_weight) - for index, weight in weight_indexes: - target_weight -= weight - if target_weight <= 0: - server = self.servers[index] - # XXX: this looks totally dubious: - # - # (a) we never reuse a server until we have been through - # all of the servers at the same priority, so if the - # weights are A: 100, B:1, we always do ABABAB instead of - # AAAA...AAAB (approximately). - # - # (b) After using all the servers at the lowest priority, - # we move onto the next priority. We should only use the - # second priority if servers at the top priority are - # unreachable. - # - del self.servers[index] - self.used_servers.append(server) - return server - - @defer.inlineCallbacks - def connect(self, protocolFactory): - if self.servers is None: - yield self.fetch_servers() - server = self.pick_server() - logger.info("Connecting to %s:%s", server.host, server.port) - endpoint = self.endpoint( - self.reactor, server.host, server.port, **self.endpoint_kw_args - ) - connection = yield endpoint.connect(protocolFactory) - defer.returnValue(connection) diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py new file mode 100644 index 0000000000..0ec28c6696 --- /dev/null +++ b/synapse/http/federation/matrix_federation_agent.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from zope.interface import implementer + +from twisted.internet import defer +from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS +from twisted.web.client import URI, Agent, HTTPConnectionPool +from twisted.web.iweb import IAgent + +from synapse.http.endpoint import parse_server_name +from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list +from synapse.util.logcontext import make_deferred_yieldable + +logger = logging.getLogger(__name__) + + +@implementer(IAgent) +class MatrixFederationAgent(object): + """An Agent-like thing which provides a `request` method which will look up a matrix + server and send an HTTP request to it. + + Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.) + + Args: + reactor (IReactor): twisted reactor to use for underlying requests + + tls_client_options_factory (ClientTLSOptionsFactory|None): + factory to use for fetching client tls options, or none to disable TLS. + + srv_resolver (SrvResolver|None): + SRVResolver impl to use for looking up SRV records. None to use a default + implementation. + """ + + def __init__( + self, reactor, tls_client_options_factory, _srv_resolver=None, + ): + self._reactor = reactor + self._tls_client_options_factory = tls_client_options_factory + if _srv_resolver is None: + _srv_resolver = SrvResolver() + self._srv_resolver = _srv_resolver + + self._pool = HTTPConnectionPool(reactor) + self._pool.retryAutomatically = False + self._pool.maxPersistentPerHost = 5 + self._pool.cachedConnectionTimeout = 2 * 60 + + @defer.inlineCallbacks + def request(self, method, uri, headers=None, bodyProducer=None): + """ + Args: + method (bytes): HTTP method: GET/POST/etc + + uri (bytes): Absolute URI to be retrieved + + headers (twisted.web.http_headers.Headers|None): + HTTP headers to send with the request, or None to + send no extra headers. + + bodyProducer (twisted.web.iweb.IBodyProducer|None): + An object which can generate bytes to make up the + body of this request (for example, the properly encoded contents of + a file for a file upload). Or None if the request is to have + no body. + + Returns: + Deferred[twisted.web.iweb.IResponse]: + fires when the header of the response has been received (regardless of the + response status code). Fails if there is any problem which prevents that + response from being received (including problems that prevent the request + from being sent). + """ + + parsed_uri = URI.fromBytes(uri) + server_name_bytes = parsed_uri.netloc + host, port = parse_server_name(server_name_bytes.decode("ascii")) + + # XXX disabling TLS is really only supported here for the benefit of the + # unit tests. We should make the UTs cope with TLS rather than having to make + # the code support the unit tests. + if self._tls_client_options_factory is None: + tls_options = None + else: + tls_options = self._tls_client_options_factory.get_options(host) + + if port is not None: + target = (host, port) + else: + service_name = b"_matrix._tcp.%s" % (server_name_bytes, ) + server_list = yield self._srv_resolver.resolve_service(service_name) + if not server_list: + target = (host, 8448) + logger.debug("No SRV record for %s, using %s", host, target) + else: + target = pick_server_from_list(server_list) + + class EndpointFactory(object): + @staticmethod + def endpointForURI(_uri): + logger.info("Connecting to %s:%s", target[0], target[1]) + ep = HostnameEndpoint(self._reactor, host=target[0], port=target[1]) + if tls_options is not None: + ep = wrapClientTLS(tls_options, ep) + return ep + + agent = Agent.usingEndpointFactory(self._reactor, EndpointFactory(), self._pool) + res = yield make_deferred_yieldable( + agent.request(method, uri, headers, bodyProducer) + ) + defer.returnValue(res) diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index c49b82c394..71830c549d 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +import random import time import attr @@ -51,74 +52,118 @@ class Server(object): expires = attr.ib(default=0) -@defer.inlineCallbacks -def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=time): - """Look up a SRV record, with caching +def pick_server_from_list(server_list): + """Randomly choose a server from the server list + + Args: + server_list (list[Server]): list of candidate servers + + Returns: + Tuple[bytes, int]: (host, port) pair for the chosen server + """ + if not server_list: + raise RuntimeError("pick_server_from_list called with empty list") + + # TODO: currently we only use the lowest-priority servers. We should maintain a + # cache of servers known to be "down" and filter them out + + min_priority = min(s.priority for s in server_list) + eligible_servers = list(s for s in server_list if s.priority == min_priority) + total_weight = sum(s.weight for s in eligible_servers) + target_weight = random.randint(0, total_weight) + + for s in eligible_servers: + target_weight -= s.weight + + if target_weight <= 0: + return s.host, s.port + + # this should be impossible. + raise RuntimeError( + "pick_server_from_list got to end of eligible server list.", + ) + + +class SrvResolver(object): + """Interface to the dns client to do SRV lookups, with result caching. The default resolver in twisted.names doesn't do any caching (it has a CacheResolver, but the cache never gets populated), so we add our own caching layer here. Args: - service_name (unicode|bytes): record to look up dns_client (twisted.internet.interfaces.IResolver): twisted resolver impl cache (dict): cache object - clock (object): clock implementation. must provide a time() method. - - Returns: - Deferred[list[Server]]: a list of the SRV records, or an empty list if none found + get_time (callable): clock implementation. Should return seconds since the epoch """ - # TODO: the dns client handles both unicode names (encoding via idna) and pre-encoded - # byteses; however they will obviously end up as separate entries in the cache. We - # should pick one form and stick with it. - cache_entry = cache.get(service_name, None) - if cache_entry: - if all(s.expires > int(clock.time()) for s in cache_entry): - servers = list(cache_entry) - defer.returnValue(servers) - - try: - answers, _, _ = yield make_deferred_yieldable( - dns_client.lookupService(service_name), - ) - except DNSNameError: - # TODO: cache this. We can get the SOA out of the exception, and use - # the negative-TTL value. - defer.returnValue([]) - except DomainError as e: - # We failed to resolve the name (other than a NameError) - # Try something in the cache, else rereaise - cache_entry = cache.get(service_name, None) + def __init__(self, dns_client=client, cache=SERVER_CACHE, get_time=time.time): + self._dns_client = dns_client + self._cache = cache + self._get_time = get_time + + @defer.inlineCallbacks + def resolve_service(self, service_name): + """Look up a SRV record + + Args: + service_name (bytes): record to look up + + Returns: + Deferred[list[Server]]: + a list of the SRV records, or an empty list if none found + """ + now = int(self._get_time()) + + if not isinstance(service_name, bytes): + raise TypeError("%r is not a byte string" % (service_name,)) + + cache_entry = self._cache.get(service_name, None) if cache_entry: - logger.warn( - "Failed to resolve %r, falling back to cache. %r", - service_name, e + if all(s.expires > now for s in cache_entry): + servers = list(cache_entry) + defer.returnValue(servers) + + try: + answers, _, _ = yield make_deferred_yieldable( + self._dns_client.lookupService(service_name), ) - defer.returnValue(list(cache_entry)) - else: - raise e - - if (len(answers) == 1 - and answers[0].type == dns.SRV - and answers[0].payload - and answers[0].payload.target == dns.Name(b'.')): - raise ConnectError("Service %s unavailable" % service_name) - - servers = [] - - for answer in answers: - if answer.type != dns.SRV or not answer.payload: - continue - - payload = answer.payload - - servers.append(Server( - host=payload.target.name, - port=payload.port, - priority=payload.priority, - weight=payload.weight, - expires=int(clock.time()) + answer.ttl, - )) - - servers.sort() # FIXME: get rid of this (it's broken by the attrs change) - cache[service_name] = list(servers) - defer.returnValue(servers) + except DNSNameError: + # TODO: cache this. We can get the SOA out of the exception, and use + # the negative-TTL value. + defer.returnValue([]) + except DomainError as e: + # We failed to resolve the name (other than a NameError) + # Try something in the cache, else rereaise + cache_entry = self._cache.get(service_name, None) + if cache_entry: + logger.warn( + "Failed to resolve %r, falling back to cache. %r", + service_name, e + ) + defer.returnValue(list(cache_entry)) + else: + raise e + + if (len(answers) == 1 + and answers[0].type == dns.SRV + and answers[0].payload + and answers[0].payload.target == dns.Name(b'.')): + raise ConnectError("Service %s unavailable" % service_name) + + servers = [] + + for answer in answers: + if answer.type != dns.SRV or not answer.payload: + continue + + payload = answer.payload + + servers.append(Server( + host=payload.target.name, + port=payload.port, + priority=payload.priority, + weight=payload.weight, + expires=now + answer.ttl, + )) + + self._cache[service_name] = list(servers) + defer.returnValue(servers) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 250bb1ef91..980e912348 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -32,7 +32,7 @@ from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool +from twisted.web.client import FileBodyProducer from twisted.web.http_headers import Headers import synapse.metrics @@ -44,7 +44,7 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) -from synapse.http.endpoint import matrix_federation_endpoint +from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.util.async_helpers import timeout_deferred from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure @@ -66,20 +66,6 @@ else: MAXINT = sys.maxint -class MatrixFederationEndpointFactory(object): - def __init__(self, hs): - self.reactor = hs.get_reactor() - self.tls_client_options_factory = hs.tls_client_options_factory - - def endpointForURI(self, uri): - destination = uri.netloc.decode('ascii') - - return matrix_federation_endpoint( - self.reactor, destination, timeout=10, - tls_client_options_factory=self.tls_client_options_factory - ) - - _next_id = 1 @@ -187,12 +173,10 @@ class MatrixFederationHttpClient(object): self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname reactor = hs.get_reactor() - pool = HTTPConnectionPool(reactor) - pool.retryAutomatically = False - pool.maxPersistentPerHost = 5 - pool.cachedConnectionTimeout = 2 * 60 - self.agent = Agent.usingEndpointFactory( - reactor, MatrixFederationEndpointFactory(hs), pool=pool + + self.agent = MatrixFederationAgent( + hs.get_reactor(), + hs.tls_client_options_factory, ) self.clock = hs.get_clock() self._store = hs.get_datastore() @@ -316,9 +300,9 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers logger.info( - "{%s} [%s] Sending request: %s %s", + "{%s} [%s] Sending request: %s %s; timeout %fs", request.txn_id, request.destination, request.method, - url_str, + url_str, _sec_timeout, ) try: @@ -338,12 +322,11 @@ class MatrixFederationHttpClient(object): reactor=self.hs.get_reactor(), ) - response = yield make_deferred_yieldable( - request_deferred, - ) + response = yield request_deferred except DNSLookupError as e: raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e) except Exception as e: + logger.info("Failed to send request: %s", e) raise_from(RequestSendFailed(e, can_retry=True), e) logger.info( diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 882e844eb1..756721e304 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -79,6 +79,10 @@ CONDITIONAL_REQUIREMENTS = { # ConsentResource uses select_autoescape, which arrived in jinja 2.9 "resources.consent": ["Jinja2>=2.9"], + # ACME support is required to provision TLS certificates from authorities + # that use the protocol, such as Let's Encrypt. + "acme": ["txacme>=0.9.2"], + "saml2": ["pysaml2>=4.5.0"], "url_preview": ["lxml>=3.5.0"], "test": ["mock>=2.0"], diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index aec0c6b075..7f812b8209 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -309,22 +309,16 @@ class RegisterRestServlet(RestServlet): assigned_user_id=registered_user_id, ) - # Only give msisdn flows if the x_show_msisdn flag is given: - # this is a hack to work around the fact that clients were shipped - # that use fallback registration if they see any flows that they don't - # recognise, which means we break registration for these clients if we - # advertise msisdn flows. Once usage of Riot iOS <=0.3.9 and Riot - # Android <=0.6.9 have fallen below an acceptable threshold, this - # parameter should go away and we should always advertise msisdn flows. - show_msisdn = False - if 'x_show_msisdn' in body and body['x_show_msisdn']: - show_msisdn = True - # FIXME: need a better error than "no auth flow found" for scenarios # where we required 3PID for registration but the user didn't give one require_email = 'email' in self.hs.config.registrations_require_3pid require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid + show_msisdn = True + if self.hs.config.disable_msisdn_registration: + show_msisdn = False + require_msisdn = False + flows = [] if self.hs.config.enable_registration_captcha: # only support 3PIDless registration if no 3PIDs are required @@ -422,8 +416,11 @@ class RegisterRestServlet(RestServlet): ) # Necessary due to auth checks prior to the threepid being # written to the db - if is_threepid_reserved(self.hs.config, threepid): - yield self.store.upsert_monthly_active_user(registered_user_id) + if threepid: + if is_threepid_reserved( + self.hs.config.mau_limits_reserved_threepids, threepid + ): + yield self.store.upsert_monthly_active_user(registered_user_id) # remember that we've now registered that user account, and with # what user ID (since the user may not have specified) diff --git a/synapse/server.py b/synapse/server.py index 9985687b95..c8914302cf 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -46,6 +46,7 @@ from synapse.federation.transport.client import TransportLayerClient from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer from synapse.groups.groups_server import GroupsServerHandler from synapse.handlers import Handlers +from synapse.handlers.acme import AcmeHandler from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler, MacaroonGenerator from synapse.handlers.deactivate_account import DeactivateAccountHandler @@ -129,6 +130,7 @@ class HomeServer(object): 'sync_handler', 'typing_handler', 'room_list_handler', + 'acme_handler', 'auth_handler', 'device_handler', 'e2e_keys_handler', @@ -310,6 +312,9 @@ class HomeServer(object): def build_e2e_room_keys_handler(self): return E2eRoomKeysHandler(self) + def build_acme_handler(self): + return AcmeHandler(self) + def build_application_service_api(self): return ApplicationServiceApi(self) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 865b5e915a..f62f70b9f1 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -26,6 +26,7 @@ from prometheus_client import Histogram from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine from synapse.util.caches.descriptors import Cache from synapse.util.logcontext import LoggingContext, PreserveLoggingContext @@ -192,6 +193,51 @@ class SQLBaseStore(object): self.database_engine = hs.database_engine + # A set of tables that are not safe to use native upserts in. + self._unsafe_to_upsert_tables = {"user_ips"} + + if self.database_engine.can_native_upsert: + # Check ASAP (and then later, every 1s) to see if we have finished + # background updates of tables that aren't safe to update. + self._clock.call_later( + 0.0, + run_as_background_process, + "upsert_safety_check", + self._check_safe_to_upsert + ) + + @defer.inlineCallbacks + def _check_safe_to_upsert(self): + """ + Is it safe to use native UPSERT? + + If there are background updates, we will need to wait, as they may be + the addition of indexes that set the UNIQUE constraint that we require. + + If the background updates have not completed, wait 15 sec and check again. + """ + updates = yield self._simple_select_list( + "background_updates", + keyvalues=None, + retcols=["update_name"], + desc="check_background_updates", + ) + updates = [x["update_name"] for x in updates] + + # The User IPs table in schema #53 was missing a unique index, which we + # run as a background update. + if "user_ips_device_unique_index" not in updates: + self._unsafe_to_upsert_tables.discard("user_ips") + + # If there's any tables left to check, reschedule to run. + if self._unsafe_to_upsert_tables: + self._clock.call_later( + 15.0, + run_as_background_process, + "upsert_safety_check", + self._check_safe_to_upsert + ) + def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -494,8 +540,15 @@ class SQLBaseStore(object): txn.executemany(sql, vals) @defer.inlineCallbacks - def _simple_upsert(self, table, keyvalues, values, - insertion_values={}, desc="_simple_upsert", lock=True): + def _simple_upsert( + self, + table, + keyvalues, + values, + insertion_values={}, + desc="_simple_upsert", + lock=True + ): """ `lock` should generally be set to True (the default), but can be set @@ -516,16 +569,21 @@ class SQLBaseStore(object): inserting lock (bool): True to lock the table when doing the upsert. Returns: - Deferred(bool): True if a new entry was created, False if an - existing one was updated. + Deferred(None or bool): Native upserts always return None. Emulated + upserts return True if a new entry was created, False if an existing + one was updated. """ attempts = 0 while True: try: result = yield self.runInteraction( desc, - self._simple_upsert_txn, table, keyvalues, values, insertion_values, - lock=lock + self._simple_upsert_txn, + table, + keyvalues, + values, + insertion_values, + lock=lock, ) defer.returnValue(result) except self.database_engine.module.IntegrityError as e: @@ -537,12 +595,71 @@ class SQLBaseStore(object): # presumably we raced with another transaction: let's retry. logger.warn( - "IntegrityError when upserting into %s; retrying: %s", - table, e + "%s when upserting into %s; retrying: %s", e.__name__, table, e ) - def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}, - lock=True): + def _simple_upsert_txn( + self, + txn, + table, + keyvalues, + values, + insertion_values={}, + lock=True, + ): + """ + Pick the UPSERT method which works best on the platform. Either the + native one (Pg9.5+, recent SQLites), or fall back to an emulated method. + + Args: + txn: The transaction to use. + table (str): The table to upsert into + keyvalues (dict): The unique key tables and their new values + values (dict): The nonunique columns and their new values + insertion_values (dict): additional key/values to use only when + inserting + lock (bool): True to lock the table when doing the upsert. + Returns: + None or bool: Native upserts always return None. Emulated + upserts return True if a new entry was created, False if an existing + one was updated. + """ + if ( + self.database_engine.can_native_upsert + and table not in self._unsafe_to_upsert_tables + ): + return self._simple_upsert_txn_native_upsert( + txn, + table, + keyvalues, + values, + insertion_values=insertion_values, + ) + else: + return self._simple_upsert_txn_emulated( + txn, + table, + keyvalues, + values, + insertion_values=insertion_values, + lock=lock, + ) + + def _simple_upsert_txn_emulated( + self, txn, table, keyvalues, values, insertion_values={}, lock=True + ): + """ + Args: + table (str): The table to upsert into + keyvalues (dict): The unique key tables and their new values + values (dict): The nonunique columns and their new values + insertion_values (dict): additional key/values to use only when + inserting + lock (bool): True to lock the table when doing the upsert. + Returns: + bool: Return True if a new entry was created, False if an existing + one was updated. + """ # We need to lock the table :(, unless we're *really* careful if lock: self.database_engine.lock_table(txn, table) @@ -577,12 +694,44 @@ class SQLBaseStore(object): sql = "INSERT INTO %s (%s) VALUES (%s)" % ( table, ", ".join(k for k in allvalues), - ", ".join("?" for _ in allvalues) + ", ".join("?" for _ in allvalues), ) txn.execute(sql, list(allvalues.values())) # successfully inserted return True + def _simple_upsert_txn_native_upsert( + self, txn, table, keyvalues, values, insertion_values={} + ): + """ + Use the native UPSERT functionality in recent PostgreSQL versions. + + Args: + table (str): The table to upsert into + keyvalues (dict): The unique key tables and their new values + values (dict): The nonunique columns and their new values + insertion_values (dict): additional key/values to use only when + inserting + Returns: + None + """ + allvalues = {} + allvalues.update(keyvalues) + allvalues.update(values) + allvalues.update(insertion_values) + + sql = ( + "INSERT INTO %s (%s) VALUES (%s) " + "ON CONFLICT (%s) DO UPDATE SET %s" + ) % ( + table, + ", ".join(k for k in allvalues), + ", ".join("?" for _ in allvalues), + ", ".join(k for k in keyvalues), + ", ".join(k + "=EXCLUDED." + k for k in values), + ) + txn.execute(sql, list(allvalues.values())) + def _simple_select_one(self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 5d548f250a..091d7116c5 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -110,8 +110,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): @defer.inlineCallbacks def _remove_user_ip_dupes(self, progress, batch_size): + # This works function works by scanning the user_ips table in batches + # based on `last_seen`. For each row in a batch it searches the rest of + # the table to see if there are any duplicates, if there are then they + # are removed and replaced with a suitable row. - last_seen_progress = progress.get("last_seen", 0) + # Fetch the start of the batch + begin_last_seen = progress.get("last_seen", 0) def get_last_seen(txn): txn.execute( @@ -122,29 +127,28 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): LIMIT 1 OFFSET ? """, - (last_seen_progress, batch_size) + (begin_last_seen, batch_size) ) - results = txn.fetchone() - return results - - # Get a last seen that's sufficiently far away enough from the last one - last_seen = yield self.runInteraction( + row = txn.fetchone() + if row: + return row[0] + else: + return None + + # Get a last seen that has roughly `batch_size` since `begin_last_seen` + end_last_seen = yield self.runInteraction( "user_ips_dups_get_last_seen", get_last_seen ) - if not last_seen: - # If we get a None then we're reaching the end and just need to - # delete the last batch. - last = True + # If it returns None, then we're processing the last batch + last = end_last_seen is None - # We fake not having an upper bound by using a future date, by - # just multiplying the current time by two.... - last_seen = int(self.clock.time_msec()) * 2 - else: - last = False - last_seen = last_seen[0] + logger.info( + "Scanning for duplicate 'user_ips' rows in range: %s <= last_seen < %s", + begin_last_seen, end_last_seen, + ) - def remove(txn, last_seen_progress, last_seen): + def remove(txn): # This works by looking at all entries in the given time span, and # then for each (user_id, access_token, ip) tuple in that range # checking for any duplicates in the rest of the table (via a join). @@ -153,6 +157,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): # all other duplicates. # It is efficient due to the existence of (user_id, access_token, # ip) and (last_seen) indices. + + # Define the search space, which requires handling the last batch in + # a different way + if last: + clause = "? <= last_seen" + args = (begin_last_seen,) + else: + clause = "? <= last_seen AND last_seen < ?" + args = (begin_last_seen, end_last_seen) + txn.execute( """ SELECT user_id, access_token, ip, @@ -160,13 +174,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): FROM ( SELECT user_id, access_token, ip FROM user_ips - WHERE ? <= last_seen AND last_seen < ? - ORDER BY last_seen + WHERE {} ) c INNER JOIN user_ips USING (user_id, access_token, ip) GROUP BY user_id, access_token, ip - HAVING count(*) > 1""", - (last_seen_progress, last_seen) + HAVING count(*) > 1 + """.format(clause), + args ) res = txn.fetchall() @@ -194,12 +208,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): ) self._background_update_progress_txn( - txn, "user_ips_remove_dupes", {"last_seen": last_seen} + txn, "user_ips_remove_dupes", {"last_seen": end_last_seen} ) - yield self.runInteraction( - "user_ips_dups_remove", remove, last_seen_progress, last_seen - ) + yield self.runInteraction("user_ips_dups_remove", remove) + if last: yield self._end_background_update("user_ips_remove_dupes") @@ -244,7 +257,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): ) def _update_client_ips_batch_txn(self, txn, to_update): - self.database_engine.lock_table(txn, "user_ips") + if "user_ips" in self._unsafe_to_upsert_tables or ( + not self.database_engine.can_native_upsert + ): + self.database_engine.lock_table(txn, "user_ips") for entry in iteritems(to_update): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index e2f9de8451..ff5ef97ca8 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -18,7 +18,7 @@ import platform from ._base import IncorrectDatabaseSetup from .postgres import PostgresEngine -from .sqlite3 import Sqlite3Engine +from .sqlite import Sqlite3Engine SUPPORTED_MODULE = { "sqlite3": Sqlite3Engine, diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 42225f8a2a..4004427c7b 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -38,6 +38,13 @@ class PostgresEngine(object): return sql.replace("?", "%s") def on_new_connection(self, db_conn): + + # Get the version of PostgreSQL that we're using. As per the psycopg2 + # docs: The number is formed by converting the major, minor, and + # revision numbers into two-decimal-digit numbers and appending them + # together. For example, version 8.1.5 will be returned as 80105 + self._version = db_conn.server_version + db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) @@ -54,6 +61,13 @@ class PostgresEngine(object): cursor.close() + @property + def can_native_upsert(self): + """ + Can we use native UPSERTs? This requires PostgreSQL 9.5+. + """ + return self._version >= 90500 + def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): # https://www.postgresql.org/docs/current/static/errcodes-appendix.html diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite.py index 19949fc474..c64d73ff21 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite.py @@ -15,6 +15,7 @@ import struct import threading +from sqlite3 import sqlite_version_info from synapse.storage.prepare_database import prepare_database @@ -30,6 +31,14 @@ class Sqlite3Engine(object): self._current_state_group_id = None self._current_state_group_id_lock = threading.Lock() + @property + def can_native_upsert(self): + """ + Do we support native UPSERTs? This requires SQLite3 3.24+, plus some + more work we haven't done yet to tell what was inserted vs updated. + """ + return sqlite_version_info >= (3, 24, 0) + def check_database(self, txn): pass diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 79e0276de6..3e1915fb87 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1268,6 +1268,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore event.internal_metadata.get_dict() ), "json": encode_json(event_dict(event)), + "format_version": event.format_version, } for event, _ in events_and_contexts ], diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index a8326f5296..599f892858 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -21,10 +21,10 @@ from canonicaljson import json from twisted.internet import defer +from synapse.api.constants import EventFormatVersions from synapse.api.errors import NotFoundError -# these are only included to make the type annotations work -from synapse.events import EventBase # noqa: F401 from synapse.events import FrozenEvent +# these are only included to make the type annotations work from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event from synapse.metrics.background_process_metrics import run_as_background_process @@ -353,6 +353,7 @@ class EventsWorkerStore(SQLBaseStore): self._get_event_from_row, row["internal_metadata"], row["json"], row["redacts"], rejected_reason=row["rejects"], + format_version=row["format_version"], ) for row in rows ], @@ -377,6 +378,7 @@ class EventsWorkerStore(SQLBaseStore): " e.event_id as event_id, " " e.internal_metadata," " e.json," + " e.format_version, " " r.redacts as redacts," " rej.event_id as rejects " " FROM event_json as e" @@ -392,7 +394,7 @@ class EventsWorkerStore(SQLBaseStore): @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, - rejected_reason=None): + format_version, rejected_reason=None): with Measure(self._clock, "_get_event_from_row"): d = json.loads(js) internal_metadata = json.loads(internal_metadata) @@ -405,8 +407,17 @@ class EventsWorkerStore(SQLBaseStore): desc="_get_event_from_row_rejected_reason", ) + if format_version is None: + # This means that we stored the event before we had the concept + # of a event format version, so it must be a V1 event. + format_version = EventFormatVersions.V1 + + # TODO: When we implement new event formats we'll need to use a + # different event python type + assert format_version == EventFormatVersions.V1 + original_ev = FrozenEvent( - d, + event_dict=d, internal_metadata_dict=internal_metadata, rejected_reason=rejected_reason, ) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 2743b52bad..134297e284 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -215,7 +215,7 @@ class PusherStore(PusherWorkerStore): with self._pushers_id_gen.get_next() as stream_id: # no need to lock because `pushers` has a unique key on # (app_id, pushkey, user_name) so _simple_upsert will retry - newly_inserted = yield self._simple_upsert( + yield self._simple_upsert( table="pushers", keyvalues={ "app_id": app_id, @@ -238,7 +238,12 @@ class PusherStore(PusherWorkerStore): lock=False, ) - if newly_inserted: + user_has_pusher = self.get_if_user_has_pusher.cache.get( + (user_id,), None, update_metrics=False + ) + + if user_has_pusher is not True: + # invalidate, since we the user might not have had a pusher before yield self.runInteraction( "add_pusher", self._invalidate_cache_and_stream, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0707f9a86a..592c1bcd33 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -588,12 +588,12 @@ class RoomMemberStore(RoomMemberWorkerStore): ) # We update the local_invites table only if the event is "current", - # i.e., its something that has just happened. - # The only current event that can also be an outlier is if its an - # invite that has come in across federation. + # i.e., its something that has just happened. If the event is an + # outlier it is only current if its an "out of band membership", + # like a remote invite or a rejection of a remote invite. is_new_state = not backfilled and ( not event.internal_metadata.is_outlier() - or event.internal_metadata.is_invite_from_remote() + or event.internal_metadata.is_out_of_band_membership() ) is_mine = self.hs.is_mine_id(event.state_key) if is_new_state and is_mine: diff --git a/synapse/storage/schema/delta/53/event_format_version.sql b/synapse/storage/schema/delta/53/event_format_version.sql new file mode 100644 index 0000000000..1d977c2834 --- /dev/null +++ b/synapse/storage/schema/delta/53/event_format_version.sql @@ -0,0 +1,16 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE event_json ADD COLUMN format_version INTEGER; diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a134e9b3e8..c3ab7db7ae 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -437,6 +437,30 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): create_event = yield self.get_event(create_id) defer.returnValue(create_event.content.get("room_version", "1")) + @defer.inlineCallbacks + def get_room_predecessor(self, room_id): + """Get the predecessor room of an upgraded room if one exists. + Otherwise return None. + + Args: + room_id (str) + + Returns: + Deferred[unicode|None]: predecessor room id + """ + state_ids = yield self.get_current_state_ids(room_id) + create_id = state_ids.get((EventTypes.Create, "")) + + # If we can't find the create event, assume we've hit a dead end + if not create_id: + defer.returnValue(None) + + # Retrieve the room's create event + create_event = yield self.get_event(create_id) + + # Return predecessor if present + defer.returnValue(create_event.content.get("predecessor", None)) + @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): """Get the current state event ids for a room based on the diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index a8781b0e5d..ce48212265 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -168,14 +168,14 @@ class UserDirectoryStore(SQLBaseStore): if isinstance(self.database_engine, PostgresEngine): # We weight the localpart most highly, then display name and finally # server name - if new_entry: + if self.database_engine.can_native_upsert: sql = """ INSERT INTO user_directory_search(user_id, vector) VALUES (?, setweight(to_tsvector('english', ?), 'A') || setweight(to_tsvector('english', ?), 'D') || setweight(to_tsvector('english', COALESCE(?, '')), 'B') - ) + ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector """ txn.execute( sql, @@ -185,20 +185,45 @@ class UserDirectoryStore(SQLBaseStore): ) ) else: - sql = """ - UPDATE user_directory_search - SET vector = setweight(to_tsvector('english', ?), 'A') - || setweight(to_tsvector('english', ?), 'D') - || setweight(to_tsvector('english', COALESCE(?, '')), 'B') - WHERE user_id = ? - """ - txn.execute( - sql, - ( - get_localpart_from_id(user_id), get_domain_from_id(user_id), - display_name, user_id, + # TODO: Remove this code after we've bumped the minimum version + # of postgres to always support upserts, so we can get rid of + # `new_entry` usage + if new_entry is True: + sql = """ + INSERT INTO user_directory_search(user_id, vector) + VALUES (?, + setweight(to_tsvector('english', ?), 'A') + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') + ) + """ + txn.execute( + sql, + ( + user_id, get_localpart_from_id(user_id), + get_domain_from_id(user_id), display_name, + ) + ) + elif new_entry is False: + sql = """ + UPDATE user_directory_search + SET vector = setweight(to_tsvector('english', ?), 'A') + || setweight(to_tsvector('english', ?), 'D') + || setweight(to_tsvector('english', COALESCE(?, '')), 'B') + WHERE user_id = ? + """ + txn.execute( + sql, + ( + get_localpart_from_id(user_id), + get_domain_from_id(user_id), + display_name, user_id, + ) + ) + else: + raise RuntimeError( + "upsert returned None when 'can_native_upsert' is False" ) - ) elif isinstance(self.database_engine, Sqlite3Engine): value = "%s %s" % (user_id, display_name,) if display_name else user_id self._simple_upsert_txn( |