summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py24
-rw-r--r--synapse/federation/federation_client.py8
-rw-r--r--synapse/federation/federation_server.py4
-rw-r--r--synapse/federation/persistence.py34
-rw-r--r--synapse/federation/sender/__init__.py12
-rw-r--r--synapse/federation/sender/transaction_manager.py9
-rw-r--r--synapse/federation/transport/client.py2
-rw-r--r--synapse/federation/transport/server.py28
8 files changed, 53 insertions, 68 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 1e925b19e7..f7bb806ae7 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -27,8 +27,14 @@ from synapse.crypto.event_signing import check_event_content_hash
 from synapse.events import event_type_from_format_version
 from synapse.events.utils import prune_event
 from synapse.http.servlet import assert_params_in_dict
+from synapse.logging.context import (
+    LoggingContext,
+    PreserveLoggingContext,
+    make_deferred_yieldable,
+    preserve_fn,
+)
 from synapse.types import get_domain_from_id
-from synapse.util import logcontext, unwrapFirstError
+from synapse.util import unwrapFirstError
 
 logger = logging.getLogger(__name__)
 
@@ -73,7 +79,7 @@ class FederationBase(object):
         @defer.inlineCallbacks
         def handle_check_result(pdu, deferred):
             try:
-                res = yield logcontext.make_deferred_yieldable(deferred)
+                res = yield make_deferred_yieldable(deferred)
             except SynapseError:
                 res = None
 
@@ -102,10 +108,10 @@ class FederationBase(object):
 
             defer.returnValue(res)
 
-        handle = logcontext.preserve_fn(handle_check_result)
+        handle = preserve_fn(handle_check_result)
         deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)]
 
-        valid_pdus = yield logcontext.make_deferred_yieldable(
+        valid_pdus = yield make_deferred_yieldable(
             defer.gatherResults(deferreds2, consumeErrors=True)
         ).addErrback(unwrapFirstError)
 
@@ -115,7 +121,7 @@ class FederationBase(object):
             defer.returnValue([p for p in valid_pdus if p])
 
     def _check_sigs_and_hash(self, room_version, pdu):
-        return logcontext.make_deferred_yieldable(
+        return make_deferred_yieldable(
             self._check_sigs_and_hashes(room_version, [pdu])[0]
         )
 
@@ -133,14 +139,14 @@ class FederationBase(object):
               * returns a redacted version of the event (if the signature
                 matched but the hash did not)
               * throws a SynapseError if the signature check failed.
-            The deferreds run their callbacks in the sentinel logcontext.
+            The deferreds run their callbacks in the sentinel
         """
         deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus)
 
-        ctx = logcontext.LoggingContext.current_context()
+        ctx = LoggingContext.current_context()
 
         def callback(_, pdu):
-            with logcontext.PreserveLoggingContext(ctx):
+            with PreserveLoggingContext(ctx):
                 if not check_event_content_hash(pdu):
                     # let's try to distinguish between failures because the event was
                     # redacted (which are somewhat expected) vs actual ball-tampering
@@ -178,7 +184,7 @@ class FederationBase(object):
 
         def errback(failure, pdu):
             failure.trap(SynapseError)
-            with logcontext.PreserveLoggingContext(ctx):
+            with PreserveLoggingContext(ctx):
                 logger.warn(
                     "Signature check failed for %s: %s",
                     pdu.event_id,
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 3883eb525e..3cb4b94420 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -39,10 +39,10 @@ from synapse.api.room_versions import (
 )
 from synapse.events import builder, room_version_to_event_format
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
-from synapse.util import logcontext, unwrapFirstError
+from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.utils import log_function
+from synapse.util import unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
-from synapse.util.logutils import log_function
 from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
@@ -207,7 +207,7 @@ class FederationClient(FederationBase):
         ]
 
         # FIXME: We should handle signature failures more gracefully.
-        pdus[:] = yield logcontext.make_deferred_yieldable(
+        pdus[:] = yield make_deferred_yieldable(
             defer.gatherResults(
                 self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
             ).addErrback(unwrapFirstError)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 2e0cebb638..8c0a18b120 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -42,6 +42,8 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
+from synapse.logging.context import nested_logging_context
+from synapse.logging.utils import log_function
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
     ReplicationGetQueryRestServlet,
@@ -50,8 +52,6 @@ from synapse.types import get_domain_from_id
 from synapse.util import glob_to_regex
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import nested_logging_context
-from synapse.util.logutils import log_function
 
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 7535f79203..44edcabed4 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -21,9 +21,7 @@ These actions are mostly only used by the :py:mod:`.replication` module.
 
 import logging
 
-from twisted.internet import defer
-
-from synapse.util.logutils import log_function
+from synapse.logging.utils import log_function
 
 logger = logging.getLogger(__name__)
 
@@ -63,33 +61,3 @@ class TransactionActions(object):
         return self.store.set_received_txn_response(
             transaction.transaction_id, origin, code, response
         )
-
-    @defer.inlineCallbacks
-    @log_function
-    def prepare_to_send(self, transaction):
-        """ Persists the `Transaction` we are about to send and works out the
-        correct value for the `prev_ids` key.
-
-        Returns:
-            Deferred
-        """
-        transaction.prev_ids = yield self.store.prep_send_transaction(
-            transaction.transaction_id,
-            transaction.destination,
-            transaction.origin_server_ts,
-        )
-
-    @log_function
-    def delivered(self, transaction, response_code, response_dict):
-        """ Marks the given `Transaction` as having been successfully
-        delivered to the remote homeserver, and what the response was.
-
-        Returns:
-            Deferred
-        """
-        return self.store.delivered_txn(
-            transaction.transaction_id,
-            transaction.destination,
-            response_code,
-            response_dict,
-        )
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 766c5a37cd..d46f4aaeb1 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -26,6 +26,11 @@ from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
 from synapse.federation.units import Edu
 from synapse.handlers.presence import get_interested_remotes
+from synapse.logging.context import (
+    make_deferred_yieldable,
+    preserve_fn,
+    run_in_background,
+)
 from synapse.metrics import (
     LaterGauge,
     event_processing_loop_counter,
@@ -33,7 +38,6 @@ from synapse.metrics import (
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util import logcontext
 from synapse.util.metrics import measure_func
 
 logger = logging.getLogger(__name__)
@@ -210,10 +214,10 @@ class FederationSender(object):
                 for event in events:
                     events_by_room.setdefault(event.room_id, []).append(event)
 
-                yield logcontext.make_deferred_yieldable(
+                yield make_deferred_yieldable(
                     defer.gatherResults(
                         [
-                            logcontext.run_in_background(handle_room_events, evs)
+                            run_in_background(handle_room_events, evs)
                             for evs in itervalues(events_by_room)
                         ],
                         consumeErrors=True,
@@ -360,7 +364,7 @@ class FederationSender(object):
         for queue in queues:
             queue.flush_read_receipts_for_room(room_id)
 
-    @logcontext.preserve_fn  # the caller should not yield on this
+    @preserve_fn  # the caller should not yield on this
     @defer.inlineCallbacks
     def send_presence(self, states):
         """Send the new presence states to the appropriate destinations.
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index c987bb9a0d..0460a8c4ac 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -63,8 +63,6 @@ class TransactionManager(object):
             len(edus),
         )
 
-        logger.debug("TX [%s] Persisting transaction...", destination)
-
         transaction = Transaction.create_new(
             origin_server_ts=int(self.clock.time_msec()),
             transaction_id=txn_id,
@@ -76,9 +74,6 @@ class TransactionManager(object):
 
         self._next_txn_id += 1
 
-        yield self._transaction_actions.prepare_to_send(transaction)
-
-        logger.debug("TX [%s] Persisted transaction", destination)
         logger.info(
             "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
             destination,
@@ -118,10 +113,6 @@ class TransactionManager(object):
 
         logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
 
-        yield self._transaction_actions.delivered(transaction, code, response)
-
-        logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
-
         if code == 200:
             for e_id, r in response.get("pdus", {}).items():
                 if "error" in r:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index aecd142309..1aae9ec9e7 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import Membership
 from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
-from synapse.util.logutils import log_function
+from synapse.logging.utils import log_function
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 955f0f4308..c45d458d94 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -21,6 +21,7 @@ import re
 from twisted.internet import defer
 
 import synapse
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import Codes, FederationDeniedError, SynapseError
 from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import (
@@ -36,8 +37,8 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string_from_args,
 )
+from synapse.logging.context import run_in_background
 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
-from synapse.util.logcontext import run_in_background
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
 
@@ -288,14 +289,29 @@ class BaseFederationServlet(object):
                 logger.warn("authenticate_request failed: %s", e)
                 raise
 
-            if origin:
-                with ratelimiter.ratelimit(origin) as d:
-                    yield d
+            # Start an opentracing span
+            with opentracing.start_active_span_from_context(
+                request.requestHeaders,
+                "incoming-federation-request",
+                tags={
+                    "request_id": request.get_request_id(),
+                    opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_SERVER,
+                    opentracing.tags.HTTP_METHOD: request.get_method(),
+                    opentracing.tags.HTTP_URL: request.get_redacted_uri(),
+                    opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
+                    "authenticated_entity": origin,
+                },
+            ):
+                if origin:
+                    with ratelimiter.ratelimit(origin) as d:
+                        yield d
+                        response = yield func(
+                            origin, content, request.args, *args, **kwargs
+                        )
+                else:
                     response = yield func(
                         origin, content, request.args, *args, **kwargs
                     )
-            else:
-                response = yield func(origin, content, request.args, *args, **kwargs)
 
             defer.returnValue(response)