diff options
68 files changed, 2520 insertions, 1206 deletions
diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 24f22c85b4..369a1ffed1 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -40,6 +40,14 @@ steps: - command: - "python -m pip install tox" + - "tox -e check-sampleconfig" + label: "\U0001F9F9 check-sample-config" + plugins: + - docker#v3.0.1: + image: "python:3.6" + + - command: + - "python -m pip install tox" - "tox -e py27,codecov" label: ":python: 2.7 / SQLite" env: diff --git a/CHANGES.md b/CHANGES.md index 22684420a4..b25775d18e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,5 @@ -Synapse 0.99.2rc1 (2019-02-27) -============================== +Synapse 0.99.2 (2019-03-01) +=========================== Features -------- diff --git a/README.rst b/README.rst index 8e22109973..c04fb4d19a 100644 --- a/README.rst +++ b/README.rst @@ -117,9 +117,9 @@ recommended to also set up CAPTCHA - see `<docs/CAPTCHA_SETUP.rst>`_.) Once ``enable_registration`` is set to ``true``, it is possible to register a user via `riot.im <https://riot.im/app/#/register>`_ or other Matrix clients. -Your new user name will be formed partly from the ``server_name`` (see -`Configuring synapse`_), and partly from a localpart you specify when you -create the account. Your name will take the form of:: +Your new user name will be formed partly from the ``server_name``, and partly +from a localpart you specify when you create the account. Your name will take +the form of:: @localpart:my.domain.name diff --git a/changelog.d/4699.bugfix b/changelog.d/4699.bugfix new file mode 100644 index 0000000000..1d7f3174e7 --- /dev/null +++ b/changelog.d/4699.bugfix @@ -0,0 +1 @@ +Fix attempting to paginate in rooms where server cannot see any events, to avoid unnecessarily pulling in lots of redacted events. diff --git a/changelog.d/4735.feature b/changelog.d/4735.feature new file mode 100644 index 0000000000..a4c0b196f6 --- /dev/null +++ b/changelog.d/4735.feature @@ -0,0 +1 @@ +Add configurable rate limiting to the /register endpoint. diff --git a/changelog.d/4770.misc b/changelog.d/4770.misc new file mode 100644 index 0000000000..144d819958 --- /dev/null +++ b/changelog.d/4770.misc @@ -0,0 +1 @@ +Optimise EDU transmission for the federation_sender worker. diff --git a/changelog.d/4771.misc b/changelog.d/4771.misc new file mode 100644 index 0000000000..8fa3401ca4 --- /dev/null +++ b/changelog.d/4771.misc @@ -0,0 +1 @@ +Update test_typing to use HomeserverTestCase. diff --git a/changelog.d/4776.bugfix b/changelog.d/4776.bugfix new file mode 100644 index 0000000000..ce3e6ce33c --- /dev/null +++ b/changelog.d/4776.bugfix @@ -0,0 +1 @@ +Fix incorrect log about not persisting duplicate state event. diff --git a/changelog.d/4790.bugfix b/changelog.d/4790.bugfix new file mode 100644 index 0000000000..aa8eb93246 --- /dev/null +++ b/changelog.d/4790.bugfix @@ -0,0 +1 @@ +Fix v4v6 option in HAProxy example config. Contributed by Flakebi. diff --git a/changelog.d/4791.feature b/changelog.d/4791.feature new file mode 100644 index 0000000000..1e5fd32463 --- /dev/null +++ b/changelog.d/4791.feature @@ -0,0 +1 @@ +Include a default configuration file in the 'docs' directory. diff --git a/changelog.d/4794.misc b/changelog.d/4794.misc new file mode 100644 index 0000000000..99b543ecba --- /dev/null +++ b/changelog.d/4794.misc @@ -0,0 +1 @@ +Removed unnecessary $ from some federation endpoint path regexes. \ No newline at end of file diff --git a/changelog.d/4795.misc b/changelog.d/4795.misc new file mode 100644 index 0000000000..03995f42fe --- /dev/null +++ b/changelog.d/4795.misc @@ -0,0 +1 @@ +Remove link to deleted title in README. \ No newline at end of file diff --git a/changelog.d/4796.feature b/changelog.d/4796.feature new file mode 100644 index 0000000000..9e05560a3f --- /dev/null +++ b/changelog.d/4796.feature @@ -0,0 +1 @@ +Add support for /keys/query and /keys/changes REST endpoints to client_reader worker. diff --git a/changelog.d/4797.misc b/changelog.d/4797.misc new file mode 100644 index 0000000000..822e98e6a7 --- /dev/null +++ b/changelog.d/4797.misc @@ -0,0 +1 @@ +Clean up read-receipt handling. diff --git a/changelog.d/4798.misc b/changelog.d/4798.misc new file mode 100644 index 0000000000..d60f208dc3 --- /dev/null +++ b/changelog.d/4798.misc @@ -0,0 +1 @@ +Add some debug about processing read receipts. diff --git a/changelog.d/4799.misc b/changelog.d/4799.misc new file mode 100644 index 0000000000..5ab11a5c0b --- /dev/null +++ b/changelog.d/4799.misc @@ -0,0 +1 @@ +Clean up some replication code. diff --git a/debian/changelog b/debian/changelog index 8a59c708d7..fd77ce13a2 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,9 +1,9 @@ -matrix-synapse-py3 (0.99.2rc1) stable; urgency=medium +matrix-synapse-py3 (0.99.2) stable; urgency=medium * Fix overwriting of config settings on upgrade. - * New synapse release 0.99.2rc1. + * New synapse release 0.99.2. - -- Synapse Packaging team <packages@matrix.org> Wed, 27 Feb 2019 10:45:58 +0000 + -- Synapse Packaging team <packages@matrix.org> Fri, 01 Mar 2019 10:55:08 +0000 matrix-synapse-py3 (0.99.1.1) stable; urgency=medium diff --git a/docs/.sample_config_header.yaml b/docs/.sample_config_header.yaml new file mode 100644 index 0000000000..576fc98609 --- /dev/null +++ b/docs/.sample_config_header.yaml @@ -0,0 +1,7 @@ +# This file is a reference to the configuration options which can be set in +# homeserver.yaml. +# +# Note that it is not quite ready to be used as-is. If you are starting from +# scratch, it is easier to generate the config files following the instructions +# in INSTALL.md. + diff --git a/docs/reverse_proxy.rst b/docs/reverse_proxy.rst index 4706061eba..6cd129abf4 100644 --- a/docs/reverse_proxy.rst +++ b/docs/reverse_proxy.rst @@ -88,18 +88,16 @@ Let's assume that we expect clients to connect to our server at * HAProxy:: frontend https - bind 0.0.0.0:443 v4v6 ssl crt /etc/ssl/haproxy/ strict-sni alpn h2,http/1.1 - bind :::443 ssl crt /etc/ssl/haproxy/ strict-sni alpn h2,http/1.1 - + bind :::443 v4v6 ssl crt /etc/ssl/haproxy/ strict-sni alpn h2,http/1.1 + # Matrix client traffic acl matrix hdr(host) -i matrix.example.com use_backend matrix if matrix - + frontend matrix-federation - bind 0.0.0.0:8448 v4v6 ssl crt /etc/ssl/haproxy/synapse.pem alpn h2,http/1.1 - bind :::8448 ssl crt /etc/ssl/haproxy/synapse.pem alpn h2,http/1.1 + bind :::8448 v4v6 ssl crt /etc/ssl/haproxy/synapse.pem alpn h2,http/1.1 default_backend matrix - + backend matrix server matrix 127.0.0.1:8008 diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml new file mode 100644 index 0000000000..e0140003fd --- /dev/null +++ b/docs/sample_config.yaml @@ -0,0 +1,1052 @@ +# This file is a reference to the configuration options which can be set in +# homeserver.yaml. +# +# Note that it is not quite ready to be used as-is. If you are starting from +# scratch, it is easier to generate the config files following the instructions +# in INSTALL.md. + +## Server ## + +# The domain name of the server, with optional explicit port. +# This is used by remote servers to connect to this server, +# e.g. matrix.org, localhost:8080, etc. +# This is also the last part of your UserID. +# +server_name: "SERVERNAME" + +# When running as a daemon, the file to store the pid in +# +pid_file: DATADIR/homeserver.pid + +# CPU affinity mask. Setting this restricts the CPUs on which the +# process will be scheduled. It is represented as a bitmask, with the +# lowest order bit corresponding to the first logical CPU and the +# highest order bit corresponding to the last logical CPU. Not all CPUs +# may exist on a given system but a mask may specify more CPUs than are +# present. +# +# For example: +# 0x00000001 is processor #0, +# 0x00000003 is processors #0 and #1, +# 0xFFFFFFFF is all processors (#0 through #31). +# +# Pinning a Python process to a single CPU is desirable, because Python +# is inherently single-threaded due to the GIL, and can suffer a +# 30-40% slowdown due to cache blow-out and thread context switching +# if the scheduler happens to schedule the underlying threads across +# different cores. See +# https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/. +# +# This setting requires the affinity package to be installed! +# +#cpu_affinity: 0xFFFFFFFF + +# The path to the web client which will be served at /_matrix/client/ +# if 'webclient' is configured under the 'listeners' configuration. +# +#web_client_location: "/path/to/web/root" + +# The public-facing base URL that clients use to access this HS +# (not including _matrix/...). This is the same URL a user would +# enter into the 'custom HS URL' field on their client. If you +# use synapse with a reverse proxy, this should be the URL to reach +# synapse via the proxy. +# +#public_baseurl: https://example.com/ + +# Set the soft limit on the number of file descriptors synapse can use +# Zero is used to indicate synapse should set the soft limit to the +# hard limit. +# +soft_file_limit: 0 + +# Set to false to disable presence tracking on this homeserver. +# +use_presence: true + +# The GC threshold parameters to pass to `gc.set_threshold`, if defined +# +#gc_thresholds: [700, 10, 10] + +# Set the limit on the returned events in the timeline in the get +# and sync operations. The default value is -1, means no upper limit. +# +#filter_timeline_limit: 5000 + +# Whether room invites to users on this server should be blocked +# (except those sent by local server admins). The default is False. +# +#block_non_admin_invites: True + +# Room searching +# +# If disabled, new messages will not be indexed for searching and users +# will receive errors when searching for messages. Defaults to enabled. +# +#enable_search: false + +# Restrict federation to the following whitelist of domains. +# N.B. we recommend also firewalling your federation listener to limit +# inbound federation traffic as early as possible, rather than relying +# purely on this application-layer restriction. If not specified, the +# default is to whitelist everything. +# +#federation_domain_whitelist: +# - lon.example.com +# - nyc.example.com +# - syd.example.com + +# List of ports that Synapse should listen on, their purpose and their +# configuration. +# +# Options for each listener include: +# +# port: the TCP port to bind to +# +# bind_addresses: a list of local addresses to listen on. The default is +# 'all local interfaces'. +# +# type: the type of listener. Normally 'http', but other valid options are: +# 'manhole' (see docs/manhole.md), +# 'metrics' (see docs/metrics-howto.rst), +# 'replication' (see docs/workers.rst). +# +# tls: set to true to enable TLS for this listener. Will use the TLS +# key/cert specified in tls_private_key_path / tls_certificate_path. +# +# x_forwarded: Only valid for an 'http' listener. Set to true to use the +# X-Forwarded-For header as the client IP. Useful when Synapse is +# behind a reverse-proxy. +# +# resources: Only valid for an 'http' listener. A list of resources to host +# on this port. Options for each resource are: +# +# names: a list of names of HTTP resources. See below for a list of +# valid resource names. +# +# compress: set to true to enable HTTP comression for this resource. +# +# additional_resources: Only valid for an 'http' listener. A map of +# additional endpoints which should be loaded via dynamic modules. +# +# Valid resource names are: +# +# client: the client-server API (/_matrix/client). Also implies 'media' and +# 'static'. +# +# consent: user consent forms (/_matrix/consent). See +# docs/consent_tracking.md. +# +# federation: the server-server API (/_matrix/federation). Also implies +# 'media', 'keys', 'openid' +# +# keys: the key discovery API (/_matrix/keys). +# +# media: the media API (/_matrix/media). +# +# metrics: the metrics interface. See docs/metrics-howto.rst. +# +# openid: OpenID authentication. +# +# replication: the HTTP replication API (/_synapse/replication). See +# docs/workers.rst. +# +# static: static resources under synapse/static (/_matrix/static). (Mostly +# useful for 'fallback authentication'.) +# +# webclient: A web client. Requires web_client_location to be set. +# +listeners: + # TLS-enabled listener: for when matrix traffic is sent directly to synapse. + # + # Disabled by default. To enable it, uncomment the following. (Note that you + # will also need to give Synapse a TLS key and certificate: see the TLS section + # below.) + # + #- port: 8448 + # type: http + # tls: true + # resources: + # - names: [client, federation] + + # Unsecure HTTP listener: for when matrix traffic passes through a reverse proxy + # that unwraps TLS. + # + # If you plan to use a reverse proxy, please see + # https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.rst. + # + - port: 8008 + tls: false + bind_addresses: ['::1', '127.0.0.1'] + type: http + x_forwarded: true + + resources: + - names: [client, federation] + compress: false + + # example additonal_resources: + # + #additional_resources: + # "/_matrix/my/custom/endpoint": + # module: my_module.CustomRequestHandler + # config: {} + + # Turn on the twisted ssh manhole service on localhost on the given + # port. + # + #- port: 9000 + # bind_addresses: ['::1', '127.0.0.1'] + # type: manhole + + +## Homeserver blocking ## + +# How to reach the server admin, used in ResourceLimitError +# +#admin_contact: 'mailto:admin@server.com' + +# Global blocking +# +#hs_disabled: False +#hs_disabled_message: 'Human readable reason for why the HS is blocked' +#hs_disabled_limit_type: 'error code(str), to help clients decode reason' + +# Monthly Active User Blocking +# +#limit_usage_by_mau: False +#max_mau_value: 50 +#mau_trial_days: 2 + +# If enabled, the metrics for the number of monthly active users will +# be populated, however no one will be limited. If limit_usage_by_mau +# is true, this is implied to be true. +# +#mau_stats_only: False + +# Sometimes the server admin will want to ensure certain accounts are +# never blocked by mau checking. These accounts are specified here. +# +#mau_limit_reserved_threepids: +# - medium: 'email' +# address: 'reserved_user@example.com' + + +## TLS ## + +# PEM-encoded X509 certificate for TLS. +# This certificate, as of Synapse 1.0, will need to be a valid and verifiable +# certificate, signed by a recognised Certificate Authority. +# +# See 'ACME support' below to enable auto-provisioning this certificate via +# Let's Encrypt. +# +#tls_certificate_path: "CONFDIR/SERVERNAME.tls.crt" + +# PEM-encoded private key for TLS +# +#tls_private_key_path: "CONFDIR/SERVERNAME.tls.key" + +# ACME support: This will configure Synapse to request a valid TLS certificate +# for your configured `server_name` via Let's Encrypt. +# +# Note that provisioning a certificate in this way requires port 80 to be +# routed to Synapse so that it can complete the http-01 ACME challenge. +# By default, if you enable ACME support, Synapse will attempt to listen on +# port 80 for incoming http-01 challenges - however, this will likely fail +# with 'Permission denied' or a similar error. +# +# There are a couple of potential solutions to this: +# +# * If you already have an Apache, Nginx, or similar listening on port 80, +# you can configure Synapse to use an alternate port, and have your web +# server forward the requests. For example, assuming you set 'port: 8009' +# below, on Apache, you would write: +# +# ProxyPass /.well-known/acme-challenge http://localhost:8009/.well-known/acme-challenge +# +# * Alternatively, you can use something like `authbind` to give Synapse +# permission to listen on port 80. +# +acme: + # ACME support is disabled by default. Uncomment the following line + # (and tls_certificate_path and tls_private_key_path above) to enable it. + # + #enabled: true + + # Endpoint to use to request certificates. If you only want to test, + # use Let's Encrypt's staging url: + # https://acme-staging.api.letsencrypt.org/directory + # + #url: https://acme-v01.api.letsencrypt.org/directory + + # Port number to listen on for the HTTP-01 challenge. Change this if + # you are forwarding connections through Apache/Nginx/etc. + # + #port: 80 + + # Local addresses to listen on for incoming connections. + # Again, you may want to change this if you are forwarding connections + # through Apache/Nginx/etc. + # + #bind_addresses: ['::', '0.0.0.0'] + + # How many days remaining on a certificate before it is renewed. + # + #reprovision_threshold: 30 + + # The domain that the certificate should be for. Normally this + # should be the same as your Matrix domain (i.e., 'server_name'), but, + # by putting a file at 'https://<server_name>/.well-known/matrix/server', + # you can delegate incoming traffic to another server. If you do that, + # you should give the target of the delegation here. + # + # For example: if your 'server_name' is 'example.com', but + # 'https://example.com/.well-known/matrix/server' delegates to + # 'matrix.example.com', you should put 'matrix.example.com' here. + # + # If not set, defaults to your 'server_name'. + # + #domain: matrix.example.com + +# List of allowed TLS fingerprints for this server to publish along +# with the signing keys for this server. Other matrix servers that +# make HTTPS requests to this server will check that the TLS +# certificates returned by this server match one of the fingerprints. +# +# Synapse automatically adds the fingerprint of its own certificate +# to the list. So if federation traffic is handled directly by synapse +# then no modification to the list is required. +# +# If synapse is run behind a load balancer that handles the TLS then it +# will be necessary to add the fingerprints of the certificates used by +# the loadbalancers to this list if they are different to the one +# synapse is using. +# +# Homeservers are permitted to cache the list of TLS fingerprints +# returned in the key responses up to the "valid_until_ts" returned in +# key. It may be necessary to publish the fingerprints of a new +# certificate and wait until the "valid_until_ts" of the previous key +# responses have passed before deploying it. +# +# You can calculate a fingerprint from a given TLS listener via: +# openssl s_client -connect $host:$port < /dev/null 2> /dev/null | +# openssl x509 -outform DER | openssl sha256 -binary | base64 | tr -d '=' +# or by checking matrix.org/federationtester/api/report?server_name=$host +# +#tls_fingerprints: [{"sha256": "<base64_encoded_sha256_fingerprint>"}] + + + +## Database ## + +database: + # The database engine name + name: "sqlite3" + # Arguments to pass to the engine + args: + # Path to the database + database: "DATADIR/homeserver.db" + +# Number of events to cache in memory. +event_cache_size: "10K" + + +## Logging ## + +# A yaml python logging config file +# +log_config: "CONFDIR/SERVERNAME.log.config" + + +## Ratelimiting ## + +# Number of messages a client can send per second +# +rc_messages_per_second: 0.2 + +# Number of message a client can send before being throttled +# +rc_message_burst_count: 10.0 + +# The federation window size in milliseconds +# +federation_rc_window_size: 1000 + +# The number of federation requests from a single server in a window +# before the server will delay processing the request. +# +federation_rc_sleep_limit: 10 + +# The duration in milliseconds to delay processing events from +# remote servers by if they go over the sleep limit. +# +federation_rc_sleep_delay: 500 + +# The maximum number of concurrent federation requests allowed +# from a single server +# +federation_rc_reject_limit: 50 + +# The number of federation requests to concurrently process from a +# single server +# +federation_rc_concurrent: 3 + + + +# Directory where uploaded images and attachments are stored. +# +media_store_path: "DATADIR/media_store" + +# Media storage providers allow media to be stored in different +# locations. +# +#media_storage_providers: +# - module: file_system +# # Whether to write new local files. +# store_local: false +# # Whether to write new remote media +# store_remote: false +# # Whether to block upload requests waiting for write to this +# # provider to complete +# store_synchronous: false +# config: +# directory: /mnt/some/other/directory + +# Directory where in-progress uploads are stored. +# +uploads_path: "DATADIR/uploads" + +# The largest allowed upload size in bytes +# +max_upload_size: "10M" + +# Maximum number of pixels that will be thumbnailed +# +max_image_pixels: "32M" + +# Whether to generate new thumbnails on the fly to precisely match +# the resolution requested by the client. If true then whenever +# a new resolution is requested by the client the server will +# generate a new thumbnail. If false the server will pick a thumbnail +# from a precalculated list. +# +dynamic_thumbnails: false + +# List of thumbnails to precalculate when an image is uploaded. +# +thumbnail_sizes: +- width: 32 + height: 32 + method: crop +- width: 96 + height: 96 + method: crop +- width: 320 + height: 240 + method: scale +- width: 640 + height: 480 + method: scale +- width: 800 + height: 600 + method: scale + +# Is the preview URL API enabled? If enabled, you *must* specify +# an explicit url_preview_ip_range_blacklist of IPs that the spider is +# denied from accessing. +# +url_preview_enabled: False + +# List of IP address CIDR ranges that the URL preview spider is denied +# from accessing. There are no defaults: you must explicitly +# specify a list for URL previewing to work. You should specify any +# internal services in your network that you do not want synapse to try +# to connect to, otherwise anyone in any Matrix room could cause your +# synapse to issue arbitrary GET requests to your internal services, +# causing serious security issues. +# +#url_preview_ip_range_blacklist: +# - '127.0.0.0/8' +# - '10.0.0.0/8' +# - '172.16.0.0/12' +# - '192.168.0.0/16' +# - '100.64.0.0/10' +# - '169.254.0.0/16' +# - '::1/128' +# - 'fe80::/64' +# - 'fc00::/7' +# +# List of IP address CIDR ranges that the URL preview spider is allowed +# to access even if they are specified in url_preview_ip_range_blacklist. +# This is useful for specifying exceptions to wide-ranging blacklisted +# target IP ranges - e.g. for enabling URL previews for a specific private +# website only visible in your network. +# +#url_preview_ip_range_whitelist: +# - '192.168.1.1' + +# Optional list of URL matches that the URL preview spider is +# denied from accessing. You should use url_preview_ip_range_blacklist +# in preference to this, otherwise someone could define a public DNS +# entry that points to a private IP address and circumvent the blacklist. +# This is more useful if you know there is an entire shape of URL that +# you know that will never want synapse to try to spider. +# +# Each list entry is a dictionary of url component attributes as returned +# by urlparse.urlsplit as applied to the absolute form of the URL. See +# https://docs.python.org/2/library/urlparse.html#urlparse.urlsplit +# The values of the dictionary are treated as an filename match pattern +# applied to that component of URLs, unless they start with a ^ in which +# case they are treated as a regular expression match. If all the +# specified component matches for a given list item succeed, the URL is +# blacklisted. +# +#url_preview_url_blacklist: +# # blacklist any URL with a username in its URI +# - username: '*' +# +# # blacklist all *.google.com URLs +# - netloc: 'google.com' +# - netloc: '*.google.com' +# +# # blacklist all plain HTTP URLs +# - scheme: 'http' +# +# # blacklist http(s)://www.acme.com/foo +# - netloc: 'www.acme.com' +# path: '/foo' +# +# # blacklist any URL with a literal IPv4 address +# - netloc: '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$' + +# The largest allowed URL preview spidering size in bytes +max_spider_size: "10M" + + + +## Captcha ## +# See docs/CAPTCHA_SETUP for full details of configuring this. + +# This Home Server's ReCAPTCHA public key. +# +recaptcha_public_key: "YOUR_PUBLIC_KEY" + +# This Home Server's ReCAPTCHA private key. +# +recaptcha_private_key: "YOUR_PRIVATE_KEY" + +# Enables ReCaptcha checks when registering, preventing signup +# unless a captcha is answered. Requires a valid ReCaptcha +# public/private key. +# +enable_registration_captcha: False + +# A secret key used to bypass the captcha test entirely. +#captcha_bypass_secret: "YOUR_SECRET_HERE" + +# The API endpoint to use for verifying m.login.recaptcha responses. +recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify" + + +## TURN ## + +# The public URIs of the TURN server to give to clients +# +#turn_uris: [] + +# The shared secret used to compute passwords for the TURN server +# +#turn_shared_secret: "YOUR_SHARED_SECRET" + +# The Username and password if the TURN server needs them and +# does not use a token +# +#turn_username: "TURNSERVER_USERNAME" +#turn_password: "TURNSERVER_PASSWORD" + +# How long generated TURN credentials last +# +turn_user_lifetime: "1h" + +# Whether guests should be allowed to use the TURN server. +# This defaults to True, otherwise VoIP will be unreliable for guests. +# However, it does introduce a slight security risk as it allows users to +# connect to arbitrary endpoints without having first signed up for a +# valid account (e.g. by passing a CAPTCHA). +# +turn_allow_guests: True + + +## Registration ## + +# Enable registration for new users. +enable_registration: False + +# The user must provide all of the below types of 3PID when registering. +# +#registrations_require_3pid: +# - email +# - msisdn + +# Explicitly disable asking for MSISDNs from the registration +# flow (overrides registrations_require_3pid if MSISDNs are set as required) +# +#disable_msisdn_registration: True + +# Mandate that users are only allowed to associate certain formats of +# 3PIDs with accounts on this server. +# +#allowed_local_3pids: +# - medium: email +# pattern: '.*@matrix\.org' +# - medium: email +# pattern: '.*@vector\.im' +# - medium: msisdn +# pattern: '\+44' + +# If set, allows registration by anyone who also has the shared +# secret, even if registration is otherwise disabled. +# +# registration_shared_secret: <PRIVATE STRING> + +# Set the number of bcrypt rounds used to generate password hash. +# Larger numbers increase the work factor needed to generate the hash. +# The default number is 12 (which equates to 2^12 rounds). +# N.B. that increasing this will exponentially increase the time required +# to register or login - e.g. 24 => 2^24 rounds which will take >20 mins. +# +bcrypt_rounds: 12 + +# Allows users to register as guests without a password/email/etc, and +# participate in rooms hosted on this server which have been made +# accessible to anonymous users. +# +allow_guest_access: False + +# The identity server which we suggest that clients should use when users log +# in on this server. +# +# (By default, no suggestion is made, so it is left up to the client. +# This setting is ignored unless public_baseurl is also set.) +# +#default_identity_server: https://matrix.org + +# The list of identity servers trusted to verify third party +# identifiers by this server. +# +# Also defines the ID server which will be called when an account is +# deactivated (one will be picked arbitrarily). +# +trusted_third_party_id_servers: + - matrix.org + - vector.im + +# Users who register on this homeserver will automatically be joined +# to these rooms +# +#auto_join_rooms: +# - "#example:example.com" + +# Where auto_join_rooms are specified, setting this flag ensures that the +# the rooms exist by creating them when the first user on the +# homeserver registers. +# Setting to false means that if the rooms are not manually created, +# users cannot be auto-joined since they do not exist. +# +autocreate_auto_join_rooms: true + +# Number of registration requests a client can send per second. +# Defaults to 1/minute (0.17). +# +#rc_registration_requests_per_second: 0.17 + +# Number of registration requests a client can send before being +# throttled. +# Defaults to 3. +# +#rc_registration_request_burst_count: 3.0 + + +## Metrics ### + +# Enable collection and rendering of performance metrics +# +enable_metrics: False + +# Enable sentry integration +# NOTE: While attempts are made to ensure that the logs don't contain +# any sensitive information, this cannot be guaranteed. By enabling +# this option the sentry server may therefore receive sensitive +# information, and it in turn may then diseminate sensitive information +# through insecure notification channels if so configured. +# +#sentry: +# dsn: "..." + +# Whether or not to report anonymized homeserver usage statistics. +# report_stats: true|false + + +## API Configuration ## + +# A list of event types that will be included in the room_invite_state +# +room_invite_state_types: + - "m.room.join_rules" + - "m.room.canonical_alias" + - "m.room.avatar" + - "m.room.encryption" + - "m.room.name" + + +# A list of application service config file to use +# +app_service_config_files: [] + +# Whether or not to track application service IP addresses. Implicitly +# enables MAU tracking for application service users. +# +track_appservice_user_ips: False + + +# a secret which is used to sign access tokens. If none is specified, +# the registration_shared_secret is used, if one is given; otherwise, +# a secret key is derived from the signing key. +# +# macaroon_secret_key: <PRIVATE STRING> + +# Used to enable access token expiration. +# +expire_access_token: False + +# a secret which is used to calculate HMACs for form values, to stop +# falsification of values. Must be specified for the User Consent +# forms to work. +# +# form_secret: <PRIVATE STRING> + +## Signing Keys ## + +# Path to the signing key to sign messages with +# +signing_key_path: "CONFDIR/SERVERNAME.signing.key" + +# The keys that the server used to sign messages with but won't use +# to sign new messages. E.g. it has lost its private key +# +#old_signing_keys: +# "ed25519:auto": +# # Base64 encoded public key +# key: "The public part of your old signing key." +# # Millisecond POSIX timestamp when the key expired. +# expired_ts: 123456789123 + +# How long key response published by this server is valid for. +# Used to set the valid_until_ts in /key/v2 APIs. +# Determines how quickly servers will query to check which keys +# are still valid. +# +key_refresh_interval: "1d" # 1 Day. + +# The trusted servers to download signing keys from. +# +perspectives: + servers: + "matrix.org": + verify_keys: + "ed25519:auto": + key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw" + + + +# Enable SAML2 for registration and login. Uses pysaml2. +# +# `sp_config` is the configuration for the pysaml2 Service Provider. +# See pysaml2 docs for format of config. +# +# Default values will be used for the 'entityid' and 'service' settings, +# so it is not normally necessary to specify them unless you need to +# override them. +# +#saml2_config: +# sp_config: +# # point this to the IdP's metadata. You can use either a local file or +# # (preferably) a URL. +# metadata: +# #local: ["saml2/idp.xml"] +# remote: +# - url: https://our_idp/metadata.xml +# +# # The rest of sp_config is just used to generate our metadata xml, and you +# # may well not need it, depending on your setup. Alternatively you +# # may need a whole lot more detail - see the pysaml2 docs! +# +# description: ["My awesome SP", "en"] +# name: ["Test SP", "en"] +# +# organization: +# name: Example com +# display_name: +# - ["Example co", "en"] +# url: "http://example.com" +# +# contact_person: +# - given_name: Bob +# sur_name: "the Sysadmin" +# email_address": ["admin@example.com"] +# contact_type": technical +# +# # Instead of putting the config inline as above, you can specify a +# # separate pysaml2 configuration file: +# # +# config_path: "CONFDIR/sp_conf.py" + + + +# Enable CAS for registration and login. +# +#cas_config: +# enabled: true +# server_url: "https://cas-server.com" +# service_url: "https://homeserver.domain.com:8448" +# #required_attributes: +# # name: value + + +# The JWT needs to contain a globally unique "sub" (subject) claim. +# +#jwt_config: +# enabled: true +# secret: "a secret" +# algorithm: "HS256" + + + +# Enable password for login. +# +password_config: + enabled: true + # Uncomment and change to a secret random string for extra security. + # DO NOT CHANGE THIS AFTER INITIAL SETUP! + #pepper: "" + + + +# Enable sending emails for notification events +# Defining a custom URL for Riot is only needed if email notifications +# should contain links to a self-hosted installation of Riot; when set +# the "app_name" setting is ignored. +# +# If your SMTP server requires authentication, the optional smtp_user & +# smtp_pass variables should be used +# +#email: +# enable_notifs: false +# smtp_host: "localhost" +# smtp_port: 25 +# smtp_user: "exampleusername" +# smtp_pass: "examplepassword" +# require_transport_security: False +# notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>" +# app_name: Matrix +# # if template_dir is unset, uses the example templates that are part of +# # the Synapse distribution. +# #template_dir: res/templates +# notif_template_html: notif_mail.html +# notif_template_text: notif_mail.txt +# notif_for_new_users: True +# riot_base_url: "http://localhost/riot" + + +#password_providers: +# - module: "ldap_auth_provider.LdapAuthProvider" +# config: +# enabled: true +# uri: "ldap://ldap.example.com:389" +# start_tls: true +# base: "ou=users,dc=example,dc=com" +# attributes: +# uid: "cn" +# mail: "email" +# name: "givenName" +# #bind_dn: +# #bind_password: +# #filter: "(objectClass=posixAccount)" + + + +# Clients requesting push notifications can either have the body of +# the message sent in the notification poke along with other details +# like the sender, or just the event ID and room ID (`event_id_only`). +# If clients choose the former, this option controls whether the +# notification request includes the content of the event (other details +# like the sender are still included). For `event_id_only` push, it +# has no effect. +# +# For modern android devices the notification content will still appear +# because it is loaded by the app. iPhone, however will send a +# notification saying only that a message arrived and who it came from. +# +#push: +# include_content: true + + +#spam_checker: +# module: "my_custom_project.SuperSpamChecker" +# config: +# example_option: 'things' + + +# Whether to allow non server admins to create groups on this server +# +enable_group_creation: false + +# If enabled, non server admins can only create groups with local parts +# starting with this prefix +# +#group_creation_prefix: "unofficial/" + + + +# User Directory configuration +# +# 'search_all_users' defines whether to search all users visible to your HS +# when searching the user directory, rather than limiting to users visible +# in public rooms. Defaults to false. If you set it True, you'll have to run +# UPDATE user_directory_stream_pos SET stream_id = NULL; +# on your database to tell it to rebuild the user_directory search indexes. +# +#user_directory: +# search_all_users: false + + +# User Consent configuration +# +# for detailed instructions, see +# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md +# +# Parts of this section are required if enabling the 'consent' resource under +# 'listeners', in particular 'template_dir' and 'version'. +# +# 'template_dir' gives the location of the templates for the HTML forms. +# This directory should contain one subdirectory per language (eg, 'en', 'fr'), +# and each language directory should contain the policy document (named as +# '<version>.html') and a success page (success.html). +# +# 'version' specifies the 'current' version of the policy document. It defines +# the version to be served by the consent resource if there is no 'v' +# parameter. +# +# 'server_notice_content', if enabled, will send a user a "Server Notice" +# asking them to consent to the privacy policy. The 'server_notices' section +# must also be configured for this to work. Notices will *not* be sent to +# guest users unless 'send_server_notice_to_guests' is set to true. +# +# 'block_events_error', if set, will block any attempts to send events +# until the user consents to the privacy policy. The value of the setting is +# used as the text of the error. +# +# 'require_at_registration', if enabled, will add a step to the registration +# process, similar to how captcha works. Users will be required to accept the +# policy before their account is created. +# +# 'policy_name' is the display name of the policy users will see when registering +# for an account. Has no effect unless `require_at_registration` is enabled. +# Defaults to "Privacy Policy". +# +#user_consent: +# template_dir: res/templates/privacy +# version: 1.0 +# server_notice_content: +# msgtype: m.text +# body: >- +# To continue using this homeserver you must review and agree to the +# terms and conditions at %(consent_uri)s +# send_server_notice_to_guests: True +# block_events_error: >- +# To continue using this homeserver you must review and agree to the +# terms and conditions at %(consent_uri)s +# require_at_registration: False +# policy_name: Privacy Policy +# + + +# Server Notices room configuration +# +# Uncomment this section to enable a room which can be used to send notices +# from the server to users. It is a special room which cannot be left; notices +# come from a special "notices" user id. +# +# If you uncomment this section, you *must* define the system_mxid_localpart +# setting, which defines the id of the user which will be used to send the +# notices. +# +# It's also possible to override the room name, the display name of the +# "notices" user, and the avatar for the user. +# +#server_notices: +# system_mxid_localpart: notices +# system_mxid_display_name: "Server Notices" +# system_mxid_avatar_url: "mxc://server.com/oumMVlgDnLYFaPVkExemNVVZ" +# room_name: "Server Notices" + + + +# The `alias_creation` option controls who's allowed to create aliases +# on this server. +# +# The format of this option is a list of rules that contain globs that +# match against user_id, room_id and the new alias (fully qualified with +# server name). The action in the first rule that matches is taken, +# which can currently either be "allow" or "deny". +# +# Missing user_id/room_id/alias fields default to "*". +# +# If no rules match the request is denied. An empty list means no one +# can create aliases. +# +# Options for the rules include: +# +# user_id: Matches against the creator of the alias +# alias: Matches against the alias being created +# room_id: Matches against the room ID the alias is being pointed at +# action: Whether to "allow" or "deny" the request if the rule matches +# +# The default is: +# +#alias_creation_rules: +# - user_id: "*" +# alias: "*" +# room_id: "*" +# action: allow + +# The `room_list_publication_rules` option controls who can publish and +# which rooms can be published in the public room list. +# +# The format of this option is the same as that for +# `alias_creation_rules`. +# +# If the room has one or more aliases associated with it, only one of +# the aliases needs to match the alias rule. If there are no aliases +# then only rules with `alias: *` match. +# +# If no rules match the request is denied. An empty list means no one +# can publish rooms. +# +# Options for the rules include: +# +# user_id: Matches agaisnt the creator of the alias +# room_id: Matches against the room ID being published +# alias: Matches against any current local or canonical aliases +# associated with the room +# action: Whether to "allow" or "deny" the request if the rule matches +# +# The default is: +# +#room_list_publication_rules: +# - user_id: "*" +# alias: "*" +# room_id: "*" +# action: allow diff --git a/docs/workers.rst b/docs/workers.rst index 3c18db1b19..d80fc04d2e 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -225,6 +225,8 @@ following regular expressions:: ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$ ^/_matrix/client/(api/v1|r0|unstable)/login$ ^/_matrix/client/(api/v1|r0|unstable)/account/3pid$ + ^/_matrix/client/(api/v1|r0|unstable)/keys/query$ + ^/_matrix/client/(api/v1|r0|unstable)/keys/changes$ Additionally, the following REST endpoints can be handled, but all requests must be routed to the same instance:: diff --git a/scripts-dev/generate_sample_config b/scripts-dev/generate_sample_config new file mode 100755 index 0000000000..5e33b9b549 --- /dev/null +++ b/scripts-dev/generate_sample_config @@ -0,0 +1,18 @@ +#!/bin/bash +# +# Update/check the docs/sample_config.yaml + +set -e + +cd `dirname $0`/.. + +SAMPLE_CONFIG="docs/sample_config.yaml" + +if [ "$1" == "--check" ]; then + diff -u "$SAMPLE_CONFIG" <(./scripts/generate_config --header-file docs/.sample_config_header.yaml) >/dev/null || { + echo -e "\e[1m\e[31m$SAMPLE_CONFIG is not up-to-date. Regenerate it with \`scripts-dev/generate_sample_config\`.\e[0m" >&2 + exit 1 + } +else + ./scripts/generate_config --header-file docs/.sample_config_header.yaml -o "$SAMPLE_CONFIG" +fi diff --git a/scripts/generate_config b/scripts/generate_config index 61c5f049e8..93b6406992 100755 --- a/scripts/generate_config +++ b/scripts/generate_config @@ -1,6 +1,7 @@ #!/usr/bin/env python import argparse +import shutil import sys from synapse.config.homeserver import HomeServerConfig @@ -50,6 +51,13 @@ if __name__ == "__main__": help="File to write the configuration to. Default: stdout", ) + parser.add_argument( + "--header-file", + type=argparse.FileType('r'), + help="File from which to read a header, which will be printed before the " + "generated config.", + ) + args = parser.parse_args() report_stats = args.report_stats @@ -64,4 +72,7 @@ if __name__ == "__main__": report_stats=report_stats, ) + if args.header_file: + shutil.copyfileobj(args.header_file, args.output_file) + args.output_file.write(conf) diff --git a/synapse/__init__.py b/synapse/__init__.py index 29b1fe4c03..25c10244d3 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -27,4 +27,4 @@ try: except ImportError: pass -__version__ = "0.99.2rc1" +__version__ = "0.99.2" diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index 3bb5b3da37..ad68079eeb 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -23,12 +23,13 @@ class Ratelimiter(object): def __init__(self): self.message_counts = collections.OrderedDict() - def send_message(self, user_id, time_now_s, msg_rate_hz, burst_count, update=True): - """Can the user send a message? + def can_do_action(self, key, time_now_s, rate_hz, burst_count, update=True): + """Can the entity (e.g. user or IP address) perform the action? Args: - user_id: The user sending a message. + key: The key we should use when rate limiting. Can be a user ID + (when sending events), an IP address, etc. time_now_s: The time now. - msg_rate_hz: The long term number of messages a user can send in a + rate_hz: The long term number of messages a user can send in a second. burst_count: How many messages the user can send before being limited. @@ -41,10 +42,10 @@ class Ratelimiter(object): """ self.prune_message_counts(time_now_s) message_count, time_start, _ignored = self.message_counts.get( - user_id, (0., time_now_s, None), + key, (0., time_now_s, None), ) time_delta = time_now_s - time_start - sent_count = message_count - time_delta * msg_rate_hz + sent_count = message_count - time_delta * rate_hz if sent_count < 0: allowed = True time_start = time_now_s @@ -56,13 +57,13 @@ class Ratelimiter(object): message_count += 1 if update: - self.message_counts[user_id] = ( - message_count, time_start, msg_rate_hz + self.message_counts[key] = ( + message_count, time_start, rate_hz ) - if msg_rate_hz > 0: + if rate_hz > 0: time_allowed = ( - time_start + (message_count - burst_count + 1) / msg_rate_hz + time_start + (message_count - burst_count + 1) / rate_hz ) if time_allowed < time_now_s: time_allowed = time_now_s @@ -72,12 +73,12 @@ class Ratelimiter(object): return allowed, time_allowed def prune_message_counts(self, time_now_s): - for user_id in list(self.message_counts.keys()): - message_count, time_start, msg_rate_hz = ( - self.message_counts[user_id] + for key in list(self.message_counts.keys()): + message_count, time_start, rate_hz = ( + self.message_counts[key] ) time_delta = time_now_s - time_start - if message_count - time_delta * msg_rate_hz > 0: + if message_count - time_delta * rate_hz > 0: break else: - del self.message_counts[user_id] + del self.message_counts[key] diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 5070094cad..beaea64a61 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -33,9 +33,13 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore @@ -49,6 +53,7 @@ from synapse.rest.client.v1.room import ( RoomStateRestServlet, ) from synapse.rest.client.v2_alpha.account import ThreepidRestServlet +from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -61,6 +66,10 @@ logger = logging.getLogger("synapse.app.client_reader") class ClientReaderSlavedStore( + SlavedDeviceInboxStore, + SlavedDeviceStore, + SlavedReceiptsStore, + SlavedPushRuleStore, SlavedAccountDataStore, SlavedEventStore, SlavedKeyStore, @@ -98,6 +107,8 @@ class ClientReaderServer(HomeServer): RegisterRestServlet(self).register(resource) LoginRestServlet(self).register(resource) ThreepidRestServlet(self).register(resource) + KeyQueryServlet(self).register(resource) + KeyChangesServlet(self).register(resource) resources.update({ "/_matrix/client/r0": resource, diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 5aec43b702..c4d3087fa4 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -180,9 +180,7 @@ class Config(object): Returns: str: the yaml config file """ - default_config = "# vim:ft=yaml\n" - - default_config += "\n\n".join( + default_config = "\n\n".join( dedent(conf) for conf in self.invoke_all( "default_config", @@ -297,19 +295,26 @@ class Config(object): "Must specify a server_name to a generate config for." " Pass -H server.name." ) + + config_str = obj.generate_config( + config_dir_path=config_dir_path, + data_dir_path=os.getcwd(), + server_name=server_name, + report_stats=(config_args.report_stats == "yes"), + generate_secrets=True, + ) + if not cls.path_exists(config_dir_path): os.makedirs(config_dir_path) with open(config_path, "w") as config_file: - config_str = obj.generate_config( - config_dir_path=config_dir_path, - data_dir_path=os.getcwd(), - server_name=server_name, - report_stats=(config_args.report_stats == "yes"), - generate_secrets=True, + config_file.write( + "# vim:ft=yaml\n\n" ) - config = yaml.load(config_str) - obj.invoke_all("generate_files", config) config_file.write(config_str) + + config = yaml.load(config_str) + obj.invoke_all("generate_files", config) + print( ( "A config file has been generated in %r for server name" diff --git a/synapse/config/database.py b/synapse/config/database.py index c8890147a6..63e9cb63f8 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -49,7 +49,8 @@ class DatabaseConfig(Config): def default_config(self, data_dir_path, **kwargs): database_path = os.path.join(data_dir_path, "homeserver.db") return """\ - # Database configuration + ## Database ## + database: # The database engine name name: "sqlite3" diff --git a/synapse/config/logger.py b/synapse/config/logger.py index f6940b65fd..464c28c2d9 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -81,7 +81,9 @@ class LoggingConfig(Config): def default_config(self, config_dir_path, server_name, **kwargs): log_config = os.path.join(config_dir_path, server_name + ".log.config") - return """ + return """\ + ## Logging ## + # A yaml python logging config file # log_config: "%(log_config)s" diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 2881482f96..d32f6fff73 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -54,6 +54,13 @@ class RegistrationConfig(Config): config.get("disable_msisdn_registration", False) ) + self.rc_registration_requests_per_second = config.get( + "rc_registration_requests_per_second", 0.17, + ) + self.rc_registration_request_burst_count = config.get( + "rc_registration_request_burst_count", 3, + ) + def default_config(self, generate_secrets=False, **kwargs): if generate_secrets: registration_shared_secret = 'registration_shared_secret: "%s"' % ( @@ -140,6 +147,17 @@ class RegistrationConfig(Config): # users cannot be auto-joined since they do not exist. # autocreate_auto_join_rooms: true + + # Number of registration requests a client can send per second. + # Defaults to 1/minute (0.17). + # + #rc_registration_requests_per_second: 0.17 + + # Number of registration requests a client can send before being + # throttled. + # Defaults to 3. + # + #rc_registration_request_burst_count: 3.0 """ % locals() def add_arguments(self, parser): diff --git a/synapse/config/server.py b/synapse/config/server.py index 4200f10da3..35a322fee0 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -260,9 +260,11 @@ class ServerConfig(Config): # This is used by remote servers to connect to this server, # e.g. matrix.org, localhost:8080, etc. # This is also the last part of your UserID. + # server_name: "%(server_name)s" # When running as a daemon, the file to store the pid in + # pid_file: %(pid_file)s # CPU affinity mask. Setting this restricts the CPUs on which the @@ -304,9 +306,11 @@ class ServerConfig(Config): # Set the soft limit on the number of file descriptors synapse can use # Zero is used to indicate synapse should set the soft limit to the # hard limit. + # soft_file_limit: 0 # Set to false to disable presence tracking on this homeserver. + # use_presence: true # The GC threshold parameters to pass to `gc.set_threshold`, if defined diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 6f5995735a..b7d0b25781 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -159,8 +159,12 @@ class FederationRemoteSendQueue(object): # stream. pass - def send_edu(self, destination, edu_type, content, key=None): + def build_and_send_edu(self, destination, edu_type, content, key=None): """As per TransactionQueue""" + if destination == self.server_name: + logger.info("Not sending EDU to ourselves") + return + pos = self._next_pos() edu = Edu( @@ -465,15 +469,11 @@ def process_rows_for_federation(transaction_queue, rows): for destination, edu_map in iteritems(buff.keyed_edus): for key, edu in edu_map.items(): - transaction_queue.send_edu( - edu.destination, edu.edu_type, edu.content, key=key, - ) + transaction_queue.send_edu(edu, key) for destination, edu_list in iteritems(buff.edus): for edu in edu_list: - transaction_queue.send_edu( - edu.destination, edu.edu_type, edu.content, key=None, - ) + transaction_queue.send_edu(edu, None) for destination in buff.device_destinations: transaction_queue.send_device_messages(destination) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 30941f5ad6..e5e42c647d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -361,7 +361,19 @@ class TransactionQueue(object): self._attempt_new_transaction(destination) - def send_edu(self, destination, edu_type, content, key=None): + def build_and_send_edu(self, destination, edu_type, content, key=None): + """Construct an Edu object, and queue it for sending + + Args: + destination (str): name of server to send to + edu_type (str): type of EDU to send + content (dict): content of EDU + key (Any|None): clobbering key for this edu + """ + if destination == self.server_name: + logger.info("Not sending EDU to ourselves") + return + edu = Edu( origin=self.server_name, destination=destination, @@ -369,18 +381,23 @@ class TransactionQueue(object): content=content, ) - if destination == self.server_name: - logger.info("Not sending EDU to ourselves") - return + self.send_edu(edu, key) + + def send_edu(self, edu, key): + """Queue an EDU for sending + Args: + edu (Edu): edu to send + key (Any|None): clobbering key for this edu + """ if key: self.pending_edus_keyed_by_dest.setdefault( - destination, {} + edu.destination, {} )[(edu.edu_type, key)] = edu else: - self.pending_edus_by_dest.setdefault(destination, []).append(edu) + self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu) - self._attempt_new_transaction(destination) + self._attempt_new_transaction(edu.destination) def send_device_messages(self, destination): if destination == self.server_name: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index ebb81be377..96d680a5ad 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -759,7 +759,7 @@ class FederationVersionServlet(BaseFederationServlet): class FederationGroupsProfileServlet(BaseFederationServlet): """Get/set the basic profile of a group on behalf of a user """ - PATH = "/groups/(?P<group_id>[^/]*)/profile$" + PATH = "/groups/(?P<group_id>[^/]*)/profile" @defer.inlineCallbacks def on_GET(self, origin, content, query, group_id): @@ -787,7 +787,7 @@ class FederationGroupsProfileServlet(BaseFederationServlet): class FederationGroupsSummaryServlet(BaseFederationServlet): - PATH = "/groups/(?P<group_id>[^/]*)/summary$" + PATH = "/groups/(?P<group_id>[^/]*)/summary" @defer.inlineCallbacks def on_GET(self, origin, content, query, group_id): @@ -805,7 +805,7 @@ class FederationGroupsSummaryServlet(BaseFederationServlet): class FederationGroupsRoomsServlet(BaseFederationServlet): """Get the rooms in a group on behalf of a user """ - PATH = "/groups/(?P<group_id>[^/]*)/rooms$" + PATH = "/groups/(?P<group_id>[^/]*)/rooms" @defer.inlineCallbacks def on_GET(self, origin, content, query, group_id): @@ -823,7 +823,7 @@ class FederationGroupsRoomsServlet(BaseFederationServlet): class FederationGroupsAddRoomsServlet(BaseFederationServlet): """Add/remove room from group """ - PATH = "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)$" + PATH = "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, room_id): @@ -855,7 +855,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet): """ PATH = ( "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)" - "/config/(?P<config_key>[^/]*)$" + "/config/(?P<config_key>[^/]*)" ) @defer.inlineCallbacks @@ -874,7 +874,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet): class FederationGroupsUsersServlet(BaseFederationServlet): """Get the users in a group on behalf of a user """ - PATH = "/groups/(?P<group_id>[^/]*)/users$" + PATH = "/groups/(?P<group_id>[^/]*)/users" @defer.inlineCallbacks def on_GET(self, origin, content, query, group_id): @@ -892,7 +892,7 @@ class FederationGroupsUsersServlet(BaseFederationServlet): class FederationGroupsInvitedUsersServlet(BaseFederationServlet): """Get the users that have been invited to a group """ - PATH = "/groups/(?P<group_id>[^/]*)/invited_users$" + PATH = "/groups/(?P<group_id>[^/]*)/invited_users" @defer.inlineCallbacks def on_GET(self, origin, content, query, group_id): @@ -910,7 +910,7 @@ class FederationGroupsInvitedUsersServlet(BaseFederationServlet): class FederationGroupsInviteServlet(BaseFederationServlet): """Ask a group server to invite someone to the group """ - PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$" + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -928,7 +928,7 @@ class FederationGroupsInviteServlet(BaseFederationServlet): class FederationGroupsAcceptInviteServlet(BaseFederationServlet): """Accept an invitation from the group server """ - PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite$" + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -945,7 +945,7 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet): class FederationGroupsJoinServlet(BaseFederationServlet): """Attempt to join a group """ - PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$" + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -962,7 +962,7 @@ class FederationGroupsJoinServlet(BaseFederationServlet): class FederationGroupsRemoveUserServlet(BaseFederationServlet): """Leave or kick a user from the group """ - PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$" + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -980,7 +980,7 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet): class FederationGroupsLocalInviteServlet(BaseFederationServlet): """A group server has invited a local user """ - PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$" + PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -997,7 +997,7 @@ class FederationGroupsLocalInviteServlet(BaseFederationServlet): class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): """A group server has removed a local user """ - PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$" + PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -1014,7 +1014,7 @@ class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): """A group or user's server renews their attestation """ - PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)$" + PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -1037,7 +1037,7 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): PATH = ( "/groups/(?P<group_id>[^/]*)/summary" "(/categories/(?P<category_id>[^/]+))?" - "/rooms/(?P<room_id>[^/]*)$" + "/rooms/(?P<room_id>[^/]*)" ) @defer.inlineCallbacks @@ -1080,7 +1080,7 @@ class FederationGroupsCategoriesServlet(BaseFederationServlet): """Get all categories for a group """ PATH = ( - "/groups/(?P<group_id>[^/]*)/categories/$" + "/groups/(?P<group_id>[^/]*)/categories/" ) @defer.inlineCallbacks @@ -1100,7 +1100,7 @@ class FederationGroupsCategoryServlet(BaseFederationServlet): """Add/remove/get a category in a group """ PATH = ( - "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)$" + "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)" ) @defer.inlineCallbacks @@ -1150,7 +1150,7 @@ class FederationGroupsRolesServlet(BaseFederationServlet): """Get roles in a group """ PATH = ( - "/groups/(?P<group_id>[^/]*)/roles/$" + "/groups/(?P<group_id>[^/]*)/roles/" ) @defer.inlineCallbacks @@ -1170,7 +1170,7 @@ class FederationGroupsRoleServlet(BaseFederationServlet): """Add/remove/get a role in a group """ PATH = ( - "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)$" + "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)" ) @defer.inlineCallbacks @@ -1226,7 +1226,7 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): PATH = ( "/groups/(?P<group_id>[^/]*)/summary" "(/roles/(?P<role_id>[^/]+))?" - "/users/(?P<user_id>[^/]*)$" + "/users/(?P<user_id>[^/]*)" ) @defer.inlineCallbacks @@ -1269,7 +1269,7 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): """Get roles in a group """ PATH = ( - "/get_groups_publicised$" + "/get_groups_publicised" ) @defer.inlineCallbacks @@ -1284,7 +1284,7 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): """Sets whether a group is joinable without an invite or knock """ - PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$" + PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy" @defer.inlineCallbacks def on_PUT(self, origin, content, query, group_id): diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 594754cfd8..d8d86d6ff3 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -93,9 +93,9 @@ class BaseHandler(object): messages_per_second = self.hs.config.rc_messages_per_second burst_count = self.hs.config.rc_message_burst_count - allowed, time_allowed = self.ratelimiter.send_message( + allowed, time_allowed = self.ratelimiter.can_do_action( user_id, time_now, - msg_rate_hz=messages_per_second, + rate_hz=messages_per_second, burst_count=burst_count, update=update, ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c708c35d4d..c09a7c6280 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -37,13 +37,185 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) -class DeviceHandler(BaseHandler): +class DeviceWorkerHandler(BaseHandler): def __init__(self, hs): - super(DeviceHandler, self).__init__(hs) + super(DeviceWorkerHandler, self).__init__(hs) self.hs = hs self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() + + @defer.inlineCallbacks + def get_devices_by_user(self, user_id): + """ + Retrieve the given user's devices + + Args: + user_id (str): + Returns: + defer.Deferred: list[dict[str, X]]: info on each device + """ + + device_map = yield self.store.get_devices_by_user(user_id) + + ips = yield self.store.get_last_client_ip_by_device( + user_id, device_id=None + ) + + devices = list(device_map.values()) + for device in devices: + _update_device_from_client_ips(device, ips) + + defer.returnValue(devices) + + @defer.inlineCallbacks + def get_device(self, user_id, device_id): + """ Retrieve the given device + + Args: + user_id (str): + device_id (str): + + Returns: + defer.Deferred: dict[str, X]: info on the device + Raises: + errors.NotFoundError: if the device was not found + """ + try: + device = yield self.store.get_device(user_id, device_id) + except errors.StoreError: + raise errors.NotFoundError + ips = yield self.store.get_last_client_ip_by_device( + user_id, device_id, + ) + _update_device_from_client_ips(device, ips) + defer.returnValue(device) + + @measure_func("device.get_user_ids_changed") + @defer.inlineCallbacks + def get_user_ids_changed(self, user_id, from_token): + """Get list of users that have had the devices updated, or have newly + joined a room, that `user_id` may be interested in. + + Args: + user_id (str) + from_token (StreamToken) + """ + now_room_key = yield self.store.get_room_events_max_id() + + room_ids = yield self.store.get_rooms_for_user(user_id) + + # First we check if any devices have changed + changed = yield self.store.get_user_whose_devices_changed( + from_token.device_list_key + ) + + # Then work out if any users have since joined + rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + + member_events = yield self.store.get_membership_changes_for_user( + user_id, from_token.room_key, now_room_key, + ) + rooms_changed.update(event.room_id for event in member_events) + + stream_ordering = RoomStreamToken.parse_stream_token( + from_token.room_key + ).stream + + possibly_changed = set(changed) + possibly_left = set() + for room_id in rooms_changed: + current_state_ids = yield self.store.get_current_state_ids(room_id) + + # The user may have left the room + # TODO: Check if they actually did or if we were just invited. + if room_id not in room_ids: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_left.add(state_key) + continue + + # Fetch the current state at the time. + try: + event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_ordering=stream_ordering + ) + except errors.StoreError: + # we have purged the stream_ordering index since the stream + # ordering: treat it the same as a new room + event_ids = [] + + # special-case for an empty prev state: include all members + # in the changed list + if not event_ids: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + continue + + current_member_id = current_state_ids.get((EventTypes.Member, user_id)) + if not current_member_id: + continue + + # mapping from event_id -> state_dict + prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + + # Check if we've joined the room? If so we just blindly add all the users to + # the "possibly changed" users. + for state_dict in itervalues(prev_state_ids): + member_event = state_dict.get((EventTypes.Member, user_id), None) + if not member_event or member_event != current_member_id: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + break + + # If there has been any change in membership, include them in the + # possibly changed list. We'll check if they are joined below, + # and we're not toooo worried about spuriously adding users. + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + + # check if this member has changed since any of the extremities + # at the stream_ordering, and add them to the list if so. + for state_dict in itervalues(prev_state_ids): + prev_event_id = state_dict.get(key, None) + if not prev_event_id or prev_event_id != event_id: + if state_key != user_id: + possibly_changed.add(state_key) + break + + if possibly_changed or possibly_left: + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + possibly_joined = possibly_changed & users_who_share_room + possibly_left = (possibly_changed | possibly_left) - users_who_share_room + else: + possibly_joined = [] + possibly_left = [] + + defer.returnValue({ + "changed": list(possibly_joined), + "left": list(possibly_left), + }) + + +class DeviceHandler(DeviceWorkerHandler): + def __init__(self, hs): + super(DeviceHandler, self).__init__(hs) + self.federation_sender = hs.get_federation_sender() self._edu_updater = DeviceListEduUpdater(hs, self) @@ -104,52 +276,6 @@ class DeviceHandler(BaseHandler): raise errors.StoreError(500, "Couldn't generate a device ID.") @defer.inlineCallbacks - def get_devices_by_user(self, user_id): - """ - Retrieve the given user's devices - - Args: - user_id (str): - Returns: - defer.Deferred: list[dict[str, X]]: info on each device - """ - - device_map = yield self.store.get_devices_by_user(user_id) - - ips = yield self.store.get_last_client_ip_by_device( - user_id, device_id=None - ) - - devices = list(device_map.values()) - for device in devices: - _update_device_from_client_ips(device, ips) - - defer.returnValue(devices) - - @defer.inlineCallbacks - def get_device(self, user_id, device_id): - """ Retrieve the given device - - Args: - user_id (str): - device_id (str): - - Returns: - defer.Deferred: dict[str, X]: info on the device - Raises: - errors.NotFoundError: if the device was not found - """ - try: - device = yield self.store.get_device(user_id, device_id) - except errors.StoreError: - raise errors.NotFoundError - ips = yield self.store.get_last_client_ip_by_device( - user_id, device_id, - ) - _update_device_from_client_ips(device, ips) - defer.returnValue(device) - - @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -287,126 +413,6 @@ class DeviceHandler(BaseHandler): for host in hosts: self.federation_sender.send_device_messages(host) - @measure_func("device.get_user_ids_changed") - @defer.inlineCallbacks - def get_user_ids_changed(self, user_id, from_token): - """Get list of users that have had the devices updated, or have newly - joined a room, that `user_id` may be interested in. - - Args: - user_id (str) - from_token (StreamToken) - """ - now_token = yield self.hs.get_event_sources().get_current_token() - - room_ids = yield self.store.get_rooms_for_user(user_id) - - # First we check if any devices have changed - changed = yield self.store.get_user_whose_devices_changed( - from_token.device_list_key - ) - - # Then work out if any users have since joined - rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) - - member_events = yield self.store.get_membership_changes_for_user( - user_id, from_token.room_key, now_token.room_key - ) - rooms_changed.update(event.room_id for event in member_events) - - stream_ordering = RoomStreamToken.parse_stream_token( - from_token.room_key - ).stream - - possibly_changed = set(changed) - possibly_left = set() - for room_id in rooms_changed: - current_state_ids = yield self.store.get_current_state_ids(room_id) - - # The user may have left the room - # TODO: Check if they actually did or if we were just invited. - if room_id not in room_ids: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_left.add(state_key) - continue - - # Fetch the current state at the time. - try: - event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, stream_ordering=stream_ordering - ) - except errors.StoreError: - # we have purged the stream_ordering index since the stream - # ordering: treat it the same as a new room - event_ids = [] - - # special-case for an empty prev state: include all members - # in the changed list - if not event_ids: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_changed.add(state_key) - continue - - current_member_id = current_state_ids.get((EventTypes.Member, user_id)) - if not current_member_id: - continue - - # mapping from event_id -> state_dict - prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) - - # Check if we've joined the room? If so we just blindly add all the users to - # the "possibly changed" users. - for state_dict in itervalues(prev_state_ids): - member_event = state_dict.get((EventTypes.Member, user_id), None) - if not member_event or member_event != current_member_id: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_changed.add(state_key) - break - - # If there has been any change in membership, include them in the - # possibly changed list. We'll check if they are joined below, - # and we're not toooo worried about spuriously adding users. - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - - # check if this member has changed since any of the extremities - # at the stream_ordering, and add them to the list if so. - for state_dict in itervalues(prev_state_ids): - prev_event_id = state_dict.get(key, None) - if not prev_event_id or prev_event_id != event_id: - if state_key != user_id: - possibly_changed.add(state_key) - break - - if possibly_changed or possibly_left: - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) - - # Take the intersection of the users whose devices may have changed - # and those that actually still share a room with the user - possibly_joined = possibly_changed & users_who_share_room - possibly_left = (possibly_changed | possibly_left) - users_who_share_room - else: - possibly_joined = [] - possibly_left = [] - - defer.returnValue({ - "changed": list(possibly_joined), - "left": list(possibly_left), - }) - @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f80486102a..72b63d64d0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -858,6 +858,52 @@ class FederationHandler(BaseHandler): logger.debug("Not backfilling as no extremeties found.") return + # We only want to paginate if we can actually see the events we'll get, + # as otherwise we'll just spend a lot of resources to get redacted + # events. + # + # We do this by filtering all the backwards extremities and seeing if + # any remain. Given we don't have the extremity events themselves, we + # need to actually check the events that reference them. + # + # *Note*: the spec wants us to keep backfilling until we reach the start + # of the room in case we are allowed to see some of the history. However + # in practice that causes more issues than its worth, as a) its + # relatively rare for there to be any visible history and b) even when + # there is its often sufficiently long ago that clients would stop + # attempting to paginate before backfill reached the visible history. + # + # TODO: If we do do a backfill then we should filter the backwards + # extremities to only include those that point to visible portions of + # history. + # + # TODO: Correctly handle the case where we are allowed to see the + # forward event but not the backward extremity, e.g. in the case of + # initial join of the server where we are allowed to see the join + # event but not anything before it. This would require looking at the + # state *before* the event, ignoring the special casing certain event + # types have. + + forward_events = yield self.store.get_successor_events( + list(extremities), + ) + + extremities_events = yield self.store.get_events( + forward_events, + check_redacted=False, + get_prev_content=False, + ) + + # We set `check_history_visibility_only` as we might otherwise get false + # positives from users having been erased. + filtered_extremities = yield filter_events_for_server( + self.store, self.server_name, list(extremities_events.values()), + redact=False, check_history_visibility_only=True, + ) + + if not filtered_extremities: + defer.returnValue(False) + # Check if we reached a point where we should start backfilling. sorted_extremeties_tuple = sorted( extremities.items(), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3981fe69ce..c762b58902 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -436,10 +436,11 @@ class EventCreationHandler(object): if event.is_state(): prev_state = yield self.deduplicate_state_event(event, context) - logger.info( - "Not bothering to persist duplicate state event %s", event.event_id, - ) if prev_state is not None: + logger.info( + "Not bothering to persist state event %s duplicated by %s", + event.event_id, prev_state.event_id, + ) defer.returnValue(prev_state) yield self.handle_new_client_event( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ba3856674d..37e87fc054 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -816,7 +816,7 @@ class PresenceHandler(object): if self.is_mine(observed_user): yield self.invite_presence(observed_user, observer_user) else: - yield self.federation.send_edu( + yield self.federation.build_and_send_edu( destination=observed_user.domain, edu_type="m.presence_invite", content={ @@ -836,7 +836,7 @@ class PresenceHandler(object): if self.is_mine(observer_user): yield self.accept_presence(observed_user, observer_user) else: - self.federation.send_edu( + self.federation.build_and_send_edu( destination=observer_user.domain, edu_type="m.presence_accept", content={ @@ -848,7 +848,7 @@ class PresenceHandler(object): state_dict = yield self.get_state(observed_user, as_event=False) state_dict = format_user_presence_state(state_dict, self.clock.time_msec()) - self.federation.send_edu( + self.federation.build_and_send_edu( destination=observer_user.domain, edu_type="m.presence", content={ diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 696469732c..1728089667 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -16,7 +16,6 @@ import logging from twisted.internet import defer -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id from ._base import BaseHandler @@ -39,31 +38,6 @@ class ReceiptsHandler(BaseHandler): self.state = hs.get_state_handler() @defer.inlineCallbacks - def received_client_receipt(self, room_id, receipt_type, user_id, - event_id): - """Called when a client tells us a local user has read up to the given - event_id in the room. - """ - receipt = { - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - "event_ids": [event_id], - "data": { - "ts": int(self.clock.time_msec()), - } - } - - is_new = yield self._handle_new_receipts([receipt]) - - if is_new: - # fire off a process in the background to send the receipt to - # remote servers - run_as_background_process( - 'push_receipts_to_remotes', self._push_remotes, receipt - ) - - @defer.inlineCallbacks def _received_remote_receipt(self, origin, content): """Called when we receive an EDU of type m.receipt from a remote HS. """ @@ -128,43 +102,54 @@ class ReceiptsHandler(BaseHandler): defer.returnValue(True) @defer.inlineCallbacks - def _push_remotes(self, receipt): - """Given a receipt, works out which remote servers should be - poked and pokes them. + def received_client_receipt(self, room_id, receipt_type, user_id, + event_id): + """Called when a client tells us a local user has read up to the given + event_id in the room. """ - try: - # TODO: optimise this to move some of the work to the workers. - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] + receipt = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": [event_id], + "data": { + "ts": int(self.clock.time_msec()), + } + } - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": event_ids, - "data": data, - } + is_new = yield self._handle_new_receipts([receipt]) + if not is_new: + return + + # Work out which remote servers should be poked and poke them. + + # TODO: optimise this to move some of the work to the workers. + data = receipt["data"] + + # XXX why does this not use state.get_current_hosts_in_room() ? + users = yield self.state.get_current_user_in_room(room_id) + remotedomains = set(get_domain_from_id(u) for u in users) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.build_and_send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": [event_id], + "data": data, } - }, + } }, - key=(room_id, receipt_type, user_id), - ) - except Exception: - logger.exception("Error pushing receipts to remote servers") + }, + key=(room_id, receipt_type, user_id), + ) @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index c0e06929bd..47d5e276f8 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -24,6 +24,7 @@ from synapse.api.errors import ( AuthError, Codes, InvalidCaptchaError, + LimitExceededError, RegistrationError, SynapseError, ) @@ -60,6 +61,7 @@ class RegistrationHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() self.captcha_client = CaptchaServerHttpClient(hs) self.identity_handler = self.hs.get_handlers().identity_handler + self.ratelimiter = hs.get_ratelimiter() self._next_generated_user_id = None @@ -149,6 +151,7 @@ class RegistrationHandler(BaseHandler): threepid=None, user_type=None, default_display_name=None, + address=None, ): """Registers a new client on the server. @@ -167,6 +170,7 @@ class RegistrationHandler(BaseHandler): api.constants.UserTypes, or None for a normal user. default_display_name (unicode|None): if set, the new user's displayname will be set to this. Defaults to 'localpart'. + address (str|None): the IP address used to perform the regitration. Returns: A tuple of (user_id, access_token). Raises: @@ -206,7 +210,7 @@ class RegistrationHandler(BaseHandler): token = None if generate_token: token = self.macaroon_gen.generate_access_token(user_id) - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, token=token, password_hash=password_hash, @@ -215,6 +219,7 @@ class RegistrationHandler(BaseHandler): create_profile_with_displayname=default_display_name, admin=admin, user_type=user_type, + address=address, ) if self.hs.config.user_directory_search_all_users: @@ -238,12 +243,13 @@ class RegistrationHandler(BaseHandler): if default_display_name is None: default_display_name = localpart try: - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, token=token, password_hash=password_hash, make_guest=make_guest, create_profile_with_displayname=default_display_name, + address=address, ) except SynapseError: # if user id is taken, just generate another @@ -337,7 +343,7 @@ class RegistrationHandler(BaseHandler): user_id, allowed_appservice=service ) - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, password_hash="", appservice_id=service_id, @@ -513,7 +519,7 @@ class RegistrationHandler(BaseHandler): token = self.macaroon_gen.generate_access_token(user_id) if need_register: - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, token=token, password_hash=password_hash, @@ -590,10 +596,10 @@ class RegistrationHandler(BaseHandler): ratelimit=False, ) - def _register_with_store(self, user_id, token=None, password_hash=None, - was_guest=False, make_guest=False, appservice_id=None, - create_profile_with_displayname=None, admin=False, - user_type=None): + def register_with_store(self, user_id, token=None, password_hash=None, + was_guest=False, make_guest=False, appservice_id=None, + create_profile_with_displayname=None, admin=False, + user_type=None, address=None): """Register user in the datastore. Args: @@ -612,10 +618,26 @@ class RegistrationHandler(BaseHandler): admin (boolean): is an admin user? user_type (str|None): type of user. One of the values from api.constants.UserTypes, or None for a normal user. + address (str|None): the IP address used to perform the regitration. Returns: Deferred """ + # Don't rate limit for app services + if appservice_id is None and address is not None: + time_now = self.clock.time() + + allowed, time_allowed = self.ratelimiter.can_do_action( + address, time_now_s=time_now, + rate_hz=self.hs.config.rc_registration_requests_per_second, + burst_count=self.hs.config.rc_registration_request_burst_count, + ) + + if not allowed: + raise LimitExceededError( + retry_after_ms=int(1000 * (time_allowed - time_now)), + ) + if self.hs.config.worker_app: return self._register_client( user_id=user_id, @@ -627,6 +649,7 @@ class RegistrationHandler(BaseHandler): create_profile_with_displayname=create_profile_with_displayname, admin=admin, user_type=user_type, + address=address, ) else: return self.store.register( diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a61bbf9392..39df960c31 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -231,7 +231,7 @@ class TypingHandler(object): for domain in set(get_domain_from_id(u) for u in users): if domain != self.server_name: logger.debug("sending typing update to %s", domain) - self.federation.send_edu( + self.federation.build_and_send_edu( destination=domain, edu_type="m.typing", content={ diff --git a/synapse/notifier.py b/synapse/notifier.py index de02b1017e..ff589660da 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -178,8 +178,6 @@ class Notifier(object): self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS ) - self.replication_deferred = ObservableDeferred(defer.Deferred()) - # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at # most when scraping it. @@ -205,7 +203,9 @@ class Notifier(object): def add_replication_callback(self, cb): """Add a callback that will be called when some new data is available. - Callback is not given any arguments. + Callback is not given any arguments. It should *not* return a Deferred - if + it needs to do any asynchronous work, a background thread should be started and + wrapped with run_as_background_process. """ self.replication_callbacks.append(cb) @@ -517,60 +517,5 @@ class Notifier(object): def notify_replication(self): """Notify the any replication listeners that there's a new event""" - with PreserveLoggingContext(): - deferred = self.replication_deferred - self.replication_deferred = ObservableDeferred(defer.Deferred()) - deferred.callback(None) - - # the callbacks may well outlast the current request, so we run - # them in the sentinel logcontext. - # - # (ideally it would be up to the callbacks to know if they were - # starting off background processes and drop the logcontext - # accordingly, but that requires more changes) - for cb in self.replication_callbacks: - cb() - - @defer.inlineCallbacks - def wait_for_replication(self, callback, timeout): - """Wait for an event to happen. - - Args: - callback: Gets called whenever an event happens. If this returns a - truthy value then ``wait_for_replication`` returns, otherwise - it waits for another event. - timeout: How many milliseconds to wait for callback return a truthy - value. - - Returns: - A deferred that resolves with the value returned by the callback. - """ - listener = _NotificationListener(None) - - end_time = self.clock.time_msec() + timeout - - while True: - listener.deferred = self.replication_deferred.observe() - result = yield callback() - if result: - break - - now = self.clock.time_msec() - if end_time <= now: - break - - listener.deferred = timeout_deferred( - listener.deferred, - timeout=(end_time - now) / 1000., - reactor=self.hs.get_reactor(), - ) - - try: - with PreserveLoggingContext(): - yield listener.deferred - except defer.TimeoutError: - break - except defer.CancelledError: - break - - defer.returnValue(result) + for cb in self.replication_callbacks: + cb() diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index 1d27c9221f..912a5ac341 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -33,11 +33,12 @@ class ReplicationRegisterServlet(ReplicationEndpoint): def __init__(self, hs): super(ReplicationRegisterServlet, self).__init__(hs) self.store = hs.get_datastore() + self.registration_handler = hs.get_registration_handler() @staticmethod def _serialize_payload( user_id, token, password_hash, was_guest, make_guest, appservice_id, - create_profile_with_displayname, admin, user_type, + create_profile_with_displayname, admin, user_type, address, ): """ Args: @@ -56,6 +57,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): admin (boolean): is an admin user? user_type (str|None): type of user. One of the values from api.constants.UserTypes, or None for a normal user. + address (str|None): the IP address used to perform the regitration. """ return { "token": token, @@ -66,13 +68,14 @@ class ReplicationRegisterServlet(ReplicationEndpoint): "create_profile_with_displayname": create_profile_with_displayname, "admin": admin, "user_type": user_type, + "address": address, } @defer.inlineCallbacks def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) - yield self.store.register( + yield self.registration_handler.register_with_store( user_id=user_id, token=content["token"], password_hash=content["password_hash"], @@ -82,6 +85,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): create_profile_with_displayname=content["create_profile_with_displayname"], admin=content["admin"], user_type=content["user_type"], + address=content["address"] ) defer.returnValue((200, {})) diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 4f19fd35aa..4d59778863 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -13,15 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.storage.deviceinbox import DeviceInboxWorkerStore from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import BaseSlavedStore, __func__ -from ._slaved_id_tracker import SlavedIdTracker - -class SlavedDeviceInboxStore(BaseSlavedStore): +class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedDeviceInboxStore, self).__init__(db_conn, hs) self._device_inbox_id_gen = SlavedIdTracker( @@ -43,12 +42,6 @@ class SlavedDeviceInboxStore(BaseSlavedStore): expiry_ms=30 * 60 * 1000, ) - get_to_device_stream_token = __func__(DataStore.get_to_device_stream_token) - get_new_messages_for_device = __func__(DataStore.get_new_messages_for_device) - get_new_device_msgs_for_remote = __func__(DataStore.get_new_device_msgs_for_remote) - delete_messages_for_device = __func__(DataStore.delete_messages_for_device) - delete_device_msgs_for_remote = __func__(DataStore.delete_device_msgs_for_remote) - def stream_positions(self): result = super(SlavedDeviceInboxStore, self).stream_positions() result["to_device"] = self._device_inbox_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index ec2fd561cc..16c9a162c5 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -13,15 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore -from synapse.storage.end_to_end_keys import EndToEndKeyStore +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.storage.devices import DeviceWorkerStore +from synapse.storage.end_to_end_keys import EndToEndKeyWorkerStore from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import BaseSlavedStore, __func__ -from ._slaved_id_tracker import SlavedIdTracker - -class SlavedDeviceStore(BaseSlavedStore): +class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedDeviceStore, self).__init__(db_conn, hs) @@ -38,17 +37,6 @@ class SlavedDeviceStore(BaseSlavedStore): "DeviceListFederationStreamChangeCache", device_list_max, ) - get_device_stream_token = __func__(DataStore.get_device_stream_token) - get_user_whose_devices_changed = __func__(DataStore.get_user_whose_devices_changed) - get_devices_by_remote = __func__(DataStore.get_devices_by_remote) - _get_devices_by_remote_txn = __func__(DataStore._get_devices_by_remote_txn) - _get_e2e_device_keys_txn = __func__(DataStore._get_e2e_device_keys_txn) - mark_as_sent_devices_by_remote = __func__(DataStore.mark_as_sent_devices_by_remote) - _mark_as_sent_devices_by_remote_txn = ( - __func__(DataStore._mark_as_sent_devices_by_remote_txn) - ) - count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"] - def stream_positions(self): result = super(SlavedDeviceStore, self).stream_positions() result["device_lists"] = self._device_list_id_gen.get_current_token() @@ -58,14 +46,23 @@ class SlavedDeviceStore(BaseSlavedStore): if stream_name == "device_lists": self._device_list_id_gen.advance(token) for row in rows: - self._device_list_stream_cache.entity_has_changed( - row.user_id, token + self._invalidate_caches_for_devices( + token, row.user_id, row.destination, ) - - if row.destination: - self._device_list_federation_stream_cache.entity_has_changed( - row.destination, token - ) return super(SlavedDeviceStore, self).process_replication_rows( stream_name, token, rows ) + + def _invalidate_caches_for_devices(self, token, user_id, destination): + self._device_list_stream_cache.entity_has_changed( + user_id, token + ) + + if destination: + self._device_list_federation_stream_cache.entity_has_changed( + destination, token + ) + + self._get_cached_devices_for_user.invalidate((user_id,)) + self._get_cached_user_device.invalidate_many((user_id,)) + self.get_device_list_last_stream_id_for_remote.invalidate((user_id,)) diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index f0200c1e98..45fc913c52 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -20,7 +20,7 @@ from ._slaved_id_tracker import SlavedIdTracker from .events import SlavedEventStore -class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore): +class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): def __init__(self, db_conn, hs): self._push_rules_stream_id_gen = SlavedIdTracker( db_conn, "push_rules_stream", "stream_id", diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 94cbba4303..b7f354570c 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -25,7 +25,12 @@ from twisted.internet import defer import synapse import synapse.types from synapse.api.constants import LoginType -from synapse.api.errors import Codes, SynapseError, UnrecognizedRequestError +from synapse.api.errors import ( + Codes, + LimitExceededError, + SynapseError, + UnrecognizedRequestError, +) from synapse.config.server import is_threepid_reserved from synapse.http.servlet import ( RestServlet, @@ -191,18 +196,36 @@ class RegisterRestServlet(RestServlet): self.identity_handler = hs.get_handlers().identity_handler self.room_member_handler = hs.get_room_member_handler() self.macaroon_gen = hs.get_macaroon_generator() + self.ratelimiter = hs.get_ratelimiter() + self.clock = hs.get_clock() @interactive_auth_handler @defer.inlineCallbacks def on_POST(self, request): body = parse_json_object_from_request(request) + client_addr = request.getClientIP() + + time_now = self.clock.time() + + allowed, time_allowed = self.ratelimiter.can_do_action( + client_addr, time_now_s=time_now, + rate_hz=self.hs.config.rc_registration_requests_per_second, + burst_count=self.hs.config.rc_registration_request_burst_count, + update=False, + ) + + if not allowed: + raise LimitExceededError( + retry_after_ms=int(1000 * (time_allowed - time_now)), + ) + kind = b"user" if b"kind" in request.args: kind = request.args[b"kind"][0] if kind == b"guest": - ret = yield self._do_guest_registration(body) + ret = yield self._do_guest_registration(body, address=client_addr) defer.returnValue(ret) return elif kind != b"user": @@ -411,6 +434,7 @@ class RegisterRestServlet(RestServlet): guest_access_token=guest_access_token, generate_token=False, threepid=threepid, + address=client_addr, ) # Necessary due to auth checks prior to the threepid being # written to the db @@ -522,12 +546,13 @@ class RegisterRestServlet(RestServlet): defer.returnValue(result) @defer.inlineCallbacks - def _do_guest_registration(self, params): + def _do_guest_registration(self, params, address=None): if not self.hs.config.allow_guest_access: raise SynapseError(403, "Guest access is disabled") user_id, _ = yield self.registration_handler.register( generate_token=False, - make_guest=True + make_guest=True, + address=address, ) # we don't allow guests to specify their own device_id, because diff --git a/synapse/server.py b/synapse/server.py index 4d364fccce..4323e7ff12 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -51,7 +51,7 @@ from synapse.handlers.acme import AcmeHandler from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler, MacaroonGenerator from synapse.handlers.deactivate_account import DeactivateAccountHandler -from synapse.handlers.device import DeviceHandler +from synapse.handlers.device import DeviceHandler, DeviceWorkerHandler from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler @@ -307,7 +307,10 @@ class HomeServer(object): return MacaroonGenerator(self) def build_device_handler(self): - return DeviceHandler(self) + if self.config.worker_app: + return DeviceWorkerHandler(self) + else: + return DeviceHandler(self) def build_device_message_handler(self): return DeviceMessageHandler(self) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index e06b0bc56d..e6a42a53bb 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -19,14 +19,174 @@ from canonicaljson import json from twisted.internet import defer +from synapse.storage._base import SQLBaseStore +from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache -from .background_updates import BackgroundUpdateStore - logger = logging.getLogger(__name__) -class DeviceInboxStore(BackgroundUpdateStore): +class DeviceInboxWorkerStore(SQLBaseStore): + def get_to_device_stream_token(self): + return self._device_inbox_id_gen.get_current_token() + + def get_new_messages_for_device( + self, user_id, device_id, last_stream_id, current_stream_id, limit=100 + ): + """ + Args: + user_id(str): The recipient user_id. + device_id(str): The recipient device_id. + current_stream_id(int): The current position of the to device + message stream. + Returns: + Deferred ([dict], int): List of messages for the device and where + in the stream the messages got to. + """ + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + + def get_new_messages_for_device_txn(txn): + sql = ( + "SELECT stream_id, message_json FROM device_inbox" + " WHERE user_id = ? AND device_id = ?" + " AND ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute(sql, ( + user_id, device_id, last_stream_id, current_stream_id, limit + )) + messages = [] + for row in txn: + stream_pos = row[0] + messages.append(json.loads(row[1])) + if len(messages) < limit: + stream_pos = current_stream_id + return (messages, stream_pos) + + return self.runInteraction( + "get_new_messages_for_device", get_new_messages_for_device_txn, + ) + + @defer.inlineCallbacks + def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): + """ + Args: + user_id(str): The recipient user_id. + device_id(str): The recipient device_id. + up_to_stream_id(int): Where to delete messages up to. + Returns: + A deferred that resolves to the number of messages deleted. + """ + # If we have cached the last stream id we've deleted up to, we can + # check if there is likely to be anything that needs deleting + last_deleted_stream_id = self._last_device_delete_cache.get( + (user_id, device_id), None + ) + if last_deleted_stream_id: + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_deleted_stream_id + ) + if not has_changed: + defer.returnValue(0) + + def delete_messages_for_device_txn(txn): + sql = ( + "DELETE FROM device_inbox" + " WHERE user_id = ? AND device_id = ?" + " AND stream_id <= ?" + ) + txn.execute(sql, (user_id, device_id, up_to_stream_id)) + return txn.rowcount + + count = yield self.runInteraction( + "delete_messages_for_device", delete_messages_for_device_txn + ) + + # Update the cache, ensuring that we only ever increase the value + last_deleted_stream_id = self._last_device_delete_cache.get( + (user_id, device_id), 0 + ) + self._last_device_delete_cache[(user_id, device_id)] = max( + last_deleted_stream_id, up_to_stream_id + ) + + defer.returnValue(count) + + def get_new_device_msgs_for_remote( + self, destination, last_stream_id, current_stream_id, limit=100 + ): + """ + Args: + destination(str): The name of the remote server. + last_stream_id(int|long): The last position of the device message stream + that the server sent up to. + current_stream_id(int|long): The current position of the device + message stream. + Returns: + Deferred ([dict], int|long): List of messages for the device and where + in the stream the messages got to. + """ + + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( + destination, last_stream_id + ) + if not has_changed or last_stream_id == current_stream_id: + return defer.succeed(([], current_stream_id)) + + def get_new_messages_for_remote_destination_txn(txn): + sql = ( + "SELECT stream_id, messages_json FROM device_federation_outbox" + " WHERE destination = ?" + " AND ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute(sql, ( + destination, last_stream_id, current_stream_id, limit + )) + messages = [] + for row in txn: + stream_pos = row[0] + messages.append(json.loads(row[1])) + if len(messages) < limit: + stream_pos = current_stream_id + return (messages, stream_pos) + + return self.runInteraction( + "get_new_device_msgs_for_remote", + get_new_messages_for_remote_destination_txn, + ) + + def delete_device_msgs_for_remote(self, destination, up_to_stream_id): + """Used to delete messages when the remote destination acknowledges + their receipt. + + Args: + destination(str): The destination server_name + up_to_stream_id(int): Where to delete messages up to. + Returns: + A deferred that resolves when the messages have been deleted. + """ + def delete_messages_for_remote_destination_txn(txn): + sql = ( + "DELETE FROM device_federation_outbox" + " WHERE destination = ?" + " AND stream_id <= ?" + ) + txn.execute(sql, (destination, up_to_stream_id)) + + return self.runInteraction( + "delete_device_msgs_for_remote", + delete_messages_for_remote_destination_txn + ) + + +class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" def __init__(self, db_conn, hs): @@ -220,93 +380,6 @@ class DeviceInboxStore(BackgroundUpdateStore): txn.executemany(sql, rows) - def get_new_messages_for_device( - self, user_id, device_id, last_stream_id, current_stream_id, limit=100 - ): - """ - Args: - user_id(str): The recipient user_id. - device_id(str): The recipient device_id. - current_stream_id(int): The current position of the to device - message stream. - Returns: - Deferred ([dict], int): List of messages for the device and where - in the stream the messages got to. - """ - has_changed = self._device_inbox_stream_cache.has_entity_changed( - user_id, last_stream_id - ) - if not has_changed: - return defer.succeed(([], current_stream_id)) - - def get_new_messages_for_device_txn(txn): - sql = ( - "SELECT stream_id, message_json FROM device_inbox" - " WHERE user_id = ? AND device_id = ?" - " AND ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC" - " LIMIT ?" - ) - txn.execute(sql, ( - user_id, device_id, last_stream_id, current_stream_id, limit - )) - messages = [] - for row in txn: - stream_pos = row[0] - messages.append(json.loads(row[1])) - if len(messages) < limit: - stream_pos = current_stream_id - return (messages, stream_pos) - - return self.runInteraction( - "get_new_messages_for_device", get_new_messages_for_device_txn, - ) - - @defer.inlineCallbacks - def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): - """ - Args: - user_id(str): The recipient user_id. - device_id(str): The recipient device_id. - up_to_stream_id(int): Where to delete messages up to. - Returns: - A deferred that resolves to the number of messages deleted. - """ - # If we have cached the last stream id we've deleted up to, we can - # check if there is likely to be anything that needs deleting - last_deleted_stream_id = self._last_device_delete_cache.get( - (user_id, device_id), None - ) - if last_deleted_stream_id: - has_changed = self._device_inbox_stream_cache.has_entity_changed( - user_id, last_deleted_stream_id - ) - if not has_changed: - defer.returnValue(0) - - def delete_messages_for_device_txn(txn): - sql = ( - "DELETE FROM device_inbox" - " WHERE user_id = ? AND device_id = ?" - " AND stream_id <= ?" - ) - txn.execute(sql, (user_id, device_id, up_to_stream_id)) - return txn.rowcount - - count = yield self.runInteraction( - "delete_messages_for_device", delete_messages_for_device_txn - ) - - # Update the cache, ensuring that we only ever increase the value - last_deleted_stream_id = self._last_device_delete_cache.get( - (user_id, device_id), 0 - ) - self._last_device_delete_cache[(user_id, device_id)] = max( - last_deleted_stream_id, up_to_stream_id - ) - - defer.returnValue(count) - def get_all_new_device_messages(self, last_pos, current_pos, limit): """ Args: @@ -351,77 +424,6 @@ class DeviceInboxStore(BackgroundUpdateStore): "get_all_new_device_messages", get_all_new_device_messages_txn ) - def get_to_device_stream_token(self): - return self._device_inbox_id_gen.get_current_token() - - def get_new_device_msgs_for_remote( - self, destination, last_stream_id, current_stream_id, limit=100 - ): - """ - Args: - destination(str): The name of the remote server. - last_stream_id(int|long): The last position of the device message stream - that the server sent up to. - current_stream_id(int|long): The current position of the device - message stream. - Returns: - Deferred ([dict], int|long): List of messages for the device and where - in the stream the messages got to. - """ - - has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( - destination, last_stream_id - ) - if not has_changed or last_stream_id == current_stream_id: - return defer.succeed(([], current_stream_id)) - - def get_new_messages_for_remote_destination_txn(txn): - sql = ( - "SELECT stream_id, messages_json FROM device_federation_outbox" - " WHERE destination = ?" - " AND ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC" - " LIMIT ?" - ) - txn.execute(sql, ( - destination, last_stream_id, current_stream_id, limit - )) - messages = [] - for row in txn: - stream_pos = row[0] - messages.append(json.loads(row[1])) - if len(messages) < limit: - stream_pos = current_stream_id - return (messages, stream_pos) - - return self.runInteraction( - "get_new_device_msgs_for_remote", - get_new_messages_for_remote_destination_txn, - ) - - def delete_device_msgs_for_remote(self, destination, up_to_stream_id): - """Used to delete messages when the remote destination acknowledges - their receipt. - - Args: - destination(str): The destination server_name - up_to_stream_id(int): Where to delete messages up to. - Returns: - A deferred that resolves when the messages have been deleted. - """ - def delete_messages_for_remote_destination_txn(txn): - sql = ( - "DELETE FROM device_federation_outbox" - " WHERE destination = ?" - " AND stream_id <= ?" - ) - txn.execute(sql, (destination, up_to_stream_id)) - - return self.runInteraction( - "delete_device_msgs_for_remote", - delete_messages_for_remote_destination_txn - ) - @defer.inlineCallbacks def _background_drop_index_device_inbox(self, progress, batch_size): def reindex_txn(conn): diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index ecdab34e7d..e716dc1437 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -22,11 +22,10 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage._base import Cache, SQLBaseStore, db_to_json from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList -from ._base import Cache, db_to_json - logger = logging.getLogger(__name__) DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( @@ -34,7 +33,343 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( ) -class DeviceStore(BackgroundUpdateStore): +class DeviceWorkerStore(SQLBaseStore): + def get_device(self, user_id, device_id): + """Retrieve a device. + + Args: + user_id (str): The ID of the user which owns the device + device_id (str): The ID of the device to retrieve + Returns: + defer.Deferred for a dict containing the device information + Raises: + StoreError: if the device is not found + """ + return self._simple_select_one( + table="devices", + keyvalues={"user_id": user_id, "device_id": device_id}, + retcols=("user_id", "device_id", "display_name"), + desc="get_device", + ) + + @defer.inlineCallbacks + def get_devices_by_user(self, user_id): + """Retrieve all of a user's registered devices. + + Args: + user_id (str): + Returns: + defer.Deferred: resolves to a dict from device_id to a dict + containing "device_id", "user_id" and "display_name" for each + device. + """ + devices = yield self._simple_select_list( + table="devices", + keyvalues={"user_id": user_id}, + retcols=("user_id", "device_id", "display_name"), + desc="get_devices_by_user" + ) + + defer.returnValue({d["device_id"]: d for d in devices}) + + def get_devices_by_remote(self, destination, from_stream_id): + """Get stream of updates to send to remote servers + + Returns: + (int, list[dict]): current stream id and list of updates + """ + now_stream_id = self._device_list_id_gen.get_current_token() + + has_changed = self._device_list_federation_stream_cache.has_entity_changed( + destination, int(from_stream_id) + ) + if not has_changed: + return (now_stream_id, []) + + return self.runInteraction( + "get_devices_by_remote", self._get_devices_by_remote_txn, + destination, from_stream_id, now_stream_id, + ) + + def _get_devices_by_remote_txn(self, txn, destination, from_stream_id, + now_stream_id): + sql = """ + SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes + WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? + GROUP BY user_id, device_id + LIMIT 20 + """ + txn.execute( + sql, (destination, from_stream_id, now_stream_id, False) + ) + + # maps (user_id, device_id) -> stream_id + query_map = {(r[0], r[1]): r[2] for r in txn} + if not query_map: + return (now_stream_id, []) + + if len(query_map) >= 20: + now_stream_id = max(stream_id for stream_id in itervalues(query_map)) + + devices = self._get_e2e_device_keys_txn( + txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True + ) + + prev_sent_id_sql = """ + SELECT coalesce(max(stream_id), 0) as stream_id + FROM device_lists_outbound_last_success + WHERE destination = ? AND user_id = ? AND stream_id <= ? + """ + + results = [] + for user_id, user_devices in iteritems(devices): + # The prev_id for the first row is always the last row before + # `from_stream_id` + txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id)) + rows = txn.fetchall() + prev_id = rows[0][0] + for device_id, device in iteritems(user_devices): + stream_id = query_map[(user_id, device_id)] + result = { + "user_id": user_id, + "device_id": device_id, + "prev_id": [prev_id] if prev_id else [], + "stream_id": stream_id, + } + + prev_id = stream_id + + if device is not None: + key_json = device.get("key_json", None) + if key_json: + result["keys"] = db_to_json(key_json) + device_display_name = device.get("device_display_name", None) + if device_display_name: + result["device_display_name"] = device_display_name + else: + result["deleted"] = True + + results.append(result) + + return (now_stream_id, results) + + def mark_as_sent_devices_by_remote(self, destination, stream_id): + """Mark that updates have successfully been sent to the destination. + """ + return self.runInteraction( + "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn, + destination, stream_id, + ) + + def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): + # We update the device_lists_outbound_last_success with the successfully + # poked users. We do the join to see which users need to be inserted and + # which updated. + sql = """ + SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) + FROM device_lists_outbound_pokes as o + LEFT JOIN device_lists_outbound_last_success as s + USING (destination, user_id) + WHERE destination = ? AND o.stream_id <= ? + GROUP BY user_id + """ + txn.execute(sql, (destination, stream_id,)) + rows = txn.fetchall() + + sql = """ + UPDATE device_lists_outbound_last_success + SET stream_id = ? + WHERE destination = ? AND user_id = ? + """ + txn.executemany( + sql, ((row[1], destination, row[0],) for row in rows if row[2]) + ) + + sql = """ + INSERT INTO device_lists_outbound_last_success + (destination, user_id, stream_id) VALUES (?, ?, ?) + """ + txn.executemany( + sql, ((destination, row[0], row[1],) for row in rows if not row[2]) + ) + + # Delete all sent outbound pokes + sql = """ + DELETE FROM device_lists_outbound_pokes + WHERE destination = ? AND stream_id <= ? + """ + txn.execute(sql, (destination, stream_id,)) + + def get_device_stream_token(self): + return self._device_list_id_gen.get_current_token() + + @defer.inlineCallbacks + def get_user_devices_from_cache(self, query_list): + """Get the devices (and keys if any) for remote users from the cache. + + Args: + query_list(list): List of (user_id, device_ids), if device_ids is + falsey then return all device ids for that user. + + Returns: + (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is + a set of user_ids and results_map is a mapping of + user_id -> device_id -> device_info + """ + user_ids = set(user_id for user_id, _ in query_list) + user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids)) + user_ids_in_cache = set( + user_id for user_id, stream_id in user_map.items() if stream_id + ) + user_ids_not_in_cache = user_ids - user_ids_in_cache + + results = {} + for user_id, device_id in query_list: + if user_id not in user_ids_in_cache: + continue + + if device_id: + device = yield self._get_cached_user_device(user_id, device_id) + results.setdefault(user_id, {})[device_id] = device + else: + results[user_id] = yield self._get_cached_devices_for_user(user_id) + + defer.returnValue((user_ids_not_in_cache, results)) + + @cachedInlineCallbacks(num_args=2, tree=True) + def _get_cached_user_device(self, user_id, device_id): + content = yield self._simple_select_one_onecol( + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + retcol="content", + desc="_get_cached_user_device", + ) + defer.returnValue(db_to_json(content)) + + @cachedInlineCallbacks() + def _get_cached_devices_for_user(self, user_id): + devices = yield self._simple_select_list( + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + }, + retcols=("device_id", "content"), + desc="_get_cached_devices_for_user", + ) + defer.returnValue({ + device["device_id"]: db_to_json(device["content"]) + for device in devices + }) + + def get_devices_with_keys_by_user(self, user_id): + """Get all devices (with any device keys) for a user + + Returns: + (stream_id, devices) + """ + return self.runInteraction( + "get_devices_with_keys_by_user", + self._get_devices_with_keys_by_user_txn, user_id, + ) + + def _get_devices_with_keys_by_user_txn(self, txn, user_id): + now_stream_id = self._device_list_id_gen.get_current_token() + + devices = self._get_e2e_device_keys_txn( + txn, [(user_id, None)], include_all_devices=True + ) + + if devices: + user_devices = devices[user_id] + results = [] + for device_id, device in iteritems(user_devices): + result = { + "device_id": device_id, + } + + key_json = device.get("key_json", None) + if key_json: + result["keys"] = db_to_json(key_json) + device_display_name = device.get("device_display_name", None) + if device_display_name: + result["device_display_name"] = device_display_name + + results.append(result) + + return now_stream_id, results + + return now_stream_id, [] + + @defer.inlineCallbacks + def get_user_whose_devices_changed(self, from_key): + """Get set of users whose devices have changed since `from_key`. + """ + from_key = int(from_key) + changed = self._device_list_stream_cache.get_all_entities_changed(from_key) + if changed is not None: + defer.returnValue(set(changed)) + + sql = """ + SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ? + """ + rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) + defer.returnValue(set(row[0] for row in rows)) + + def get_all_device_list_changes_for_remotes(self, from_key, to_key): + """Return a list of `(stream_id, user_id, destination)` which is the + combined list of changes to devices, and which destinations need to be + poked. `destination` may be None if no destinations need to be poked. + """ + # We do a group by here as there can be a large number of duplicate + # entries, since we throw away device IDs. + sql = """ + SELECT MAX(stream_id) AS stream_id, user_id, destination + FROM device_lists_stream + LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) + WHERE ? < stream_id AND stream_id <= ? + GROUP BY user_id, destination + """ + return self._execute( + "get_all_device_list_changes_for_remotes", None, + sql, from_key, to_key + ) + + @cached(max_entries=10000) + def get_device_list_last_stream_id_for_remote(self, user_id): + """Get the last stream_id we got for a user. May be None if we haven't + got any information for them. + """ + return self._simple_select_one_onecol( + table="device_lists_remote_extremeties", + keyvalues={"user_id": user_id}, + retcol="stream_id", + desc="get_device_list_last_stream_id_for_remote", + allow_none=True, + ) + + @cachedList(cached_method_name="get_device_list_last_stream_id_for_remote", + list_name="user_ids", inlineCallbacks=True) + def get_device_list_last_stream_id_for_remotes(self, user_ids): + rows = yield self._simple_select_many_batch( + table="device_lists_remote_extremeties", + column="user_id", + iterable=user_ids, + retcols=("user_id", "stream_id",), + desc="get_device_list_last_stream_id_for_remotes", + ) + + results = {user_id: None for user_id in user_ids} + results.update({ + row["user_id"]: row["stream_id"] for row in rows + }) + + defer.returnValue(results) + + +class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): def __init__(self, db_conn, hs): super(DeviceStore, self).__init__(db_conn, hs) @@ -121,24 +456,6 @@ class DeviceStore(BackgroundUpdateStore): initial_device_display_name, e) raise StoreError(500, "Problem storing device.") - def get_device(self, user_id, device_id): - """Retrieve a device. - - Args: - user_id (str): The ID of the user which owns the device - device_id (str): The ID of the device to retrieve - Returns: - defer.Deferred for a dict containing the device information - Raises: - StoreError: if the device is not found - """ - return self._simple_select_one( - table="devices", - keyvalues={"user_id": user_id, "device_id": device_id}, - retcols=("user_id", "device_id", "display_name"), - desc="get_device", - ) - @defer.inlineCallbacks def delete_device(self, user_id, device_id): """Delete a device. @@ -203,57 +520,6 @@ class DeviceStore(BackgroundUpdateStore): ) @defer.inlineCallbacks - def get_devices_by_user(self, user_id): - """Retrieve all of a user's registered devices. - - Args: - user_id (str): - Returns: - defer.Deferred: resolves to a dict from device_id to a dict - containing "device_id", "user_id" and "display_name" for each - device. - """ - devices = yield self._simple_select_list( - table="devices", - keyvalues={"user_id": user_id}, - retcols=("user_id", "device_id", "display_name"), - desc="get_devices_by_user" - ) - - defer.returnValue({d["device_id"]: d for d in devices}) - - @cached(max_entries=10000) - def get_device_list_last_stream_id_for_remote(self, user_id): - """Get the last stream_id we got for a user. May be None if we haven't - got any information for them. - """ - return self._simple_select_one_onecol( - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - retcol="stream_id", - desc="get_device_list_remote_extremity", - allow_none=True, - ) - - @cachedList(cached_method_name="get_device_list_last_stream_id_for_remote", - list_name="user_ids", inlineCallbacks=True) - def get_device_list_last_stream_id_for_remotes(self, user_ids): - rows = yield self._simple_select_many_batch( - table="device_lists_remote_extremeties", - column="user_id", - iterable=user_ids, - retcols=("user_id", "stream_id",), - desc="get_user_devices_from_cache", - ) - - results = {user_id: None for user_id in user_ids} - results.update({ - row["user_id"]: row["stream_id"] for row in rows - }) - - defer.returnValue(results) - - @defer.inlineCallbacks def mark_remote_user_device_list_as_unsubscribed(self, user_id): """Mark that we no longer track device lists for remote user. """ @@ -405,268 +671,6 @@ class DeviceStore(BackgroundUpdateStore): lock=False, ) - def get_devices_by_remote(self, destination, from_stream_id): - """Get stream of updates to send to remote servers - - Returns: - (int, list[dict]): current stream id and list of updates - """ - now_stream_id = self._device_list_id_gen.get_current_token() - - has_changed = self._device_list_federation_stream_cache.has_entity_changed( - destination, int(from_stream_id) - ) - if not has_changed: - return (now_stream_id, []) - - return self.runInteraction( - "get_devices_by_remote", self._get_devices_by_remote_txn, - destination, from_stream_id, now_stream_id, - ) - - def _get_devices_by_remote_txn(self, txn, destination, from_stream_id, - now_stream_id): - sql = """ - SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes - WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? - GROUP BY user_id, device_id - LIMIT 20 - """ - txn.execute( - sql, (destination, from_stream_id, now_stream_id, False) - ) - - # maps (user_id, device_id) -> stream_id - query_map = {(r[0], r[1]): r[2] for r in txn} - if not query_map: - return (now_stream_id, []) - - if len(query_map) >= 20: - now_stream_id = max(stream_id for stream_id in itervalues(query_map)) - - devices = self._get_e2e_device_keys_txn( - txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True - ) - - prev_sent_id_sql = """ - SELECT coalesce(max(stream_id), 0) as stream_id - FROM device_lists_outbound_last_success - WHERE destination = ? AND user_id = ? AND stream_id <= ? - """ - - results = [] - for user_id, user_devices in iteritems(devices): - # The prev_id for the first row is always the last row before - # `from_stream_id` - txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id)) - rows = txn.fetchall() - prev_id = rows[0][0] - for device_id, device in iteritems(user_devices): - stream_id = query_map[(user_id, device_id)] - result = { - "user_id": user_id, - "device_id": device_id, - "prev_id": [prev_id] if prev_id else [], - "stream_id": stream_id, - } - - prev_id = stream_id - - if device is not None: - key_json = device.get("key_json", None) - if key_json: - result["keys"] = db_to_json(key_json) - device_display_name = device.get("device_display_name", None) - if device_display_name: - result["device_display_name"] = device_display_name - else: - result["deleted"] = True - - results.append(result) - - return (now_stream_id, results) - - @defer.inlineCallbacks - def get_user_devices_from_cache(self, query_list): - """Get the devices (and keys if any) for remote users from the cache. - - Args: - query_list(list): List of (user_id, device_ids), if device_ids is - falsey then return all device ids for that user. - - Returns: - (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is - a set of user_ids and results_map is a mapping of - user_id -> device_id -> device_info - """ - user_ids = set(user_id for user_id, _ in query_list) - user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids)) - user_ids_in_cache = set( - user_id for user_id, stream_id in user_map.items() if stream_id - ) - user_ids_not_in_cache = user_ids - user_ids_in_cache - - results = {} - for user_id, device_id in query_list: - if user_id not in user_ids_in_cache: - continue - - if device_id: - device = yield self._get_cached_user_device(user_id, device_id) - results.setdefault(user_id, {})[device_id] = device - else: - results[user_id] = yield self._get_cached_devices_for_user(user_id) - - defer.returnValue((user_ids_not_in_cache, results)) - - @cachedInlineCallbacks(num_args=2, tree=True) - def _get_cached_user_device(self, user_id, device_id): - content = yield self._simple_select_one_onecol( - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - retcol="content", - desc="_get_cached_user_device", - ) - defer.returnValue(db_to_json(content)) - - @cachedInlineCallbacks() - def _get_cached_devices_for_user(self, user_id): - devices = yield self._simple_select_list( - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - }, - retcols=("device_id", "content"), - desc="_get_cached_devices_for_user", - ) - defer.returnValue({ - device["device_id"]: db_to_json(device["content"]) - for device in devices - }) - - def get_devices_with_keys_by_user(self, user_id): - """Get all devices (with any device keys) for a user - - Returns: - (stream_id, devices) - """ - return self.runInteraction( - "get_devices_with_keys_by_user", - self._get_devices_with_keys_by_user_txn, user_id, - ) - - def _get_devices_with_keys_by_user_txn(self, txn, user_id): - now_stream_id = self._device_list_id_gen.get_current_token() - - devices = self._get_e2e_device_keys_txn( - txn, [(user_id, None)], include_all_devices=True - ) - - if devices: - user_devices = devices[user_id] - results = [] - for device_id, device in iteritems(user_devices): - result = { - "device_id": device_id, - } - - key_json = device.get("key_json", None) - if key_json: - result["keys"] = db_to_json(key_json) - device_display_name = device.get("device_display_name", None) - if device_display_name: - result["device_display_name"] = device_display_name - - results.append(result) - - return now_stream_id, results - - return now_stream_id, [] - - def mark_as_sent_devices_by_remote(self, destination, stream_id): - """Mark that updates have successfully been sent to the destination. - """ - return self.runInteraction( - "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn, - destination, stream_id, - ) - - def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): - # We update the device_lists_outbound_last_success with the successfully - # poked users. We do the join to see which users need to be inserted and - # which updated. - sql = """ - SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) - FROM device_lists_outbound_pokes as o - LEFT JOIN device_lists_outbound_last_success as s - USING (destination, user_id) - WHERE destination = ? AND o.stream_id <= ? - GROUP BY user_id - """ - txn.execute(sql, (destination, stream_id,)) - rows = txn.fetchall() - - sql = """ - UPDATE device_lists_outbound_last_success - SET stream_id = ? - WHERE destination = ? AND user_id = ? - """ - txn.executemany( - sql, ((row[1], destination, row[0],) for row in rows if row[2]) - ) - - sql = """ - INSERT INTO device_lists_outbound_last_success - (destination, user_id, stream_id) VALUES (?, ?, ?) - """ - txn.executemany( - sql, ((destination, row[0], row[1],) for row in rows if not row[2]) - ) - - # Delete all sent outbound pokes - sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE destination = ? AND stream_id <= ? - """ - txn.execute(sql, (destination, stream_id,)) - - @defer.inlineCallbacks - def get_user_whose_devices_changed(self, from_key): - """Get set of users whose devices have changed since `from_key`. - """ - from_key = int(from_key) - changed = self._device_list_stream_cache.get_all_entities_changed(from_key) - if changed is not None: - defer.returnValue(set(changed)) - - sql = """ - SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ? - """ - rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) - defer.returnValue(set(row[0] for row in rows)) - - def get_all_device_list_changes_for_remotes(self, from_key, to_key): - """Return a list of `(stream_id, user_id, destination)` which is the - combined list of changes to devices, and which destinations need to be - poked. `destination` may be None if no destinations need to be poked. - """ - # We do a group by here as there can be a large number of duplicate - # entries, since we throw away device IDs. - sql = """ - SELECT MAX(stream_id) AS stream_id, user_id, destination - FROM device_lists_stream - LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) - WHERE ? < stream_id AND stream_id <= ? - GROUP BY user_id, destination - """ - return self._execute( - "get_all_device_list_changes_for_remotes", None, - sql, from_key, to_key - ) - @defer.inlineCallbacks def add_device_change_to_streams(self, user_id, device_ids, hosts): """Persist that a user's devices have been updated, and which hosts @@ -732,9 +736,6 @@ class DeviceStore(BackgroundUpdateStore): ] ) - def get_device_stream_token(self): - return self._device_list_id_gen.get_current_token() - def _prune_old_outbound_device_pokes(self): """Delete old entries out of the device_lists_outbound_pokes to ensure that we don't fill up due to dead servers. We keep one entry per diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 2a0f6cfca9..e381e472a2 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -23,49 +23,7 @@ from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore, db_to_json -class EndToEndKeyStore(SQLBaseStore): - def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): - """Stores device keys for a device. Returns whether there was a change - or the keys were already in the database. - """ - def _set_e2e_device_keys_txn(txn): - old_key_json = self._simple_select_one_onecol_txn( - txn, - table="e2e_device_keys_json", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - retcol="key_json", - allow_none=True, - ) - - # In py3 we need old_key_json to match new_key_json type. The DB - # returns unicode while encode_canonical_json returns bytes. - new_key_json = encode_canonical_json(device_keys).decode("utf-8") - - if old_key_json == new_key_json: - return False - - self._simple_upsert_txn( - txn, - table="e2e_device_keys_json", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - values={ - "ts_added_ms": time_now, - "key_json": new_key_json, - } - ) - - return True - - return self.runInteraction( - "set_e2e_device_keys", _set_e2e_device_keys_txn - ) - +class EndToEndKeyWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, @@ -238,6 +196,50 @@ class EndToEndKeyStore(SQLBaseStore): "count_e2e_one_time_keys", _count_e2e_one_time_keys ) + +class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): + def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): + """Stores device keys for a device. Returns whether there was a change + or the keys were already in the database. + """ + def _set_e2e_device_keys_txn(txn): + old_key_json = self._simple_select_one_onecol_txn( + txn, + table="e2e_device_keys_json", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + retcol="key_json", + allow_none=True, + ) + + # In py3 we need old_key_json to match new_key_json type. The DB + # returns unicode while encode_canonical_json returns bytes. + new_key_json = encode_canonical_json(device_keys).decode("utf-8") + + if old_key_json == new_key_json: + return False + + self._simple_upsert_txn( + txn, + table="e2e_device_keys_json", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + values={ + "ts_added_ms": time_now, + "key_json": new_key_json, + } + ) + + return True + + return self.runInteraction( + "set_e2e_device_keys", _set_e2e_device_keys_txn + ) + def claim_e2e_one_time_keys(self, query_list): """Take a list of one time keys out of the database""" def _claim_e2e_one_time_keys(txn): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 38809ed0fc..a8d90456e3 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -442,6 +442,28 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, event_results.reverse() return event_results + @defer.inlineCallbacks + def get_successor_events(self, event_ids): + """Fetch all events that have the given events as a prev event + + Args: + event_ids (iterable[str]) + + Returns: + Deferred[list[str]] + """ + rows = yield self._simple_select_many_batch( + table="event_edges", + column="prev_event_id", + iterable=event_ids, + retcols=("event_id",), + desc="get_successor_events" + ) + + defer.returnValue([ + row["event_id"] for row in rows + ]) + class EventFederationStore(EventFederationWorkerStore): """ Responsible for storing and serving up the various graphs associated diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 0ac665e967..0fd1ccc40a 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -346,15 +346,23 @@ class ReceiptsStore(ReceiptsWorkerStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): + """Inserts a read-receipt into the database if it's newer than the current RR + + Returns: int|None + None if the RR is older than the current RR + otherwise, the rx timestamp of the event that the RR corresponds to + (or 0 if the event is unknown) + """ res = self._simple_select_one_txn( txn, table="events", - retcols=["topological_ordering", "stream_ordering"], + retcols=["stream_ordering", "received_ts"], keyvalues={"event_id": event_id}, allow_none=True ) stream_ordering = int(res["stream_ordering"]) if res else None + rx_ts = res["received_ts"] if res else 0 # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts @@ -373,7 +381,7 @@ class ReceiptsStore(ReceiptsWorkerStore): "one for later event %s", event_id, eid, ) - return False + return None txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) @@ -429,7 +437,7 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_ordering=stream_ordering, ) - return True + return rx_ts @defer.inlineCallbacks def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): @@ -466,7 +474,7 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_id_manager = self._receipts_id_gen.get_next() with stream_id_manager as stream_id: - have_persisted = yield self.runInteraction( + event_ts = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, @@ -474,8 +482,14 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_id=stream_id, ) - if not have_persisted: - defer.returnValue(None) + if event_ts is None: + defer.returnValue(None) + + now = self._clock.time_msec() + logger.debug( + "RR for event %s in %s (%i ms old)", + linearized_event_id, room_id, now - event_ts, + ) yield self.insert_graph_receipt( room_id, receipt_type, user_id, event_ids, data diff --git a/synapse/visibility.py b/synapse/visibility.py index 0281a7c919..efec21673b 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -216,28 +216,36 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, @defer.inlineCallbacks -def filter_events_for_server(store, server_name, events): - # Whatever else we do, we need to check for senders which have requested - # erasure of their data. - erased_senders = yield store.are_users_erased( - (e.sender for e in events), - ) +def filter_events_for_server(store, server_name, events, redact=True, + check_history_visibility_only=False): + """Filter a list of events based on whether given server is allowed to + see them. - def redact_disallowed(event, state): - # if the sender has been gdpr17ed, always return a redacted - # copy of the event. - if erased_senders[event.sender]: + Args: + store (DataStore) + server_name (str) + events (iterable[FrozenEvent]) + redact (bool): Whether to return a redacted version of the event, or + to filter them out entirely. + check_history_visibility_only (bool): Whether to only check the + history visibility, rather than things like if the sender has been + erased. This is used e.g. during pagination to decide whether to + backfill or not. + + Returns + Deferred[list[FrozenEvent]] + """ + + def is_sender_erased(event, erased_senders): + if erased_senders and erased_senders[event.sender]: logger.info( "Sender of %s has been erased, redacting", event.event_id, ) - return prune_event(event) - - # state will be None if we decided we didn't need to filter by - # room membership. - if not state: - return event + return True + return False + def check_event_is_visible(event, state): history = state.get((EventTypes.RoomHistoryVisibility, ''), None) if history: visibility = history.content.get("history_visibility", "shared") @@ -259,17 +267,17 @@ def filter_events_for_server(store, server_name, events): memtype = ev.membership if memtype == Membership.JOIN: - return event + return True elif memtype == Membership.INVITE: if visibility == "invited": - return event + return True else: # server has no users in the room: redact - return prune_event(event) + return False - return event + return True - # Next lets check to see if all the events have a history visibility + # Lets check to see if all the events have a history visibility # of "shared" or "world_readable". If thats the case then we don't # need to check membership (as we know the server is in the room). event_to_state_ids = yield store.get_state_ids_for_events( @@ -296,16 +304,31 @@ def filter_events_for_server(store, server_name, events): for e in itervalues(event_map) ) + if not check_history_visibility_only: + erased_senders = yield store.are_users_erased( + (e.sender for e in events), + ) + else: + # We don't want to check whether users are erased, which is equivalent + # to no users having been erased. + erased_senders = {} + if all_open: # all the history_visibility state affecting these events is open, so # we don't need to filter by membership state. We *do* need to check # for user erasure, though. if erased_senders: - events = [ - redact_disallowed(e, None) - for e in events - ] + to_return = [] + for e in events: + if not is_sender_erased(e, erased_senders): + to_return.append(e) + elif redact: + to_return.append(prune_event(e)) + + defer.returnValue(to_return) + # If there are no erased users then we can just return the given list + # of events without having to copy it. defer.returnValue(events) # Ok, so we're dealing with events that have non-trivial visibility @@ -361,7 +384,13 @@ def filter_events_for_server(store, server_name, events): for e_id, key_to_eid in iteritems(event_to_state_ids) } - defer.returnValue([ - redact_disallowed(e, event_to_state[e.event_id]) - for e in events - ]) + to_return = [] + for e in events: + erased = is_sender_erased(e, erased_senders) + visible = check_event_is_visible(e, event_to_state[e.event_id]) + if visible and not erased: + to_return.append(e) + elif redact: + to_return.append(prune_event(e)) + + defer.returnValue(to_return) diff --git a/tests/api/test_ratelimiting.py b/tests/api/test_ratelimiting.py index 8933fe3b72..30a255d441 100644 --- a/tests/api/test_ratelimiting.py +++ b/tests/api/test_ratelimiting.py @@ -6,34 +6,34 @@ from tests import unittest class TestRatelimiter(unittest.TestCase): def test_allowed(self): limiter = Ratelimiter() - allowed, time_allowed = limiter.send_message( - user_id="test_id", time_now_s=0, msg_rate_hz=0.1, burst_count=1 + allowed, time_allowed = limiter.can_do_action( + key="test_id", time_now_s=0, rate_hz=0.1, burst_count=1 ) self.assertTrue(allowed) self.assertEquals(10., time_allowed) - allowed, time_allowed = limiter.send_message( - user_id="test_id", time_now_s=5, msg_rate_hz=0.1, burst_count=1 + allowed, time_allowed = limiter.can_do_action( + key="test_id", time_now_s=5, rate_hz=0.1, burst_count=1 ) self.assertFalse(allowed) self.assertEquals(10., time_allowed) - allowed, time_allowed = limiter.send_message( - user_id="test_id", time_now_s=10, msg_rate_hz=0.1, burst_count=1 + allowed, time_allowed = limiter.can_do_action( + key="test_id", time_now_s=10, rate_hz=0.1, burst_count=1 ) self.assertTrue(allowed) self.assertEquals(20., time_allowed) def test_pruning(self): limiter = Ratelimiter() - allowed, time_allowed = limiter.send_message( - user_id="test_id_1", time_now_s=0, msg_rate_hz=0.1, burst_count=1 + allowed, time_allowed = limiter.can_do_action( + key="test_id_1", time_now_s=0, rate_hz=0.1, burst_count=1 ) self.assertIn("test_id_1", limiter.message_counts) - allowed, time_allowed = limiter.send_message( - user_id="test_id_2", time_now_s=10, msg_rate_hz=0.1, burst_count=1 + allowed, time_allowed = limiter.can_do_action( + key="test_id_2", time_now_s=10, rate_hz=0.1, burst_count=1 ) self.assertNotIn("test_id_1", limiter.message_counts) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 80da1c8954..d60c124eec 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -55,11 +55,11 @@ class ProfileTestCase(unittest.TestCase): federation_client=self.mock_federation, federation_server=Mock(), federation_registry=self.mock_registry, - ratelimiter=NonCallableMock(spec_set=["send_message"]), + ratelimiter=NonCallableMock(spec_set=["can_do_action"]), ) self.ratelimiter = hs.get_ratelimiter() - self.ratelimiter.send_message.return_value = (True, 0) + self.ratelimiter.can_do_action.return_value = (True, 0) self.store = hs.get_datastore() diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 36e136cded..13486930fb 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -24,13 +24,17 @@ from synapse.api.errors import AuthError from synapse.types import UserID from tests import unittest +from tests.utils import register_federation_servlets -from ..utils import ( - DeferredMockCallable, - MockClock, - MockHttpResource, - setup_test_homeserver, -) +# Some local users to test with +U_APPLE = UserID.from_string("@apple:test") +U_BANANA = UserID.from_string("@banana:test") + +# Remote user +U_ONION = UserID.from_string("@onion:farm") + +# Test room id +ROOM_ID = "a-room" def _expect_edu_transaction(edu_type, content, origin="test"): @@ -46,30 +50,21 @@ def _make_edu_transaction_json(edu_type, content): return json.dumps(_expect_edu_transaction(edu_type, content)).encode('utf8') -class TypingNotificationsTestCase(unittest.TestCase): - """Tests typing notifications to rooms.""" - - @defer.inlineCallbacks - def setUp(self): - self.clock = MockClock() +class TypingNotificationsTestCase(unittest.HomeserverTestCase): + servlets = [register_federation_servlets] - self.mock_http_client = Mock(spec=[]) - self.mock_http_client.put_json = DeferredMockCallable() + def make_homeserver(self, reactor, clock): + # we mock out the keyring so as to skip the authentication check on the + # federation API call. + mock_keyring = Mock(spec=["verify_json_for_server"]) + mock_keyring.verify_json_for_server.return_value = defer.succeed(True) - self.mock_federation_resource = MockHttpResource() - - mock_notifier = Mock() - self.on_new_event = mock_notifier.on_new_event + # we mock out the federation client too + mock_federation_client = Mock(spec=["put_json"]) + mock_federation_client.put_json.return_value = defer.succeed((200, "OK")) - self.auth = Mock(spec=[]) - self.state_handler = Mock() - - hs = yield setup_test_homeserver( - self.addCleanup, - "test", - auth=self.auth, - clock=self.clock, - datastore=Mock( + hs = self.setup_test_homeserver( + datastore=(Mock( spec=[ # Bits that Federation needs "prep_send_transaction", @@ -82,16 +77,21 @@ class TypingNotificationsTestCase(unittest.TestCase): "get_user_directory_stream_pos", "get_current_state_deltas", ] - ), - state_handler=self.state_handler, - handlers=Mock(), - notifier=mock_notifier, - resource_for_client=Mock(), - resource_for_federation=self.mock_federation_resource, - http_client=self.mock_http_client, - keyring=Mock(), + )), + notifier=Mock(), + http_client=mock_federation_client, + keyring=mock_keyring, ) + return hs + + def prepare(self, reactor, clock, hs): + # the tests assume that we are starting at unix time 1000 + reactor.pump((1000, )) + + mock_notifier = hs.get_notifier() + self.on_new_event = mock_notifier.on_new_event + self.handler = hs.get_typing_handler() self.event_source = hs.get_event_sources().sources["typing"] @@ -109,13 +109,12 @@ class TypingNotificationsTestCase(unittest.TestCase): self.datastore.get_received_txn_response = get_received_txn_response - self.room_id = "a-room" - self.room_members = [] def check_joined_room(room_id, user_id): if user_id not in [u.to_string() for u in self.room_members]: raise AuthError(401, "User is not in the room") + hs.get_auth().check_joined_room = check_joined_room def get_joined_hosts_for_room(room_id): return set(member.domain for member in self.room_members) @@ -124,8 +123,7 @@ class TypingNotificationsTestCase(unittest.TestCase): def get_current_user_in_room(room_id): return set(str(u) for u in self.room_members) - - self.state_handler.get_current_user_in_room = get_current_user_in_room + hs.get_state_handler().get_current_user_in_room = get_current_user_in_room self.datastore.get_user_directory_stream_pos.return_value = ( # we deliberately return a non-None stream pos to avoid doing an initial_spam @@ -134,230 +132,208 @@ class TypingNotificationsTestCase(unittest.TestCase): self.datastore.get_current_state_deltas.return_value = None - self.auth.check_joined_room = check_joined_room - self.datastore.get_to_device_stream_token = lambda: 0 self.datastore.get_new_device_msgs_for_remote = lambda *args, **kargs: ([], 0) self.datastore.delete_device_msgs_for_remote = lambda *args, **kargs: None - # Some local users to test with - self.u_apple = UserID.from_string("@apple:test") - self.u_banana = UserID.from_string("@banana:test") - - # Remote user - self.u_onion = UserID.from_string("@onion:farm") - - @defer.inlineCallbacks def test_started_typing_local(self): - self.room_members = [self.u_apple, self.u_banana] + self.room_members = [U_APPLE, U_BANANA] self.assertEquals(self.event_source.get_current_key(), 0) - yield self.handler.started_typing( - target_user=self.u_apple, - auth_user=self.u_apple, - room_id=self.room_id, + self.successResultOf(self.handler.started_typing( + target_user=U_APPLE, + auth_user=U_APPLE, + room_id=ROOM_ID, timeout=20000, - ) + )) self.on_new_event.assert_has_calls( - [call('typing_key', 1, rooms=[self.room_id])] + [call('typing_key', 1, rooms=[ROOM_ID])] ) self.assertEquals(self.event_source.get_current_key(), 1) - events = yield self.event_source.get_new_events( - room_ids=[self.room_id], from_key=0 + events = self.event_source.get_new_events( + room_ids=[ROOM_ID], from_key=0 ) self.assertEquals( events[0], [ { "type": "m.typing", - "room_id": self.room_id, - "content": {"user_ids": [self.u_apple.to_string()]}, + "room_id": ROOM_ID, + "content": {"user_ids": [U_APPLE.to_string()]}, } ], ) - @defer.inlineCallbacks def test_started_typing_remote_send(self): - self.room_members = [self.u_apple, self.u_onion] - - put_json = self.mock_http_client.put_json - put_json.expect_call_and_return( - call( - "farm", - path="/_matrix/federation/v1/send/1000000/", - data=_expect_edu_transaction( - "m.typing", - content={ - "room_id": self.room_id, - "user_id": self.u_apple.to_string(), - "typing": True, - }, - ), - json_data_callback=ANY, - long_retries=True, - backoff_on_404=True, - ), - defer.succeed((200, "OK")), - ) + self.room_members = [U_APPLE, U_ONION] - yield self.handler.started_typing( - target_user=self.u_apple, - auth_user=self.u_apple, - room_id=self.room_id, + self.successResultOf(self.handler.started_typing( + target_user=U_APPLE, + auth_user=U_APPLE, + room_id=ROOM_ID, timeout=20000, - ) + )) - yield put_json.await_calls() + put_json = self.hs.get_http_client().put_json + put_json.assert_called_once_with( + "farm", + path="/_matrix/federation/v1/send/1000000/", + data=_expect_edu_transaction( + "m.typing", + content={ + "room_id": ROOM_ID, + "user_id": U_APPLE.to_string(), + "typing": True, + }, + ), + json_data_callback=ANY, + long_retries=True, + backoff_on_404=True, + ) - @defer.inlineCallbacks def test_started_typing_remote_recv(self): - self.room_members = [self.u_apple, self.u_onion] + self.room_members = [U_APPLE, U_ONION] self.assertEquals(self.event_source.get_current_key(), 0) - (code, response) = yield self.mock_federation_resource.trigger( + (request, channel) = self.make_request( "PUT", "/_matrix/federation/v1/send/1000000/", _make_edu_transaction_json( "m.typing", content={ - "room_id": self.room_id, - "user_id": self.u_onion.to_string(), + "room_id": ROOM_ID, + "user_id": U_ONION.to_string(), "typing": True, }, ), federation_auth_origin=b'farm', ) + self.render(request) + self.assertEqual(channel.code, 200) self.on_new_event.assert_has_calls( - [call('typing_key', 1, rooms=[self.room_id])] + [call('typing_key', 1, rooms=[ROOM_ID])] ) self.assertEquals(self.event_source.get_current_key(), 1) - events = yield self.event_source.get_new_events( - room_ids=[self.room_id], from_key=0 + events = self.event_source.get_new_events( + room_ids=[ROOM_ID], from_key=0 ) self.assertEquals( events[0], [ { "type": "m.typing", - "room_id": self.room_id, - "content": {"user_ids": [self.u_onion.to_string()]}, + "room_id": ROOM_ID, + "content": {"user_ids": [U_ONION.to_string()]}, } ], ) - @defer.inlineCallbacks def test_stopped_typing(self): - self.room_members = [self.u_apple, self.u_banana, self.u_onion] - - put_json = self.mock_http_client.put_json - put_json.expect_call_and_return( - call( - "farm", - path="/_matrix/federation/v1/send/1000000/", - data=_expect_edu_transaction( - "m.typing", - content={ - "room_id": self.room_id, - "user_id": self.u_apple.to_string(), - "typing": False, - }, - ), - json_data_callback=ANY, - long_retries=True, - backoff_on_404=True, - ), - defer.succeed((200, "OK")), - ) + self.room_members = [U_APPLE, U_BANANA, U_ONION] # Gut-wrenching from synapse.handlers.typing import RoomMember - member = RoomMember(self.room_id, self.u_apple.to_string()) + member = RoomMember(ROOM_ID, U_APPLE.to_string()) self.handler._member_typing_until[member] = 1002000 - self.handler._room_typing[self.room_id] = set([self.u_apple.to_string()]) + self.handler._room_typing[ROOM_ID] = set([U_APPLE.to_string()]) self.assertEquals(self.event_source.get_current_key(), 0) - yield self.handler.stopped_typing( - target_user=self.u_apple, auth_user=self.u_apple, room_id=self.room_id - ) + self.successResultOf(self.handler.stopped_typing( + target_user=U_APPLE, auth_user=U_APPLE, room_id=ROOM_ID + )) self.on_new_event.assert_has_calls( - [call('typing_key', 1, rooms=[self.room_id])] + [call('typing_key', 1, rooms=[ROOM_ID])] ) - yield put_json.await_calls() + put_json = self.hs.get_http_client().put_json + put_json.assert_called_once_with( + "farm", + path="/_matrix/federation/v1/send/1000000/", + data=_expect_edu_transaction( + "m.typing", + content={ + "room_id": ROOM_ID, + "user_id": U_APPLE.to_string(), + "typing": False, + }, + ), + json_data_callback=ANY, + long_retries=True, + backoff_on_404=True, + ) self.assertEquals(self.event_source.get_current_key(), 1) - events = yield self.event_source.get_new_events( - room_ids=[self.room_id], from_key=0 + events = self.event_source.get_new_events( + room_ids=[ROOM_ID], from_key=0 ) self.assertEquals( events[0], [ { "type": "m.typing", - "room_id": self.room_id, + "room_id": ROOM_ID, "content": {"user_ids": []}, } ], ) - @defer.inlineCallbacks def test_typing_timeout(self): - self.room_members = [self.u_apple, self.u_banana] + self.room_members = [U_APPLE, U_BANANA] self.assertEquals(self.event_source.get_current_key(), 0) - yield self.handler.started_typing( - target_user=self.u_apple, - auth_user=self.u_apple, - room_id=self.room_id, + self.successResultOf(self.handler.started_typing( + target_user=U_APPLE, + auth_user=U_APPLE, + room_id=ROOM_ID, timeout=10000, - ) + )) self.on_new_event.assert_has_calls( - [call('typing_key', 1, rooms=[self.room_id])] + [call('typing_key', 1, rooms=[ROOM_ID])] ) self.on_new_event.reset_mock() self.assertEquals(self.event_source.get_current_key(), 1) - events = yield self.event_source.get_new_events( - room_ids=[self.room_id], from_key=0 + events = self.event_source.get_new_events( + room_ids=[ROOM_ID], from_key=0 ) self.assertEquals( events[0], [ { "type": "m.typing", - "room_id": self.room_id, - "content": {"user_ids": [self.u_apple.to_string()]}, + "room_id": ROOM_ID, + "content": {"user_ids": [U_APPLE.to_string()]}, } ], ) - self.clock.advance_time(16) + self.reactor.pump([16, ]) self.on_new_event.assert_has_calls( - [call('typing_key', 2, rooms=[self.room_id])] + [call('typing_key', 2, rooms=[ROOM_ID])] ) self.assertEquals(self.event_source.get_current_key(), 2) - events = yield self.event_source.get_new_events( - room_ids=[self.room_id], from_key=1 + events = self.event_source.get_new_events( + room_ids=[ROOM_ID], from_key=1 ) self.assertEquals( events[0], [ { "type": "m.typing", - "room_id": self.room_id, + "room_id": ROOM_ID, "content": {"user_ids": []}, } ], @@ -365,29 +341,29 @@ class TypingNotificationsTestCase(unittest.TestCase): # SYN-230 - see if we can still set after timeout - yield self.handler.started_typing( - target_user=self.u_apple, - auth_user=self.u_apple, - room_id=self.room_id, + self.successResultOf(self.handler.started_typing( + target_user=U_APPLE, + auth_user=U_APPLE, + room_id=ROOM_ID, timeout=10000, - ) + )) self.on_new_event.assert_has_calls( - [call('typing_key', 3, rooms=[self.room_id])] + [call('typing_key', 3, rooms=[ROOM_ID])] ) self.on_new_event.reset_mock() self.assertEquals(self.event_source.get_current_key(), 3) - events = yield self.event_source.get_new_events( - room_ids=[self.room_id], from_key=0 + events = self.event_source.get_new_events( + room_ids=[ROOM_ID], from_key=0 ) self.assertEquals( events[0], [ { "type": "m.typing", - "room_id": self.room_id, - "content": {"user_ids": [self.u_apple.to_string()]}, + "room_id": ROOM_ID, + "content": {"user_ids": [U_APPLE.to_string()]}, } ], ) diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 9e9fbbfe93..524af4f8d1 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -31,10 +31,10 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): hs = self.setup_test_homeserver( "blue", federation_client=Mock(), - ratelimiter=NonCallableMock(spec_set=["send_message"]), + ratelimiter=NonCallableMock(spec_set=["can_do_action"]), ) - hs.get_ratelimiter().send_message.return_value = (True, 0) + hs.get_ratelimiter().can_do_action.return_value = (True, 0) return hs diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py index 483bebc832..36d8547275 100644 --- a/tests/rest/client/v1/test_events.py +++ b/tests/rest/client/v1/test_events.py @@ -40,10 +40,10 @@ class EventStreamPermissionsTestCase(unittest.HomeserverTestCase): config.auto_join_rooms = [] hs = self.setup_test_homeserver( - config=config, ratelimiter=NonCallableMock(spec_set=["send_message"]) + config=config, ratelimiter=NonCallableMock(spec_set=["can_do_action"]) ) self.ratelimiter = hs.get_ratelimiter() - self.ratelimiter.send_message.return_value = (True, 0) + self.ratelimiter.can_do_action.return_value = (True, 0) hs.get_handlers().federation_handler = Mock() diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index a824be9a62..015c144248 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -41,10 +41,10 @@ class RoomBase(unittest.HomeserverTestCase): "red", http_client=None, federation_client=Mock(), - ratelimiter=NonCallableMock(spec_set=["send_message"]), + ratelimiter=NonCallableMock(spec_set=["can_do_action"]), ) self.ratelimiter = self.hs.get_ratelimiter() - self.ratelimiter.send_message.return_value = (True, 0) + self.ratelimiter.can_do_action.return_value = (True, 0) self.hs.get_federation_handler = Mock(return_value=Mock()) @@ -96,7 +96,7 @@ class RoomPermissionsTestCase(RoomBase): # auth as user_id now self.helper.auth_user_id = self.user_id - def test_send_message(self): + def test_can_do_action(self): msg_content = b'{"msgtype":"m.text","body":"hello"}' seq = iter(range(100)) diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index 0ad814c5e5..30fb77bac8 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -42,13 +42,13 @@ class RoomTypingTestCase(unittest.HomeserverTestCase): "red", http_client=None, federation_client=Mock(), - ratelimiter=NonCallableMock(spec_set=["send_message"]), + ratelimiter=NonCallableMock(spec_set=["can_do_action"]), ) self.event_source = hs.get_event_sources().sources["typing"] self.ratelimiter = hs.get_ratelimiter() - self.ratelimiter.send_message.return_value = (True, 0) + self.ratelimiter.can_do_action.return_value = (True, 0) hs.get_handlers().federation_handler = Mock() diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 906b348d3e..3600434858 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -130,3 +130,51 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase): self.assertEquals(channel.result["code"], b"403", channel.result) self.assertEquals(channel.json_body["error"], "Guest access is disabled") + + def test_POST_ratelimiting_guest(self): + self.hs.config.rc_registration_request_burst_count = 5 + + for i in range(0, 6): + url = self.url + b"?kind=guest" + request, channel = self.make_request(b"POST", url, b"{}") + self.render(request) + + if i == 5: + self.assertEquals(channel.result["code"], b"429", channel.result) + retry_after_ms = int(channel.json_body["retry_after_ms"]) + else: + self.assertEquals(channel.result["code"], b"200", channel.result) + + self.reactor.advance(retry_after_ms / 1000.) + + request, channel = self.make_request(b"POST", self.url + b"?kind=guest", b"{}") + self.render(request) + + self.assertEquals(channel.result["code"], b"200", channel.result) + + def test_POST_ratelimiting(self): + self.hs.config.rc_registration_request_burst_count = 5 + + for i in range(0, 6): + params = { + "username": "kermit" + str(i), + "password": "monkey", + "device_id": "frogfone", + "auth": {"type": LoginType.DUMMY}, + } + request_data = json.dumps(params) + request, channel = self.make_request(b"POST", self.url, request_data) + self.render(request) + + if i == 5: + self.assertEquals(channel.result["code"], b"429", channel.result) + retry_after_ms = int(channel.json_body["retry_after_ms"]) + else: + self.assertEquals(channel.result["code"], b"200", channel.result) + + self.reactor.advance(retry_after_ms / 1000.) + + request, channel = self.make_request(b"POST", self.url + b"?kind=guest", b"{}") + self.render(request) + + self.assertEquals(channel.result["code"], b"200", channel.result) diff --git a/tests/server.py b/tests/server.py index fc1e76d146..37069afdda 100644 --- a/tests/server.py +++ b/tests/server.py @@ -137,6 +137,7 @@ def make_request( access_token=None, request=SynapseRequest, shorthand=True, + federation_auth_origin=None, ): """ Make a web request using the given method and path, feed it the @@ -150,9 +151,11 @@ def make_request( a dict. shorthand: Whether to try and be helpful and prefix the given URL with the usual REST API path, if it doesn't contain it. + federation_auth_origin (bytes|None): if set to not-None, we will add a fake + Authorization header pretenting to be the given server name. Returns: - A synapse.http.site.SynapseRequest. + Tuple[synapse.http.site.SynapseRequest, channel] """ if not isinstance(method, bytes): method = method.encode('ascii') @@ -184,6 +187,11 @@ def make_request( b"Authorization", b"Bearer " + access_token.encode('ascii') ) + if federation_auth_origin is not None: + req.requestHeaders.addRawHeader( + b"Authorization", b"X-Matrix origin=%s,key=,sig=" % (federation_auth_origin,) + ) + if content: req.requestHeaders.addRawHeader(b"Content-Type", b"application/json") @@ -288,9 +296,6 @@ def setup_test_homeserver(cleanup_func, *args, **kwargs): **kwargs ) - pool.runWithConnection = runWithConnection - pool.runInteraction = runInteraction - class ThreadPool: """ Threadless thread pool. @@ -316,8 +321,12 @@ def setup_test_homeserver(cleanup_func, *args, **kwargs): return d clock.threadpool = ThreadPool() - pool.threadpool = ThreadPool() - pool.running = True + + if pool: + pool.runWithConnection = runWithConnection + pool.runInteraction = runInteraction + pool.threadpool = ThreadPool() + pool.running = True return d diff --git a/tests/test_mau.py b/tests/test_mau.py index 04f95c942f..00be1a8c21 100644 --- a/tests/test_mau.py +++ b/tests/test_mau.py @@ -17,7 +17,7 @@ import json -from mock import Mock, NonCallableMock +from mock import Mock from synapse.api.constants import LoginType from synapse.api.errors import Codes, HttpResponseException, SynapseError @@ -36,7 +36,6 @@ class TestMauLimit(unittest.HomeserverTestCase): "red", http_client=None, federation_client=Mock(), - ratelimiter=NonCallableMock(spec_set=["send_message"]), ) self.store = self.hs.get_datastore() diff --git a/tests/unittest.py b/tests/unittest.py index fac254ff10..ef31321bc8 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -262,6 +262,7 @@ class HomeserverTestCase(TestCase): access_token=None, request=SynapseRequest, shorthand=True, + federation_auth_origin=None, ): """ Create a SynapseRequest at the path using the method and containing the @@ -275,15 +276,18 @@ class HomeserverTestCase(TestCase): a dict. shorthand: Whether to try and be helpful and prefix the given URL with the usual REST API path, if it doesn't contain it. + federation_auth_origin (bytes|None): if set to not-None, we will add a fake + Authorization header pretenting to be the given server name. Returns: - A synapse.http.site.SynapseRequest. + Tuple[synapse.http.site.SynapseRequest, channel] """ if isinstance(content, dict): content = json.dumps(content).encode('utf8') return make_request( - self.reactor, method, path, content, access_token, request, shorthand + self.reactor, method, path, content, access_token, request, shorthand, + federation_auth_origin, ) def render(self, request): diff --git a/tests/utils.py b/tests/utils.py index cf49833a43..e4c42f9fa8 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -29,7 +29,7 @@ from twisted.internet import defer, reactor from synapse.api.constants import EventTypes, RoomVersions from synapse.api.errors import CodeMessageException, cs_error from synapse.config.server import ServerConfig -from synapse.federation.transport import server +from synapse.federation.transport import server as federation_server from synapse.http.server import HttpServer from synapse.server import HomeServer from synapse.storage import DataStore @@ -150,6 +150,8 @@ def default_config(name): config.admin_contact = None config.rc_messages_per_second = 10000 config.rc_message_burst_count = 10000 + config.rc_registration_request_burst_count = 3.0 + config.rc_registration_requests_per_second = 0.17 config.saml2_enabled = False config.public_baseurl = None config.default_identity_server = None @@ -200,6 +202,9 @@ def setup_test_homeserver( Args: cleanup_func : The function used to register a cleanup routine for after the test. + + Calling this method directly is deprecated: you should instead derive from + HomeserverTestCase. """ if reactor is None: from twisted.internet import reactor @@ -351,23 +356,27 @@ def setup_test_homeserver( fed = kargs.get("resource_for_federation", None) if fed: - server.register_servlets( - hs, - resource=fed, - authenticator=server.Authenticator(hs), - ratelimiter=FederationRateLimiter( - hs.get_clock(), - window_size=hs.config.federation_rc_window_size, - sleep_limit=hs.config.federation_rc_sleep_limit, - sleep_msec=hs.config.federation_rc_sleep_delay, - reject_limit=hs.config.federation_rc_reject_limit, - concurrent_requests=hs.config.federation_rc_concurrent, - ), - ) + register_federation_servlets(hs, fed) defer.returnValue(hs) +def register_federation_servlets(hs, resource): + federation_server.register_servlets( + hs, + resource=resource, + authenticator=federation_server.Authenticator(hs), + ratelimiter=FederationRateLimiter( + hs.get_clock(), + window_size=hs.config.federation_rc_window_size, + sleep_limit=hs.config.federation_rc_sleep_limit, + sleep_msec=hs.config.federation_rc_sleep_delay, + reject_limit=hs.config.federation_rc_reject_limit, + concurrent_requests=hs.config.federation_rc_concurrent, + ), + ) + + def get_mock_call_args(pattern_func, mock_func): """ Return the arguments the mock function was called with interpreted by the pattern functions argument list. diff --git a/tox.ini b/tox.ini index 14437e7334..19080a648f 100644 --- a/tox.ini +++ b/tox.ini @@ -118,6 +118,9 @@ commands = python -m towncrier.check --compare-with=origin/develop basepython = python3.6 +[testenv:check-sampleconfig] +commands = {toxinidir}/scripts-dev/generate_sample_config --check + [testenv:codecov] skip_install = True deps = |