diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 482bbdd867..c7400c737b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -22,7 +22,6 @@ from typing import (
Callable,
Collection,
Dict,
- Iterable,
List,
Optional,
Tuple,
@@ -577,10 +576,10 @@ class FederationServer(FederationBase):
async def _on_context_state_request_compute(
self, room_id: str, event_id: Optional[str]
) -> Dict[str, list]:
+ pdus: Collection[EventBase]
if event_id:
- pdus: Iterable[EventBase] = await self.handler.get_state_for_pdu(
- room_id, event_id
- )
+ event_ids = await self.handler.get_state_ids_for_pdu(room_id, event_id)
+ pdus = await self.store.get_events_as_list(event_ids)
else:
pdus = (await self.state.get_current_state(room_id)).values()
@@ -1093,7 +1092,7 @@ class FederationServer(FederationBase):
# has started processing).
while True:
async with lock:
- logger.info("handling received PDU: %s", event)
+ logger.info("handling received PDU in room %s: %s", room_id, event)
try:
with nested_logging_context(event.event_id):
await self._federation_event_handler.on_receive_pdu(
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 0d7c4f5067..d720b5fd3f 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -244,7 +244,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
self.notifier.on_new_replication_data()
- def send_device_messages(self, destination: str) -> None:
+ def send_device_messages(self, destination: str, immediate: bool = False) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 6106a486d1..30e2421efc 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -118,7 +118,12 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
- def send_device_messages(self, destination: str) -> None:
+ def send_device_messages(self, destination: str, immediate: bool = True) -> None:
+ """Tells the sender that a new device message is ready to be sent to the
+ destination. The `immediate` flag specifies whether the messages should
+ be tried to be sent immediately, or whether it can be delayed for a
+ short while (to aid performance).
+ """
raise NotImplementedError()
@abc.abstractmethod
@@ -146,9 +151,8 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
@attr.s
-class _PresenceQueue:
- """A queue of destinations that need to be woken up due to new presence
- updates.
+class _DestinationWakeupQueue:
+ """A queue of destinations that need to be woken up due to new updates.
Staggers waking up of per destination queues to ensure that we don't attempt
to start TLS connections with many hosts all at once, leading to pinned CPU.
@@ -175,7 +179,7 @@ class _PresenceQueue:
if not self.processing:
self._handle()
- @wrap_as_background_process("_PresenceQueue.handle")
+ @wrap_as_background_process("_DestinationWakeupQueue.handle")
async def _handle(self) -> None:
"""Background process to drain the queue."""
@@ -297,7 +301,7 @@ class FederationSender(AbstractFederationSender):
self._external_cache = hs.get_external_cache()
- self._presence_queue = _PresenceQueue(self, self.clock)
+ self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
@@ -614,7 +618,7 @@ class FederationSender(AbstractFederationSender):
states, start_loop=False
)
- self._presence_queue.add_to_queue(destination)
+ self._destination_wakeup_queue.add_to_queue(destination)
def build_and_send_edu(
self,
@@ -667,7 +671,7 @@ class FederationSender(AbstractFederationSender):
else:
queue.send_edu(edu)
- def send_device_messages(self, destination: str) -> None:
+ def send_device_messages(self, destination: str, immediate: bool = False) -> None:
if destination == self.server_name:
logger.warning("Not sending device update to ourselves")
return
@@ -677,7 +681,11 @@ class FederationSender(AbstractFederationSender):
):
return
- self._get_per_destination_queue(destination).attempt_new_transaction()
+ if immediate:
+ self._get_per_destination_queue(destination).attempt_new_transaction()
+ else:
+ self._get_per_destination_queue(destination).mark_new_data()
+ self._destination_wakeup_queue.add_to_queue(destination)
def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index c8768f22bc..d80f0ac5e8 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -219,6 +219,16 @@ class PerDestinationQueue:
self._pending_edus.append(edu)
self.attempt_new_transaction()
+ def mark_new_data(self) -> None:
+ """Marks that the destination has new data to send, without starting a
+ new transaction.
+
+ If a transaction loop is already in progress then a new transcation will
+ be attempted when the current one finishes.
+ """
+
+ self._new_data_to_send = True
+
def attempt_new_transaction(self) -> None:
"""Try to start a new transaction to this destination
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index 67a6347907..71b2f90eb9 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -289,7 +289,7 @@ class OpenIdUserInfo(BaseFederationServlet):
return 200, {"sub": user_id}
-DEFAULT_SERVLET_GROUPS: Dict[str, Iterable[Type[BaseFederationServlet]]] = {
+SERVLET_GROUPS: Dict[str, Iterable[Type[BaseFederationServlet]]] = {
"federation": FEDERATION_SERVLET_CLASSES,
"room_list": (PublicRoomList,),
"group_server": GROUP_SERVER_SERVLET_CLASSES,
@@ -298,6 +298,10 @@ DEFAULT_SERVLET_GROUPS: Dict[str, Iterable[Type[BaseFederationServlet]]] = {
"openid": (OpenIdUserInfo,),
}
+DEFAULT_SERVLET_GROUPS = ("federation", "room_list", "openid")
+
+GROUP_SERVLET_GROUPS = ("group_server", "group_local", "group_attestation")
+
def register_servlets(
hs: "HomeServer",
@@ -320,16 +324,19 @@ def register_servlets(
Defaults to ``DEFAULT_SERVLET_GROUPS``.
"""
if not servlet_groups:
- servlet_groups = DEFAULT_SERVLET_GROUPS.keys()
+ servlet_groups = DEFAULT_SERVLET_GROUPS
+ # Only allow the groups servlets if the deprecated groups feature is enabled.
+ if hs.config.experimental.groups_enabled:
+ servlet_groups = servlet_groups + GROUP_SERVLET_GROUPS
for servlet_group in servlet_groups:
# Skip unknown servlet groups.
- if servlet_group not in DEFAULT_SERVLET_GROUPS:
+ if servlet_group not in SERVLET_GROUPS:
raise RuntimeError(
f"Attempting to register unknown federation servlet: '{servlet_group}'"
)
- for servletclass in DEFAULT_SERVLET_GROUPS[servlet_group]:
+ for servletclass in SERVLET_GROUPS[servlet_group]:
# Only allow the `/timestamp_to_event` servlet if msc3030 is enabled
if (
servletclass == FederationTimestampLookupServlet
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index 87e99c7ddf..2529dee613 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -63,7 +63,7 @@ class Authenticator:
self.replication_client = None
if hs.config.worker.worker_app:
- self.replication_client = hs.get_tcp_replication()
+ self.replication_client = hs.get_replication_command_handler()
# A method just so we can pass 'self' as the authenticator to the Servlets
async def authenticate_request(
|