diff --git a/Cargo.lock b/Cargo.lock
index 46c930ebd7..5c8f627fd7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -505,9 +505,9 @@ dependencies = [
[[package]]
name = "serde_json"
-version = "1.0.132"
+version = "1.0.133"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03"
+checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377"
dependencies = [
"itoa",
"memchr",
diff --git a/changelog.d/17872.doc b/changelog.d/17872.doc
new file mode 100644
index 0000000000..7f8b2d3495
--- /dev/null
+++ b/changelog.d/17872.doc
@@ -0,0 +1 @@
+Add OIDC example configuration for Forgejo (fork of Gitea).
diff --git a/changelog.d/17933.bugfix b/changelog.d/17933.bugfix
new file mode 100644
index 0000000000..8d30ac587e
--- /dev/null
+++ b/changelog.d/17933.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where read receipts could be excessively delayed when sent over federation.
diff --git a/changelog.d/17936.misc b/changelog.d/17936.misc
new file mode 100644
index 0000000000..91d976fbd9
--- /dev/null
+++ b/changelog.d/17936.misc
@@ -0,0 +1 @@
+Fix incorrect comment in new schema delta.
diff --git a/changelog.d/17944.misc b/changelog.d/17944.misc
new file mode 100644
index 0000000000..a8a645103f
--- /dev/null
+++ b/changelog.d/17944.misc
@@ -0,0 +1 @@
+Raise setuptools_rust version cap to 1.10.2.
\ No newline at end of file
diff --git a/changelog.d/17945.misc b/changelog.d/17945.misc
new file mode 100644
index 0000000000..eeebb92169
--- /dev/null
+++ b/changelog.d/17945.misc
@@ -0,0 +1 @@
+Enable encrypted appservice-related experimental features in the Complement docker image.
diff --git a/changelog.d/17952.misc b/changelog.d/17952.misc
new file mode 100644
index 0000000000..84fc8bfc29
--- /dev/null
+++ b/changelog.d/17952.misc
@@ -0,0 +1 @@
+Return whether the user is suspended when querying the user account in the Admin API.
\ No newline at end of file
diff --git a/changelog.d/17953.doc b/changelog.d/17953.doc
new file mode 100644
index 0000000000..10f5a27ba9
--- /dev/null
+++ b/changelog.d/17953.doc
@@ -0,0 +1 @@
+Link to element-docker-demo from contrib/docker*.
diff --git a/contrib/docker/README.md b/contrib/docker/README.md
index 89c1518bd0..fdfa96795a 100644
--- a/contrib/docker/README.md
+++ b/contrib/docker/README.md
@@ -30,3 +30,6 @@ docker-compose up -d
### More information
For more information on required environment variables and mounts, see the main docker documentation at [/docker/README.md](../../docker/README.md)
+
+**For a more comprehensive Docker Compose example showcasing a full Matrix 2.0 stack, please see
+https://github.com/element-hq/element-docker-demo**
\ No newline at end of file
diff --git a/contrib/docker_compose_workers/README.md b/contrib/docker_compose_workers/README.md
index 81518f6ba1..16c8c26795 100644
--- a/contrib/docker_compose_workers/README.md
+++ b/contrib/docker_compose_workers/README.md
@@ -8,6 +8,9 @@ All examples and snippets assume that your Synapse service is called `synapse` i
An example Docker Compose file can be found [here](docker-compose.yaml).
+**For a more comprehensive Docker Compose example showcasing a full Matrix 2.0 stack (originally based on this
+docker-compose.yaml), please see https://github.com/element-hq/element-docker-demo**
+
## Worker Service Examples in Docker Compose
In order to start the Synapse container as a worker, you must specify an `entrypoint` that loads both the `homeserver.yaml` and the configuration for the worker (`synapse-generic-worker-1.yaml` in the example below). You must also include the worker type in the environment variable `SYNAPSE_WORKER` or alternatively pass `-m synapse.app.generic_worker` as part of the `entrypoint` after `"/start.py", "run"`).
diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2
index b9334cc53b..9a74c617bc 100644
--- a/docker/complement/conf/workers-shared-extra.yaml.j2
+++ b/docker/complement/conf/workers-shared-extra.yaml.j2
@@ -104,6 +104,16 @@ experimental_features:
msc3967_enabled: true
# Expose a room summary for public rooms
msc3266_enabled: true
+ # Send to-device messages to application services
+ msc2409_to_device_messages_enabled: true
+ # Allow application services to masquerade devices
+ msc3202_device_masquerading: true
+  # Send device list changes, one-time key counts and fallback key usage to application services
+ msc3202_transaction_extensions: true
+ # Proxy OTK claim requests to exclusive ASes
+ msc3983_appservice_otk_claims: true
+ # Proxy key queries to exclusive ASes
+ msc3984_appservice_key_query: true
server_notices:
system_mxid_localpart: _server
diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md
index 96a2994b7b..a6e2e0a153 100644
--- a/docs/admin_api/user_admin_api.md
+++ b/docs/admin_api/user_admin_api.md
@@ -55,7 +55,8 @@ It returns a JSON body like the following:
}
],
"user_type": null,
- "locked": false
+ "locked": false,
+ "suspended": false
}
```
diff --git a/docs/openid.md b/docs/openid.md
index 7a10b1615b..5a3d7e9fba 100644
--- a/docs/openid.md
+++ b/docs/openid.md
@@ -336,6 +336,36 @@ but it has a `response_types_supported` which excludes "code" (which we rely on,
is even mentioned in their [documentation](https://developers.facebook.com/docs/facebook-login/manually-build-a-login-flow#login)),
so we have to disable discovery and configure the URIs manually.
+### Forgejo
+
+Forgejo is a fork of Gitea that can act as an OAuth2 provider.
+
+Its OAuth2 implementation is improved compared to Gitea's, as it provides a correctly defined `subject_claim` and `scopes`.
+
+Synapse config:
+
+```yaml
+oidc_providers:
+ - idp_id: forgejo
+ idp_name: Forgejo
+ discover: false
+ issuer: "https://your-forgejo.com/"
+ client_id: "your-client-id" # TO BE FILLED
+ client_secret: "your-client-secret" # TO BE FILLED
+ client_auth_method: client_secret_post
+ scopes: ["openid", "profile", "email", "groups"]
+ authorization_endpoint: "https://your-forgejo.com/login/oauth/authorize"
+ token_endpoint: "https://your-forgejo.com/login/oauth/access_token"
+ userinfo_endpoint: "https://your-forgejo.com/api/v1/user"
+ user_mapping_provider:
+ config:
+ subject_claim: "sub"
+ picture_claim: "picture"
+ localpart_template: "{{ user.preferred_username }}"
+ display_name_template: "{{ user.name }}"
+ email_template: "{{ user.email }}"
+```
+
### GitHub
[GitHub][github-idp] is a bit special as it is not an OpenID Connect compliant provider, but
diff --git a/poetry.lock b/poetry.lock
index eece221095..14bb9ad172 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2515,33 +2515,33 @@ twisted = ["twisted"]
[[package]]
name = "tomli"
-version = "2.0.2"
+version = "2.1.0"
description = "A lil' TOML parser"
optional = false
python-versions = ">=3.8"
files = [
- {file = "tomli-2.0.2-py3-none-any.whl", hash = "sha256:2ebe24485c53d303f690b0ec092806a085f07af5a5aa1464f3931eec36caaa38"},
- {file = "tomli-2.0.2.tar.gz", hash = "sha256:d46d457a85337051c36524bc5349dd91b1877838e2979ac5ced3e710ed8a60ed"},
+ {file = "tomli-2.1.0-py3-none-any.whl", hash = "sha256:a5c57c3d1c56f5ccdf89f6523458f60ef716e210fc47c4cfb188c5ba473e0391"},
+ {file = "tomli-2.1.0.tar.gz", hash = "sha256:3f646cae2aec94e17d04973e4249548320197cfabdf130015d023de4b74d8ab8"},
]
[[package]]
name = "tornado"
-version = "6.4.1"
+version = "6.4.2"
description = "Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed."
-optional = true
+optional = false
python-versions = ">=3.8"
files = [
- {file = "tornado-6.4.1-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:163b0aafc8e23d8cdc3c9dfb24c5368af84a81e3364745ccb4427669bf84aec8"},
- {file = "tornado-6.4.1-cp38-abi3-macosx_10_9_x86_64.whl", hash = "sha256:6d5ce3437e18a2b66fbadb183c1d3364fb03f2be71299e7d10dbeeb69f4b2a14"},
- {file = "tornado-6.4.1-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e2e20b9113cd7293f164dc46fffb13535266e713cdb87bd2d15ddb336e96cfc4"},
- {file = "tornado-6.4.1-cp38-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8ae50a504a740365267b2a8d1a90c9fbc86b780a39170feca9bcc1787ff80842"},
- {file = "tornado-6.4.1-cp38-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:613bf4ddf5c7a95509218b149b555621497a6cc0d46ac341b30bd9ec19eac7f3"},
- {file = "tornado-6.4.1-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:25486eb223babe3eed4b8aecbac33b37e3dd6d776bc730ca14e1bf93888b979f"},
- {file = "tornado-6.4.1-cp38-abi3-musllinux_1_2_i686.whl", hash = "sha256:454db8a7ecfcf2ff6042dde58404164d969b6f5d58b926da15e6b23817950fc4"},
- {file = "tornado-6.4.1-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:a02a08cc7a9314b006f653ce40483b9b3c12cda222d6a46d4ac63bb6c9057698"},
- {file = "tornado-6.4.1-cp38-abi3-win32.whl", hash = "sha256:d9a566c40b89757c9aa8e6f032bcdb8ca8795d7c1a9762910c722b1635c9de4d"},
- {file = "tornado-6.4.1-cp38-abi3-win_amd64.whl", hash = "sha256:b24b8982ed444378d7f21d563f4180a2de31ced9d8d84443907a0a64da2072e7"},
- {file = "tornado-6.4.1.tar.gz", hash = "sha256:92d3ab53183d8c50f8204a51e6f91d18a15d5ef261e84d452800d4ff6fc504e9"},
+ {file = "tornado-6.4.2-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:e828cce1123e9e44ae2a50a9de3055497ab1d0aeb440c5ac23064d9e44880da1"},
+ {file = "tornado-6.4.2-cp38-abi3-macosx_10_9_x86_64.whl", hash = "sha256:072ce12ada169c5b00b7d92a99ba089447ccc993ea2143c9ede887e0937aa803"},
+ {file = "tornado-6.4.2-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1a017d239bd1bb0919f72af256a970624241f070496635784d9bf0db640d3fec"},
+ {file = "tornado-6.4.2-cp38-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c36e62ce8f63409301537222faffcef7dfc5284f27eec227389f2ad11b09d946"},
+ {file = "tornado-6.4.2-cp38-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bca9eb02196e789c9cb5c3c7c0f04fb447dc2adffd95265b2c7223a8a615ccbf"},
+ {file = "tornado-6.4.2-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:304463bd0772442ff4d0f5149c6f1c2135a1fae045adf070821c6cdc76980634"},
+ {file = "tornado-6.4.2-cp38-abi3-musllinux_1_2_i686.whl", hash = "sha256:c82c46813ba483a385ab2a99caeaedf92585a1f90defb5693351fa7e4ea0bf73"},
+ {file = "tornado-6.4.2-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:932d195ca9015956fa502c6b56af9eb06106140d844a335590c1ec7f5277d10c"},
+ {file = "tornado-6.4.2-cp38-abi3-win32.whl", hash = "sha256:2876cef82e6c5978fde1e0d5b1f919d756968d5b4282418f3146b79b58556482"},
+ {file = "tornado-6.4.2-cp38-abi3-win_amd64.whl", hash = "sha256:908b71bf3ff37d81073356a5fadcc660eb10c1476ee6e2725588626ce7e5ca38"},
+ {file = "tornado-6.4.2.tar.gz", hash = "sha256:92bad5b4746e9879fd7bf1eb21dce4e3fc5128d71601f80005afa39237ad620b"},
]
[[package]]
diff --git a/pyproject.toml b/pyproject.toml
index 5fd1d7c198..eccce10455 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -370,7 +370,7 @@ tomli = ">=1.2.3"
# runtime errors caused by build system changes.
# We are happy to raise these upper bounds upon request,
# provided we check that it's safe to do so (i.e. that CI passes).
-requires = ["poetry-core>=1.1.0,<=1.9.1", "setuptools_rust>=1.3,<=1.8.1"]
+requires = ["poetry-core>=1.1.0,<=1.9.1", "setuptools_rust>=1.3,<=1.10.2"]
build-backend = "poetry.core.masonry.api"
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 1888480881..17cddf18a3 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -140,7 +140,6 @@ from typing import (
Iterable,
List,
Optional,
- Set,
Tuple,
)
@@ -170,7 +169,13 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
+from synapse.types import (
+ JsonDict,
+ ReadReceipt,
+ RoomStreamToken,
+ StrCollection,
+ get_domain_from_id,
+)
from synapse.util import Clock
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter
@@ -297,12 +302,10 @@ class _DestinationWakeupQueue:
# being woken up.
_MAX_TIME_IN_QUEUE = 30.0
- # The maximum duration in seconds between waking up consecutive destination
- # queues.
- _MAX_DELAY = 0.1
-
sender: "FederationSender" = attr.ib()
clock: Clock = attr.ib()
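+    # The maximum duration in seconds to wait between waking up consecutive
+    # destination queues.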
+ max_delay_s: int = attr.ib()
+
queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
processing: bool = attr.ib(default=False)
@@ -332,7 +335,7 @@ class _DestinationWakeupQueue:
# We also add an upper bound to the delay, to gracefully handle the
# case where the queue only has a few entries in it.
current_sleep_seconds = min(
- self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
+ self.max_delay_s, self._MAX_TIME_IN_QUEUE / len(self.queue)
)
while self.queue:
@@ -416,19 +419,14 @@ class FederationSender(AbstractFederationSender):
self._is_processing = False
self._last_poked_id = -1
- # map from room_id to a set of PerDestinationQueues which we believe are
- # awaiting a call to flush_read_receipts_for_room. The presence of an entry
- # here for a given room means that we are rate-limiting RR flushes to that room,
- # and that there is a pending call to _flush_rrs_for_room in the system.
- self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
+ self._external_cache = hs.get_external_cache()
- self._rr_txn_interval_per_room_ms = (
- 1000.0
- / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
+ rr_txn_interval_per_room_s = (
+ 1.0 / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
+ )
+ self._destination_wakeup_queue = _DestinationWakeupQueue(
+ self, self.clock, max_delay_s=rr_txn_interval_per_room_s
)
-
- self._external_cache = hs.get_external_cache()
- self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call_now(
@@ -745,37 +743,48 @@ class FederationSender(AbstractFederationSender):
# Some background on the rate-limiting going on here.
#
- # It turns out that if we attempt to send out RRs as soon as we get them from
- # a client, then we end up trying to do several hundred Hz of federation
- # transactions. (The number of transactions scales as O(N^2) on the size of a
- # room, since in a large room we have both more RRs coming in, and more servers
- # to send them to.)
+ # It turns out that if we attempt to send out RRs as soon as we get them
+ # from a client, then we end up trying to do several hundred Hz of
+ # federation transactions. (The number of transactions scales as O(N^2)
+ # on the size of a room, since in a large room we have both more RRs
+ # coming in, and more servers to send them to.)
#
- # This leads to a lot of CPU load, and we end up getting behind. The solution
- # currently adopted is as follows:
+ # This leads to a lot of CPU load, and we end up getting behind. The
+ # solution currently adopted is to differentiate between receipts and
+ # destinations we should immediately send to, and those we can trickle
+ # the receipts to.
#
- # The first receipt in a given room is sent out immediately, at time T0. Any
- # further receipts are, in theory, batched up for N seconds, where N is calculated
- # based on the number of servers in the room to achieve a transaction frequency
- # of around 50Hz. So, for example, if there were 100 servers in the room, then
- # N would be 100 / 50Hz = 2 seconds.
+ # The current logic is to send receipts out immediately if:
+        #   - the room is "small", i.e. there are only N servers to send receipts
+ # to, and so sending out the receipts immediately doesn't cause too
+ # much load; or
+ # - the receipt is for an event that happened recently, as users
+ # notice if receipts are delayed when they know other users are
+ # currently reading the room; or
+ # - the receipt is being sent to the server that sent the event, so
+        #     that users see receipts for their own messages quickly.
#
- # Then, after T+N, we flush out any receipts that have accumulated, and restart
- # the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
- # we stop the cycle and go back to the start.
+ # For destinations that we should delay sending the receipt to, we queue
+ # the receipts up to be sent in the next transaction, but don't trigger
+ # a new transaction to be sent. We then add the destination to the
+ # `DestinationWakeupQueue`, which will slowly iterate over each
+ # destination and trigger a new transaction to be sent.
#
- # However, in practice, it is often possible to flush out receipts earlier: in
- # particular, if we are sending a transaction to a given server anyway (for
- # example, because we have a PDU or a RR in another room to send), then we may
- # as well send out all of the pending RRs for that server. So it may be that
- # by the time we get to T+N, we don't actually have any RRs left to send out.
- # Nevertheless we continue to buffer up RRs for the room in question until we
- # reach the point that no RRs arrive between timer ticks.
+ # However, in practice, it is often possible to send out delayed
+ # receipts earlier: in particular, if we are sending a transaction to a
+ # given server anyway (for example, because we have a PDU or a RR in
+ # another room to send), then we may as well send out all of the pending
+ # RRs for that server. So it may be that by the time we get to waking up
+ # the destination, we don't actually have any RRs left to send out.
#
- # For even more background, see https://github.com/matrix-org/synapse/issues/4730.
+ # For even more background, see
+ # https://github.com/matrix-org/synapse/issues/4730.
room_id = receipt.room_id
+ # Local read receipts always have 1 event ID.
+ event_id = receipt.event_ids[0]
+
# Work out which remote servers should be poked and poke them.
domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
@@ -797,49 +806,51 @@ class FederationSender(AbstractFederationSender):
if not domains:
return
- queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)
+ # We now split which domains we want to wake up immediately vs which we
+ # want to delay waking up.
+ immediate_domains: StrCollection
+ delay_domains: StrCollection
- # if there is no flush yet scheduled, we will send out these receipts with
- # immediate flushes, and schedule the next flush for this room.
- if queues_pending_flush is not None:
- logger.debug("Queuing receipt for: %r", domains)
+ if len(domains) < 10:
+ # For "small" rooms send to all domains immediately
+ immediate_domains = domains
+ delay_domains = ()
else:
- logger.debug("Sending receipt to: %r", domains)
- self._schedule_rr_flush_for_room(room_id, len(domains))
+ metadata = await self.store.get_metadata_for_event(
+ receipt.room_id, event_id
+ )
+ assert metadata is not None
- for domain in domains:
- queue = self._get_per_destination_queue(domain)
- queue.queue_read_receipt(receipt)
+ sender_domain = get_domain_from_id(metadata.sender)
- # if there is already a RR flush pending for this room, then make sure this
- # destination is registered for the flush
- if queues_pending_flush is not None:
- queues_pending_flush.add(queue)
+ if self.clock.time_msec() - metadata.received_ts < 60_000:
+ # We always send receipts for recent messages immediately
+ immediate_domains = domains
+ delay_domains = ()
else:
- queue.flush_read_receipts_for_room(room_id)
-
- def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
- # that is going to cause approximately len(domains) transactions, so now back
- # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
- backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
-
- logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
- self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
- self._queues_awaiting_rr_flush_by_room[room_id] = set()
-
- def _flush_rrs_for_room(self, room_id: str) -> None:
- queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
- logger.debug("Flushing RRs in %s to %s", room_id, queues)
-
- if not queues:
- # no more RRs arrived for this room; we are done.
- return
+ # Otherwise, we delay waking up all destinations except for the
+ # sender's domain.
+ immediate_domains = []
+ delay_domains = []
+ for domain in domains:
+ if domain == sender_domain:
+ immediate_domains.append(domain)
+ else:
+ delay_domains.append(domain)
+
+ for domain in immediate_domains:
+ # Add to destination queue and wake the destination up
+ queue = self._get_per_destination_queue(domain)
+ queue.queue_read_receipt(receipt)
+ queue.attempt_new_transaction()
- # schedule the next flush
- self._schedule_rr_flush_for_room(room_id, len(queues))
+ for domain in delay_domains:
+ # Add to destination queue...
+ queue = self._get_per_destination_queue(domain)
+ queue.queue_read_receipt(receipt)
- for queue in queues:
- queue.flush_read_receipts_for_room(room_id)
+ # ... and schedule the destination to be woken up.
+ self._destination_wakeup_queue.add_to_queue(domain)
async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index d097e65ea7..b3f65e8237 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -156,7 +156,6 @@ class PerDestinationQueue:
# Each receipt can only have a single receipt per
# (room ID, receipt type, user ID, thread ID) tuple.
self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
- self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
@@ -258,15 +257,7 @@ class PerDestinationQueue:
}
)
- def flush_read_receipts_for_room(self, room_id: str) -> None:
- # If there are any pending receipts for this room then force-flush them
- # in a new transaction.
- for edu in self._pending_receipt_edus:
- if room_id in edu:
- self._rrs_pending_flush = True
- self.attempt_new_transaction()
- # No use in checking remaining EDUs if the room was found.
- break
+ self.mark_new_data()
def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -603,12 +594,9 @@ class PerDestinationQueue:
self._destination, last_successful_stream_ordering
)
- def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
+ def _get_receipt_edus(self, limit: int) -> Iterable[Edu]:
if not self._pending_receipt_edus:
return
- if not force_flush and not self._rrs_pending_flush:
- # not yet time for this lot
- return
# Send at most limit EDUs for receipts.
for content in self._pending_receipt_edus[:limit]:
@@ -747,7 +735,7 @@ class _TransactionQueueManager:
)
# Add read receipt EDUs.
- pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
+ pending_edus.extend(self.queue._get_receipt_edus(limit=5))
edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
# Next, prioritize to-device messages so that existing encryption channels
@@ -795,13 +783,6 @@ class _TransactionQueueManager:
if not self._pdus and not pending_edus:
return [], []
- # if we've decided to send a transaction anyway, and we have room, we
- # may as well send any pending RRs
- if edu_limit:
- pending_edus.extend(
- self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
- )
-
if self._pdus:
self._last_stream_ordering = self._pdus[
-1
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index d1989e9d2c..d1194545ae 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -124,6 +124,7 @@ class AdminHandler:
"consent_ts": user_info.consent_ts,
"user_type": user_info.user_type,
"is_guest": user_info.is_guest,
+ "suspended": user_info.suspended,
}
if self._msc3866_enabled:
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 32c3472e58..707d18de78 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -322,6 +322,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
+ self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id, event_id))
self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,))
@@ -446,6 +447,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("_get_state_group_for_event", None)
self._attempt_to_invalidate_cache("get_event_ordering", None)
+ self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id,))
self._attempt_to_invalidate_cache("is_partial_state_event", None)
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 403407068c..825fd00993 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -193,6 +193,14 @@ class _EventRow:
outlier: bool
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class EventMetadata:
+ """Event metadata returned by `get_metadata_for_event(..)`"""
+
+ sender: str
+ received_ts: int
+
+
class EventRedactBehaviour(Enum):
"""
What to do when retrieving a redacted event from the database.
@@ -2580,3 +2588,22 @@ class EventsWorkerStore(SQLBaseStore):
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
)
)
+
+ @cached(tree=True)
+ async def get_metadata_for_event(
+ self, room_id: str, event_id: str
+ ) -> Optional[EventMetadata]:
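+        """Look up the sender and received timestamp of the given event, if known."""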
+ row = await self.db_pool.simple_select_one(
+ table="events",
+ keyvalues={"room_id": room_id, "event_id": event_id},
+ retcols=("sender", "received_ts"),
+ allow_none=True,
+ desc="get_metadata_for_event",
+ )
+ if row is None:
+ return None
+
+ return EventMetadata(
+ sender=row[0],
+ received_ts=row[1],
+ )
diff --git a/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql b/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql
index ad54302a8f..0ee78df1a0 100644
--- a/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql
+++ b/synapse/storage/schema/main/delta/88/04_current_state_delta_index.sql
@@ -12,7 +12,7 @@
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
--- Add an index on (user_id, device_id, algorithm, ts_added_ms) on e2e_one_time_keys_json, so that OTKs can
--- efficiently be issued in the same order they were uploaded.
+-- Add an index on `current_state_delta_stream(room_id, stream_id)` to allow
+-- efficient per-room lookups.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8804, 'current_state_delta_stream_room_index', '{}');
diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py
index 6a8887fe74..cd906bbbc7 100644
--- a/tests/federation/test_federation_sender.py
+++ b/tests/federation/test_federation_sender.py
@@ -34,6 +34,7 @@ from synapse.handlers.device import DeviceHandler
from synapse.rest import admin
from synapse.rest.client import login
from synapse.server import HomeServer
+from synapse.storage.databases.main.events_worker import EventMetadata
from synapse.types import JsonDict, ReadReceipt
from synapse.util import Clock
@@ -55,12 +56,15 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
federation_transport_client=self.federation_transport_client,
)
- hs.get_storage_controllers().state.get_current_hosts_in_room = AsyncMock( # type: ignore[method-assign]
+ self.main_store = hs.get_datastores().main
+ self.state_controller = hs.get_storage_controllers().state
+
+ self.state_controller.get_current_hosts_in_room = AsyncMock( # type: ignore[method-assign]
return_value={"test", "host2"}
)
- hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = ( # type: ignore[method-assign]
- hs.get_storage_controllers().state.get_current_hosts_in_room
+ self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = ( # type: ignore[method-assign]
+ self.state_controller.get_current_hosts_in_room
)
return hs
@@ -185,12 +189,15 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
],
)
- def test_send_receipts_with_backoff(self) -> None:
- """Send two receipts in quick succession; the second should be flushed, but
- only after 20ms"""
+ def test_send_receipts_with_backoff_small_room(self) -> None:
+        """Read receipts in small rooms should not be delayed"""
mock_send_transaction = self.federation_transport_client.send_transaction
mock_send_transaction.return_value = {}
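+        # Pretend this is a small room with only a couple of hosts in it.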
+ self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock( # type: ignore[method-assign]
+ return_value={"test", "host2"}
+ )
+
sender = self.hs.get_federation_sender()
receipt = ReadReceipt(
"room_id",
@@ -206,47 +213,104 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
# expect a call to send_transaction
mock_send_transaction.assert_called_once()
- json_cb = mock_send_transaction.call_args[0][1]
- data = json_cb()
- self.assertEqual(
- data["edus"],
- [
- {
- "edu_type": EduTypes.RECEIPT,
- "content": {
- "room_id": {
- "m.read": {
- "user_id": {
- "event_ids": ["event_id"],
- "data": {"ts": 1234},
- }
- }
- }
- },
- }
- ],
+ self._assert_edu_in_call(mock_send_transaction.call_args[0][1])
+
+ def test_send_receipts_with_backoff_recent_event(self) -> None:
+ """Read receipt for a recent message should not be delayed"""
+ mock_send_transaction = self.federation_transport_client.send_transaction
+ mock_send_transaction.return_value = {}
+
+ # Pretend this is a big room
+ self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock( # type: ignore[method-assign]
+ return_value={"test"} | {f"host{i}" for i in range(20)}
)
+
+ self.main_store.get_metadata_for_event = AsyncMock(
+ return_value=EventMetadata(
+ received_ts=self.clock.time_msec(),
+ sender="@test:test",
+ )
+ )
+
+ sender = self.hs.get_federation_sender()
+ receipt = ReadReceipt(
+ "room_id",
+ "m.read",
+ "user_id",
+ ["event_id"],
+ thread_id=None,
+ data={"ts": 1234},
+ )
+ self.get_success(sender.send_read_receipt(receipt))
+
+ self.pump()
+
+ # expect a call to send_transaction for each host
+ self.assertEqual(mock_send_transaction.call_count, 20)
+ self._assert_edu_in_call(mock_send_transaction.call_args.args[1])
+
mock_send_transaction.reset_mock()
- # send the second RR
+ def test_send_receipts_with_backoff_sender(self) -> None:
+        """A read receipt for a message should not be delayed to the event
+        sender's server, but should be delayed to everyone else"""
+ mock_send_transaction = self.federation_transport_client.send_transaction
+ mock_send_transaction.return_value = {}
+
+ # Pretend this is a big room
+ self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock( # type: ignore[method-assign]
+ return_value={"test"} | {f"host{i}" for i in range(20)}
+ )
+
+ self.main_store.get_metadata_for_event = AsyncMock(
+ return_value=EventMetadata(
+ received_ts=self.clock.time_msec() - 5 * 60_000,
+ sender="@test:host1",
+ )
+ )
+
+ sender = self.hs.get_federation_sender()
receipt = ReadReceipt(
"room_id",
"m.read",
"user_id",
- ["other_id"],
+ ["event_id"],
thread_id=None,
data={"ts": 1234},
)
- self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
+ self.get_success(sender.send_read_receipt(receipt))
+
self.pump()
- mock_send_transaction.assert_not_called()
- self.reactor.advance(19)
- mock_send_transaction.assert_not_called()
+ # First, expect a call to send_transaction for the sending host
+ mock_send_transaction.assert_called()
- self.reactor.advance(10)
- mock_send_transaction.assert_called_once()
- json_cb = mock_send_transaction.call_args[0][1]
+ transaction = mock_send_transaction.call_args_list[0].args[0]
+ self.assertEqual(transaction.destination, "host1")
+ self._assert_edu_in_call(mock_send_transaction.call_args_list[0].args[1])
+
+ # We also expect a call to one of the other hosts, as the first
+ # destination to wake up.
+ self.assertEqual(mock_send_transaction.call_count, 2)
+ self._assert_edu_in_call(mock_send_transaction.call_args.args[1])
+
+ mock_send_transaction.reset_mock()
+
+ # We now expect to see 18 more transactions to the remaining hosts
+ # periodically.
+ for _ in range(18):
+ self.reactor.advance(
+ 1.0
+ / self.hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
+ )
+
+ mock_send_transaction.assert_called_once()
+ self._assert_edu_in_call(mock_send_transaction.call_args.args[1])
+ mock_send_transaction.reset_mock()
+
+ def _assert_edu_in_call(self, json_cb: Callable[[], JsonDict]) -> None:
+ """Assert that the given `json_cb` from a `send_transaction` has a
+ receipt in it."""
data = json_cb()
self.assertEqual(
data["edus"],
@@ -257,7 +321,7 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
"room_id": {
"m.read": {
"user_id": {
- "event_ids": ["other_id"],
+ "event_ids": ["event_id"],
"data": {"ts": 1234},
}
}
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 6d050e7784..fdb8fafa0e 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -3222,6 +3222,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertIn("consent_ts", content)
self.assertIn("external_ids", content)
self.assertIn("last_seen_ts", content)
+ self.assertIn("suspended", content)
# This key was removed intentionally. Ensure it is not accidentally re-included.
self.assertNotIn("password_hash", content)
|