diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index afdb5bf2fa..55533d7501 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -102,7 +102,7 @@ class FederationClient(FederationBase):
max_len=1000,
expiry_ms=120 * 1000,
reset_expiry_on_get=False,
- )
+ ) # type: ExpiringCache[str, EventBase]
def _clear_tried_cache(self):
"""Clear pdu_destination_tried cache"""
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 71cb120ef7..b9f8d966a6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -739,22 +739,20 @@ class FederationServer(FederationBase):
await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True)
- def __str__(self):
+ def __str__(self) -> str:
return "<ReplicationLayer(%s)>" % self.server_name
async def exchange_third_party_invite(
self, sender_user_id: str, target_user_id: str, room_id: str, signed: Dict
- ):
- ret = await self.handler.exchange_third_party_invite(
+ ) -> None:
+ await self.handler.exchange_third_party_invite(
sender_user_id, target_user_id, room_id, signed
)
- return ret
- async def on_exchange_third_party_invite_request(self, event_dict: Dict):
- ret = await self.handler.on_exchange_third_party_invite_request(event_dict)
- return ret
+ async def on_exchange_third_party_invite_request(self, event_dict: Dict) -> None:
+ await self.handler.on_exchange_third_party_invite_request(event_dict)
- async def check_server_matches_acl(self, server_name: str, room_id: str):
+ async def check_server_matches_acl(self, server_name: str, room_id: str) -> None:
"""Check if the given server is allowed by the server ACLs in the room
Args:
@@ -878,7 +876,7 @@ class FederationHandlerRegistry:
def register_edu_handler(
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
- ):
+ ) -> None:
"""Sets the handler callable that will be used to handle an incoming
federation EDU of the given type.
@@ -897,7 +895,7 @@ class FederationHandlerRegistry:
def register_query_handler(
self, query_type: str, handler: Callable[[dict], Awaitable[JsonDict]]
- ):
+ ) -> None:
"""Sets the handler callable that will be used to handle an incoming
federation query of the given type.
@@ -915,15 +913,17 @@ class FederationHandlerRegistry:
self.query_handlers[query_type] = handler
- def register_instance_for_edu(self, edu_type: str, instance_name: str):
+ def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None:
"""Register that the EDU handler is on a different instance than master."""
self._edu_type_to_instance[edu_type] = [instance_name]
- def register_instances_for_edu(self, edu_type: str, instance_names: List[str]):
+ def register_instances_for_edu(
+ self, edu_type: str, instance_names: List[str]
+ ) -> None:
"""Register that the EDU handler is on multiple instances."""
self._edu_type_to_instance[edu_type] = instance_names
- async def on_edu(self, edu_type: str, origin: str, content: dict):
+ async def on_edu(self, edu_type: str, origin: str, content: dict) -> None:
if not self.config.use_presence and edu_type == EduTypes.Presence:
return
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8babb1ebbe..d821dcbf6a 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -44,6 +44,7 @@ from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
if TYPE_CHECKING:
+ from synapse.events.presence_router import PresenceRouter
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -162,6 +163,7 @@ class FederationSender(AbstractFederationSender):
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
+ self._presence_router = None # type: Optional[PresenceRouter]
self._transaction_manager = TransactionManager(hs)
self._instance_name = hs.get_instance_name()
@@ -584,7 +586,22 @@ class FederationSender(AbstractFederationSender):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
- hosts_and_states = await get_interested_remotes(self.store, states, self.state)
+ # We pull the presence router here instead of __init__
+ # to prevent a dependency cycle:
+ #
+ # AuthHandler -> Notifier -> FederationSender
+ # -> PresenceRouter -> ModuleApi -> AuthHandler
+ if self._presence_router is None:
+ self._presence_router = self.hs.get_presence_router()
+
+ assert self._presence_router is not None
+
+ hosts_and_states = await get_interested_remotes(
+ self.store,
+ self._presence_router,
+ states,
+ self.state,
+ )
for destinations, states in hosts_and_states:
for destination in destinations:
@@ -717,16 +734,18 @@ class FederationSender(AbstractFederationSender):
self._catchup_after_startup_timer = None
break
+ last_processed = destinations_to_wake[-1]
+
destinations_to_wake = [
d
for d in destinations_to_wake
if self._federation_shard_config.should_handle(self._instance_name, d)
]
- for last_processed in destinations_to_wake:
+ for destination in destinations_to_wake:
logger.info(
"Destination %s has outstanding catch-up, waking up.",
last_processed,
)
- self.wake_destination(last_processed)
+ self.wake_destination(destination)
await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 84e39c5a46..5ef0556ef7 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -620,8 +620,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)"
async def on_PUT(self, origin, content, query, room_id):
- content = await self.handler.on_exchange_third_party_invite_request(content)
- return 200, content
+ await self.handler.on_exchange_third_party_invite_request(content)
+ return 200, {}
class FederationClientKeysQueryServlet(BaseFederationServlet):
|