summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorDavid Baker <dbkr@matrix.org>2014-11-20 17:49:48 +0000
committerDavid Baker <dbkr@matrix.org>2014-11-20 17:49:48 +0000
commitf1c7f8e8131d6e5531e23e3bc2cd57ab7d1881ae (patch)
treeb1ebd402c0805f475500c9f36a0356322bf26f13 /synapse
parentSeparate out the matrix http client completely because just about all of its ... (diff)
parentFix pep8 codestyle warnings (diff)
downloadsynapse-f1c7f8e8131d6e5531e23e3bc2cd57ab7d1881ae.tar.xz
Merge branch 'develop' into http_client_refactor
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py2
-rw-r--r--synapse/api/errors.py7
-rw-r--r--synapse/api/events/__init__.py2
-rw-r--r--synapse/api/events/validator.py2
-rwxr-xr-xsynapse/app/homeserver.py9
-rwxr-xr-xsynapse/app/synctl.py14
-rw-r--r--synapse/crypto/event_signing.py2
-rw-r--r--synapse/crypto/keyring.py4
-rw-r--r--synapse/federation/replication.py41
-rw-r--r--synapse/federation/units.py1
-rw-r--r--synapse/handlers/_base.py2
-rw-r--r--synapse/handlers/directory.py5
-rw-r--r--synapse/handlers/events.py6
-rw-r--r--synapse/handlers/federation.py10
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/presence.py48
-rw-r--r--synapse/handlers/profile.py18
-rw-r--r--synapse/handlers/register.py2
-rw-r--r--synapse/handlers/room.py7
-rw-r--r--synapse/http/content_repository.py14
-rw-r--r--synapse/http/server.py3
-rw-r--r--synapse/notifier.py4
-rw-r--r--synapse/rest/events.py1
-rw-r--r--synapse/rest/presence.py14
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py4
-rw-r--r--synapse/storage/registration.py16
-rw-r--r--synapse/storage/room.py4
-rw-r--r--synapse/storage/roommember.py4
-rw-r--r--synapse/storage/signatures.py10
-rw-r--r--synapse/storage/stream.py11
-rw-r--r--synapse/util/__init__.py1
-rw-r--r--synapse/util/async.py2
-rw-r--r--synapse/util/distributor.py43
-rw-r--r--synapse/util/logcontext.py4
35 files changed, 193 insertions, 140 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 6d8a9e4df7..cbf3ae0ca4 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -272,7 +272,7 @@ class Auth(object):
             key = (RoomCreateEvent.TYPE, "", )
             create_event = event.old_state_events.get(key)
             if (create_event is not None and
-                create_event.content["creator"] == user_id):
+                    create_event.content["creator"] == user_id):
                 return 100
 
         return level
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 33d15072af..581439ceb3 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -17,6 +17,8 @@
 
 import logging
 
+logger = logging.getLogger(__name__)
+
 
 class Codes(object):
     UNAUTHORIZED = "M_UNAUTHORIZED"
@@ -38,7 +40,7 @@ class CodeMessageException(Exception):
     """An exception with integer code and message string attributes."""
 
     def __init__(self, code, msg):
-        logging.error("%s: %s, %s", type(self).__name__, code, msg)
+        logger.info("%s: %s, %s", type(self).__name__, code, msg)
         super(CodeMessageException, self).__init__("%d: %s" % (code, msg))
         self.code = code
         self.msg = msg
@@ -140,7 +142,8 @@ def cs_exception(exception):
     if isinstance(exception, CodeMessageException):
         return exception.error_dict()
     else:
-        logging.error("Unknown exception type: %s", type(exception))
+        logger.error("Unknown exception type: %s", type(exception))
+        return {}
 
 
 def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
index a01c4a1351..8a35b4cb7d 100644
--- a/synapse/api/events/__init__.py
+++ b/synapse/api/events/__init__.py
@@ -83,6 +83,8 @@ class SynapseEvent(JsonEncodedObject):
         "content",
     ]
 
+    outlier = False
+
     def __init__(self, raises=True, **kwargs):
         super(SynapseEvent, self).__init__(**kwargs)
         # if "content" in kwargs:
diff --git a/synapse/api/events/validator.py b/synapse/api/events/validator.py
index 2d4f2a3aa7..067215f6ef 100644
--- a/synapse/api/events/validator.py
+++ b/synapse/api/events/validator.py
@@ -84,4 +84,4 @@ class EventValidator(object):
                             template[key][0]
                         )
                         if msg:
-                            return msg
\ No newline at end of file
+                            return msg
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index c563d14104..855fe8e170 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -116,7 +116,7 @@ class SynapseHomeServer(HomeServer):
         # extra resources to existing nodes. See self._resource_id for the key.
         resource_mappings = {}
         for (full_path, resource) in desired_tree:
-            logging.info("Attaching %s to path %s", resource, full_path)
+            logger.info("Attaching %s to path %s", resource, full_path)
             last_resource = self.root_resource
             for path_seg in full_path.split('/')[1:-1]:
                 if not path_seg in last_resource.listNames():
@@ -221,12 +221,12 @@ def setup():
 
     db_name = hs.get_db_name()
 
-    logging.info("Preparing database: %s...", db_name)
+    logger.info("Preparing database: %s...", db_name)
 
     with sqlite3.connect(db_name) as db_conn:
         prepare_database(db_conn)
 
-    logging.info("Database prepared in %s.", db_name)
+    logger.info("Database prepared in %s.", db_name)
 
     hs.get_db_pool()
 
@@ -257,13 +257,16 @@ def setup():
     else:
         reactor.run()
 
+
 def run():
     with LoggingContext("run"):
         reactor.run()
 
+
 def main():
     with LoggingContext("main"):
         setup()
 
+
 if __name__ == '__main__':
     main()
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index abe055a64c..52a0b729f4 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -21,11 +21,12 @@ import signal
 
 SYNAPSE = ["python", "-m", "synapse.app.homeserver"]
 
-CONFIGFILE="homeserver.yaml"
-PIDFILE="homeserver.pid"
+CONFIGFILE = "homeserver.yaml"
+PIDFILE = "homeserver.pid"
+
+GREEN = "\x1b[1;32m"
+NORMAL = "\x1b[m"
 
-GREEN="\x1b[1;32m"
-NORMAL="\x1b[m"
 
 def start():
     if not os.path.exists(CONFIGFILE):
@@ -43,12 +44,14 @@ def start():
     subprocess.check_call(args)
     print GREEN + "started" + NORMAL
 
+
 def stop():
     if os.path.exists(PIDFILE):
         pid = int(open(PIDFILE).read())
         os.kill(pid, signal.SIGTERM)
         print GREEN + "stopped" + NORMAL
 
+
 def main():
     action = sys.argv[1] if sys.argv[1:] else "usage"
     if action == "start":
@@ -62,5 +65,6 @@ def main():
         sys.stderr.write("Usage: %s [start|stop|restart]\n" % (sys.argv[0],))
         sys.exit(1)
 
-if __name__=='__main__':
+
+if __name__ == "__main__":
     main()
diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py
index 4dff2c0ec2..a9d8953239 100644
--- a/synapse/crypto/event_signing.py
+++ b/synapse/crypto/event_signing.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
 def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
     """Check whether the hash for this PDU matches the contents"""
     computed_hash = _compute_content_hash(event, hash_algorithm)
-    logging.debug("Expecting hash: %s", encode_base64(computed_hash.digest()))
+    logger.debug("Expecting hash: %s", encode_base64(computed_hash.digest()))
     if computed_hash.name not in event.hashes:
         raise SynapseError(
             400,
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 694aed3a7d..ceb03ce6c2 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -135,7 +135,7 @@ class Keyring(object):
 
         time_now_ms = self.clock.time_msec()
 
-        self.store.store_server_certificate(
+        yield self.store.store_server_certificate(
             server_name,
             server_name,
             time_now_ms,
@@ -143,7 +143,7 @@ class Keyring(object):
         )
 
         for key_id, key in verify_keys.items():
-            self.store.store_server_verify_key(
+            yield self.store.store_server_verify_key(
                 server_name, server_name, time_now_ms, key
             )
 
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 65a53ae17c..124dc31225 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -24,6 +24,7 @@ from .units import Transaction, Edu
 from .persistence import TransactionActions
 
 from synapse.util.logutils import log_function
+from synapse.util.logcontext import PreserveLoggingContext
 
 import logging
 
@@ -319,19 +320,20 @@ class ReplicationLayer(object):
 
         logger.debug("[%s] Transacition is new", transaction.transaction_id)
 
-        dl = []
-        for pdu in pdu_list:
-            dl.append(self._handle_new_pdu(transaction.origin, pdu))
+        with PreserveLoggingContext():
+            dl = []
+            for pdu in pdu_list:
+                dl.append(self._handle_new_pdu(transaction.origin, pdu))
 
-        if hasattr(transaction, "edus"):
-            for edu in [Edu(**x) for x in transaction.edus]:
-                self.received_edu(
-                    transaction.origin,
-                    edu.edu_type,
-                    edu.content
-                )
+            if hasattr(transaction, "edus"):
+                for edu in [Edu(**x) for x in transaction.edus]:
+                    self.received_edu(
+                        transaction.origin,
+                        edu.edu_type,
+                        edu.content
+                    )
 
-        results = yield defer.DeferredList(dl)
+            results = yield defer.DeferredList(dl)
 
         ret = []
         for r in results:
@@ -425,7 +427,9 @@ class ReplicationLayer(object):
         time_now = self._clock.time_msec()
         defer.returnValue((200, {
             "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
-            "auth_chain": [p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]],
+            "auth_chain": [
+                p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
+            ],
         }))
 
     @defer.inlineCallbacks
@@ -436,7 +440,9 @@ class ReplicationLayer(object):
             (
                 200,
                 {
-                    "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
+                    "auth_chain": [
+                        a.get_pdu_json(time_now) for a in auth_pdus
+                    ],
                 }
             )
         )
@@ -457,7 +463,7 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     def send_join(self, destination, pdu):
-        time_now  = self._clock.time_msec()
+        time_now = self._clock.time_msec()
         _, content = yield self.transport_layer.send_join(
             destination,
             pdu.room_id,
@@ -649,7 +655,8 @@ class _TransactionQueue(object):
                 (pdu, deferred, order)
             )
 
-            self._attempt_new_transaction(destination)
+            with PreserveLoggingContext():
+                self._attempt_new_transaction(destination)
 
             deferreds.append(deferred)
 
@@ -669,7 +676,9 @@ class _TransactionQueue(object):
                 deferred.errback(failure)
             else:
                 logger.exception("Failed to send edu", failure)
-        self._attempt_new_transaction(destination).addErrback(eb)
+
+        with PreserveLoggingContext():
+            self._attempt_new_transaction(destination).addErrback(eb)
 
         return deferred
 
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 6e708edb8c..1bcd0548c2 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -25,7 +25,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-
 class Edu(JsonEncodedObject):
     """ An Edu represents a piece of data sent from one homeserver to another.
 
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 30c6733063..d53cd3df3e 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -112,7 +112,7 @@ class BaseHandler(object):
 
         event.destinations = list(destinations)
 
-        self.notifier.on_new_room_event(event, extra_users=extra_users)
+        yield self.notifier.on_new_room_event(event, extra_users=extra_users)
 
         federation_handler = self.hs.get_handlers().federation_handler
         yield federation_handler.handle_new_event(event, snapshot)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 05e5c6ecfc..af4e7d49c8 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -128,8 +128,9 @@ class DirectoryHandler(BaseHandler):
                 "servers": result.servers,
             })
         else:
-            raise SynapseError(404, "Room alias \"%s\" not found" % (room_alias,))
-
+            raise SynapseError(
+                404, "Room alias \"%s\" not found" % (room_alias,)
+            )
 
     @defer.inlineCallbacks
     def send_room_alias_update_event(self, user_id, room_id):
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 4993c92b74..d59221a4fb 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -56,7 +56,7 @@ class EventStreamHandler(BaseHandler):
                     self.clock.cancel_call_later(
                         self._stop_timer_per_user.pop(auth_user))
                 else:
-                    self.distributor.fire(
+                    yield self.distributor.fire(
                         "started_user_eventstream", auth_user
                     )
             self._streams_per_user[auth_user] += 1
@@ -65,8 +65,10 @@ class EventStreamHandler(BaseHandler):
                 pagin_config.from_token = None
 
             rm_handler = self.hs.get_handlers().room_member_handler
+            logger.debug("BETA")
             room_ids = yield rm_handler.get_rooms_for_user(auth_user)
 
+            logger.debug("ALPHA")
             with PreserveLoggingContext():
                 events, tokens = yield self.notifier.get_events_for(
                     auth_user, room_ids, pagin_config, timeout
@@ -93,7 +95,7 @@ class EventStreamHandler(BaseHandler):
                     logger.debug(
                         "_later stopped_user_eventstream %s", auth_user
                     )
-                    self.distributor.fire(
+                    yield self.distributor.fire(
                         "stopped_user_eventstream", auth_user
                     )
                     del self._stop_timer_per_user[auth_user]
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 492005a170..2e8b8a1f9a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -122,7 +122,8 @@ class FederationHandler(BaseHandler):
                 event.origin, redacted_pdu_json
             )
         except SynapseError as e:
-            logger.warn("Signature check failed for %s redacted to %s",
+            logger.warn(
+                "Signature check failed for %s redacted to %s",
                 encode_canonical_json(pdu.get_pdu_json()),
                 encode_canonical_json(redacted_pdu_json),
             )
@@ -209,7 +210,7 @@ class FederationHandler(BaseHandler):
         if event.type == RoomMemberEvent.TYPE:
             if event.membership == Membership.JOIN:
                 user = self.hs.parse_userid(event.state_key)
-                self.distributor.fire(
+                yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
                 )
 
@@ -390,7 +391,8 @@ class FederationHandler(BaseHandler):
 
         event.outlier = False
 
-        is_new_state = yield self.state_handler.annotate_event_with_state(event)
+        state_handler = self.state_handler
+        is_new_state = yield state_handler.annotate_event_with_state(event)
         self.auth.check(event, raises=True)
 
         # FIXME (erikj):  All this is duplicated above :(
@@ -414,7 +416,7 @@ class FederationHandler(BaseHandler):
         if event.type == RoomMemberEvent.TYPE:
             if event.membership == Membership.JOIN:
                 user = self.hs.parse_userid(event.state_key)
-                self.distributor.fire(
+                yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
                 )
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index de70486b29..06a4e173f6 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.api.constants import Membership
 from synapse.api.errors import RoomError
 from synapse.streams.config import PaginationConfig
+from synapse.util.logcontext import PreserveLoggingContext
 from ._base import BaseHandler
 
 import logging
@@ -86,9 +87,10 @@ class MessageHandler(BaseHandler):
             event, snapshot, suppress_auth=suppress_auth
         )
 
-        self.hs.get_handlers().presence_handler.bump_presence_active_time(
-            user
-        )
+        with PreserveLoggingContext():
+            self.hs.get_handlers().presence_handler.bump_presence_active_time(
+                user
+            )
 
     @defer.inlineCallbacks
     def get_messages(self, user_id=None, room_id=None, pagin_config=None,
@@ -296,7 +298,7 @@ class MessageHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def room_initial_sync(self, user_id, room_id, pagin_config=None,
-                      feedback=False):
+                          feedback=False):
         yield self.auth.check_joined_room(room_id, user_id)
 
         # TODO(paul): I wish I was called with user objects not user_id
@@ -340,8 +342,8 @@ class MessageHandler(BaseHandler):
                 )
                 presence.append(member_presence)
             except Exception:
-                logger.exception("Failed to get member presence of %r",
-                    m.user_id
+                logger.exception(
+                    "Failed to get member presence of %r", m.user_id
                 )
 
         defer.returnValue({
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fcc92a8e32..b55d589daf 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -19,6 +19,7 @@ from synapse.api.errors import SynapseError, AuthError
 from synapse.api.constants import PresenceState
 
 from synapse.util.logutils import log_function
+from synapse.util.logcontext import PreserveLoggingContext
 
 from ._base import BaseHandler
 
@@ -142,7 +143,7 @@ class PresenceHandler(BaseHandler):
             return UserPresenceCache()
 
     def registered_user(self, user):
-        self.store.create_presence(user.localpart)
+        return self.store.create_presence(user.localpart)
 
     @defer.inlineCallbacks
     def is_presence_visible(self, observer_user, observed_user):
@@ -241,14 +242,12 @@ class PresenceHandler(BaseHandler):
         was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]]
         now_level = self.STATE_LEVELS[state["presence"]]
 
-        yield defer.DeferredList([
-            self.store.set_presence_state(
-                target_user.localpart, state_to_store
-            ),
-            self.distributor.fire(
-                "collect_presencelike_data", target_user, state
-            ),
-        ])
+        yield self.store.set_presence_state(
+            target_user.localpart, state_to_store
+        )
+        yield self.distributor.fire(
+            "collect_presencelike_data", target_user, state
+        )
 
         if now_level > was_level:
             state["last_active"] = self.clock.time_msec()
@@ -256,14 +255,15 @@ class PresenceHandler(BaseHandler):
         now_online = state["presence"] != PresenceState.OFFLINE
         was_polling = target_user in self._user_cachemap
 
-        if now_online and not was_polling:
-            self.start_polling_presence(target_user, state=state)
-        elif not now_online and was_polling:
-            self.stop_polling_presence(target_user)
+        with PreserveLoggingContext():
+            if now_online and not was_polling:
+                self.start_polling_presence(target_user, state=state)
+            elif not now_online and was_polling:
+                self.stop_polling_presence(target_user)
 
-        # TODO(paul): perform a presence push as part of start/stop poll so
-        #   we don't have to do this all the time
-        self.changed_presencelike_data(target_user, state)
+            # TODO(paul): perform a presence push as part of start/stop poll so
+            #   we don't have to do this all the time
+            self.changed_presencelike_data(target_user, state)
 
     def bump_presence_active_time(self, user, now=None):
         if now is None:
@@ -277,7 +277,7 @@ class PresenceHandler(BaseHandler):
         self._user_cachemap_latest_serial += 1
         statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
-        self.push_presence(user, statuscache=statuscache)
+        return self.push_presence(user, statuscache=statuscache)
 
     @log_function
     def started_user_eventstream(self, user):
@@ -381,8 +381,10 @@ class PresenceHandler(BaseHandler):
         yield self.store.set_presence_list_accepted(
             observer_user.localpart, observed_user.to_string()
         )
-
-        self.start_polling_presence(observer_user, target_user=observed_user)
+        with PreserveLoggingContext():
+            self.start_polling_presence(
+                observer_user, target_user=observed_user
+            )
 
     @defer.inlineCallbacks
     def deny_presence(self, observed_user, observer_user):
@@ -401,7 +403,10 @@ class PresenceHandler(BaseHandler):
             observer_user.localpart, observed_user.to_string()
         )
 
-        self.stop_polling_presence(observer_user, target_user=observed_user)
+        with PreserveLoggingContext():
+            self.stop_polling_presence(
+                observer_user, target_user=observed_user
+            )
 
     @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
@@ -710,7 +715,8 @@ class PresenceHandler(BaseHandler):
                 if not self._remote_sendmap[user]:
                     del self._remote_sendmap[user]
 
-        yield defer.DeferredList(deferreds)
+        with PreserveLoggingContext():
+            yield defer.DeferredList(deferreds)
 
     @defer.inlineCallbacks
     def push_update_to_local_and_remote(self, observed_user, statuscache,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 7853bf5098..814b3b68fe 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError, CodeMessageException
 from synapse.api.constants import Membership
+from synapse.util.logcontext import PreserveLoggingContext
 
 from ._base import BaseHandler
 
@@ -46,7 +47,7 @@ class ProfileHandler(BaseHandler):
         )
 
     def registered_user(self, user):
-        self.store.create_profile(user.localpart)
+        return self.store.create_profile(user.localpart)
 
     @defer.inlineCallbacks
     def get_displayname(self, target_user):
@@ -152,13 +153,14 @@ class ProfileHandler(BaseHandler):
         if not user.is_mine:
             defer.returnValue(None)
 
-        (displayname, avatar_url) = yield defer.gatherResults(
-            [
-                self.store.get_profile_displayname(user.localpart),
-                self.store.get_profile_avatar_url(user.localpart),
-            ],
-            consumeErrors=True
-        )
+        with PreserveLoggingContext():
+            (displayname, avatar_url) = yield defer.gatherResults(
+                [
+                    self.store.get_profile_displayname(user.localpart),
+                    self.store.get_profile_avatar_url(user.localpart),
+                ],
+                consumeErrors=True
+            )
 
         state["displayname"] = displayname
         state["avatar_url"] = avatar_url
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 4c9044ed19..277616ea69 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -69,7 +69,7 @@ class RegistrationHandler(BaseHandler):
                 password_hash=password_hash
             )
 
-            self.distributor.fire("registered_user", user)
+            yield self.distributor.fire("registered_user", user)
         else:
             # autogen a random user ID
             attempts = 0
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7d9458e1d0..88955160c5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -178,7 +178,9 @@ class RoomCreationHandler(BaseHandler):
 
         if room_alias:
             result["room_alias"] = room_alias.to_string()
-            directory_handler.send_room_alias_update_event(user_id, room_id)
+            yield directory_handler.send_room_alias_update_event(
+                user_id, room_id
+            )
 
         defer.returnValue(result)
 
@@ -211,7 +213,6 @@ class RoomCreationHandler(BaseHandler):
             **event_keys
         )
 
-
         power_levels_event = self.event_factory.create_event(
             etype=RoomPowerLevelsEvent.TYPE,
             content={
@@ -480,7 +481,7 @@ class RoomMemberHandler(BaseHandler):
             )
 
         user = self.hs.parse_userid(event.user_id)
-        self.distributor.fire(
+        yield self.distributor.fire(
             "user_joined_room", user=user, room_id=room_id
         )
 
diff --git a/synapse/http/content_repository.py b/synapse/http/content_repository.py
index 1306b35271..7e046dfe49 100644
--- a/synapse/http/content_repository.py
+++ b/synapse/http/content_repository.py
@@ -131,12 +131,14 @@ class ContentRepoResource(resource.Resource):
             request.setHeader('Content-Type', content_type)
 
             # cache for at least a day.
-            # XXX: we might want to turn this off for data we don't want to recommend
-            # caching as it's sensitive or private - or at least select private.
-            # don't bother setting Expires as all our matrix clients are smart enough to
-            # be happy with Cache-Control (right?)
-            request.setHeader('Cache-Control', 'public,max-age=86400,s-maxage=86400')
-            
+            # XXX: we might want to turn this off for data we don't want to
+            # recommend caching as it's sensitive or private - or at least
+            # select private. don't bother setting Expires as all our matrix
+            # clients are smart enough to be happy with Cache-Control (right?)
+            request.setHeader(
+                "Cache-Control", "public,max-age=86400,s-maxage=86400"
+            )
+
             d = FileSender().beginFileTransfer(f, request)
 
             # after the file has been sent, clean up and finish the request
diff --git a/synapse/http/server.py b/synapse/http/server.py
index ed1f1170cb..8024ff5bde 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -138,8 +138,7 @@ class JsonResource(HttpServer, resource.Resource):
             )
         except CodeMessageException as e:
             if isinstance(e, SynapseError):
-                logger.error("%s SynapseError: %s - %s", request, e.code,
-                             e.msg)
+                logger.info("%s SynapseError: %s - %s", request, e.code, e.msg)
             else:
                 logger.exception(e)
             self._send_response(
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c310a9fed6..5e14950449 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
 
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.async import run_on_reactor
 
 import logging
 
@@ -96,6 +97,7 @@ class Notifier(object):
         listening to the room, and any listeners for the users in the
         `extra_users` param.
         """
+        yield run_on_reactor()
         room_id = event.room_id
 
         room_source = self.event_sources.sources["room"]
@@ -143,6 +145,7 @@ class Notifier(object):
 
         Will wake up all listeners for the given users and rooms.
         """
+        yield run_on_reactor()
         presence_source = self.event_sources.sources["presence"]
 
         listeners = set()
@@ -211,6 +214,7 @@ class Notifier(object):
             timeout,
             deferred,
         )
+
         def _timeout_listener():
             # TODO (erikj): We should probably set to_token to the current
             # max rather than reusing from_token.
diff --git a/synapse/rest/events.py b/synapse/rest/events.py
index 92ff5e5ca7..3c1b041bfe 100644
--- a/synapse/rest/events.py
+++ b/synapse/rest/events.py
@@ -26,7 +26,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-
 class EventStreamRestServlet(RestServlet):
     PATTERN = client_path_pattern("/events$")
 
diff --git a/synapse/rest/presence.py b/synapse/rest/presence.py
index 138cc88a05..502ed0d4ca 100644
--- a/synapse/rest/presence.py
+++ b/synapse/rest/presence.py
@@ -117,8 +117,6 @@ class PresenceListRestServlet(RestServlet):
             logger.exception("JSON parse error")
             raise SynapseError(400, "Unable to parse content")
 
-        deferreds = []
-
         if "invite" in content:
             for u in content["invite"]:
                 if not isinstance(u, basestring):
@@ -126,8 +124,9 @@ class PresenceListRestServlet(RestServlet):
                 if len(u) == 0:
                     continue
                 invited_user = self.hs.parse_userid(u)
-                deferreds.append(self.handlers.presence_handler.send_invite(
-                    observer_user=user, observed_user=invited_user))
+                yield self.handlers.presence_handler.send_invite(
+                    observer_user=user, observed_user=invited_user
+                )
 
         if "drop" in content:
             for u in content["drop"]:
@@ -136,10 +135,9 @@ class PresenceListRestServlet(RestServlet):
                 if len(u) == 0:
                     continue
                 dropped_user = self.hs.parse_userid(u)
-                deferreds.append(self.handlers.presence_handler.drop(
-                    observer_user=user, observed_user=dropped_user))
-
-        yield defer.DeferredList(deferreds)
+                yield self.handlers.presence_handler.drop(
+                    observer_user=user, observed_user=dropped_user
+                )
 
         defer.returnValue((200, {}))
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 330d3b793f..1231794de0 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -508,7 +508,7 @@ def prepare_database(db_conn):
                 "new for the server to understand"
             )
         elif user_version < SCHEMA_VERSION:
-            logging.info(
+            logger.info(
                 "Upgrading database from version %d",
                 user_version
             )
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 5d4be09a82..fd5b2affad 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -57,7 +57,7 @@ class LoggingTransaction(object):
             if args and args[0]:
                 values = args[0]
                 sql_logger.debug(
-                    "[SQL values] {%s} " + ", ".join(("<%s>",) * len(values)),
+                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)),
                     self.name,
                     *values
                 )
@@ -91,6 +91,7 @@ class SQLBaseStore(object):
     def runInteraction(self, desc, func, *args, **kwargs):
         """Wraps the .runInteraction() method on the underlying db_pool."""
         current_context = LoggingContext.current_context()
+
         def inner_func(txn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
                 current_context.copy_to(context)
@@ -115,7 +116,6 @@ class SQLBaseStore(object):
                         "[TXN END] {%s} %f",
                         name, end - start
                     )
-
         with PreserveLoggingContext():
             result = yield self._db_pool.runInteraction(
                 inner_func, *args, **kwargs
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 1f89d77344..4d15005c9e 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -75,7 +75,9 @@ class RegistrationStore(SQLBaseStore):
                         "VALUES (?,?,?)",
                         [user_id, password_hash, now])
         except IntegrityError:
-            raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE)
+            raise StoreError(
+                400, "User ID already taken.", errcode=Codes.USER_IN_USE
+            )
 
         # it's possible for this to get a conflict, but only for a single user
         # since tokens are namespaced based on their user ID
@@ -83,8 +85,8 @@ class RegistrationStore(SQLBaseStore):
                     "VALUES (?,?)", [txn.lastrowid, token])
 
     def get_user_by_id(self, user_id):
-        query = ("SELECT users.name, users.password_hash FROM users "
-                "WHERE users.name = ?")
+        query = ("SELECT users.name, users.password_hash FROM users"
+                 " WHERE users.name = ?")
         return self._execute(
             self.cursor_to_dict,
             query, user_id
@@ -120,10 +122,10 @@ class RegistrationStore(SQLBaseStore):
 
     def _query_for_auth(self, txn, token):
         sql = (
-            "SELECT users.name, users.admin, access_tokens.device_id "
-            "FROM users "
-            "INNER JOIN access_tokens on users.id = access_tokens.user_id "
-            "WHERE token = ?"
+            "SELECT users.name, users.admin, access_tokens.device_id"
+            " FROM users"
+            " INNER JOIN access_tokens on users.id = access_tokens.user_id"
+            " WHERE token = ?"
         )
 
         cursor = txn.execute(sql, (token,))
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index cc0513b8d2..2378d65943 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -27,7 +27,9 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level", "redact_level"))
+OpsLevel = collections.namedtuple("OpsLevel", (
+    "ban_level", "kick_level", "redact_level")
+)
 
 
 class RoomStore(SQLBaseStore):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 93329703a2..c37df59d45 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -177,8 +177,8 @@ class RoomMemberStore(SQLBaseStore):
         return self._get_members_query(clause, vals)
 
     def _get_members_query(self, where_clause, where_values):
-        return self._db_pool.runInteraction(
-            self._get_members_query_txn,
+        return self.runInteraction(
+            "get_members_query", self._get_members_query_txn,
             where_clause, where_values
         )
 
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index d90e08fff1..eea4f21065 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -36,7 +36,7 @@ class SignatureStore(SQLBaseStore):
         return dict(txn.fetchall())
 
     def _store_event_content_hash_txn(self, txn, event_id, algorithm,
-                                    hash_bytes):
+                                      hash_bytes):
         """Store a hash for a Event
         Args:
             txn (cursor):
@@ -84,7 +84,7 @@ class SignatureStore(SQLBaseStore):
         return dict(txn.fetchall())
 
     def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
-                                      hash_bytes):
+                                        hash_bytes):
         """Store a hash for a PDU
         Args:
             txn (cursor):
@@ -127,7 +127,7 @@ class SignatureStore(SQLBaseStore):
         return res
 
     def _store_event_signature_txn(self, txn, event_id, signature_name, key_id,
-                                          signature_bytes):
+                                   signature_bytes):
         """Store a signature from the origin server for a PDU.
         Args:
             txn (cursor):
@@ -169,7 +169,7 @@ class SignatureStore(SQLBaseStore):
         return results
 
     def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id,
-                                 algorithm, hash_bytes):
+                                   algorithm, hash_bytes):
         self._simple_insert_txn(
             txn,
             "event_edge_hashes",
@@ -180,4 +180,4 @@ class SignatureStore(SQLBaseStore):
                 "hash": buffer(hash_bytes),
             },
             or_ignore=True,
-        )
\ No newline at end of file
+        )
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a954024678..b84735e61c 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -213,8 +213,8 @@ class StreamStore(SQLBaseStore):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
-        from_comp = '<=' if direction =='b' else '>'
-        to_comp = '>' if direction =='b' else '<='
+        from_comp = '<=' if direction == 'b' else '>'
+        to_comp = '>' if direction == 'b' else '<='
         order = "DESC" if direction == 'b' else "ASC"
 
         args = [room_id]
@@ -235,9 +235,10 @@ class StreamStore(SQLBaseStore):
         )
 
         sql = (
-            "SELECT *, (%(redacted)s) AS redacted FROM events "
-            "WHERE outlier = 0 AND room_id = ? AND %(bounds)s "
-            "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
+            "SELECT *, (%(redacted)s) AS redacted FROM events"
+            " WHERE outlier = 0 AND room_id = ? AND %(bounds)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s %(limit)s"
         ) % {
             "redacted": del_sql,
             "bounds": bounds,
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index e57fb0e914..7ec5033ceb 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -37,6 +37,7 @@ class Clock(object):
 
     def call_later(self, delay, callback):
         current_context = LoggingContext.current_context()
+
         def wrapped_callback():
             LoggingContext.thread_local.current_context = current_context
             callback()
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 1219d927db..7dd3ec3a72 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -18,6 +18,7 @@ from twisted.internet import defer, reactor
 
 from .logcontext import PreserveLoggingContext
 
+
 @defer.inlineCallbacks
 def sleep(seconds):
     d = defer.Deferred()
@@ -25,6 +26,7 @@ def sleep(seconds):
     with PreserveLoggingContext():
         yield d
 
+
 def run_on_reactor():
     """ This will cause the rest of the function to be invoked upon the next
     iteration of the main loop
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index eddbe5837f..701ccdb781 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.util.logcontext import PreserveLoggingContext
+
 from twisted.internet import defer
 
 import logging
@@ -91,6 +93,7 @@ class Signal(object):
         Each observer callable may return a Deferred."""
         self.observers.append(observer)
 
+    @defer.inlineCallbacks
     def fire(self, *args, **kwargs):
         """Invokes every callable in the observer list, passing in the args and
         kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -98,22 +101,24 @@ class Signal(object):
 
         Returns a Deferred that will complete when all the observers have
         completed."""
-        deferreds = []
-        for observer in self.observers:
-            d = defer.maybeDeferred(observer, *args, **kwargs)
-
-            def eb(failure):
-                logger.warning(
-                    "%s signal observer %s failed: %r",
-                    self.name, observer, failure,
-                    exc_info=(
-                        failure.type,
-                        failure.value,
-                        failure.getTracebackObject()))
-                if not self.suppress_failures:
-                    raise failure
-            deferreds.append(d.addErrback(eb))
-
-        return defer.DeferredList(
-            deferreds, fireOnOneErrback=not self.suppress_failures
-        )
+        with PreserveLoggingContext():
+            deferreds = []
+            for observer in self.observers:
+                d = defer.maybeDeferred(observer, *args, **kwargs)
+
+                def eb(failure):
+                    logger.warning(
+                        "%s signal observer %s failed: %r",
+                        self.name, observer, failure,
+                        exc_info=(
+                            failure.type,
+                            failure.value,
+                            failure.getTracebackObject()))
+                    if not self.suppress_failures:
+                        raise failure
+                deferreds.append(d.addErrback(eb))
+
+            result = yield defer.DeferredList(
+                deferreds, fireOnOneErrback=not self.suppress_failures
+            )
+        defer.returnValue(result)
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 2f430a0f19..23b3decb45 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -1,6 +1,8 @@
 import threading
 import logging
 
+logger = logging.getLogger(__name__)
+
 
 class LoggingContext(object):
     """Additional context for log formatting. Contexts are scoped within a
@@ -53,7 +55,7 @@ class LoggingContext(object):
             None to avoid suppressing any exeptions that were thrown.
         """
         if self.thread_local.current_context is not self:
-            logging.error(
+            logger.error(
                 "Current logging context %s is not the expected context %s",
                 self.thread_local.current_context,
                 self