-rw-r--r--  synapse/crypto/keyring.py                  39
-rw-r--r--  synapse/federation/federation_client.py    24
-rw-r--r--  synapse/handlers/federation.py            115
-rw-r--r--  synapse/storage/databases/main/events.py   17
-rw-r--r--  synapse/storage/databases/main/keys.py     13
-rw-r--r--  synapse/storage/persist_events.py           4
6 files changed, 122 insertions, 90 deletions
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py

index 9e9b1c1c86..0860411218 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -43,7 +43,11 @@ from synapse.api.errors import (
 from synapse.config.key import TrustedKeyServer
 from synapse.events import EventBase
 from synapse.events.utils import prune_event_dict
-from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.context import (
+    defer_to_thread,
+    make_deferred_yieldable,
+    run_in_background,
+)
 from synapse.storage.keys import FetchKeyResult
 from synapse.types import JsonDict
 from synapse.util import unwrapFirstError
@@ -161,6 +165,7 @@ class Keyring:
         self, hs: "HomeServer", key_fetchers: "Optional[Iterable[KeyFetcher]]" = None
     ):
         self.clock = hs.get_clock()
+        self.reactor = hs.get_reactor()

         if key_fetchers is None:
             key_fetchers = (
@@ -288,7 +293,9 @@ class Keyring:
             verify_key = key_result.verify_key
             json_object = verify_request.get_json_object()
             try:
-                verify_signed_json(
+                await defer_to_thread(
+                    self.reactor,
+                    verify_signed_json,
                     json_object,
                     verify_request.server_name,
                     verify_key,
@@ -544,22 +551,18 @@ class BaseV2KeyFetcher(KeyFetcher):

         key_json_bytes = encode_canonical_json(response_json)

-        await make_deferred_yieldable(
-            defer.gatherResults(
-                [
-                    run_in_background(
-                        self.store.store_server_keys_json,
-                        server_name=server_name,
-                        key_id=key_id,
-                        from_server=from_server,
-                        ts_now_ms=time_added_ms,
-                        ts_expires_ms=ts_valid_until_ms,
-                        key_json_bytes=key_json_bytes,
-                    )
-                    for key_id in verify_keys
-                ],
-                consumeErrors=True,
-            ).addErrback(unwrapFirstError)
-        )
+        await self.store.store_server_keys_json_multi(
+            [
+                (
+                    server_name,
+                    key_id,
+                    from_server,
+                    time_added_ms,
+                    ts_valid_until_ms,
+                    key_json_bytes,
+                )
+                for key_id in verify_keys
+            ],
+        )

         return verify_keys
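The keyring change moves verify_signed_json onto a worker thread because ed25519 verification is pure CPU work that would otherwise block the reactor. A minimal sketch of the same pattern using stock Twisted and signedjson — the key, server name, and payload below are invented for illustration, and Synapse's own defer_to_thread wraps the same thread-pool machinery with logcontext handling on top:

# Push CPU-bound signature verification onto the reactor's thread pool
# instead of running it on the main reactor thread.
from signedjson.key import generate_signing_key, get_verify_key
from signedjson.sign import sign_json, verify_signed_json
from twisted.internet import reactor, threads

signing_key = generate_signing_key("1")  # invented key version
signed = sign_json({"foo": "bar"}, "example.com", signing_key)

def on_verified(_):
    print("signature ok")
    reactor.stop()

# deferToThread runs the blocking call off-reactor and fires a Deferred.
d = threads.deferToThread(
    verify_signed_json, signed, "example.com", get_verify_key(signing_key)
)
d.addCallback(on_verified)
reactor.run()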
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 1416abd0fb..2dc1a2397d 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -56,6 +56,7 @@ from synapse.api.room_versions import (
 from synapse.events import EventBase, builder
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
 from synapse.federation.transport.client import SendJoinResponse
+from synapse.logging.opentracing import start_active_span
 from synapse.logging.utils import log_function
 from synapse.types import JsonDict, get_domain_from_id
 from synapse.util.async_helpers import concurrently_execute
@@ -754,7 +755,8 @@ class FederationClient(FederationBase):
         """

         async def send_request(destination) -> SendJoinResult:
-            response = await self._do_send_join(room_version, destination, pdu)
+            with start_active_span("_do_send_join"):
+                response = await self._do_send_join(room_version, destination, pdu)

             # If an event was returned (and expected to be returned):
             #
@@ -814,19 +816,21 @@ class FederationClient(FederationBase):
         valid_pdus_map: Dict[str, EventBase] = {}

         async def _execute(pdu: EventBase) -> None:
-            valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
-                pdu=pdu,
-                origin=destination,
-                outlier=True,
-                room_version=room_version,
-            )
+            with start_active_span("_check_sigs_and_hash_and_fetch_one"):
+                valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
+                    pdu=pdu,
+                    origin=destination,
+                    outlier=True,
+                    room_version=room_version,
+                )

             if valid_pdu:
                 valid_pdus_map[valid_pdu.event_id] = valid_pdu

-        await concurrently_execute(
-            _execute, itertools.chain(state, auth_chain), 10000
-        )
+        with start_active_span("check_sigs"):
+            await concurrently_execute(
+                _execute, itertools.chain(state, auth_chain), 10000
+            )

         # NB: We *need* to copy to ensure that we don't have multiple
         # references being passed on, as that causes... issues.
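The check_sigs span wraps concurrently_execute, which fans _execute out over every state and auth-chain event with a concurrency cap. Purely as an illustration of those semantics (Synapse's real implementation is Twisted-based, not asyncio), the shape is roughly:

# Rough asyncio equivalent of concurrently_execute: apply an async function
# to every item, with at most `limit` invocations in flight at once.
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")

async def concurrently_execute(
    func: Callable[[T], Awaitable[None]], args: Iterable[T], limit: int
) -> None:
    sem = asyncio.Semaphore(limit)

    async def _run(item: T) -> None:
        async with sem:  # block when `limit` tasks are already running
            await func(item)

    await asyncio.gather(*(_run(a) for a in args))

async def _demo() -> None:
    async def work(i: int) -> None:
        await asyncio.sleep(0)  # stand-in for _check_sigs_and_hash_and_fetch_one
        print("checked", i)

    await concurrently_execute(work, range(5), limit=2)

if __name__ == "__main__":
    asyncio.run(_demo())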
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 77df9185f6..bc17b45b27 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -52,6 +52,7 @@ from synapse.logging.context import (
     preserve_fn,
     run_in_background,
 )
+from synapse.logging.opentracing import start_active_span
 from synapse.logging.utils import log_function
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
@@ -452,14 +453,15 @@ class FederationHandler(BaseHandler):

         logger.debug("Joining %s to %s", joinee, room_id)

-        origin, event, room_version_obj = await self._make_and_verify_event(
-            target_hosts,
-            room_id,
-            joinee,
-            "join",
-            content,
-            params={"ver": KNOWN_ROOM_VERSIONS},
-        )
+        with start_active_span("make_join"):
+            origin, event, room_version_obj = await self._make_and_verify_event(
+                target_hosts,
+                room_id,
+                joinee,
+                "join",
+                content,
+                params={"ver": KNOWN_ROOM_VERSIONS},
+            )

         # This shouldn't happen, because the RoomMemberHandler has a
         # linearizer lock which only allows one operation per user per room
@@ -480,9 +482,10 @@ class FederationHandler(BaseHandler):
             except ValueError:
                 pass

-            ret = await self.federation_client.send_join(
-                host_list, event, room_version_obj
-            )
+            with start_active_span("send_join"):
+                ret = await self.federation_client.send_join(
+                    host_list, event, room_version_obj
+                )

             event = ret.event
             origin = ret.origin
@@ -510,9 +513,10 @@ class FederationHandler(BaseHandler):
                 auth_events=auth_chain,
             )

-            max_stream_id = await self._persist_auth_tree(
-                origin, room_id, auth_chain, state, event, room_version_obj
-            )
+            with start_active_span("_persist_auth_tree"):
+                max_stream_id = await self._persist_auth_tree(
+                    origin, room_id, auth_chain, state, event, room_version_obj
+                )

             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
@@ -1139,51 +1143,54 @@ class FederationHandler(BaseHandler):
             if e_id not in event_map:
                 missing_auth_events.add(e_id)

-        for e_id in missing_auth_events:
-            m_ev = await self.federation_client.get_pdu(
-                [origin],
-                e_id,
-                room_version=room_version,
-                outlier=True,
-                timeout=10000,
-            )
-            if m_ev and m_ev.event_id == e_id:
-                event_map[e_id] = m_ev
-            else:
-                logger.info("Failed to find auth event %r", e_id)
-
-        for e in itertools.chain(auth_events, state, [event]):
-            auth_for_e = {
-                (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
-                for e_id in e.auth_event_ids()
-                if e_id in event_map
-            }
-            if create_event:
-                auth_for_e[(EventTypes.Create, "")] = create_event
+        with start_active_span("fetching.missing_auth_events"):
+            for e_id in missing_auth_events:
+                m_ev = await self.federation_client.get_pdu(
+                    [origin],
+                    e_id,
+                    room_version=room_version,
+                    outlier=True,
+                    timeout=10000,
+                )
+                if m_ev and m_ev.event_id == e_id:
+                    event_map[e_id] = m_ev
+                else:
+                    logger.info("Failed to find auth event %r", e_id)
+
+        with start_active_span("authing_events"):
+            for e in itertools.chain(auth_events, state, [event]):
+                auth_for_e = {
+                    (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
+                    for e_id in e.auth_event_ids()
+                    if e_id in event_map
+                }
+                if create_event:
+                    auth_for_e[(EventTypes.Create, "")] = create_event

-            try:
-                event_auth.check(room_version, e, auth_events=auth_for_e)
-            except SynapseError as err:
-                # we may get SynapseErrors here as well as AuthErrors. For
-                # instance, there are a couple of (ancient) events in some
-                # rooms whose senders do not have the correct sigil; these
-                # cause SynapseErrors in auth.check. We don't want to give up
-                # the attempt to federate altogether in such cases.
+                try:
+                    event_auth.check(room_version, e, auth_events=auth_for_e)
+                except SynapseError as err:
+                    # we may get SynapseErrors here as well as AuthErrors. For
+                    # instance, there are a couple of (ancient) events in some
+                    # rooms whose senders do not have the correct sigil; these
+                    # cause SynapseErrors in auth.check. We don't want to give up
+                    # the attempt to federate altogether in such cases.

-                logger.warning("Rejecting %s because %s", e.event_id, err.msg)
+                    logger.warning("Rejecting %s because %s", e.event_id, err.msg)

-                if e == event:
-                    raise
-                events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
+                    if e == event:
+                        raise
+                    events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR

         if auth_events or state:
-            await self._federation_event_handler.persist_events_and_notify(
-                room_id,
-                [
-                    (e, events_to_context[e.event_id])
-                    for e in itertools.chain(auth_events, state)
-                ],
-            )
+            with start_active_span("persist_events_and_notify.state"):
+                await self._federation_event_handler.persist_events_and_notify(
+                    room_id,
+                    [
+                        (e, events_to_context[e.event_id])
+                        for e in itertools.chain(auth_events, state)
+                    ],
+                )

         new_event_context = await self.state_handler.compute_event_context(
             event, old_state=state
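Taken together, the handler changes break the remote-join path into named phases (make_join, send_join, _persist_auth_tree, and so on), so a single trace shows where a slow join spends its time. A toy, runnable illustration of how such nested with-blocks compose — using the no-op global tracer shipped with the opentracing package, with sleeps standing in for the real work:

# Each labelled phase becomes a child span of the enclosing one; a tracing
# backend (e.g. Jaeger) would render them as nested bars on a timeline.
import time
import opentracing  # the package's default tracer is a safe no-op

tracer = opentracing.tracer

def join_room():
    with tracer.start_active_span("remote_join"):
        with tracer.start_active_span("make_join"):
            time.sleep(0.01)  # stand-in for _make_and_verify_event
        with tracer.start_active_span("send_join"):
            time.sleep(0.01)  # stand-in for federation_client.send_join
        with tracer.start_active_span("_persist_auth_tree"):
            time.sleep(0.01)  # stand-in for persisting the auth chain

join_room()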
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index f07e288056..6a30aa6f81 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -16,6 +16,7 @@
 import itertools
 import logging
 from collections import OrderedDict, namedtuple
+from synapse.logging.opentracing import start_active_span
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -382,7 +383,8 @@ class PersistEventsStore:
         # Insert into event_to_state_groups.
         self._store_event_state_mappings_txn(txn, events_and_contexts)

-        self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts])
+        with start_active_span("_persist_event_auth_chain_txn"):
+            self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts])

         # _store_rejected_events_txn filters out any events which were
         # rejected, and returns the filtered list.
@@ -393,12 +395,13 @@ class PersistEventsStore:
         # From this point onwards the events are only ones that weren't
         # rejected.

-        self._update_metadata_tables_txn(
-            txn,
-            events_and_contexts=events_and_contexts,
-            all_events_and_contexts=all_events_and_contexts,
-            backfilled=backfilled,
-        )
+        with start_active_span("_update_metadata_tables_txn"):
+            self._update_metadata_tables_txn(
+                txn,
+                events_and_contexts=events_and_contexts,
+                all_events_and_contexts=all_events_and_contexts,
+                backfilled=backfilled,
+            )

         # We call this last as it assumes we've inserted the events into
         # room_memberships, where applicable.
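Inside _persist_events_txn everything runs synchronously on a database thread, so a plain with-block is all that's needed — no await is involved. A hypothetical alternative (not part of this commit) that avoids repeating the function name in each span is a decorator that names the span after whatever it wraps:

# Hypothetical helper: wrap a synchronous function in a span named after it.
import functools
import opentracing

def traced(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with opentracing.tracer.start_active_span(func.__name__):
            return func(*args, **kwargs)
    return wrapper

@traced
def _persist_event_auth_chain_txn_demo(txn, events):
    pass  # stand-in for the real transaction step

_persist_event_auth_chain_txn_demo(None, [])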
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index 6990f3ed1d..1f01a5df0a 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -138,6 +138,19 @@ class KeyStore(SQLBaseStore):
         for i in invalidations:
             invalidate((i,))

+    async def store_server_keys_json_multi(
+        self,
+        entries: List[Tuple[str, str, str, int, int, bytes]],
+    ):
+        await self.db_pool.simple_upsert_many(
+            table="server_keys_json",
+            key_names=("server_name", "key_id", "from_server"),
+            key_values=[e[:3] for e in entries],
+            value_names=("ts_added_ms", "ts_valid_until_ms", "key_json"),
+            value_values=[(e[3], e[4], db_binary_type(e[5])) for e in entries],
+            desc="store_server_keys_json_multi",
+        )
+
     async def store_server_keys_json(
         self,
         server_name: str,
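store_server_keys_json_multi replaces N sequential per-key upserts with one batched call; roughly speaking, simple_upsert_many builds the equivalent statement for whichever database is configured. A rough sqlite3 rendering of the same effect, with an invented schema subset and made-up rows:

# One batched upsert instead of one round-trip per key. The ON CONFLICT
# clause needs a unique constraint on the key columns, hence the PK.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE server_keys_json (
        server_name TEXT, key_id TEXT, from_server TEXT,
        ts_added_ms INTEGER, ts_valid_until_ms INTEGER, key_json BLOB,
        PRIMARY KEY (server_name, key_id, from_server))"""
)

entries = [
    ("example.com", "ed25519:abc", "example.com", 1000, 2000, b"{}"),
    ("example.com", "ed25519:def", "example.com", 1000, 2000, b"{}"),
]
conn.executemany(
    """INSERT INTO server_keys_json VALUES (?, ?, ?, ?, ?, ?)
       ON CONFLICT (server_name, key_id, from_server)
       DO UPDATE SET ts_added_ms = excluded.ts_added_ms,
                     ts_valid_until_ms = excluded.ts_valid_until_ms,
                     key_json = excluded.key_json""",
    entries,
)
conn.commit()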
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 0e8270746d..a19f75061b 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -445,7 +445,9 @@ class EventsPersistenceStorage:
         potentially_left_users: Set[str] = set()

         if not backfilled:
-            with Measure(self._clock, "_calculate_state_and_extrem"):
+            with Measure(
+                self._clock, "_calculate_state_and_extrem"
+            ), opentracing.start_active_span("_calculate_state_and_extrem"):
                 # Work out the new "current state" for each room.
                 # We do this by working out what the new extremities are and then
                 # calculating the state from that.
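The final hunk stacks Measure and a tracing span in a single with statement, which is equivalent to nesting: the span starts after the measurement begins and ends before it finishes. A minimal demonstration of that ordering with contextlib stand-ins (names invented for illustration):

# Two context managers in one `with` statement: the second is entered after
# the first and exited before it, exactly as if nested.
from contextlib import contextmanager

@contextmanager
def measure(name):
    print("measure enter", name)
    yield
    print("measure exit", name)

@contextmanager
def span(name):
    print("span enter", name)
    yield
    print("span exit", name)

with measure("_calculate_state_and_extrem"), span("_calculate_state_and_extrem"):
    pass
# prints: measure enter, span enter, span exit, measure exit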