diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 9e9b1c1c86..0860411218 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -43,7 +43,11 @@ from synapse.api.errors import (
from synapse.config.key import TrustedKeyServer
from synapse.events import EventBase
from synapse.events.utils import prune_event_dict
-from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.context import (
+ defer_to_thread,
+ make_deferred_yieldable,
+ run_in_background,
+)
from synapse.storage.keys import FetchKeyResult
from synapse.types import JsonDict
from synapse.util import unwrapFirstError
@@ -161,6 +165,7 @@ class Keyring:
self, hs: "HomeServer", key_fetchers: "Optional[Iterable[KeyFetcher]]" = None
):
self.clock = hs.get_clock()
+ self.reactor = hs.get_reactor()
if key_fetchers is None:
key_fetchers = (
@@ -288,7 +293,9 @@ class Keyring:
verify_key = key_result.verify_key
json_object = verify_request.get_json_object()
try:
- verify_signed_json(
+ await defer_to_thread(
+ self.reactor,
+ verify_signed_json,
json_object,
verify_request.server_name,
verify_key,
@@ -544,22 +551,18 @@ class BaseV2KeyFetcher(KeyFetcher):
key_json_bytes = encode_canonical_json(response_json)
- await make_deferred_yieldable(
- defer.gatherResults(
- [
- run_in_background(
- self.store.store_server_keys_json,
- server_name=server_name,
- key_id=key_id,
- from_server=from_server,
- ts_now_ms=time_added_ms,
- ts_expires_ms=ts_valid_until_ms,
- key_json_bytes=key_json_bytes,
- )
- for key_id in verify_keys
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ await self.store.store_server_keys_json_multi(
+ [
+ (
+ server_name,
+ key_id,
+ from_server,
+ time_added_ms,
+ ts_valid_until_ms,
+ key_json_bytes,
+ )
+ for key_id in verify_keys
+ ],
)
return verify_keys
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 1416abd0fb..2dc1a2397d 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -56,6 +56,7 @@ from synapse.api.room_versions import (
from synapse.events import EventBase, builder
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.transport.client import SendJoinResponse
+from synapse.logging.opentracing import start_active_span
from synapse.logging.utils import log_function
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
@@ -754,7 +755,8 @@ class FederationClient(FederationBase):
"""
async def send_request(destination) -> SendJoinResult:
- response = await self._do_send_join(room_version, destination, pdu)
+ with start_active_span("_do_send_join"):
+ response = await self._do_send_join(room_version, destination, pdu)
# If an event was returned (and expected to be returned):
#
@@ -814,19 +816,21 @@ class FederationClient(FederationBase):
valid_pdus_map: Dict[str, EventBase] = {}
async def _execute(pdu: EventBase) -> None:
- valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
- pdu=pdu,
- origin=destination,
- outlier=True,
- room_version=room_version,
- )
+ with start_active_span("_check_sigs_and_hash_and_fetch_one"):
+ valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
+ pdu=pdu,
+ origin=destination,
+ outlier=True,
+ room_version=room_version,
+ )
if valid_pdu:
valid_pdus_map[valid_pdu.event_id] = valid_pdu
- await concurrently_execute(
- _execute, itertools.chain(state, auth_chain), 10000
- )
+ with start_active_span("check_sigs"):
+ await concurrently_execute(
+ _execute, itertools.chain(state, auth_chain), 10000
+ )
# NB: We *need* to copy to ensure that we don't have multiple
# references being passed on, as that causes... issues.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 77df9185f6..bc17b45b27 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -52,6 +52,7 @@ from synapse.logging.context import (
preserve_fn,
run_in_background,
)
+from synapse.logging.opentracing import start_active_span
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
@@ -452,14 +453,15 @@ class FederationHandler(BaseHandler):
logger.debug("Joining %s to %s", joinee, room_id)
- origin, event, room_version_obj = await self._make_and_verify_event(
- target_hosts,
- room_id,
- joinee,
- "join",
- content,
- params={"ver": KNOWN_ROOM_VERSIONS},
- )
+ with start_active_span("make_join"):
+ origin, event, room_version_obj = await self._make_and_verify_event(
+ target_hosts,
+ room_id,
+ joinee,
+ "join",
+ content,
+ params={"ver": KNOWN_ROOM_VERSIONS},
+ )
# This shouldn't happen, because the RoomMemberHandler has a
# linearizer lock which only allows one operation per user per room
@@ -480,9 +482,10 @@ class FederationHandler(BaseHandler):
except ValueError:
pass
- ret = await self.federation_client.send_join(
- host_list, event, room_version_obj
- )
+ with start_active_span("send_join"):
+ ret = await self.federation_client.send_join(
+ host_list, event, room_version_obj
+ )
event = ret.event
origin = ret.origin
@@ -510,9 +513,10 @@ class FederationHandler(BaseHandler):
auth_events=auth_chain,
)
- max_stream_id = await self._persist_auth_tree(
- origin, room_id, auth_chain, state, event, room_version_obj
- )
+ with start_active_span("_persist_auth_tree"):
+ max_stream_id = await self._persist_auth_tree(
+ origin, room_id, auth_chain, state, event, room_version_obj
+ )
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
@@ -1139,51 +1143,54 @@ class FederationHandler(BaseHandler):
if e_id not in event_map:
missing_auth_events.add(e_id)
- for e_id in missing_auth_events:
- m_ev = await self.federation_client.get_pdu(
- [origin],
- e_id,
- room_version=room_version,
- outlier=True,
- timeout=10000,
- )
- if m_ev and m_ev.event_id == e_id:
- event_map[e_id] = m_ev
- else:
- logger.info("Failed to find auth event %r", e_id)
-
- for e in itertools.chain(auth_events, state, [event]):
- auth_for_e = {
- (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
- for e_id in e.auth_event_ids()
- if e_id in event_map
- }
- if create_event:
- auth_for_e[(EventTypes.Create, "")] = create_event
+ with start_active_span("fetching.missing_auth_events"):
+ for e_id in missing_auth_events:
+ m_ev = await self.federation_client.get_pdu(
+ [origin],
+ e_id,
+ room_version=room_version,
+ outlier=True,
+ timeout=10000,
+ )
+ if m_ev and m_ev.event_id == e_id:
+ event_map[e_id] = m_ev
+ else:
+ logger.info("Failed to find auth event %r", e_id)
+
+ with start_active_span("authing_events"):
+ for e in itertools.chain(auth_events, state, [event]):
+ auth_for_e = {
+ (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
+ for e_id in e.auth_event_ids()
+ if e_id in event_map
+ }
+ if create_event:
+ auth_for_e[(EventTypes.Create, "")] = create_event
- try:
- event_auth.check(room_version, e, auth_events=auth_for_e)
- except SynapseError as err:
- # we may get SynapseErrors here as well as AuthErrors. For
- # instance, there are a couple of (ancient) events in some
- # rooms whose senders do not have the correct sigil; these
- # cause SynapseErrors in auth.check. We don't want to give up
- # the attempt to federate altogether in such cases.
+ try:
+ event_auth.check(room_version, e, auth_events=auth_for_e)
+ except SynapseError as err:
+ # we may get SynapseErrors here as well as AuthErrors. For
+ # instance, there are a couple of (ancient) events in some
+ # rooms whose senders do not have the correct sigil; these
+ # cause SynapseErrors in auth.check. We don't want to give up
+ # the attempt to federate altogether in such cases.
- logger.warning("Rejecting %s because %s", e.event_id, err.msg)
+ logger.warning("Rejecting %s because %s", e.event_id, err.msg)
- if e == event:
- raise
- events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
+ if e == event:
+ raise
+ events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
if auth_events or state:
- await self._federation_event_handler.persist_events_and_notify(
- room_id,
- [
- (e, events_to_context[e.event_id])
- for e in itertools.chain(auth_events, state)
- ],
- )
+ with start_active_span("persist_events_and_notify.state"):
+ await self._federation_event_handler.persist_events_and_notify(
+ room_id,
+ [
+ (e, events_to_context[e.event_id])
+ for e in itertools.chain(auth_events, state)
+ ],
+ )
new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index f07e288056..6a30aa6f81 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -16,6 +16,7 @@
import itertools
import logging
from collections import OrderedDict, namedtuple
+from synapse.logging.opentracing import start_active_span
from typing import (
TYPE_CHECKING,
Any,
@@ -382,7 +383,8 @@ class PersistEventsStore:
# Insert into event_to_state_groups.
self._store_event_state_mappings_txn(txn, events_and_contexts)
- self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts])
+ with start_active_span("_persist_event_auth_chain_txn"):
+ self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts])
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
@@ -393,12 +395,13 @@ class PersistEventsStore:
# From this point onwards the events are only ones that weren't
# rejected.
- self._update_metadata_tables_txn(
- txn,
- events_and_contexts=events_and_contexts,
- all_events_and_contexts=all_events_and_contexts,
- backfilled=backfilled,
- )
+ with start_active_span("_update_metadata_tables_txn"):
+ self._update_metadata_tables_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ all_events_and_contexts=all_events_and_contexts,
+ backfilled=backfilled,
+ )
# We call this last as it assumes we've inserted the events into
# room_memberships, where applicable.
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index 6990f3ed1d..1f01a5df0a 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -138,6 +138,19 @@ class KeyStore(SQLBaseStore):
for i in invalidations:
invalidate((i,))
+ async def store_server_keys_json_multi(
+ self,
+ entries: List[Tuple[str, str, str, int, int, bytes]],
+ ):
+ await self.db_pool.simple_upsert_many(
+ table="server_keys_json",
+ key_names=("server_name", "key_id", "from_server"),
+ key_values=[e[:3] for e in entries],
+ value_names=("ts_added_ms", "ts_valid_until_ms", "key_json"),
+ value_values=[(e[3], e[4], db_binary_type(e[5])) for e in entries],
+ desc="store_server_keys_json_multi",
+ )
+
async def store_server_keys_json(
self,
server_name: str,
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 0e8270746d..a19f75061b 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -445,7 +445,9 @@ class EventsPersistenceStorage:
potentially_left_users: Set[str] = set()
if not backfilled:
- with Measure(self._clock, "_calculate_state_and_extrem"):
+ with Measure(
+ self._clock, "_calculate_state_and_extrem"
+ ), opentracing.start_active_span("_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
# We do this by working out what the new extremities are and then
# calculating the state from that.
|