-rw-r--r--  CHANGES.md  99
-rw-r--r--  INSTALL.md  2
-rw-r--r--  changelog.d/2090.bugfix  1
-rw-r--r--  changelog.d/4474.misc  1
-rw-r--r--  changelog.d/4537.feature  1
-rw-r--r--  changelog.d/4662.misc  1
-rw-r--r--  changelog.d/4699.bugfix  1
-rw-r--r--  changelog.d/4735.feature  1
-rw-r--r--  changelog.d/4740.bugfix  1
-rw-r--r--  changelog.d/4749.bugfix  1
-rw-r--r--  changelog.d/4752.misc  1
-rw-r--r--  changelog.d/4757.feature  1
-rw-r--r--  changelog.d/4757.misc  1
-rw-r--r--  changelog.d/4759.feature  1
-rw-r--r--  changelog.d/4763.bugfix  1
-rw-r--r--  changelog.d/4765.misc  1
-rw-r--r--  changelog.d/4770.misc  1
-rw-r--r--  changelog.d/4771.misc  1
-rw-r--r--  changelog.d/4772.feature  1
-rw-r--r--  changelog.d/4776.bugfix  1
-rw-r--r--  changelog.d/4779.misc  1
-rw-r--r--  changelog.d/4790.bugfix  1
-rw-r--r--  changelog.d/4791.feature  1
-rw-r--r--  changelog.d/4792.bugfix  1
-rw-r--r--  changelog.d/4793.feature  1
-rw-r--r--  changelog.d/4794.misc  1
-rw-r--r--  changelog.d/4795.misc  1
-rw-r--r--  changelog.d/4796.feature  1
-rw-r--r--  changelog.d/4797.misc  1
-rw-r--r--  changelog.d/4798.misc  1
-rw-r--r--  changelog.d/4799.misc  1
-rw-r--r--  changelog.d/4801.feature  1
-rw-r--r--  changelog.d/4804.feature  1
-rw-r--r--  changelog.d/4814.feature  1
-rw-r--r--  changelog.d/4815.misc  1
-rw-r--r--  changelog.d/4816.misc  1
-rw-r--r--  changelog.d/4817.misc  1
-rw-r--r--  changelog.d/4818.bugfix  1
-rw-r--r--  changelog.d/4820.misc  1
-rw-r--r--  changelog.d/4821.feature  1
-rw-r--r--  changelog.d/4824.misc  1
-rw-r--r--  changelog.d/4825.misc  1
-rw-r--r--  changelog.d/4828.misc  1
-rw-r--r--  changelog.d/4829.bugfix  1
-rw-r--r--  changelog.d/4832.misc  1
-rw-r--r--  changelog.d/4837.bugfix  1
-rw-r--r--  changelog.d/4838.bugfix  1
-rw-r--r--  changelog.d/4839.misc  1
-rw-r--r--  changelog.d/4840.feature  1
-rw-r--r--  changelog.d/4843.misc  1
-rw-r--r--  changelog.d/4844.misc  1
-rw-r--r--  changelog.d/4846.feature  1
-rw-r--r--  changelog.d/4847.misc  1
-rw-r--r--  changelog.d/4849.misc  1
-rw-r--r--  changelog.d/4852.misc  1
-rw-r--r--  changelog.d/4853.feature  1
-rw-r--r--  changelog.d/4855.misc  1
-rw-r--r--  changelog.d/4863.misc  1
-rw-r--r--  changelog.d/4864.feature  1
-rw-r--r--  changelog.d/4865.feature  1
-rw-r--r--  changelog.d/4869.misc  1
-rw-r--r--  changelog.d/4870.misc  1
-rw-r--r--  changelog.d/4879.misc  1
-rw-r--r--  changelog.d/4881.misc  1
-rw-r--r--  changelog.d/4886.bugfix  1
-rw-r--r--  changelog.d/4886.misc  1
-rw-r--r--  changelog.d/4887.feature  1
-rw-r--r--  changelog.d/4888.bugfix  2
-rw-r--r--  changelog.d/4889.misc  1
-rw-r--r--  changelog.d/4890.feature  1
-rw-r--r--  changelog.d/4895.feature  1
-rw-r--r--  changelog.d/4895.misc  1
-rw-r--r--  changelog.d/4896.feature  1
-rw-r--r--  changelog.d/4900.feature  1
-rw-r--r--  changelog.d/4902.misc  1
-rw-r--r--  changelog.d/4904.bugfix  1
-rw-r--r--  changelog.d/4905.misc  1
-rw-r--r--  changelog.d/4908.bugfix  1
-rw-r--r--  changelog.d/4912.misc  1
-rw-r--r--  changelog.d/4913.misc  1
-rw-r--r--  changelog.d/4917.misc  1
-rw-r--r--  changelog.d/4923.misc  1
-rw-r--r--  changelog.d/4927.feature  1
-rw-r--r--  changelog.d/4928.misc  1
-rw-r--r--  changelog.d/4929.misc  1
-rw-r--r--  changelog.d/4931.feature  1
-rw-r--r--  changelog.d/4942.bugfix  1
-rw-r--r--  changelog.d/4944.feature  1
-rw-r--r--  changelog.d/4947.feature  1
-rw-r--r--  changelog.d/4949.misc  1
-rw-r--r--  changelog.d/4953.misc  2
-rw-r--r--  changelog.d/4954.misc  1
-rw-r--r--  changelog.d/4955.bugfix  1
-rw-r--r--  changelog.d/4956.bugfix  1
-rw-r--r--  changelog.d/4959.misc  1
-rw-r--r--  changelog.d/4965.misc  1
-rw-r--r--  changelog.d/4968.misc  1
-rw-r--r--  changelog.d/4969.misc  2
-rw-r--r--  changelog.d/4981.bugfix  1
-rw-r--r--  debian/changelog  8
-rwxr-xr-x  demo/start.sh  20
-rw-r--r--  docker/README.md  5
-rw-r--r--  synapse/__init__.py  2
-rw-r--r--  synapse/api/constants.py  40
-rw-r--r--  synapse/api/room_versions.py  91
-rw-r--r--  synapse/app/federation_sender.py  2
-rw-r--r--  synapse/app/synchrotron.py  5
-rw-r--r--  synapse/app/user_dir.py  17
-rw-r--r--  synapse/event_auth.py  20
-rw-r--r--  synapse/events/__init__.py  15
-rw-r--r--  synapse/events/builder.py  36
-rw-r--r--  synapse/events/validator.py  3
-rw-r--r--  synapse/federation/federation_base.py  16
-rw-r--r--  synapse/federation/federation_client.py  29
-rw-r--r--  synapse/federation/federation_server.py  3
-rw-r--r--  synapse/federation/send_queue.py  73
-rw-r--r--  synapse/federation/sender/__init__.py  19
-rw-r--r--  synapse/federation/transport/server.py  4
-rw-r--r--  synapse/handlers/federation.py  13
-rw-r--r--  synapse/handlers/message.py  7
-rw-r--r--  synapse/handlers/presence.py  176
-rw-r--r--  synapse/handlers/register.py  17
-rw-r--r--  synapse/handlers/room.py  11
-rw-r--r--  synapse/module_api/__init__.py  9
-rw-r--r--  synapse/replication/slave/storage/events.py  27
-rw-r--r--  synapse/replication/tcp/client.py  15
-rw-r--r--  synapse/replication/tcp/protocol.py  6
-rw-r--r--  synapse/replication/tcp/resource.py  3
-rw-r--r--  synapse/replication/tcp/streams/__init__.py  49
-rw-r--r--  synapse/replication/tcp/streams/_base.py (renamed from synapse/replication/tcp/streams.py)  127
-rw-r--r--  synapse/replication/tcp/streams/events.py  146
-rw-r--r--  synapse/replication/tcp/streams/federation.py  39
-rw-r--r--  synapse/rest/client/v1/admin.py  2
-rw-r--r--  synapse/rest/client/v2_alpha/capabilities.py  10
-rw-r--r--  synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py  2
-rw-r--r--  synapse/state/__init__.py  16
-rw-r--r--  synapse/state/v1.py  11
-rw-r--r--  synapse/storage/_base.py  5
-rw-r--r--  synapse/storage/events.py  563
-rw-r--r--  synapse/storage/events_worker.py  3
-rw-r--r--  synapse/storage/roommember.py  184
-rw-r--r--  tests/config/test_server.py  32
-rw-r--r--  tests/handlers/test_presence.py  176
-rw-r--r--  tests/replication/slave/storage/_base.py  28
-rw-r--r--  tests/replication/slave/storage/test_events.py  161
-rw-r--r--  tests/replication/tcp/streams/test_receipts.py  2
-rw-r--r--  tests/rest/client/v2_alpha/test_capabilities.py  4
-rw-r--r--  tests/server.py  56
-rw-r--r--  tests/state/test_v2.py  7
-rw-r--r--  tests/storage/test_redaction.py  9
-rw-r--r--  tests/storage/test_roommember.py  5
-rw-r--r--  tests/storage/test_state.py  5
-rw-r--r--  tests/test_event_auth.py  10
-rw-r--r--  tests/test_state.py  5
-rw-r--r--  tests/test_visibility.py  8
-rw-r--r--  tests/utils.py  5
156 files changed, 1688 insertions, 875 deletions
diff --git a/CHANGES.md b/CHANGES.md
index b25775d18e..490c2021e0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,102 @@
+Synapse 0.99.3 (2019-04-01)
+===========================
+
+No significant changes.
+
+
+Synapse 0.99.3rc1 (2019-03-27)
+==============================
+
+Features
+--------
+
+- The user directory has been rewritten to make it faster, with less chance of falling behind on a large server. ([\#4537](https://github.com/matrix-org/synapse/issues/4537), [\#4846](https://github.com/matrix-org/synapse/issues/4846), [\#4864](https://github.com/matrix-org/synapse/issues/4864), [\#4887](https://github.com/matrix-org/synapse/issues/4887), [\#4900](https://github.com/matrix-org/synapse/issues/4900), [\#4944](https://github.com/matrix-org/synapse/issues/4944))
+- Add configurable rate limiting to the /register endpoint. ([\#4735](https://github.com/matrix-org/synapse/issues/4735), [\#4804](https://github.com/matrix-org/synapse/issues/4804))
+- Move server key queries to federation reader. ([\#4757](https://github.com/matrix-org/synapse/issues/4757))
+- Add support for /account/3pid REST endpoint to client_reader worker. ([\#4759](https://github.com/matrix-org/synapse/issues/4759))
+- Add an endpoint to the admin API for querying the server version. Contributed by Joseph Weston. ([\#4772](https://github.com/matrix-org/synapse/issues/4772))
+- Include a default configuration file in the 'docs' directory. ([\#4791](https://github.com/matrix-org/synapse/issues/4791), [\#4801](https://github.com/matrix-org/synapse/issues/4801))
+- Synapse is now permissive about trailing slashes on some of its federation endpoints, allowing zero or more to be present. ([\#4793](https://github.com/matrix-org/synapse/issues/4793))
+- Add support for /keys/query and /keys/changes REST endpoints to client_reader worker. ([\#4796](https://github.com/matrix-org/synapse/issues/4796))
+- Add checks to incoming events over federation for events evading auth (aka "soft fail"). ([\#4814](https://github.com/matrix-org/synapse/issues/4814))
+- Add configurable rate limiting to the /login endpoint. ([\#4821](https://github.com/matrix-org/synapse/issues/4821), [\#4865](https://github.com/matrix-org/synapse/issues/4865))
+- Remove trailing slashes from certain outbound federation requests. Retry if receiving a 404. Context: #3622. ([\#4840](https://github.com/matrix-org/synapse/issues/4840))
+- Allow passing --daemonize flags to workers in the same way as with master. ([\#4853](https://github.com/matrix-org/synapse/issues/4853))
+- Batch up outgoing read-receipts to reduce federation traffic. ([\#4890](https://github.com/matrix-org/synapse/issues/4890), [\#4927](https://github.com/matrix-org/synapse/issues/4927))
+- Add option to disable searching the user directory. ([\#4895](https://github.com/matrix-org/synapse/issues/4895))
+- Add option to disable searching of local and remote public room lists. ([\#4896](https://github.com/matrix-org/synapse/issues/4896))
+- Add ability for password providers to login/register a user via 3PID (email, phone). ([\#4931](https://github.com/matrix-org/synapse/issues/4931))
+
+
+Bugfixes
+--------
+
+- Fix a bug where media with spaces in the name would get a corrupted name. ([\#2090](https://github.com/matrix-org/synapse/issues/2090))
+- Fix attempting to paginate in rooms where server cannot see any events, to avoid unnecessarily pulling in lots of redacted events. ([\#4699](https://github.com/matrix-org/synapse/issues/4699))
+- 'event_id' is now a required parameter in federated state requests, as per the matrix spec. ([\#4740](https://github.com/matrix-org/synapse/issues/4740))
+- Fix tightloop over connecting to replication server. ([\#4749](https://github.com/matrix-org/synapse/issues/4749))
+- Fix parsing of Content-Disposition headers on remote media requests and URL previews. ([\#4763](https://github.com/matrix-org/synapse/issues/4763))
+- Fix incorrect log about not persisting duplicate state event. ([\#4776](https://github.com/matrix-org/synapse/issues/4776))
+- Fix v4v6 option in HAProxy example config. Contributed by Flakebi. ([\#4790](https://github.com/matrix-org/synapse/issues/4790))
+- Handle batch updates in worker replication protocol. ([\#4792](https://github.com/matrix-org/synapse/issues/4792))
+- Fix bug where we didn't correctly throttle sending of USER_IP commands over replication. ([\#4818](https://github.com/matrix-org/synapse/issues/4818))
+- Fix potential race in handling missing updates in device list updates. ([\#4829](https://github.com/matrix-org/synapse/issues/4829))
+- Fix bug where synapse expected an un-specced `prev_state` field on state events. ([\#4837](https://github.com/matrix-org/synapse/issues/4837))
+- Transfer a user's notification settings (push rules) on room upgrade. ([\#4838](https://github.com/matrix-org/synapse/issues/4838))
+- fix test_auto_create_auto_join_where_no_consent. ([\#4886](https://github.com/matrix-org/synapse/issues/4886))
+- Fix a bug where hs_disabled_message was sometimes not correctly enforced. ([\#4888](https://github.com/matrix-org/synapse/issues/4888))
+- Fix bug in shutdown room admin API where it would fail if a user in the room hadn't consented to the privacy policy. ([\#4904](https://github.com/matrix-org/synapse/issues/4904))
+- Fix bug where blocked world-readable rooms were still peekable. ([\#4908](https://github.com/matrix-org/synapse/issues/4908))
+
+
+Internal Changes
+----------------
+
+- Add a systemd setup that supports synapse workers. Contributed by Luca Corbatto. ([\#4662](https://github.com/matrix-org/synapse/issues/4662))
+- Change from TravisCI to Buildkite for CI. ([\#4752](https://github.com/matrix-org/synapse/issues/4752))
+- When presence is disabled don't send over replication. ([\#4757](https://github.com/matrix-org/synapse/issues/4757))
+- Minor docstring fixes for MatrixFederationAgent. ([\#4765](https://github.com/matrix-org/synapse/issues/4765))
+- Optimise EDU transmission for the federation_sender worker. ([\#4770](https://github.com/matrix-org/synapse/issues/4770))
+- Update test_typing to use HomeserverTestCase. ([\#4771](https://github.com/matrix-org/synapse/issues/4771))
+- Update URLs for riot.im icons and logos in the default notification templates. ([\#4779](https://github.com/matrix-org/synapse/issues/4779))
+- Removed unnecessary $ from some federation endpoint path regexes. ([\#4794](https://github.com/matrix-org/synapse/issues/4794))
+- Remove link to deleted title in README. ([\#4795](https://github.com/matrix-org/synapse/issues/4795))
+- Clean up read-receipt handling. ([\#4797](https://github.com/matrix-org/synapse/issues/4797))
+- Add some debug about processing read receipts. ([\#4798](https://github.com/matrix-org/synapse/issues/4798))
+- Clean up some replication code. ([\#4799](https://github.com/matrix-org/synapse/issues/4799))
+- Add some docstrings. ([\#4815](https://github.com/matrix-org/synapse/issues/4815))
+- Add debug logger to try and track down #4422. ([\#4816](https://github.com/matrix-org/synapse/issues/4816))
+- Make shutdown API send explanation message to room after users have been force-joined. ([\#4817](https://github.com/matrix-org/synapse/issues/4817))
+- Update example_log_config.yaml. ([\#4820](https://github.com/matrix-org/synapse/issues/4820))
+- Document the `generate` option for the docker image. ([\#4824](https://github.com/matrix-org/synapse/issues/4824))
+- Fix check-newsfragment for debian-only changes. ([\#4825](https://github.com/matrix-org/synapse/issues/4825))
+- Add some debug logging for device list updates to help with #4828. ([\#4828](https://github.com/matrix-org/synapse/issues/4828))
+- Improve federation documentation, specifically .well-known support. Many thanks to @vaab. ([\#4832](https://github.com/matrix-org/synapse/issues/4832))
+- Disable captcha registration by default in unit tests. ([\#4839](https://github.com/matrix-org/synapse/issues/4839))
+- Add stuff back to the .gitignore. ([\#4843](https://github.com/matrix-org/synapse/issues/4843))
+- Clarify what registration_shared_secret allows for. ([\#4844](https://github.com/matrix-org/synapse/issues/4844))
+- Correctly log expected errors when fetching server keys. ([\#4847](https://github.com/matrix-org/synapse/issues/4847))
+- Update install docs to explicitly state a full-chain (not just the top-level) TLS certificate must be provided to Synapse. This caused some people's Synapse ports to appear correct in a browser but still (rightfully so) upset the federation tester. ([\#4849](https://github.com/matrix-org/synapse/issues/4849))
+- Move client read-receipt processing to federation sender worker. ([\#4852](https://github.com/matrix-org/synapse/issues/4852))
+- Refactor federation TransactionQueue. ([\#4855](https://github.com/matrix-org/synapse/issues/4855))
+- Comment out most options in the generated config. ([\#4863](https://github.com/matrix-org/synapse/issues/4863))
+- Fix yaml library warnings by using safe_load. ([\#4869](https://github.com/matrix-org/synapse/issues/4869))
+- Update Apache setup to remove location syntax. Thanks to @cwmke! ([\#4870](https://github.com/matrix-org/synapse/issues/4870))
+- Reinstate test case that runs unit tests against oldest supported dependencies. ([\#4879](https://github.com/matrix-org/synapse/issues/4879))
+- Update link to federation docs. ([\#4881](https://github.com/matrix-org/synapse/issues/4881))
+- fix test_auto_create_auto_join_where_no_consent. ([\#4886](https://github.com/matrix-org/synapse/issues/4886))
+- Use a regular HomeServerConfig object for unit tests rather than a Mock. ([\#4889](https://github.com/matrix-org/synapse/issues/4889))
+- Add some notes about tuning postgres for larger deployments. ([\#4895](https://github.com/matrix-org/synapse/issues/4895))
+- Add a config option for torture-testing worker replication. ([\#4902](https://github.com/matrix-org/synapse/issues/4902))
+- Log requests which are simulated by the unit tests. ([\#4905](https://github.com/matrix-org/synapse/issues/4905))
+- Allow newsfragments to end with exclamation marks. Exciting! ([\#4912](https://github.com/matrix-org/synapse/issues/4912))
+- Refactor some more tests to use HomeserverTestCase. ([\#4913](https://github.com/matrix-org/synapse/issues/4913))
+- Refactor out the state deltas portion of the user directory store and handler. ([\#4917](https://github.com/matrix-org/synapse/issues/4917))
+- Fix nginx example in ACME doc. ([\#4923](https://github.com/matrix-org/synapse/issues/4923))
+- Use an explicit dbname for postgres connections in the tests. ([\#4928](https://github.com/matrix-org/synapse/issues/4928))
+- Fix `ClientReplicationStreamProtocol.__str__()`. ([\#4929](https://github.com/matrix-org/synapse/issues/4929))
+
+
 Synapse 0.99.2 (2019-03-01)
 ===========================
 
diff --git a/INSTALL.md b/INSTALL.md
index de6893530d..a5c3c6efaa 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -384,7 +384,7 @@ To configure Synapse to expose an HTTPS port, you will need to edit
 `cert.pem`).
 
 For those of you upgrading your TLS certificate in readiness for Synapse 1.0,
-please take a look at `our guide <docs/MSC1711_certificates_FAQ.md#configuring-certificates-for-compatibility-with-synapse-100>`_.
+please take a look at [our guide](docs/MSC1711_certificates_FAQ.md#configuring-certificates-for-compatibility-with-synapse-100).
 
 ## Registering a user
 
diff --git a/changelog.d/2090.bugfix b/changelog.d/2090.bugfix
deleted file mode 100644
index de2d22fcb8..0000000000
--- a/changelog.d/2090.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a bug where media with spaces in the name would get a corrupted name.
diff --git a/changelog.d/4474.misc b/changelog.d/4474.misc
new file mode 100644
index 0000000000..4b882d60be
--- /dev/null
+++ b/changelog.d/4474.misc
@@ -0,0 +1 @@
+Add test to verify threepid auth check added in #4435.
diff --git a/changelog.d/4537.feature b/changelog.d/4537.feature
deleted file mode 100644
index 8f792b8890..0000000000
--- a/changelog.d/4537.feature
+++ /dev/null
@@ -1 +0,0 @@
-The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
diff --git a/changelog.d/4662.misc b/changelog.d/4662.misc
deleted file mode 100644
index f4ec0d6a68..0000000000
--- a/changelog.d/4662.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add a systemd setup that supports synapse workers. Contributed by Luca Corbatto.
diff --git a/changelog.d/4699.bugfix b/changelog.d/4699.bugfix
deleted file mode 100644
index 1d7f3174e7..0000000000
--- a/changelog.d/4699.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix attempting to paginate in rooms where server cannot see any events, to avoid unnecessarily pulling in lots of redacted events.
diff --git a/changelog.d/4735.feature b/changelog.d/4735.feature
deleted file mode 100644
index a4c0b196f6..0000000000
--- a/changelog.d/4735.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add configurable rate limiting to the /register endpoint.
diff --git a/changelog.d/4740.bugfix b/changelog.d/4740.bugfix
deleted file mode 100644
index f82bb4227a..0000000000
--- a/changelog.d/4740.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-'event_id' is now a required parameter in federated state requests, as per the matrix spec.
diff --git a/changelog.d/4749.bugfix b/changelog.d/4749.bugfix
deleted file mode 100644
index 174e6b4e5e..0000000000
--- a/changelog.d/4749.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix tightloop over connecting to replication server.
diff --git a/changelog.d/4752.misc b/changelog.d/4752.misc
deleted file mode 100644
index fb1e76edce..0000000000
--- a/changelog.d/4752.misc
+++ /dev/null
@@ -1 +0,0 @@
-Change from TravisCI to Buildkite for CI.
diff --git a/changelog.d/4757.feature b/changelog.d/4757.feature
deleted file mode 100644
index b89029f2b4..0000000000
--- a/changelog.d/4757.feature
+++ /dev/null
@@ -1 +0,0 @@
-Move server key queries to federation reader.
diff --git a/changelog.d/4757.misc b/changelog.d/4757.misc
deleted file mode 100644
index 42bb66f7aa..0000000000
--- a/changelog.d/4757.misc
+++ /dev/null
@@ -1 +0,0 @@
-When presence is disabled don't send over replication.
diff --git a/changelog.d/4759.feature b/changelog.d/4759.feature
deleted file mode 100644
index 643ee404dc..0000000000
--- a/changelog.d/4759.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add support for /account/3pid REST endpoint to client_reader worker.
diff --git a/changelog.d/4763.bugfix b/changelog.d/4763.bugfix
deleted file mode 100644
index 213ea44b70..0000000000
--- a/changelog.d/4763.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix parsing of Content-Disposition headers on remote media requests and URL previews.
diff --git a/changelog.d/4765.misc b/changelog.d/4765.misc
deleted file mode 100644
index c273fd0cc4..0000000000
--- a/changelog.d/4765.misc
+++ /dev/null
@@ -1 +0,0 @@
-Minor docstring fixes for MatrixFederationAgent.
\ No newline at end of file
diff --git a/changelog.d/4770.misc b/changelog.d/4770.misc
deleted file mode 100644
index 144d819958..0000000000
--- a/changelog.d/4770.misc
+++ /dev/null
@@ -1 +0,0 @@
-Optimise EDU transmission for the federation_sender worker.
diff --git a/changelog.d/4771.misc b/changelog.d/4771.misc
deleted file mode 100644
index 8fa3401ca4..0000000000
--- a/changelog.d/4771.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update test_typing to use HomeserverTestCase.
diff --git a/changelog.d/4772.feature b/changelog.d/4772.feature
deleted file mode 100644
index 19bb91f1e8..0000000000
--- a/changelog.d/4772.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add an endpoint to the admin API for querying the server version. Contributed by Joseph Weston.
diff --git a/changelog.d/4776.bugfix b/changelog.d/4776.bugfix
deleted file mode 100644
index ce3e6ce33c..0000000000
--- a/changelog.d/4776.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix incorrect log about not persisting duplicate state event.
diff --git a/changelog.d/4779.misc b/changelog.d/4779.misc
deleted file mode 100644
index 2442bf31bd..0000000000
--- a/changelog.d/4779.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update URLs for riot.im icons and logos in the default notification templates.
diff --git a/changelog.d/4790.bugfix b/changelog.d/4790.bugfix
deleted file mode 100644
index aa8eb93246..0000000000
--- a/changelog.d/4790.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix v4v6 option in HAProxy example config. Contributed by Flakebi.
diff --git a/changelog.d/4791.feature b/changelog.d/4791.feature
deleted file mode 100644
index 1e5fd32463..0000000000
--- a/changelog.d/4791.feature
+++ /dev/null
@@ -1 +0,0 @@
-Include a default configuration file in the 'docs' directory.
diff --git a/changelog.d/4792.bugfix b/changelog.d/4792.bugfix
deleted file mode 100644
index b127b6254f..0000000000
--- a/changelog.d/4792.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Handle batch updates in worker replication protocol.
\ No newline at end of file
diff --git a/changelog.d/4793.feature b/changelog.d/4793.feature
deleted file mode 100644
index 90dba7d122..0000000000
--- a/changelog.d/4793.feature
+++ /dev/null
@@ -1 +0,0 @@
-Synapse is now permissive about trailing slashes on some of its federation endpoints, allowing zero or more to be present.
\ No newline at end of file
diff --git a/changelog.d/4794.misc b/changelog.d/4794.misc
deleted file mode 100644
index 99b543ecba..0000000000
--- a/changelog.d/4794.misc
+++ /dev/null
@@ -1 +0,0 @@
-Removed unnecessary $ from some federation endpoint path regexes.
\ No newline at end of file
diff --git a/changelog.d/4795.misc b/changelog.d/4795.misc
deleted file mode 100644
index 03995f42fe..0000000000
--- a/changelog.d/4795.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove link to deleted title in README.
\ No newline at end of file
diff --git a/changelog.d/4796.feature b/changelog.d/4796.feature
deleted file mode 100644
index 9e05560a3f..0000000000
--- a/changelog.d/4796.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add support for /keys/query and /keys/changes REST endpoints to client_reader worker.
diff --git a/changelog.d/4797.misc b/changelog.d/4797.misc
deleted file mode 100644
index 822e98e6a7..0000000000
--- a/changelog.d/4797.misc
+++ /dev/null
@@ -1 +0,0 @@
-Clean up read-receipt handling.
diff --git a/changelog.d/4798.misc b/changelog.d/4798.misc
deleted file mode 100644
index d60f208dc3..0000000000
--- a/changelog.d/4798.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add some debug about processing read receipts.
diff --git a/changelog.d/4799.misc b/changelog.d/4799.misc
deleted file mode 100644
index 5ab11a5c0b..0000000000
--- a/changelog.d/4799.misc
+++ /dev/null
@@ -1 +0,0 @@
-Clean up some replication code.
diff --git a/changelog.d/4801.feature b/changelog.d/4801.feature
deleted file mode 100644
index 1e5fd32463..0000000000
--- a/changelog.d/4801.feature
+++ /dev/null
@@ -1 +0,0 @@
-Include a default configuration file in the 'docs' directory.
diff --git a/changelog.d/4804.feature b/changelog.d/4804.feature
deleted file mode 100644
index a4c0b196f6..0000000000
--- a/changelog.d/4804.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add configurable rate limiting to the /register endpoint.
diff --git a/changelog.d/4814.feature b/changelog.d/4814.feature
deleted file mode 100644
index 9433acd959..0000000000
--- a/changelog.d/4814.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add checks to incoming events over federation for events evading auth (aka "soft fail").
diff --git a/changelog.d/4815.misc b/changelog.d/4815.misc
deleted file mode 100644
index b123b36f7f..0000000000
--- a/changelog.d/4815.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add some docstrings.
diff --git a/changelog.d/4816.misc b/changelog.d/4816.misc
deleted file mode 100644
index 43d94251f7..0000000000
--- a/changelog.d/4816.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add debug logger to try and track down #4422.
diff --git a/changelog.d/4817.misc b/changelog.d/4817.misc
deleted file mode 100644
index 438a51dc63..0000000000
--- a/changelog.d/4817.misc
+++ /dev/null
@@ -1 +0,0 @@
-Make shutdown API send explanation message to room after users have been forced joined.
diff --git a/changelog.d/4818.bugfix b/changelog.d/4818.bugfix
deleted file mode 100644
index ebbc27a433..0000000000
--- a/changelog.d/4818.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix bug where we didn't correctly throttle sending of USER_IP commands over replication.
diff --git a/changelog.d/4820.misc b/changelog.d/4820.misc
deleted file mode 100644
index 1e35b5b63c..0000000000
--- a/changelog.d/4820.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update example_log_config.yaml.
diff --git a/changelog.d/4821.feature b/changelog.d/4821.feature
deleted file mode 100644
index 61d4eb8d60..0000000000
--- a/changelog.d/4821.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add configurable rate limiting to the /login endpoint.
diff --git a/changelog.d/4824.misc b/changelog.d/4824.misc
deleted file mode 100644
index a4c5a1df37..0000000000
--- a/changelog.d/4824.misc
+++ /dev/null
@@ -1 +0,0 @@
-Document the `generate` option for the docker image.
diff --git a/changelog.d/4825.misc b/changelog.d/4825.misc
deleted file mode 100644
index 166661ab6a..0000000000
--- a/changelog.d/4825.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix check-newsfragment for debian-only changes.
diff --git a/changelog.d/4828.misc b/changelog.d/4828.misc
deleted file mode 100644
index 2fe554884a..0000000000
--- a/changelog.d/4828.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add some debug logging for device list updates to help with #4828.
diff --git a/changelog.d/4829.bugfix b/changelog.d/4829.bugfix
deleted file mode 100644
index b05235e215..0000000000
--- a/changelog.d/4829.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix potential race in handling missing updates in device list updates.
diff --git a/changelog.d/4832.misc b/changelog.d/4832.misc
deleted file mode 100644
index 92022266c6..0000000000
--- a/changelog.d/4832.misc
+++ /dev/null
@@ -1 +0,0 @@
-Improve federation documentation, specifically .well-known support. Many thanks to @vaab.
diff --git a/changelog.d/4837.bugfix b/changelog.d/4837.bugfix
deleted file mode 100644
index 989aeb82bb..0000000000
--- a/changelog.d/4837.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix bug where synapse expected an un-specced `prev_state` field on state events.
diff --git a/changelog.d/4838.bugfix b/changelog.d/4838.bugfix
deleted file mode 100644
index 7f4fceabff..0000000000
--- a/changelog.d/4838.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Transfer a user's notification settings (push rules) on room upgrade.
\ No newline at end of file
diff --git a/changelog.d/4839.misc b/changelog.d/4839.misc
deleted file mode 100644
index 7c6868051b..0000000000
--- a/changelog.d/4839.misc
+++ /dev/null
@@ -1 +0,0 @@
-Disable captcha registration by default in unit tests.
\ No newline at end of file
diff --git a/changelog.d/4840.feature b/changelog.d/4840.feature
deleted file mode 100644
index 9d1fd59053..0000000000
--- a/changelog.d/4840.feature
+++ /dev/null
@@ -1 +0,0 @@
-Remove trailing slashes from certain outbound federation requests. Retry if receiving a 404. Context: #3622.
\ No newline at end of file
diff --git a/changelog.d/4843.misc b/changelog.d/4843.misc
deleted file mode 100644
index 03d0a3e2e7..0000000000
--- a/changelog.d/4843.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add stuff back to the .gitignore.
diff --git a/changelog.d/4844.misc b/changelog.d/4844.misc
deleted file mode 100644
index eff6f1c43c..0000000000
--- a/changelog.d/4844.misc
+++ /dev/null
@@ -1 +0,0 @@
-Clarify what registration_shared_secret allows for.
diff --git a/changelog.d/4846.feature b/changelog.d/4846.feature
deleted file mode 100644
index 8f792b8890..0000000000
--- a/changelog.d/4846.feature
+++ /dev/null
@@ -1 +0,0 @@
-The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
diff --git a/changelog.d/4847.misc b/changelog.d/4847.misc
deleted file mode 100644
index a001238e08..0000000000
--- a/changelog.d/4847.misc
+++ /dev/null
@@ -1 +0,0 @@
-Correctly log expected errors when fetching server keys.
diff --git a/changelog.d/4849.misc b/changelog.d/4849.misc
deleted file mode 100644
index f2cab20b44..0000000000
--- a/changelog.d/4849.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update install docs to explicitly state a full-chain (not just the top-level) TLS certificate must be provided to Synapse. This caused some people's Synapse ports to appear correct in a browser but still (rightfully so) upset the federation tester.
\ No newline at end of file
diff --git a/changelog.d/4852.misc b/changelog.d/4852.misc
deleted file mode 100644
index 76ab1e34e7..0000000000
--- a/changelog.d/4852.misc
+++ /dev/null
@@ -1 +0,0 @@
- Move client read-receipt processing to federation sender worker.
\ No newline at end of file
diff --git a/changelog.d/4853.feature b/changelog.d/4853.feature
deleted file mode 100644
index 360f92e1de..0000000000
--- a/changelog.d/4853.feature
+++ /dev/null
@@ -1 +0,0 @@
-Allow passing --daemonize flags to workers in the same way as with master.
diff --git a/changelog.d/4855.misc b/changelog.d/4855.misc
deleted file mode 100644
index c4906d2f56..0000000000
--- a/changelog.d/4855.misc
+++ /dev/null
@@ -1 +0,0 @@
-Refactor federation TransactionQueue.
\ No newline at end of file
diff --git a/changelog.d/4863.misc b/changelog.d/4863.misc
deleted file mode 100644
index bfe03cbedc..0000000000
--- a/changelog.d/4863.misc
+++ /dev/null
@@ -1 +0,0 @@
-Comment out most options in the generated config.
diff --git a/changelog.d/4864.feature b/changelog.d/4864.feature
deleted file mode 100644
index 57927f2620..0000000000
--- a/changelog.d/4864.feature
+++ /dev/null
@@ -1 +0,0 @@
-The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
\ No newline at end of file
diff --git a/changelog.d/4865.feature b/changelog.d/4865.feature
deleted file mode 100644
index 61d4eb8d60..0000000000
--- a/changelog.d/4865.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add configurable rate limiting to the /login endpoint.
diff --git a/changelog.d/4869.misc b/changelog.d/4869.misc
deleted file mode 100644
index d8186cc520..0000000000
--- a/changelog.d/4869.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix yaml library warnings by using safe_load.
diff --git a/changelog.d/4870.misc b/changelog.d/4870.misc
deleted file mode 100644
index f287b7d3b0..0000000000
--- a/changelog.d/4870.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update Apache setup to remove location syntax. Thanks to @cwmke!
diff --git a/changelog.d/4879.misc b/changelog.d/4879.misc
deleted file mode 100644
index 574017230c..0000000000
--- a/changelog.d/4879.misc
+++ /dev/null
@@ -1 +0,0 @@
-Reinstate test case that runs unit tests against oldest supported dependencies.
diff --git a/changelog.d/4881.misc b/changelog.d/4881.misc
deleted file mode 100644
index 308c21c839..0000000000
--- a/changelog.d/4881.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update link to federation docs.
diff --git a/changelog.d/4886.bugfix b/changelog.d/4886.bugfix
deleted file mode 100644
index b17aa92485..0000000000
--- a/changelog.d/4886.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-fix test_auto_create_auto_join_where_no_consent.
diff --git a/changelog.d/4886.misc b/changelog.d/4886.misc
deleted file mode 100644
index b17aa92485..0000000000
--- a/changelog.d/4886.misc
+++ /dev/null
@@ -1 +0,0 @@
-fix test_auto_create_auto_join_where_no_consent.
diff --git a/changelog.d/4887.feature b/changelog.d/4887.feature
deleted file mode 100644
index e7ff0b9297..0000000000
--- a/changelog.d/4887.feature
+++ /dev/null
@@ -1 +0,0 @@
-The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
diff --git a/changelog.d/4888.bugfix b/changelog.d/4888.bugfix
deleted file mode 100644
index 0e193709e5..0000000000
--- a/changelog.d/4888.bugfix
+++ /dev/null
@@ -1,2 +0,0 @@
-Fix a bug where hs_disabled_message was sometimes not correctly enforced.
-
diff --git a/changelog.d/4889.misc b/changelog.d/4889.misc
deleted file mode 100644
index f1948db65e..0000000000
--- a/changelog.d/4889.misc
+++ /dev/null
@@ -1 +0,0 @@
-Use a regular HomeServerConfig object for unit tests rater than a Mock.
diff --git a/changelog.d/4890.feature b/changelog.d/4890.feature
deleted file mode 100644
index 8d74262250..0000000000
--- a/changelog.d/4890.feature
+++ /dev/null
@@ -1 +0,0 @@
-Batch up outgoing read-receipts to reduce federation traffic.
diff --git a/changelog.d/4895.feature b/changelog.d/4895.feature
deleted file mode 100644
index 5dd7c68194..0000000000
--- a/changelog.d/4895.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add option to disable searching the user directory.
diff --git a/changelog.d/4895.misc b/changelog.d/4895.misc
deleted file mode 100644
index 81a3261538..0000000000
--- a/changelog.d/4895.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add some notes about tuning postgres for larger deployments.
diff --git a/changelog.d/4896.feature b/changelog.d/4896.feature
deleted file mode 100644
index 46ac49a4b4..0000000000
--- a/changelog.d/4896.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add option to disable searching of local and remote public room lists.
diff --git a/changelog.d/4900.feature b/changelog.d/4900.feature
deleted file mode 100644
index 8f792b8890..0000000000
--- a/changelog.d/4900.feature
+++ /dev/null
@@ -1 +0,0 @@
-The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
diff --git a/changelog.d/4902.misc b/changelog.d/4902.misc
deleted file mode 100644
index fecc06a6e8..0000000000
--- a/changelog.d/4902.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add a config option for torture-testing worker replication.
diff --git a/changelog.d/4904.bugfix b/changelog.d/4904.bugfix
deleted file mode 100644
index 5c2d7f3cf1..0000000000
--- a/changelog.d/4904.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix bug in shutdown room admin API where it would fail if a user in the room hadn't consented to the privacy policy.
diff --git a/changelog.d/4905.misc b/changelog.d/4905.misc
deleted file mode 100644
index 0f00d5a3d5..0000000000
--- a/changelog.d/4905.misc
+++ /dev/null
@@ -1 +0,0 @@
-Log requests which are simulated by the unit tests.
diff --git a/changelog.d/4908.bugfix b/changelog.d/4908.bugfix
deleted file mode 100644
index d8c5babf0d..0000000000
--- a/changelog.d/4908.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix bug where blocked world-readable rooms were still peekable.
diff --git a/changelog.d/4912.misc b/changelog.d/4912.misc
deleted file mode 100644
index f05a239187..0000000000
--- a/changelog.d/4912.misc
+++ /dev/null
@@ -1 +0,0 @@
-Allow newsfragments to end with exclamation marks. Exciting!
diff --git a/changelog.d/4913.misc b/changelog.d/4913.misc
deleted file mode 100644
index 9e835badc0..0000000000
--- a/changelog.d/4913.misc
+++ /dev/null
@@ -1 +0,0 @@
-Refactor some more tests to use HomeserverTestCase.
diff --git a/changelog.d/4917.misc b/changelog.d/4917.misc
deleted file mode 100644
index 338d8a9a0c..0000000000
--- a/changelog.d/4917.misc
+++ /dev/null
@@ -1 +0,0 @@
-Refactor out the state deltas portion of the user directory store and handler.
diff --git a/changelog.d/4923.misc b/changelog.d/4923.misc
deleted file mode 100644
index 8b5e1e3c81..0000000000
--- a/changelog.d/4923.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix nginx example in ACME doc.
diff --git a/changelog.d/4927.feature b/changelog.d/4927.feature
deleted file mode 100644
index 8d74262250..0000000000
--- a/changelog.d/4927.feature
+++ /dev/null
@@ -1 +0,0 @@
-Batch up outgoing read-receipts to reduce federation traffic.
diff --git a/changelog.d/4928.misc b/changelog.d/4928.misc
deleted file mode 100644
index 3a21752551..0000000000
--- a/changelog.d/4928.misc
+++ /dev/null
@@ -1 +0,0 @@
-Use an explicit dbname for postgres connections in the tests.
diff --git a/changelog.d/4929.misc b/changelog.d/4929.misc
deleted file mode 100644
index aaf02078b9..0000000000
--- a/changelog.d/4929.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix `ClientReplicationStreamProtocol.__str__()`.
diff --git a/changelog.d/4931.feature b/changelog.d/4931.feature
deleted file mode 100644
index 5d34d16800..0000000000
--- a/changelog.d/4931.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add ability for password providers to login/register a user via 3PID (email, phone).
\ No newline at end of file
diff --git a/changelog.d/4942.bugfix b/changelog.d/4942.bugfix
new file mode 100644
index 0000000000..590d80d58f
--- /dev/null
+++ b/changelog.d/4942.bugfix
@@ -0,0 +1 @@
+Fix bug where presence updates were sent to all servers in a room when a new server joined, rather than to just the new server.
diff --git a/changelog.d/4944.feature b/changelog.d/4944.feature
deleted file mode 100644
index 8f792b8890..0000000000
--- a/changelog.d/4944.feature
+++ /dev/null
@@ -1 +0,0 @@
-The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
diff --git a/changelog.d/4947.feature b/changelog.d/4947.feature
new file mode 100644
index 0000000000..b9d27b90f1
--- /dev/null
+++ b/changelog.d/4947.feature
@@ -0,0 +1 @@
+Add ability for password provider modules to bind email addresses to users upon registration.
\ No newline at end of file
diff --git a/changelog.d/4949.misc b/changelog.d/4949.misc
new file mode 100644
index 0000000000..25c4e05a64
--- /dev/null
+++ b/changelog.d/4949.misc
@@ -0,0 +1 @@
+Fix/improve some docstrings in the replication code.
diff --git a/changelog.d/4953.misc b/changelog.d/4953.misc
new file mode 100644
index 0000000000..06a084e6ef
--- /dev/null
+++ b/changelog.d/4953.misc
@@ -0,0 +1,2 @@
+Split synapse.replication.tcp.streams into smaller files.
+
diff --git a/changelog.d/4954.misc b/changelog.d/4954.misc
new file mode 100644
index 0000000000..91f145950d
--- /dev/null
+++ b/changelog.d/4954.misc
@@ -0,0 +1 @@
+Refactor replication row generation/parsing.
diff --git a/changelog.d/4955.bugfix b/changelog.d/4955.bugfix
new file mode 100644
index 0000000000..e50e67383d
--- /dev/null
+++ b/changelog.d/4955.bugfix
@@ -0,0 +1 @@
+Fix sync bug which made accepting invites unreliable in worker-mode synapses.
diff --git a/changelog.d/4956.bugfix b/changelog.d/4956.bugfix
new file mode 100644
index 0000000000..e50e67383d
--- /dev/null
+++ b/changelog.d/4956.bugfix
@@ -0,0 +1 @@
+Fix sync bug which made accepting invites unreliable in worker-mode synapses.
diff --git a/changelog.d/4959.misc b/changelog.d/4959.misc
new file mode 100644
index 0000000000..dd4275501f
--- /dev/null
+++ b/changelog.d/4959.misc
@@ -0,0 +1 @@
+Run `black` to clean up formatting on `synapse/storage/roommember.py` and `synapse/storage/events.py`.
\ No newline at end of file
diff --git a/changelog.d/4965.misc b/changelog.d/4965.misc
new file mode 100644
index 0000000000..284c58b75e
--- /dev/null
+++ b/changelog.d/4965.misc
@@ -0,0 +1 @@
+Remove log line for password via the admin API.
diff --git a/changelog.d/4968.misc b/changelog.d/4968.misc
new file mode 100644
index 0000000000..7a7b69771c
--- /dev/null
+++ b/changelog.d/4968.misc
@@ -0,0 +1 @@
+Fix typo in TLS filenames in docker/README.md. Also add the '-p' commandline option to the 'docker run' example. Contributed by Jurrie Overgoor.
diff --git a/changelog.d/4969.misc b/changelog.d/4969.misc
new file mode 100644
index 0000000000..e3a3214e6b
--- /dev/null
+++ b/changelog.d/4969.misc
@@ -0,0 +1,2 @@
+Refactor room version definitions.
+
diff --git a/changelog.d/4981.bugfix b/changelog.d/4981.bugfix
new file mode 100644
index 0000000000..e51b45eec0
--- /dev/null
+++ b/changelog.d/4981.bugfix
@@ -0,0 +1 @@
+start.sh: Fix the --no-rate-limit option for messages and make it bypass rate limit on registration and login too.
\ No newline at end of file
diff --git a/debian/changelog b/debian/changelog
index d84931ec03..03df2e1c00 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,8 +1,12 @@
-matrix-synapse-py3 (0.99.3) UNRELEASED; urgency=medium
+matrix-synapse-py3 (0.99.3) stable; urgency=medium
 
+  [ Richard van der Hoff ]
   * Fix warning during preconfiguration. (Fixes: #4819)
 
- -- Richard van der Hoff <richard@matrix.org>  Thu, 07 Mar 2019 07:17:00 +0000
+  [ Synapse Packaging team ]
+  * New synapse release 0.99.3.
+
+ -- Synapse Packaging team <packages@matrix.org>  Mon, 01 Apr 2019 12:48:21 +0000
 
 matrix-synapse-py3 (0.99.2) stable; urgency=medium
 
diff --git a/demo/start.sh b/demo/start.sh
index dcc4d6f4fa..c4a1328a6f 100755
--- a/demo/start.sh
+++ b/demo/start.sh
@@ -27,17 +27,27 @@ for port in 8080 8081 8082; do
         --config-path "$DIR/etc/$port.config" \
         --report-stats no
 
+    printf '\n\n# Customisation made by demo/start.sh\n' >> $DIR/etc/$port.config
+    echo 'enable_registration: true' >> $DIR/etc/$port.config
+
     # Check script parameters
     if [ $# -eq 1 ]; then
         if [ $1 = "--no-rate-limit" ]; then
-            # Set high limits in config file to disable rate limiting
-            perl -p -i -e 's/rc_messages_per_second.*/rc_messages_per_second: 1000/g' $DIR/etc/$port.config
-            perl -p -i -e 's/rc_message_burst_count.*/rc_message_burst_count: 1000/g' $DIR/etc/$port.config
+            # messages rate limit
+            echo 'rc_messages_per_second: 1000' >> $DIR/etc/$port.config
+            echo 'rc_message_burst_count: 1000' >> $DIR/etc/$port.config
+
+            # registration rate limit
+            printf 'rc_registration:\n  per_second: 1000\n  burst_count: 1000\n' >> $DIR/etc/$port.config
+
+            # login rate limit
+            echo 'rc_login:' >> $DIR/etc/$port.config
+            printf '  address:\n    per_second: 1000\n    burst_count: 1000\n' >> $DIR/etc/$port.config
+            printf '  account:\n    per_second: 1000\n    burst_count: 1000\n' >> $DIR/etc/$port.config
+            printf '  failed_attempts:\n    per_second: 1000\n    burst_count: 1000\n' >> $DIR/etc/$port.config
         fi
     fi
 
-    perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config
-
     if ! grep -F "full_twisted_stacktraces" -q $DIR/etc/$port.config; then
         echo "full_twisted_stacktraces: true" >> $DIR/etc/$port.config
     fi
diff --git a/docker/README.md b/docker/README.md
index 4b98b7fd75..44ade63f27 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -31,6 +31,7 @@ docker run \
     --mount type=volume,src=synapse-data,dst=/data \
     -e SYNAPSE_SERVER_NAME=my.matrix.host \
     -e SYNAPSE_REPORT_STATS=yes \
+    -p 8448:8448 \
     matrixdotorg/synapse:latest
 ```
 
@@ -57,8 +58,8 @@ configuration file there. Multiple application services are supported.
 Synapse requires a valid TLS certificate. You can do one of the following:
 
  * Provide your own certificate and key (as
-   `${DATA_PATH}/${SYNAPSE_SERVER_NAME}.crt` and
-   `${DATA_PATH}/${SYNAPSE_SERVER_NAME}.key`, or elsewhere by providing an
+   `${DATA_PATH}/${SYNAPSE_SERVER_NAME}.tls.crt` and
+   `${DATA_PATH}/${SYNAPSE_SERVER_NAME}.tls.key`, or elsewhere by providing an
    entire config as `${SYNAPSE_CONFIG_PATH}`).
 
  * Use a reverse proxy to terminate incoming TLS, and forward the plain http
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 25c10244d3..6bb5a8b24d 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
 except ImportError:
     pass
 
-__version__ = "0.99.2"
+__version__ = "0.99.3"
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index f47c33a074..dc913feeee 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -102,46 +102,6 @@ class ThirdPartyEntityKind(object):
     LOCATION = "location"
 
 
-class RoomVersions(object):
-    V1 = "1"
-    V2 = "2"
-    V3 = "3"
-    STATE_V2_TEST = "state-v2-test"
-
-
-class RoomDisposition(object):
-    STABLE = "stable"
-    UNSTABLE = "unstable"
-
-
-# the version we will give rooms which are created on this server
-DEFAULT_ROOM_VERSION = RoomVersions.V1
-
-# vdh-test-version is a placeholder to get room versioning support working and tested
-# until we have a working v2.
-KNOWN_ROOM_VERSIONS = {
-    RoomVersions.V1,
-    RoomVersions.V2,
-    RoomVersions.V3,
-    RoomVersions.STATE_V2_TEST,
-    RoomVersions.V3,
-}
-
-
-class EventFormatVersions(object):
-    """This is an internal enum for tracking the version of the event format,
-    independently from the room version.
-    """
-    V1 = 1
-    V2 = 2
-
-
-KNOWN_EVENT_FORMAT_VERSIONS = {
-    EventFormatVersions.V1,
-    EventFormatVersions.V2,
-}
-
-
 ServerNoticeMsgType = "m.server_notice"
 ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
 
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
new file mode 100644
index 0000000000..e77abe1040
--- /dev/null
+++ b/synapse/api/room_versions.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import attr
+
+
+class EventFormatVersions(object):
+    """This is an internal enum for tracking the version of the event format,
+    independently from the room version.
+    """
+    V1 = 1  # $id:server format
+    V2 = 2  # MSC1659-style $hash format: introduced for room v3
+
+
+KNOWN_EVENT_FORMAT_VERSIONS = {
+    EventFormatVersions.V1,
+    EventFormatVersions.V2,
+}
+
+
+class StateResolutionVersions(object):
+    """Enum to identify the state resolution algorithms"""
+    V1 = 1  # room v1 state res
+    V2 = 2  # MSC1442 state res: room v2 and later
+
+
+class RoomDisposition(object):
+    STABLE = "stable"
+    UNSTABLE = "unstable"
+
+
+@attr.s(slots=True, frozen=True)
+class RoomVersion(object):
+    """An object which describes the unique attributes of a room version."""
+
+    identifier = attr.ib()    # str; the identifier for this version
+    disposition = attr.ib()   # str; one of the RoomDispositions
+    event_format = attr.ib()  # int; one of the EventFormatVersions
+    state_res = attr.ib()     # int; one of the StateResolutionVersions
+
+
+class RoomVersions(object):
+    V1 = RoomVersion(
+        "1",
+        RoomDisposition.STABLE,
+        EventFormatVersions.V1,
+        StateResolutionVersions.V1,
+    )
+    STATE_V2_TEST = RoomVersion(
+        "state-v2-test",
+        RoomDisposition.UNSTABLE,
+        EventFormatVersions.V1,
+        StateResolutionVersions.V2,
+    )
+    V2 = RoomVersion(
+        "2",
+        RoomDisposition.STABLE,
+        EventFormatVersions.V1,
+        StateResolutionVersions.V2,
+    )
+    V3 = RoomVersion(
+        "3",
+        RoomDisposition.STABLE,
+        EventFormatVersions.V2,
+        StateResolutionVersions.V2,
+    )
+
+
+# the version we will give rooms which are created on this server
+DEFAULT_ROOM_VERSION = RoomVersions.V1
+
+
+KNOWN_ROOM_VERSIONS = {
+    v.identifier: v for v in (
+        RoomVersions.V1,
+        RoomVersions.V2,
+        RoomVersions.V3,
+        RoomVersions.STATE_V2_TEST,
+    )
+}  # type: dict[str, RoomVersion]
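
The new module replaces bare version strings with structured RoomVersion objects: callers resolve an identifier through KNOWN_ROOM_VERSIONS and branch on capability fields such as event_format, rather than comparing identifier strings. A minimal sketch of that lookup pattern, mirroring the callers updated later in this commit (the version string "3" is just an example value, and the helper function is invented for illustration):

from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions


def uses_hash_event_ids(room_version_str):
    """Return True if the given room version uses MSC1659-style $hash event IDs."""
    # KNOWN_ROOM_VERSIONS now maps identifier strings to RoomVersion objects,
    # so an unknown version surfaces as None instead of a failed set-membership test.
    v = KNOWN_ROOM_VERSIONS.get(room_version_str)
    if not v:
        raise RuntimeError("Unrecognized room version %r" % (room_version_str,))
    return v.event_format == EventFormatVersions.V2


print(uses_hash_event_ids("3"))  # True: room v3 event IDs are hashes
print(uses_hash_event_ids("1"))  # False: room v1 uses $id:server event IDs
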
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 9711a7147c..1d43f2b075 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams import ReceiptsStream
+from synapse.replication.tcp.streams._base import ReceiptsStream
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.types import ReadReceipt
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 9163b56d86..5388def28a 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -48,6 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import EventsStreamEventRow
 from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
 from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@@ -369,7 +370,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
         # We shouldn't get multiple rows per token for events stream, so
         # we don't need to optimise this for multiple rows.
         for row in rows:
-            event = yield self.store.get_event(row.event_id)
+            if row.type != EventsStreamEventRow.TypeId:
+                continue
+            event = yield self.store.get_event(row.data.event_id)
             extra_users = ()
             if event.type == EventTypes.Member:
                 extra_users = (event.state_key,)
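
The events stream now multiplexes several row types, so a consumer checks row.type against a row class's TypeId before touching row.data; the same pattern reappears in the user_dir diff below. A self-contained sketch of that dispatch, with invented stand-in row classes and marker strings — only the type-check-then-data shape mirrors EventsStreamEventRow/EventsStreamCurrentStateRow from this diff:

from collections import namedtuple

# Stand-ins for the real stream row classes; only the type/data shape matters here.
Row = namedtuple("Row", ["type", "data"])
EventPayload = namedtuple("EventPayload", ["event_id"])
StatePayload = namedtuple("StatePayload", ["room_id"])

EVENT_TYPE_ID = "ev"     # assumed marker, standing in for EventsStreamEventRow.TypeId
STATE_TYPE_ID = "state"  # assumed marker, standing in for EventsStreamCurrentStateRow.TypeId

rows = [
    Row(EVENT_TYPE_ID, EventPayload("$someevent:example.com")),
    Row(STATE_TYPE_ID, StatePayload("!someroom:example.com")),
]

for row in rows:
    # Skip row types this consumer doesn't handle, rather than assuming the
    # stream carries a single row type, exactly as the updated workers now do.
    if row.type != EVENT_TYPE_ID:
        continue
    print("new event persisted:", row.data.event_id)
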
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index d1ab9512cd..355f5aa71d 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import (
+    EventsStream,
+    EventsStreamCurrentStateRow,
+)
 from synapse.rest.client.v2_alpha import user_directory
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -73,19 +77,18 @@ class UserDirectorySlaveStore(
             prefilled_cache=curr_state_delta_prefill,
         )
 
-        self._current_state_delta_pos = events_max
-
     def stream_positions(self):
         result = super(UserDirectorySlaveStore, self).stream_positions()
-        result["current_state_deltas"] = self._current_state_delta_pos
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
-        if stream_name == "current_state_deltas":
-            self._current_state_delta_pos = token
+        if stream_name == EventsStream.NAME:
+            self._stream_id_gen.advance(token)
             for row in rows:
+                if row.type != EventsStreamCurrentStateRow.TypeId:
+                    continue
                 self._curr_state_delta_stream_cache.entity_has_changed(
-                    row.room_id, token
+                    row.data.room_id, token
                 )
         return super(UserDirectorySlaveStore, self).process_replication_rows(
             stream_name, token, rows
@@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
         yield super(UserDirectoryReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
-        if stream_name == "current_state_deltas":
+        if stream_name == EventsStream.NAME:
             run_in_background(self._notify_directory)
 
     @defer.inlineCallbacks
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 8f9e330da5..203490fc36 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -20,15 +20,9 @@ from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import SignatureVerifyException, verify_signed_json
 from unpaddedbase64 import decode_base64
 
-from synapse.api.constants import (
-    KNOWN_ROOM_VERSIONS,
-    EventFormatVersions,
-    EventTypes,
-    JoinRules,
-    Membership,
-    RoomVersions,
-)
+from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.api.errors import AuthError, EventSizeError, SynapseError
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
 from synapse.types import UserID, get_domain_from_id
 
 logger = logging.getLogger(__name__)
@@ -452,16 +446,18 @@ def check_redaction(room_version, event, auth_events):
     if user_level >= redact_level:
         return False
 
-    if room_version in (RoomVersions.V1, RoomVersions.V2,):
+    v = KNOWN_ROOM_VERSIONS.get(room_version)
+    if not v:
+        raise RuntimeError("Unrecognized room version %r" % (room_version,))
+
+    if v.event_format == EventFormatVersions.V1:
         redacter_domain = get_domain_from_id(event.event_id)
         redactee_domain = get_domain_from_id(event.redacts)
         if redacter_domain == redactee_domain:
             return True
-    elif room_version == RoomVersions.V3:
+    else:
         event.internal_metadata.recheck_redaction = True
        return True
-    else:
-        raise RuntimeError("Unrecognized room version %r" % (room_version,))
 
     raise AuthError(
         403,
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index fafa135182..12056d5be2 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -21,7 +21,7 @@ import six
 
 from unpaddedbase64 import encode_base64
 
-from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventFormatVersions, RoomVersions
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
 from synapse.util.caches import intern_dict
 from synapse.util.frozenutils import freeze
 
@@ -351,18 +351,13 @@ def room_version_to_event_format(room_version):
     Returns:
         int
     """
-    if room_version not in KNOWN_ROOM_VERSIONS:
+    v = KNOWN_ROOM_VERSIONS.get(room_version)
+
+    if not v:
         # We should have already checked version, so this should not happen
         raise RuntimeError("Unrecognized room version %s" % (room_version,))
 
-    if room_version in (
-        RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST,
-    ):
-        return EventFormatVersions.V1
-    elif room_version in (RoomVersions.V3,):
-        return EventFormatVersions.V2
-    else:
-        raise RuntimeError("Unrecognized room version %s" % (room_version,))
+    return v.event_format
 
 
 def event_type_from_format_version(format_version):
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 06e01be918..fba27177c7 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -17,21 +17,17 @@ import attr
 
 from twisted.internet import defer
 
-from synapse.api.constants import (
+from synapse.api.constants import MAX_DEPTH
+from synapse.api.room_versions import (
     KNOWN_EVENT_FORMAT_VERSIONS,
     KNOWN_ROOM_VERSIONS,
-    MAX_DEPTH,
     EventFormatVersions,
 )
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.types import EventID
 from synapse.util.stringutils import random_string
 
-from . import (
-    _EventInternalMetadata,
-    event_type_from_format_version,
-    room_version_to_event_format,
-)
+from . import _EventInternalMetadata, event_type_from_format_version
 
 
 @attr.s(slots=True, cmp=False, frozen=True)
@@ -170,21 +166,34 @@ class EventBuilderFactory(object):
     def new(self, room_version, key_values):
         """Generate an event builder appropriate for the given room version
 
+        Deprecated: use for_room_version with a RoomVersion object instead
+
         Args:
-            room_version (str): Version of the room that we're creating an
-                event builder for
+            room_version (str): Version of the room that we're creating an event builder
+                for
             key_values (dict): Fields used as the basis of the new event
 
         Returns:
            EventBuilder
         """
-
-        # There's currently only the one event version defined
-        if room_version not in KNOWN_ROOM_VERSIONS:
+        v = KNOWN_ROOM_VERSIONS.get(room_version)
+        if not v:
             raise Exception(
                 "No event format defined for version %r" % (room_version,)
             )
+        return self.for_room_version(v, key_values)
 
+    def for_room_version(self, room_version, key_values):
+        """Generate an event builder appropriate for the given room version
+
+        Args:
+            room_version (synapse.api.room_versions.RoomVersion):
+                Version of the room that we're creating an event builder for
+            key_values (dict): Fields used as the basis of the new event
+
+        Returns:
+            EventBuilder
+        """
         return EventBuilder(
             store=self.store,
             state=self.state,
@@ -192,7 +201,7 @@ class EventBuilderFactory(object):
             clock=self.clock,
             hostname=self.hostname,
             signing_key=self.signing_key,
-            format_version=room_version_to_event_format(room_version),
+            format_version=room_version.event_format,
             type=key_values["type"],
             state_key=key_values.get("state_key"),
             room_id=key_values["room_id"],
@@ -222,7 +231,6 @@ def create_local_event_from_event_dict(clock, hostname, signing_key,
         FrozenEvent
     """
 
-    # There's currently only the one event version defined
     if format_version not in KNOWN_EVENT_FORMAT_VERSIONS:
         raise Exception(
             "No event format defined for version %r" % (format_version,)
index a072674b02..514273c792 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py
@@ -15,8 +15,9 @@ from six import string_types -from synapse.api.constants import EventFormatVersions, EventTypes, Membership +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError +from synapse.api.room_versions import EventFormatVersions from synapse.types import EventID, RoomID, UserID diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index a7a2ec4523..dfe6b4aa5c 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py
@@ -20,8 +20,9 @@ import six from twisted.internet import defer from twisted.internet.defer import DeferredList -from synapse.api.constants import MAX_DEPTH, EventTypes, Membership, RoomVersions +from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions from synapse.crypto.event_signing import check_event_content_hash from synapse.events import event_type_from_format_version from synapse.events.utils import prune_event @@ -274,9 +275,12 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): # now let's look for events where the sender's domain is different to the # event id's domain (normally only the case for joins/leaves), and add additional # checks. Only do this if the room version has a concept of event ID domain - if room_version in ( - RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST, - ): + # (ie, the room version uses old-style non-hash event IDs). + v = KNOWN_ROOM_VERSIONS.get(room_version) + if not v: + raise RuntimeError("Unrecognized room version %s" % (room_version,)) + + if v.event_format == EventFormatVersions.V1: pdus_to_check_event_id = [ p for p in pdus_to_check if p.sender_domain != get_domain_from_id(p.pdu.event_id) @@ -289,10 +293,6 @@ def _check_sigs_on_pdus(keyring, room_version, pdus): for p, d in zip(pdus_to_check_event_id, more_deferreds): p.deferreds.append(d) - elif room_version in (RoomVersions.V3,): - pass # No further checks needed, as event IDs are hashes here - else: - raise RuntimeError("Unrecognized room version %s" % (room_version,)) # replace lists of deferreds with single Deferreds return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check] diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
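The predicate the hunk above now expresses via event_format, pulled out as a standalone sketch (get_domain_from_id comes from synapse.types, as used elsewhere in this file):

    from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
    from synapse.types import get_domain_from_id

    def needs_event_id_domain_check(room_version, pdu):
        v = KNOWN_ROOM_VERSIONS.get(room_version)
        if v is None:
            raise RuntimeError("Unrecognized room version %s" % (room_version,))
        # Only format-V1 rooms embed a server name in the event ID; hash-based
        # IDs (format V2 and later) carry no domain to cross-check against the
        # sender, so no extra signature check is needed for them.
        return (
            v.event_format == EventFormatVersions.V1
            and get_domain_from_id(pdu.event_id) != get_domain_from_id(pdu.sender)
        )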
index 58e04d81ab..f3fc897a0a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -25,12 +25,7 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import ( - KNOWN_ROOM_VERSIONS, - EventTypes, - Membership, - RoomVersions, -) +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import ( CodeMessageException, Codes, @@ -38,6 +33,11 @@ from synapse.api.errors import ( HttpResponseException, SynapseError, ) +from synapse.api.room_versions import ( + KNOWN_ROOM_VERSIONS, + EventFormatVersions, + RoomVersions, +) from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.util import logcontext, unwrapFirstError @@ -570,7 +570,7 @@ class FederationClient(FederationBase): Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of `(origin, event, event_format)` where origin is the remote homeserver which generated the event, and event_format is one of - `synapse.api.constants.EventFormatVersions`. + `synapse.api.room_versions.EventFormatVersions`. Fails with a ``SynapseError`` if the chosen remote server returns a 300/400 code. @@ -592,7 +592,7 @@ class FederationClient(FederationBase): # Note: If not supplied, the room version may be either v1 or v2, # however either way the event format version will be v1. - room_version = ret.get("room_version", RoomVersions.V1) + room_version = ret.get("room_version", RoomVersions.V1.identifier) event_format = room_version_to_event_format(room_version) pdu_dict = ret.get("event", None) @@ -695,7 +695,9 @@ class FederationClient(FederationBase): room_version = None for e in state: if (e.type, e.state_key) == (EventTypes.Create, ""): - room_version = e.content.get("room_version", RoomVersions.V1) + room_version = e.content.get( + "room_version", RoomVersions.V1.identifier + ) break if room_version is None: @@ -802,11 +804,10 @@ class FederationClient(FederationBase): raise err # Otherwise, we assume that the remote server doesn't understand - # the v2 invite API. - - if room_version in (RoomVersions.V1, RoomVersions.V2): - pass # We'll fall through - else: + # the v2 invite API. That's ok provided the room uses old-style event + # IDs. + v = KNOWN_ROOM_VERSIONS.get(room_version) + if v.event_format != EventFormatVersions.V1: raise SynapseError( 400, "User's homeserver does not support this room version", diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
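One sharp edge in the fallback above: KNOWN_ROOM_VERSIONS.get() returns None for an unrecognised version, so `v.event_format` would raise AttributeError rather than a clean error. A more defensive sketch (the error code here is an assumption, mirroring the SynapseError already raised at this point):

    v = KNOWN_ROOM_VERSIONS.get(room_version)
    if v is None or v.event_format != EventFormatVersions.V1:
        # Either we don't know the version at all, or the room uses hash-based
        # event IDs, which the v1 invite API cannot carry.
        raise SynapseError(
            400,
            "User's homeserver does not support this room version",
            Codes.UNSUPPORTED_ROOM_VERSION,
        )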
index 81f3b4b1ff..df60828dba 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -25,7 +25,7 @@ from twisted.internet import defer from twisted.internet.abstract import isIPAddress from twisted.python import failure -from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import ( AuthError, Codes, @@ -34,6 +34,7 @@ from synapse.api.errors import ( NotFoundError, SynapseError, ) +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.crypto.event_signing import compute_event_signature from synapse.events import room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 04d04a4457..0240b339b0 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py
@@ -55,7 +55,12 @@ class FederationRemoteSendQueue(object): self.is_mine_id = hs.is_mine_id self.presence_map = {} # Pending presence map user_id -> UserPresenceState - self.presence_changed = SortedDict() # Stream position -> user_id + self.presence_changed = SortedDict() # Stream position -> list[user_id] + + # Stores the destinations we need to explicitly send presence to about a + # given user. + # Stream position -> (user_id, destinations) + self.presence_destinations = SortedDict() self.keyed_edu = {} # (destination, key) -> EDU self.keyed_edu_changed = SortedDict() # stream position -> (destination, key) @@ -77,7 +82,7 @@ class FederationRemoteSendQueue(object): for queue_name in [ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", - "edus", "device_messages", "pos_time", + "edus", "device_messages", "pos_time", "presence_destinations", ]: register(queue_name, getattr(self, queue_name)) @@ -121,6 +126,15 @@ class FederationRemoteSendQueue(object): for user_id in uids ) + keys = self.presence_destinations.keys() + i = self.presence_destinations.bisect_left(position_to_delete) + for key in keys[:i]: + del self.presence_destinations[key] + + user_ids.update( + user_id for user_id, _ in self.presence_destinations.values() + ) + to_del = [ user_id for user_id in self.presence_map if user_id not in user_ids ] @@ -209,6 +223,20 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() + def send_presence_to_destinations(self, states, destinations): + """As per FederationSender + + Args: + states (list[UserPresenceState]) + destinations (list[str]) + """ + for state in states: + pos = self._next_pos() + self.presence_map.update({state.user_id: state for state in states}) + self.presence_destinations[pos] = (state.user_id, destinations) + + self.notifier.on_new_replication_data() + def send_device_messages(self, destination): """As per FederationSender""" pos = self._next_pos() @@ -261,6 +289,16 @@ class FederationRemoteSendQueue(object): state=self.presence_map[user_id], ))) + # Fetch presence to send to destinations + i = self.presence_destinations.bisect_right(from_token) + j = self.presence_destinations.bisect_right(to_token) + 1 + + for pos, (user_id, dests) in self.presence_destinations.items()[i:j]: + rows.append((pos, PresenceDestinationsRow( + state=self.presence_map[user_id], + destinations=list(dests), + ))) + # Fetch changes keyed edus i = self.keyed_edu_changed.bisect_right(from_token) j = self.keyed_edu_changed.bisect_right(to_token) + 1 @@ -357,6 +395,29 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( buff.presence.append(self.state) +class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", ( + "state", # UserPresenceState + "destinations", # list[str] +))): + TypeId = "pd" + + @staticmethod + def from_data(data): + return PresenceDestinationsRow( + state=UserPresenceState.from_dict(data["state"]), + destinations=data["dests"], + ) + + def to_data(self): + return { + "state": self.state.as_dict(), + "dests": self.destinations, + } + + def add_to_buffer(self, buff): + buff.presence_destinations.append((self.state, self.destinations)) + + class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( "key", # tuple(str) - the edu key passed to send_edu "edu", # Edu @@ -428,6 +489,7 @@ TypeToRow = { Row.TypeId: Row for Row in ( PresenceRow, + PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow, @@ -437,6 +499,7 @@ TypeToRow = { ParsedFederationStreamData = 
namedtuple("ParsedFederationStreamData", ( "presence", # list(UserPresenceState) + "presence_destinations", # list of tuples of UserPresenceState and destinations "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] "device_destinations", # set of destinations @@ -458,6 +521,7 @@ def process_rows_for_federation(transaction_queue, rows): buff = ParsedFederationStreamData( presence=[], + presence_destinations=[], keyed_edus={}, edus={}, device_destinations=set(), @@ -476,6 +540,11 @@ def process_rows_for_federation(transaction_queue, rows): if buff.presence: transaction_queue.send_presence(buff.presence) + for state, destinations in buff.presence_destinations: + transaction_queue.send_presence_to_destinations( + states=[state], destinations=destinations, + ) + for destination, edu_map in iteritems(buff.keyed_edus): for key, edu in edu_map.items(): transaction_queue.send_edu(edu, key) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 1dc041752b..4f0f939102 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -371,7 +371,7 @@ class FederationSender(object): return # First we queue up the new presence by user ID, so multiple presence - # updates in quick successtion are correctly handled + # updates in quick succession are correctly handled. # We only want to send presence for our own users, so lets always just # filter here just in case. self.pending_presence.update({ @@ -402,6 +402,23 @@ class FederationSender(object): finally: self._processing_pending_presence = False + def send_presence_to_destinations(self, states, destinations): + """Send the given presence states to the given destinations. + + Args: + states (list[UserPresenceState]) + destinations (list[str]) + """ + + if not states or not self.hs.config.use_presence: + # No-op if presence is disabled. + return + + for destination in destinations: + if destination == self.server_name: + continue + self._get_per_destination_queue(destination).send_presence(states) + @measure_func("txnqueue._process_presence") @defer.inlineCallbacks def _process_presence_inner(self, states): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
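A hypothetical call site, mirroring how the presence handler later in this diff uses the new method: push one user's presence to every remote host in a room, letting the method itself drop our own server_name and no-op when presence is disabled.

    hosts = yield self.state.get_current_hosts_in_room(room_id)
    self.federation.send_presence_to_destinations(
        states=[state],
        destinations=hosts,
    )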
index efb6bdca48..452599e1a1 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py
@@ -21,8 +21,8 @@ import re from twisted.internet import defer import synapse -from synapse.api.constants import RoomVersions from synapse.api.errors import Codes, FederationDeniedError, SynapseError +from synapse.api.room_versions import RoomVersions from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX from synapse.http.endpoint import parse_and_validate_server_name from synapse.http.server import JsonResource @@ -513,7 +513,7 @@ class FederationV1InviteServlet(BaseFederationServlet): # state resolution algorithm, and we don't use that for processing # invites content = yield self.handler.on_invite_request( - origin, content, room_version=RoomVersions.V1, + origin, content, room_version=RoomVersions.V1.identifier, ) # V1 federation API is defined to return a content of `[200, {...}]` diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9eaf2d3e18..0684778882 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -29,13 +29,7 @@ from unpaddedbase64 import decode_base64 from twisted.internet import defer -from synapse.api.constants import ( - KNOWN_ROOM_VERSIONS, - EventTypes, - Membership, - RejectedReason, - RoomVersions, -) +from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.api.errors import ( AuthError, CodeMessageException, @@ -44,6 +38,7 @@ from synapse.api.errors import ( StoreError, SynapseError, ) +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.crypto.event_signing import compute_event_signature from synapse.event_auth import auth_types_for_event from synapse.events.validator import EventValidator @@ -1733,7 +1728,9 @@ class FederationHandler(BaseHandler): # invalid, and it would fail auth checks anyway. raise SynapseError(400, "No create event in state") - room_version = create_event.content.get("room_version", RoomVersions.V1) + room_version = create_event.content.get( + "room_version", RoomVersions.V1.identifier, + ) missing_auth_events = set() for e in itertools.chain(auth_events, state, [event]): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2d9e4c651b..054398dbcb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from twisted.internet.defer import succeed -from synapse.api.constants import EventTypes, Membership, RoomVersions +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import ( AuthError, Codes, @@ -30,6 +30,7 @@ from synapse.api.errors import ( NotFoundError, SynapseError, ) +from synapse.api.room_versions import RoomVersions from synapse.api.urls import ConsentURIBuilder from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -603,7 +604,9 @@ class EventCreationHandler(object): """ if event.is_state() and (event.type, event.state_key) == (EventTypes.Create, ""): - room_version = event.content.get("room_version", RoomVersions.V1) + room_version = event.content.get( + "room_version", RoomVersions.V1.identifier + ) else: room_version = yield self.store.get_room_version(event.room_id) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37e87fc054..e85c49742d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py
@@ -31,9 +31,11 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import PresenceState +import synapse.metrics +from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError from synapse.metrics import LaterGauge +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.presence import UserPresenceState from synapse.types import UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer @@ -98,6 +100,7 @@ class PresenceHandler(object): self.hs = hs self.is_mine = hs.is_mine self.is_mine_id = hs.is_mine_id + self.server_name = hs.hostname self.clock = hs.get_clock() self.store = hs.get_datastore() self.wheel_timer = WheelTimer() @@ -132,9 +135,6 @@ class PresenceHandler(object): ) ) - distributor = hs.get_distributor() - distributor.observe("user_joined_room", self.user_joined_room) - active_presence = self.store.take_presence_startup_info() # A dictionary of the current state of users. This is prefilled with @@ -220,6 +220,15 @@ class PresenceHandler(object): LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], lambda: len(self.wheel_timer)) + # Used to handle sending of presence to newly joined users/servers + if hs.config.use_presence: + self.notifier.add_replication_callback(self.notify_new_event) + + # Presence is best effort and quickly heals itself, so lets just always + # stream from the current state when we restart. + self._event_pos = self.store.get_current_events_token() + self._event_processing = False + @defer.inlineCallbacks def _on_shutdown(self): """Gets called when shutting down. This lets us persist any updates that @@ -751,31 +760,6 @@ class PresenceHandler(object): yield self._update_states([prev_state.copy_and_replace(**new_fields)]) @defer.inlineCallbacks - def user_joined_room(self, user, room_id): - """Called (via the distributor) when a user joins a room. This funciton - sends presence updates to servers, either: - 1. the joining user is a local user and we send their presence to - all servers in the room. - 2. the joining user is a remote user and so we send presence for all - local users in the room. - """ - # We only need to send presence to servers that don't have it yet. We - # don't need to send to local clients here, as that is done as part - # of the event stream/sync. - # TODO: Only send to servers not already in the room. - if self.is_mine(user): - state = yield self.current_state_for_user(user.to_string()) - - self._push_to_remotes([state]) - else: - user_ids = yield self.store.get_users_in_room(room_id) - user_ids = list(filter(self.is_mine_id, user_ids)) - - states = yield self.current_state_for_users(user_ids) - - self._push_to_remotes(list(states.values())) - - @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): """Returns the presence for all users in their presence list. """ @@ -945,6 +929,140 @@ class PresenceHandler(object): rows = yield self.store.get_all_presence_updates(last_id, current_id) defer.returnValue(rows) + def notify_new_event(self): + """Called when new events have happened. Handles users and servers + joining rooms and require being sent presence. 
+ """ + + if self._event_processing: + return + + @defer.inlineCallbacks + def _process_presence(): + assert not self._event_processing + + self._event_processing = True + try: + yield self._unsafe_process() + finally: + self._event_processing = False + + run_as_background_process("presence.notify_new_event", _process_presence) + + @defer.inlineCallbacks + def _unsafe_process(self): + # Loop round handling deltas until we're up to date + while True: + with Measure(self.clock, "presence_delta"): + deltas = yield self.store.get_current_state_deltas(self._event_pos) + if not deltas: + return + + yield self._handle_state_delta(deltas) + + self._event_pos = deltas[-1]["stream_id"] + + # Expose current event processing position to prometheus + synapse.metrics.event_processing_positions.labels("presence").set( + self._event_pos + ) + + @defer.inlineCallbacks + def _handle_state_delta(self, deltas): + """Process current state deltas to find new joins that need to be + handled. + """ + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + + if typ != EventTypes.Member: + continue + + event = yield self.store.get_event(event_id) + if event.content.get("membership") != Membership.JOIN: + # We only care about joins + continue + + if prev_event_id: + prev_event = yield self.store.get_event(prev_event_id) + if prev_event.content.get("membership") == Membership.JOIN: + # Ignore changes to join events. + continue + + yield self._on_user_joined_room(room_id, state_key) + + @defer.inlineCallbacks + def _on_user_joined_room(self, room_id, user_id): + """Called when we detect a user joining the room via the current state + delta stream. + + Args: + room_id (str) + user_id (str) + + Returns: + Deferred + """ + + if self.is_mine_id(user_id): + # If this is a local user then we need to send their presence + # out to hosts in the room (who don't already have it) + + # TODO: We should be able to filter the hosts down to those that + # haven't previously seen the user + + state = yield self.current_state_for_user(user_id) + hosts = yield self.state.get_current_hosts_in_room(room_id) + + # Filter out ourselves. + hosts = set(host for host in hosts if host != self.server_name) + + self.federation.send_presence_to_destinations( + states=[state], + destinations=hosts, + ) + else: + # A remote user has joined the room, so we need to: + # 1. Check if this is a new server in the room + # 2. If so send any presence they don't already have for + # local users in the room. + + # TODO: We should be able to filter the users down to those that + # the server hasn't previously seen + + # TODO: Check that this is actually a new server joining the + # room. + + user_ids = yield self.state.get_current_user_in_room(room_id) + user_ids = list(filter(self.is_mine_id, user_ids)) + + states = yield self.current_state_for_users(user_ids) + + # Filter out old presence, i.e. offline presence states where + # the user hasn't been active for a week. We can change this + # depending on what we want the UX to be, but at the least we + # should filter out offline presence where the state is just the + # default state. 
+ now = self.clock.time_msec() + states = [ + state for state in states.values() + if state.state != PresenceState.OFFLINE + or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 + or state.status_msg is not None + ] + + if states: + self.federation.send_presence_to_destinations( + states=states, + destinations=[get_domain_from_id(user_id)], + ) + def should_notify(old_state, new_state): """Decides if a presence state change should be sent to interested parties. diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
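The staleness filter above as a standalone helper (a sketch; the one-week horizon is the value hard-coded in the hunk, and as the comment there notes it is a UX choice rather than anything the protocol requires):

    from synapse.api.constants import PresenceState

    WEEK_IN_MS = 7 * 24 * 60 * 60 * 1000

    def worth_sending_to_new_server(state, now_ms):
        # Keep everything except long-idle offline states with no status message.
        return (
            state.state != PresenceState.OFFLINE
            or now_ms - state.last_active_ts < WEEK_IN_MS
            or state.status_msg is not None
        )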
index 58940e0320..a51d11a257 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py
@@ -153,6 +153,7 @@ class RegistrationHandler(BaseHandler): user_type=None, default_display_name=None, address=None, + bind_emails=[], ): """Registers a new client on the server. @@ -172,6 +173,7 @@ class RegistrationHandler(BaseHandler): default_display_name (unicode|None): if set, the new user's displayname will be set to this. Defaults to 'localpart'. address (str|None): the IP address used to perform the registration. + bind_emails (List[str]): list of emails to bind to this account. Returns: A tuple of (user_id, access_token). Raises: @@ -261,6 +263,21 @@ class RegistrationHandler(BaseHandler): if not self.hs.config.user_consent_at_registration: yield self._auto_join_rooms(user_id) + # Bind any specified emails to this account + current_time = self.hs.get_clock().time_msec() + for email in bind_emails: + # generate threepid dict + threepid_dict = { + "medium": "email", + "address": email, + "validated_at": current_time, + } + + # Bind email to new account + yield self._register_email_threepid( + user_id, threepid_dict, None, False, + ) + defer.returnValue((user_id, token)) @defer.inlineCallbacks diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 67b15697fd..c3dcfec247 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py
@@ -25,14 +25,9 @@ from six import iteritems, string_types from twisted.internet import defer -from synapse.api.constants import ( - DEFAULT_ROOM_VERSION, - KNOWN_ROOM_VERSIONS, - EventTypes, - JoinRules, - RoomCreationPreset, -) +from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError +from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS from synapse.storage.state import StateFilter from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID from synapse.util import stringutils @@ -479,7 +474,7 @@ class RoomCreationHandler(BaseHandler): if ratelimit: yield self.ratelimit(requester) - room_version = config.get("room_version", DEFAULT_ROOM_VERSION) + room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier) if not isinstance(room_version, string_types): raise SynapseError( 400, diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 235ce8334e..b3abd1b3c6 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py
@@ -74,14 +74,14 @@ class ModuleApi(object): return self._auth_handler.check_user_exists(user_id) @defer.inlineCallbacks - def register(self, localpart, displayname=None): + def register(self, localpart, displayname=None, emails=[]): """Registers a new user with given localpart and optional - displayname. + displayname, emails. Args: localpart (str): The localpart of the new user. - displayname (str|None): The displayname of the new user. If None, - the user's displayname will default to `localpart`. + displayname (str|None): The displayname of the new user. + emails (List[str]): Emails to bind to the new user. Returns: Deferred: a 2-tuple of (user_id, access_token) @@ -90,6 +90,7 @@ class ModuleApi(object): reg = self.hs.get_registration_handler() user_id, access_token = yield reg.register( localpart=localpart, default_display_name=displayname, + bind_emails=emails, ) defer.returnValue((user_id, access_token)) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
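Hypothetical use of the extended ModuleApi.register from a module such as a password provider; each address is recorded as a validated email threepid at registration time via the new bind_emails path:

    user_id, access_token = yield module_api.register(
        "alice",
        displayname="Alice",
        emails=["alice@example.com"],
    )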
index 4830c68f35..b457c5563f 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py
@@ -16,6 +16,10 @@ import logging from synapse.api.constants import EventTypes +from synapse.replication.tcp.streams.events import ( + EventsStreamCurrentStateRow, + EventsStreamEventRow, +) from synapse.storage.event_federation import EventFederationWorkerStore from synapse.storage.event_push_actions import EventPushActionsWorkerStore from synapse.storage.events_worker import EventsWorkerStore @@ -79,11 +83,7 @@ class SlavedEventStore(EventFederationWorkerStore, if stream_name == "events": self._stream_id_gen.advance(token) for row in rows: - self.invalidate_caches_for_event( - token, row.event_id, row.room_id, row.type, row.state_key, - row.redacts, - backfilled=False, - ) + self._process_event_stream_row(token, row) elif stream_name == "backfill": self._backfill_id_gen.advance(-token) for row in rows: @@ -96,6 +96,23 @@ class SlavedEventStore(EventFederationWorkerStore, stream_name, token, rows ) + def _process_event_stream_row(self, token, row): + data = row.data + + if row.type == EventsStreamEventRow.TypeId: + self.invalidate_caches_for_event( + token, data.event_id, data.room_id, data.type, data.state_key, + data.redacts, + backfilled=False, + ) + elif row.type == EventsStreamCurrentStateRow.TypeId: + if data.type == EventTypes.Member: + self.get_rooms_for_user_with_stream_ordering.invalidate( + (data.state_key, ), + ) + else: + raise Exception("Unknown events stream row type %s" % (row.type, )) + def invalidate_caches_for_event(self, stream_ordering, event_id, room_id, etype, state_key, redacts, backfilled): self._invalidate_get_event_cache(event_id) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e558f90e1a..206dc3b397 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -103,10 +103,19 @@ class ReplicationClientHandler(object): hs.get_reactor().connectTCP(host, port, self.factory) def on_rdata(self, stream_name, token, rows): - """Called when we get new replication data. By default this just pokes - the slave store. + """Called to handle a batch of replication data with a given stream token. - Can be overriden in subclasses to handle more. + By default this just pokes the slave store. Can be overridden in subclasses to + handle more. + + Args: + stream_name (str): name of the replication stream for this batch of rows + token (int): stream token for this batch of rows + rows (list): a list of Stream.ROW_TYPE objects as returned by + Stream.parse_row. + + Returns: + Deferred|None """ logger.debug("Received rdata %s -> %s", stream_name, token) return self.store.process_replication_rows(stream_name, token, rows) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 02e5bf6cc8..b51590cf8f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py
@@ -42,8 +42,8 @@ indicate which side is sending, these are *not* included on the wire:: > POSITION backfill 1 > POSITION caches 1 > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513] - > RDATA events 14 ["$149019767112vOHxz:localhost:8823", - "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null] + > RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823", + "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]] < PING 1490197675618 > ERROR server stopping * connection closed by server * @@ -605,7 +605,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): inbound_rdata_count.labels(stream_name).inc() try: - row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row) + row = STREAMS_MAP[stream_name].parse_row(cmd.row) except Exception: logger.exception( "[%s] Failed to parse RDATA: %r %r", diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
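What the reframed events RDATA looks like to a consumer (a sketch reusing the sample IDs from the docstring above): the payload is now ["<type id>", [<fields>]], so it must go through the stream class's parse_row rather than ROW_TYPE(*row).

    from synapse.replication.tcp.streams import STREAMS_MAP

    raw = ["ev", ["$149019767112vOHxz:localhost:8823",
                  "!AFDCvgApUmpdfVjIXm:localhost:8823",
                  "m.room.guest_access", "", None]]
    row = STREAMS_MAP["events"].parse_row(raw)
    # row.type == "ev"; row.data is an EventsStreamEventRow with event_id,
    # room_id, type, state_key and redacts fields.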
index 7fc346c7b6..f6a38f5140 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -30,7 +30,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.metrics import Measure, measure_func from .protocol import ServerReplicationStreamProtocol -from .streams import STREAMS_MAP, FederationStream +from .streams import STREAMS_MAP +from .streams.federation import FederationStream stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates", "", ["stream_name"]) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py new file mode 100644
index 0000000000..634f636dc9 --- /dev/null +++ b/synapse/replication/tcp/streams/__init__.py
@@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines all the valid streams that clients can subscribe to, and the format +of the rows returned by each stream. + +Each stream is defined by the following information: + + stream name: The name of the stream + row type: The type that is used to serialise/deserialise the row + current_token: The function that returns the current token for the stream + update_function: The function that returns a list of updates between two tokens +""" + +from . import _base, events, federation + +STREAMS_MAP = { + stream.NAME: stream + for stream in ( + events.EventsStream, + _base.BackfillStream, + _base.PresenceStream, + _base.TypingStream, + _base.ReceiptsStream, + _base.PushRulesStream, + _base.PushersStream, + _base.CachesStream, + _base.PublicRoomsStream, + _base.DeviceListsStream, + _base.ToDeviceStream, + federation.FederationStream, + _base.TagAccountDataStream, + _base.AccountDataStream, + _base.GroupServerStream, + ) +} diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams/_base.py
index a6e9d6709e..52564136fc 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd +# Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,16 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Defines all the valid streams that clients can subscribe to, and the format -of the rows returned by each stream. -Each stream is defined by the following information: - - stream name: The name of the stream - row type: The type that is used to serialise/deserialse the row - current_token: The function that returns the current token for the stream - update_function: The function that returns a list of updates between two tokens -""" import itertools import logging from collections import namedtuple @@ -34,14 +26,6 @@ logger = logging.getLogger(__name__) MAX_EVENTS_BEHIND = 500000 - -EventStreamRow = namedtuple("EventStreamRow", ( - "event_id", # str - "room_id", # str - "type", # str - "state_key", # str, optional - "redacts", # str, optional -)) BackfillStreamRow = namedtuple("BackfillStreamRow", ( "event_id", # str "room_id", # str @@ -96,10 +80,6 @@ DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ( ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( "entity", # str )) -FederationStreamRow = namedtuple("FederationStreamRow", ( - "type", # str, the type of data as defined in the BaseFederationRows - "data", # dict, serialization of a federation.send_queue.BaseFederationRow -)) TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( "user_id", # str "room_id", # str @@ -111,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", ( "data_type", # str "data", # dict )) -CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( - "room_id", # str - "type", # str - "state_key", # str - "event_id", # str, optional -)) GroupsStreamRow = namedtuple("GroupsStreamRow", ( "group_id", # str "user_id", # str @@ -132,9 +106,24 @@ class Stream(object): time it was called up until the point `advance_current_token` was called. """ NAME = None # The name of the stream - ROW_TYPE = None # The type of the row + ROW_TYPE = None # The type of the row. Used by the default impl of parse_row. _LIMITED = True # Whether the update function takes a limit + @classmethod + def parse_row(cls, row): + """Parse a row received over replication + + By default, assumes that the row data is an array object and passes its contents + to the constructor of the ROW_TYPE for this stream. + + Args: + row: row data from the incoming RDATA command, after json decoding + + Returns: + ROW_TYPE object for this stream + """ + return cls.ROW_TYPE(*row) + def __init__(self, hs): # The token from which we last asked for updates self.last_token = self.current_token() @@ -162,8 +151,10 @@ class Stream(object): until the `upto_token` Returns: - (list(ROW_TYPE), int): list of updates plus the token used as an - upper bound of the updates (i.e. the "current token") + Deferred[Tuple[List[Tuple[int, Any]], int]: + Resolves to a pair ``(updates, current_token)``, where ``updates`` is a + list of ``(token, row)`` entries. ``row`` will be json-serialised and + sent over the replication steam. 
""" updates, current_token = yield self.get_updates_since(self.last_token) self.last_token = current_token @@ -176,8 +167,10 @@ class Stream(object): stream updates Returns: - (list(ROW_TYPE), int): list of updates plus the token used as an - upper bound of the updates (i.e. the "current token") + Deferred[Tuple[List[Tuple[int, Any]], int]: + Resolves to a pair ``(updates, current_token)``, where ``updates`` is a + list of ``(token, row)`` entries. ``row`` will be json-serialised and + sent over the replication steam. """ if from_token in ("NOW", "now"): defer.returnValue(([], self.upto_token)) @@ -202,7 +195,7 @@ class Stream(object): from_token, current_token, ) - updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows] + updates = [(row[0], row[1:]) for row in rows] # check we didn't get more rows than the limit. # doing it like this allows the update_function to be a generator. @@ -232,20 +225,6 @@ class Stream(object): raise NotImplementedError() -class EventsStream(Stream): - """We received a new event, or an event went from being an outlier to not - """ - NAME = "events" - ROW_TYPE = EventStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - self.current_token = store.get_current_events_token - self.update_function = store.get_all_new_forward_event_rows - - super(EventsStream, self).__init__(hs) - - class BackfillStream(Stream): """We fetched some old events and either we had never seen that event before or it went from being an outlier to not. @@ -400,22 +379,6 @@ class ToDeviceStream(Stream): super(ToDeviceStream, self).__init__(hs) -class FederationStream(Stream): - """Data to be sent over federation. Only available when master has federation - sending disabled. - """ - NAME = "federation" - ROW_TYPE = FederationStreamRow - - def __init__(self, hs): - federation_sender = hs.get_federation_sender() - - self.current_token = federation_sender.get_current_token - self.update_function = federation_sender.get_replication_rows - - super(FederationStream, self).__init__(hs) - - class TagAccountDataStream(Stream): """Someone added/removed a tag for a room """ @@ -459,21 +422,6 @@ class AccountDataStream(Stream): defer.returnValue(results) -class CurrentStateDeltaStream(Stream): - """Current state for a room was changed - """ - NAME = "current_state_deltas" - ROW_TYPE = CurrentStateDeltaStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_max_current_state_delta_stream_id - self.update_function = store.get_all_updated_current_state_deltas - - super(CurrentStateDeltaStream, self).__init__(hs) - - class GroupServerStream(Stream): NAME = "groups" ROW_TYPE = GroupsStreamRow @@ -485,26 +433,3 @@ class GroupServerStream(Stream): self.update_function = store.get_all_groups_changes super(GroupServerStream, self).__init__(hs) - - -STREAMS_MAP = { - stream.NAME: stream - for stream in ( - EventsStream, - BackfillStream, - PresenceStream, - TypingStream, - ReceiptsStream, - PushRulesStream, - PushersStream, - CachesStream, - PublicRoomsStream, - DeviceListsStream, - ToDeviceStream, - FederationStream, - TagAccountDataStream, - AccountDataStream, - CurrentStateDeltaStream, - GroupServerStream, - ) -} diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py new file mode 100644
index 0000000000..e0f6e29248 --- /dev/null +++ b/synapse/replication/tcp/streams/events.py
@@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import heapq + +import attr + +from twisted.internet import defer + +from ._base import Stream + + +"""Handling of the 'events' replication stream + +This stream contains rows of various types. Each row therefore contains a 'type' +identifier before the real data. For example:: + + RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]] + RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]] + +An "ev" row is sent for each new event. The fields in the data part are: + + * The new event id + * The room id for the event + * The type of the new event + * The state key of the event, for state events + * The event id of an event which is redacted by this event. + +A "state" row is sent whenever the "current state" in a room changes. The fields in the +data part are: + + * The room id for the state change + * The event type of the state which has changed + * The state_key of the state which has changed + * The event id of the new state + +""" + + +@attr.s(slots=True, frozen=True) +class EventsStreamRow(object): + """A parsed row from the events replication stream""" + type = attr.ib() # str: the TypeId of one of the *EventsStreamRows + data = attr.ib() # BaseEventsStreamRow + + +class BaseEventsStreamRow(object): + """Base class for rows to be sent in the events stream. + + Specifies how to identify, serialize and deserialize the different types. + """ + + TypeId = None # Unique string that ids the type. Must be overridden in subclasses. + + @classmethod + def from_data(cls, data): + """Parse the data from the replication stream into a row.
+ + By default we just call the constructor with the data list as arguments + + Args: + data: The value of the data object from the replication stream + """ + return cls(*data) + + +@attr.s(slots=True, frozen=True) +class EventsStreamEventRow(BaseEventsStreamRow): + TypeId = "ev" + + event_id = attr.ib() # str + room_id = attr.ib() # str + type = attr.ib() # str + state_key = attr.ib() # str, optional + redacts = attr.ib() # str, optional + + +@attr.s(slots=True, frozen=True) +class EventsStreamCurrentStateRow(BaseEventsStreamRow): + TypeId = "state" + + room_id = attr.ib() # str + type = attr.ib() # str + state_key = attr.ib() # str + event_id = attr.ib() # str, optional + + +TypeToRow = { + Row.TypeId: Row + for Row in ( + EventsStreamEventRow, + EventsStreamCurrentStateRow, + ) +} + + +class EventsStream(Stream): + """We received a new event, or an event went from being an outlier to not + """ + NAME = "events" + + def __init__(self, hs): + self._store = hs.get_datastore() + self.current_token = self._store.get_current_events_token + + super(EventsStream, self).__init__(hs) + + @defer.inlineCallbacks + def update_function(self, from_token, current_token, limit=None): + event_rows = yield self._store.get_all_new_forward_event_rows( + from_token, current_token, limit, + ) + event_updates = ( + (row[0], EventsStreamEventRow.TypeId, row[1:]) + for row in event_rows + ) + + state_rows = yield self._store.get_all_updated_current_state_deltas( + from_token, current_token, limit + ) + state_updates = ( + (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) + for row in state_rows + ) + + all_updates = heapq.merge(event_updates, state_updates) + + defer.returnValue(all_updates) + + @classmethod + def parse_row(cls, row): + (typ, data) = row + data = TypeToRow[typ].from_data(data) + return EventsStreamRow(typ, data) diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py new file mode 100644
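Why heapq.merge is safe in update_function above: both storage queries return rows ordered by stream id, and merge() lazily interleaves the two sorted iterables by their first element (the token) without materialising either. A toy demonstration:

    import heapq

    event_updates = [(1, "ev", "..."), (3, "ev", "...")]
    state_updates = [(2, "state", "...")]

    print(list(heapq.merge(event_updates, state_updates)))
    # [(1, 'ev', '...'), (2, 'state', '...'), (3, 'ev', '...')]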
index 0000000000..9aa43aa8d2 --- /dev/null +++ b/synapse/replication/tcp/streams/federation.py
@@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from collections import namedtuple + +from ._base import Stream + +FederationStreamRow = namedtuple("FederationStreamRow", ( + "type", # str, the type of data as defined in the BaseFederationRows + "data", # dict, serialization of a federation.send_queue.BaseFederationRow +)) + + +class FederationStream(Stream): + """Data to be sent over federation. Only available when master has federation + sending disabled. + """ + NAME = "federation" + ROW_TYPE = FederationStreamRow + + def __init__(self, hs): + federation_sender = hs.get_federation_sender() + + self.current_token = federation_sender.get_current_token + self.update_function = federation_sender.get_replication_rows + + super(FederationStream, self).__init__(hs) diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index e788769639..1a26f5a1a6 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py
@@ -647,8 +647,6 @@ class ResetPasswordRestServlet(ClientV1RestServlet): assert_params_in_dict(params, ["new_password"]) new_password = params['new_password'] - logger.info("new_password: %r", new_password) - yield self._set_password_handler.set_password( target_user_id, new_password, requester ) diff --git a/synapse/rest/client/v2_alpha/capabilities.py b/synapse/rest/client/v2_alpha/capabilities.py
index 373f95126e..a868d06098 100644 --- a/synapse/rest/client/v2_alpha/capabilities.py +++ b/synapse/rest/client/v2_alpha/capabilities.py
@@ -16,7 +16,7 @@ import logging from twisted.internet import defer -from synapse.api.constants import DEFAULT_ROOM_VERSION, RoomDisposition, RoomVersions +from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS from synapse.http.servlet import RestServlet from ._base import client_v2_patterns @@ -48,12 +48,10 @@ class CapabilitiesRestServlet(RestServlet): response = { "capabilities": { "m.room_versions": { - "default": DEFAULT_ROOM_VERSION, + "default": DEFAULT_ROOM_VERSION.identifier, "available": { - RoomVersions.V1: RoomDisposition.STABLE, - RoomVersions.V2: RoomDisposition.STABLE, - RoomVersions.STATE_V2_TEST: RoomDisposition.UNSTABLE, - RoomVersions.V3: RoomDisposition.STABLE, + v.identifier: v.disposition + for v in KNOWN_ROOM_VERSIONS.values() }, }, "m.change_password": {"enabled": change_password}, diff --git a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
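An illustrative shape for the generated capabilities response (the identifiers and dispositions shown are examples; the real set is whatever KNOWN_ROOM_VERSIONS contains at runtime):

    {
        "capabilities": {
            "m.room_versions": {
                "default": "1",
                "available": {
                    "1": "stable", "2": "stable",
                    "3": "stable", "state-v2-test": "unstable",
                },
            },
            "m.change_password": {"enabled": True},
        }
    }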
index e6356101fd..3db7ff8d1b 100644 --- a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py +++ b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
@@ -17,8 +17,8 @@ import logging from twisted.internet import defer -from synapse.api.constants import KNOWN_ROOM_VERSIONS from synapse.api.errors import Codes, SynapseError +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.http.servlet import ( RestServlet, assert_params_in_dict, diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 68058f613c..52347fee34 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py
@@ -24,7 +24,8 @@ from frozendict import frozendict from twisted.internet import defer -from synapse.api.constants import EventTypes, RoomVersions +from synapse.api.constants import EventTypes +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions from synapse.events.snapshot import EventContext from synapse.state import v1, v2 from synapse.util.async_helpers import Linearizer @@ -603,22 +604,15 @@ def resolve_events_with_store(room_version, state_sets, event_map, state_res_sto Deferred[dict[(str, str), str]]: a map from (type, state_key) to event_id. """ - if room_version == RoomVersions.V1: + v = KNOWN_ROOM_VERSIONS[room_version] + if v.state_res == StateResolutionVersions.V1: return v1.resolve_events_with_store( state_sets, event_map, state_res_store.get_events, ) - elif room_version in ( - RoomVersions.STATE_V2_TEST, RoomVersions.V2, RoomVersions.V3, - ): + else: return v2.resolve_events_with_store( room_version, state_sets, event_map, state_res_store, ) - else: - # This should only happen if we added a version but forgot to add it to - # the list above. - raise Exception( - "No state resolution algorithm defined for version %r" % (room_version,) - ) @attr.s diff --git a/synapse/state/v1.py b/synapse/state/v1.py
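One behavioural nuance in the dispatch above: the removed else-branch raised a descriptive Exception for unknown versions, while a plain KNOWN_ROOM_VERSIONS[room_version] raises a bare KeyError. A defensive sketch that preserves the old message:

    v = KNOWN_ROOM_VERSIONS.get(room_version)
    if v is None:
        # Mirrors the error the removed branch raised for unregistered versions.
        raise Exception(
            "No state resolution algorithm defined for version %r" % (room_version,)
        )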
index 6d3afcae7c..29b4e86cfd 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py
@@ -21,8 +21,9 @@ from six import iteritems, iterkeys, itervalues from twisted.internet import defer from synapse import event_auth -from synapse.api.constants import EventTypes, RoomVersions +from synapse.api.constants import EventTypes from synapse.api.errors import AuthError +from synapse.api.room_versions import RoomVersions logger = logging.getLogger(__name__) @@ -275,7 +276,9 @@ def _resolve_auth_events(events, auth_events): try: # The signatures have already been checked at this point event_auth.check( - RoomVersions.V1, event, auth_events, + RoomVersions.V1.identifier, + event, + auth_events, do_sig_check=False, do_size_check=False, ) @@ -291,7 +294,9 @@ def _resolve_normal_events(events, auth_events): try: # The signatures have already been checked at this point event_auth.check( - RoomVersions.V1, event, auth_events, + RoomVersions.V1.identifier, + event, + auth_events, do_sig_check=False, do_size_check=False, ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 7e3903859b..653b32fbf5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py
@@ -1355,11 +1355,6 @@ class SQLBaseStore(object): members_changed (iterable[str]): The user_ids of members that have changed """ - for member in members_changed: - self._attempt_to_invalidate_cache( - "get_rooms_for_user_with_stream_ordering", (member,), - ) - for host in set(get_domain_from_id(u) for u in members_changed): self._attempt_to_invalidate_cache( "is_host_joined", (room_id, host,), diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 428300ea0a..dfda39bbe0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py
@@ -30,7 +30,6 @@ from twisted.internet import defer import synapse.metrics from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError -# these are only included to make the type annotations work from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 from synapse.metrics.background_process_metrics import run_as_background_process @@ -51,8 +50,11 @@ from synapse.util.metrics import Measure logger = logging.getLogger(__name__) persist_event_counter = Counter("synapse_storage_events_persisted_events", "") -event_counter = Counter("synapse_storage_events_persisted_events_sep", "", - ["type", "origin_type", "origin_entity"]) +event_counter = Counter( + "synapse_storage_events_persisted_events_sep", + "", + ["type", "origin_type", "origin_entity"], +) # The number of times we are recalculating the current state state_delta_counter = Counter("synapse_storage_events_state_delta", "") @@ -60,13 +62,15 @@ state_delta_counter = Counter("synapse_storage_events_state_delta", "") # The number of times we are recalculating state when there is only a # single forward extremity state_delta_single_event_counter = Counter( - "synapse_storage_events_state_delta_single_event", "") + "synapse_storage_events_state_delta_single_event", "" +) # The number of times we are reculating state when we could have resonably # calculated the delta when we calculated the state for an event we were # persisting. state_delta_reuse_delta_counter = Counter( - "synapse_storage_events_state_delta_reuse_delta", "") + "synapse_storage_events_state_delta_reuse_delta", "" +) def encode_json(json_object): @@ -75,7 +79,7 @@ def encode_json(json_object): """ out = frozendict_json_encoder.encode(json_object) if isinstance(out, bytes): - out = out.decode('utf8') + out = out.decode("utf8") return out @@ -84,9 +88,9 @@ class _EventPeristenceQueue(object): concurrent transaction per room. 
""" - _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", ( - "events_and_contexts", "backfilled", "deferred", - )) + _EventPersistQueueItem = namedtuple( + "_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred") + ) def __init__(self): self._event_persist_queues = {} @@ -119,11 +123,13 @@ class _EventPeristenceQueue(object): deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) - queue.append(self._EventPersistQueueItem( - events_and_contexts=events_and_contexts, - backfilled=backfilled, - deferred=deferred, - )) + queue.append( + self._EventPersistQueueItem( + events_and_contexts=events_and_contexts, + backfilled=backfilled, + deferred=deferred, + ) + ) return deferred.observe() @@ -191,6 +197,7 @@ def _retry_on_integrity_error(func): Args: func: function that returns a Deferred and accepts a `delete_existing` arg """ + @wraps(func) @defer.inlineCallbacks def f(self, *args, **kwargs): @@ -206,8 +213,12 @@ def _retry_on_integrity_error(func): # inherits from EventFederationStore so that we can call _update_backward_extremities # and _handle_mult_prev_events (though arguably those could both be moved in here) -class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore, - BackgroundUpdateStore): +class EventsStore( + StateGroupWorkerStore, + EventFederationStore, + EventsWorkerStore, + BackgroundUpdateStore, +): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" @@ -265,8 +276,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore deferreds = [] for room_id, evs_ctxs in iteritems(partitioned): d = self._event_persist_queue.add_to_queue( - room_id, evs_ctxs, - backfilled=backfilled, + room_id, evs_ctxs, backfilled=backfilled ) deferreds.append(d) @@ -296,8 +306,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore and the stream ordering of the latest persisted event """ deferred = self._event_persist_queue.add_to_queue( - event.room_id, [(event, context)], - backfilled=backfilled, + event.room_id, [(event, context)], backfilled=backfilled ) self._maybe_start_persisting(event.room_id) @@ -312,16 +321,16 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore def persisting_queue(item): with Measure(self._clock, "persist_events"): yield self._persist_events( - item.events_and_contexts, - backfilled=item.backfilled, + item.events_and_contexts, backfilled=item.backfilled ) self._event_persist_queue.handle_queue(room_id, persisting_queue) @_retry_on_integrity_error @defer.inlineCallbacks - def _persist_events(self, events_and_contexts, backfilled=False, - delete_existing=False): + def _persist_events( + self, events_and_contexts, backfilled=False, delete_existing=False + ): """Persist events to db Args: @@ -345,13 +354,11 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore ) with stream_ordering_manager as stream_orderings: - for (event, context), stream, in zip( - events_and_contexts, stream_orderings - ): + for (event, context), stream in zip(events_and_contexts, stream_orderings): event.internal_metadata.stream_ordering = stream chunks = [ - events_and_contexts[x:x + 100] + events_and_contexts[x : x + 100] for x in range(0, len(events_and_contexts), 100) ] @@ -445,12 +452,9 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore state_delta_reuse_delta_counter.inc() break - logger.info( - "Calculating state delta 
for room %s", room_id, - ) + logger.info("Calculating state delta for room %s", room_id) with Measure( - self._clock, - "persist_events.get_new_state_after_events", + self._clock, "persist_events.get_new_state_after_events" ): res = yield self._get_new_state_after_events( room_id, @@ -470,11 +474,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore state_delta_for_room[room_id] = ([], delta_ids) elif current_state is not None: with Measure( - self._clock, - "persist_events.calculate_state_delta", + self._clock, "persist_events.calculate_state_delta" ): delta = yield self._calculate_state_delta( - room_id, current_state, + room_id, current_state ) state_delta_for_room[room_id] = delta @@ -498,7 +501,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # backfilled events have negative stream orderings, so we don't # want to set the event_persisted_position to that. synapse.metrics.event_persisted_position.set( - chunk[-1][0].internal_metadata.stream_ordering, + chunk[-1][0].internal_metadata.stream_ordering ) for event, context in chunk: @@ -515,9 +518,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore event_counter.labels(event.type, origin_type, origin_entity).inc() for room_id, new_state in iteritems(current_state_for_room): - self.get_current_state_ids.prefill( - (room_id, ), new_state - ) + self.get_current_state_ids.prefill((room_id,), new_state) for room_id, latest_event_ids in iteritems(new_forward_extremeties): self.get_latest_event_ids_in_room.prefill( @@ -535,8 +536,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # we're only interested in new events which aren't outliers and which aren't # being rejected. new_events = [ - event for event, ctx in event_contexts - if not event.internal_metadata.is_outlier() and not ctx.rejected + event + for event, ctx in event_contexts + if not event.internal_metadata.is_outlier() + and not ctx.rejected and not event.internal_metadata.is_soft_failed() ] @@ -544,15 +547,11 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore result = set(latest_event_ids) # add all the new events to the list - result.update( - event.event_id for event in new_events - ) + result.update(event.event_id for event in new_events) # Now remove all events which are prev_events of any of the new events result.difference_update( - e_id - for event in new_events - for e_id in event.prev_event_ids() + e_id for event in new_events for e_id in event.prev_event_ids() ) # Finally, remove any events which are prev_events of any existing events. 
@@ -592,17 +591,14 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore results.extend(r[0] for r in txn) for chunk in batch_iter(event_ids, 100): - yield self.runInteraction( - "_get_events_which_are_prevs", - _get_events, - chunk, - ) + yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk) defer.returnValue(results) @defer.inlineCallbacks - def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids, - new_latest_event_ids): + def _get_new_state_after_events( + self, room_id, events_context, old_latest_event_ids, new_latest_event_ids + ): """Calculate the current state dict after adding some new events to a room @@ -642,7 +638,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore if not ev.internal_metadata.is_outlier(): raise Exception( "Context for new event %s has no state " - "group" % (ev.event_id, ), + "group" % (ev.event_id,) ) continue @@ -682,9 +678,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore if missing_event_ids: # Now pull out the state groups for any missing events from DB - event_to_groups = yield self._get_state_group_for_events( - missing_event_ids, - ) + event_to_groups = yield self._get_state_group_for_events(missing_event_ids) event_id_to_state_group.update(event_to_groups) # State groups of old_latest_event_ids @@ -710,9 +704,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore new_state_group = next(iter(new_state_groups)) old_state_group = next(iter(old_state_groups)) - delta_ids = state_group_deltas.get( - (old_state_group, new_state_group,), None - ) + delta_ids = state_group_deltas.get((old_state_group, new_state_group), None) if delta_ids is not None: # We have a delta from the existing to new current state, # so lets just return that. If we happen to already have @@ -735,9 +727,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # Ok, we need to defer to the state handler to resolve our state sets. 
- state_groups = { - sg: state_groups_map[sg] for sg in new_state_groups - } + state_groups = {sg: state_groups_map[sg] for sg in new_state_groups} events_map = {ev.event_id: ev for ev, _ in events_context} @@ -755,8 +745,11 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore logger.debug("calling resolve_state_groups from preserve_events") res = yield self._state_resolution_handler.resolve_state_groups( - room_id, room_version, state_groups, events_map, - state_res_store=StateResolutionStore(self) + room_id, + room_version, + state_groups, + events_map, + state_res_store=StateResolutionStore(self), ) defer.returnValue((res.state, None)) @@ -774,22 +767,26 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore """ existing_state = yield self.get_current_state_ids(room_id) - to_delete = [ - key for key in existing_state - if key not in current_state - ] + to_delete = [key for key in existing_state if key not in current_state] to_insert = { - key: ev_id for key, ev_id in iteritems(current_state) + key: ev_id + for key, ev_id in iteritems(current_state) if ev_id != existing_state.get(key) } defer.returnValue((to_delete, to_insert)) @log_function - def _persist_events_txn(self, txn, events_and_contexts, backfilled, - delete_existing=False, state_delta_for_room={}, - new_forward_extremeties={}): + def _persist_events_txn( + self, + txn, + events_and_contexts, + backfilled, + delete_existing=False, + state_delta_for_room={}, + new_forward_extremeties={}, + ): """Insert some number of room events into the necessary database tables. Rejected events are only inserted into the events table, the events_json table, @@ -816,9 +813,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore """ all_events_and_contexts = events_and_contexts + min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering - self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) + self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) self._update_forward_extremities_txn( txn, @@ -828,20 +826,17 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # Ensure that we don't have the same event twice. events_and_contexts = self._filter_events_and_contexts_for_duplicates( - events_and_contexts, + events_and_contexts ) self._update_room_depths_txn( - txn, - events_and_contexts=events_and_contexts, - backfilled=backfilled, + txn, events_and_contexts=events_and_contexts, backfilled=backfilled ) # _update_outliers_txn filters out any events which have already been # persisted, and returns the filtered list. events_and_contexts = self._update_outliers_txn( - txn, - events_and_contexts=events_and_contexts, + txn, events_and_contexts=events_and_contexts ) # From this point onwards the events are only events that we haven't @@ -852,15 +847,9 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # for these events so we can reinsert them. # This gets around any problems with some tables already having # entries. 
- self._delete_existing_rows_txn( - txn, - events_and_contexts=events_and_contexts, - ) + self._delete_existing_rows_txn(txn, events_and_contexts=events_and_contexts) - self._store_event_txn( - txn, - events_and_contexts=events_and_contexts, - ) + self._store_event_txn(txn, events_and_contexts=events_and_contexts) # Insert into event_to_state_groups. self._store_event_state_mappings_txn(txn, events_and_contexts) @@ -889,8 +878,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # _store_rejected_events_txn filters out any events which were # rejected, and returns the filtered list. events_and_contexts = self._store_rejected_events_txn( - txn, - events_and_contexts=events_and_contexts, + txn, events_and_contexts=events_and_contexts ) # From this point onwards the events are only ones that weren't @@ -903,7 +891,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore backfilled=backfilled, ) - def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): + def _update_current_state_txn(self, txn, state_delta_by_room, stream_id): for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple @@ -912,6 +900,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # that we can use it to calculate the `prev_event_id`. (This # allows us to not have to pull out the existing state # unnecessarily). + # + # The stream_id for the update is chosen to be the minimum of the stream_ids + # for the batch of the events that we are persisting; that means we do not + # end up in a situation where workers see events before the + # current_state_delta updates. + # sql = """ INSERT INTO current_state_delta_stream (stream_id, room_id, type, state_key, event_id, prev_event_id) @@ -920,22 +914,40 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore WHERE room_id = ? AND type = ? AND state_key = ? 
) """ - txn.executemany(sql, ( + txn.executemany( + sql, ( - max_stream_order, room_id, etype, state_key, None, - room_id, etype, state_key, - ) - for etype, state_key in to_delete - # We sanity check that we're deleting rather than updating - if (etype, state_key) not in to_insert - )) - txn.executemany(sql, ( + ( + stream_id, + room_id, + etype, + state_key, + None, + room_id, + etype, + state_key, + ) + for etype, state_key in to_delete + # We sanity check that we're deleting rather than updating + if (etype, state_key) not in to_insert + ), + ) + txn.executemany( + sql, ( - max_stream_order, room_id, etype, state_key, ev_id, - room_id, etype, state_key, - ) - for (etype, state_key), ev_id in iteritems(to_insert) - )) + ( + stream_id, + room_id, + etype, + state_key, + ev_id, + room_id, + etype, + state_key, + ) + for (etype, state_key), ev_id in iteritems(to_insert) + ), + ) # Now we actually update the current_state_events table @@ -964,7 +976,8 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore txn.call_after( self._curr_state_delta_stream_cache.entity_has_changed, - room_id, max_stream_order, + room_id, + stream_id, ) # Invalidate the various caches @@ -980,28 +993,27 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore if ev_type == EventTypes.Member ) + for member in members_changed: + txn.call_after( + self.get_rooms_for_user_with_stream_ordering.invalidate, (member,) + ) + self._invalidate_state_caches_and_stream(txn, room_id, members_changed) - def _update_forward_extremities_txn(self, txn, new_forward_extremities, - max_stream_order): + def _update_forward_extremities_txn( + self, txn, new_forward_extremities, max_stream_order + ): for room_id, new_extrem in iteritems(new_forward_extremities): self._simple_delete_txn( - txn, - table="event_forward_extremities", - keyvalues={"room_id": room_id}, - ) - txn.call_after( - self.get_latest_event_ids_in_room.invalidate, (room_id,) + txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) + txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,)) self._simple_insert_many_txn( txn, table="event_forward_extremities", values=[ - { - "event_id": ev_id, - "room_id": room_id, - } + {"event_id": ev_id, "room_id": room_id} for room_id, new_extrem in iteritems(new_forward_extremities) for ev_id in new_extrem ], @@ -1021,7 +1033,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore } for room_id, new_extrem in iteritems(new_forward_extremities) for event_id in new_extrem - ] + ], ) @classmethod @@ -1065,7 +1077,8 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore if not backfilled: txn.call_after( self._events_stream_cache.entity_has_changed, - event.room_id, event.internal_metadata.stream_ordering, + event.room_id, + event.internal_metadata.stream_ordering, ) if not event.internal_metadata.is_outlier() and not context.rejected: @@ -1092,16 +1105,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore are already in the events table. 
""" txn.execute( - "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % ( - ",".join(["?"] * len(events_and_contexts)), - ), - [event.event_id for event, _ in events_and_contexts] + "SELECT event_id, outlier FROM events WHERE event_id in (%s)" + % (",".join(["?"] * len(events_and_contexts)),), + [event.event_id for event, _ in events_and_contexts], ) - have_persisted = { - event_id: outlier - for event_id, outlier in txn - } + have_persisted = {event_id: outlier for event_id, outlier in txn} to_remove = set() for event, context in events_and_contexts: @@ -1128,18 +1137,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore logger.exception("") raise - metadata_json = encode_json( - event.internal_metadata.get_dict() - ) + metadata_json = encode_json(event.internal_metadata.get_dict()) sql = ( - "UPDATE event_json SET internal_metadata = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (metadata_json, event.event_id,) + "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?" ) + txn.execute(sql, (metadata_json, event.event_id)) # Add an entry to the ex_outlier_stream table to replicate the # change in outlier status to our workers. @@ -1152,25 +1155,17 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore "event_stream_ordering": stream_order, "event_id": event.event_id, "state_group": state_group_id, - } + }, ) - sql = ( - "UPDATE events SET outlier = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (False, event.event_id,) - ) + sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?" + txn.execute(sql, (False, event.event_id)) # Update the event_backward_extremities table now that this # event isn't an outlier any more. self._update_backward_extremeties(txn, [event]) - return [ - ec for ec in events_and_contexts if ec[0] not in to_remove - ] + return [ec for ec in events_and_contexts if ec[0] not in to_remove] @classmethod def _delete_existing_rows_txn(cls, txn, events_and_contexts): @@ -1181,39 +1176,37 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore logger.info("Deleting existing") for table in ( - "events", - "event_auth", - "event_json", - "event_content_hashes", - "event_destinations", - "event_edge_hashes", - "event_edges", - "event_forward_extremities", - "event_reference_hashes", - "event_search", - "event_signatures", - "event_to_state_groups", - "guest_access", - "history_visibility", - "local_invites", - "room_names", - "state_events", - "rejections", - "redactions", - "room_memberships", - "topics" + "events", + "event_auth", + "event_json", + "event_content_hashes", + "event_destinations", + "event_edge_hashes", + "event_edges", + "event_forward_extremities", + "event_reference_hashes", + "event_search", + "event_signatures", + "event_to_state_groups", + "guest_access", + "history_visibility", + "local_invites", + "room_names", + "state_events", + "rejections", + "redactions", + "room_memberships", + "topics", ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), - [(ev.event_id,) for ev, _ in events_and_contexts] + [(ev.event_id,) for ev, _ in events_and_contexts], ) - for table in ( - "event_push_actions", - ): + for table in ("event_push_actions",): txn.executemany( "DELETE FROM %s WHERE room_id = ? AND event_id = ?" 
% (table,), - [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts] + [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts], ) def _store_event_txn(self, txn, events_and_contexts): @@ -1296,17 +1289,14 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore for event, context in events_and_contexts: if context.rejected: # Insert the event_id into the rejections table - self._store_rejections_txn( - txn, event.event_id, context.rejected - ) + self._store_rejections_txn(txn, event.event_id, context.rejected) to_remove.add(event) - return [ - ec for ec in events_and_contexts if ec[0] not in to_remove - ] + return [ec for ec in events_and_contexts if ec[0] not in to_remove] - def _update_metadata_tables_txn(self, txn, events_and_contexts, - all_events_and_contexts, backfilled): + def _update_metadata_tables_txn( + self, txn, events_and_contexts, all_events_and_contexts, backfilled + ): """Update all the miscellaneous tables for new events Args: @@ -1342,8 +1332,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # Update the event_forward_extremities, event_backward_extremities and # event_edges tables. self._handle_mult_prev_events( - txn, - events=[event for event, _ in events_and_contexts], + txn, events=[event for event, _ in events_and_contexts] ) for event, _ in events_and_contexts: @@ -1401,11 +1390,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore state_values.append(vals) - self._simple_insert_many_txn( - txn, - table="state_events", - values=state_values, - ) + self._simple_insert_many_txn(txn, table="state_events", values=state_values) # Prefill the event cache self._add_to_cache(txn, events_and_contexts) @@ -1416,10 +1401,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore rows = [] N = 200 for i in range(0, len(events_and_contexts), N): - ev_map = { - e[0].event_id: e[0] - for e in events_and_contexts[i:i + N] - } + ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]} if not ev_map: break @@ -1439,14 +1421,14 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore for row in rows: event = ev_map[row["event_id"]] if not row["rejects"] and not row["redacts"]: - to_prefill.append(_EventCacheEntry( - event=event, - redacted_event=None, - )) + to_prefill.append( + _EventCacheEntry(event=event, redacted_event=None) + ) def prefill(): for cache_entry in to_prefill: self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry) + txn.call_after(prefill) def _store_redaction(self, txn, event): @@ -1454,7 +1436,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore txn.call_after(self._invalidate_get_event_cache, event.redacts) txn.execute( "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts) + (event.event_id, event.redacts), ) @defer.inlineCallbacks @@ -1465,6 +1447,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore If it has been significantly less or more than one day since the last call to this function, it will return None. """ + def _count_messages(txn): sql = """ SELECT COALESCE(COUNT(*), 0) FROM events @@ -1492,7 +1475,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore AND stream_ordering > ? 
""" - txn.execute(sql, (like_clause, self.stream_ordering_day_ago,)) + txn.execute(sql, (like_clause, self.stream_ordering_day_ago)) count, = txn.fetchone() return count @@ -1557,18 +1540,16 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore update_rows.append((sender, contains_url, event_id)) - sql = ( - "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?" - ) + sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?" for index in range(0, len(update_rows), INSERT_CLUMP_SIZE): - clump = update_rows[index:index + INSERT_CLUMP_SIZE] + clump = update_rows[index : index + INSERT_CLUMP_SIZE] txn.executemany(sql, clump) progress = { "target_min_stream_id_inclusive": target_min_stream_id, "max_stream_id_exclusive": min_stream_id, - "rows_inserted": rows_inserted + len(rows) + "rows_inserted": rows_inserted + len(rows), } self._background_update_progress_txn( @@ -1613,10 +1594,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore rows_to_update = [] - chunks = [ - event_ids[i:i + 100] - for i in range(0, len(event_ids), 100) - ] + chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)] for chunk in chunks: ev_rows = self._simple_select_many_txn( txn, @@ -1639,18 +1617,16 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore rows_to_update.append((origin_server_ts, event_id)) - sql = ( - "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" - ) + sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE): - clump = rows_to_update[index:index + INSERT_CLUMP_SIZE] + clump = rows_to_update[index : index + INSERT_CLUMP_SIZE] txn.executemany(sql, clump) progress = { "target_min_stream_id_inclusive": target_min_stream_id, "max_stream_id_exclusive": min_stream_id, - "rows_inserted": rows_inserted + len(rows_to_update) + "rows_inserted": rows_inserted + len(rows_to_update), } self._background_update_progress_txn( @@ -1714,6 +1690,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore new_event_updates.extend(txn) return new_event_updates + return self.runInteraction( "get_all_new_forward_event_rows", get_all_new_forward_event_rows ) @@ -1756,13 +1733,20 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore new_event_updates.extend(txn.fetchall()) return new_event_updates + return self.runInteraction( "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows ) @cached(num_args=5, max_entries=10) - def get_all_new_events(self, last_backfill_id, last_forward_id, - current_backfill_id, current_forward_id, limit): + def get_all_new_events( + self, + last_backfill_id, + last_forward_id, + current_backfill_id, + current_forward_id, + limit, + ): """Get all the new events that have arrived at the server either as new events or as backfilled events""" have_backfill_events = last_backfill_id != current_backfill_id @@ -1837,14 +1821,15 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore backward_ex_outliers = [] return AllNewEventsResult( - new_forward_events, new_backfill_events, - forward_ex_outliers, backward_ex_outliers, + new_forward_events, + new_backfill_events, + forward_ex_outliers, + backward_ex_outliers, ) + return self.runInteraction("get_all_new_events", get_all_new_events_txn) - def purge_history( - self, room_id, token, delete_local_events, - ): + def purge_history(self, room_id, token, 
delete_local_events): """Deletes room history before a certain point Args: @@ -1860,13 +1845,13 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore return self.runInteraction( "purge_history", - self._purge_history_txn, room_id, token, + self._purge_history_txn, + room_id, + token, delete_local_events, ) - def _purge_history_txn( - self, txn, room_id, token_str, delete_local_events, - ): + def _purge_history_txn(self, txn, room_id, token_str, delete_local_events): token = RoomStreamToken.parse(token_str) # Tables that should be pruned: @@ -1913,7 +1898,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore "ON e.event_id = f.event_id " "AND e.room_id = f.room_id " "WHERE f.room_id = ?", - (room_id,) + (room_id,), ) rows = txn.fetchall() max_depth = max(row[1] for row in rows) @@ -1934,10 +1919,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore should_delete_expr += " AND event_id NOT LIKE ?" # We include the parameter twice since we use the expression twice - should_delete_params += ( - "%:" + self.hs.hostname, - "%:" + self.hs.hostname, - ) + should_delete_params += ("%:" + self.hs.hostname, "%:" + self.hs.hostname) should_delete_params += (room_id, token.topological) @@ -1948,10 +1930,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore " SELECT event_id, %s" " FROM events AS e LEFT JOIN state_events USING (event_id)" " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?" - % ( - should_delete_expr, - should_delete_expr, - ), + % (should_delete_expr, should_delete_expr), should_delete_params, ) @@ -1961,23 +1940,19 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # the should_delete / shouldn't_delete subsets txn.execute( "CREATE INDEX events_to_purge_should_delete" - " ON events_to_purge(should_delete)", + " ON events_to_purge(should_delete)" ) # We do joins against events_to_purge for e.g. calculating state # groups to purge, etc., so lets make an index. 
- txn.execute( - "CREATE INDEX events_to_purge_id" - " ON events_to_purge(event_id)", - ) + txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)") - txn.execute( - "SELECT event_id, should_delete FROM events_to_purge" - ) + txn.execute("SELECT event_id, should_delete FROM events_to_purge") event_rows = txn.fetchall() logger.info( "[purge] found %i events before cutoff, of which %i can be deleted", - len(event_rows), sum(1 for e in event_rows if e[1]), + len(event_rows), + sum(1 for e in event_rows if e[1]), ) logger.info("[purge] Finding new backward extremities") @@ -1989,24 +1964,21 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore "SELECT DISTINCT e.event_id FROM events_to_purge AS e" " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id" " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id" - " WHERE ep2.event_id IS NULL", + " WHERE ep2.event_id IS NULL" ) new_backwards_extrems = txn.fetchall() logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems) txn.execute( - "DELETE FROM event_backward_extremities WHERE room_id = ?", - (room_id,) + "DELETE FROM event_backward_extremities WHERE room_id = ?", (room_id,) ) # Update backward extremeties txn.executemany( "INSERT INTO event_backward_extremities (room_id, event_id)" " VALUES (?, ?)", - [ - (room_id, event_id) for event_id, in new_backwards_extrems - ] + [(room_id, event_id) for event_id, in new_backwards_extrems], ) logger.info("[purge] finding redundant state groups") @@ -2014,28 +1986,25 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # Get all state groups that are referenced by events that are to be # deleted. We then go and check if they are referenced by other events # or state groups, and if not we delete them. - txn.execute(""" + txn.execute( + """ SELECT DISTINCT state_group FROM events_to_purge INNER JOIN event_to_state_groups USING (event_id) - """) + """ + ) referenced_state_groups = set(sg for sg, in txn) logger.info( - "[purge] found %i referenced state groups", - len(referenced_state_groups), + "[purge] found %i referenced state groups", len(referenced_state_groups) ) logger.info("[purge] finding state groups that can be deleted") - state_groups_to_delete, remaining_state_groups = ( - self._find_unreferenced_groups_during_purge( - txn, referenced_state_groups, - ) - ) + _ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups) + state_groups_to_delete, remaining_state_groups = _ logger.info( - "[purge] found %i state groups to delete", - len(state_groups_to_delete), + "[purge] found %i state groups to delete", len(state_groups_to_delete) ) logger.info( @@ -2047,25 +2016,15 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # groups to non delta versions. 
for sg in remaining_state_groups: logger.info("[purge] de-delta-ing remaining state group %s", sg) - curr_state = self._get_state_groups_from_groups_txn( - txn, [sg], - ) + curr_state = self._get_state_groups_from_groups_txn(txn, [sg]) curr_state = curr_state[sg] self._simple_delete_txn( - txn, - table="state_groups_state", - keyvalues={ - "state_group": sg, - } + txn, table="state_groups_state", keyvalues={"state_group": sg} ) self._simple_delete_txn( - txn, - table="state_group_edges", - keyvalues={ - "state_group": sg, - } + txn, table="state_group_edges", keyvalues={"state_group": sg} ) self._simple_insert_many_txn( @@ -2099,9 +2058,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore "WHERE event_id IN (SELECT event_id from events_to_purge)" ) for event_id, _ in event_rows: - txn.call_after(self._get_state_group_for_event.invalidate, ( - event_id, - )) + txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) # Delete all remote non-state events for table in ( @@ -2123,21 +2080,19 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore txn.execute( "DELETE FROM %s WHERE event_id IN (" " SELECT event_id FROM events_to_purge WHERE should_delete" - ")" % (table,), + ")" % (table,) ) # event_push_actions lacks an index on event_id, and has one on # (room_id, event_id) instead. - for table in ( - "event_push_actions", - ): + for table in ("event_push_actions",): logger.info("[purge] removing events from %s", table) txn.execute( "DELETE FROM %s WHERE room_id = ? AND event_id IN (" " SELECT event_id FROM events_to_purge WHERE should_delete" ")" % (table,), - (room_id, ) + (room_id,), ) # Mark all state and own events as outliers @@ -2162,27 +2117,28 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore # extremities. However, the events in event_backward_extremities # are ones we don't have yet so we need to look at the events that # point to it via event_edges table. - txn.execute(""" + txn.execute( + """ SELECT COALESCE(MIN(depth), 0) FROM event_backward_extremities AS eb INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id INNER JOIN events AS e ON e.event_id = eg.event_id WHERE eb.room_id = ? - """, (room_id,)) + """, + (room_id,), + ) min_depth, = txn.fetchone() logger.info("[purge] updating room_depth to %d", min_depth) txn.execute( "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", - (min_depth, room_id,) + (min_depth, room_id), ) # finally, drop the temp table. this will commit the txn in sqlite, # so make sure to keep this actually last. - txn.execute( - "DROP TABLE events_to_purge" - ) + txn.execute("DROP TABLE events_to_purge") logger.info("[purge] done") @@ -2226,7 +2182,9 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore SELECT DISTINCT state_group FROM event_to_state_groups LEFT JOIN events_to_purge AS ep USING (event_id) WHERE state_group IN (%s) AND ep.event_id IS NULL - """ % (",".join("?" for _ in current_search),) + """ % ( + ",".join("?" 
for _ in current_search), + ) txn.execute(sql, list(current_search)) referenced = set(sg for sg, in txn) @@ -2242,7 +2200,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore column="prev_state_group", iterable=current_search, keyvalues={}, - retcols=("prev_state_group", "state_group",), + retcols=("prev_state_group", "state_group"), ) prevs = set(row["state_group"] for row in rows) @@ -2279,16 +2237,15 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore table="events", retcols=["topological_ordering", "stream_ordering"], keyvalues={"event_id": event_id}, - allow_none=True + allow_none=True, ) if not res: raise SynapseError(404, "Could not find event %s" % (event_id,)) - defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) - - def get_max_current_state_delta_stream_id(self): - return self._stream_id_gen.get_current_token() + defer.returnValue( + (int(res["topological_ordering"]), int(res["stream_ordering"])) + ) def get_all_updated_current_state_deltas(self, from_token, to_token, limit): def get_all_updated_current_state_deltas_txn(txn): @@ -2300,13 +2257,19 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore """ txn.execute(sql, (from_token, to_token, limit)) return txn.fetchall() + return self.runInteraction( "get_all_updated_current_state_deltas", get_all_updated_current_state_deltas_txn, ) -AllNewEventsResult = namedtuple("AllNewEventsResult", [ - "new_forward_events", "new_backfill_events", - "forward_ex_outliers", "backward_ex_outliers", -]) +AllNewEventsResult = namedtuple( + "AllNewEventsResult", + [ + "new_forward_events", + "new_backfill_events", + "forward_ex_outliers", + "backward_ex_outliers", + ], +) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 1716be529a..53c8dc3903 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -21,8 +21,9 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventFormatVersions, EventTypes
+from synapse.api.constants import EventTypes
 from synapse.api.errors import NotFoundError
+from synapse.api.room_versions import EventFormatVersions
 from synapse.events import FrozenEvent, event_type_from_format_version  # noqa: F401
 # these are only included to make the type annotations work
 from synapse.events.snapshot import EventContext  # noqa: F401
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 2874dabbd1..57df17bcc2 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
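# The "3x space" claim in the comment kept in the hunk below is easy to
# sanity-check; a quick illustration (exact byte counts vary across Python
# versions and builds):
import sys
from collections import namedtuple

ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))
as_tuple = ProfileInfo(avatar_url=None, display_name="Alice")
as_dict = {"avatar_url": None, "display_name": "Alice"}
# the namedtuple is a plain tuple underneath, so it has no per-instance dict
print(sys.getsizeof(as_tuple), sys.getsizeof(as_dict))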
@@ -35,28 +35,22 @@ logger = logging.getLogger(__name__)
 
 
 RoomsForUser = namedtuple(
-    "RoomsForUser",
-    ("room_id", "sender", "membership", "event_id", "stream_ordering")
+    "RoomsForUser", ("room_id", "sender", "membership", "event_id", "stream_ordering")
 )
 
 GetRoomsForUserWithStreamOrdering = namedtuple(
-    "_GetRoomsForUserWithStreamOrdering",
-    ("room_id", "stream_ordering",)
+    "_GetRoomsForUserWithStreamOrdering", ("room_id", "stream_ordering")
 )
 
 
 # We store this using a namedtuple so that we save about 3x space over using a
 # dict.
-ProfileInfo = namedtuple(
-    "ProfileInfo", ("avatar_url", "display_name")
-)
+ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))
 
 # "members" points to a truncated list of (user_id, event_id) tuples for users of
 # a given membership type, suitable for use in calculating heroes for a room.
 # "count" points to the total number of users of a given membership type.
-MemberSummary = namedtuple(
-    "MemberSummary", ("members", "count")
-)
+MemberSummary = namedtuple("MemberSummary", ("members", "count"))
 
 _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
 
@@ -67,12 +61,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         """Returns the set of all hosts currently in the room
         """
         user_ids = yield self.get_users_in_room(
-            room_id, on_invalidate=cache_context.invalidate,
+            room_id, on_invalidate=cache_context.invalidate
         )
         hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
         defer.returnValue(hosts)
 
-    @cachedInlineCallbacks(max_entries=100000, iterable=True)
+    @cached(max_entries=100000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
             sql = (
@@ -84,16 +78,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
             )
 
-            txn.execute(sql, (room_id, Membership.JOIN,))
+            txn.execute(sql, (room_id, Membership.JOIN))
             return [to_ascii(r[0]) for r in txn]
 
-        start_time = self._clock.time_msec()
-        result = yield self.runInteraction("get_users_in_room", f)
-        end_time = self._clock.time_msec()
-        logger.info(
-            "Fetched room membership for %s (%i users) in %i ms",
-            room_id, len(result), end_time - start_time,
-        )
-        defer.returnValue(result)
+
+        return self.runInteraction("get_users_in_room", f)
 
     @cached(max_entries=100000)
     def get_room_summary(self, room_id):
@@ -163,9 +151,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             A deferred list of RoomsForUser.
""" - return self.get_rooms_for_user_where_membership_is( - user_id, [Membership.INVITE] - ) + return self.get_rooms_for_user_where_membership_is(user_id, [Membership.INVITE]) @defer.inlineCallbacks def get_invite_for_user_in_room(self, user_id, room_id): @@ -203,11 +189,13 @@ class RoomMemberWorkerStore(EventsWorkerStore): return self.runInteraction( "get_rooms_for_user_where_membership_is", self._get_rooms_for_user_where_membership_is_txn, - user_id, membership_list + user_id, + membership_list, ) - def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, - membership_list): + def _get_rooms_for_user_where_membership_is_txn( + self, txn, user_id, membership_list + ): do_invite = Membership.INVITE in membership_list membership_list = [m for m in membership_list if m != Membership.INVITE] @@ -234,9 +222,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) % (where_clause,) txn.execute(sql, args) - results = [ - RoomsForUser(**r) for r in self.cursor_to_dict(txn) - ] + results = [RoomsForUser(**r) for r in self.cursor_to_dict(txn)] if do_invite: sql = ( @@ -248,13 +234,16 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) txn.execute(sql, (user_id,)) - results.extend(RoomsForUser( - room_id=r["room_id"], - sender=r["inviter"], - event_id=r["event_id"], - stream_ordering=r["stream_ordering"], - membership=Membership.INVITE, - ) for r in self.cursor_to_dict(txn)) + results.extend( + RoomsForUser( + room_id=r["room_id"], + sender=r["inviter"], + event_id=r["event_id"], + stream_ordering=r["stream_ordering"], + membership=Membership.INVITE, + ) + for r in self.cursor_to_dict(txn) + ) return results @@ -271,19 +260,21 @@ class RoomMemberWorkerStore(EventsWorkerStore): of the most recent join for that user and room. """ rooms = yield self.get_rooms_for_user_where_membership_is( - user_id, membership_list=[Membership.JOIN], + user_id, membership_list=[Membership.JOIN] + ) + defer.returnValue( + frozenset( + GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering) + for r in rooms + ) ) - defer.returnValue(frozenset( - GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering) - for r in rooms - )) @defer.inlineCallbacks def get_rooms_for_user(self, user_id, on_invalidate=None): """Returns a set of room_ids the user is currently joined to """ rooms = yield self.get_rooms_for_user_with_stream_ordering( - user_id, on_invalidate=on_invalidate, + user_id, on_invalidate=on_invalidate ) defer.returnValue(frozenset(r.room_id for r in rooms)) @@ -292,13 +283,13 @@ class RoomMemberWorkerStore(EventsWorkerStore): """Returns the set of users who share a room with `user_id` """ room_ids = yield self.get_rooms_for_user( - user_id, on_invalidate=cache_context.invalidate, + user_id, on_invalidate=cache_context.invalidate ) user_who_share_room = set() for room_id in room_ids: user_ids = yield self.get_users_in_room( - room_id, on_invalidate=cache_context.invalidate, + room_id, on_invalidate=cache_context.invalidate ) user_who_share_room.update(user_ids) @@ -316,9 +307,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): current_state_ids = yield context.get_current_state_ids(self) result = yield self._get_joined_users_from_context( - event.room_id, state_group, current_state_ids, - event=event, - context=context, + event.room_id, state_group, current_state_ids, event=event, context=context ) defer.returnValue(result) @@ -332,13 +321,21 @@ class RoomMemberWorkerStore(EventsWorkerStore): state_group = object() return self._get_joined_users_from_context( - room_id, state_group, 
state_entry.state, context=state_entry, + room_id, state_group, state_entry.state, context=state_entry ) - @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True, - max_entries=100000) - def _get_joined_users_from_context(self, room_id, state_group, current_state_ids, - cache_context, event=None, context=None): + @cachedInlineCallbacks( + num_args=2, cache_context=True, iterable=True, max_entries=100000 + ) + def _get_joined_users_from_context( + self, + room_id, + state_group, + current_state_ids, + cache_context, + event=None, + context=None, + ): # We don't use `state_group`, it's there so that we can cache based # on it. However, it's important that it's never None, since two current_states # with a state_group of None are likely to be different. @@ -378,9 +375,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): # the hit ratio counts. After all, we don't populate the cache if we # miss it here event_map = self._get_events_from_cache( - member_event_ids, - allow_rejected=False, - update_metrics=False, + member_event_ids, allow_rejected=False, update_metrics=False ) missing_member_event_ids = [] @@ -404,21 +399,21 @@ class RoomMemberWorkerStore(EventsWorkerStore): table="room_memberships", column="event_id", iterable=missing_member_event_ids, - retcols=('user_id', 'display_name', 'avatar_url',), - keyvalues={ - "membership": Membership.JOIN, - }, + retcols=('user_id', 'display_name', 'avatar_url'), + keyvalues={"membership": Membership.JOIN}, batch_size=500, desc="_get_joined_users_from_context", ) - users_in_room.update({ - to_ascii(row["user_id"]): ProfileInfo( - avatar_url=to_ascii(row["avatar_url"]), - display_name=to_ascii(row["display_name"]), - ) - for row in rows - }) + users_in_room.update( + { + to_ascii(row["user_id"]): ProfileInfo( + avatar_url=to_ascii(row["avatar_url"]), + display_name=to_ascii(row["display_name"]), + ) + for row in rows + } + ) if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: @@ -512,7 +507,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): state_group = object() return self._get_joined_hosts( - room_id, state_group, state_entry.state, state_entry=state_entry, + room_id, state_group, state_entry.state, state_entry=state_entry ) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) @@ -538,6 +533,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): """Returns whether user_id has elected to discard history for room_id. 
Returns False if they have since re-joined.""" + def f(txn): sql = ( "SELECT" @@ -554,6 +550,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): txn.execute(sql, (user_id, room_id)) rows = txn.fetchall() return rows[0][0] + count = yield self.runInteraction("did_forget_membership", f) defer.returnValue(count == 0) @@ -582,13 +579,14 @@ class RoomMemberStore(RoomMemberWorkerStore): "avatar_url": event.content.get("avatar_url", None), } for event in events - ] + ], ) for event in events: txn.call_after( self._membership_stream_cache.entity_has_changed, - event.state_key, event.internal_metadata.stream_ordering + event.state_key, + event.internal_metadata.stream_ordering, ) txn.call_after( self.get_invited_rooms_for_user.invalidate, (event.state_key,) @@ -614,7 +612,7 @@ class RoomMemberStore(RoomMemberWorkerStore): "inviter": event.sender, "room_id": event.room_id, "stream_id": event.internal_metadata.stream_ordering, - } + }, ) else: sql = ( @@ -623,12 +621,15 @@ class RoomMemberStore(RoomMemberWorkerStore): " AND replaced_by is NULL" ) - txn.execute(sql, ( - event.internal_metadata.stream_ordering, - event.event_id, - event.room_id, - event.state_key, - )) + txn.execute( + sql, + ( + event.internal_metadata.stream_ordering, + event.event_id, + event.room_id, + event.state_key, + ), + ) @defer.inlineCallbacks def locally_reject_invite(self, user_id, room_id): @@ -639,18 +640,14 @@ class RoomMemberStore(RoomMemberWorkerStore): ) def f(txn, stream_ordering): - txn.execute(sql, ( - stream_ordering, - True, - room_id, - user_id, - )) + txn.execute(sql, (stream_ordering, True, room_id, user_id)) with self._stream_id_gen.get_next() as stream_ordering: yield self.runInteraction("locally_reject_invite", f, stream_ordering) def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" + def f(txn): sql = ( "UPDATE" @@ -664,9 +661,8 @@ class RoomMemberStore(RoomMemberWorkerStore): ) txn.execute(sql, (user_id, room_id)) - self._invalidate_cache_and_stream( - txn, self.did_forget, (user_id, room_id,), - ) + self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) + return self.runInteraction("forget_membership", f) @defer.inlineCallbacks @@ -681,7 +677,7 @@ class RoomMemberStore(RoomMemberWorkerStore): INSERT_CLUMP_SIZE = 1000 def add_membership_profile_txn(txn): - sql = (""" + sql = """ SELECT stream_ordering, event_id, events.room_id, event_json.json FROM events INNER JOIN event_json USING (event_id) @@ -690,7 +686,7 @@ class RoomMemberStore(RoomMemberWorkerStore): AND type = 'm.room.member' ORDER BY stream_ordering DESC LIMIT ? - """) + """ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) @@ -714,16 +710,14 @@ class RoomMemberStore(RoomMemberWorkerStore): avatar_url = content.get("avatar_url", None) if display_name or avatar_url: - to_update.append(( - display_name, avatar_url, event_id, room_id - )) + to_update.append((display_name, avatar_url, event_id, room_id)) - to_update_sql = (""" + to_update_sql = """ UPDATE room_memberships SET display_name = ?, avatar_url = ? WHERE event_id = ? AND room_id = ? 
-            """)
+            """
 
         for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
-            clump = to_update[index:index + INSERT_CLUMP_SIZE]
+            clump = to_update[index : index + INSERT_CLUMP_SIZE]
             txn.executemany(to_update_sql, clump)
 
         progress = {
@@ -796,7 +790,7 @@ class _JoinedHostsCache(object):
                 self.hosts_to_joined_users.pop(host, None)
             else:
                 joined_users = yield self.store.get_joined_users_from_state(
-                    self.room_id, state_entry,
+                    self.room_id, state_entry
                 )
 
                 self.hosts_to_joined_users = {}
diff --git a/tests/config/test_server.py b/tests/config/test_server.py
new file mode 100644
index 0000000000..f5836d73ac
--- /dev/null
+++ b/tests/config/test_server.py
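# The new test below pins down the contract of is_threepid_reserved: true only
# when the given threepid appears in the configured reserved list, matching on
# both medium and address. A hedged sketch of that contract (not the actual
# synapse.config.server implementation):
def is_threepid_reserved_sketch(reserved_threepids, threepid):
    return any(
        entry["medium"] == threepid["medium"]
        and entry["address"] == threepid["address"]
        for entry in reserved_threepids
    )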
@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.config.server import is_threepid_reserved
+
+from tests import unittest
+
+
+class ServerConfigTestCase(unittest.TestCase):
+
+    def test_is_threepid_reserved(self):
+        user1 = {'medium': 'email', 'address': 'user1@example.com'}
+        user2 = {'medium': 'email', 'address': 'user2@example.com'}
+        user3 = {'medium': 'email', 'address': 'user3@example.com'}
+        user1_msisdn = {'medium': 'msisdn', 'address': '447700000000'}
+        config = [user1, user2]
+
+        self.assertTrue(is_threepid_reserved(config, user1))
+        self.assertFalse(is_threepid_reserved(config, user3))
+        self.assertFalse(is_threepid_reserved(config, user1_msisdn))
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index fc2b646ba2..94c6080e34 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -16,7 +16,11 @@ from mock import Mock, call -from synapse.api.constants import PresenceState +from signedjson.key import generate_signing_key + +from synapse.api.constants import EventTypes, Membership, PresenceState +from synapse.events import room_version_to_event_format +from synapse.events.builder import EventBuilder from synapse.handlers.presence import ( FEDERATION_PING_INTERVAL, FEDERATION_TIMEOUT, @@ -26,7 +30,9 @@ from synapse.handlers.presence import ( handle_timeout, handle_update, ) +from synapse.rest.client.v1 import room from synapse.storage.presence import UserPresenceState +from synapse.types import UserID, get_domain_from_id from tests import unittest @@ -405,3 +411,171 @@ class PresenceTimeoutTestCase(unittest.TestCase): self.assertIsNotNone(new_state) self.assertEquals(state, new_state) + + +class PresenceJoinTestCase(unittest.HomeserverTestCase): + """Tests remote servers get told about presence of users in the room when + they join and when new local users join. + """ + + user_id = "@test:server" + + servlets = [room.register_servlets] + + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver( + "server", http_client=None, + federation_sender=Mock(), + ) + return hs + + def prepare(self, reactor, clock, hs): + self.federation_sender = hs.get_federation_sender() + self.event_builder_factory = hs.get_event_builder_factory() + self.federation_handler = hs.get_handlers().federation_handler + self.presence_handler = hs.get_presence_handler() + + # self.event_builder_for_2 = EventBuilderFactory(hs) + # self.event_builder_for_2.hostname = "test2" + + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.auth = hs.get_auth() + + # We don't actually check signatures in tests, so lets just create a + # random key to use. + self.random_signing_key = generate_signing_key("ver") + + def test_remote_joins(self): + # We advance time to something that isn't 0, as we use 0 as a special + # value. + self.reactor.advance(1000000000000) + + # Create a room with two local users + room_id = self.helper.create_room_as(self.user_id) + self.helper.join(room_id, "@test2:server") + + # Mark test2 as online, test will be offline with a last_active of 0 + self.presence_handler.set_state( + UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}, + ) + self.reactor.pump([0]) # Wait for presence updates to be handled + + # + # Test that a new server gets told about existing presence + # + + self.federation_sender.reset_mock() + + # Add a new remote server to the room + self._add_new_user(room_id, "@alice:server2") + + # We shouldn't have sent out any local presence *updates* + self.federation_sender.send_presence.assert_not_called() + + # When new server is joined we send it the local users presence states. 
+ # We expect to only see user @test2:server, as @test:server is offline + # and has a zero last_active_ts + expected_state = self.get_success( + self.presence_handler.current_state_for_user("@test2:server") + ) + self.assertEqual(expected_state.state, PresenceState.ONLINE) + self.federation_sender.send_presence_to_destinations.assert_called_once_with( + destinations=["server2"], states=[expected_state] + ) + + # + # Test that only the new server gets sent presence and not existing servers + # + + self.federation_sender.reset_mock() + self._add_new_user(room_id, "@bob:server3") + + self.federation_sender.send_presence.assert_not_called() + self.federation_sender.send_presence_to_destinations.assert_called_once_with( + destinations=["server3"], states=[expected_state] + ) + + def test_remote_gets_presence_when_local_user_joins(self): + # We advance time to something that isn't 0, as we use 0 as a special + # value. + self.reactor.advance(1000000000000) + + # Create a room with one local users + room_id = self.helper.create_room_as(self.user_id) + + # Mark test as online + self.presence_handler.set_state( + UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE}, + ) + + # Mark test2 as online, test will be offline with a last_active of 0. + # Note we don't join them to the room yet + self.presence_handler.set_state( + UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}, + ) + + # Add servers to the room + self._add_new_user(room_id, "@alice:server2") + self._add_new_user(room_id, "@bob:server3") + + self.reactor.pump([0]) # Wait for presence updates to be handled + + # + # Test that when a local join happens remote servers get told about it + # + + self.federation_sender.reset_mock() + + # Join local user to room + self.helper.join(room_id, "@test2:server") + + self.reactor.pump([0]) # Wait for presence updates to be handled + + # We shouldn't have sent out any local presence *updates* + self.federation_sender.send_presence.assert_not_called() + + # We expect to only send test2 presence to server2 and server3 + expected_state = self.get_success( + self.presence_handler.current_state_for_user("@test2:server") + ) + self.assertEqual(expected_state.state, PresenceState.ONLINE) + self.federation_sender.send_presence_to_destinations.assert_called_once_with( + destinations=set(("server2", "server3")), + states=[expected_state] + ) + + def _add_new_user(self, room_id, user_id): + """Add new user to the room by creating an event and poking the federation API. + """ + + hostname = get_domain_from_id(user_id) + + room_version = self.get_success(self.store.get_room_version(room_id)) + + builder = EventBuilder( + state=self.state, + auth=self.auth, + store=self.store, + clock=self.clock, + hostname=hostname, + signing_key=self.random_signing_key, + format_version=room_version_to_event_format(room_version), + room_id=room_id, + type=EventTypes.Member, + sender=user_id, + state_key=user_id, + content={"membership": Membership.JOIN} + ) + + prev_event_ids = self.get_success( + self.store.get_latest_event_ids_in_room(room_id) + ) + + event = self.get_success(builder.build(prev_event_ids)) + + self.get_success(self.federation_handler.on_receive_pdu(hostname, event)) + + # Check that it was successfully persisted. + self.get_success(self.store.get_event(event.event_id)) + self.get_success(self.store.get_event(event.event_id)) diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 524af4f8d1..1f72a2a04f 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -56,7 +56,9 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
 
         client = client_factory.buildProtocol(None)
         client.makeConnection(FakeTransport(server, reactor))
-        server.makeConnection(FakeTransport(client, reactor))
+
+        self.server_to_client_transport = FakeTransport(client, reactor)
+        server.makeConnection(self.server_to_client_transport)
 
     def replicate(self):
         """Tell the master side of replication that something has happened, and then
@@ -69,6 +71,24 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
         master_result = self.get_success(getattr(self.master_store, method)(*args))
         slaved_result = self.get_success(getattr(self.slaved_store, method)(*args))
         if expected_result is not None:
-            self.assertEqual(master_result, expected_result)
-            self.assertEqual(slaved_result, expected_result)
-        self.assertEqual(master_result, slaved_result)
+            self.assertEqual(
+                master_result,
+                expected_result,
+                "Expected master result to be %r but was %r" % (
+                    expected_result, master_result
+                ),
+            )
+            self.assertEqual(
+                slaved_result,
+                expected_result,
+                "Expected slave result to be %r but was %r" % (
+                    expected_result, slaved_result
+                ),
+            )
+        self.assertEqual(
+            master_result,
+            slaved_result,
+            "Slave result %r does not match master result %r" % (
+                slaved_result, master_result
+            ),
+        )
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 1688a741d1..65ecff3bd6 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
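# The race guarded by the new test below, in miniature: current_state_delta
# updates used to be written with the *last* stream id of a persistence batch,
# so a worker could replicate a join event without yet seeing the matching
# cache invalidation. Stamping the delta with the minimum stream id of the
# batch (as events.py now does) closes that window. A toy timeline, not
# Synapse code:
events = [("join", 5), ("message", 6)]   # (event, stream_ordering)
min_id = min(s for _, s in events)       # 5: stream id now used for the delta
max_id = max(s for _, s in events)       # 6: stream id that caused the race
worker_pos = 5                           # worker has replicated up to here
# at position 5 the join is visible, but a delta stamped with 6 is not yet
assert min_id <= worker_pos < max_id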
@@ -11,11 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging from canonicaljson import encode_canonical_json from synapse.events import FrozenEvent, _EventInternalMetadata from synapse.events.snapshot import EventContext +from synapse.handlers.room import RoomEventSource from synapse.replication.slave.storage.events import SlavedEventStore from synapse.storage.roommember import RoomsForUser @@ -26,6 +28,8 @@ USER_ID_2 = "@bright:blue" OUTLIER = {"outlier": True} ROOM_ID = "!room:blue" +logger = logging.getLogger(__name__) + def dict_equals(self, other): me = encode_canonical_json(self.get_pdu_json()) @@ -172,18 +176,142 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): {"highlight_count": 1, "notify_count": 2}, ) + def test_get_rooms_for_user_with_stream_ordering(self): + """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated + by rows in the events stream + """ + self.persist(type="m.room.create", key="", creator=USER_ID) + self.persist(type="m.room.member", key=USER_ID, membership="join") + self.replicate() + self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set()) + + j2 = self.persist( + type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" + ) + self.replicate() + self.check( + "get_rooms_for_user_with_stream_ordering", + (USER_ID_2,), + {(ROOM_ID, j2.internal_metadata.stream_ordering)}, + ) + + def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self): + """Check that current_state invalidation happens correctly with multiple events + in the persistence batch. + + This test attempts to reproduce a race condition between the event persistence + loop and a worker-based Sync handler. + + The problem occurred when the master persisted several events in one batch. It + only updates the current_state at the end of each batch, so the obvious thing + to do is then to issue a current_state_delta stream update corresponding to the + last stream_id in the batch. + + However, that raises the possibility that a worker will see the replication + notification for a join event before the current_state caches are invalidated. + + The test involves: + * creating a join and a message event for a user, and persisting them in the + same batch + + * controlling the replication stream so that updates are sent gradually + + * between each bunch of replication updates, check that we see a consistent + snapshot of the state. + """ + self.persist(type="m.room.create", key="", creator=USER_ID) + self.persist(type="m.room.member", key=USER_ID, membership="join") + self.replicate() + self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set()) + + # limit the replication rate + repl_transport = self.server_to_client_transport + repl_transport.autoflush = False + + # build the join and message events and persist them in the same batch. 
+ logger.info("----- build test events ------") + j2, j2ctx = self.build_event( + type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" + ) + msg, msgctx = self.build_event() + self.get_success(self.master_store.persist_events([ + (j2, j2ctx), + (msg, msgctx), + ])) + self.replicate() + + event_source = RoomEventSource(self.hs) + event_source.store = self.slaved_store + current_token = self.get_success(event_source.get_current_key()) + + # gradually stream out the replication + while repl_transport.buffer: + logger.info("------ flush ------") + repl_transport.flush(30) + self.pump(0) + + prev_token = current_token + current_token = self.get_success(event_source.get_current_key()) + + # attempt to replicate the behaviour of the sync handler. + # + # First, we get a list of the rooms we are joined to + joined_rooms = self.get_success( + self.slaved_store.get_rooms_for_user_with_stream_ordering( + USER_ID_2, + ), + ) + + # Then, we get a list of the events since the last sync + membership_changes = self.get_success( + self.slaved_store.get_membership_changes_for_user( + USER_ID_2, prev_token, current_token, + ) + ) + + logger.info( + "%s->%s: joined_rooms=%r membership_changes=%r", + prev_token, + current_token, + joined_rooms, + membership_changes, + ) + + # the membership change is only any use to us if the room is in the + # joined_rooms list. + if membership_changes: + self.assertEqual( + joined_rooms, {(ROOM_ID, j2.internal_metadata.stream_ordering)} + ) + event_id = 0 - def persist( + def persist(self, backfill=False, **kwargs): + """ + Returns: + synapse.events.FrozenEvent: The event that was persisted. + """ + event, context = self.build_event(**kwargs) + + if backfill: + self.get_success( + self.master_store.persist_events([(event, context)], backfilled=True) + ) + else: + self.get_success( + self.master_store.persist_event(event, context) + ) + + return event + + def build_event( self, sender=USER_ID, room_id=ROOM_ID, - type={}, + type="m.room.message", key=None, internal={}, state=None, - reset_state=False, - backfill=False, depth=None, prev_events=[], auth_events=[], @@ -192,10 +320,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): push_actions=[], **content ): - """ - Returns: - synapse.events.FrozenEvent: The event that was persisted. - """ + if depth is None: depth = self.event_id @@ -234,23 +359,11 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): ) else: state_handler = self.hs.get_state_handler() - context = self.get_success(state_handler.compute_event_context(event)) + context = self.get_success(state_handler.compute_event_context( + event + )) self.master_store.add_push_actions_to_staging( event.event_id, {user_id: actions for user_id, actions in push_actions} ) - - ordering = None - if backfill: - self.get_success( - self.master_store.persist_events([(event, context)], backfilled=True) - ) - else: - ordering, _ = self.get_success( - self.master_store.persist_event(event, context) - ) - - if ordering: - event.internal_metadata.stream_ordering = ordering - - return event + return event, context diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index 9aa9dfe82e..d5a99f6caa 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from synapse.replication.tcp.streams import ReceiptsStreamRow
+from synapse.replication.tcp.streams._base import ReceiptsStreamRow
 
 from tests.replication.tcp.streams._base import BaseStreamTestCase
 
diff --git a/tests/rest/client/v2_alpha/test_capabilities.py b/tests/rest/client/v2_alpha/test_capabilities.py
index d3d43970fb..bbfc77e829 100644
--- a/tests/rest/client/v2_alpha/test_capabilities.py
+++ b/tests/rest/client/v2_alpha/test_capabilities.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.api.constants import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
 from synapse.rest.client.v1 import admin, login
 from synapse.rest.client.v2_alpha import capabilities
 
@@ -52,7 +52,7 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase):
         for room_version in capabilities['m.room_versions']['available'].keys():
             self.assertTrue(room_version in KNOWN_ROOM_VERSIONS, "" + room_version)
         self.assertEqual(
-            DEFAULT_ROOM_VERSION, capabilities['m.room_versions']['default']
+            DEFAULT_ROOM_VERSION.identifier, capabilities['m.room_versions']['default']
         )
 
     def test_get_change_password_capabilities(self):
diff --git a/tests/server.py b/tests/server.py
index ea26dea623..8f89f4a83d 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -365,6 +365,7 @@ class FakeTransport(object):
     disconnected = False
     buffer = attr.ib(default=b'')
     producer = attr.ib(default=None)
+    autoflush = attr.ib(default=True)
 
     def getPeer(self):
         return None
@@ -415,31 +416,44 @@ class FakeTransport(object):
     def write(self, byt):
         self.buffer = self.buffer + byt
 
-        def _write():
-            if not self.buffer:
-                # nothing to do. Don't write empty buffers: it upsets the
-                # TLSMemoryBIOProtocol
-                return
-
-            if self.disconnected:
-                return
-            logger.info("%s->%s: %s", self._protocol, self.other, self.buffer)
-
-            if getattr(self.other, "transport") is not None:
-                try:
-                    self.other.dataReceived(self.buffer)
-                    self.buffer = b""
-                except Exception as e:
-                    logger.warning("Exception writing to protocol: %s", e)
-                return
-
-            self._reactor.callLater(0.0, _write)
-
         # always actually do the write asynchronously. Some protocols (notably the
         # TLSMemoryBIOProtocol) get very confused if a read comes back while they are
         # still doing a write. Doing a callLater here breaks the cycle.
-        self._reactor.callLater(0.0, _write)
+        if self.autoflush:
+            self._reactor.callLater(0.0, self.flush)
 
     def writeSequence(self, seq):
         for x in seq:
             self.write(x)
+
+    def flush(self, maxbytes=None):
+        if not self.buffer:
+            # nothing to do. Don't write empty buffers: it upsets the
+            # TLSMemoryBIOProtocol
+            return
+
+        if self.disconnected:
+            return
+
+        if getattr(self.other, "transport") is None:
+            # the other has no transport yet; reschedule
+            if self.autoflush:
+                self._reactor.callLater(0.0, self.flush)
+            return
+
+        if maxbytes is not None:
+            to_write = self.buffer[:maxbytes]
+        else:
+            to_write = self.buffer
+
+        logger.info("%s->%s: %s", self._protocol, self.other, to_write)
+
+        try:
+            self.other.dataReceived(to_write)
+        except Exception as e:
+            logger.warning("Exception writing to protocol: %s", e)
+            return
+
+        self.buffer = self.buffer[len(to_write):]
+        if self.buffer and self.autoflush:
+            self._reactor.callLater(0.0, self.flush)
diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py
index 9a5c816927..f448b01326 100644
--- a/tests/state/test_v2.py
+++ b/tests/state/test_v2.py
@@ -19,7 +19,8 @@ from six.moves import zip
 
 import attr
 
-from synapse.api.constants import EventTypes, JoinRules, Membership, RoomVersions
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.room_versions import RoomVersions
 from synapse.event_auth import auth_types_for_event
 from synapse.events import FrozenEvent
 from synapse.state.v2 import lexicographical_topological_sort, resolve_events_with_store
@@ -539,7 +540,7 @@ class StateTestCase(unittest.TestCase):
             state_before = dict(state_at_event[prev_events[0]])
         else:
             state_d = resolve_events_with_store(
-                RoomVersions.V2,
+                RoomVersions.V2.identifier,
                 [state_at_event[n] for n in prev_events],
                 event_map=event_map,
                 state_res_store=TestStateResolutionStore(event_map),
@@ -686,7 +687,7 @@ class SimpleParamStateTestCase(unittest.TestCase):
 
         # Test that we correctly handle passing `None` as the event_map
         state_d = resolve_events_with_store(
-            RoomVersions.V2,
+            RoomVersions.V2.identifier,
            [self.state_at_bob, self.state_at_charlie],
             event_map=None,
             state_res_store=TestStateResolutionStore(self.event_map),
diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py
index 3957561b1e..0fc5019e9f 100644
--- a/tests/storage/test_redaction.py
+++ b/tests/storage/test_redaction.py
@@ -18,7 +18,8 @@ from mock import Mock
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, Membership, RoomVersions
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.room_versions import RoomVersions
 from synapse.types import RoomID, UserID
 
 from tests import unittest
@@ -51,7 +52,7 @@ class RedactionTestCase(unittest.TestCase):
     ):
         content = {"membership": membership}
         content.update(extra_content)
-        builder = self.event_builder_factory.new(
+        builder = self.event_builder_factory.for_room_version(
             RoomVersions.V1,
             {
                 "type": EventTypes.Member,
@@ -74,7 +75,7 @@ class RedactionTestCase(unittest.TestCase):
     def inject_message(self, room, user, body):
         self.depth += 1
 
-        builder = self.event_builder_factory.new(
+        builder = self.event_builder_factory.for_room_version(
             RoomVersions.V1,
             {
                 "type": EventTypes.Message,
@@ -95,7 +96,7 @@ class RedactionTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def inject_redaction(self, room, event_id, user, reason):
-        builder = self.event_builder_factory.new(
+        builder = self.event_builder_factory.for_room_version(
             RoomVersions.V1,
             {
                 "type": EventTypes.Redaction,
diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py
index 7fa2f4fd70..063387863e 100644
--- a/tests/storage/test_roommember.py
+++ b/tests/storage/test_roommember.py
@@ -18,7 +18,8 @@ from mock import Mock
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, Membership, RoomVersions
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.room_versions import RoomVersions
 from synapse.types import RoomID, UserID
 
 from tests import unittest
@@ -49,7 +50,7 @@ class RoomMemberStoreTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def inject_room_member(self, room, user, membership, replaces_state=None):
-        builder = self.event_builder_factory.new(
+        builder = self.event_builder_factory.for_room_version(
             RoomVersions.V1,
             {
                 "type": EventTypes.Member,
diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py
index 99cd3e09eb..78e260a7fa 100644
--- a/tests/storage/test_state.py
+++ b/tests/storage/test_state.py
@@ -17,7 +17,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, Membership, RoomVersions
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.room_versions import RoomVersions
 from synapse.storage.state import StateFilter
 from synapse.types import RoomID, UserID
 
@@ -48,7 +49,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
 
     @defer.inlineCallbacks
     def inject_state_event(self, room, sender, typ, state_key, content):
-        builder = self.event_builder_factory.new(
+        builder = self.event_builder_factory.for_room_version(
             RoomVersions.V1,
             {
                 "type": typ,
diff --git a/tests/test_event_auth.py b/tests/test_event_auth.py
index 7ee318e4e8..4c8f87e958 100644
--- a/tests/test_event_auth.py
+++ b/tests/test_event_auth.py
@@ -16,8 +16,8 @@
 import unittest
 
 from synapse import event_auth
-from synapse.api.constants import RoomVersions
 from synapse.api.errors import AuthError
+from synapse.api.room_versions import RoomVersions
 from synapse.events import FrozenEvent
 
@@ -37,7 +37,7 @@ class EventAuthTestCase(unittest.TestCase):
 
         # creator should be able to send state
         event_auth.check(
-            RoomVersions.V1, _random_state_event(creator), auth_events,
+            RoomVersions.V1.identifier, _random_state_event(creator), auth_events,
             do_sig_check=False,
         )
 
@@ -45,7 +45,7 @@ class EventAuthTestCase(unittest.TestCase):
         self.assertRaises(
             AuthError,
             event_auth.check,
-            RoomVersions.V1,
+            RoomVersions.V1.identifier,
             _random_state_event(joiner),
             auth_events,
             do_sig_check=False,
@@ -74,7 +74,7 @@ class EventAuthTestCase(unittest.TestCase):
         self.assertRaises(
             AuthError,
             event_auth.check,
-            RoomVersions.V1,
+            RoomVersions.V1.identifier,
             _random_state_event(pleb),
             auth_events,
             do_sig_check=False,
@@ -82,7 +82,7 @@ class EventAuthTestCase(unittest.TestCase):
 
         # king should be able to send state
         event_auth.check(
-            RoomVersions.V1, _random_state_event(king), auth_events,
+            RoomVersions.V1.identifier, _random_state_event(king), auth_events,
             do_sig_check=False,
         )
diff --git a/tests/test_state.py b/tests/test_state.py
index e20c33322a..03e4810c2e 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -18,7 +18,8 @@ from mock import Mock
 from twisted.internet import defer
 
 from synapse.api.auth import Auth
-from synapse.api.constants import EventTypes, Membership, RoomVersions
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.room_versions import RoomVersions
 from synapse.events import FrozenEvent
 from synapse.state import StateHandler, StateResolutionHandler
 
@@ -118,7 +119,7 @@ class StateGroupStore(object):
         self._event_to_state_group[event_id] = state_group
 
     def get_room_version(self, room_id):
-        return RoomVersions.V1
+        return RoomVersions.V1.identifier
 
 
 class DictObj(dict):
diff --git a/tests/test_visibility.py b/tests/test_visibility.py
index 455db9f276..3bdb500514 100644
--- a/tests/test_visibility.py
+++ b/tests/test_visibility.py
@@ -17,7 +17,7 @@ import logging
 
 from twisted.internet import defer
 from twisted.internet.defer import succeed
 
-from synapse.api.constants import RoomVersions
+from synapse.api.room_versions import RoomVersions
 from synapse.events import FrozenEvent
 from synapse.visibility import filter_events_for_server
@@ -124,7 +124,7 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
     @defer.inlineCallbacks
     def inject_visibility(self, user_id, visibility):
         content = {"history_visibility": visibility}
-        builder = self.event_builder_factory.new(
+        builder = self.event_builder_factory.for_room_version(
             RoomVersions.V1,
             {
                 "type": "m.room.history_visibility",
@@ -145,7 +145,7 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
     def inject_room_member(self, user_id, membership="join", extra_content={}):
         content = {"membership": membership}
         content.update(extra_content)
-        builder = self.event_builder_factory.new(
+        builder = self.event_builder_factory.for_room_version(
             RoomVersions.V1,
             {
                 "type": "m.room.member",
@@ -167,7 +167,7 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
     def inject_message(self, user_id, content=None):
         if content is None:
             content = {"body": "testytest", "msgtype": "m.text"}
-        builder = self.event_builder_factory.new(
+        builder = self.event_builder_factory.for_room_version(
             RoomVersions.V1,
             {
                 "type": "m.room.message",
diff --git a/tests/utils.py b/tests/utils.py
index 615b9f8cca..cb75514851 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -27,8 +27,9 @@ from six.moves.urllib import parse as urlparse
 
 from twisted.internet import defer, reactor
 
-from synapse.api.constants import EventTypes, RoomVersions
+from synapse.api.constants import EventTypes
 from synapse.api.errors import CodeMessageException, cs_error
+from synapse.api.room_versions import RoomVersions
 from synapse.config.homeserver import HomeServerConfig
 from synapse.federation.transport import server as federation_server
 from synapse.http.server import HttpServer
@@ -671,7 +672,7 @@ def create_room(hs, room_id, creator_id):
     event_builder_factory = hs.get_event_builder_factory()
     event_creation_handler = hs.get_event_creation_handler()
 
-    builder = event_builder_factory.new(
+    builder = event_builder_factory.for_room_version(
         RoomVersions.V1,
         {
             "type": EventTypes.Create,