summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/3958.misc1
-rw-r--r--changelog.d/3959.feature1
-rw-r--r--changelog.d/3963.misc1
-rw-r--r--synapse/federation/federation_client.py8
-rw-r--r--synapse/federation/federation_server.py32
-rw-r--r--synapse/handlers/federation.py65
-rw-r--r--synapse/storage/state.py30
-rw-r--r--synapse/util/logcontext.py41
-rw-r--r--tests/storage/test_state.py39
-rw-r--r--tests/test_federation.py28
-rw-r--r--tests/util/test_logcontext.py5
11 files changed, 182 insertions, 69 deletions
diff --git a/changelog.d/3958.misc b/changelog.d/3958.misc
new file mode 100644
index 0000000000..5931d06dcf
--- /dev/null
+++ b/changelog.d/3958.misc
@@ -0,0 +1 @@
+Fix docstrings and add tests for state store methods
diff --git a/changelog.d/3959.feature b/changelog.d/3959.feature
new file mode 100644
index 0000000000..b3a4f37a8d
--- /dev/null
+++ b/changelog.d/3959.feature
@@ -0,0 +1 @@
+Include eventid in log lines when processing incoming federation transactions
\ No newline at end of file
diff --git a/changelog.d/3963.misc b/changelog.d/3963.misc
new file mode 100644
index 0000000000..f1e0eaf18e
--- /dev/null
+++ b/changelog.d/3963.misc
@@ -0,0 +1 @@
+fix docstring for FederationClient.get_state_for_room
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5a92428f56..8bf1ad0c1f 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -289,8 +289,7 @@ class FederationClient(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def get_state_for_room(self, destination, room_id, event_id):
-        """Requests all of the `current` state PDUs for a given room from
-        a remote home server.
+        """Requests all of the room state at a given event from a remote home server.
 
         Args:
             destination (str): The remote homeserver to query for the state.
@@ -298,9 +297,10 @@ class FederationClient(FederationBase):
             event_id (str): The id of the event we want the state at.
 
         Returns:
-            Deferred: Results in a list of PDUs.
+            Deferred[Tuple[List[EventBase], List[EventBase]]]:
+                A list of events in the state, and a list of events in the auth chain
+                for the given event.
         """
-
         try:
             # First we try and ask for just the IDs, as thats far quicker if
             # we have most of the state and auth_chain already.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9a571e4fc7..819e8f7331 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -46,6 +46,7 @@ from synapse.replication.http.federation import (
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.logcontext import nested_logging_context
 from synapse.util.logutils import log_function
 
 # when processing incoming transactions, we try to handle multiple rooms in
@@ -187,21 +188,22 @@ class FederationServer(FederationBase):
 
             for pdu in pdus_by_room[room_id]:
                 event_id = pdu.event_id
-                try:
-                    yield self._handle_received_pdu(
-                        origin, pdu
-                    )
-                    pdu_results[event_id] = {}
-                except FederationError as e:
-                    logger.warn("Error handling PDU %s: %s", event_id, e)
-                    pdu_results[event_id] = {"error": str(e)}
-                except Exception as e:
-                    f = failure.Failure()
-                    pdu_results[event_id] = {"error": str(e)}
-                    logger.error(
-                        "Failed to handle PDU %s: %s",
-                        event_id, f.getTraceback().rstrip(),
-                    )
+                with nested_logging_context(event_id):
+                    try:
+                        yield self._handle_received_pdu(
+                            origin, pdu
+                        )
+                        pdu_results[event_id] = {}
+                    except FederationError as e:
+                        logger.warn("Error handling PDU %s: %s", event_id, e)
+                        pdu_results[event_id] = {"error": str(e)}
+                    except Exception as e:
+                        f = failure.Failure()
+                        pdu_results[event_id] = {"error": str(e)}
+                        logger.error(
+                            "Failed to handle PDU %s: %s",
+                            event_id, f.getTraceback().rstrip(),
+                        )
 
         yield concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(),
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a70ae8c830..128926e719 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -339,14 +339,18 @@ class FederationHandler(BaseHandler):
                             "[%s %s] Requesting state at missing prev_event %s",
                             room_id, event_id, p,
                         )
-                        state, got_auth_chain = (
-                            yield self.federation_client.get_state_for_room(
-                                origin, room_id, p,
+
+                        with logcontext.nested_logging_context(p):
+                            state, got_auth_chain = (
+                                yield self.federation_client.get_state_for_room(
+                                    origin, room_id, p,
+                                )
                             )
-                        )
-                        auth_chains.update(got_auth_chain)
-                        state_group = {(x.type, x.state_key): x.event_id for x in state}
-                        state_groups.append(state_group)
+                            auth_chains.update(got_auth_chain)
+                            state_group = {
+                                (x.type, x.state_key): x.event_id for x in state
+                            }
+                            state_groups.append(state_group)
 
                     # Resolve any conflicting state
                     def fetch(ev_ids):
@@ -483,20 +487,21 @@ class FederationHandler(BaseHandler):
                 "[%s %s] Handling received prev_event %s",
                 room_id, event_id, ev.event_id,
             )
-            try:
-                yield self.on_receive_pdu(
-                    origin,
-                    ev,
-                    sent_to_us_directly=False,
-                )
-            except FederationError as e:
-                if e.code == 403:
-                    logger.warn(
-                        "[%s %s] Received prev_event %s failed history check.",
-                        room_id, event_id, ev.event_id,
+            with logcontext.nested_logging_context(ev.event_id):
+                try:
+                    yield self.on_receive_pdu(
+                        origin,
+                        ev,
+                        sent_to_us_directly=False,
                     )
-                else:
-                    raise
+                except FederationError as e:
+                    if e.code == 403:
+                        logger.warn(
+                            "[%s %s] Received prev_event %s failed history check.",
+                            room_id, event_id, ev.event_id,
+                        )
+                    else:
+                        raise
 
     @defer.inlineCallbacks
     def _process_received_pdu(self, origin, event, state, auth_chain):
@@ -1139,7 +1144,8 @@ class FederationHandler(BaseHandler):
             try:
                 logger.info("Processing queued PDU %s which was received "
                             "while we were joining %s", p.event_id, p.room_id)
-                yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
+                with logcontext.nested_logging_context(p.event_id):
+                    yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
             except Exception as e:
                 logger.warn(
                     "Error handling queued PDU %s from %s: %s",
@@ -1585,15 +1591,22 @@ class FederationHandler(BaseHandler):
 
         Notifies about the events where appropriate.
         """
-        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.run_in_background(
-                    self._prep_event,
+
+        @defer.inlineCallbacks
+        def prep(ev_info):
+            event = ev_info["event"]
+            with logcontext.nested_logging_context(suffix=event.event_id):
+                res = yield self._prep_event(
                     origin,
-                    ev_info["event"],
+                    event,
                     state=ev_info.get("state"),
                     auth_events=ev_info.get("auth_events"),
                 )
+            defer.returnValue(res)
+
+        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+            [
+                logcontext.run_in_background(prep, ev_info)
                 for ev_info in event_infos
             ], consumeErrors=True,
         ))
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 4b971efdba..3f4cbd61c4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -255,7 +255,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_state_groups_ids(self, room_id, event_ids):
+    def get_state_groups_ids(self, _room_id, event_ids):
+        """Get the event IDs of all the state for the state groups for the given events
+
+        Args:
+            _room_id (str): id of the room for these events
+            event_ids (iterable[str]): ids of the events
+
+        Returns:
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
+        """
         if not event_ids:
             defer.returnValue({})
 
@@ -270,7 +280,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_state_ids_for_group(self, state_group):
-        """Get the state IDs for the given state group
+        """Get the event IDs of all the state in the given state group
 
         Args:
             state_group (int)
@@ -286,7 +296,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     def get_state_groups(self, room_id, event_ids):
         """ Get the state groups for the given list of event_ids
 
-        The return value is a dict mapping group names to lists of events.
+        Returns:
+            Deferred[dict[int, list[EventBase]]]:
+                dict of state_group_id -> list of state events.
         """
         if not event_ids:
             defer.returnValue({})
@@ -324,7 +336,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 member events (if True), or to exclude member events (if False)
 
         Returns:
-            dictionary state_group -> (dict of (type, state_key) -> event id)
+        Returns:
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
         """
         results = {}
 
@@ -732,8 +746,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 If None, `types` filtering is applied to all events.
 
         Returns:
-            Deferred[dict[int, dict[(type, state_key), EventBase]]]
-                a dictionary mapping from state group to state dictionary.
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
         """
         if types is not None:
             non_member_types = [t for t in types if t[0] != EventTypes.Member]
@@ -788,8 +802,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 If None, `types` filtering is applied to all events.
 
         Returns:
-            Deferred[dict[int, dict[(type, state_key), EventBase]]]
-                a dictionary mapping from state group to state dictionary.
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
         """
         if types:
             types = frozenset(types)
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a0c2d37610..89224b26cc 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -200,7 +200,7 @@ class LoggingContext(object):
 
     sentinel = Sentinel()
 
-    def __init__(self, name=None, parent_context=None):
+    def __init__(self, name=None, parent_context=None, request=None):
         self.previous_context = LoggingContext.current_context()
         self.name = name
 
@@ -218,6 +218,13 @@ class LoggingContext(object):
 
         self.parent_context = parent_context
 
+        if self.parent_context is not None:
+            self.parent_context.copy_to(self)
+
+        if request is not None:
+            # the request param overrides the request from the parent context
+            self.request = request
+
     def __str__(self):
         return "%s@%x" % (self.name, id(self))
 
@@ -256,9 +263,6 @@ class LoggingContext(object):
             )
         self.alive = True
 
-        if self.parent_context is not None:
-            self.parent_context.copy_to(self)
-
         return self
 
     def __exit__(self, type, value, traceback):
@@ -439,6 +443,35 @@ class PreserveLoggingContext(object):
                 )
 
 
+def nested_logging_context(suffix, parent_context=None):
+    """Creates a new logging context as a child of another.
+
+    The nested logging context will have a 'request' made up of the parent context's
+    request, plus the given suffix.
+
+    CPU/db usage stats will be added to the parent context's on exit.
+
+    Normal usage looks like:
+
+        with nested_logging_context(suffix):
+            # ... do stuff
+
+    Args:
+        suffix (str): suffix to add to the parent context's 'request'.
+        parent_context (LoggingContext|None): parent context. Will use the current context
+            if None.
+
+    Returns:
+        LoggingContext: new logging context.
+    """
+    if parent_context is None:
+        parent_context = LoggingContext.current_context()
+    return LoggingContext(
+        parent_context=parent_context,
+        request=parent_context.request + "-" + suffix,
+    )
+
+
 def preserve_fn(f):
     """Function decorator which wraps the function with run_in_background"""
     def g(*args, **kwargs):
diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py
index b910965932..b9c5b39d59 100644
--- a/tests/storage/test_state.py
+++ b/tests/storage/test_state.py
@@ -75,6 +75,45 @@ class StateStoreTestCase(tests.unittest.TestCase):
         self.assertEqual(len(s1), len(s2))
 
     @defer.inlineCallbacks
+    def test_get_state_groups_ids(self):
+        e1 = yield self.inject_state_event(
+            self.room, self.u_alice, EventTypes.Create, '', {}
+        )
+        e2 = yield self.inject_state_event(
+            self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"}
+        )
+
+        state_group_map = yield self.store.get_state_groups_ids(self.room, [e2.event_id])
+        self.assertEqual(len(state_group_map), 1)
+        state_map = list(state_group_map.values())[0]
+        self.assertDictEqual(
+            state_map,
+            {
+                (EventTypes.Create, ''): e1.event_id,
+                (EventTypes.Name, ''): e2.event_id,
+            },
+        )
+
+    @defer.inlineCallbacks
+    def test_get_state_groups(self):
+        e1 = yield self.inject_state_event(
+            self.room, self.u_alice, EventTypes.Create, '', {}
+        )
+        e2 = yield self.inject_state_event(
+            self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"}
+        )
+
+        state_group_map = yield self.store.get_state_groups(
+            self.room, [e2.event_id])
+        self.assertEqual(len(state_group_map), 1)
+        state_list = list(state_group_map.values())[0]
+
+        self.assertEqual(
+            {ev.event_id for ev in state_list},
+            {e1.event_id, e2.event_id},
+        )
+
+    @defer.inlineCallbacks
     def test_get_state_for_event(self):
 
         # this defaults to a linear DAG as each new injection defaults to whatever
diff --git a/tests/test_federation.py b/tests/test_federation.py
index 2540604fcc..ff55c7a627 100644
--- a/tests/test_federation.py
+++ b/tests/test_federation.py
@@ -6,6 +6,7 @@ from twisted.internet.defer import maybeDeferred, succeed
 from synapse.events import FrozenEvent
 from synapse.types import Requester, UserID
 from synapse.util import Clock
+from synapse.util.logcontext import LoggingContext
 
 from tests import unittest
 from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver
@@ -117,9 +118,10 @@ class MessageAcceptTests(unittest.TestCase):
             }
         )
 
-        d = self.handler.on_receive_pdu(
-            "test.serv", lying_event, sent_to_us_directly=True
-        )
+        with LoggingContext(request="lying_event"):
+            d = self.handler.on_receive_pdu(
+                "test.serv", lying_event, sent_to_us_directly=True
+            )
 
         # Step the reactor, so the database fetches come back
         self.reactor.advance(1)
@@ -209,11 +211,12 @@ class MessageAcceptTests(unittest.TestCase):
             }
         )
 
-        d = self.handler.on_receive_pdu(
-            "test.serv", good_event, sent_to_us_directly=True
-        )
-        self.reactor.advance(1)
-        self.assertEqual(self.successResultOf(d), None)
+        with LoggingContext(request="good_event"):
+            d = self.handler.on_receive_pdu(
+                "test.serv", good_event, sent_to_us_directly=True
+            )
+            self.reactor.advance(1)
+            self.assertEqual(self.successResultOf(d), None)
 
         bad_event = FrozenEvent(
             {
@@ -230,10 +233,11 @@ class MessageAcceptTests(unittest.TestCase):
             }
         )
 
-        d = self.handler.on_receive_pdu(
-            "test.serv", bad_event, sent_to_us_directly=True
-        )
-        self.reactor.advance(1)
+        with LoggingContext(request="bad_event"):
+            d = self.handler.on_receive_pdu(
+                "test.serv", bad_event, sent_to_us_directly=True
+            )
+            self.reactor.advance(1)
 
         extrem = maybeDeferred(
             self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py
index 4633db77b3..8adaee3c8d 100644
--- a/tests/util/test_logcontext.py
+++ b/tests/util/test_logcontext.py
@@ -159,6 +159,11 @@ class LoggingContextTestCase(unittest.TestCase):
             self.assertEqual(r, "bum")
             self._check_test_key("one")
 
+    def test_nested_logging_context(self):
+        with LoggingContext(request="foo"):
+            nested_context = logcontext.nested_logging_context(suffix="bar")
+            self.assertEqual(nested_context.request, "foo-bar")
+
 
 # a function which returns a deferred which has been "called", but
 # which had a function which returned another incomplete deferred on