diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_base.py | 28 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 19 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 8 | ||||
-rw-r--r-- | synapse/federation/send_queue.py | 2 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 9 |
5 files changed, 33 insertions, 33 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 5c991e5412..4b115aac04 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -25,19 +25,15 @@ from twisted.python.failure import Failure from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError -from synapse.api.room_versions import ( - KNOWN_ROOM_VERSIONS, - EventFormatVersions, - RoomVersion, -) +from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.crypto.event_signing import check_event_content_hash from synapse.crypto.keyring import Keyring from synapse.events import EventBase, make_event_from_dict from synapse.events.utils import prune_event from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import ( - LoggingContext, PreserveLoggingContext, + current_context, make_deferred_yieldable, ) from synapse.types import JsonDict, get_domain_from_id @@ -55,13 +51,15 @@ class FederationBase(object): self.store = hs.get_datastore() self._clock = hs.get_clock() - def _check_sigs_and_hash(self, room_version: str, pdu: EventBase) -> Deferred: + def _check_sigs_and_hash( + self, room_version: RoomVersion, pdu: EventBase + ) -> Deferred: return make_deferred_yieldable( self._check_sigs_and_hashes(room_version, [pdu])[0] ) def _check_sigs_and_hashes( - self, room_version: str, pdus: List[EventBase] + self, room_version: RoomVersion, pdus: List[EventBase] ) -> List[Deferred]: """Checks that each of the received events is correctly signed by the sending server. @@ -80,7 +78,7 @@ class FederationBase(object): """ deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus) - ctx = LoggingContext.current_context() + ctx = current_context() def callback(_, pdu: EventBase): with PreserveLoggingContext(ctx): @@ -146,7 +144,7 @@ class PduToCheckSig( def _check_sigs_on_pdus( - keyring: Keyring, room_version: str, pdus: Iterable[EventBase] + keyring: Keyring, room_version: RoomVersion, pdus: Iterable[EventBase] ) -> List[Deferred]: """Check that the given events are correctly signed @@ -191,10 +189,6 @@ def _check_sigs_on_pdus( for p in pdus ] - v = KNOWN_ROOM_VERSIONS.get(room_version) - if not v: - raise RuntimeError("Unrecognized room version %s" % (room_version,)) - # First we check that the sender event is signed by the sender's domain # (except if its a 3pid invite, in which case it may be sent by any server) pdus_to_check_sender = [p for p in pdus_to_check if not _is_invite_via_3pid(p.pdu)] @@ -204,7 +198,7 @@ def _check_sigs_on_pdus( ( p.sender_domain, p.redacted_pdu_json, - p.pdu.origin_server_ts if v.enforce_key_validity else 0, + p.pdu.origin_server_ts if room_version.enforce_key_validity else 0, p.pdu.event_id, ) for p in pdus_to_check_sender @@ -227,7 +221,7 @@ def _check_sigs_on_pdus( # event id's domain (normally only the case for joins/leaves), and add additional # checks. Only do this if the room version has a concept of event ID domain # (ie, the room version uses old-style non-hash event IDs). - if v.event_format == EventFormatVersions.V1: + if room_version.event_format == EventFormatVersions.V1: pdus_to_check_event_id = [ p for p in pdus_to_check @@ -239,7 +233,7 @@ def _check_sigs_on_pdus( ( get_domain_from_id(p.pdu.event_id), p.redacted_pdu_json, - p.pdu.origin_server_ts if v.enforce_key_validity else 0, + p.pdu.origin_server_ts if room_version.enforce_key_validity else 0, p.pdu.event_id, ) for p in pdus_to_check_event_id diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8c6b839478..a0071fec94 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -220,8 +220,7 @@ class FederationClient(FederationBase): # FIXME: We should handle signature failures more gracefully. pdus[:] = await make_deferred_yieldable( defer.gatherResults( - self._check_sigs_and_hashes(room_version.identifier, pdus), - consumeErrors=True, + self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True, ).addErrback(unwrapFirstError) ) @@ -291,9 +290,7 @@ class FederationClient(FederationBase): pdu = pdu_list[0] # Check signatures are correct. - signed_pdu = await self._check_sigs_and_hash( - room_version.identifier, pdu - ) + signed_pdu = await self._check_sigs_and_hash(room_version, pdu) break @@ -350,7 +347,7 @@ class FederationClient(FederationBase): self, origin: str, pdus: List[EventBase], - room_version: str, + room_version: RoomVersion, outlier: bool = False, include_none: bool = False, ) -> List[EventBase]: @@ -396,7 +393,7 @@ class FederationClient(FederationBase): self.get_pdu( destinations=[pdu.origin], event_id=pdu.event_id, - room_version=room_version, # type: ignore + room_version=room_version, outlier=outlier, timeout=10000, ) @@ -434,7 +431,7 @@ class FederationClient(FederationBase): ] signed_auth = await self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True, room_version=room_version.identifier + destination, auth_chain, outlier=True, room_version=room_version ) signed_auth.sort(key=lambda e: e.depth) @@ -661,7 +658,7 @@ class FederationClient(FederationBase): destination, list(pdus.values()), outlier=True, - room_version=room_version.identifier, + room_version=room_version, ) valid_pdus_map = {p.event_id: p for p in valid_pdus} @@ -756,7 +753,7 @@ class FederationClient(FederationBase): pdu = event_from_pdu_json(pdu_dict, room_version) # Check signatures are correct. - pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) + pdu = await self._check_sigs_and_hash(room_version, pdu) # FIXME: We should handle signature failures more gracefully. @@ -948,7 +945,7 @@ class FederationClient(FederationBase): ] signed_events = await self._check_sigs_and_hash_and_fetch( - destination, events, outlier=False, room_version=room_version.identifier + destination, events, outlier=False, room_version=room_version ) except HttpResponseException as e: if not e.code == 400: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 275b9c99d7..89d521bc31 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -409,7 +409,7 @@ class FederationServer(FederationBase): pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) - pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) + pdu = await self._check_sigs_and_hash(room_version, pdu) ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version) time_now = self._clock.time_msec() return {"event": ret_pdu.get_pdu_json(time_now)} @@ -425,7 +425,7 @@ class FederationServer(FederationBase): logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) - pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) + pdu = await self._check_sigs_and_hash(room_version, pdu) res_pdus = await self.handler.on_send_join_request(origin, pdu) time_now = self._clock.time_msec() @@ -455,7 +455,7 @@ class FederationServer(FederationBase): logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures) - pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) + pdu = await self._check_sigs_and_hash(room_version, pdu) await self.handler.on_send_leave_request(origin, pdu) return {} @@ -611,7 +611,7 @@ class FederationServer(FederationBase): logger.info("Accepting join PDU %s from %s", pdu.event_id, origin) # We've already checked that we know the room version by this point - room_version = await self.store.get_room_version_id(pdu.room_id) + room_version = await self.store.get_room_version(pdu.room_id) # Check signature. try: diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 876fb0e245..e1700ca8aa 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -477,7 +477,7 @@ def process_rows_for_federation(transaction_queue, rows): Args: transaction_queue (FederationSender) - rows (list(synapse.replication.tcp.streams.FederationStreamRow)) + rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow)) """ # The federation stream contains a bunch of different types of diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 233cb33daf..a477578e44 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -499,4 +499,13 @@ class FederationSender(object): self._get_per_destination_queue(destination).attempt_new_transaction() def get_current_token(self) -> int: + # Dummy implementation for case where federation sender isn't offloaded + # to a worker. return 0 + + async def get_replication_rows( + self, from_token, to_token, limit, federation_ack=None + ): + # Dummy implementation for case where federation sender isn't offloaded + # to a worker. + return [] |