diff --git a/.circleci/config.yml b/.circleci/config.yml
index 605430fb3f..521aca22ef 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -99,23 +99,35 @@ workflows:
version: 2
build:
jobs:
- - sytestpy2
- - sytestpy2postgres
- - sytestpy3
- - sytestpy3postgres
+ - sytestpy2:
+ filters:
+ branches:
+ only: /develop|master|release-.*/
+ - sytestpy2postgres:
+ filters:
+ branches:
+ only: /develop|master|release-.*/
+ - sytestpy3:
+ filters:
+ branches:
+ only: /develop|master|release-.*/
+ - sytestpy3postgres:
+ filters:
+ branches:
+ only: /develop|master|release-.*/
- sytestpy2merged:
filters:
branches:
- ignore: /develop|master/
+ ignore: /develop|master|release-.*/
- sytestpy2postgresmerged:
filters:
branches:
- ignore: /develop|master/
+ ignore: /develop|master|release-.*/
- sytestpy3merged:
filters:
branches:
- ignore: /develop|master/
+ ignore: /develop|master|release-.*/
- sytestpy3postgresmerged:
filters:
branches:
- ignore: /develop|master/
+ ignore: /develop|master|release-.*/
diff --git a/changelog.d/3957.misc b/changelog.d/3957.misc
new file mode 100644
index 0000000000..69d647f119
--- /dev/null
+++ b/changelog.d/3957.misc
@@ -0,0 +1 @@
+CircleCI now only runs merged jobs on PRs, and commit jobs on develop, master, and release branches.
diff --git a/changelog.d/3958.misc b/changelog.d/3958.misc
new file mode 100644
index 0000000000..5931d06dcf
--- /dev/null
+++ b/changelog.d/3958.misc
@@ -0,0 +1 @@
+Fix docstrings and add tests for state store methods
diff --git a/changelog.d/3959.feature b/changelog.d/3959.feature
new file mode 100644
index 0000000000..b3a4f37a8d
--- /dev/null
+++ b/changelog.d/3959.feature
@@ -0,0 +1 @@
+Include eventid in log lines when processing incoming federation transactions
\ No newline at end of file
diff --git a/changelog.d/3963.misc b/changelog.d/3963.misc
new file mode 100644
index 0000000000..f1e0eaf18e
--- /dev/null
+++ b/changelog.d/3963.misc
@@ -0,0 +1 @@
+fix docstring for FederationClient.get_state_for_room
diff --git a/changelog.d/3966.misc b/changelog.d/3966.misc
new file mode 100644
index 0000000000..1e3c8e1706
--- /dev/null
+++ b/changelog.d/3966.misc
@@ -0,0 +1 @@
+Improve the logging when handling a federation transaction
\ No newline at end of file
diff --git a/scripts-dev/next_github_number.sh b/scripts-dev/next_github_number.sh
new file mode 100755
index 0000000000..376280025a
--- /dev/null
+++ b/scripts-dev/next_github_number.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+set -e
+
+# Fetch the current GitHub issue number, add one to it -- presto! The likely
+# next PR number.
+CURRENT_NUMBER=`curl -s "https://api.github.com/repos/matrix-org/synapse/issues?state=all&per_page=1" | jq -r ".[0].number"`
+CURRENT_NUMBER=$((CURRENT_NUMBER+1))
+echo $CURRENT_NUMBER
\ No newline at end of file
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5a92428f56..8bf1ad0c1f 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -289,8 +289,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
def get_state_for_room(self, destination, room_id, event_id):
- """Requests all of the `current` state PDUs for a given room from
- a remote home server.
+ """Requests all of the room state at a given event from a remote home server.
Args:
destination (str): The remote homeserver to query for the state.
@@ -298,9 +297,10 @@ class FederationClient(FederationBase):
event_id (str): The id of the event we want the state at.
Returns:
- Deferred: Results in a list of PDUs.
+ Deferred[Tuple[List[EventBase], List[EventBase]]]:
+ A list of events in the state, and a list of events in the auth chain
+ for the given event.
"""
-
try:
# First we try and ask for just the IDs, as thats far quicker if
# we have most of the state and auth_chain already.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9a571e4fc7..819e8f7331 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -46,6 +46,7 @@ from synapse.replication.http.federation import (
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
@@ -187,21 +188,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- try:
- yield self._handle_received_pdu(
- origin, pdu
- )
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warn("Error handling PDU %s: %s", event_id, e)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- pdu_results[event_id] = {"error": str(e)}
- logger.error(
- "Failed to handle PDU %s: %s",
- event_id, f.getTraceback().rstrip(),
- )
+ with nested_logging_context(event_id):
+ try:
+ yield self._handle_received_pdu(
+ origin, pdu
+ )
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warn("Error handling PDU %s: %s", event_id, e)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ pdu_results[event_id] = {"error": str(e)}
+ logger.error(
+ "Failed to handle PDU %s: %s",
+ event_id, f.getTraceback().rstrip(),
+ )
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2ccdc3bfa7..128926e719 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -339,14 +339,18 @@ class FederationHandler(BaseHandler):
"[%s %s] Requesting state at missing prev_event %s",
room_id, event_id, p,
)
- state, got_auth_chain = (
- yield self.federation_client.get_state_for_room(
- origin, room_id, p,
+
+ with logcontext.nested_logging_context(p):
+ state, got_auth_chain = (
+ yield self.federation_client.get_state_for_room(
+ origin, room_id, p,
+ )
)
- )
- auth_chains.update(got_auth_chain)
- state_group = {(x.type, x.state_key): x.event_id for x in state}
- state_groups.append(state_group)
+ auth_chains.update(got_auth_chain)
+ state_group = {
+ (x.type, x.state_key): x.event_id for x in state
+ }
+ state_groups.append(state_group)
# Resolve any conflicting state
def fetch(ev_ids):
@@ -483,20 +487,21 @@ class FederationHandler(BaseHandler):
"[%s %s] Handling received prev_event %s",
room_id, event_id, ev.event_id,
)
- try:
- yield self.on_receive_pdu(
- origin,
- ev,
- sent_to_us_directly=False,
- )
- except FederationError as e:
- if e.code == 403:
- logger.warn(
- "[%s %s] Received prev_event %s failed history check.",
- room_id, event_id, ev.event_id,
+ with logcontext.nested_logging_context(ev.event_id):
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ ev,
+ sent_to_us_directly=False,
)
- else:
- raise
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn(
+ "[%s %s] Received prev_event %s failed history check.",
+ room_id, event_id, ev.event_id,
+ )
+ else:
+ raise
@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
@@ -572,6 +577,10 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
+ logger.info(
+ "[%s %s] persisting newly-received auth/state events %s",
+ room_id, event_id, [e["event"].event_id for e in event_infos]
+ )
yield self._handle_new_events(origin, event_infos)
try:
@@ -1135,7 +1144,8 @@ class FederationHandler(BaseHandler):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
- yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
+ with logcontext.nested_logging_context(p.event_id):
+ yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
@@ -1581,15 +1591,22 @@ class FederationHandler(BaseHandler):
Notifies about the events where appropriate.
"""
- contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
- [
- logcontext.run_in_background(
- self._prep_event,
+
+ @defer.inlineCallbacks
+ def prep(ev_info):
+ event = ev_info["event"]
+ with logcontext.nested_logging_context(suffix=event.event_id):
+ res = yield self._prep_event(
origin,
- ev_info["event"],
+ event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
+ defer.returnValue(res)
+
+ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [
+ logcontext.run_in_background(prep, ev_info)
for ev_info in event_infos
], consumeErrors=True,
))
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index c95477d318..7a7157b352 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -65,10 +65,15 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
for event_ids in itervalues(conflicted_state)
for event_id in event_ids
)
+ needed_event_count = len(needed_events)
if event_map is not None:
needed_events -= set(iterkeys(event_map))
- logger.info("Asking for %d conflicted events", len(needed_events))
+ logger.info(
+ "Asking for %d/%d conflicted events",
+ len(needed_events),
+ needed_event_count,
+ )
# dict[str, FrozenEvent]: a map from state event id to event. Only includes
# the state events which are in conflict (and those in event_map)
@@ -85,11 +90,16 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
)
new_needed_events = set(itervalues(auth_events))
+ new_needed_event_count = len(new_needed_events)
new_needed_events -= needed_events
if event_map is not None:
new_needed_events -= set(iterkeys(event_map))
- logger.info("Asking for %d auth events", len(new_needed_events))
+ logger.info(
+ "Asking for %d/%d auth events",
+ len(new_needed_events),
+ new_needed_event_count,
+ )
state_map_new = yield state_map_factory(new_needed_events)
state_map.update(state_map_new)
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 4b971efdba..3f4cbd61c4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -255,7 +255,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
@defer.inlineCallbacks
- def get_state_groups_ids(self, room_id, event_ids):
+ def get_state_groups_ids(self, _room_id, event_ids):
+ """Get the event IDs of all the state for the state groups for the given events
+
+ Args:
+ _room_id (str): id of the room for these events
+ event_ids (iterable[str]): ids of the events
+
+ Returns:
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
+ """
if not event_ids:
defer.returnValue({})
@@ -270,7 +280,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_state_ids_for_group(self, state_group):
- """Get the state IDs for the given state group
+ """Get the event IDs of all the state in the given state group
Args:
state_group (int)
@@ -286,7 +296,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
def get_state_groups(self, room_id, event_ids):
""" Get the state groups for the given list of event_ids
- The return value is a dict mapping group names to lists of events.
+ Returns:
+ Deferred[dict[int, list[EventBase]]]:
+ dict of state_group_id -> list of state events.
"""
if not event_ids:
defer.returnValue({})
@@ -324,7 +336,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
member events (if True), or to exclude member events (if False)
Returns:
- dictionary state_group -> (dict of (type, state_key) -> event id)
+ Returns:
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
results = {}
@@ -732,8 +746,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
- Deferred[dict[int, dict[(type, state_key), EventBase]]]
- a dictionary mapping from state group to state dictionary.
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types is not None:
non_member_types = [t for t in types if t[0] != EventTypes.Member]
@@ -788,8 +802,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
- Deferred[dict[int, dict[(type, state_key), EventBase]]]
- a dictionary mapping from state group to state dictionary.
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types:
types = frozenset(types)
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a0c2d37610..89224b26cc 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -200,7 +200,7 @@ class LoggingContext(object):
sentinel = Sentinel()
- def __init__(self, name=None, parent_context=None):
+ def __init__(self, name=None, parent_context=None, request=None):
self.previous_context = LoggingContext.current_context()
self.name = name
@@ -218,6 +218,13 @@ class LoggingContext(object):
self.parent_context = parent_context
+ if self.parent_context is not None:
+ self.parent_context.copy_to(self)
+
+ if request is not None:
+ # the request param overrides the request from the parent context
+ self.request = request
+
def __str__(self):
return "%s@%x" % (self.name, id(self))
@@ -256,9 +263,6 @@ class LoggingContext(object):
)
self.alive = True
- if self.parent_context is not None:
- self.parent_context.copy_to(self)
-
return self
def __exit__(self, type, value, traceback):
@@ -439,6 +443,35 @@ class PreserveLoggingContext(object):
)
+def nested_logging_context(suffix, parent_context=None):
+ """Creates a new logging context as a child of another.
+
+ The nested logging context will have a 'request' made up of the parent context's
+ request, plus the given suffix.
+
+ CPU/db usage stats will be added to the parent context's on exit.
+
+ Normal usage looks like:
+
+ with nested_logging_context(suffix):
+ # ... do stuff
+
+ Args:
+ suffix (str): suffix to add to the parent context's 'request'.
+ parent_context (LoggingContext|None): parent context. Will use the current context
+ if None.
+
+ Returns:
+ LoggingContext: new logging context.
+ """
+ if parent_context is None:
+ parent_context = LoggingContext.current_context()
+ return LoggingContext(
+ parent_context=parent_context,
+ request=parent_context.request + "-" + suffix,
+ )
+
+
def preserve_fn(f):
"""Function decorator which wraps the function with run_in_background"""
def g(*args, **kwargs):
diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py
index b910965932..b9c5b39d59 100644
--- a/tests/storage/test_state.py
+++ b/tests/storage/test_state.py
@@ -75,6 +75,45 @@ class StateStoreTestCase(tests.unittest.TestCase):
self.assertEqual(len(s1), len(s2))
@defer.inlineCallbacks
+ def test_get_state_groups_ids(self):
+ e1 = yield self.inject_state_event(
+ self.room, self.u_alice, EventTypes.Create, '', {}
+ )
+ e2 = yield self.inject_state_event(
+ self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"}
+ )
+
+ state_group_map = yield self.store.get_state_groups_ids(self.room, [e2.event_id])
+ self.assertEqual(len(state_group_map), 1)
+ state_map = list(state_group_map.values())[0]
+ self.assertDictEqual(
+ state_map,
+ {
+ (EventTypes.Create, ''): e1.event_id,
+ (EventTypes.Name, ''): e2.event_id,
+ },
+ )
+
+ @defer.inlineCallbacks
+ def test_get_state_groups(self):
+ e1 = yield self.inject_state_event(
+ self.room, self.u_alice, EventTypes.Create, '', {}
+ )
+ e2 = yield self.inject_state_event(
+ self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"}
+ )
+
+ state_group_map = yield self.store.get_state_groups(
+ self.room, [e2.event_id])
+ self.assertEqual(len(state_group_map), 1)
+ state_list = list(state_group_map.values())[0]
+
+ self.assertEqual(
+ {ev.event_id for ev in state_list},
+ {e1.event_id, e2.event_id},
+ )
+
+ @defer.inlineCallbacks
def test_get_state_for_event(self):
# this defaults to a linear DAG as each new injection defaults to whatever
diff --git a/tests/test_federation.py b/tests/test_federation.py
index 2540604fcc..ff55c7a627 100644
--- a/tests/test_federation.py
+++ b/tests/test_federation.py
@@ -6,6 +6,7 @@ from twisted.internet.defer import maybeDeferred, succeed
from synapse.events import FrozenEvent
from synapse.types import Requester, UserID
from synapse.util import Clock
+from synapse.util.logcontext import LoggingContext
from tests import unittest
from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver
@@ -117,9 +118,10 @@ class MessageAcceptTests(unittest.TestCase):
}
)
- d = self.handler.on_receive_pdu(
- "test.serv", lying_event, sent_to_us_directly=True
- )
+ with LoggingContext(request="lying_event"):
+ d = self.handler.on_receive_pdu(
+ "test.serv", lying_event, sent_to_us_directly=True
+ )
# Step the reactor, so the database fetches come back
self.reactor.advance(1)
@@ -209,11 +211,12 @@ class MessageAcceptTests(unittest.TestCase):
}
)
- d = self.handler.on_receive_pdu(
- "test.serv", good_event, sent_to_us_directly=True
- )
- self.reactor.advance(1)
- self.assertEqual(self.successResultOf(d), None)
+ with LoggingContext(request="good_event"):
+ d = self.handler.on_receive_pdu(
+ "test.serv", good_event, sent_to_us_directly=True
+ )
+ self.reactor.advance(1)
+ self.assertEqual(self.successResultOf(d), None)
bad_event = FrozenEvent(
{
@@ -230,10 +233,11 @@ class MessageAcceptTests(unittest.TestCase):
}
)
- d = self.handler.on_receive_pdu(
- "test.serv", bad_event, sent_to_us_directly=True
- )
- self.reactor.advance(1)
+ with LoggingContext(request="bad_event"):
+ d = self.handler.on_receive_pdu(
+ "test.serv", bad_event, sent_to_us_directly=True
+ )
+ self.reactor.advance(1)
extrem = maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py
index 4633db77b3..8adaee3c8d 100644
--- a/tests/util/test_logcontext.py
+++ b/tests/util/test_logcontext.py
@@ -159,6 +159,11 @@ class LoggingContextTestCase(unittest.TestCase):
self.assertEqual(r, "bum")
self._check_test_key("one")
+ def test_nested_logging_context(self):
+ with LoggingContext(request="foo"):
+ nested_context = logcontext.nested_logging_context(suffix="bar")
+ self.assertEqual(nested_context.request, "foo-bar")
+
# a function which returns a deferred which has been "called", but
# which had a function which returned another incomplete deferred on
|