diff --git a/.gitignore b/.gitignore
index a20f3e615d..a84c41b0c9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,11 +12,15 @@ _trial_temp/
_trial_temp*/
# stuff that is likely to exist when you run a server locally
+/*.db
+/*.log
+/*.log.config
+/*.pid
/*.signing.key
-/*.tls.crt
-/*.tls.key
-/uploads
+/env/
+/homeserver*.yaml
/media_store/
+/uploads
# IDEs
/.idea/
diff --git a/changelog.d/4662.misc b/changelog.d/4662.misc
new file mode 100644
index 0000000000..f4ec0d6a68
--- /dev/null
+++ b/changelog.d/4662.misc
@@ -0,0 +1 @@
+Add a systemd setup that supports synapse workers. Contributed by Luca Corbatto.
diff --git a/changelog.d/4821.feature b/changelog.d/4821.feature
new file mode 100644
index 0000000000..61d4eb8d60
--- /dev/null
+++ b/changelog.d/4821.feature
@@ -0,0 +1 @@
+Add configurable rate limiting to the /login endpoint.
diff --git a/changelog.d/4843.misc b/changelog.d/4843.misc
new file mode 100644
index 0000000000..03d0a3e2e7
--- /dev/null
+++ b/changelog.d/4843.misc
@@ -0,0 +1 @@
+Add stuff back to the .gitignore.
diff --git a/changelog.d/4852.misc b/changelog.d/4852.misc
new file mode 100644
index 0000000000..76ab1e34e7
--- /dev/null
+++ b/changelog.d/4852.misc
@@ -0,0 +1 @@
+ Move client read-receipt processing to federation sender worker.
\ No newline at end of file
diff --git a/changelog.d/4853.feature b/changelog.d/4853.feature
new file mode 100644
index 0000000000..360f92e1de
--- /dev/null
+++ b/changelog.d/4853.feature
@@ -0,0 +1 @@
+Allow passing --daemonize flags to workers in the same way as with master.
diff --git a/changelog.d/4855.misc b/changelog.d/4855.misc
new file mode 100644
index 0000000000..c4906d2f56
--- /dev/null
+++ b/changelog.d/4855.misc
@@ -0,0 +1 @@
+Refactor federation TransactionQueue.
\ No newline at end of file
diff --git a/changelog.d/4863.misc b/changelog.d/4863.misc
new file mode 100644
index 0000000000..bfe03cbedc
--- /dev/null
+++ b/changelog.d/4863.misc
@@ -0,0 +1 @@
+Comment out most options in the generated config.
diff --git a/changelog.d/4864.feature b/changelog.d/4864.feature
new file mode 100644
index 0000000000..57927f2620
--- /dev/null
+++ b/changelog.d/4864.feature
@@ -0,0 +1 @@
+The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
\ No newline at end of file
diff --git a/changelog.d/4865.feature b/changelog.d/4865.feature
new file mode 100644
index 0000000000..61d4eb8d60
--- /dev/null
+++ b/changelog.d/4865.feature
@@ -0,0 +1 @@
+Add configurable rate limiting to the /login endpoint.
diff --git a/changelog.d/4881.misc b/changelog.d/4881.misc
new file mode 100644
index 0000000000..308c21c839
--- /dev/null
+++ b/changelog.d/4881.misc
@@ -0,0 +1 @@
+Update link to federation docs.
diff --git a/changelog.d/4886.bugfix b/changelog.d/4886.bugfix
new file mode 100644
index 0000000000..b17aa92485
--- /dev/null
+++ b/changelog.d/4886.bugfix
@@ -0,0 +1 @@
+fix test_auto_create_auto_join_where_no_consent.
diff --git a/changelog.d/4886.misc b/changelog.d/4886.misc
new file mode 100644
index 0000000000..b17aa92485
--- /dev/null
+++ b/changelog.d/4886.misc
@@ -0,0 +1 @@
+fix test_auto_create_auto_join_where_no_consent.
diff --git a/changelog.d/4887.feature b/changelog.d/4887.feature
new file mode 100644
index 0000000000..e7ff0b9297
--- /dev/null
+++ b/changelog.d/4887.feature
@@ -0,0 +1 @@
+The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
diff --git a/contrib/systemd-with-workers/README.md b/contrib/systemd-with-workers/README.md
new file mode 100644
index 0000000000..74b261e9fb
--- /dev/null
+++ b/contrib/systemd-with-workers/README.md
@@ -0,0 +1,150 @@
+# Setup Synapse with Workers and Systemd
+
+This is a setup for managing synapse with systemd including support for
+managing workers. It provides a `matrix-synapse`, as well as a
+`matrix-synapse-worker@` service for any workers you require. Additionally to
+group the required services it sets up a `matrix.target`. You can use this to
+automatically start any bot- or bridge-services. More on this in
+[Bots and Bridges](#bots-and-bridges).
+
+See the folder [system](system) for any service and target files.
+
+The folder [workers](workers) contains an example configuration for the
+`federation_reader` worker. Pay special attention to the name of the
+configuration file. In order to work with the `matrix-synapse-worker@.service`
+service, it needs to have the exact same name as the worker app.
+
+This setup expects neither the homeserver nor any workers to fork. Forking is
+handled by systemd.
+
+## Setup
+
+1. Adjust your matrix configs. Make sure that the worker config files have the
+exact same name as the worker app. Compare `matrix-synapse-worker@.service` for
+why. You can find an example worker config in the [workers](workers) folder. See
+below for relevant settings in the `homeserver.yaml`.
+2. Copy the `*.service` and `*.target` files in [system](system) to
+`/etc/systemd/system`.
+3. `systemctl enable matrix-synapse.service` this adds the homeserver
+app to the `matrix.target`
+4. *Optional.* `systemctl enable
+matrix-synapse-worker@federation_reader.service` this adds the federation_reader
+app to the `matrix-synapse.service`
+5. *Optional.* Repeat step 4 for any additional workers you require.
+6. *Optional.* Add any bots or bridges by enabling them.
+7. Start all matrix related services via `systemctl start matrix.target`
+8. *Optional.* Enable autostart of all matrix related services on system boot
+via `systemctl enable matrix.target`
+
+## Usage
+
+After you have setup you can use the following commands to manage your synapse
+installation:
+
+```
+# Start matrix-synapse, all workers and any enabled bots or bridges.
+systemctl start matrix.target
+
+# Restart matrix-synapse and all workers (not necessarily restarting bots
+# or bridges, see "Bots and Bridges")
+systemctl restart matrix-synapse.service
+
+# Stop matrix-synapse and all workers (not necessarily restarting bots
+# or bridges, see "Bots and Bridges")
+systemctl stop matrix-synapse.service
+
+# Restart a specific worker (i. e. federation_reader), the homeserver is
+# unaffected by this.
+systemctl restart matrix-synapse-worker@federation_reader.service
+
+# Add a new worker (assuming all configs are setup already)
+systemctl enable matrix-synapse-worker@federation_writer.service
+systemctl restart matrix-synapse.service
+```
+
+## The Configs
+
+Make sure the `worker_app` is set in the `homeserver.yaml` and it does not fork.
+
+```
+worker_app: synapse.app.homeserver
+daemonize: false
+```
+
+None of the workers should fork, as forking is handled by systemd. Hence make
+sure this is present in all worker config files.
+
+```
+worker_daemonize: false
+```
+
+The config files of all workers are expected to be located in
+`/etc/matrix-synapse/workers`. If you want to use a different location you have
+to edit the provided `*.service` files accordingly.
+
+## Bots and Bridges
+
+Most bots and bridges do not care if the homeserver goes down or is restarted.
+Depending on the implementation this may crash them though. So look up the docs
+or ask the community of the specific bridge or bot you want to run to make sure
+you choose the correct setup.
+
+Whichever configuration you choose, after the setup the following will enable
+automatically starting (and potentially restarting) your bot/bridge with the
+`matrix.target`.
+
+```
+systemctl enable <yourBotOrBridgeName>.service
+```
+
+**Note** that from an inactive synapse the bots/bridges will only be started with
+synapse if you start the `matrix.target`, not if you start the
+`matrix-synapse.service`. This is on purpose. Think of `matrix-synapse.service`
+as *just* synapse, but `matrix.target` being anything matrix related, including
+synapse and any and all enabled bots and bridges.
+
+### Start with synapse but ignore synapse going down
+
+If the bridge can handle shutdowns of the homeserver you'll want to install the
+service in the `matrix.target` and optionally add a
+`After=matrix-synapse.service` dependency to have the bot/bridge start after
+synapse on starting everything.
+
+In this case the service file should look like this.
+
+```
+[Unit]
+# ...
+# Optional, this will only ensure that if you start everything, synapse will
+# be started before the bot/bridge will be started.
+After=matrix-synapse.service
+
+[Service]
+# ...
+
+[Install]
+WantedBy=matrix.target
+```
+
+### Stop/restart when synapse stops/restarts
+
+If the bridge can't handle shutdowns of the homeserver you'll still want to
+install the service in the `matrix.target` but also have to specify the
+`After=matrix-synapse.service` *and* `BindsTo=matrix-synapse.service`
+dependencies to have the bot/bridge stop/restart with synapse.
+
+In this case the service file should look like this.
+
+```
+[Unit]
+# ...
+# Mandatory
+After=matrix-synapse.service
+BindsTo=matrix-synapse.service
+
+[Service]
+# ...
+
+[Install]
+WantedBy=matrix.target
+```
diff --git a/contrib/systemd-with-workers/system/matrix-synapse-worker@.service b/contrib/systemd-with-workers/system/matrix-synapse-worker@.service
new file mode 100644
index 0000000000..912984b9d2
--- /dev/null
+++ b/contrib/systemd-with-workers/system/matrix-synapse-worker@.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Synapse Matrix Worker
+After=matrix-synapse.service
+BindsTo=matrix-synapse.service
+
+[Service]
+Type=simple
+User=matrix-synapse
+WorkingDirectory=/var/lib/matrix-synapse
+EnvironmentFile=/etc/default/matrix-synapse
+ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.%i --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --config-path=/etc/matrix-synapse/workers/%i.yaml
+ExecReload=/bin/kill -HUP $MAINPID
+Restart=always
+RestartSec=3
+
+[Install]
+WantedBy=matrix-synapse.service
diff --git a/contrib/systemd-with-workers/system/matrix-synapse.service b/contrib/systemd-with-workers/system/matrix-synapse.service
new file mode 100644
index 0000000000..8bb4e400dc
--- /dev/null
+++ b/contrib/systemd-with-workers/system/matrix-synapse.service
@@ -0,0 +1,16 @@
+[Unit]
+Description=Synapse Matrix Homeserver
+
+[Service]
+Type=simple
+User=matrix-synapse
+WorkingDirectory=/var/lib/matrix-synapse
+EnvironmentFile=/etc/default/matrix-synapse
+ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys
+ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/
+ExecReload=/bin/kill -HUP $MAINPID
+Restart=always
+RestartSec=3
+
+[Install]
+WantedBy=matrix.target
diff --git a/contrib/systemd-with-workers/system/matrix.target b/contrib/systemd-with-workers/system/matrix.target
new file mode 100644
index 0000000000..aff97d03ef
--- /dev/null
+++ b/contrib/systemd-with-workers/system/matrix.target
@@ -0,0 +1,7 @@
+[Unit]
+Description=Contains matrix services like synapse, bridges and bots
+After=network.target
+AllowIsolate=no
+
+[Install]
+WantedBy=multi-user.target
diff --git a/contrib/systemd-with-workers/workers/federation_reader.yaml b/contrib/systemd-with-workers/workers/federation_reader.yaml
new file mode 100644
index 0000000000..47c54ec0d4
--- /dev/null
+++ b/contrib/systemd-with-workers/workers/federation_reader.yaml
@@ -0,0 +1,14 @@
+worker_app: synapse.app.federation_reader
+
+worker_replication_host: 127.0.0.1
+worker_replication_port: 9092
+worker_replication_http_port: 9093
+
+worker_listeners:
+ - type: http
+ port: 8011
+ resources:
+ - names: [federation]
+
+worker_daemonize: false
+worker_log_config: /etc/matrix-synapse/federation-reader-log.yaml
diff --git a/docs/federate.md b/docs/federate.md
index 186245a94b..b7fc09661c 100644
--- a/docs/federate.md
+++ b/docs/federate.md
@@ -15,8 +15,8 @@ machine's public DNS hostname, and provide Synapse with a TLS certificate
which is valid for your ``server_name``.
Once you have completed the steps necessary to federate, you should be able to
-join a room via federation. (A good place to start is ``#synapse:matrix.org``
-- a room for Synapse admins.)
+join a room via federation. (A good place to start is ``#synapse:matrix.org`` - a
+room for Synapse admins.)
## Delegation
@@ -89,7 +89,6 @@ In our example, we would need to add this SRV record in the
_matrix._tcp.example.com. 3600 IN SRV 10 5 443 synapse.example.com.
-
Once done and set up, you can check the DNS record with ``dig -t srv
_matrix._tcp.<server_name>``. In our example, we would expect this:
@@ -117,7 +116,6 @@ you invite them to. This can be caused by an incorrectly-configured reverse
proxy: see [reverse_proxy.rst](<reverse_proxy.rst>) for instructions on how to correctly
configure a reverse proxy.
-
## Running a Demo Federation of Synapses
If you want to get up and running quickly with a trio of homeservers in a
diff --git a/docs/reverse_proxy.rst b/docs/reverse_proxy.rst
index 6cd129abf4..8e26c50f1b 100644
--- a/docs/reverse_proxy.rst
+++ b/docs/reverse_proxy.rst
@@ -18,7 +18,7 @@ servers do not necessarily need to connect to your server via the same server
name or port. Indeed, clients will use port 443 by default, whereas servers
default to port 8448. Where these are different, we refer to the 'client port'
and the 'federation port'. See `Setting up federation
-<../README.rst#setting-up-federation>`_ for more details of the algorithm used for
+<federate.md>`_ for more details of the algorithm used for
federation connections.
Let's assume that we expect clients to connect to our server at
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 5f2534e465..f9886a900d 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -63,11 +63,11 @@ pid_file: DATADIR/homeserver.pid
# Zero is used to indicate synapse should set the soft limit to the
# hard limit.
#
-soft_file_limit: 0
+#soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
#
-use_presence: true
+#use_presence: false
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
#
@@ -359,7 +359,8 @@ database:
database: "DATADIR/homeserver.db"
# Number of events to cache in memory.
-event_cache_size: "10K"
+#
+#event_cache_size: 10K
## Logging ##
@@ -373,46 +374,69 @@ log_config: "CONFDIR/SERVERNAME.log.config"
# Number of messages a client can send per second
#
-rc_messages_per_second: 0.2
+#rc_messages_per_second: 0.2
# Number of message a client can send before being throttled
#
-rc_message_burst_count: 10.0
+#rc_message_burst_count: 10.0
+
+# Ratelimiting settings for registration and login.
+#
+# Each ratelimiting configuration is made of two parameters:
+# - per_second: number of requests a client can send per second.
+# - burst_count: number of requests a client can send before being throttled.
+#
+# Synapse currently uses the following configurations:
+# - one for registration that ratelimits registration requests based on the
+# client's IP address.
+# - one for login that ratelimits login requests based on the client's IP
+# address.
+# - one for login that ratelimits login requests based on the account the
+# client is attempting to log into.
+# - one for login that ratelimits login requests based on the account the
+# client is attempting to log into, based on the amount of failed login
+# attempts for this account.
+#
+# The defaults are as shown below.
+#
+#rc_registration:
+# per_second: 0.17
+# burst_count: 3
+#
+#rc_login:
+# address:
+# per_second: 0.17
+# burst_count: 3
+# account:
+# per_second: 0.17
+# burst_count: 3
+# failed_attempts:
+# per_second: 0.17
+# burst_count: 3
# The federation window size in milliseconds
#
-federation_rc_window_size: 1000
+#federation_rc_window_size: 1000
# The number of federation requests from a single server in a window
# before the server will delay processing the request.
#
-federation_rc_sleep_limit: 10
+#federation_rc_sleep_limit: 10
# The duration in milliseconds to delay processing events from
# remote servers by if they go over the sleep limit.
#
-federation_rc_sleep_delay: 500
+#federation_rc_sleep_delay: 500
# The maximum number of concurrent federation requests allowed
# from a single server
#
-federation_rc_reject_limit: 50
+#federation_rc_reject_limit: 50
# The number of federation requests to concurrently process from a
# single server
#
-federation_rc_concurrent: 3
-
-# Number of registration requests a client can send per second.
-# Defaults to 1/minute (0.17).
-#
-#rc_registration_requests_per_second: 0.17
-
-# Number of registration requests a client can send before being
-# throttled.
-# Defaults to 3.
-#
-#rc_registration_request_burst_count: 3.0
+#federation_rc_concurrent: 3
@@ -441,11 +465,11 @@ uploads_path: "DATADIR/uploads"
# The largest allowed upload size in bytes
#
-max_upload_size: "10M"
+#max_upload_size: 10M
# Maximum number of pixels that will be thumbnailed
#
-max_image_pixels: "32M"
+#max_image_pixels: 32M
# Whether to generate new thumbnails on the fly to precisely match
# the resolution requested by the client. If true then whenever
@@ -453,32 +477,32 @@ max_image_pixels: "32M"
# generate a new thumbnail. If false the server will pick a thumbnail
# from a precalculated list.
#
-dynamic_thumbnails: false
+#dynamic_thumbnails: false
# List of thumbnails to precalculate when an image is uploaded.
#
-thumbnail_sizes:
-- width: 32
- height: 32
- method: crop
-- width: 96
- height: 96
- method: crop
-- width: 320
- height: 240
- method: scale
-- width: 640
- height: 480
- method: scale
-- width: 800
- height: 600
- method: scale
+#thumbnail_sizes:
+# - width: 32
+# height: 32
+# method: crop
+# - width: 96
+# height: 96
+# method: crop
+# - width: 320
+# height: 240
+# method: scale
+# - width: 640
+# height: 480
+# method: scale
+# - width: 800
+# height: 600
+# method: scale
# Is the preview URL API enabled? If enabled, you *must* specify
# an explicit url_preview_ip_range_blacklist of IPs that the spider is
# denied from accessing.
#
-url_preview_enabled: False
+#url_preview_enabled: false
# List of IP address CIDR ranges that the URL preview spider is denied
# from accessing. There are no defaults: you must explicitly
@@ -543,8 +567,8 @@ url_preview_enabled: False
# - netloc: '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$'
# The largest allowed URL preview spidering size in bytes
-max_spider_size: "10M"
-
+#
+#max_spider_size: 10M
## Captcha ##
@@ -552,23 +576,25 @@ max_spider_size: "10M"
# This Home Server's ReCAPTCHA public key.
#
-recaptcha_public_key: "YOUR_PUBLIC_KEY"
+#recaptcha_public_key: "YOUR_PUBLIC_KEY"
# This Home Server's ReCAPTCHA private key.
#
-recaptcha_private_key: "YOUR_PRIVATE_KEY"
+#recaptcha_private_key: "YOUR_PRIVATE_KEY"
# Enables ReCaptcha checks when registering, preventing signup
# unless a captcha is answered. Requires a valid ReCaptcha
# public/private key.
#
-enable_registration_captcha: False
+#enable_registration_captcha: false
# A secret key used to bypass the captcha test entirely.
+#
#captcha_bypass_secret: "YOUR_SECRET_HERE"
# The API endpoint to use for verifying m.login.recaptcha responses.
-recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
+#
+#recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
## TURN ##
@@ -589,7 +615,7 @@ recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
# How long generated TURN credentials last
#
-turn_user_lifetime: "1h"
+#turn_user_lifetime: 1h
# Whether guests should be allowed to use the TURN server.
# This defaults to True, otherwise VoIP will be unreliable for guests.
@@ -597,15 +623,17 @@ turn_user_lifetime: "1h"
# connect to arbitrary endpoints without having first signed up for a
# valid account (e.g. by passing a CAPTCHA).
#
-turn_allow_guests: True
+#turn_allow_guests: True
## Registration ##
+#
# Registration can be rate-limited using the parameters in the "Ratelimiting"
# section of this file.
# Enable registration for new users.
-enable_registration: False
+#
+#enable_registration: false
# The user must provide all of the below types of 3PID when registering.
#
@@ -616,7 +644,7 @@ enable_registration: False
# Explicitly disable asking for MSISDNs from the registration
# flow (overrides registrations_require_3pid if MSISDNs are set as required)
#
-#disable_msisdn_registration: True
+#disable_msisdn_registration: true
# Mandate that users are only allowed to associate certain formats of
# 3PIDs with accounts on this server.
@@ -640,13 +668,13 @@ enable_registration: False
# N.B. that increasing this will exponentially increase the time required
# to register or login - e.g. 24 => 2^24 rounds which will take >20 mins.
#
-bcrypt_rounds: 12
+#bcrypt_rounds: 12
# Allows users to register as guests without a password/email/etc, and
# participate in rooms hosted on this server which have been made
# accessible to anonymous users.
#
-allow_guest_access: False
+#allow_guest_access: false
# The identity server which we suggest that clients should use when users log
# in on this server.
@@ -662,9 +690,9 @@ allow_guest_access: False
# Also defines the ID server which will be called when an account is
# deactivated (one will be picked arbitrarily).
#
-trusted_third_party_id_servers:
- - matrix.org
- - vector.im
+#trusted_third_party_id_servers:
+# - matrix.org
+# - vector.im
# Users who register on this homeserver will automatically be joined
# to these rooms
@@ -678,14 +706,14 @@ trusted_third_party_id_servers:
# Setting to false means that if the rooms are not manually created,
# users cannot be auto-joined since they do not exist.
#
-autocreate_auto_join_rooms: true
+#autocreate_auto_join_rooms: true
## Metrics ###
# Enable collection and rendering of performance metrics
#
-enable_metrics: False
+#enable_metrics: False
# Enable sentry integration
# NOTE: While attempts are made to ensure that the logs don't contain
@@ -705,22 +733,24 @@ enable_metrics: False
# A list of event types that will be included in the room_invite_state
#
-room_invite_state_types:
- - "m.room.join_rules"
- - "m.room.canonical_alias"
- - "m.room.avatar"
- - "m.room.encryption"
- - "m.room.name"
+#room_invite_state_types:
+# - "m.room.join_rules"
+# - "m.room.canonical_alias"
+# - "m.room.avatar"
+# - "m.room.encryption"
+# - "m.room.name"
-# A list of application service config file to use
+# A list of application service config files to use
#
-app_service_config_files: []
+#app_service_config_files:
+# - app_service_1.yaml
+# - app_service_2.yaml
-# Whether or not to track application service IP addresses. Implicitly
+# Uncomment to enable tracking of application service IP addresses. Implicitly
# enables MAU tracking for application service users.
#
-track_appservice_user_ips: False
+#track_appservice_user_ips: True
# a secret which is used to sign access tokens. If none is specified,
@@ -731,7 +761,7 @@ track_appservice_user_ips: False
# Used to enable access token expiration.
#
-expire_access_token: False
+#expire_access_token: False
# a secret which is used to calculate HMACs for form values, to stop
# falsification of values. Must be specified for the User Consent
@@ -760,17 +790,16 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key"
# Determines how quickly servers will query to check which keys
# are still valid.
#
-key_refresh_interval: "1d" # 1 Day.
+#key_refresh_interval: 1d
# The trusted servers to download signing keys from.
#
-perspectives:
- servers:
- "matrix.org":
- verify_keys:
- "ed25519:auto":
- key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
-
+#perspectives:
+# servers:
+# "matrix.org":
+# verify_keys:
+# "ed25519:auto":
+# key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
# Enable SAML2 for registration and login. Uses pysaml2.
@@ -835,14 +864,15 @@ perspectives:
# algorithm: "HS256"
-
-# Enable password for login.
-#
password_config:
- enabled: true
+ # Uncomment to disable password login
+ #
+ #enabled: false
+
# Uncomment and change to a secret random string for extra security.
# DO NOT CHANGE THIS AFTER INITIAL SETUP!
- #pepper: ""
+ #
+ #pepper: "EVEN_MORE_SECRET"
@@ -911,9 +941,9 @@ password_config:
# example_option: 'things'
-# Whether to allow non server admins to create groups on this server
+# Uncomment to allow non-server-admin users to create groups on this server
#
-enable_group_creation: false
+#enable_group_creation: true
# If enabled, non server admins can only create groups with local parts
# starting with this prefix
diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py
index ad68079eeb..296c4a1c17 100644
--- a/synapse/api/ratelimiting.py
+++ b/synapse/api/ratelimiting.py
@@ -14,6 +14,8 @@
import collections
+from synapse.api.errors import LimitExceededError
+
class Ratelimiter(object):
"""
@@ -82,3 +84,13 @@ class Ratelimiter(object):
break
else:
del self.message_counts[key]
+
+ def ratelimit(self, key, time_now_s, rate_hz, burst_count, update=True):
+ allowed, time_allowed = self.can_do_action(
+ key, time_now_s, rate_hz, burst_count, update
+ )
+
+ if not allowed:
+ raise LimitExceededError(
+ retry_after_ms=int(1000 * (time_allowed - time_now_s)),
+ )
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 32e8b8a3f5..d4c6c4c8e2 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -63,12 +63,13 @@ def start_worker_reactor(appname, config):
start_reactor(
appname,
- config.soft_file_limit,
- config.gc_thresholds,
- config.worker_pid_file,
- config.worker_daemonize,
- config.worker_cpu_affinity,
- logger,
+ soft_file_limit=config.soft_file_limit,
+ gc_thresholds=config.gc_thresholds,
+ pid_file=config.worker_pid_file,
+ daemonize=config.worker_daemonize,
+ cpu_affinity=config.worker_cpu_affinity,
+ print_pidfile=config.print_pidfile,
+ logger=logger,
)
@@ -79,6 +80,7 @@ def start_reactor(
pid_file,
daemonize,
cpu_affinity,
+ print_pidfile,
logger,
):
""" Run the reactor in the main process
@@ -93,6 +95,7 @@ def start_reactor(
pid_file (str): name of pid file to write to if daemonize is True
daemonize (bool): true to run the reactor in a background process
cpu_affinity (int|None): cpu affinity mask
+ print_pidfile (bool): whether to print the pid file, if daemonize is True
logger (logging.Logger): logger instance to pass to Daemonize
"""
@@ -124,6 +127,9 @@ def start_reactor(
reactor.run()
if daemonize:
+ if print_pidfile:
+ print(pid_file)
+
daemon = Daemonize(
app=appname,
pid=pid_file,
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index a461442fdc..9711a7147c 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
@@ -37,8 +38,10 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
+from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
@@ -202,6 +205,7 @@ class FederationSenderHandler(object):
"""
def __init__(self, hs, replication_client):
self.store = hs.get_datastore()
+ self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client
@@ -234,6 +238,32 @@ class FederationSenderHandler(object):
elif stream_name == "events":
self.federation_sender.notify_new_events(token)
+ # ... and when new receipts happen
+ elif stream_name == ReceiptsStream.NAME:
+ run_as_background_process(
+ "process_receipts_for_federation", self._on_new_receipts, rows,
+ )
+
+ @defer.inlineCallbacks
+ def _on_new_receipts(self, rows):
+ """
+ Args:
+ rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]):
+ new receipts to be processed
+ """
+ for receipt in rows:
+ # we only want to send on receipts for our own users
+ if not self._is_mine_id(receipt.user_id):
+ continue
+ receipt_info = ReadReceipt(
+ receipt.room_id,
+ receipt.receipt_type,
+ receipt.user_id,
+ [receipt.event_id],
+ receipt.data,
+ )
+ yield self.federation_sender.send_read_receipt(receipt_info)
+
@defer.inlineCallbacks
def update_token(self, token):
try:
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e0431608e8..869c028d1f 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -637,17 +637,15 @@ def run(hs):
# be quite busy the first few minutes
clock.call_later(5 * 60, start_phone_stats_home)
- if hs.config.daemonize and hs.config.print_pidfile:
- print(hs.config.pid_file)
-
_base.start_reactor(
"synapse-homeserver",
- hs.config.soft_file_limit,
- hs.config.gc_thresholds,
- hs.config.pid_file,
- hs.config.daemonize,
- hs.config.cpu_affinity,
- logger,
+ soft_file_limit=hs.config.soft_file_limit,
+ gc_thresholds=hs.config.gc_thresholds,
+ pid_file=hs.config.pid_file,
+ daemonize=hs.config.daemonize,
+ cpu_affinity=hs.config.cpu_affinity,
+ print_pidfile=hs.config.print_pidfile,
+ logger=logger,
)
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index c4d3087fa4..5613f38e4d 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -214,14 +214,20 @@ class Config(object):
" Defaults to the directory containing the last config file",
)
+ obj = cls()
+
+ obj.invoke_all("add_arguments", config_parser)
+
config_args = config_parser.parse_args(argv)
config_files = find_config_files(search_paths=config_args.config_path)
- obj = cls()
obj.read_config_files(
config_files, keys_directory=config_args.keys_directory, generate_keys=False
)
+
+ obj.invoke_all("read_arguments", config_args)
+
return obj
@classmethod
diff --git a/synapse/config/api.py b/synapse/config/api.py
index e8a753f002..5eb4f86fa2 100644
--- a/synapse/config/api.py
+++ b/synapse/config/api.py
@@ -34,10 +34,10 @@ class ApiConfig(Config):
# A list of event types that will be included in the room_invite_state
#
- room_invite_state_types:
- - "{JoinRules}"
- - "{CanonicalAlias}"
- - "{RoomAvatar}"
- - "{RoomEncryption}"
- - "{Name}"
+ #room_invite_state_types:
+ # - "{JoinRules}"
+ # - "{CanonicalAlias}"
+ # - "{RoomAvatar}"
+ # - "{RoomEncryption}"
+ # - "{Name}"
""".format(**vars(EventTypes))
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index c260d59464..9e64c76544 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -37,14 +37,16 @@ class AppServiceConfig(Config):
def default_config(cls, **kwargs):
return """\
- # A list of application service config file to use
+ # A list of application service config files to use
#
- app_service_config_files: []
+ #app_service_config_files:
+ # - app_service_1.yaml
+ # - app_service_2.yaml
- # Whether or not to track application service IP addresses. Implicitly
+ # Uncomment to enable tracking of application service IP addresses. Implicitly
# enables MAU tracking for application service users.
#
- track_appservice_user_ips: False
+ #track_appservice_user_ips: True
"""
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
index d25196be08..f7eebf26d2 100644
--- a/synapse/config/captcha.py
+++ b/synapse/config/captcha.py
@@ -18,11 +18,16 @@ from ._base import Config
class CaptchaConfig(Config):
def read_config(self, config):
- self.recaptcha_private_key = config["recaptcha_private_key"]
- self.recaptcha_public_key = config["recaptcha_public_key"]
- self.enable_registration_captcha = config["enable_registration_captcha"]
+ self.recaptcha_private_key = config.get("recaptcha_private_key")
+ self.recaptcha_public_key = config.get("recaptcha_public_key")
+ self.enable_registration_captcha = config.get(
+ "enable_registration_captcha", False
+ )
self.captcha_bypass_secret = config.get("captcha_bypass_secret")
- self.recaptcha_siteverify_api = config["recaptcha_siteverify_api"]
+ self.recaptcha_siteverify_api = config.get(
+ "recaptcha_siteverify_api",
+ "https://www.recaptcha.net/recaptcha/api/siteverify",
+ )
def default_config(self, **kwargs):
return """\
@@ -31,21 +36,23 @@ class CaptchaConfig(Config):
# This Home Server's ReCAPTCHA public key.
#
- recaptcha_public_key: "YOUR_PUBLIC_KEY"
+ #recaptcha_public_key: "YOUR_PUBLIC_KEY"
# This Home Server's ReCAPTCHA private key.
#
- recaptcha_private_key: "YOUR_PRIVATE_KEY"
+ #recaptcha_private_key: "YOUR_PRIVATE_KEY"
# Enables ReCaptcha checks when registering, preventing signup
# unless a captcha is answered. Requires a valid ReCaptcha
# public/private key.
#
- enable_registration_captcha: False
+ #enable_registration_captcha: false
# A secret key used to bypass the captcha test entirely.
+ #
#captcha_bypass_secret: "YOUR_SECRET_HERE"
# The API endpoint to use for verifying m.login.recaptcha responses.
- recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
+ #
+ #recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
"""
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 63e9cb63f8..3c27ed6b4a 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -60,7 +60,8 @@ class DatabaseConfig(Config):
database: "%(database_path)s"
# Number of events to cache in memory.
- event_cache_size: "10K"
+ #
+ #event_cache_size: 10K
""" % locals()
def read_arguments(self, args):
diff --git a/synapse/config/groups.py b/synapse/config/groups.py
index 46933a904c..e4be172a79 100644
--- a/synapse/config/groups.py
+++ b/synapse/config/groups.py
@@ -23,9 +23,9 @@ class GroupsConfig(Config):
def default_config(self, **kwargs):
return """\
- # Whether to allow non server admins to create groups on this server
+ # Uncomment to allow non-server-admin users to create groups on this server
#
- enable_group_creation: false
+ #enable_group_creation: true
# If enabled, non server admins can only create groups with local parts
# starting with this prefix
diff --git a/synapse/config/key.py b/synapse/config/key.py
index 35f05fa974..2bd5531acb 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -43,10 +43,16 @@ class KeyConfig(Config):
config.get("old_signing_keys", {})
)
self.key_refresh_interval = self.parse_duration(
- config["key_refresh_interval"]
+ config.get("key_refresh_interval", "1d"),
)
self.perspectives = self.read_perspectives(
- config["perspectives"]
+ config.get("perspectives", {}).get("servers", {
+ "matrix.org": {"verify_keys": {
+ "ed25519:auto": {
+ "key": "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw",
+ }
+ }}
+ })
)
self.macaroon_secret_key = config.get(
@@ -88,7 +94,7 @@ class KeyConfig(Config):
# Used to enable access token expiration.
#
- expire_access_token: False
+ #expire_access_token: False
# a secret which is used to calculate HMACs for form values, to stop
# falsification of values. Must be specified for the User Consent
@@ -117,21 +123,21 @@ class KeyConfig(Config):
# Determines how quickly servers will query to check which keys
# are still valid.
#
- key_refresh_interval: "1d" # 1 Day.
+ #key_refresh_interval: 1d
# The trusted servers to download signing keys from.
#
- perspectives:
- servers:
- "matrix.org":
- verify_keys:
- "ed25519:auto":
- key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
+ #perspectives:
+ # servers:
+ # "matrix.org":
+ # verify_keys:
+ # "ed25519:auto":
+ # key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
""" % locals()
- def read_perspectives(self, perspectives_config):
+ def read_perspectives(self, perspectives_servers):
servers = {}
- for server_name, server_config in perspectives_config["servers"].items():
+ for server_name, server_config in perspectives_servers.items():
for key_id, key_data in server_config["verify_keys"].items():
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]
diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py
index ed0498c634..2de51979d8 100644
--- a/synapse/config/metrics.py
+++ b/synapse/config/metrics.py
@@ -24,7 +24,7 @@ MISSING_SENTRY = (
class MetricsConfig(Config):
def read_config(self, config):
- self.enable_metrics = config["enable_metrics"]
+ self.enable_metrics = config.get("enable_metrics", False)
self.report_stats = config.get("report_stats", None)
self.metrics_port = config.get("metrics_port")
self.metrics_bind_host = config.get("metrics_bind_host", "127.0.0.1")
@@ -48,7 +48,7 @@ class MetricsConfig(Config):
# Enable collection and rendering of performance metrics
#
- enable_metrics: False
+ #enable_metrics: False
# Enable sentry integration
# NOTE: While attempts are made to ensure that the logs don't contain
diff --git a/synapse/config/password.py b/synapse/config/password.py
index 2a52b9db54..eea59e772b 100644
--- a/synapse/config/password.py
+++ b/synapse/config/password.py
@@ -22,16 +22,21 @@ class PasswordConfig(Config):
def read_config(self, config):
password_config = config.get("password_config", {})
+ if password_config is None:
+ password_config = {}
+
self.password_enabled = password_config.get("enabled", True)
self.password_pepper = password_config.get("pepper", "")
def default_config(self, config_dir_path, server_name, **kwargs):
- return """
- # Enable password for login.
- #
+ return """\
password_config:
- enabled: true
+ # Uncomment to disable password login
+ #
+ #enabled: false
+
# Uncomment and change to a secret random string for extra security.
# DO NOT CHANGE THIS AFTER INITIAL SETUP!
- #pepper: ""
+ #
+ #pepper: "EVEN_MORE_SECRET"
"""
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 093042fdb9..898a19dd8c 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -15,69 +15,100 @@
from ._base import Config
+class RateLimitConfig(object):
+ def __init__(self, config):
+ self.per_second = config.get("per_second", 0.17)
+ self.burst_count = config.get("burst_count", 3.0)
+
+
class RatelimitConfig(Config):
def read_config(self, config):
- self.rc_messages_per_second = config["rc_messages_per_second"]
- self.rc_message_burst_count = config["rc_message_burst_count"]
+ self.rc_messages_per_second = config.get("rc_messages_per_second", 0.2)
+ self.rc_message_burst_count = config.get("rc_message_burst_count", 10.0)
- self.federation_rc_window_size = config["federation_rc_window_size"]
- self.federation_rc_sleep_limit = config["federation_rc_sleep_limit"]
- self.federation_rc_sleep_delay = config["federation_rc_sleep_delay"]
- self.federation_rc_reject_limit = config["federation_rc_reject_limit"]
- self.federation_rc_concurrent = config["federation_rc_concurrent"]
+ self.rc_registration = RateLimitConfig(config.get("rc_registration", {}))
- self.rc_registration_requests_per_second = config.get(
- "rc_registration_requests_per_second", 0.17,
- )
- self.rc_registration_request_burst_count = config.get(
- "rc_registration_request_burst_count", 3,
+ rc_login_config = config.get("rc_login", {})
+ self.rc_login_address = RateLimitConfig(rc_login_config.get("address", {}))
+ self.rc_login_account = RateLimitConfig(rc_login_config.get("account", {}))
+ self.rc_login_failed_attempts = RateLimitConfig(
+ rc_login_config.get("failed_attempts", {}),
)
+ self.federation_rc_window_size = config.get("federation_rc_window_size", 1000)
+ self.federation_rc_sleep_limit = config.get("federation_rc_sleep_limit", 10)
+ self.federation_rc_sleep_delay = config.get("federation_rc_sleep_delay", 500)
+ self.federation_rc_reject_limit = config.get("federation_rc_reject_limit", 50)
+ self.federation_rc_concurrent = config.get("federation_rc_concurrent", 3)
+
def default_config(self, **kwargs):
return """\
## Ratelimiting ##
# Number of messages a client can send per second
#
- rc_messages_per_second: 0.2
+ #rc_messages_per_second: 0.2
# Number of message a client can send before being throttled
#
- rc_message_burst_count: 10.0
+ #rc_message_burst_count: 10.0
+
+ # Ratelimiting settings for registration and login.
+ #
+ # Each ratelimiting configuration is made of two parameters:
+ # - per_second: number of requests a client can send per second.
+ # - burst_count: number of requests a client can send before being throttled.
+ #
+ # Synapse currently uses the following configurations:
+ # - one for registration that ratelimits registration requests based on the
+ # client's IP address.
+ # - one for login that ratelimits login requests based on the client's IP
+ # address.
+ # - one for login that ratelimits login requests based on the account the
+ # client is attempting to log into.
+ # - one for login that ratelimits login requests based on the account the
+ # client is attempting to log into, based on the amount of failed login
+ # attempts for this account.
+ #
+ # The defaults are as shown below.
+ #
+ #rc_registration:
+ # per_second: 0.17
+ # burst_count: 3
+ #
+ #rc_login:
+ # address:
+ # per_second: 0.17
+ # burst_count: 3
+ # account:
+ # per_second: 0.17
+ # burst_count: 3
+ # failed_attempts:
+ # per_second: 0.17
+ # burst_count: 3
# The federation window size in milliseconds
#
- federation_rc_window_size: 1000
+ #federation_rc_window_size: 1000
# The number of federation requests from a single server in a window
# before the server will delay processing the request.
#
- federation_rc_sleep_limit: 10
+ #federation_rc_sleep_limit: 10
# The duration in milliseconds to delay processing events from
# remote servers by if they go over the sleep limit.
#
- federation_rc_sleep_delay: 500
+ #federation_rc_sleep_delay: 500
# The maximum number of concurrent federation requests allowed
# from a single server
#
- federation_rc_reject_limit: 50
+ #federation_rc_reject_limit: 50
# The number of federation requests to concurrently process from a
# single server
#
- federation_rc_concurrent: 3
-
- # Number of registration requests a client can send per second.
- # Defaults to 1/minute (0.17).
- #
- #rc_registration_requests_per_second: 0.17
-
- # Number of registration requests a client can send before being
- # throttled.
- # Defaults to 3.
- #
- #rc_registration_request_burst_count: 3.0
+ #federation_rc_concurrent: 3
"""
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index a123f25a68..f6b2b9ceee 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -24,7 +24,7 @@ class RegistrationConfig(Config):
def read_config(self, config):
self.enable_registration = bool(
- strtobool(str(config["enable_registration"]))
+ strtobool(str(config.get("enable_registration", False)))
)
if "disable_registration" in config:
self.enable_registration = not bool(
@@ -36,7 +36,10 @@ class RegistrationConfig(Config):
self.registration_shared_secret = config.get("registration_shared_secret")
self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
- self.trusted_third_party_id_servers = config["trusted_third_party_id_servers"]
+ self.trusted_third_party_id_servers = config.get(
+ "trusted_third_party_id_servers",
+ ["matrix.org", "vector.im"],
+ )
self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False)
@@ -64,11 +67,13 @@ class RegistrationConfig(Config):
return """\
## Registration ##
+ #
# Registration can be rate-limited using the parameters in the "Ratelimiting"
# section of this file.
# Enable registration for new users.
- enable_registration: False
+ #
+ #enable_registration: false
# The user must provide all of the below types of 3PID when registering.
#
@@ -79,7 +84,7 @@ class RegistrationConfig(Config):
# Explicitly disable asking for MSISDNs from the registration
# flow (overrides registrations_require_3pid if MSISDNs are set as required)
#
- #disable_msisdn_registration: True
+ #disable_msisdn_registration: true
# Mandate that users are only allowed to associate certain formats of
# 3PIDs with accounts on this server.
@@ -103,13 +108,13 @@ class RegistrationConfig(Config):
# N.B. that increasing this will exponentially increase the time required
# to register or login - e.g. 24 => 2^24 rounds which will take >20 mins.
#
- bcrypt_rounds: 12
+ #bcrypt_rounds: 12
# Allows users to register as guests without a password/email/etc, and
# participate in rooms hosted on this server which have been made
# accessible to anonymous users.
#
- allow_guest_access: False
+ #allow_guest_access: false
# The identity server which we suggest that clients should use when users log
# in on this server.
@@ -125,9 +130,9 @@ class RegistrationConfig(Config):
# Also defines the ID server which will be called when an account is
# deactivated (one will be picked arbitrarily).
#
- trusted_third_party_id_servers:
- - matrix.org
- - vector.im
+ #trusted_third_party_id_servers:
+ # - matrix.org
+ # - vector.im
# Users who register on this homeserver will automatically be joined
# to these rooms
@@ -141,7 +146,7 @@ class RegistrationConfig(Config):
# Setting to false means that if the rooms are not manually created,
# users cannot be auto-joined since they do not exist.
#
- autocreate_auto_join_rooms: true
+ #autocreate_auto_join_rooms: true
""" % locals()
def add_arguments(self, parser):
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 97db2a5b7a..3f34ad9b2a 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -19,6 +19,36 @@ from synapse.util.module_loader import load_module
from ._base import Config, ConfigError
+DEFAULT_THUMBNAIL_SIZES = [
+ {
+ "width": 32,
+ "height": 32,
+ "method": "crop",
+ }, {
+ "width": 96,
+ "height": 96,
+ "method": "crop",
+ }, {
+ "width": 320,
+ "height": 240,
+ "method": "scale",
+ }, {
+ "width": 640,
+ "height": 480,
+ "method": "scale",
+ }, {
+ "width": 800,
+ "height": 600,
+ "method": "scale"
+ },
+]
+
+THUMBNAIL_SIZE_YAML = """\
+ # - width: %(width)i
+ # height: %(height)i
+ # method: %(method)s
+"""
+
MISSING_NETADDR = (
"Missing netaddr library. This is required for URL preview API."
)
@@ -77,9 +107,9 @@ def parse_thumbnail_requirements(thumbnail_sizes):
class ContentRepositoryConfig(Config):
def read_config(self, config):
- self.max_upload_size = self.parse_size(config["max_upload_size"])
- self.max_image_pixels = self.parse_size(config["max_image_pixels"])
- self.max_spider_size = self.parse_size(config["max_spider_size"])
+ self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
+ self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
+ self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
self.media_store_path = self.ensure_directory(config["media_store_path"])
@@ -139,9 +169,9 @@ class ContentRepositoryConfig(Config):
)
self.uploads_path = self.ensure_directory(config["uploads_path"])
- self.dynamic_thumbnails = config["dynamic_thumbnails"]
+ self.dynamic_thumbnails = config.get("dynamic_thumbnails", False)
self.thumbnail_requirements = parse_thumbnail_requirements(
- config["thumbnail_sizes"]
+ config.get("thumbnail_sizes", DEFAULT_THUMBNAIL_SIZES),
)
self.url_preview_enabled = config.get("url_preview_enabled", False)
if self.url_preview_enabled:
@@ -178,6 +208,13 @@ class ContentRepositoryConfig(Config):
def default_config(self, data_dir_path, **kwargs):
media_store = os.path.join(data_dir_path, "media_store")
uploads_path = os.path.join(data_dir_path, "uploads")
+
+ formatted_thumbnail_sizes = "".join(
+ THUMBNAIL_SIZE_YAML % s for s in DEFAULT_THUMBNAIL_SIZES
+ )
+ # strip final NL
+ formatted_thumbnail_sizes = formatted_thumbnail_sizes[:-1]
+
return r"""
# Directory where uploaded images and attachments are stored.
#
@@ -204,11 +241,11 @@ class ContentRepositoryConfig(Config):
# The largest allowed upload size in bytes
#
- max_upload_size: "10M"
+ #max_upload_size: 10M
# Maximum number of pixels that will be thumbnailed
#
- max_image_pixels: "32M"
+ #max_image_pixels: 32M
# Whether to generate new thumbnails on the fly to precisely match
# the resolution requested by the client. If true then whenever
@@ -216,32 +253,18 @@ class ContentRepositoryConfig(Config):
# generate a new thumbnail. If false the server will pick a thumbnail
# from a precalculated list.
#
- dynamic_thumbnails: false
+ #dynamic_thumbnails: false
# List of thumbnails to precalculate when an image is uploaded.
#
- thumbnail_sizes:
- - width: 32
- height: 32
- method: crop
- - width: 96
- height: 96
- method: crop
- - width: 320
- height: 240
- method: scale
- - width: 640
- height: 480
- method: scale
- - width: 800
- height: 600
- method: scale
+ #thumbnail_sizes:
+%(formatted_thumbnail_sizes)s
# Is the preview URL API enabled? If enabled, you *must* specify
# an explicit url_preview_ip_range_blacklist of IPs that the spider is
# denied from accessing.
#
- url_preview_enabled: False
+ #url_preview_enabled: false
# List of IP address CIDR ranges that the URL preview spider is denied
# from accessing. There are no defaults: you must explicitly
@@ -306,6 +329,6 @@ class ContentRepositoryConfig(Config):
# - netloc: '^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$'
# The largest allowed URL preview spidering size in bytes
- max_spider_size: "10M"
-
+ #
+ #max_spider_size: 10M
""" % locals()
diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py
index aff0a1f00c..39b9eb29c2 100644
--- a/synapse/config/saml2_config.py
+++ b/synapse/config/saml2_config.py
@@ -64,7 +64,7 @@ class SAML2Config(Config):
}
def default_config(self, config_dir_path, server_name, **kwargs):
- return """
+ return """\
# Enable SAML2 for registration and login. Uses pysaml2.
#
# `sp_config` is the configuration for the pysaml2 Service Provider.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 35a322fee0..499eb30bea 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -45,7 +45,7 @@ class ServerConfig(Config):
self.pid_file = self.abspath(config.get("pid_file"))
self.web_client_location = config.get("web_client_location", None)
- self.soft_file_limit = config["soft_file_limit"]
+ self.soft_file_limit = config.get("soft_file_limit", 0)
self.daemonize = config.get("daemonize")
self.print_pidfile = config.get("print_pidfile")
self.user_agent_suffix = config.get("user_agent_suffix")
@@ -307,11 +307,11 @@ class ServerConfig(Config):
# Zero is used to indicate synapse should set the soft limit to the
# hard limit.
#
- soft_file_limit: 0
+ #soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
#
- use_presence: true
+ #use_presence: false
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
#
diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index 257f7c86e7..2a1f005a37 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -22,7 +22,9 @@ class VoipConfig(Config):
self.turn_shared_secret = config.get("turn_shared_secret")
self.turn_username = config.get("turn_username")
self.turn_password = config.get("turn_password")
- self.turn_user_lifetime = self.parse_duration(config["turn_user_lifetime"])
+ self.turn_user_lifetime = self.parse_duration(
+ config.get("turn_user_lifetime", "1h"),
+ )
self.turn_allow_guests = config.get("turn_allow_guests", True)
def default_config(self, **kwargs):
@@ -45,7 +47,7 @@ class VoipConfig(Config):
# How long generated TURN credentials last
#
- turn_user_lifetime: "1h"
+ #turn_user_lifetime: 1h
# Whether guests should be allowed to use the TURN server.
# This defaults to True, otherwise VoIP will be unreliable for guests.
@@ -53,5 +55,5 @@ class VoipConfig(Config):
# connect to arbitrary endpoints without having first signed up for a
# valid account (e.g. by passing a CAPTCHA).
#
- turn_allow_guests: True
+ #turn_allow_guests: True
"""
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 80baf0ce0e..bfbd8b6c91 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -28,7 +28,7 @@ class WorkerConfig(Config):
if self.worker_app == "synapse.app.homeserver":
self.worker_app = None
- self.worker_listeners = config.get("worker_listeners")
+ self.worker_listeners = config.get("worker_listeners", [])
self.worker_daemonize = config.get("worker_daemonize")
self.worker_pid_file = config.get("worker_pid_file")
self.worker_log_file = config.get("worker_log_file")
@@ -48,6 +48,17 @@ class WorkerConfig(Config):
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
self.worker_cpu_affinity = config.get("worker_cpu_affinity")
+ # This option is really only here to support `--manhole` command line
+ # argument.
+ manhole = config.get("worker_manhole")
+ if manhole:
+ self.worker_listeners.append({
+ "port": manhole,
+ "bind_addresses": ["127.0.0.1"],
+ "type": "manhole",
+ "tls": False,
+ })
+
if self.worker_listeners:
for listener in self.worker_listeners:
bind_address = listener.pop("bind_address", None)
@@ -57,3 +68,18 @@ class WorkerConfig(Config):
bind_addresses.append(bind_address)
elif not bind_addresses:
bind_addresses.append('')
+
+ def read_arguments(self, args):
+ # We support a bunch of command line arguments that override options in
+ # the config. A lot of these options have a worker_* prefix when running
+ # on workers so we also have to override them when command line options
+ # are specified.
+
+ if args.daemonize is not None:
+ self.worker_daemonize = args.daemonize
+ if args.log_config is not None:
+ self.worker_log_config = args.log_config
+ if args.log_file is not None:
+ self.worker_log_file = args.log_file
+ if args.manhole is not None:
+ self.worker_manhole = args.worker_manhole
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index b7d0b25781..04d04a4457 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -46,7 +46,7 @@ logger = logging.getLogger(__name__)
class FederationRemoteSendQueue(object):
- """A drop in replacement for TransactionQueue"""
+ """A drop in replacement for FederationSender"""
def __init__(self, hs):
self.server_name = hs.hostname
@@ -154,13 +154,13 @@ class FederationRemoteSendQueue(object):
del self.device_messages[key]
def notify_new_events(self, current_id):
- """As per TransactionQueue"""
+ """As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
pass
def build_and_send_edu(self, destination, edu_type, content, key=None):
- """As per TransactionQueue"""
+ """As per FederationSender"""
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
@@ -183,8 +183,17 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
+ def send_read_receipt(self, receipt):
+ """As per FederationSender
+
+ Args:
+ receipt (synapse.types.ReadReceipt):
+ """
+ # nothing to do here: the replication listener will handle it.
+ pass
+
def send_presence(self, states):
- """As per TransactionQueue
+ """As per FederationSender
Args:
states (list(UserPresenceState))
@@ -201,7 +210,7 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
def send_device_messages(self, destination):
- """As per TransactionQueue"""
+ """As per FederationSender"""
pos = self._next_pos()
self.device_messages[pos] = destination
self.notifier.on_new_replication_data()
@@ -439,7 +448,7 @@ def process_rows_for_federation(transaction_queue, rows):
transaction queue ready for sending to the relevant homeservers.
Args:
- transaction_queue (TransactionQueue)
+ transaction_queue (FederationSender)
rows (list(synapse.replication.tcp.streams.FederationStreamRow))
"""
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
new file mode 100644
index 0000000000..1bcc353d18
--- /dev/null
+++ b/synapse/federation/sender/__init__.py
@@ -0,0 +1,388 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from six import itervalues
+
+from prometheus_client import Counter
+
+from twisted.internet import defer
+
+import synapse.metrics
+from synapse.federation.sender.per_destination_queue import PerDestinationQueue
+from synapse.federation.sender.transaction_manager import TransactionManager
+from synapse.federation.units import Edu
+from synapse.handlers.presence import get_interested_remotes
+from synapse.metrics import (
+ LaterGauge,
+ event_processing_loop_counter,
+ event_processing_loop_room_count,
+ events_processed_counter,
+)
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
+from synapse.util.metrics import measure_func
+
+logger = logging.getLogger(__name__)
+
+sent_pdus_destination_dist_count = Counter(
+ "synapse_federation_client_sent_pdu_destinations:count",
+ "Number of PDUs queued for sending to one or more destinations",
+)
+
+sent_pdus_destination_dist_total = Counter(
+ "synapse_federation_client_sent_pdu_destinations:total", ""
+ "Total number of PDUs queued for sending across all destinations",
+)
+
+
+class FederationSender(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.server_name = hs.hostname
+
+ self.store = hs.get_datastore()
+ self.state = hs.get_state_handler()
+
+ self.clock = hs.get_clock()
+ self.is_mine_id = hs.is_mine_id
+
+ self._transaction_manager = TransactionManager(hs)
+
+ # map from destination to PerDestinationQueue
+ self._per_destination_queues = {} # type: dict[str, PerDestinationQueue]
+
+ LaterGauge(
+ "synapse_federation_transaction_queue_pending_destinations",
+ "",
+ [],
+ lambda: sum(
+ 1 for d in self._per_destination_queues.values()
+ if d.transmission_loop_running
+ ),
+ )
+
+ # Map of user_id -> UserPresenceState for all the pending presence
+ # to be sent out by user_id. Entries here get processed and put in
+ # pending_presence_by_dest
+ self.pending_presence = {}
+
+ LaterGauge(
+ "synapse_federation_transaction_queue_pending_pdus",
+ "",
+ [],
+ lambda: sum(
+ d.pending_pdu_count() for d in self._per_destination_queues.values()
+ ),
+ )
+ LaterGauge(
+ "synapse_federation_transaction_queue_pending_edus",
+ "",
+ [],
+ lambda: sum(
+ d.pending_edu_count() for d in self._per_destination_queues.values()
+ ),
+ )
+
+ self._order = 1
+
+ self._is_processing = False
+ self._last_poked_id = -1
+
+ self._processing_pending_presence = False
+
+ def _get_per_destination_queue(self, destination):
+ queue = self._per_destination_queues.get(destination)
+ if not queue:
+ queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
+ self._per_destination_queues[destination] = queue
+ return queue
+
+ def notify_new_events(self, current_id):
+ """This gets called when we have some new events we might want to
+ send out to other servers.
+ """
+ self._last_poked_id = max(current_id, self._last_poked_id)
+
+ if self._is_processing:
+ return
+
+ # fire off a processing loop in the background
+ run_as_background_process(
+ "process_event_queue_for_federation",
+ self._process_event_queue_loop,
+ )
+
+ @defer.inlineCallbacks
+ def _process_event_queue_loop(self):
+ try:
+ self._is_processing = True
+ while True:
+ last_token = yield self.store.get_federation_out_pos("events")
+ next_token, events = yield self.store.get_all_new_events_stream(
+ last_token, self._last_poked_id, limit=100,
+ )
+
+ logger.debug("Handling %s -> %s", last_token, next_token)
+
+ if not events and next_token >= self._last_poked_id:
+ break
+
+ @defer.inlineCallbacks
+ def handle_event(event):
+ # Only send events for this server.
+ send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
+ is_mine = self.is_mine_id(event.sender)
+ if not is_mine and send_on_behalf_of is None:
+ return
+
+ try:
+ # Get the state from before the event.
+ # We need to make sure that this is the state from before
+ # the event and not from after it.
+ # Otherwise if the last member on a server in a room is
+ # banned then it won't receive the event because it won't
+ # be in the room after the ban.
+ destinations = yield self.state.get_current_hosts_in_room(
+ event.room_id, latest_event_ids=event.prev_event_ids(),
+ )
+ except Exception:
+ logger.exception(
+ "Failed to calculate hosts in room for event: %s",
+ event.event_id,
+ )
+ return
+
+ destinations = set(destinations)
+
+ if send_on_behalf_of is not None:
+ # If we are sending the event on behalf of another server
+ # then it already has the event and there is no reason to
+ # send the event to it.
+ destinations.discard(send_on_behalf_of)
+
+ logger.debug("Sending %s to %r", event, destinations)
+
+ self._send_pdu(event, destinations)
+
+ @defer.inlineCallbacks
+ def handle_room_events(events):
+ for event in events:
+ yield handle_event(event)
+
+ events_by_room = {}
+ for event in events:
+ events_by_room.setdefault(event.room_id, []).append(event)
+
+ yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [
+ logcontext.run_in_background(handle_room_events, evs)
+ for evs in itervalues(events_by_room)
+ ],
+ consumeErrors=True
+ ))
+
+ yield self.store.update_federation_out_pos(
+ "events", next_token
+ )
+
+ if events:
+ now = self.clock.time_msec()
+ ts = yield self.store.get_received_ts(events[-1].event_id)
+
+ synapse.metrics.event_processing_lag.labels(
+ "federation_sender").set(now - ts)
+ synapse.metrics.event_processing_last_ts.labels(
+ "federation_sender").set(ts)
+
+ events_processed_counter.inc(len(events))
+
+ event_processing_loop_room_count.labels(
+ "federation_sender"
+ ).inc(len(events_by_room))
+
+ event_processing_loop_counter.labels("federation_sender").inc()
+
+ synapse.metrics.event_processing_positions.labels(
+ "federation_sender").set(next_token)
+
+ finally:
+ self._is_processing = False
+
+ def _send_pdu(self, pdu, destinations):
+ # We loop through all destinations to see whether we already have
+ # a transaction in progress. If we do, stick it in the pending_pdus
+ # table and we'll get back to it later.
+
+ order = self._order
+ self._order += 1
+
+ destinations = set(destinations)
+ destinations.discard(self.server_name)
+ logger.debug("Sending to: %s", str(destinations))
+
+ if not destinations:
+ return
+
+ sent_pdus_destination_dist_total.inc(len(destinations))
+ sent_pdus_destination_dist_count.inc()
+
+ for destination in destinations:
+ self._get_per_destination_queue(destination).send_pdu(pdu, order)
+
+ @defer.inlineCallbacks
+ def send_read_receipt(self, receipt):
+ """Send a RR to any other servers in the room
+
+ Args:
+ receipt (synapse.types.ReadReceipt): receipt to be sent
+ """
+ # Work out which remote servers should be poked and poke them.
+ domains = yield self.state.get_current_hosts_in_room(receipt.room_id)
+ domains = [d for d in domains if d != self.server_name]
+ if not domains:
+ return
+
+ logger.debug("Sending receipt to: %r", domains)
+
+ content = {
+ receipt.room_id: {
+ receipt.receipt_type: {
+ receipt.user_id: {
+ "event_ids": receipt.event_ids,
+ "data": receipt.data,
+ },
+ },
+ },
+ }
+ key = (receipt.room_id, receipt.receipt_type, receipt.user_id)
+
+ for domain in domains:
+ self.build_and_send_edu(
+ destination=domain,
+ edu_type="m.receipt",
+ content=content,
+ key=key,
+ )
+
+ @logcontext.preserve_fn # the caller should not yield on this
+ @defer.inlineCallbacks
+ def send_presence(self, states):
+ """Send the new presence states to the appropriate destinations.
+
+ This actually queues up the presence states ready for sending and
+ triggers a background task to process them and send out the transactions.
+
+ Args:
+ states (list(UserPresenceState))
+ """
+ if not self.hs.config.use_presence:
+ # No-op if presence is disabled.
+ return
+
+ # First we queue up the new presence by user ID, so multiple presence
+ # updates in quick successtion are correctly handled
+ # We only want to send presence for our own users, so lets always just
+ # filter here just in case.
+ self.pending_presence.update({
+ state.user_id: state for state in states
+ if self.is_mine_id(state.user_id)
+ })
+
+ # We then handle the new pending presence in batches, first figuring
+ # out the destinations we need to send each state to and then poking it
+ # to attempt a new transaction. We linearize this so that we don't
+ # accidentally mess up the ordering and send multiple presence updates
+ # in the wrong order
+ if self._processing_pending_presence:
+ return
+
+ self._processing_pending_presence = True
+ try:
+ while True:
+ states_map = self.pending_presence
+ self.pending_presence = {}
+
+ if not states_map:
+ break
+
+ yield self._process_presence_inner(list(states_map.values()))
+ except Exception:
+ logger.exception("Error sending presence states to servers")
+ finally:
+ self._processing_pending_presence = False
+
+ @measure_func("txnqueue._process_presence")
+ @defer.inlineCallbacks
+ def _process_presence_inner(self, states):
+ """Given a list of states populate self.pending_presence_by_dest and
+ poke to send a new transaction to each destination
+
+ Args:
+ states (list(UserPresenceState))
+ """
+ hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
+
+ for destinations, states in hosts_and_states:
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+ self._get_per_destination_queue(destination).send_presence(states)
+
+ def build_and_send_edu(self, destination, edu_type, content, key=None):
+ """Construct an Edu object, and queue it for sending
+
+ Args:
+ destination (str): name of server to send to
+ edu_type (str): type of EDU to send
+ content (dict): content of EDU
+ key (Any|None): clobbering key for this edu
+ """
+ if destination == self.server_name:
+ logger.info("Not sending EDU to ourselves")
+ return
+
+ edu = Edu(
+ origin=self.server_name,
+ destination=destination,
+ edu_type=edu_type,
+ content=content,
+ )
+
+ self.send_edu(edu, key)
+
+ def send_edu(self, edu, key):
+ """Queue an EDU for sending
+
+ Args:
+ edu (Edu): edu to send
+ key (Any|None): clobbering key for this edu
+ """
+ queue = self._get_per_destination_queue(edu.destination)
+ if key:
+ queue.send_keyed_edu(edu, key)
+ else:
+ queue.send_edu(edu)
+
+ def send_device_messages(self, destination):
+ if destination == self.server_name:
+ logger.info("Not sending device update to ourselves")
+ return
+
+ self._get_per_destination_queue(destination).attempt_new_transaction()
+
+ def get_current_token(self):
+ return 0
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
new file mode 100644
index 0000000000..385039add4
--- /dev/null
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -0,0 +1,318 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import datetime
+import logging
+
+from prometheus_client import Counter
+
+from twisted.internet import defer
+
+from synapse.api.errors import (
+ FederationDeniedError,
+ HttpResponseException,
+ RequestSendFailed,
+)
+from synapse.events import EventBase
+from synapse.federation.units import Edu
+from synapse.handlers.presence import format_user_presence_state
+from synapse.metrics import sent_transactions_counter
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage import UserPresenceState
+from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
+
+logger = logging.getLogger(__name__)
+
+
+sent_edus_counter = Counter(
+ "synapse_federation_client_sent_edus",
+ "Total number of EDUs successfully sent",
+)
+
+sent_edus_by_type = Counter(
+ "synapse_federation_client_sent_edus_by_type",
+ "Number of sent EDUs successfully sent, by event type",
+ ["type"],
+)
+
+
+class PerDestinationQueue(object):
+ """
+ Manages the per-destination transmission queues.
+
+ Args:
+ hs (synapse.HomeServer):
+ transaction_sender (TransactionManager):
+ destination (str): the server_name of the destination that we are managing
+ transmission for.
+ """
+ def __init__(self, hs, transaction_manager, destination):
+ self._server_name = hs.hostname
+ self._clock = hs.get_clock()
+ self._store = hs.get_datastore()
+ self._transaction_manager = transaction_manager
+
+ self._destination = destination
+ self.transmission_loop_running = False
+
+ # a list of tuples of (pending pdu, order)
+ self._pending_pdus = [] # type: list[tuple[EventBase, int]]
+ self._pending_edus = [] # type: list[Edu]
+
+ # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
+ # based on their key (e.g. typing events by room_id)
+ # Map of (edu_type, key) -> Edu
+ self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu]
+
+ # Map of user_id -> UserPresenceState of pending presence to be sent to this
+ # destination
+ self._pending_presence = {} # type: dict[str, UserPresenceState]
+
+ # stream_id of last successfully sent to-device message.
+ # NB: may be a long or an int.
+ self._last_device_stream_id = 0
+
+ # stream_id of last successfully sent device list update.
+ self._last_device_list_stream_id = 0
+
+ def pending_pdu_count(self):
+ return len(self._pending_pdus)
+
+ def pending_edu_count(self):
+ return (
+ len(self._pending_edus)
+ + len(self._pending_presence)
+ + len(self._pending_edus_keyed)
+ )
+
+ def send_pdu(self, pdu, order):
+ """Add a PDU to the queue, and start the transmission loop if neccessary
+
+ Args:
+ pdu (EventBase): pdu to send
+ order (int):
+ """
+ self._pending_pdus.append((pdu, order))
+ self.attempt_new_transaction()
+
+ def send_presence(self, states):
+ """Add presence updates to the queue. Start the transmission loop if neccessary.
+
+ Args:
+ states (iterable[UserPresenceState]): presence to send
+ """
+ self._pending_presence.update({
+ state.user_id: state for state in states
+ })
+ self.attempt_new_transaction()
+
+ def send_keyed_edu(self, edu, key):
+ self._pending_edus_keyed[(edu.edu_type, key)] = edu
+ self.attempt_new_transaction()
+
+ def send_edu(self, edu):
+ self._pending_edus.append(edu)
+ self.attempt_new_transaction()
+
+ def attempt_new_transaction(self):
+ """Try to start a new transaction to this destination
+
+ If there is already a transaction in progress to this destination,
+ returns immediately. Otherwise kicks off the process of sending a
+ transaction in the background.
+ """
+ # list of (pending_pdu, deferred, order)
+ if self.transmission_loop_running:
+ # XXX: this can get stuck on by a never-ending
+ # request at which point pending_pdus just keeps growing.
+ # we need application-layer timeouts of some flavour of these
+ # requests
+ logger.debug(
+ "TX [%s] Transaction already in progress",
+ self._destination
+ )
+ return
+
+ logger.debug("TX [%s] Starting transaction loop", self._destination)
+
+ run_as_background_process(
+ "federation_transaction_transmission_loop",
+ self._transaction_transmission_loop,
+ )
+
+ @defer.inlineCallbacks
+ def _transaction_transmission_loop(self):
+ pending_pdus = []
+ try:
+ self.transmission_loop_running = True
+
+ # This will throw if we wouldn't retry. We do this here so we fail
+ # quickly, but we will later check this again in the http client,
+ # hence why we throw the result away.
+ yield get_retry_limiter(self._destination, self._clock, self._store)
+
+ pending_pdus = []
+ while True:
+ device_message_edus, device_stream_id, dev_list_id = (
+ yield self._get_new_device_messages()
+ )
+
+ # BEGIN CRITICAL SECTION
+ #
+ # In order to avoid a race condition, we need to make sure that
+ # the following code (from popping the queues up to the point
+ # where we decide if we actually have any pending messages) is
+ # atomic - otherwise new PDUs or EDUs might arrive in the
+ # meantime, but not get sent because we hold the
+ # transmission_loop_running flag.
+
+ pending_pdus = self._pending_pdus
+
+ # We can only include at most 50 PDUs per transactions
+ pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
+
+ pending_edus = self._pending_edus
+
+ # We can only include at most 100 EDUs per transactions
+ pending_edus, self._pending_edus = pending_edus[:100], pending_edus[100:]
+
+ pending_edus.extend(
+ self._pending_edus_keyed.values()
+ )
+
+ self._pending_edus_keyed = {}
+
+ pending_edus.extend(device_message_edus)
+
+ pending_presence = self._pending_presence
+ self._pending_presence = {}
+ if pending_presence:
+ pending_edus.append(
+ Edu(
+ origin=self._server_name,
+ destination=self._destination,
+ edu_type="m.presence",
+ content={
+ "push": [
+ format_user_presence_state(
+ presence, self._clock.time_msec()
+ )
+ for presence in pending_presence.values()
+ ]
+ },
+ )
+ )
+
+ if pending_pdus:
+ logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+ self._destination, len(pending_pdus))
+
+ if not pending_pdus and not pending_edus:
+ logger.debug("TX [%s] Nothing to send", self._destination)
+ self._last_device_stream_id = device_stream_id
+ return
+
+ # END CRITICAL SECTION
+
+ success = yield self._transaction_manager.send_new_transaction(
+ self._destination, pending_pdus, pending_edus
+ )
+ if success:
+ sent_transactions_counter.inc()
+ sent_edus_counter.inc(len(pending_edus))
+ for edu in pending_edus:
+ sent_edus_by_type.labels(edu.edu_type).inc()
+ # Remove the acknowledged device messages from the database
+ # Only bother if we actually sent some device messages
+ if device_message_edus:
+ yield self._store.delete_device_msgs_for_remote(
+ self._destination, device_stream_id
+ )
+ logger.info(
+ "Marking as sent %r %r", self._destination, dev_list_id
+ )
+ yield self._store.mark_as_sent_devices_by_remote(
+ self._destination, dev_list_id
+ )
+
+ self._last_device_stream_id = device_stream_id
+ self._last_device_list_stream_id = dev_list_id
+ else:
+ break
+ except NotRetryingDestination as e:
+ logger.debug(
+ "TX [%s] not ready for retry yet (next retry at %s) - "
+ "dropping transaction for now",
+ self._destination,
+ datetime.datetime.fromtimestamp(
+ (e.retry_last_ts + e.retry_interval) / 1000.0
+ ),
+ )
+ except FederationDeniedError as e:
+ logger.info(e)
+ except HttpResponseException as e:
+ logger.warning(
+ "TX [%s] Received %d response to transaction: %s",
+ self._destination, e.code, e,
+ )
+ except RequestSendFailed as e:
+ logger.warning("TX [%s] Failed to send transaction: %s", self._destination, e)
+
+ for p, _ in pending_pdus:
+ logger.info("Failed to send event %s to %s", p.event_id,
+ self._destination)
+ except Exception:
+ logger.exception(
+ "TX [%s] Failed to send transaction",
+ self._destination,
+ )
+ for p, _ in pending_pdus:
+ logger.info("Failed to send event %s to %s", p.event_id,
+ self._destination)
+ finally:
+ # We want to be *very* sure we clear this after we stop processing
+ self.transmission_loop_running = False
+
+ @defer.inlineCallbacks
+ def _get_new_device_messages(self):
+ last_device_stream_id = self._last_device_stream_id
+ to_device_stream_id = self._store.get_to_device_stream_token()
+ contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
+ self._destination, last_device_stream_id, to_device_stream_id
+ )
+ edus = [
+ Edu(
+ origin=self._server_name,
+ destination=self._destination,
+ edu_type="m.direct_to_device",
+ content=content,
+ )
+ for content in contents
+ ]
+
+ last_device_list = self._last_device_list_stream_id
+ now_stream_id, results = yield self._store.get_devices_by_remote(
+ self._destination, last_device_list
+ )
+ edus.extend(
+ Edu(
+ origin=self._server_name,
+ destination=self._destination,
+ edu_type="m.device_list_update",
+ content=content,
+ )
+ for content in results
+ )
+ defer.returnValue((edus, stream_id, now_stream_id))
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
new file mode 100644
index 0000000000..35e6b8ff5b
--- /dev/null
+++ b/synapse/federation/sender/transaction_manager.py
@@ -0,0 +1,147 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import HttpResponseException
+from synapse.federation.persistence import TransactionActions
+from synapse.federation.units import Transaction
+from synapse.util.metrics import measure_func
+
+logger = logging.getLogger(__name__)
+
+
+class TransactionManager(object):
+ """Helper class which handles building and sending transactions
+
+ shared between PerDestinationQueue objects
+ """
+ def __init__(self, hs):
+ self._server_name = hs.hostname
+ self.clock = hs.get_clock() # nb must be called this for @measure_func
+ self._store = hs.get_datastore()
+ self._transaction_actions = TransactionActions(self._store)
+ self._transport_layer = hs.get_federation_transport_client()
+
+ # HACK to get unique tx id
+ self._next_txn_id = int(self.clock.time_msec())
+
+ @measure_func("_send_new_transaction")
+ @defer.inlineCallbacks
+ def send_new_transaction(self, destination, pending_pdus, pending_edus):
+
+ # Sort based on the order field
+ pending_pdus.sort(key=lambda t: t[1])
+ pdus = [x[0] for x in pending_pdus]
+ edus = pending_edus
+
+ success = True
+
+ logger.debug("TX [%s] _attempt_new_transaction", destination)
+
+ txn_id = str(self._next_txn_id)
+
+ logger.debug(
+ "TX [%s] {%s} Attempting new transaction"
+ " (pdus: %d, edus: %d)",
+ destination, txn_id,
+ len(pdus),
+ len(edus),
+ )
+
+ logger.debug("TX [%s] Persisting transaction...", destination)
+
+ transaction = Transaction.create_new(
+ origin_server_ts=int(self.clock.time_msec()),
+ transaction_id=txn_id,
+ origin=self._server_name,
+ destination=destination,
+ pdus=pdus,
+ edus=edus,
+ )
+
+ self._next_txn_id += 1
+
+ yield self._transaction_actions.prepare_to_send(transaction)
+
+ logger.debug("TX [%s] Persisted transaction", destination)
+ logger.info(
+ "TX [%s] {%s} Sending transaction [%s],"
+ " (PDUs: %d, EDUs: %d)",
+ destination, txn_id,
+ transaction.transaction_id,
+ len(pdus),
+ len(edus),
+ )
+
+ # Actually send the transaction
+
+ # FIXME (erikj): This is a bit of a hack to make the Pdu age
+ # keys work
+ def json_data_cb():
+ data = transaction.get_dict()
+ now = int(self.clock.time_msec())
+ if "pdus" in data:
+ for p in data["pdus"]:
+ if "age_ts" in p:
+ unsigned = p.setdefault("unsigned", {})
+ unsigned["age"] = now - int(p["age_ts"])
+ del p["age_ts"]
+ return data
+
+ try:
+ response = yield self._transport_layer.send_transaction(
+ transaction, json_data_cb
+ )
+ code = 200
+ except HttpResponseException as e:
+ code = e.code
+ response = e.response
+
+ if e.code in (401, 404, 429) or 500 <= e.code:
+ logger.info(
+ "TX [%s] {%s} got %d response",
+ destination, txn_id, code
+ )
+ raise e
+
+ logger.info(
+ "TX [%s] {%s} got %d response",
+ destination, txn_id, code
+ )
+
+ yield self._transaction_actions.delivered(
+ transaction, code, response
+ )
+
+ logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
+
+ if code == 200:
+ for e_id, r in response.get("pdus", {}).items():
+ if "error" in r:
+ logger.warn(
+ "TX [%s] {%s} Remote returned error for %s: %s",
+ destination, txn_id, e_id, r,
+ )
+ else:
+ for p in pdus:
+ logger.warn(
+ "TX [%s] {%s} Failed to send event %s",
+ destination, txn_id, p.event_id,
+ )
+ success = False
+
+ defer.returnValue(success)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
deleted file mode 100644
index e5e42c647d..0000000000
--- a/synapse/federation/transaction_queue.py
+++ /dev/null
@@ -1,716 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import datetime
-import logging
-
-from six import itervalues
-
-from prometheus_client import Counter
-
-from twisted.internet import defer
-
-import synapse.metrics
-from synapse.api.errors import (
- FederationDeniedError,
- HttpResponseException,
- RequestSendFailed,
-)
-from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
-from synapse.metrics import (
- LaterGauge,
- event_processing_loop_counter,
- event_processing_loop_room_count,
- events_processed_counter,
- sent_transactions_counter,
-)
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util import logcontext
-from synapse.util.metrics import measure_func
-from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
-
-from .persistence import TransactionActions
-from .units import Edu, Transaction
-
-logger = logging.getLogger(__name__)
-
-sent_pdus_destination_dist_count = Counter(
- "synapse_federation_client_sent_pdu_destinations:count",
- "Number of PDUs queued for sending to one or more destinations",
-)
-
-sent_pdus_destination_dist_total = Counter(
- "synapse_federation_client_sent_pdu_destinations:total", ""
- "Total number of PDUs queued for sending across all destinations",
-)
-
-sent_edus_counter = Counter(
- "synapse_federation_client_sent_edus",
- "Total number of EDUs successfully sent",
-)
-
-sent_edus_by_type = Counter(
- "synapse_federation_client_sent_edus_by_type",
- "Number of sent EDUs successfully sent, by event type",
- ["type"],
-)
-
-
-class TransactionQueue(object):
- """This class makes sure we only have one transaction in flight at
- a time for a given destination.
-
- It batches pending PDUs into single transactions.
- """
-
- def __init__(self, hs):
- self.hs = hs
- self.server_name = hs.hostname
-
- self.store = hs.get_datastore()
- self.state = hs.get_state_handler()
- self.transaction_actions = TransactionActions(self.store)
-
- self.transport_layer = hs.get_federation_transport_client()
-
- self.clock = hs.get_clock()
- self.is_mine_id = hs.is_mine_id
-
- # Is a mapping from destinations -> deferreds. Used to keep track
- # of which destinations have transactions in flight and when they are
- # done
- self.pending_transactions = {}
-
- LaterGauge(
- "synapse_federation_transaction_queue_pending_destinations",
- "",
- [],
- lambda: len(self.pending_transactions),
- )
-
- # Is a mapping from destination -> list of
- # tuple(pending pdus, deferred, order)
- self.pending_pdus_by_dest = pdus = {}
- # destination -> list of tuple(edu, deferred)
- self.pending_edus_by_dest = edus = {}
-
- # Map of user_id -> UserPresenceState for all the pending presence
- # to be sent out by user_id. Entries here get processed and put in
- # pending_presence_by_dest
- self.pending_presence = {}
-
- # Map of destination -> user_id -> UserPresenceState of pending presence
- # to be sent to each destinations
- self.pending_presence_by_dest = presence = {}
-
- # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
- # based on their key (e.g. typing events by room_id)
- # Map of destination -> (edu_type, key) -> Edu
- self.pending_edus_keyed_by_dest = edus_keyed = {}
-
- LaterGauge(
- "synapse_federation_transaction_queue_pending_pdus",
- "",
- [],
- lambda: sum(map(len, pdus.values())),
- )
- LaterGauge(
- "synapse_federation_transaction_queue_pending_edus",
- "",
- [],
- lambda: (
- sum(map(len, edus.values()))
- + sum(map(len, presence.values()))
- + sum(map(len, edus_keyed.values()))
- ),
- )
-
- # destination -> stream_id of last successfully sent to-device message.
- # NB: may be a long or an int.
- self.last_device_stream_id_by_dest = {}
-
- # destination -> stream_id of last successfully sent device list
- # update.
- self.last_device_list_stream_id_by_dest = {}
-
- # HACK to get unique tx id
- self._next_txn_id = int(self.clock.time_msec())
-
- self._order = 1
-
- self._is_processing = False
- self._last_poked_id = -1
-
- self._processing_pending_presence = False
-
- def notify_new_events(self, current_id):
- """This gets called when we have some new events we might want to
- send out to other servers.
- """
- self._last_poked_id = max(current_id, self._last_poked_id)
-
- if self._is_processing:
- return
-
- # fire off a processing loop in the background
- run_as_background_process(
- "process_event_queue_for_federation",
- self._process_event_queue_loop,
- )
-
- @defer.inlineCallbacks
- def _process_event_queue_loop(self):
- try:
- self._is_processing = True
- while True:
- last_token = yield self.store.get_federation_out_pos("events")
- next_token, events = yield self.store.get_all_new_events_stream(
- last_token, self._last_poked_id, limit=100,
- )
-
- logger.debug("Handling %s -> %s", last_token, next_token)
-
- if not events and next_token >= self._last_poked_id:
- break
-
- @defer.inlineCallbacks
- def handle_event(event):
- # Only send events for this server.
- send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
- is_mine = self.is_mine_id(event.sender)
- if not is_mine and send_on_behalf_of is None:
- return
-
- try:
- # Get the state from before the event.
- # We need to make sure that this is the state from before
- # the event and not from after it.
- # Otherwise if the last member on a server in a room is
- # banned then it won't receive the event because it won't
- # be in the room after the ban.
- destinations = yield self.state.get_current_hosts_in_room(
- event.room_id, latest_event_ids=event.prev_event_ids(),
- )
- except Exception:
- logger.exception(
- "Failed to calculate hosts in room for event: %s",
- event.event_id,
- )
- return
-
- destinations = set(destinations)
-
- if send_on_behalf_of is not None:
- # If we are sending the event on behalf of another server
- # then it already has the event and there is no reason to
- # send the event to it.
- destinations.discard(send_on_behalf_of)
-
- logger.debug("Sending %s to %r", event, destinations)
-
- self._send_pdu(event, destinations)
-
- @defer.inlineCallbacks
- def handle_room_events(events):
- for event in events:
- yield handle_event(event)
-
- events_by_room = {}
- for event in events:
- events_by_room.setdefault(event.room_id, []).append(event)
-
- yield logcontext.make_deferred_yieldable(defer.gatherResults(
- [
- logcontext.run_in_background(handle_room_events, evs)
- for evs in itervalues(events_by_room)
- ],
- consumeErrors=True
- ))
-
- yield self.store.update_federation_out_pos(
- "events", next_token
- )
-
- if events:
- now = self.clock.time_msec()
- ts = yield self.store.get_received_ts(events[-1].event_id)
-
- synapse.metrics.event_processing_lag.labels(
- "federation_sender").set(now - ts)
- synapse.metrics.event_processing_last_ts.labels(
- "federation_sender").set(ts)
-
- events_processed_counter.inc(len(events))
-
- event_processing_loop_room_count.labels(
- "federation_sender"
- ).inc(len(events_by_room))
-
- event_processing_loop_counter.labels("federation_sender").inc()
-
- synapse.metrics.event_processing_positions.labels(
- "federation_sender").set(next_token)
-
- finally:
- self._is_processing = False
-
- def _send_pdu(self, pdu, destinations):
- # We loop through all destinations to see whether we already have
- # a transaction in progress. If we do, stick it in the pending_pdus
- # table and we'll get back to it later.
-
- order = self._order
- self._order += 1
-
- destinations = set(destinations)
- destinations.discard(self.server_name)
- logger.debug("Sending to: %s", str(destinations))
-
- if not destinations:
- return
-
- sent_pdus_destination_dist_total.inc(len(destinations))
- sent_pdus_destination_dist_count.inc()
-
- for destination in destinations:
- self.pending_pdus_by_dest.setdefault(destination, []).append(
- (pdu, order)
- )
-
- self._attempt_new_transaction(destination)
-
- @logcontext.preserve_fn # the caller should not yield on this
- @defer.inlineCallbacks
- def send_presence(self, states):
- """Send the new presence states to the appropriate destinations.
-
- This actually queues up the presence states ready for sending and
- triggers a background task to process them and send out the transactions.
-
- Args:
- states (list(UserPresenceState))
- """
- if not self.hs.config.use_presence:
- # No-op if presence is disabled.
- return
-
- # First we queue up the new presence by user ID, so multiple presence
- # updates in quick successtion are correctly handled
- # We only want to send presence for our own users, so lets always just
- # filter here just in case.
- self.pending_presence.update({
- state.user_id: state for state in states
- if self.is_mine_id(state.user_id)
- })
-
- # We then handle the new pending presence in batches, first figuring
- # out the destinations we need to send each state to and then poking it
- # to attempt a new transaction. We linearize this so that we don't
- # accidentally mess up the ordering and send multiple presence updates
- # in the wrong order
- if self._processing_pending_presence:
- return
-
- self._processing_pending_presence = True
- try:
- while True:
- states_map = self.pending_presence
- self.pending_presence = {}
-
- if not states_map:
- break
-
- yield self._process_presence_inner(list(states_map.values()))
- except Exception:
- logger.exception("Error sending presence states to servers")
- finally:
- self._processing_pending_presence = False
-
- @measure_func("txnqueue._process_presence")
- @defer.inlineCallbacks
- def _process_presence_inner(self, states):
- """Given a list of states populate self.pending_presence_by_dest and
- poke to send a new transaction to each destination
-
- Args:
- states (list(UserPresenceState))
- """
- hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
-
- for destinations, states in hosts_and_states:
- for destination in destinations:
- if destination == self.server_name:
- continue
-
- self.pending_presence_by_dest.setdefault(
- destination, {}
- ).update({
- state.user_id: state for state in states
- })
-
- self._attempt_new_transaction(destination)
-
- def build_and_send_edu(self, destination, edu_type, content, key=None):
- """Construct an Edu object, and queue it for sending
-
- Args:
- destination (str): name of server to send to
- edu_type (str): type of EDU to send
- content (dict): content of EDU
- key (Any|None): clobbering key for this edu
- """
- if destination == self.server_name:
- logger.info("Not sending EDU to ourselves")
- return
-
- edu = Edu(
- origin=self.server_name,
- destination=destination,
- edu_type=edu_type,
- content=content,
- )
-
- self.send_edu(edu, key)
-
- def send_edu(self, edu, key):
- """Queue an EDU for sending
-
- Args:
- edu (Edu): edu to send
- key (Any|None): clobbering key for this edu
- """
- if key:
- self.pending_edus_keyed_by_dest.setdefault(
- edu.destination, {}
- )[(edu.edu_type, key)] = edu
- else:
- self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu)
-
- self._attempt_new_transaction(edu.destination)
-
- def send_device_messages(self, destination):
- if destination == self.server_name:
- logger.info("Not sending device update to ourselves")
- return
-
- self._attempt_new_transaction(destination)
-
- def get_current_token(self):
- return 0
-
- def _attempt_new_transaction(self, destination):
- """Try to start a new transaction to this destination
-
- If there is already a transaction in progress to this destination,
- returns immediately. Otherwise kicks off the process of sending a
- transaction in the background.
-
- Args:
- destination (str):
-
- Returns:
- None
- """
- # list of (pending_pdu, deferred, order)
- if destination in self.pending_transactions:
- # XXX: pending_transactions can get stuck on by a never-ending
- # request at which point pending_pdus_by_dest just keeps growing.
- # we need application-layer timeouts of some flavour of these
- # requests
- logger.debug(
- "TX [%s] Transaction already in progress",
- destination
- )
- return
-
- logger.debug("TX [%s] Starting transaction loop", destination)
-
- run_as_background_process(
- "federation_transaction_transmission_loop",
- self._transaction_transmission_loop,
- destination,
- )
-
- @defer.inlineCallbacks
- def _transaction_transmission_loop(self, destination):
- pending_pdus = []
- try:
- self.pending_transactions[destination] = 1
-
- # This will throw if we wouldn't retry. We do this here so we fail
- # quickly, but we will later check this again in the http client,
- # hence why we throw the result away.
- yield get_retry_limiter(destination, self.clock, self.store)
-
- pending_pdus = []
- while True:
- device_message_edus, device_stream_id, dev_list_id = (
- yield self._get_new_device_messages(destination)
- )
-
- # BEGIN CRITICAL SECTION
- #
- # In order to avoid a race condition, we need to make sure that
- # the following code (from popping the queues up to the point
- # where we decide if we actually have any pending messages) is
- # atomic - otherwise new PDUs or EDUs might arrive in the
- # meantime, but not get sent because we hold the
- # pending_transactions flag.
-
- pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
-
- # We can only include at most 50 PDUs per transactions
- pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
- if leftover_pdus:
- self.pending_pdus_by_dest[destination] = leftover_pdus
-
- pending_edus = self.pending_edus_by_dest.pop(destination, [])
-
- # We can only include at most 100 EDUs per transactions
- pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
- if leftover_edus:
- self.pending_edus_by_dest[destination] = leftover_edus
-
- pending_presence = self.pending_presence_by_dest.pop(destination, {})
-
- pending_edus.extend(
- self.pending_edus_keyed_by_dest.pop(destination, {}).values()
- )
-
- pending_edus.extend(device_message_edus)
- if pending_presence:
- pending_edus.append(
- Edu(
- origin=self.server_name,
- destination=destination,
- edu_type="m.presence",
- content={
- "push": [
- format_user_presence_state(
- presence, self.clock.time_msec()
- )
- for presence in pending_presence.values()
- ]
- },
- )
- )
-
- if pending_pdus:
- logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
- destination, len(pending_pdus))
-
- if not pending_pdus and not pending_edus:
- logger.debug("TX [%s] Nothing to send", destination)
- self.last_device_stream_id_by_dest[destination] = (
- device_stream_id
- )
- return
-
- # END CRITICAL SECTION
-
- success = yield self._send_new_transaction(
- destination, pending_pdus, pending_edus,
- )
- if success:
- sent_transactions_counter.inc()
- sent_edus_counter.inc(len(pending_edus))
- for edu in pending_edus:
- sent_edus_by_type.labels(edu.edu_type).inc()
- # Remove the acknowledged device messages from the database
- # Only bother if we actually sent some device messages
- if device_message_edus:
- yield self.store.delete_device_msgs_for_remote(
- destination, device_stream_id
- )
- logger.info("Marking as sent %r %r", destination, dev_list_id)
- yield self.store.mark_as_sent_devices_by_remote(
- destination, dev_list_id
- )
-
- self.last_device_stream_id_by_dest[destination] = device_stream_id
- self.last_device_list_stream_id_by_dest[destination] = dev_list_id
- else:
- break
- except NotRetryingDestination as e:
- logger.debug(
- "TX [%s] not ready for retry yet (next retry at %s) - "
- "dropping transaction for now",
- destination,
- datetime.datetime.fromtimestamp(
- (e.retry_last_ts + e.retry_interval) / 1000.0
- ),
- )
- except FederationDeniedError as e:
- logger.info(e)
- except HttpResponseException as e:
- logger.warning(
- "TX [%s] Received %d response to transaction: %s",
- destination, e.code, e,
- )
- except RequestSendFailed as e:
- logger.warning("TX [%s] Failed to send transaction: %s", destination, e)
-
- for p, _ in pending_pdus:
- logger.info("Failed to send event %s to %s", p.event_id,
- destination)
- except Exception:
- logger.exception(
- "TX [%s] Failed to send transaction",
- destination,
- )
- for p, _ in pending_pdus:
- logger.info("Failed to send event %s to %s", p.event_id,
- destination)
- finally:
- # We want to be *very* sure we delete this after we stop processing
- self.pending_transactions.pop(destination, None)
-
- @defer.inlineCallbacks
- def _get_new_device_messages(self, destination):
- last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0)
- to_device_stream_id = self.store.get_to_device_stream_token()
- contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
- destination, last_device_stream_id, to_device_stream_id
- )
- edus = [
- Edu(
- origin=self.server_name,
- destination=destination,
- edu_type="m.direct_to_device",
- content=content,
- )
- for content in contents
- ]
-
- last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0)
- now_stream_id, results = yield self.store.get_devices_by_remote(
- destination, last_device_list
- )
- edus.extend(
- Edu(
- origin=self.server_name,
- destination=destination,
- edu_type="m.device_list_update",
- content=content,
- )
- for content in results
- )
- defer.returnValue((edus, stream_id, now_stream_id))
-
- @measure_func("_send_new_transaction")
- @defer.inlineCallbacks
- def _send_new_transaction(self, destination, pending_pdus, pending_edus):
-
- # Sort based on the order field
- pending_pdus.sort(key=lambda t: t[1])
- pdus = [x[0] for x in pending_pdus]
- edus = pending_edus
-
- success = True
-
- logger.debug("TX [%s] _attempt_new_transaction", destination)
-
- txn_id = str(self._next_txn_id)
-
- logger.debug(
- "TX [%s] {%s} Attempting new transaction"
- " (pdus: %d, edus: %d)",
- destination, txn_id,
- len(pdus),
- len(edus),
- )
-
- logger.debug("TX [%s] Persisting transaction...", destination)
-
- transaction = Transaction.create_new(
- origin_server_ts=int(self.clock.time_msec()),
- transaction_id=txn_id,
- origin=self.server_name,
- destination=destination,
- pdus=pdus,
- edus=edus,
- )
-
- self._next_txn_id += 1
-
- yield self.transaction_actions.prepare_to_send(transaction)
-
- logger.debug("TX [%s] Persisted transaction", destination)
- logger.info(
- "TX [%s] {%s} Sending transaction [%s],"
- " (PDUs: %d, EDUs: %d)",
- destination, txn_id,
- transaction.transaction_id,
- len(pdus),
- len(edus),
- )
-
- # Actually send the transaction
-
- # FIXME (erikj): This is a bit of a hack to make the Pdu age
- # keys work
- def json_data_cb():
- data = transaction.get_dict()
- now = int(self.clock.time_msec())
- if "pdus" in data:
- for p in data["pdus"]:
- if "age_ts" in p:
- unsigned = p.setdefault("unsigned", {})
- unsigned["age"] = now - int(p["age_ts"])
- del p["age_ts"]
- return data
-
- try:
- response = yield self.transport_layer.send_transaction(
- transaction, json_data_cb
- )
- code = 200
- except HttpResponseException as e:
- code = e.code
- response = e.response
-
- if e.code in (401, 404, 429) or 500 <= e.code:
- logger.info(
- "TX [%s] {%s} got %d response",
- destination, txn_id, code
- )
- raise e
-
- logger.info(
- "TX [%s] {%s} got %d response",
- destination, txn_id, code
- )
-
- yield self.transaction_actions.delivered(
- transaction, code, response
- )
-
- logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
-
- if code == 200:
- for e_id, r in response.get("pdus", {}).items():
- if "error" in r:
- logger.warn(
- "TX [%s] {%s} Remote returned error for %s: %s",
- destination, txn_id, e_id, r,
- )
- else:
- for p in pdus:
- logger.warn(
- "TX [%s] {%s} Failed to send event %s",
- destination, txn_id, p.event_id,
- )
- success = False
-
- defer.returnValue(success)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2abd9af94f..caad9ae2dd 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
+from synapse.api.ratelimiting import Ratelimiter
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util import logcontext
@@ -99,6 +100,11 @@ class AuthHandler(BaseHandler):
login_types.append(t)
self._supported_login_types = login_types
+ self._account_ratelimiter = Ratelimiter()
+ self._failed_attempts_ratelimiter = Ratelimiter()
+
+ self._clock = self.hs.get_clock()
+
@defer.inlineCallbacks
def validate_user_via_ui_auth(self, requester, request_body, clientip):
"""
@@ -568,7 +574,12 @@ class AuthHandler(BaseHandler):
Returns:
defer.Deferred: (unicode) canonical_user_id, or None if zero or
multiple matches
+
+ Raises:
+ LimitExceededError if the ratelimiter's login requests count for this
+ user is too high too proceed.
"""
+ self.ratelimit_login_per_account(user_id)
res = yield self._find_user_id_and_pwd_hash(user_id)
if res is not None:
defer.returnValue(res[0])
@@ -634,6 +645,8 @@ class AuthHandler(BaseHandler):
StoreError if there was a problem accessing the database
SynapseError if there was a problem with the request
LoginError if there was an authentication problem.
+ LimitExceededError if the ratelimiter's login requests count for this
+ user is too high too proceed.
"""
if username.startswith('@'):
@@ -643,6 +656,8 @@ class AuthHandler(BaseHandler):
username, self.hs.hostname
).to_string()
+ self.ratelimit_login_per_account(qualified_user_id)
+
login_type = login_submission.get("type")
known_login_type = False
@@ -715,9 +730,16 @@ class AuthHandler(BaseHandler):
if not known_login_type:
raise SynapseError(400, "Unknown login type %s" % login_type)
- # unknown username or invalid password. We raise a 403 here, but note
- # that if we're doing user-interactive login, it turns all LoginErrors
- # into a 401 anyway.
+ # unknown username or invalid password.
+ self._failed_attempts_ratelimiter.ratelimit(
+ qualified_user_id.lower(), time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+ burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+ update=True,
+ )
+
+ # We raise a 403 here, but note that if we're doing user-interactive
+ # login, it turns all LoginErrors into a 401 anyway.
raise LoginError(
403, "Invalid password",
errcode=Codes.FORBIDDEN
@@ -735,6 +757,10 @@ class AuthHandler(BaseHandler):
password (unicode): the provided password
Returns:
(unicode) the canonical_user_id, or None if unknown user / bad password
+
+ Raises:
+ LimitExceededError if the ratelimiter's login requests count for this
+ user is too high too proceed.
"""
lookupres = yield self._find_user_id_and_pwd_hash(user_id)
if not lookupres:
@@ -763,6 +789,7 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", True, user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+ self.ratelimit_login_per_account(user_id)
yield self.auth.check_auth_blocking(user_id)
defer.returnValue(user_id)
@@ -934,6 +961,33 @@ class AuthHandler(BaseHandler):
else:
return defer.succeed(False)
+ def ratelimit_login_per_account(self, user_id):
+ """Checks whether the process must be stopped because of ratelimiting.
+
+ Checks against two ratelimiters: the generic one for login attempts per
+ account and the one specific to failed attempts.
+
+ Args:
+ user_id (unicode): complete @user:id
+
+ Raises:
+ LimitExceededError if one of the ratelimiters' login requests count
+ for this user is too high too proceed.
+ """
+ self._failed_attempts_ratelimiter.ratelimit(
+ user_id.lower(), time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+ burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+ update=False,
+ )
+
+ self._account_ratelimiter.ratelimit(
+ user_id.lower(), time_now_s=self._clock.time(),
+ rate_hz=self.hs.config.rc_login_account.per_second,
+ burst_count=self.hs.config.rc_login_account.burst_count,
+ update=True,
+ )
+
@attr.s
class MacaroonGenerator(object):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 63793b1fcf..578320607e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -243,7 +243,14 @@ class EventCreationHandler(object):
self.spam_checker = hs.get_spam_checker()
- if self.config.block_events_without_consent_error is not None:
+ self._block_events_without_consent_error = (
+ self.config.block_events_without_consent_error
+ )
+
+ # we need to construct a ConsentURIBuilder here, as it checks that the necessary
+ # config options, but *only* if we have a configuration for which we are
+ # going to need it.
+ if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)
@defer.inlineCallbacks
@@ -378,7 +385,7 @@ class EventCreationHandler(object):
Raises:
ConsentNotGivenError: if the user has not given consent yet
"""
- if self.config.block_events_without_consent_error is None:
+ if self._block_events_without_consent_error is None:
return
# exempt AS users from needing consent
@@ -405,7 +412,7 @@ class EventCreationHandler(object):
consent_uri = self._consent_uri_builder.build_user_consent_uri(
requester.user.localpart,
)
- msg = self.config.block_events_without_consent_error % {
+ msg = self._block_events_without_consent_error % {
'consent_uri': consent_uri,
}
raise ConsentNotGivenError(
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 1728089667..dd783ae134 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,9 +16,8 @@ import logging
from twisted.internet import defer
-from synapse.types import get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
logger = logging.getLogger(__name__)
@@ -42,13 +41,13 @@ class ReceiptsHandler(BaseHandler):
"""Called when we receive an EDU of type m.receipt from a remote HS.
"""
receipts = [
- {
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
- "event_ids": user_values["event_ids"],
- "data": user_values.get("data", {}),
- }
+ ReadReceipt(
+ room_id=room_id,
+ receipt_type=receipt_type,
+ user_id=user_id,
+ event_ids=user_values["event_ids"],
+ data=user_values.get("data", {}),
+ )
for room_id, room_values in content.items()
for receipt_type, users in room_values.items()
for user_id, user_values in users.items()
@@ -64,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
max_batch_id = None
for receipt in receipts:
- room_id = receipt["room_id"]
- receipt_type = receipt["receipt_type"]
- user_id = receipt["user_id"]
- event_ids = receipt["event_ids"]
- data = receipt["data"]
-
res = yield self.store.insert_receipt(
- room_id, receipt_type, user_id, event_ids, data
+ receipt.room_id,
+ receipt.receipt_type,
+ receipt.user_id,
+ receipt.event_ids,
+ receipt.data,
)
if not res:
@@ -89,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
# no new receipts
defer.returnValue(False)
- affected_room_ids = list(set([r["room_id"] for r in receipts]))
+ affected_room_ids = list(set([r.room_id for r in receipts]))
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
@@ -107,49 +104,21 @@ class ReceiptsHandler(BaseHandler):
"""Called when a client tells us a local user has read up to the given
event_id in the room.
"""
- receipt = {
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
- "event_ids": [event_id],
- "data": {
+ receipt = ReadReceipt(
+ room_id=room_id,
+ receipt_type=receipt_type,
+ user_id=user_id,
+ event_ids=[event_id],
+ data={
"ts": int(self.clock.time_msec()),
- }
- }
+ },
+ )
is_new = yield self._handle_new_receipts([receipt])
if not is_new:
return
- # Work out which remote servers should be poked and poke them.
-
- # TODO: optimise this to move some of the work to the workers.
- data = receipt["data"]
-
- # XXX why does this not use state.get_current_hosts_in_room() ?
- users = yield self.state.get_current_user_in_room(room_id)
- remotedomains = set(get_domain_from_id(u) for u in users)
- remotedomains = remotedomains.copy()
- remotedomains.discard(self.server_name)
-
- logger.debug("Sending receipt to: %r", remotedomains)
-
- for domain in remotedomains:
- self.federation.build_and_send_edu(
- destination=domain,
- edu_type="m.receipt",
- content={
- room_id: {
- receipt_type: {
- user_id: {
- "event_ids": [event_id],
- "data": data,
- }
- }
- },
- },
- key=(room_id, receipt_type, user_id),
- )
+ self.federation.send_read_receipt(receipt)
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 03130edc54..68f73d3793 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,6 +23,7 @@ from synapse.api.constants import LoginType
from synapse.api.errors import (
AuthError,
Codes,
+ ConsentNotGivenError,
InvalidCaptchaError,
LimitExceededError,
RegistrationError,
@@ -311,6 +312,10 @@ class RegistrationHandler(BaseHandler):
)
else:
yield self._join_user_to_room(fake_requester, r)
+ except ConsentNotGivenError as e:
+ # Technically not necessary to pull out this error though
+ # moving away from bare excepts is a good thing to do.
+ logger.error("Failed to join new user to %r: %r", r, e)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
@@ -629,8 +634,8 @@ class RegistrationHandler(BaseHandler):
allowed, time_allowed = self.ratelimiter.can_do_action(
address, time_now_s=time_now,
- rate_hz=self.hs.config.rc_registration_requests_per_second,
- burst_count=self.hs.config.rc_registration_request_burst_count,
+ rate_hz=self.hs.config.rc_registration.per_second,
+ burst_count=self.hs.config.rc_registration.burst_count,
)
if not allowed:
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d92f8c529c..7dc0e236e7 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -38,18 +38,8 @@ class UserDirectoryHandler(object):
world_readable or publically joinable room. We keep a database table up to date
by streaming changes of the current state and recalculating whether users should
be in the directory or not when necessary.
-
- For each user in the directory we also store a room_id which is public and that the
- user is joined to. This allows us to ignore history_visibility and join_rules changes
- for that user in all other public rooms, as we know they'll still be in at least
- one public room.
"""
- INITIAL_ROOM_SLEEP_MS = 50
- INITIAL_ROOM_SLEEP_COUNT = 100
- INITIAL_ROOM_BATCH_SIZE = 100
- INITIAL_USER_SLEEP_MS = 10
-
def __init__(self, hs):
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
@@ -59,17 +49,6 @@ class UserDirectoryHandler(object):
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users
-
- # If we're a worker, don't sleep when doing the initial room work, as it
- # won't monopolise the master's CPU.
- if hs.config.worker_app:
- self.INITIAL_ROOM_SLEEP_MS = 0
- self.INITIAL_USER_SLEEP_MS = 0
-
- # When start up for the first time we need to populate the user_directory.
- # This is a set of user_id's we've inserted already
- self.initially_handled_users = set()
-
# The current position in the current_state_delta stream
self.pos = None
@@ -132,7 +111,7 @@ class UserDirectoryHandler(object):
# Support users are for diagnostics and should not appear in the user directory.
if not is_support:
yield self.store.update_profile_in_user_dir(
- user_id, profile.display_name, profile.avatar_url, None
+ user_id, profile.display_name, profile.avatar_url
)
@defer.inlineCallbacks
@@ -149,10 +128,9 @@ class UserDirectoryHandler(object):
if self.pos is None:
self.pos = yield self.store.get_user_directory_stream_pos()
- # If still None then we need to do the initial fill of directory
+ # If still None then the initial background update hasn't happened yet
if self.pos is None:
- yield self._do_initial_spam()
- self.pos = yield self.store.get_user_directory_stream_pos()
+ defer.returnValue(None)
# Loop round handling deltas until we're up to date
while True:
@@ -174,133 +152,6 @@ class UserDirectoryHandler(object):
yield self.store.update_user_directory_stream_pos(self.pos)
@defer.inlineCallbacks
- def _do_initial_spam(self):
- """Populates the user_directory from the current state of the DB, used
- when synapse first starts with user_directory support
- """
- new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
-
- # Delete any existing entries just in case there are any
- yield self.store.delete_all_from_user_dir()
-
- # We process by going through each existing room at a time.
- room_ids = yield self.store.get_all_rooms()
-
- logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
- num_processed_rooms = 0
-
- for room_id in room_ids:
- logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
- yield self._handle_initial_room(room_id)
- num_processed_rooms += 1
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
- logger.info("Processed all rooms.")
-
- if self.search_all_users:
- num_processed_users = 0
- user_ids = yield self.store.get_all_local_users()
- logger.info(
- "Doing initial update of user directory. %d users", len(user_ids)
- )
- for user_id in user_ids:
- # We add profiles for all users even if they don't match the
- # include pattern, just in case we want to change it in future
- logger.info(
- "Handling user %d/%d", num_processed_users + 1, len(user_ids)
- )
- yield self._handle_local_user(user_id)
- num_processed_users += 1
- yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
-
- logger.info("Processed all users")
-
- self.initially_handled_users = None
-
- yield self.store.update_user_directory_stream_pos(new_pos)
-
- @defer.inlineCallbacks
- def _handle_initial_room(self, room_id):
- """
- Called when we initially fill out user_directory one room at a time
- """
- is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
- if not is_in_room:
- return
-
- is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
- room_id
- )
-
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
- user_ids = set(users_with_profile)
- unhandled_users = user_ids - self.initially_handled_users
-
- yield self.store.add_profiles_to_user_dir(
- {user_id: users_with_profile[user_id] for user_id in unhandled_users}
- )
-
- self.initially_handled_users |= unhandled_users
-
- # We now go and figure out the new users who share rooms with user entries
- # We sleep aggressively here as otherwise it can starve resources.
- # We also batch up inserts/updates, but try to avoid too many at once.
- to_insert = set()
- count = 0
-
- if is_public:
- for user_id in user_ids:
- if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
- if self.store.get_if_app_services_interested_in_user(user_id):
- count += 1
- continue
-
- to_insert.add(user_id)
- if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
- yield self.store.add_users_in_public_rooms(room_id, to_insert)
- to_insert.clear()
-
- if to_insert:
- yield self.store.add_users_in_public_rooms(room_id, to_insert)
- to_insert.clear()
- else:
-
- for user_id in user_ids:
- if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
- if not self.is_mine_id(user_id):
- count += 1
- continue
-
- if self.store.get_if_app_services_interested_in_user(user_id):
- count += 1
- continue
-
- for other_user_id in user_ids:
- if user_id == other_user_id:
- continue
-
- if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
- count += 1
-
- user_set = (user_id, other_user_id)
- to_insert.add(user_set)
-
- if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
- yield self.store.add_users_who_share_private_room(
- room_id, not is_public, to_insert
- )
- to_insert.clear()
-
- if to_insert:
- yield self.store.add_users_who_share_private_room(room_id, to_insert)
- to_insert.clear()
-
- @defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
"""
@@ -449,7 +300,9 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_directory(user_id)
if not row:
- yield self.store.add_profiles_to_user_dir({user_id: profile})
+ yield self.store.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
@defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
@@ -461,9 +314,9 @@ class UserDirectoryHandler(object):
"""
logger.debug("Adding new user to dir, %r", user_id)
- row = yield self.store.get_user_in_directory(user_id)
- if not row:
- yield self.store.add_profiles_to_user_dir({user_id: profile})
+ yield self.store.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
@@ -479,7 +332,9 @@ class UserDirectoryHandler(object):
# First, if they're our user then we need to update for every user
if self.is_mine_id(user_id):
- is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+ is_appservice = self.store.get_if_app_services_interested_in_user(
+ user_id
+ )
# We don't care about appservice users.
if not is_appservice:
@@ -546,9 +401,7 @@ class UserDirectoryHandler(object):
new_avatar = event.content.get("avatar_url")
if prev_name != new_name or prev_avatar != new_avatar:
- yield self.store.update_profile_in_user_dir(
- user_id, new_name, new_avatar, room_id
- )
+ yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
@defer.inlineCallbacks
def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 6121c5b6df..8d56effbb8 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -22,6 +22,7 @@ from twisted.internet import defer
from twisted.web.client import PartialDownloadError
from synapse.api.errors import Codes, LoginError, SynapseError
+from synapse.api.ratelimiting import Ratelimiter
from synapse.http.server import finish_request
from synapse.http.servlet import (
RestServlet,
@@ -97,6 +98,7 @@ class LoginRestServlet(ClientV1RestServlet):
self.registration_handler = hs.get_registration_handler()
self.handlers = hs.get_handlers()
self._well_known_builder = WellKnownBuilder(hs)
+ self._address_ratelimiter = Ratelimiter()
def on_GET(self, request):
flows = []
@@ -129,6 +131,13 @@ class LoginRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
+ self._address_ratelimiter.ratelimit(
+ request.getClientIP(), time_now_s=self.hs.clock.time(),
+ rate_hz=self.hs.config.rc_login_address.per_second,
+ burst_count=self.hs.config.rc_login_address.burst_count,
+ update=True,
+ )
+
login_submission = parse_json_object_from_request(request)
try:
if self.jwt_enabled and (login_submission["type"] ==
@@ -285,6 +294,7 @@ class LoginRestServlet(ClientV1RestServlet):
raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
user_id = UserID(user, self.hs.hostname).to_string()
+
auth_handler = self.auth_handler
registered_user_id = yield auth_handler.check_user_exists(user_id)
if registered_user_id:
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 6f34029431..6d235262c8 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -210,8 +210,8 @@ class RegisterRestServlet(RestServlet):
allowed, time_allowed = self.ratelimiter.can_do_action(
client_addr, time_now_s=time_now,
- rate_hz=self.hs.config.rc_registration_requests_per_second,
- burst_count=self.hs.config.rc_registration_request_burst_count,
+ rate_hz=self.hs.config.rc_registration.per_second,
+ burst_count=self.hs.config.rc_registration.burst_count,
update=False,
)
diff --git a/synapse/server.py b/synapse/server.py
index b9549dd042..dc8f1ccb8c 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -42,7 +42,7 @@ from synapse.federation.federation_server import (
ReplicationFederationHandlerRegistry,
)
from synapse.federation.send_queue import FederationRemoteSendQueue
-from synapse.federation.transaction_queue import TransactionQueue
+from synapse.federation.sender import FederationSender
from synapse.federation.transport.client import TransportLayerClient
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
from synapse.groups.groups_server import GroupsServerHandler
@@ -434,7 +434,7 @@ class HomeServer(object):
def build_federation_sender(self):
if self.should_send_federation():
- return TransactionQueue(self)
+ return FederationSender(self)
elif not self.config.worker_app:
return FederationRemoteSendQueue(self)
else:
diff --git a/synapse/server.pyi b/synapse/server.pyi
index fb8df56cd5..3ba3a967c2 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -1,5 +1,6 @@
import synapse.api.auth
import synapse.config.homeserver
+import synapse.federation.sender
import synapse.federation.transaction_queue
import synapse.federation.transport.client
import synapse.handlers
@@ -62,7 +63,7 @@ class HomeServer(object):
def get_set_password_handler(self) -> synapse.handlers.set_password.SetPasswordHandler:
pass
- def get_federation_sender(self) -> synapse.federation.transaction_queue.TransactionQueue:
+ def get_federation_sender(self) -> synapse.federation.sender.FederationSender:
pass
def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient:
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 60cdc884e6..a2f8c23a65 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -52,7 +52,9 @@ class BackgroundUpdatePerformance(object):
Returns:
A duration in ms as a float
"""
- if self.total_item_count == 0:
+ if self.avg_duration_ms == 0:
+ return 0
+ elif self.total_item_count == 0:
return None
else:
# Use the exponential moving average so that we can adapt to
@@ -64,7 +66,9 @@ class BackgroundUpdatePerformance(object):
Returns:
A duration in ms as a float
"""
- if self.total_item_count == 0:
+ if self.total_duration_ms == 0:
+ return 0
+ elif self.total_item_count == 0:
return None
else:
return float(self.total_item_count) / float(self.total_duration_ms)
diff --git a/synapse/storage/schema/delta/53/user_dir_populate.sql b/synapse/storage/schema/delta/53/user_dir_populate.sql
new file mode 100644
index 0000000000..ffcc896b58
--- /dev/null
+++ b/synapse/storage/schema/delta/53/user_dir_populate.sql
@@ -0,0 +1,30 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Set up staging tables
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('populate_user_directory_createtables', '{}');
+
+-- Run through each room and update the user directory according to who is in it
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables');
+
+-- Insert all users, if search_all_users is on
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users');
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 1c00b956e5..4ee653210f 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -16,12 +16,10 @@
import logging
import re
-from six import iteritems
-
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
-from synapse.storage._base import SQLBaseStore
+from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.state import StateFilter
from synapse.types import get_domain_from_id, get_localpart_from_id
@@ -30,7 +28,276 @@ from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
-class UserDirectoryStore(SQLBaseStore):
+TEMP_TABLE = "_temp_populate_user_directory"
+
+
+class UserDirectoryStore(BackgroundUpdateStore):
+ def __init__(self, db_conn, hs):
+ super(UserDirectoryStore, self).__init__(db_conn, hs)
+
+ self.server_name = hs.hostname
+
+ self.register_background_update_handler(
+ "populate_user_directory_createtables",
+ self._populate_user_directory_createtables,
+ )
+ self.register_background_update_handler(
+ "populate_user_directory_process_rooms",
+ self._populate_user_directory_process_rooms,
+ )
+ self.register_background_update_handler(
+ "populate_user_directory_process_users",
+ self._populate_user_directory_process_users,
+ )
+ self.register_background_update_handler(
+ "populate_user_directory_cleanup", self._populate_user_directory_cleanup
+ )
+
+ @defer.inlineCallbacks
+ def _populate_user_directory_createtables(self, progress, batch_size):
+
+ # Get all the rooms that we want to process.
+ def _make_staging_area(txn):
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
+ )
+ txn.execute(sql)
+
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_position(position TEXT NOT NULL)"
+ )
+ txn.execute(sql)
+
+ # Get rooms we want to process from the database
+ sql = """
+ SELECT room_id, count(*) FROM current_state_events
+ GROUP BY room_id
+ """
+ txn.execute(sql)
+ rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
+ self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
+ del rooms
+
+ # If search all users is on, get all the users we want to add.
+ if self.hs.config.user_directory_search_all_users:
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_users(user_id TEXT NOT NULL)"
+ )
+ txn.execute(sql)
+
+ txn.execute("SELECT name FROM users")
+ users = [{"user_id": x[0]} for x in txn.fetchall()]
+
+ self._simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
+
+ new_pos = yield self.get_max_stream_id_in_current_state_deltas()
+ yield self.runInteraction(
+ "populate_user_directory_temp_build", _make_staging_area
+ )
+ yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
+
+ yield self._end_background_update("populate_user_directory_createtables")
+ defer.returnValue(1)
+
+ @defer.inlineCallbacks
+ def _populate_user_directory_cleanup(self, progress, batch_size):
+ """
+ Update the user directory stream position, then clean up the old tables.
+ """
+ position = yield self._simple_select_one_onecol(
+ TEMP_TABLE + "_position", None, "position"
+ )
+ yield self.update_user_directory_stream_pos(position)
+
+ def _delete_staging_area(txn):
+ txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
+ txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
+ txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+
+ yield self.runInteraction(
+ "populate_user_directory_cleanup", _delete_staging_area
+ )
+
+ yield self._end_background_update("populate_user_directory_cleanup")
+ defer.returnValue(1)
+
+ @defer.inlineCallbacks
+ def _populate_user_directory_process_rooms(self, progress, batch_size):
+
+ state = self.hs.get_state_handler()
+
+ # If we don't have progress filed, delete everything.
+ if not progress:
+ yield self.delete_all_from_user_dir()
+
+ def _get_next_batch(txn):
+ sql = """
+ SELECT room_id FROM %s
+ ORDER BY events DESC
+ LIMIT %s
+ """ % (
+ TEMP_TABLE + "_rooms",
+ str(batch_size),
+ )
+ txn.execute(sql)
+ rooms_to_work_on = txn.fetchall()
+
+ if not rooms_to_work_on:
+ return None
+
+ rooms_to_work_on = [x[0] for x in rooms_to_work_on]
+
+ # Get how many are left to process, so we can give status on how
+ # far we are in processing
+ txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+ progress["remaining"] = txn.fetchone()[0]
+
+ return rooms_to_work_on
+
+ rooms_to_work_on = yield self.runInteraction(
+ "populate_user_directory_temp_read", _get_next_batch
+ )
+
+ # No more rooms -- complete the transaction.
+ if not rooms_to_work_on:
+ yield self._end_background_update("populate_user_directory_process_rooms")
+ defer.returnValue(1)
+
+ logger.info(
+ "Processing the next %d rooms of %d remaining"
+ % (len(rooms_to_work_on), progress["remaining"])
+ )
+
+ for room_id in rooms_to_work_on:
+ is_in_room = yield self.is_host_joined(room_id, self.server_name)
+
+ if is_in_room:
+ is_public = yield self.is_room_world_readable_or_publicly_joinable(
+ room_id
+ )
+
+ users_with_profile = yield state.get_current_user_in_room(room_id)
+ user_ids = set(users_with_profile)
+
+ # Update each user in the user directory.
+ for user_id, profile in users_with_profile.items():
+ yield self.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
+
+ to_insert = set()
+
+ if is_public:
+ for user_id in user_ids:
+ if self.get_if_app_services_interested_in_user(user_id):
+ continue
+
+ to_insert.add(user_id)
+
+ if to_insert:
+ yield self.add_users_in_public_rooms(room_id, to_insert)
+ to_insert.clear()
+ else:
+ for user_id in user_ids:
+ if not self.hs.is_mine_id(user_id):
+ continue
+
+ if self.get_if_app_services_interested_in_user(user_id):
+ continue
+
+ for other_user_id in user_ids:
+ if user_id == other_user_id:
+ continue
+
+ user_set = (user_id, other_user_id)
+ to_insert.add(user_set)
+
+ if to_insert:
+ yield self.add_users_who_share_private_room(room_id, to_insert)
+ to_insert.clear()
+
+ # We've finished a room. Delete it from the table.
+ yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
+ # Update the remaining counter.
+ progress["remaining"] -= 1
+ yield self.runInteraction(
+ "populate_user_directory",
+ self._background_update_progress_txn,
+ "populate_user_directory_process_rooms",
+ progress,
+ )
+
+ defer.returnValue(len(rooms_to_work_on))
+
+ @defer.inlineCallbacks
+ def _populate_user_directory_process_users(self, progress, batch_size):
+ """
+ If search_all_users is enabled, add all of the users to the user directory.
+ """
+ if not self.hs.config.user_directory_search_all_users:
+ yield self._end_background_update("populate_user_directory_process_users")
+ defer.returnValue(1)
+
+ def _get_next_batch(txn):
+ sql = "SELECT user_id FROM %s LIMIT %s" % (
+ TEMP_TABLE + "_users",
+ str(batch_size),
+ )
+ txn.execute(sql)
+ users_to_work_on = txn.fetchall()
+
+ if not users_to_work_on:
+ return None
+
+ users_to_work_on = [x[0] for x in users_to_work_on]
+
+ # Get how many are left to process, so we can give status on how
+ # far we are in processing
+ sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
+ txn.execute(sql)
+ progress["remaining"] = txn.fetchone()[0]
+
+ return users_to_work_on
+
+ users_to_work_on = yield self.runInteraction(
+ "populate_user_directory_temp_read", _get_next_batch
+ )
+
+ # No more users -- complete the transaction.
+ if not users_to_work_on:
+ yield self._end_background_update("populate_user_directory_process_users")
+ defer.returnValue(1)
+
+ logger.info(
+ "Processing the next %d users of %d remaining"
+ % (len(users_to_work_on), progress["remaining"])
+ )
+
+ for user_id in users_to_work_on:
+ profile = yield self.get_profileinfo(get_localpart_from_id(user_id))
+ yield self.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url
+ )
+
+ # We've finished processing a user. Delete it from the table.
+ yield self._simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id})
+ # Update the remaining counter.
+ progress["remaining"] -= 1
+ yield self.runInteraction(
+ "populate_user_directory",
+ self._background_update_progress_txn,
+ "populate_user_directory_process_users",
+ progress,
+ )
+
+ defer.returnValue(len(users_to_work_on))
+
@defer.inlineCallbacks
def is_room_world_readable_or_publicly_joinable(self, room_id):
"""Check if the room is either world_readable or publically joinable
@@ -62,89 +329,16 @@ class UserDirectoryStore(SQLBaseStore):
defer.returnValue(False)
- def add_profiles_to_user_dir(self, users_with_profile):
- """Add profiles to the user directory
-
- Args:
- users_with_profile (dict): Users to add to directory in the form of
- mapping of user_id -> ProfileInfo
+ def update_profile_in_user_dir(self, user_id, display_name, avatar_url):
+ """
+ Update or add a user's profile in the user directory.
"""
- if isinstance(self.database_engine, PostgresEngine):
- # We weight the loclpart most highly, then display name and finally
- # server name
- sql = """
- INSERT INTO user_directory_search(user_id, vector)
- VALUES (?,
- setweight(to_tsvector('english', ?), 'A')
- || setweight(to_tsvector('english', ?), 'D')
- || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
- )
- """
- args = (
- (
- user_id,
- get_localpart_from_id(user_id),
- get_domain_from_id(user_id),
- profile.display_name,
- )
- for user_id, profile in iteritems(users_with_profile)
- )
- elif isinstance(self.database_engine, Sqlite3Engine):
- sql = """
- INSERT INTO user_directory_search(user_id, value)
- VALUES (?,?)
- """
- args = tuple(
- (
- user_id,
- "%s %s" % (user_id, p.display_name) if p.display_name else user_id,
- )
- for user_id, p in iteritems(users_with_profile)
- )
- else:
- # This should be unreachable.
- raise Exception("Unrecognized database engine")
-
- def _add_profiles_to_user_dir_txn(txn):
- txn.executemany(sql, args)
- self._simple_insert_many_txn(
- txn,
- table="user_directory",
- values=[
- {
- "user_id": user_id,
- "room_id": None,
- "display_name": profile.display_name,
- "avatar_url": profile.avatar_url,
- }
- for user_id, profile in iteritems(users_with_profile)
- ],
- )
- for user_id in users_with_profile:
- txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
-
- return self.runInteraction(
- "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn
- )
-
- @defer.inlineCallbacks
- def update_user_in_user_dir(self, user_id, room_id):
- yield self._simple_update_one(
- table="user_directory",
- keyvalues={"user_id": user_id},
- updatevalues={"room_id": room_id},
- desc="update_user_in_user_dir",
- )
- self.get_user_in_directory.invalidate((user_id,))
-
- def update_profile_in_user_dir(self, user_id, display_name, avatar_url, room_id):
def _update_profile_in_user_dir_txn(txn):
new_entry = self._simple_upsert_txn(
txn,
table="user_directory",
keyvalues={"user_id": user_id},
- insertion_values={"room_id": room_id},
values={"display_name": display_name, "avatar_url": avatar_url},
lock=False, # We're only inserter
)
@@ -282,18 +476,6 @@ class UserDirectoryStore(SQLBaseStore):
defer.returnValue(user_ids)
@defer.inlineCallbacks
- def get_all_rooms(self):
- """Get all room_ids we've ever known about, in ascending order of "size"
- """
- sql = """
- SELECT room_id FROM current_state_events
- GROUP BY room_id
- ORDER BY count(*) ASC
- """
- rows = yield self._execute("get_all_rooms", None, sql)
- defer.returnValue([room_id for room_id, in rows])
-
- @defer.inlineCallbacks
def get_all_local_users(self):
"""Get all local users
"""
@@ -553,8 +735,8 @@ class UserDirectoryStore(SQLBaseStore):
"""
if self.hs.config.user_directory_search_all_users:
- join_args = ()
- where_clause = "1=1"
+ join_args = (user_id,)
+ where_clause = "user_id != ?"
else:
join_args = (user_id,)
where_clause = """
diff --git a/synapse/types.py b/synapse/types.py
index d8cb64addb..3de94b6335 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -16,6 +16,8 @@ import re
import string
from collections import namedtuple
+import attr
+
from synapse.api.errors import SynapseError
@@ -455,3 +457,13 @@ class ThirdPartyInstanceID(
@classmethod
def create(cls, appservice_id, network_id,):
return cls(appservice_id=appservice_id, network_id=network_id)
+
+
+@attr.s(slots=True)
+class ReadReceipt(object):
+ """Information about a read-receipt"""
+ room_id = attr.ib()
+ receipt_type = attr.ib()
+ user_id = attr.ib()
+ event_ids = attr.ib()
+ data = attr.ib()
diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py
index c9c1506273..010e65829e 100644
--- a/tests/handlers/test_register.py
+++ b/tests/handlers/test_register.py
@@ -187,12 +187,32 @@ class RegistrationTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_auto_create_auto_join_where_no_consent(self):
- self.hs.config.user_consent_at_registration = True
- self.hs.config.block_events_without_consent_error = "Error"
+ """Test to ensure that the first user is not auto-joined to a room if
+ they have not given general consent.
+ """
+
+ # Given:-
+ # * a user must give consent,
+ # * they have not given that consent
+ # * The server is configured to auto-join to a room
+ # (and autocreate if necessary)
+
+ event_creation_handler = self.hs.get_event_creation_handler()
+ # (Messing with the internals of event_creation_handler is fragile
+ # but can't see a better way to do this. One option could be to subclass
+ # the test with custom config.)
+ event_creation_handler._block_events_without_consent_error = ("Error")
+ event_creation_handler._consent_uri_builder = Mock()
room_alias_str = "#room:test"
self.hs.config.auto_join_rooms = [room_alias_str]
+
+ # When:-
+ # * the user is registered and post consent actions are called
res = yield self.handler.register(localpart='jeff')
yield self.handler.post_consent_actions(res[0])
+
+ # Then:-
+ # * Ensure that they have not been joined to the room
rooms = yield self.store.get_rooms_for_user(res[0])
self.assertEqual(len(rooms), 0)
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py
index 114807efc1..aefe11ac28 100644
--- a/tests/handlers/test_user_directory.py
+++ b/tests/handlers/test_user_directory.py
@@ -163,9 +163,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
def get_users_in_public_rooms(self):
r = self.get_success(
self.store._simple_select_list(
- "users_in_public_rooms",
- None,
- ("user_id", "room_id"),
+ "users_in_public_rooms", None, ("user_id", "room_id")
)
)
retval = []
@@ -182,6 +180,53 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
)
)
+ def _add_background_updates(self):
+ """
+ Add the background updates we need to run.
+ """
+ # Ugh, have to reset this flag
+ self.store._all_done = False
+
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_user_directory_createtables",
+ "progress_json": "{}",
+ },
+ )
+ )
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_user_directory_process_rooms",
+ "progress_json": "{}",
+ "depends_on": "populate_user_directory_createtables",
+ },
+ )
+ )
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_user_directory_process_users",
+ "progress_json": "{}",
+ "depends_on": "populate_user_directory_process_rooms",
+ },
+ )
+ )
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_user_directory_cleanup",
+ "progress_json": "{}",
+ "depends_on": "populate_user_directory_process_users",
+ },
+ )
+ )
+
def test_initial(self):
"""
The user directory's initial handler correctly updates the search tables.
@@ -211,26 +256,17 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
self.assertEqual(shares_private, [])
self.assertEqual(public_users, [])
- # Reset the handled users caches
- self.handler.initially_handled_users = set()
+ # Do the initial population of the user directory via the background update
+ self._add_background_updates()
- # Do the initial population
- d = self.handler._do_initial_spam()
-
- # This takes a while, so pump it a bunch of times to get through the
- # sleep delays
- for i in range(10):
- self.pump(1)
-
- self.get_success(d)
+ while not self.get_success(self.store.has_completed_background_updates()):
+ self.get_success(self.store.do_next_background_update(100), by=0.1)
shares_private = self.get_users_who_share_private_rooms()
public_users = self.get_users_in_public_rooms()
# User 1 and User 2 are in the same public room
- self.assertEqual(
- set(public_users), set([(u1, room), (u2, room)])
- )
+ self.assertEqual(set(public_users), set([(u1, room), (u2, room)]))
# User 1 and User 3 share private rooms
self.assertEqual(
@@ -238,7 +274,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
set([(u1, u3, private_room), (u3, u1, private_room)]),
)
- def test_search_all_users(self):
+ def test_initial_share_all_users(self):
"""
Search all users = True means that a user does not have to share a
private room with the searching user or be in a public room to be search
@@ -248,33 +284,36 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
self.hs.config.user_directory_search_all_users = True
u1 = self.register_user("user1", "pass")
- u1_token = self.login(u1, "pass")
- u2 = self.register_user("user2", "pass")
- u2_token = self.login(u2, "pass")
+ self.register_user("user2", "pass")
u3 = self.register_user("user3", "pass")
- # User 1 and User 2 join a room. User 3 never does.
- room = self.helper.create_room_as(u1, is_public=True, tok=u1_token)
- self.helper.invite(room, src=u1, targ=u2, tok=u1_token)
- self.helper.join(room, user=u2, tok=u2_token)
-
+ # Wipe the user dir
self.get_success(self.store.update_user_directory_stream_pos(None))
self.get_success(self.store.delete_all_from_user_dir())
- # Reset the handled users caches
- self.handler.initially_handled_users = set()
+ # Do the initial population of the user directory via the background update
+ self._add_background_updates()
- # Do the initial population
- d = self.handler._do_initial_spam()
+ while not self.get_success(self.store.has_completed_background_updates()):
+ self.get_success(self.store.do_next_background_update(100), by=0.1)
- # This takes a while, so pump it a bunch of times to get through the
- # sleep delays
- for i in range(10):
- self.pump(1)
+ shares_private = self.get_users_who_share_private_rooms()
+ public_users = self.get_users_in_public_rooms()
- self.get_success(d)
+ # No users share rooms
+ self.assertEqual(public_users, [])
+ self.assertEqual(self._compress_shared(shares_private), set([]))
# Despite not sharing a room, search_all_users means we get a search
# result.
s = self.get_success(self.handler.search_users(u1, u3, 10))
self.assertEqual(len(s["results"]), 1)
+
+ # We can find the other two users
+ s = self.get_success(self.handler.search_users(u1, "user", 10))
+ self.assertEqual(len(s["results"]), 2)
+
+ # Registering a user and then searching for them works.
+ u4 = self.register_user("user4", "pass")
+ s = self.get_success(self.handler.search_users(u1, u4, 10))
+ self.assertEqual(len(s["results"]), 1)
diff --git a/tests/rest/client/v1/test_login.py b/tests/rest/client/v1/test_login.py
new file mode 100644
index 0000000000..86312f1096
--- /dev/null
+++ b/tests/rest/client/v1/test_login.py
@@ -0,0 +1,163 @@
+import json
+
+from synapse.rest.client.v1 import admin, login
+
+from tests import unittest
+
+LOGIN_URL = b"/_matrix/client/r0/login"
+
+
+class LoginRestServletTestCase(unittest.HomeserverTestCase):
+
+ servlets = [
+ admin.register_servlets,
+ login.register_servlets,
+ ]
+
+ def make_homeserver(self, reactor, clock):
+
+ self.hs = self.setup_test_homeserver()
+ self.hs.config.enable_registration = True
+ self.hs.config.registrations_require_3pid = []
+ self.hs.config.auto_join_rooms = []
+ self.hs.config.enable_registration_captcha = False
+
+ return self.hs
+
+ def test_POST_ratelimiting_per_address(self):
+ self.hs.config.rc_login_address.burst_count = 5
+ self.hs.config.rc_login_address.per_second = 0.17
+
+ # Create different users so we're sure not to be bothered by the per-user
+ # ratelimiter.
+ for i in range(0, 6):
+ self.register_user("kermit" + str(i), "monkey")
+
+ for i in range(0, 6):
+ params = {
+ "type": "m.login.password",
+ "identifier": {
+ "type": "m.id.user",
+ "user": "kermit" + str(i),
+ },
+ "password": "monkey",
+ }
+ request_data = json.dumps(params)
+ request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+ self.render(request)
+
+ if i == 5:
+ self.assertEquals(channel.result["code"], b"429", channel.result)
+ retry_after_ms = int(channel.json_body["retry_after_ms"])
+ else:
+ self.assertEquals(channel.result["code"], b"200", channel.result)
+
+ # Since we're ratelimiting at 1 request/min, retry_after_ms should be lower
+ # than 1min.
+ self.assertTrue(retry_after_ms < 6000)
+
+ self.reactor.advance(retry_after_ms / 1000.)
+
+ params = {
+ "type": "m.login.password",
+ "identifier": {
+ "type": "m.id.user",
+ "user": "kermit" + str(i),
+ },
+ "password": "monkey",
+ }
+ request_data = json.dumps(params)
+ request, channel = self.make_request(b"POST", LOGIN_URL, params)
+ self.render(request)
+
+ self.assertEquals(channel.result["code"], b"200", channel.result)
+
+ def test_POST_ratelimiting_per_account(self):
+ self.hs.config.rc_login_account.burst_count = 5
+ self.hs.config.rc_login_account.per_second = 0.17
+
+ self.register_user("kermit", "monkey")
+
+ for i in range(0, 6):
+ params = {
+ "type": "m.login.password",
+ "identifier": {
+ "type": "m.id.user",
+ "user": "kermit",
+ },
+ "password": "monkey",
+ }
+ request_data = json.dumps(params)
+ request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+ self.render(request)
+
+ if i == 5:
+ self.assertEquals(channel.result["code"], b"429", channel.result)
+ retry_after_ms = int(channel.json_body["retry_after_ms"])
+ else:
+ self.assertEquals(channel.result["code"], b"200", channel.result)
+
+ # Since we're ratelimiting at 1 request/min, retry_after_ms should be lower
+ # than 1min.
+ self.assertTrue(retry_after_ms < 6000)
+
+ self.reactor.advance(retry_after_ms / 1000.)
+
+ params = {
+ "type": "m.login.password",
+ "identifier": {
+ "type": "m.id.user",
+ "user": "kermit",
+ },
+ "password": "monkey",
+ }
+ request_data = json.dumps(params)
+ request, channel = self.make_request(b"POST", LOGIN_URL, params)
+ self.render(request)
+
+ self.assertEquals(channel.result["code"], b"200", channel.result)
+
+ def test_POST_ratelimiting_per_account_failed_attempts(self):
+ self.hs.config.rc_login_failed_attempts.burst_count = 5
+ self.hs.config.rc_login_failed_attempts.per_second = 0.17
+
+ self.register_user("kermit", "monkey")
+
+ for i in range(0, 6):
+ params = {
+ "type": "m.login.password",
+ "identifier": {
+ "type": "m.id.user",
+ "user": "kermit",
+ },
+ "password": "notamonkey",
+ }
+ request_data = json.dumps(params)
+ request, channel = self.make_request(b"POST", LOGIN_URL, request_data)
+ self.render(request)
+
+ if i == 5:
+ self.assertEquals(channel.result["code"], b"429", channel.result)
+ retry_after_ms = int(channel.json_body["retry_after_ms"])
+ else:
+ self.assertEquals(channel.result["code"], b"403", channel.result)
+
+ # Since we're ratelimiting at 1 request/min, retry_after_ms should be lower
+ # than 1min.
+ self.assertTrue(retry_after_ms < 6000)
+
+ self.reactor.advance(retry_after_ms / 1000.)
+
+ params = {
+ "type": "m.login.password",
+ "identifier": {
+ "type": "m.id.user",
+ "user": "kermit",
+ },
+ "password": "notamonkey",
+ }
+ request_data = json.dumps(params)
+ request, channel = self.make_request(b"POST", LOGIN_URL, params)
+ self.render(request)
+
+ self.assertEquals(channel.result["code"], b"403", channel.result)
diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py
index 3600434858..8fb525d3bf 100644
--- a/tests/rest/client/v2_alpha/test_register.py
+++ b/tests/rest/client/v2_alpha/test_register.py
@@ -132,7 +132,8 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase):
self.assertEquals(channel.json_body["error"], "Guest access is disabled")
def test_POST_ratelimiting_guest(self):
- self.hs.config.rc_registration_request_burst_count = 5
+ self.hs.config.rc_registration.burst_count = 5
+ self.hs.config.rc_registration.per_second = 0.17
for i in range(0, 6):
url = self.url + b"?kind=guest"
@@ -153,7 +154,8 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase):
self.assertEquals(channel.result["code"], b"200", channel.result)
def test_POST_ratelimiting(self):
- self.hs.config.rc_registration_request_burst_count = 5
+ self.hs.config.rc_registration.burst_count = 5
+ self.hs.config.rc_registration.per_second = 0.17
for i in range(0, 6):
params = {
diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py
index 512d76e7a3..fd3361404f 100644
--- a/tests/storage/test_user_directory.py
+++ b/tests/storage/test_user_directory.py
@@ -16,7 +16,6 @@
from twisted.internet import defer
from synapse.storage import UserDirectoryStore
-from synapse.storage.roommember import ProfileInfo
from tests import unittest
from tests.utils import setup_test_homeserver
@@ -34,13 +33,9 @@ class UserDirectoryStoreTestCase(unittest.TestCase):
# alice and bob are both in !room_id. bobby is not but shares
# a homeserver with alice.
- yield self.store.add_profiles_to_user_dir(
- {
- ALICE: ProfileInfo(None, "alice"),
- BOB: ProfileInfo(None, "bob"),
- BOBBY: ProfileInfo(None, "bobby"),
- },
- )
+ yield self.store.update_profile_in_user_dir(ALICE, "alice", None)
+ yield self.store.update_profile_in_user_dir(BOB, "bob", None)
+ yield self.store.update_profile_in_user_dir(BOBBY, "bobby", None)
yield self.store.add_users_in_public_rooms(
"!room:id", (ALICE, BOB)
)
diff --git a/tests/unittest.py b/tests/unittest.py
index ef31321bc8..7772a47078 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -330,10 +330,10 @@ class HomeserverTestCase(TestCase):
"""
self.reactor.pump([by] * 100)
- def get_success(self, d):
+ def get_success(self, d, by=0.0):
if not isinstance(d, Deferred):
return d
- self.pump()
+ self.pump(by=by)
return self.successResultOf(d)
def register_user(self, username, password, admin=False):
diff --git a/tests/utils.py b/tests/utils.py
index 03b5a05b22..b58b674aa4 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -151,8 +151,14 @@ def default_config(name):
config.admin_contact = None
config.rc_messages_per_second = 10000
config.rc_message_burst_count = 10000
- config.rc_registration_request_burst_count = 3.0
- config.rc_registration_requests_per_second = 0.17
+ config.rc_registration.per_second = 10000
+ config.rc_registration.burst_count = 10000
+ config.rc_login_address.per_second = 10000
+ config.rc_login_address.burst_count = 10000
+ config.rc_login_account.per_second = 10000
+ config.rc_login_account.burst_count = 10000
+ config.rc_login_failed_attempts.per_second = 10000
+ config.rc_login_failed_attempts.burst_count = 10000
config.saml2_enabled = False
config.public_baseurl = None
config.default_identity_server = None
|