diff options
-rw-r--r-- | synapse/federation/federation_server.py | 5 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 2 | ||||
-rw-r--r-- | synapse/groups/groups_server.py | 2 | ||||
-rw-r--r-- | synapse/handlers/groups_local.py | 34 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 4 | ||||
-rw-r--r-- | synapse/storage/group_server.py | 14 | ||||
-rw-r--r-- | synapse/util/async.py | 24 | ||||
-rw-r--r-- | synapse/util/logformatter.py | 51 | ||||
-rw-r--r-- | tests/util/test_linearizer.py | 28 |
9 files changed, 135 insertions, 29 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index f00d59e701..a8034bddc6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -211,8 +211,9 @@ class FederationServer(FederationBase): edu.content ) - for failure in getattr(transaction, "pdu_failures", []): - logger.info("Got failure %r", failure) + pdu_failures = getattr(transaction, "pdu_failures", []) + for failure in pdu_failures: + logger.info("Got failure %r", failure) response = { "pdus": pdu_results, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 36f6eb75e9..f96561c1fe 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -572,7 +572,7 @@ class TransportLayerClient(object): return self.client.post_json( destination=destination, path=path, - args=requester_user_id, + args={"requester_user_id": requester_user_id}, data=content, ignore_backoff=True, ) diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 6a85908dd6..1083bc2990 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -400,7 +400,7 @@ class GroupsServerHandler(object): if not is_public: entry["is_public"] = False - if not self.is_mine_id(requester_user_id): + if not self.is_mine_id(g_user_id): attestation = yield self.store.get_remote_attestation(group_id, g_user_id) if not attestation: continue diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index a2bacbfc38..3b676d46bd 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -102,18 +102,22 @@ class GroupsLocalHandler(object): get_domain_from_id(group_id), group_id, requester_user_id, ) + group_server_name = get_domain_from_id(group_id) + # Loop through the users and validate the attestations. chunk = res["users_section"]["users"] valid_users = [] for entry in chunk: g_user_id = entry["user_id"] - attestation = entry.pop("attestation") + attestation = entry.pop("attestation", {}) try: - yield self.attestations.verify_attestation( - attestation, - group_id=group_id, - user_id=g_user_id, - ) + if get_domain_from_id(g_user_id) != group_server_name: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + server_name=get_domain_from_id(g_user_id), + ) valid_users.append(entry) except Exception as e: logger.info("Failed to verify user is in group: %s", e) @@ -160,6 +164,7 @@ class GroupsLocalHandler(object): remote_attestation, group_id=group_id, user_id=user_id, + server_name=get_domain_from_id(group_id), ) is_publicised = content.get("publicise", False) @@ -187,6 +192,8 @@ class GroupsLocalHandler(object): ) defer.returnValue(res) + group_server_name = get_domain_from_id(group_id) + res = yield self.transport_client.get_users_in_group( get_domain_from_id(group_id), group_id, requester_user_id, ) @@ -195,13 +202,15 @@ class GroupsLocalHandler(object): valid_entries = [] for entry in chunk: g_user_id = entry["user_id"] - attestation = entry.pop("attestation") + attestation = entry.pop("attestation", {}) try: - yield self.attestations.verify_attestation( - attestation, - group_id=group_id, - user_id=g_user_id, - ) + if get_domain_from_id(g_user_id) != group_server_name: + yield self.attestations.verify_attestation( + attestation, + group_id=group_id, + user_id=g_user_id, + server_name=get_domain_from_id(g_user_id), + ) valid_entries.append(entry) except Exception as e: logger.info("Failed to verify user is in group: %s", e) @@ -240,6 +249,7 @@ class GroupsLocalHandler(object): remote_attestation, group_id=group_id, user_id=user_id, + server_name=get_domain_from_id(group_id), ) # TODO: Check that the group is public and we're being added publically diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index e1cd3a48e9..0525765272 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from synapse.util import logcontext from ._base import BaseHandler @@ -59,6 +60,8 @@ class ReceiptsHandler(BaseHandler): is_new = yield self._handle_new_receipts([receipt]) if is_new: + # fire off a process in the background to send the receipt to + # remote servers self._push_remotes([receipt]) @defer.inlineCallbacks @@ -126,6 +129,7 @@ class ReceiptsHandler(BaseHandler): defer.returnValue(True) + @logcontext.preserve_fn # caller should not yield on this @defer.inlineCallbacks def _push_remotes(self, receipts): """Given a list of receipts, works out which remote servers should be diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 4fe9172adc..3af372de59 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -1172,13 +1172,13 @@ class GroupServerStore(SQLBaseStore): LIMIT ? """ txn.execute(sql, (from_token, to_token, limit,)) - return [{ - "stream_id": stream_id, - "group_id": group_id, - "user_id": user_id, - "type": gtype, - "content": json.loads(content_json), - } for stream_id, group_id, user_id, gtype, content_json in txn] + return [( + stream_id, + group_id, + user_id, + gtype, + json.loads(content_json), + ) for stream_id, group_id, user_id, gtype, content_json in txn] return self.runInteraction( "get_all_groups_changes", _get_all_groups_changes_txn, ) diff --git a/synapse/util/async.py b/synapse/util/async.py index bb252f75d7..0fd5b42523 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -203,7 +203,26 @@ class Linearizer(object): except: logger.exception("Unexpected exception in Linearizer") - logger.info("Acquired linearizer lock %r for key %r", self.name, key) + logger.info("Acquired linearizer lock %r for key %r", self.name, + key) + + # if the code holding the lock completes synchronously, then it + # will recursively run the next claimant on the list. That can + # relatively rapidly lead to stack exhaustion. This is essentially + # the same problem as http://twistedmatrix.com/trac/ticket/9304. + # + # In order to break the cycle, we add a cheeky sleep(0) here to + # ensure that we fall back to the reactor between each iteration. + # + # (There's no particular need for it to happen before we return + # the context manager, but it needs to happen while we hold the + # lock, and the context manager's exit code must be synchronous, + # so actually this is the only sensible place. + yield run_on_reactor() + + else: + logger.info("Acquired uncontended linearizer lock %r for key %r", + self.name, key) @contextmanager def _ctx_manager(): @@ -211,7 +230,8 @@ class Linearizer(object): yield finally: logger.info("Releasing linearizer lock %r for key %r", self.name, key) - new_defer.callback(None) + with PreserveLoggingContext(): + new_defer.callback(None) current_d = self.key_to_defer.get(key) if current_d is new_defer: self.key_to_defer.pop(key, None) diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py new file mode 100644 index 0000000000..cdbc4bffd7 --- /dev/null +++ b/synapse/util/logformatter.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import StringIO +import logging +import traceback + + +class LogFormatter(logging.Formatter): + """Log formatter which gives more detail for exceptions + + This is the same as the standard log formatter, except that when logging + exceptions [typically via log.foo("msg", exc_info=1)], it prints the + sequence that led up to the point at which the exception was caught. + (Normally only stack frames between the point the exception was raised and + where it was caught are logged). + """ + def __init__(self, *args, **kwargs): + super(LogFormatter, self).__init__(*args, **kwargs) + + def formatException(self, ei): + sio = StringIO.StringIO() + (typ, val, tb) = ei + + # log the stack above the exception capture point if possible, but + # check that we actually have an f_back attribute to work around + # https://twistedmatrix.com/trac/ticket/9305 + + if tb and hasattr(tb.tb_frame, 'f_back'): + sio.write("Capture point (most recent call last):\n") + traceback.print_stack(tb.tb_frame.f_back, None, sio) + + traceback.print_exception(typ, val, tb, None, sio) + s = sio.getvalue() + sio.close() + if s[-1:] == "\n": + s = s[:-1] + return s diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index afcba482f9..793a88e462 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - +from synapse.util import async, logcontext from tests import unittest from twisted.internet import defer @@ -38,7 +37,28 @@ class LinearizerTestCase(unittest.TestCase): with cm1: self.assertFalse(d2.called) - self.assertTrue(d2.called) - with (yield d2): pass + + def test_lots_of_queued_things(self): + # we have one slow thing, and lots of fast things queued up behind it. + # it should *not* explode the stack. + linearizer = Linearizer() + + @defer.inlineCallbacks + def func(i, sleep=False): + with logcontext.LoggingContext("func(%s)" % i) as lc: + with (yield linearizer.queue("")): + self.assertEqual( + logcontext.LoggingContext.current_context(), lc) + if sleep: + yield async.sleep(0) + + self.assertEqual( + logcontext.LoggingContext.current_context(), lc) + + func(0, sleep=True) + for i in xrange(1, 100): + func(i) + + return func(1000) |