summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/federation/federation_server.py5
-rw-r--r--synapse/federation/transport/client.py2
-rw-r--r--synapse/groups/groups_server.py2
-rw-r--r--synapse/handlers/groups_local.py34
-rw-r--r--synapse/handlers/receipts.py4
-rw-r--r--synapse/storage/group_server.py14
-rw-r--r--synapse/util/async.py24
-rw-r--r--synapse/util/logformatter.py51
-rw-r--r--tests/util/test_linearizer.py28
9 files changed, 135 insertions, 29 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f00d59e701..a8034bddc6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -211,8 +211,9 @@ class FederationServer(FederationBase):
                     edu.content
                 )
 
-            for failure in getattr(transaction, "pdu_failures", []):
-                logger.info("Got failure %r", failure)
+        pdu_failures = getattr(transaction, "pdu_failures", [])
+        for failure in pdu_failures:
+            logger.info("Got failure %r", failure)
 
         response = {
             "pdus": pdu_results,
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 36f6eb75e9..f96561c1fe 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -572,7 +572,7 @@ class TransportLayerClient(object):
         return self.client.post_json(
             destination=destination,
             path=path,
-            args=requester_user_id,
+            args={"requester_user_id": requester_user_id},
             data=content,
             ignore_backoff=True,
         )
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 6a85908dd6..1083bc2990 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -400,7 +400,7 @@ class GroupsServerHandler(object):
             if not is_public:
                 entry["is_public"] = False
 
-            if not self.is_mine_id(requester_user_id):
+            if not self.is_mine_id(g_user_id):
                 attestation = yield self.store.get_remote_attestation(group_id, g_user_id)
                 if not attestation:
                     continue
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index a2bacbfc38..3b676d46bd 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -102,18 +102,22 @@ class GroupsLocalHandler(object):
                 get_domain_from_id(group_id), group_id, requester_user_id,
             )
 
+            group_server_name = get_domain_from_id(group_id)
+
             # Loop through the users and validate the attestations.
             chunk = res["users_section"]["users"]
             valid_users = []
             for entry in chunk:
                 g_user_id = entry["user_id"]
-                attestation = entry.pop("attestation")
+                attestation = entry.pop("attestation", {})
                 try:
-                    yield self.attestations.verify_attestation(
-                        attestation,
-                        group_id=group_id,
-                        user_id=g_user_id,
-                    )
+                    if get_domain_from_id(g_user_id) != group_server_name:
+                        yield self.attestations.verify_attestation(
+                            attestation,
+                            group_id=group_id,
+                            user_id=g_user_id,
+                            server_name=get_domain_from_id(g_user_id),
+                        )
                     valid_users.append(entry)
                 except Exception as e:
                     logger.info("Failed to verify user is in group: %s", e)
@@ -160,6 +164,7 @@ class GroupsLocalHandler(object):
                 remote_attestation,
                 group_id=group_id,
                 user_id=user_id,
+                server_name=get_domain_from_id(group_id),
             )
 
         is_publicised = content.get("publicise", False)
@@ -187,6 +192,8 @@ class GroupsLocalHandler(object):
             )
             defer.returnValue(res)
 
+        group_server_name = get_domain_from_id(group_id)
+
         res = yield self.transport_client.get_users_in_group(
             get_domain_from_id(group_id), group_id, requester_user_id,
         )
@@ -195,13 +202,15 @@ class GroupsLocalHandler(object):
         valid_entries = []
         for entry in chunk:
             g_user_id = entry["user_id"]
-            attestation = entry.pop("attestation")
+            attestation = entry.pop("attestation", {})
             try:
-                yield self.attestations.verify_attestation(
-                    attestation,
-                    group_id=group_id,
-                    user_id=g_user_id,
-                )
+                if get_domain_from_id(g_user_id) != group_server_name:
+                    yield self.attestations.verify_attestation(
+                        attestation,
+                        group_id=group_id,
+                        user_id=g_user_id,
+                        server_name=get_domain_from_id(g_user_id),
+                    )
                 valid_entries.append(entry)
             except Exception as e:
                 logger.info("Failed to verify user is in group: %s", e)
@@ -240,6 +249,7 @@ class GroupsLocalHandler(object):
                 remote_attestation,
                 group_id=group_id,
                 user_id=user_id,
+                server_name=get_domain_from_id(group_id),
             )
 
         # TODO: Check that the group is public and we're being added publically
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index e1cd3a48e9..0525765272 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from synapse.util import logcontext
 
 from ._base import BaseHandler
 
@@ -59,6 +60,8 @@ class ReceiptsHandler(BaseHandler):
         is_new = yield self._handle_new_receipts([receipt])
 
         if is_new:
+            # fire off a process in the background to send the receipt to
+            # remote servers
             self._push_remotes([receipt])
 
     @defer.inlineCallbacks
@@ -126,6 +129,7 @@ class ReceiptsHandler(BaseHandler):
 
             defer.returnValue(True)
 
+    @logcontext.preserve_fn   # caller should not yield on this
     @defer.inlineCallbacks
     def _push_remotes(self, receipts):
         """Given a list of receipts, works out which remote servers should be
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index 4fe9172adc..3af372de59 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -1172,13 +1172,13 @@ class GroupServerStore(SQLBaseStore):
                 LIMIT ?
             """
             txn.execute(sql, (from_token, to_token, limit,))
-            return [{
-                "stream_id": stream_id,
-                "group_id": group_id,
-                "user_id": user_id,
-                "type": gtype,
-                "content": json.loads(content_json),
-            } for stream_id, group_id, user_id, gtype, content_json in txn]
+            return [(
+                stream_id,
+                group_id,
+                user_id,
+                gtype,
+                json.loads(content_json),
+            ) for stream_id, group_id, user_id, gtype, content_json in txn]
         return self.runInteraction(
             "get_all_groups_changes", _get_all_groups_changes_txn,
         )
diff --git a/synapse/util/async.py b/synapse/util/async.py
index bb252f75d7..0fd5b42523 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -203,7 +203,26 @@ class Linearizer(object):
             except:
                 logger.exception("Unexpected exception in Linearizer")
 
-        logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            logger.info("Acquired linearizer lock %r for key %r", self.name,
+                        key)
+
+            # if the code holding the lock completes synchronously, then it
+            # will recursively run the next claimant on the list. That can
+            # relatively rapidly lead to stack exhaustion. This is essentially
+            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+            #
+            # In order to break the cycle, we add a cheeky sleep(0) here to
+            # ensure that we fall back to the reactor between each iteration.
+            #
+            # (There's no particular need for it to happen before we return
+            # the context manager, but it needs to happen while we hold the
+            # lock, and the context manager's exit code must be synchronous,
+            # so actually this is the only sensible place.
+            yield run_on_reactor()
+
+        else:
+            logger.info("Acquired uncontended linearizer lock %r for key %r",
+                        self.name, key)
 
         @contextmanager
         def _ctx_manager():
@@ -211,7 +230,8 @@ class Linearizer(object):
                 yield
             finally:
                 logger.info("Releasing linearizer lock %r for key %r", self.name, key)
-                new_defer.callback(None)
+                with PreserveLoggingContext():
+                    new_defer.callback(None)
                 current_d = self.key_to_defer.get(key)
                 if current_d is new_defer:
                     self.key_to_defer.pop(key, None)
diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py
new file mode 100644
index 0000000000..cdbc4bffd7
--- /dev/null
+++ b/synapse/util/logformatter.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import StringIO
+import logging
+import traceback
+
+
+class LogFormatter(logging.Formatter):
+    """Log formatter which gives more detail for exceptions
+
+    This is the same as the standard log formatter, except that when logging
+    exceptions [typically via log.foo("msg", exc_info=1)], it prints the
+    sequence that led up to the point at which the exception was caught.
+    (Normally only stack frames between the point the exception was raised and
+    where it was caught are logged).
+    """
+    def __init__(self, *args, **kwargs):
+        super(LogFormatter, self).__init__(*args, **kwargs)
+
+    def formatException(self, ei):
+        sio = StringIO.StringIO()
+        (typ, val, tb) = ei
+
+        # log the stack above the exception capture point if possible, but
+        # check that we actually have an f_back attribute to work around
+        # https://twistedmatrix.com/trac/ticket/9305
+
+        if tb and hasattr(tb.tb_frame, 'f_back'):
+            sio.write("Capture point (most recent call last):\n")
+            traceback.print_stack(tb.tb_frame.f_back, None, sio)
+
+        traceback.print_exception(typ, val, tb, None, sio)
+        s = sio.getvalue()
+        sio.close()
+        if s[-1:] == "\n":
+            s = s[:-1]
+        return s
diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py
index afcba482f9..793a88e462 100644
--- a/tests/util/test_linearizer.py
+++ b/tests/util/test_linearizer.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+from synapse.util import async, logcontext
 from tests import unittest
 
 from twisted.internet import defer
@@ -38,7 +37,28 @@ class LinearizerTestCase(unittest.TestCase):
         with cm1:
             self.assertFalse(d2.called)
 
-        self.assertTrue(d2.called)
-
         with (yield d2):
             pass
+
+    def test_lots_of_queued_things(self):
+        # we have one slow thing, and lots of fast things queued up behind it.
+        # it should *not* explode the stack.
+        linearizer = Linearizer()
+
+        @defer.inlineCallbacks
+        def func(i, sleep=False):
+            with logcontext.LoggingContext("func(%s)" % i) as lc:
+                with (yield linearizer.queue("")):
+                    self.assertEqual(
+                        logcontext.LoggingContext.current_context(), lc)
+                    if sleep:
+                        yield async.sleep(0)
+
+                self.assertEqual(
+                    logcontext.LoggingContext.current_context(), lc)
+
+        func(0, sleep=True)
+        for i in xrange(1, 100):
+            func(i)
+
+        return func(1000)