diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 42 | ||||
-rw-r--r-- | synapse/federation/send_queue.py | 84 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 12 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 6 | ||||
-rw-r--r-- | synapse/federation/sender/transaction_manager.py | 6 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 56 |
6 files changed, 141 insertions, 65 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index a0071fec94..687cd841ac 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -883,18 +883,37 @@ class FederationClient(FederationBase): def get_public_rooms( self, - destination, - limit=None, - since_token=None, - search_filter=None, - include_all_networks=False, - third_party_instance_id=None, + remote_server: str, + limit: Optional[int] = None, + since_token: Optional[str] = None, + search_filter: Optional[Dict] = None, + include_all_networks: bool = False, + third_party_instance_id: Optional[str] = None, ): - if destination == self.server_name: - return + """Get the list of public rooms from a remote homeserver + + Args: + remote_server: The name of the remote server + limit: Maximum amount of rooms to return + since_token: Used for result pagination + search_filter: A filter dictionary to send the remote homeserver + and filter the result set + include_all_networks: Whether to include results from all third party instances + third_party_instance_id: Whether to only include results from a specific third + party instance + + Returns: + Deferred[Dict[str, Any]]: The response from the remote server, or None if + `remote_server` is the same as the local server_name + Raises: + HttpResponseException: There was an exception returned from the remote server + SynapseException: M_FORBIDDEN when the remote server has disallowed publicRoom + requests over federation + + """ return self.transport_layer.get_public_rooms( - destination, + remote_server, limit, since_token, search_filter, @@ -957,14 +976,13 @@ class FederationClient(FederationBase): return signed_events - @defer.inlineCallbacks - def forward_third_party_invite(self, destinations, room_id, event_dict): + async def forward_third_party_invite(self, destinations, room_id, event_dict): for destination in destinations: if destination == self.server_name: continue try: - yield self.transport_layer.exchange_third_party_invite( + await self.transport_layer.exchange_third_party_invite( destination=destination, room_id=room_id, event_dict=event_dict ) return None diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index e1700ca8aa..52f4f54215 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -31,6 +31,7 @@ Events are replicated via a separate events stream. import logging from collections import namedtuple +from typing import Dict, List, Tuple, Type from six import iteritems @@ -56,21 +57,35 @@ class FederationRemoteSendQueue(object): self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id - self.presence_map = {} # Pending presence map user_id -> UserPresenceState - self.presence_changed = SortedDict() # Stream position -> list[user_id] + # Pending presence map user_id -> UserPresenceState + self.presence_map = {} # type: Dict[str, UserPresenceState] + + # Stream position -> list[user_id] + self.presence_changed = SortedDict() # type: SortedDict[int, List[str]] # Stores the destinations we need to explicitly send presence to about a # given user. # Stream position -> (user_id, destinations) - self.presence_destinations = SortedDict() + self.presence_destinations = ( + SortedDict() + ) # type: SortedDict[int, Tuple[str, List[str]]] + + # (destination, key) -> EDU + self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu] - self.keyed_edu = {} # (destination, key) -> EDU - self.keyed_edu_changed = SortedDict() # stream position -> (destination, key) + # stream position -> (destination, key) + self.keyed_edu_changed = ( + SortedDict() + ) # type: SortedDict[int, Tuple[str, tuple]] - self.edus = SortedDict() # stream position -> Edu + self.edus = SortedDict() # type: SortedDict[int, Edu] + # stream ID for the next entry into presence_changed/keyed_edu_changed/edus. self.pos = 1 - self.pos_time = SortedDict() + + # map from stream ID to the time that stream entry was generated, so that we + # can clear out entries after a while + self.pos_time = SortedDict() # type: SortedDict[int, int] # EVERYTHING IS SAD. In particular, python only makes new scopes when # we make a new function, so we need to make a new function so the inner @@ -158,8 +173,10 @@ class FederationRemoteSendQueue(object): for edu_key in self.keyed_edu_changed.values(): live_keys.add(edu_key) - to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys] - for edu_key in to_del: + keys_to_del = [ + edu_key for edu_key in self.keyed_edu if edu_key not in live_keys + ] + for edu_key in keys_to_del: del self.keyed_edu[edu_key] # Delete things out of edu map @@ -250,19 +267,23 @@ class FederationRemoteSendQueue(object): self._clear_queue_before_pos(token) async def get_replication_rows( - self, from_token, to_token, limit, federation_ack=None - ): + self, instance_name: str, from_token: int, to_token: int, target_row_count: int + ) -> Tuple[List[Tuple[int, Tuple]], int, bool]: """Get rows to be sent over federation between the two tokens Args: - from_token (int) - to_token(int) - limit (int) - federation_ack (int): Optional. The position where the worker is - explicitly acknowledged it has handled. Allows us to drop - data from before that point + instance_name: the name of the current process + from_token: the previous stream token: the starting point for fetching the + updates + to_token: the new stream token: the point to get updates up to + target_row_count: a target for the number of rows to be returned. + + Returns: a triplet `(updates, new_last_token, limited)`, where: + * `updates` is a list of `(token, row)` entries. + * `new_last_token` is the new position in stream. + * `limited` is whether there are more updates to fetch. """ - # TODO: Handle limit. + # TODO: Handle target_row_count. # To handle restarts where we wrap around if from_token > self.pos: @@ -270,12 +291,7 @@ class FederationRemoteSendQueue(object): # list of tuple(int, BaseFederationRow), where the first is the position # of the federation stream. - rows = [] - - # There should be only one reader, so lets delete everything its - # acknowledged its seen. - if federation_ack: - self._clear_queue_before_pos(federation_ack) + rows = [] # type: List[Tuple[int, BaseFederationRow]] # Fetch changed presence i = self.presence_changed.bisect_right(from_token) @@ -332,7 +348,11 @@ class FederationRemoteSendQueue(object): # Sort rows based on pos rows.sort() - return [(pos, row.TypeId, row.to_data()) for pos, row in rows] + return ( + [(pos, (row.TypeId, row.to_data())) for pos, row in rows], + to_token, + False, + ) class BaseFederationRow(object): @@ -341,7 +361,7 @@ class BaseFederationRow(object): Specifies how to identify, serialize and deserialize the different types. """ - TypeId = None # Unique string that ids the type. Must be overriden in sub classes. + TypeId = "" # Unique string that ids the type. Must be overriden in sub classes. @staticmethod def from_data(data): @@ -454,10 +474,14 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu buff.edus.setdefault(self.edu.destination, []).append(self.edu) -TypeToRow = { - Row.TypeId: Row - for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow,) -} +_rowtypes = ( + PresenceRow, + PresenceDestinationsRow, + KeyedEduRow, + EduRow, +) # type: Tuple[Type[BaseFederationRow], ...] + +TypeToRow = {Row.TypeId: Row for Row in _rowtypes} ParsedFederationStreamData = namedtuple( diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index a477578e44..d473576902 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Dict, Hashable, Iterable, List, Optional, Set +from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple from six import itervalues @@ -498,14 +498,16 @@ class FederationSender(object): self._get_per_destination_queue(destination).attempt_new_transaction() - def get_current_token(self) -> int: + @staticmethod + def get_current_token() -> int: # Dummy implementation for case where federation sender isn't offloaded # to a worker. return 0 + @staticmethod async def get_replication_rows( - self, from_token, to_token, limit, federation_ack=None - ): + instance_name: str, from_token: int, to_token: int, target_row_count: int + ) -> Tuple[List[Tuple[int, Tuple]], int, bool]: # Dummy implementation for case where federation sender isn't offloaded # to a worker. - return [] + return [], 0, False diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index e13cd20ffa..276a2b596f 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -15,11 +15,10 @@ # limitations under the License. import datetime import logging -from typing import Dict, Hashable, Iterable, List, Tuple +from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple from prometheus_client import Counter -import synapse.server from synapse.api.errors import ( FederationDeniedError, HttpResponseException, @@ -34,6 +33,9 @@ from synapse.storage.presence import UserPresenceState from synapse.types import ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter +if TYPE_CHECKING: + import synapse.server + # This is defined in the Matrix spec and enforced by the receiver. MAX_EDUS_PER_TRANSACTION = 100 diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 3c2a02a3b3..a2752a54a5 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -13,11 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import List +from typing import TYPE_CHECKING, List from canonicaljson import json -import synapse.server from synapse.api.errors import HttpResponseException from synapse.events import EventBase from synapse.federation.persistence import TransactionActions @@ -31,6 +30,9 @@ from synapse.logging.opentracing import ( ) from synapse.util.metrics import measure_func +if TYPE_CHECKING: + import synapse.server + logger = logging.getLogger(__name__) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 383e3fdc8b..060bf07197 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -15,13 +15,14 @@ # limitations under the License. import logging -from typing import Any, Dict +from typing import Any, Dict, Optional from six.moves import urllib from twisted.internet import defer from synapse.api.constants import Membership +from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.api.urls import ( FEDERATION_UNSTABLE_PREFIX, FEDERATION_V1_PREFIX, @@ -326,18 +327,25 @@ class TransportLayerClient(object): @log_function def get_public_rooms( self, - remote_server, - limit, - since_token, - search_filter=None, - include_all_networks=False, - third_party_instance_id=None, + remote_server: str, + limit: Optional[int] = None, + since_token: Optional[str] = None, + search_filter: Optional[Dict] = None, + include_all_networks: bool = False, + third_party_instance_id: Optional[str] = None, ): + """Get the list of public rooms from a remote homeserver + + See synapse.federation.federation_client.FederationClient.get_public_rooms for + more information. + """ if search_filter: # this uses MSC2197 (Search Filtering over Federation) path = _create_v1_path("/publicRooms") - data = {"include_all_networks": "true" if include_all_networks else "false"} + data = { + "include_all_networks": "true" if include_all_networks else "false" + } # type: Dict[str, Any] if third_party_instance_id: data["third_party_instance_id"] = third_party_instance_id if limit: @@ -347,9 +355,19 @@ class TransportLayerClient(object): data["filter"] = search_filter - response = yield self.client.post_json( - destination=remote_server, path=path, data=data, ignore_backoff=True - ) + try: + response = yield self.client.post_json( + destination=remote_server, path=path, data=data, ignore_backoff=True + ) + except HttpResponseException as e: + if e.code == 403: + raise SynapseError( + 403, + "You are not allowed to view the public rooms list of %s" + % (remote_server,), + errcode=Codes.FORBIDDEN, + ) + raise else: path = _create_v1_path("/publicRooms") @@ -363,9 +381,19 @@ class TransportLayerClient(object): if since_token: args["since"] = [since_token] - response = yield self.client.get_json( - destination=remote_server, path=path, args=args, ignore_backoff=True - ) + try: + response = yield self.client.get_json( + destination=remote_server, path=path, args=args, ignore_backoff=True + ) + except HttpResponseException as e: + if e.code == 403: + raise SynapseError( + 403, + "You are not allowed to view the public rooms list of %s" + % (remote_server,), + errcode=Codes.FORBIDDEN, + ) + raise return response |