diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_base.py | 24 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 8 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 4 | ||||
-rw-r--r-- | synapse/federation/persistence.py | 34 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 12 | ||||
-rw-r--r-- | synapse/federation/sender/transaction_manager.py | 9 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 2 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 28 |
8 files changed, 53 insertions, 68 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 1e925b19e7..f7bb806ae7 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -27,8 +27,14 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.events import event_type_from_format_version from synapse.events.utils import prune_event from synapse.http.servlet import assert_params_in_dict +from synapse.logging.context import ( + LoggingContext, + PreserveLoggingContext, + make_deferred_yieldable, + preserve_fn, +) from synapse.types import get_domain_from_id -from synapse.util import logcontext, unwrapFirstError +from synapse.util import unwrapFirstError logger = logging.getLogger(__name__) @@ -73,7 +79,7 @@ class FederationBase(object): @defer.inlineCallbacks def handle_check_result(pdu, deferred): try: - res = yield logcontext.make_deferred_yieldable(deferred) + res = yield make_deferred_yieldable(deferred) except SynapseError: res = None @@ -102,10 +108,10 @@ class FederationBase(object): defer.returnValue(res) - handle = logcontext.preserve_fn(handle_check_result) + handle = preserve_fn(handle_check_result) deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)] - valid_pdus = yield logcontext.make_deferred_yieldable( + valid_pdus = yield make_deferred_yieldable( defer.gatherResults(deferreds2, consumeErrors=True) ).addErrback(unwrapFirstError) @@ -115,7 +121,7 @@ class FederationBase(object): defer.returnValue([p for p in valid_pdus if p]) def _check_sigs_and_hash(self, room_version, pdu): - return logcontext.make_deferred_yieldable( + return make_deferred_yieldable( self._check_sigs_and_hashes(room_version, [pdu])[0] ) @@ -133,14 +139,14 @@ class FederationBase(object): * returns a redacted version of the event (if the signature matched but the hash did not) * throws a SynapseError if the signature check failed. - The deferreds run their callbacks in the sentinel logcontext. + The deferreds run their callbacks in the sentinel """ deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus) - ctx = logcontext.LoggingContext.current_context() + ctx = LoggingContext.current_context() def callback(_, pdu): - with logcontext.PreserveLoggingContext(ctx): + with PreserveLoggingContext(ctx): if not check_event_content_hash(pdu): # let's try to distinguish between failures because the event was # redacted (which are somewhat expected) vs actual ball-tampering @@ -178,7 +184,7 @@ class FederationBase(object): def errback(failure, pdu): failure.trap(SynapseError) - with logcontext.PreserveLoggingContext(ctx): + with PreserveLoggingContext(ctx): logger.warn( "Signature check failed for %s: %s", pdu.event_id, diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 3883eb525e..3cb4b94420 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -39,10 +39,10 @@ from synapse.api.room_versions import ( ) from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json -from synapse.util import logcontext, unwrapFirstError +from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.utils import log_function +from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable, run_in_background -from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -207,7 +207,7 @@ class FederationClient(FederationBase): ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield logcontext.make_deferred_yieldable( + pdus[:] = yield make_deferred_yieldable( defer.gatherResults( self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True ).addErrback(unwrapFirstError) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2e0cebb638..8c0a18b120 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -42,6 +42,8 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name +from synapse.logging.context import nested_logging_context +from synapse.logging.utils import log_function from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, @@ -50,8 +52,6 @@ from synapse.types import get_domain_from_id from synapse.util import glob_to_regex from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache -from synapse.util.logcontext import nested_logging_context -from synapse.util.logutils import log_function # when processing incoming transactions, we try to handle multiple rooms in # parallel, up to this limit. diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 7535f79203..44edcabed4 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -21,9 +21,7 @@ These actions are mostly only used by the :py:mod:`.replication` module. import logging -from twisted.internet import defer - -from synapse.util.logutils import log_function +from synapse.logging.utils import log_function logger = logging.getLogger(__name__) @@ -63,33 +61,3 @@ class TransactionActions(object): return self.store.set_received_txn_response( transaction.transaction_id, origin, code, response ) - - @defer.inlineCallbacks - @log_function - def prepare_to_send(self, transaction): - """ Persists the `Transaction` we are about to send and works out the - correct value for the `prev_ids` key. - - Returns: - Deferred - """ - transaction.prev_ids = yield self.store.prep_send_transaction( - transaction.transaction_id, - transaction.destination, - transaction.origin_server_ts, - ) - - @log_function - def delivered(self, transaction, response_code, response_dict): - """ Marks the given `Transaction` as having been successfully - delivered to the remote homeserver, and what the response was. - - Returns: - Deferred - """ - return self.store.delivered_txn( - transaction.transaction_id, - transaction.destination, - response_code, - response_dict, - ) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 766c5a37cd..d46f4aaeb1 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -26,6 +26,11 @@ from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu from synapse.handlers.presence import get_interested_remotes +from synapse.logging.context import ( + make_deferred_yieldable, + preserve_fn, + run_in_background, +) from synapse.metrics import ( LaterGauge, event_processing_loop_counter, @@ -33,7 +38,6 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util import logcontext from synapse.util.metrics import measure_func logger = logging.getLogger(__name__) @@ -210,10 +214,10 @@ class FederationSender(object): for event in events: events_by_room.setdefault(event.room_id, []).append(event) - yield logcontext.make_deferred_yieldable( + yield make_deferred_yieldable( defer.gatherResults( [ - logcontext.run_in_background(handle_room_events, evs) + run_in_background(handle_room_events, evs) for evs in itervalues(events_by_room) ], consumeErrors=True, @@ -360,7 +364,7 @@ class FederationSender(object): for queue in queues: queue.flush_read_receipts_for_room(room_id) - @logcontext.preserve_fn # the caller should not yield on this + @preserve_fn # the caller should not yield on this @defer.inlineCallbacks def send_presence(self, states): """Send the new presence states to the appropriate destinations. diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index c987bb9a0d..0460a8c4ac 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -63,8 +63,6 @@ class TransactionManager(object): len(edus), ) - logger.debug("TX [%s] Persisting transaction...", destination) - transaction = Transaction.create_new( origin_server_ts=int(self.clock.time_msec()), transaction_id=txn_id, @@ -76,9 +74,6 @@ class TransactionManager(object): self._next_txn_id += 1 - yield self._transaction_actions.prepare_to_send(transaction) - - logger.debug("TX [%s] Persisted transaction", destination) logger.info( "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", destination, @@ -118,10 +113,6 @@ class TransactionManager(object): logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) - yield self._transaction_actions.delivered(transaction, code, response) - - logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id) - if code == 200: for e_id, r in response.get("pdus", {}).items(): if "error" in r: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index aecd142309..1aae9ec9e7 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -22,7 +22,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX -from synapse.util.logutils import log_function +from synapse.logging.utils import log_function logger = logging.getLogger(__name__) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 955f0f4308..c45d458d94 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -21,6 +21,7 @@ import re from twisted.internet import defer import synapse +import synapse.logging.opentracing as opentracing from synapse.api.errors import Codes, FederationDeniedError, SynapseError from synapse.api.room_versions import RoomVersions from synapse.api.urls import ( @@ -36,8 +37,8 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string_from_args, ) +from synapse.logging.context import run_in_background from synapse.types import ThirdPartyInstanceID, get_domain_from_id -from synapse.util.logcontext import run_in_background from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string @@ -288,14 +289,29 @@ class BaseFederationServlet(object): logger.warn("authenticate_request failed: %s", e) raise - if origin: - with ratelimiter.ratelimit(origin) as d: - yield d + # Start an opentracing span + with opentracing.start_active_span_from_context( + request.requestHeaders, + "incoming-federation-request", + tags={ + "request_id": request.get_request_id(), + opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_SERVER, + opentracing.tags.HTTP_METHOD: request.get_method(), + opentracing.tags.HTTP_URL: request.get_redacted_uri(), + opentracing.tags.PEER_HOST_IPV6: request.getClientIP(), + "authenticated_entity": origin, + }, + ): + if origin: + with ratelimiter.ratelimit(origin) as d: + yield d + response = yield func( + origin, content, request.args, *args, **kwargs + ) + else: response = yield func( origin, content, request.args, *args, **kwargs ) - else: - response = yield func(origin, content, request.args, *args, **kwargs) defer.returnValue(response) |