diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index c4c0bc7315..137cfb3346 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -771,17 +771,28 @@ class FederationClient(FederationBase):
"""
if synapse_error is None:
synapse_error = e.to_synapse_error()
- # There is no good way to detect an "unknown" endpoint.
+ # MSC3743 specifies that servers should return a 404 or 405 with an errcode
+ # of M_UNRECOGNIZED when they receive a request to an unknown endpoint or
+ # to an unknown method, respectively.
#
- # Dendrite returns a 404 (with a body of "404 page not found");
- # Conduit returns a 404 (with no body); and Synapse returns a 400
- # with M_UNRECOGNIZED.
- #
- # This needs to be rather specific as some endpoints truly do return 404
- # errors.
+ # Older versions of servers don't properly handle this. This needs to be
+ # rather specific as some endpoints truly do return 404 errors.
return (
- e.code == 404 and (not e.response or e.response == b"404 page not found")
- ) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED)
+ # 404 is an unknown endpoint, 405 is a known endpoint, but unknown method.
+ (e.code == 404 or e.code == 405)
+ and (
+ # Older Dendrites returned a text or empty body.
+ # Older Conduit returned an empty body.
+ not e.response
+ or e.response == b"404 page not found"
+ # The proper response JSON with M_UNRECOGNIZED errcode.
+ or synapse_error.errcode == Codes.UNRECOGNIZED
+ )
+ ) or (
+ # Older Synapses returned a 400 error.
+ e.code == 400
+ and synapse_error.errcode == Codes.UNRECOGNIZED
+ )
async def _try_destination_list(
self,
@@ -1691,9 +1702,19 @@ class FederationClient(FederationBase):
# to return events on *both* sides of the timestamp to
# help reconcile the gap faster.
_timestamp_to_event_from_destination,
+ # Since this endpoint is new, we should try other servers before giving up.
+ # We can safely remove this in a year (remove after 2023-11-16).
+ failover_on_unknown_endpoint=True,
)
return timestamp_to_event_response
- except SynapseError:
+ except SynapseError as e:
+ logger.warn(
+ "timestamp_to_event(room_id=%s, timestamp=%s, direction=%s): encountered error when trying to fetch from destinations: %s",
+ room_id,
+ timestamp,
+ direction,
+ e,
+ )
return None
async def _timestamp_to_event_from_destination(
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index fc1d8c88a7..30ebd62883 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -647,7 +647,7 @@ class FederationSender(AbstractFederationSender):
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
- domains_set = await self._storage_controllers.state.get_current_hosts_in_room(
+ domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
)
domains = [
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3ae5e8634c..ffc9d95ee7 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -35,7 +35,7 @@ from synapse.logging import issue9533_logger
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import JsonDict, ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.visibility import filter_events_for_server
@@ -136,8 +136,11 @@ class PerDestinationQueue:
# destination
self._pending_presence: Dict[str, UserPresenceState] = {}
- # room_id -> receipt_type -> user_id -> receipt_dict
- self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
+ # List of room_id -> receipt_type -> user_id -> receipt_dict,
+ #
+ # Each receipt can only have a single receipt per
+ # (room ID, receipt type, user ID, thread ID) tuple.
+ self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
@@ -202,17 +205,53 @@ class PerDestinationQueue:
Args:
receipt: receipt to be queued
"""
- self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
- receipt.receipt_type, {}
- )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
+ serialized_receipt: JsonDict = {
+ "event_ids": receipt.event_ids,
+ "data": receipt.data,
+ }
+ if receipt.thread_id is not None:
+ serialized_receipt["data"]["thread_id"] = receipt.thread_id
+
+ # Find which EDU to add this receipt to. There's three situations depending
+ # on the (room ID, receipt type, user, thread ID) tuple:
+ #
+ # 1. If it fully matches, clobber the information.
+ # 2. If it is missing, add the information.
+ # 3. If the subset tuple of (room ID, receipt type, user) matches, check
+ # the next EDU (or add a new EDU).
+ for edu in self._pending_receipt_edus:
+ receipt_content = edu.setdefault(receipt.room_id, {}).setdefault(
+ receipt.receipt_type, {}
+ )
+ # If this room ID, receipt type, user ID is not in this EDU, OR if
+ # the full tuple matches, use the current EDU.
+ if (
+ receipt.user_id not in receipt_content
+ or receipt_content[receipt.user_id].get("thread_id")
+ == receipt.thread_id
+ ):
+ receipt_content[receipt.user_id] = serialized_receipt
+ break
+
+ # If no matching EDU was found, create a new one.
+ else:
+ self._pending_receipt_edus.append(
+ {
+ receipt.room_id: {
+ receipt.receipt_type: {receipt.user_id: serialized_receipt}
+ }
+ }
+ )
def flush_read_receipts_for_room(self, room_id: str) -> None:
- # if we don't have any read-receipts for this room, it may be that we've already
- # sent them out, so we don't need to flush.
- if room_id not in self._pending_rrs:
- return
- self._rrs_pending_flush = True
- self.attempt_new_transaction()
+ # If there are any pending receipts for this room then force-flush them
+ # in a new transaction.
+ for edu in self._pending_receipt_edus:
+ if room_id in edu:
+ self._rrs_pending_flush = True
+ self.attempt_new_transaction()
+ # No use in checking remaining EDUs if the room was found.
+ break
def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -351,7 +390,7 @@ class PerDestinationQueue:
self._pending_edus = []
self._pending_edus_keyed = {}
self._pending_presence = {}
- self._pending_rrs = {}
+ self._pending_receipt_edus = []
self._start_catching_up()
except FederationDeniedError as e:
@@ -543,22 +582,27 @@ class PerDestinationQueue:
self._destination, last_successful_stream_ordering
)
- def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
- if not self._pending_rrs:
+ def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
+ if not self._pending_receipt_edus:
return
if not force_flush and not self._rrs_pending_flush:
# not yet time for this lot
return
- edu = Edu(
- origin=self._server_name,
- destination=self._destination,
- edu_type=EduTypes.RECEIPT,
- content=self._pending_rrs,
- )
- self._pending_rrs = {}
- self._rrs_pending_flush = False
- yield edu
+ # Send at most limit EDUs for receipts.
+ for content in self._pending_receipt_edus[:limit]:
+ yield Edu(
+ origin=self._server_name,
+ destination=self._destination,
+ edu_type=EduTypes.RECEIPT,
+ content=content,
+ )
+ self._pending_receipt_edus = self._pending_receipt_edus[limit:]
+
+ # If there are still pending read-receipts, don't reset the pending flush
+ # flag.
+ if not self._pending_receipt_edus:
+ self._rrs_pending_flush = False
def _pop_pending_edus(self, limit: int) -> List[Edu]:
pending_edus = self._pending_edus
@@ -597,7 +641,7 @@ class PerDestinationQueue:
if not message_id:
continue
- set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+ set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
edus = [
Edu(
@@ -645,27 +689,61 @@ class _TransactionQueueManager:
async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
# First we calculate the EDUs we want to send, if any.
- # We start by fetching device related EDUs, i.e device updates and to
- # device messages. We have to keep 2 free slots for presence and rr_edus.
- device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
+ # There's a maximum number of EDUs that can be sent with a transaction,
+ # generally device updates and to-device messages get priority, but we
+ # want to ensure that there's room for some other EDUs as well.
+ #
+ # This is done by:
+ #
+ # * Add a presence EDU, if one exists.
+ # * Add up-to a small limit of read receipt EDUs.
+ # * Add to-device EDUs, but leave some space for device list updates.
+ # * Add device list updates EDUs.
+ # * If there's any remaining room, add other EDUs.
+ pending_edus = []
+
+ # Add presence EDU.
+ if self.queue._pending_presence:
+ pending_edus.append(
+ Edu(
+ origin=self.queue._server_name,
+ destination=self.queue._destination,
+ edu_type=EduTypes.PRESENCE,
+ content={
+ "push": [
+ format_user_presence_state(
+ presence, self.queue._clock.time_msec()
+ )
+ for presence in self.queue._pending_presence.values()
+ ]
+ },
+ )
+ )
+ self.queue._pending_presence = {}
- # We prioritize to-device messages so that existing encryption channels
+ # Add read receipt EDUs.
+ pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
+ edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
+
+ # Next, prioritize to-device messages so that existing encryption channels
# work. We also keep a few slots spare (by reducing the limit) so that
# we can still trickle out some device list updates.
(
to_device_edus,
device_stream_id,
- ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10)
+ ) = await self.queue._get_to_device_message_edus(edu_limit - 10)
if to_device_edus:
self._device_stream_id = device_stream_id
else:
self.queue._last_device_stream_id = device_stream_id
- device_edu_limit -= len(to_device_edus)
+ pending_edus.extend(to_device_edus)
+ edu_limit -= len(to_device_edus)
+ # Add device list update EDUs.
device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
- device_edu_limit
+ edu_limit
)
if device_update_edus:
@@ -673,40 +751,17 @@ class _TransactionQueueManager:
else:
self.queue._last_device_list_stream_id = dev_list_id
- pending_edus = device_update_edus + to_device_edus
-
- # Now add the read receipt EDU.
- pending_edus.extend(self.queue._get_rr_edus(force_flush=False))
-
- # And presence EDU.
- if self.queue._pending_presence:
- pending_edus.append(
- Edu(
- origin=self.queue._server_name,
- destination=self.queue._destination,
- edu_type=EduTypes.PRESENCE,
- content={
- "push": [
- format_user_presence_state(
- presence, self.queue._clock.time_msec()
- )
- for presence in self.queue._pending_presence.values()
- ]
- },
- )
- )
- self.queue._pending_presence = {}
+ pending_edus.extend(device_update_edus)
+ edu_limit -= len(device_update_edus)
# Finally add any other types of EDUs if there is room.
- pending_edus.extend(
- self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
- )
- while (
- len(pending_edus) < MAX_EDUS_PER_TRANSACTION
- and self.queue._pending_edus_keyed
- ):
+ other_edus = self.queue._pop_pending_edus(edu_limit)
+ pending_edus.extend(other_edus)
+ edu_limit -= len(other_edus)
+ while edu_limit > 0 and self.queue._pending_edus_keyed:
_, val = self.queue._pending_edus_keyed.popitem()
pending_edus.append(val)
+ edu_limit -= 1
# Now we look for any PDUs to send, by getting up to 50 PDUs from the
# queue
@@ -717,8 +772,10 @@ class _TransactionQueueManager:
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
- if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
- pending_edus.extend(self.queue._get_rr_edus(force_flush=True))
+ if edu_limit:
+ pending_edus.extend(
+ self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
+ )
if self._pdus:
self._last_stream_ordering = self._pdus[
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index a3cfc701cd..77f1f39cac 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -185,9 +185,8 @@ class TransportLayerClient:
Raises:
Various exceptions when the request fails
"""
- path = _create_path(
- FEDERATION_UNSTABLE_PREFIX,
- "/org.matrix.msc3030/timestamp_to_event/%s",
+ path = _create_v1_path(
+ "/timestamp_to_event/%s",
room_id,
)
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 50623cd385..2725f53cf6 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -25,7 +25,6 @@ from synapse.federation.transport.server._base import (
from synapse.federation.transport.server.federation import (
FEDERATION_SERVLET_CLASSES,
FederationAccountStatusServlet,
- FederationTimestampLookupServlet,
)
from synapse.http.server import HttpServer, JsonResource
from synapse.http.servlet import (
@@ -291,13 +290,6 @@ def register_servlets(
)
for servletclass in SERVLET_GROUPS[servlet_group]:
- # Only allow the `/timestamp_to_event` servlet if msc3030 is enabled
- if (
- servletclass == FederationTimestampLookupServlet
- and not hs.config.experimental.msc3030_enabled
- ):
- continue
-
# Only allow the `/account_status` servlet if msc3720 is enabled
if (
servletclass == FederationAccountStatusServlet
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 205fd16daa..53e77b4bb6 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -218,14 +218,13 @@ class FederationTimestampLookupServlet(BaseFederationServerServlet):
`dir` can be `f` or `b` to indicate forwards and backwards in time from the
given timestamp.
- GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
+ GET /_matrix/federation/v1/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
{
"event_id": ...
}
"""
PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?"
- PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030"
async def on_GET(
self,
|