summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2021-08-31 14:53:31 +0100
committerBrendan Abolivier <babolivier@matrix.org>2021-08-31 14:53:31 +0100
commit1d4f5c34d86cc1b2afaf72c4b176469d3004724d (patch)
treebffbc001eac036be46fd50c0a0b67c94b409539e /synapse/replication
parentMerge tag 'v1.32.2' into babolivier/dinsic_1.41.0 (diff)
parent 1.33.0 (diff)
downloadsynapse-1d4f5c34d86cc1b2afaf72c4b176469d3004724d.tar.xz
Merge tag 'v1.33.0' into babolivier/dinsic_1.41.0
Synapse 1.33.0 (2021-05-05)
===========================

Features
--------

- Build Debian packages for Ubuntu 21.04 (Hirsute Hippo). ([\#9909](https://github.com/matrix-org/synapse/issues/9909))

Synapse 1.33.0rc2 (2021-04-29)
==============================

Bugfixes
--------

- Fix tight loop when handling presence replication when using workers. Introduced in v1.33.0rc1. ([\#9900](https://github.com/matrix-org/synapse/issues/9900))

Synapse 1.33.0rc1 (2021-04-28)
==============================

Features
--------

- Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. ([\#9800](https://github.com/matrix-org/synapse/issues/9800), [\#9814](https://github.com/matrix-org/synapse/issues/9814))
- Add experimental support for handling presence on a worker. ([\#9819](https://github.com/matrix-org/synapse/issues/9819), [\#9820](https://github.com/matrix-org/synapse/issues/9820), [\#9828](https://github.com/matrix-org/synapse/issues/9828), [\#9850](https://github.com/matrix-org/synapse/issues/9850))
- Return a new template when an user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](https://github.com/matrix-org/synapse/issues/9832))

Bugfixes
--------

- Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](https://github.com/matrix-org/synapse/issues/9726))
- Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](https://github.com/matrix-org/synapse/issues/9788))
- Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](https://github.com/matrix-org/synapse/issues/9802))
- Limit the size of HTTP responses read over federation. ([\#9833](https://github.com/matrix-org/synapse/issues/9833))
- Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](https://github.com/matrix-org/synapse/issues/9867))
- Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](https://github.com/matrix-org/synapse/issues/9868))

Improved Documentation
----------------------

- Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](https://github.com/matrix-org/synapse/issues/9801))

Internal Changes
----------------

- Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](https://github.com/matrix-org/synapse/issues/9162))
- Apply `pyupgrade` across the codebase. ([\#9786](https://github.com/matrix-org/synapse/issues/9786))
- Move some replication processing out of `generic_worker`. ([\#9796](https://github.com/matrix-org/synapse/issues/9796))
- Replace `HomeServer.get_config()` with inline references. ([\#9815](https://github.com/matrix-org/synapse/issues/9815))
- Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](https://github.com/matrix-org/synapse/issues/9816))
- Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](https://github.com/matrix-org/synapse/issues/9817))
- Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](https://github.com/matrix-org/synapse/issues/9821))
- Small speed up for joining large remote rooms. ([\#9825](https://github.com/matrix-org/synapse/issues/9825))
- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](https://github.com/matrix-org/synapse/issues/9838))
- Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](https://github.com/matrix-org/synapse/issues/9845))
- Limit length of accepted email addresses. ([\#9855](https://github.com/matrix-org/synapse/issues/9855))
- Remove redundant `synapse.types.Collection` type definition. ([\#9856](https://github.com/matrix-org/synapse/issues/9856))
- Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](https://github.com/matrix-org/synapse/issues/9858))
- Disable invite rate-limiting by default when running the unit tests. ([\#9871](https://github.com/matrix-org/synapse/issues/9871))
- Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](https://github.com/matrix-org/synapse/issues/9874))
- Make `DomainSpecificString` an `attrs` class. ([\#9875](https://github.com/matrix-org/synapse/issues/9875))
- Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](https://github.com/matrix-org/synapse/issues/9876))
- Remove redundant `_PushHTTPChannel` test class. ([\#9878](https://github.com/matrix-org/synapse/issues/9878))
- Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](https://github.com/matrix-org/synapse/issues/9879))
- Small performance improvement around handling new local presence updates. ([\#9887](https://github.com/matrix-org/synapse/issues/9887))
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/__init__.py1
-rw-r--r--synapse/replication/http/__init__.py1
-rw-r--r--synapse/replication/http/_base.py6
-rw-r--r--synapse/replication/http/account_data.py1
-rw-r--r--synapse/replication/http/devices.py1
-rw-r--r--synapse/replication/http/federation.py1
-rw-r--r--synapse/replication/http/login.py1
-rw-r--r--synapse/replication/http/membership.py1
-rw-r--r--synapse/replication/http/presence.py1
-rw-r--r--synapse/replication/http/push.py1
-rw-r--r--synapse/replication/http/register.py1
-rw-r--r--synapse/replication/http/send_event.py1
-rw-r--r--synapse/replication/http/streams.py1
-rw-r--r--synapse/replication/slave/__init__.py1
-rw-r--r--synapse/replication/slave/storage/__init__.py1
-rw-r--r--synapse/replication/slave/storage/_base.py1
-rw-r--r--synapse/replication/slave/storage/_slaved_id_tracker.py1
-rw-r--r--synapse/replication/slave/storage/account_data.py1
-rw-r--r--synapse/replication/slave/storage/appservice.py1
-rw-r--r--synapse/replication/slave/storage/client_ips.py1
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py1
-rw-r--r--synapse/replication/slave/storage/devices.py1
-rw-r--r--synapse/replication/slave/storage/directory.py1
-rw-r--r--synapse/replication/slave/storage/events.py1
-rw-r--r--synapse/replication/slave/storage/filtering.py1
-rw-r--r--synapse/replication/slave/storage/groups.py1
-rw-r--r--synapse/replication/slave/storage/keys.py1
-rw-r--r--synapse/replication/slave/storage/presence.py51
-rw-r--r--synapse/replication/slave/storage/profile.py1
-rw-r--r--synapse/replication/slave/storage/push_rule.py1
-rw-r--r--synapse/replication/slave/storage/pushers.py1
-rw-r--r--synapse/replication/slave/storage/receipts.py1
-rw-r--r--synapse/replication/slave/storage/registration.py1
-rw-r--r--synapse/replication/slave/storage/room.py1
-rw-r--r--synapse/replication/slave/storage/transactions.py1
-rw-r--r--synapse/replication/tcp/__init__.py1
-rw-r--r--synapse/replication/tcp/client.py233
-rw-r--r--synapse/replication/tcp/commands.py1
-rw-r--r--synapse/replication/tcp/external_cache.py1
-rw-r--r--synapse/replication/tcp/handler.py19
-rw-r--r--synapse/replication/tcp/protocol.py4
-rw-r--r--synapse/replication/tcp/redis.py1
-rw-r--r--synapse/replication/tcp/resource.py1
-rw-r--r--synapse/replication/tcp/streams/__init__.py4
-rw-r--r--synapse/replication/tcp/streams/_base.py42
-rw-r--r--synapse/replication/tcp/streams/events.py1
-rw-r--r--synapse/replication/tcp/streams/federation.py1
47 files changed, 285 insertions, 114 deletions
diff --git a/synapse/replication/__init__.py b/synapse/replication/__init__.py

index b7df13c9ee..f43a360a80 100644 --- a/synapse/replication/__init__.py +++ b/synapse/replication/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index cb4a52dbe9..ba8114ac9e 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index b7aa0c280f..5685cf2121 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -159,7 +158,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): def make_client(cls, hs): """Create a client that makes requests. - Returns a callable that accepts the same parameters as `_serialize_payload`. + Returns a callable that accepts the same parameters as + `_serialize_payload`, and also accepts an optional `instance_name` + parameter to specify which instance to hit (the instance must be in + the `instance_map` config). """ clock = hs.get_clock() client = hs.get_simple_http_client() diff --git a/synapse/replication/http/account_data.py b/synapse/replication/http/account_data.py
index 60899b6ad6..70e951af63 100644 --- a/synapse/replication/http/account_data.py +++ b/synapse/replication/http/account_data.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 807b85d2e1..5a5818ef61 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 82ea3b895f..79cadb7b57 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index 4ec1bfa6ea..c2e8c00293 100644 --- a/synapse/replication/http/login.py +++ b/synapse/replication/http/login.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 2812ac12fc..bd03030b4b 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py
index bc9aa82cb4..f25307620d 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py
index 054ed64d34..139427cb1f 100644 --- a/synapse/replication/http/push.py +++ b/synapse/replication/http/push.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index 73d7477854..d6dd7242eb 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index a4c5b44292..fae5ffa451 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index 309159e304..9afa147d00 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/__init__.py b/synapse/replication/slave/__init__.py
index b7df13c9ee..f43a360a80 100644 --- a/synapse/replication/slave/__init__.py +++ b/synapse/replication/slave/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/__init__.py b/synapse/replication/slave/storage/__init__.py
index b7df13c9ee..f43a360a80 100644 --- a/synapse/replication/slave/storage/__init__.py +++ b/synapse/replication/slave/storage/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 693c9ab901..faa99387a7 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
index 0d39a93ed2..2cb7489047 100644 --- a/synapse/replication/slave/storage/_slaved_id_tracker.py +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index 21afe5f155..ee74ee7d85 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 0f8d7037bd..29f50c0add 100644 --- a/synapse/replication/slave/storage/appservice.py +++ b/synapse/replication/slave/storage/appservice.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 0f5b7adef7..8730966380 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 1260f6d141..e940751084 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index e0d86240dd..70207420a6 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
index 1945bcf9a8..71fde0c96c 100644 --- a/synapse/replication/slave/storage/directory.py +++ b/synapse/replication/slave/storage/directory.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index fbffe6d85c..d4d3f8c448 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
index 6a23252861..37875bc973 100644 --- a/synapse/replication/slave/storage/filtering.py +++ b/synapse/replication/slave/storage/filtering.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 30955bcbfe..e9bdc38470 100644 --- a/synapse/replication/slave/storage/groups.py +++ b/synapse/replication/slave/storage/groups.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
index 961579751c..a00b38c512 100644 --- a/synapse/replication/slave/storage/keys.py +++ b/synapse/replication/slave/storage/keys.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py deleted file mode 100644
index 55620c03d8..0000000000 --- a/synapse/replication/slave/storage/presence.py +++ /dev/null
@@ -1,51 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.replication.tcp.streams import PresenceStream -from synapse.storage import DataStore -from synapse.storage.database import DatabasePool -from synapse.storage.databases.main.presence import PresenceStore -from synapse.util.caches.stream_change_cache import StreamChangeCache - -from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker - - -class SlavedPresenceStore(BaseSlavedStore): - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - self._presence_id_gen = SlavedIdTracker(db_conn, "presence_stream", "stream_id") - - self._presence_on_startup = self._get_active_presence(db_conn) # type: ignore - - self.presence_stream_cache = StreamChangeCache( - "PresenceStreamChangeCache", self._presence_id_gen.get_current_token() - ) - - _get_active_presence = DataStore._get_active_presence - take_presence_startup_info = DataStore.take_presence_startup_info - _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] - get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] - - def get_current_presence_token(self): - return self._presence_id_gen.get_current_token() - - def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == PresenceStream.NAME: - self._presence_id_gen.advance(instance_name, token) - for row in rows: - self.presence_stream_cache.entity_has_changed(row.user_id, token) - self._get_presence_for_user.invalidate((row.user_id,)) - return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py
index f85b20a071..99f4a22642 100644 --- a/synapse/replication/slave/storage/profile.py +++ b/synapse/replication/slave/storage/profile.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index de904c943c..4d5f862862 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 93161c3dfb..2672a2c94b 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index 3dfdd9961d..3826b87dec 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
index a40f064e2b..5dae35a960 100644 --- a/synapse/replication/slave/storage/registration.py +++ b/synapse/replication/slave/storage/registration.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 109ac6bea1..8cc6de3f46 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index 2091ac0df6..a59e543924 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py
index 1b8718b11d..1fa60af8e6 100644 --- a/synapse/replication/tcp/__init__.py +++ b/synapse/replication/tcp/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 3455839d67..4f3c6a18b6 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,22 +14,35 @@ """A replication client for use by synapse workers. """ import logging -from typing import TYPE_CHECKING, Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple from twisted.internet.defer import Deferred from twisted.internet.protocol import ReconnectingClientFactory from synapse.api.constants import EventTypes +from synapse.federation import send_queue +from synapse.federation.sender import FederationSender from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol -from synapse.replication.tcp.streams import TypingStream +from synapse.replication.tcp.streams import ( + AccountDataStream, + DeviceListsStream, + GroupServerStream, + PushersStream, + PushRulesStream, + ReceiptsStream, + TagAccountDataStream, + ToDeviceStream, + TypingStream, +) from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, EventsStreamRow, ) -from synapse.types import PersistedEventPosition, UserID -from synapse.util.async_helpers import timeout_deferred +from synapse.types import PersistedEventPosition, ReadReceipt, UserID +from synapse.util.async_helpers import Linearizer, timeout_deferred from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -106,6 +118,14 @@ class ReplicationDataHandler: self._instance_name = hs.get_instance_name() self._typing_handler = hs.get_typing_handler() + self._notify_pushers = hs.config.start_pushers + self._pusher_pool = hs.get_pusherpool() + self._presence_handler = hs.get_presence_handler() + + self.send_handler = None # type: Optional[FederationSenderHandler] + if hs.should_send_federation(): + self.send_handler = FederationSenderHandler(hs) + # Map from stream to list of deferreds waiting for the stream to # arrive at a particular position. The lists are sorted by stream position. self._streams_to_waiters = {} # type: Dict[str, List[Tuple[int, Deferred]]] @@ -126,13 +146,51 @@ class ReplicationDataHandler: """ self.store.process_replication_rows(stream_name, instance_name, token, rows) + if self.send_handler: + await self.send_handler.process_replication_rows(stream_name, token, rows) + if stream_name == TypingStream.NAME: self._typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( "typing_key", token, rooms=[row.room_id for row in rows] ) - - if stream_name == EventsStream.NAME: + elif stream_name == PushRulesStream.NAME: + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows] + ) + elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME): + self.notifier.on_new_event( + "account_data_key", token, users=[row.user_id for row in rows] + ) + elif stream_name == ReceiptsStream.NAME: + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows] + ) + await self._pusher_pool.on_new_receipts( + token, token, {row.room_id for row in rows} + ) + elif stream_name == ToDeviceStream.NAME: + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event("to_device_key", token, users=entities) + elif stream_name == DeviceListsStream.NAME: + all_room_ids = set() # type: Set[str] + for row in rows: + if row.entity.startswith("@"): + room_ids = await self.store.get_rooms_for_user(row.entity) + all_room_ids.update(room_ids) + self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) + elif stream_name == GroupServerStream.NAME: + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows] + ) + elif stream_name == PushersStream.NAME: + for row in rows: + if row.deleted: + self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + await self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. for row in rows: @@ -160,6 +218,10 @@ class ReplicationDataHandler: membership=row.data.membership, ) + await self._presence_handler.process_replication_rows( + stream_name, instance_name, token, rows + ) + # Notify any waiting deferreds. The list is ordered by position so we # just iterate through the list until we reach a position that is # greater than the received row position. @@ -191,7 +253,7 @@ class ReplicationDataHandler: waiting_list[:] = waiting_list[index_of_first_deferred_not_called:] async def on_position(self, stream_name: str, instance_name: str, token: int): - self.store.process_replication_rows(stream_name, instance_name, token, []) + await self.on_rdata(stream_name, instance_name, token, []) # We poke the generic "replication" notifier to wake anything up that # may be streaming. @@ -200,6 +262,11 @@ class ReplicationDataHandler: def on_remote_server_up(self, server: str): """Called when get a new REMOTE_SERVER_UP command.""" + # Let's wake up the transaction queue for the server in case we have + # pending stuff to send to it. + if self.send_handler: + self.send_handler.wake_destination(server) + async def wait_for_stream_position( self, instance_name: str, stream_name: str, position: int ): @@ -236,3 +303,153 @@ class ReplicationDataHandler: logger.info( "Finished waiting for repl stream %r to reach %s", stream_name, position ) + + def stop_pusher(self, user_id, app_id, pushkey): + if not self._notify_pushers: + return + + key = "%s:%s" % (app_id, pushkey) + pushers_for_user = self._pusher_pool.pushers.get(user_id, {}) + pusher = pushers_for_user.pop(key, None) + if pusher is None: + return + logger.info("Stopping pusher %r / %r", user_id, key) + pusher.on_stop() + + async def start_pusher(self, user_id, app_id, pushkey): + if not self._notify_pushers: + return + + key = "%s:%s" % (app_id, pushkey) + logger.info("Starting pusher %r / %r", user_id, key) + return await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id) + + +class FederationSenderHandler: + """Processes the fedration replication stream + + This class is only instantiate on the worker responsible for sending outbound + federation transactions. It receives rows from the replication stream and forwards + the appropriate entries to the FederationSender class. + """ + + def __init__(self, hs: "HomeServer"): + assert hs.should_send_federation() + + self.store = hs.get_datastore() + self._is_mine_id = hs.is_mine_id + self._hs = hs + + # We need to make a temporary value to ensure that mypy picks up the + # right type. We know we should have a federation sender instance since + # `should_send_federation` is True. + sender = hs.get_federation_sender() + assert isinstance(sender, FederationSender) + self.federation_sender = sender + + # Stores the latest position in the federation stream we've gotten up + # to. This is always set before we use it. + self.federation_position = None # type: Optional[int] + + self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + + def wake_destination(self, server: str): + self.federation_sender.wake_destination(server) + + async def process_replication_rows(self, stream_name, token, rows): + # The federation stream contains things that we want to send out, e.g. + # presence, typing, etc. + if stream_name == "federation": + send_queue.process_rows_for_federation(self.federation_sender, rows) + await self.update_token(token) + + # ... and when new receipts happen + elif stream_name == ReceiptsStream.NAME: + await self._on_new_receipts(rows) + + # ... as well as device updates and messages + elif stream_name == DeviceListsStream.NAME: + # The entities are either user IDs (starting with '@') whose devices + # have changed, or remote servers that we need to tell about + # changes. + hosts = {row.entity for row in rows if not row.entity.startswith("@")} + for host in hosts: + self.federation_sender.send_device_messages(host) + + elif stream_name == ToDeviceStream.NAME: + # The to_device stream includes stuff to be pushed to both local + # clients and remote servers, so we ignore entities that start with + # '@' (since they'll be local users rather than destinations). + hosts = {row.entity for row in rows if not row.entity.startswith("@")} + for host in hosts: + self.federation_sender.send_device_messages(host) + + async def _on_new_receipts(self, rows): + """ + Args: + rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]): + new receipts to be processed + """ + for receipt in rows: + # we only want to send on receipts for our own users + if not self._is_mine_id(receipt.user_id): + continue + receipt_info = ReadReceipt( + receipt.room_id, + receipt.receipt_type, + receipt.user_id, + [receipt.event_id], + receipt.data, + ) + await self.federation_sender.send_read_receipt(receipt_info) + + async def update_token(self, token): + """Update the record of where we have processed to in the federation stream. + + Called after we have processed a an update received over replication. Sends + a FEDERATION_ACK back to the master, and stores the token that we have processed + in `federation_stream_position` so that we can restart where we left off. + """ + self.federation_position = token + + # We save and send the ACK to master asynchronously, so we don't block + # processing on persistence. We don't need to do this operation for + # every single RDATA we receive, we just need to do it periodically. + + if self._fed_position_linearizer.is_queued(None): + # There is already a task queued up to save and send the token, so + # no need to queue up another task. + return + + run_as_background_process("_save_and_send_ack", self._save_and_send_ack) + + async def _save_and_send_ack(self): + """Save the current federation position in the database and send an ACK + to master with where we're up to. + """ + # We should only be calling this once we've got a token. + assert self.federation_position is not None + + try: + # We linearize here to ensure we don't have races updating the token + # + # XXX this appears to be redundant, since the ReplicationCommandHandler + # has a linearizer which ensures that we only process one line of + # replication data at a time. Should we remove it, or is it doing useful + # service for robustness? Or could we replace it with an assertion that + # we're not being re-entered? + + with (await self._fed_position_linearizer.queue(None)): + # We persist and ack the same position, so we take a copy of it + # here as otherwise it can get modified from underneath us. + current_position = self.federation_position + + await self.store.update_federation_out_pos( + "federation", current_position + ) + + # We ACK this token over replication so that the master can drop + # its in memory queues + self._hs.get_tcp_replication().send_federation_ack(current_position) + except Exception: + logger.exception("Error updating federation stream position") diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 8abed1f52d..505d450e19 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py
index d89a36f25a..1a3b051e3c 100644 --- a/synapse/replication/tcp/external_cache.py +++ b/synapse/replication/tcp/external_cache.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index a8894beadf..7ced4c543c 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # Copyright 2020 The Matrix.org Foundation C.I.C. # @@ -56,6 +55,8 @@ from synapse.replication.tcp.streams import ( CachesStream, EventsStream, FederationStream, + PresenceFederationStream, + PresenceStream, ReceiptsStream, Stream, TagAccountDataStream, @@ -100,6 +101,10 @@ class ReplicationCommandHandler: self._instance_id = hs.get_instance_id() self._instance_name = hs.get_instance_name() + self._is_presence_writer = ( + hs.get_instance_name() in hs.config.worker.writers.presence + ) + self._streams = { stream.NAME: stream(hs) for stream in STREAMS_MAP.values() } # type: Dict[str, Stream] @@ -154,6 +159,14 @@ class ReplicationCommandHandler: continue + if isinstance(stream, (PresenceStream, PresenceFederationStream)): + # Only add PresenceStream as a source on the instance in charge + # of presence. + if self._is_presence_writer: + self._streams_to_replicate.append(stream) + + continue + # Only add any other streams if we're on master. if hs.config.worker_app is not None: continue @@ -351,7 +364,7 @@ class ReplicationCommandHandler: ) -> Optional[Awaitable[None]]: user_sync_counter.inc() - if self._is_master: + if self._is_presence_writer: return self._presence_handler.update_external_syncs_row( cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms ) @@ -361,7 +374,7 @@ class ReplicationCommandHandler: def on_CLEAR_USER_SYNC( self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand ) -> Optional[Awaitable[None]]: - if self._is_master: + if self._is_presence_writer: return self._presence_handler.update_external_syncs_clear(cmd.instance_id) else: return None diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index d10d574246..6e3705364f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -50,7 +49,7 @@ import fcntl import logging import struct from inspect import isawaitable -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, Collection, List, Optional from prometheus_client import Counter from zope.interface import Interface, implementer @@ -77,7 +76,6 @@ from synapse.replication.tcp.commands import ( ServerCommand, parse_command_from_line, ) -from synapse.types import Collection from synapse.util import Clock from synapse.util.stringutils import random_string diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 98bdeb0ec6..6a2c2655e4 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 2018f9f29e..bd47d84258 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index d1a61c3314..4c0023c68a 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # Copyright 2019 New Vector Ltd # @@ -31,6 +30,7 @@ from synapse.replication.tcp.streams._base import ( CachesStream, DeviceListsStream, GroupServerStream, + PresenceFederationStream, PresenceStream, PublicRoomsStream, PushersStream, @@ -51,6 +51,7 @@ STREAMS_MAP = { EventsStream, BackfillStream, PresenceStream, + PresenceFederationStream, TypingStream, ReceiptsStream, PushRulesStream, @@ -72,6 +73,7 @@ __all__ = [ "Stream", "BackfillStream", "PresenceStream", + "PresenceFederationStream", "TypingStream", "ReceiptsStream", "PushRulesStream", diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 3dfee76743..b03824925a 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # Copyright 2019 New Vector Ltd # @@ -273,15 +272,22 @@ class PresenceStream(Stream): NAME = "presence" ROW_TYPE = PresenceStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() - if hs.config.worker_app is None: - # on the master, query the presence handler + if hs.get_instance_name() in hs.config.worker.writers.presence: + # on the presence writer, query the presence handler presence_handler = hs.get_presence_handler() - update_function = presence_handler.get_all_presence_updates + + from synapse.handlers.presence import PresenceHandler + + assert isinstance(presence_handler, PresenceHandler) + + update_function = ( + presence_handler.get_all_presence_updates + ) # type: UpdateFunction else: - # Query master process + # Query presence writer process update_function = make_http_update_function(hs, self.NAME) super().__init__( @@ -291,6 +297,30 @@ class PresenceStream(Stream): ) +class PresenceFederationStream(Stream): + """A stream used to send ad hoc presence updates over federation. + + Streams the remote destination and the user ID of the presence state to + send. + """ + + @attr.s(slots=True, auto_attribs=True) + class PresenceFederationStreamRow: + destination: str + user_id: str + + NAME = "presence_federation" + ROW_TYPE = PresenceFederationStreamRow + + def __init__(self, hs: "HomeServer"): + federation_queue = hs.get_presence_handler().get_federation_queue() + super().__init__( + hs.get_instance_name(), + federation_queue.get_current_token, + federation_queue.get_replication_rows, + ) + + class TypingStream(Stream): TypingStreamRow = namedtuple( "TypingStreamRow", ("room_id", "user_ids") # str # list(str) diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index fa5e37ba7b..e7e87bac92 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # Copyright 2019 New Vector Ltd # diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 9bb8e9e177..096a85d363 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py
@@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # Copyright 2019 New Vector Ltd #