summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py2
-rw-r--r--synapse/federation/federation_server.py31
-rw-r--r--synapse/federation/sender/__init__.py25
-rw-r--r--synapse/federation/sender/per_destination_queue.py8
-rw-r--r--synapse/federation/transport/server.py14
-rw-r--r--synapse/federation/units.py5
6 files changed, 56 insertions, 29 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py

index 184096d165..10244ee0d2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -104,7 +104,7 @@ class FederationClient(FederationBase): max_len=1000, expiry_ms=120 * 1000, reset_expiry_on_get=False, - ) + ) # type: ExpiringCache[str, EventBase] def _clear_tried_cache(self): """Clear pdu_destination_tried cache""" diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index cb48cc5722..794c1138a9 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -815,22 +815,20 @@ class FederationServer(FederationBase): await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True) - def __str__(self): + def __str__(self) -> str: return "<ReplicationLayer(%s)>" % self.server_name async def exchange_third_party_invite( self, sender_user_id: str, target_user_id: str, room_id: str, signed: Dict - ): - ret = await self.handler.exchange_third_party_invite( + ) -> None: + await self.handler.exchange_third_party_invite( sender_user_id, target_user_id, room_id, signed ) - return ret - async def on_exchange_third_party_invite_request(self, event_dict: Dict): - ret = await self.handler.on_exchange_third_party_invite_request(event_dict) - return ret + async def on_exchange_third_party_invite_request(self, event_dict: Dict) -> None: + await self.handler.on_exchange_third_party_invite_request(event_dict) - async def check_server_matches_acl(self, server_name: str, room_id: str): + async def check_server_matches_acl(self, server_name: str, room_id: str) -> None: """Check if the given server is allowed by the server ACLs in the room Args: @@ -946,6 +944,7 @@ class FederationHandlerRegistry: # A rate limiter for incoming room key requests per origin. self._room_key_request_rate_limiter = Ratelimiter( + store=hs.get_datastore(), clock=self.clock, rate_hz=self.config.rc_key_requests.per_second, burst_count=self.config.rc_key_requests.burst_count, @@ -953,7 +952,7 @@ class FederationHandlerRegistry: def register_edu_handler( self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]] - ): + ) -> None: """Sets the handler callable that will be used to handle an incoming federation EDU of the given type. @@ -972,7 +971,7 @@ class FederationHandlerRegistry: def register_query_handler( self, query_type: str, handler: Callable[[dict], Awaitable[JsonDict]] - ): + ) -> None: """Sets the handler callable that will be used to handle an incoming federation query of the given type. @@ -990,15 +989,17 @@ class FederationHandlerRegistry: self.query_handlers[query_type] = handler - def register_instance_for_edu(self, edu_type: str, instance_name: str): + def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None: """Register that the EDU handler is on a different instance than master.""" self._edu_type_to_instance[edu_type] = [instance_name] - def register_instances_for_edu(self, edu_type: str, instance_names: List[str]): + def register_instances_for_edu( + self, edu_type: str, instance_names: List[str] + ) -> None: """Register that the EDU handler is on multiple instances.""" self._edu_type_to_instance[edu_type] = instance_names - async def on_edu(self, edu_type: str, origin: str, content: dict): + async def on_edu(self, edu_type: str, origin: str, content: dict) -> None: if not self.config.use_presence and edu_type == EduTypes.Presence: return @@ -1006,7 +1007,9 @@ class FederationHandlerRegistry: # the limit, drop them. if ( edu_type == EduTypes.RoomKeyRequest - and not self._room_key_request_rate_limiter.can_do_action(origin) + and not await self._room_key_request_rate_limiter.can_do_action( + None, origin + ) ): return diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8babb1ebbe..d821dcbf6a 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -44,6 +44,7 @@ from synapse.types import JsonDict, ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func if TYPE_CHECKING: + from synapse.events.presence_router import PresenceRouter from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -162,6 +163,7 @@ class FederationSender(AbstractFederationSender): self.clock = hs.get_clock() self.is_mine_id = hs.is_mine_id + self._presence_router = None # type: Optional[PresenceRouter] self._transaction_manager = TransactionManager(hs) self._instance_name = hs.get_instance_name() @@ -584,7 +586,22 @@ class FederationSender(AbstractFederationSender): """Given a list of states populate self.pending_presence_by_dest and poke to send a new transaction to each destination """ - hosts_and_states = await get_interested_remotes(self.store, states, self.state) + # We pull the presence router here instead of __init__ + # to prevent a dependency cycle: + # + # AuthHandler -> Notifier -> FederationSender + # -> PresenceRouter -> ModuleApi -> AuthHandler + if self._presence_router is None: + self._presence_router = self.hs.get_presence_router() + + assert self._presence_router is not None + + hosts_and_states = await get_interested_remotes( + self.store, + self._presence_router, + states, + self.state, + ) for destinations, states in hosts_and_states: for destination in destinations: @@ -717,16 +734,18 @@ class FederationSender(AbstractFederationSender): self._catchup_after_startup_timer = None break + last_processed = destinations_to_wake[-1] + destinations_to_wake = [ d for d in destinations_to_wake if self._federation_shard_config.should_handle(self._instance_name, d) ] - for last_processed in destinations_to_wake: + for destination in destinations_to_wake: logger.info( "Destination %s has outstanding catch-up, waking up.", last_processed, ) - self.wake_destination(last_processed) + self.wake_destination(destination) await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 89df9a619b..e9c8a9f20a 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py
@@ -29,6 +29,7 @@ from synapse.api.presence import UserPresenceState from synapse.events import EventBase from synapse.federation.units import Edu from synapse.handlers.presence import format_user_presence_state +from synapse.logging.opentracing import SynapseTags, set_tag from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ReadReceipt @@ -557,6 +558,13 @@ class PerDestinationQueue: contents, stream_id = await self._store.get_new_device_msgs_for_remote( self._destination, last_device_stream_id, to_device_stream_id, limit ) + for content in contents: + message_id = content.get("message_id") + if not message_id: + continue + + set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + edus = [ Edu( origin=self._server_name, diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 294031e2a0..c616de5f22 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py
@@ -427,13 +427,9 @@ class FederationSendServlet(BaseFederationServlet): logger.exception(e) return 400, {"error": "Invalid transaction"} - try: - code, response = await self.handler.on_incoming_transaction( - origin, transaction_data - ) - except Exception: - logger.exception("on_incoming_transaction failed") - raise + code, response = await self.handler.on_incoming_transaction( + origin, transaction_data + ) return code, response @@ -650,8 +646,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet): PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)" async def on_PUT(self, origin, content, query, room_id): - content = await self.handler.on_exchange_third_party_invite_request(content) - return 200, content + await self.handler.on_exchange_third_party_invite_request(content) + return 200, {} class FederationClientKeysQueryServlet(BaseFederationServlet): diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index b662c42621..0f8bf000ac 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py
@@ -18,6 +18,7 @@ server protocol. """ import logging +from typing import Optional import attr @@ -98,7 +99,7 @@ class Transaction(JsonEncodedObject): "pdus", ] - def __init__(self, transaction_id=None, pdus=[], **kwargs): + def __init__(self, transaction_id=None, pdus: Optional[list] = None, **kwargs): """If we include a list of pdus then we decode then as PDU's automatically. """ @@ -107,7 +108,7 @@ class Transaction(JsonEncodedObject): if "edus" in kwargs and not kwargs["edus"]: del kwargs["edus"] - super().__init__(transaction_id=transaction_id, pdus=pdus, **kwargs) + super().__init__(transaction_id=transaction_id, pdus=pdus or [], **kwargs) @staticmethod def create_new(pdus, **kwargs):