diff options
author | David Baker <dbkr@matrix.org> | 2014-11-20 18:17:46 +0000 |
---|---|---|
committer | David Baker <dbkr@matrix.org> | 2014-11-20 18:17:46 +0000 |
commit | 23465a30b65808a75afa5cb98bcd1131a2a2c984 (patch) | |
tree | dcd987034fefbb2649da8346bf36c9d9ded25acc | |
parent | Merge branch 'http_client_refactor' into pushers (diff) | |
parent | Fix pep8 codestyle warnings (diff) | |
download | synapse-23465a30b65808a75afa5cb98bcd1131a2a2c984.tar.xz |
Merge branch 'develop' into pushers
35 files changed, 193 insertions, 140 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 6d8a9e4df7..cbf3ae0ca4 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -272,7 +272,7 @@ class Auth(object): key = (RoomCreateEvent.TYPE, "", ) create_event = event.old_state_events.get(key) if (create_event is not None and - create_event.content["creator"] == user_id): + create_event.content["creator"] == user_id): return 100 return level diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 97750ca2b0..64784bf212 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -17,6 +17,8 @@ import logging +logger = logging.getLogger(__name__) + class Codes(object): UNAUTHORIZED = "M_UNAUTHORIZED" @@ -39,7 +41,7 @@ class CodeMessageException(Exception): """An exception with integer code and message string attributes.""" def __init__(self, code, msg): - logging.error("%s: %s, %s", type(self).__name__, code, msg) + logger.info("%s: %s, %s", type(self).__name__, code, msg) super(CodeMessageException, self).__init__("%d: %s" % (code, msg)) self.code = code self.msg = msg @@ -141,7 +143,8 @@ def cs_exception(exception): if isinstance(exception, CodeMessageException): return exception.error_dict() else: - logging.error("Unknown exception type: %s", type(exception)) + logger.error("Unknown exception type: %s", type(exception)) + return {} def cs_error(msg, code=Codes.UNKNOWN, **kwargs): diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index a01c4a1351..8a35b4cb7d 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -83,6 +83,8 @@ class SynapseEvent(JsonEncodedObject): "content", ] + outlier = False + def __init__(self, raises=True, **kwargs): super(SynapseEvent, self).__init__(**kwargs) # if "content" in kwargs: diff --git a/synapse/api/events/validator.py b/synapse/api/events/validator.py index 2d4f2a3aa7..067215f6ef 100644 --- a/synapse/api/events/validator.py +++ b/synapse/api/events/validator.py @@ -84,4 +84,4 @@ class EventValidator(object): template[key][0] ) if msg: - return msg \ No newline at end of file + return msg diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 8f8d566191..a5833b2910 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -116,7 +116,7 @@ class SynapseHomeServer(HomeServer): # extra resources to existing nodes. See self._resource_id for the key. resource_mappings = {} for (full_path, resource) in desired_tree: - logging.info("Attaching %s to path %s", resource, full_path) + logger.info("Attaching %s to path %s", resource, full_path) last_resource = self.root_resource for path_seg in full_path.split('/')[1:-1]: if not path_seg in last_resource.listNames(): @@ -221,12 +221,12 @@ def setup(): db_name = hs.get_db_name() - logging.info("Preparing database: %s...", db_name) + logger.info("Preparing database: %s...", db_name) with sqlite3.connect(db_name) as db_conn: prepare_database(db_conn) - logging.info("Database prepared in %s.", db_name) + logger.info("Database prepared in %s.", db_name) hs.get_db_pool() @@ -259,13 +259,16 @@ def setup(): else: reactor.run() + def run(): with LoggingContext("run"): reactor.run() + def main(): with LoggingContext("main"): setup() + if __name__ == '__main__': main() diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index abe055a64c..52a0b729f4 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -21,11 +21,12 @@ import signal SYNAPSE = ["python", "-m", "synapse.app.homeserver"] -CONFIGFILE="homeserver.yaml" -PIDFILE="homeserver.pid" +CONFIGFILE = "homeserver.yaml" +PIDFILE = "homeserver.pid" + +GREEN = "\x1b[1;32m" +NORMAL = "\x1b[m" -GREEN="\x1b[1;32m" -NORMAL="\x1b[m" def start(): if not os.path.exists(CONFIGFILE): @@ -43,12 +44,14 @@ def start(): subprocess.check_call(args) print GREEN + "started" + NORMAL + def stop(): if os.path.exists(PIDFILE): pid = int(open(PIDFILE).read()) os.kill(pid, signal.SIGTERM) print GREEN + "stopped" + NORMAL + def main(): action = sys.argv[1] if sys.argv[1:] else "usage" if action == "start": @@ -62,5 +65,6 @@ def main(): sys.stderr.write("Usage: %s [start|stop|restart]\n" % (sys.argv[0],)) sys.exit(1) -if __name__=='__main__': + +if __name__ == "__main__": main() diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py index 4dff2c0ec2..a9d8953239 100644 --- a/synapse/crypto/event_signing.py +++ b/synapse/crypto/event_signing.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) def check_event_content_hash(event, hash_algorithm=hashlib.sha256): """Check whether the hash for this PDU matches the contents""" computed_hash = _compute_content_hash(event, hash_algorithm) - logging.debug("Expecting hash: %s", encode_base64(computed_hash.digest())) + logger.debug("Expecting hash: %s", encode_base64(computed_hash.digest())) if computed_hash.name not in event.hashes: raise SynapseError( 400, diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 694aed3a7d..ceb03ce6c2 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -135,7 +135,7 @@ class Keyring(object): time_now_ms = self.clock.time_msec() - self.store.store_server_certificate( + yield self.store.store_server_certificate( server_name, server_name, time_now_ms, @@ -143,7 +143,7 @@ class Keyring(object): ) for key_id, key in verify_keys.items(): - self.store.store_server_verify_key( + yield self.store.store_server_verify_key( server_name, server_name, time_now_ms, key ) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 65a53ae17c..124dc31225 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -24,6 +24,7 @@ from .units import Transaction, Edu from .persistence import TransactionActions from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext import logging @@ -319,19 +320,20 @@ class ReplicationLayer(object): logger.debug("[%s] Transacition is new", transaction.transaction_id) - dl = [] - for pdu in pdu_list: - dl.append(self._handle_new_pdu(transaction.origin, pdu)) + with PreserveLoggingContext(): + dl = [] + for pdu in pdu_list: + dl.append(self._handle_new_pdu(transaction.origin, pdu)) - if hasattr(transaction, "edus"): - for edu in [Edu(**x) for x in transaction.edus]: - self.received_edu( - transaction.origin, - edu.edu_type, - edu.content - ) + if hasattr(transaction, "edus"): + for edu in [Edu(**x) for x in transaction.edus]: + self.received_edu( + transaction.origin, + edu.edu_type, + edu.content + ) - results = yield defer.DeferredList(dl) + results = yield defer.DeferredList(dl) ret = [] for r in results: @@ -425,7 +427,9 @@ class ReplicationLayer(object): time_now = self._clock.time_msec() defer.returnValue((200, { "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]], - "auth_chain": [p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]], + "auth_chain": [ + p.get_pdu_json(time_now) for p in res_pdus["auth_chain"] + ], })) @defer.inlineCallbacks @@ -436,7 +440,9 @@ class ReplicationLayer(object): ( 200, { - "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], + "auth_chain": [ + a.get_pdu_json(time_now) for a in auth_pdus + ], } ) ) @@ -457,7 +463,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks def send_join(self, destination, pdu): - time_now = self._clock.time_msec() + time_now = self._clock.time_msec() _, content = yield self.transport_layer.send_join( destination, pdu.room_id, @@ -649,7 +655,8 @@ class _TransactionQueue(object): (pdu, deferred, order) ) - self._attempt_new_transaction(destination) + with PreserveLoggingContext(): + self._attempt_new_transaction(destination) deferreds.append(deferred) @@ -669,7 +676,9 @@ class _TransactionQueue(object): deferred.errback(failure) else: logger.exception("Failed to send edu", failure) - self._attempt_new_transaction(destination).addErrback(eb) + + with PreserveLoggingContext(): + self._attempt_new_transaction(destination).addErrback(eb) return deferred diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 6e708edb8c..1bcd0548c2 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -25,7 +25,6 @@ import logging logger = logging.getLogger(__name__) - class Edu(JsonEncodedObject): """ An Edu represents a piece of data sent from one homeserver to another. diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 30c6733063..d53cd3df3e 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -112,7 +112,7 @@ class BaseHandler(object): event.destinations = list(destinations) - self.notifier.on_new_room_event(event, extra_users=extra_users) + yield self.notifier.on_new_room_event(event, extra_users=extra_users) federation_handler = self.hs.get_handlers().federation_handler yield federation_handler.handle_new_event(event, snapshot) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 05e5c6ecfc..af4e7d49c8 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -128,8 +128,9 @@ class DirectoryHandler(BaseHandler): "servers": result.servers, }) else: - raise SynapseError(404, "Room alias \"%s\" not found" % (room_alias,)) - + raise SynapseError( + 404, "Room alias \"%s\" not found" % (room_alias,) + ) @defer.inlineCallbacks def send_room_alias_update_event(self, user_id, room_id): diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 4993c92b74..d59221a4fb 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -56,7 +56,7 @@ class EventStreamHandler(BaseHandler): self.clock.cancel_call_later( self._stop_timer_per_user.pop(auth_user)) else: - self.distributor.fire( + yield self.distributor.fire( "started_user_eventstream", auth_user ) self._streams_per_user[auth_user] += 1 @@ -65,8 +65,10 @@ class EventStreamHandler(BaseHandler): pagin_config.from_token = None rm_handler = self.hs.get_handlers().room_member_handler + logger.debug("BETA") room_ids = yield rm_handler.get_rooms_for_user(auth_user) + logger.debug("ALPHA") with PreserveLoggingContext(): events, tokens = yield self.notifier.get_events_for( auth_user, room_ids, pagin_config, timeout @@ -93,7 +95,7 @@ class EventStreamHandler(BaseHandler): logger.debug( "_later stopped_user_eventstream %s", auth_user ) - self.distributor.fire( + yield self.distributor.fire( "stopped_user_eventstream", auth_user ) del self._stop_timer_per_user[auth_user] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 492005a170..2e8b8a1f9a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -122,7 +122,8 @@ class FederationHandler(BaseHandler): event.origin, redacted_pdu_json ) except SynapseError as e: - logger.warn("Signature check failed for %s redacted to %s", + logger.warn( + "Signature check failed for %s redacted to %s", encode_canonical_json(pdu.get_pdu_json()), encode_canonical_json(redacted_pdu_json), ) @@ -209,7 +210,7 @@ class FederationHandler(BaseHandler): if event.type == RoomMemberEvent.TYPE: if event.membership == Membership.JOIN: user = self.hs.parse_userid(event.state_key) - self.distributor.fire( + yield self.distributor.fire( "user_joined_room", user=user, room_id=event.room_id ) @@ -390,7 +391,8 @@ class FederationHandler(BaseHandler): event.outlier = False - is_new_state = yield self.state_handler.annotate_event_with_state(event) + state_handler = self.state_handler + is_new_state = yield state_handler.annotate_event_with_state(event) self.auth.check(event, raises=True) # FIXME (erikj): All this is duplicated above :( @@ -414,7 +416,7 @@ class FederationHandler(BaseHandler): if event.type == RoomMemberEvent.TYPE: if event.membership == Membership.JOIN: user = self.hs.parse_userid(event.state_key) - self.distributor.fire( + yield self.distributor.fire( "user_joined_room", user=user, room_id=event.room_id ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index de70486b29..06a4e173f6 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.errors import RoomError from synapse.streams.config import PaginationConfig +from synapse.util.logcontext import PreserveLoggingContext from ._base import BaseHandler import logging @@ -86,9 +87,10 @@ class MessageHandler(BaseHandler): event, snapshot, suppress_auth=suppress_auth ) - self.hs.get_handlers().presence_handler.bump_presence_active_time( - user - ) + with PreserveLoggingContext(): + self.hs.get_handlers().presence_handler.bump_presence_active_time( + user + ) @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, @@ -296,7 +298,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def room_initial_sync(self, user_id, room_id, pagin_config=None, - feedback=False): + feedback=False): yield self.auth.check_joined_room(room_id, user_id) # TODO(paul): I wish I was called with user objects not user_id @@ -340,8 +342,8 @@ class MessageHandler(BaseHandler): ) presence.append(member_presence) except Exception: - logger.exception("Failed to get member presence of %r", - m.user_id + logger.exception( + "Failed to get member presence of %r", m.user_id ) defer.returnValue({ diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fcc92a8e32..b55d589daf 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -19,6 +19,7 @@ from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext from ._base import BaseHandler @@ -142,7 +143,7 @@ class PresenceHandler(BaseHandler): return UserPresenceCache() def registered_user(self, user): - self.store.create_presence(user.localpart) + return self.store.create_presence(user.localpart) @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): @@ -241,14 +242,12 @@ class PresenceHandler(BaseHandler): was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]] now_level = self.STATE_LEVELS[state["presence"]] - yield defer.DeferredList([ - self.store.set_presence_state( - target_user.localpart, state_to_store - ), - self.distributor.fire( - "collect_presencelike_data", target_user, state - ), - ]) + yield self.store.set_presence_state( + target_user.localpart, state_to_store + ) + yield self.distributor.fire( + "collect_presencelike_data", target_user, state + ) if now_level > was_level: state["last_active"] = self.clock.time_msec() @@ -256,14 +255,15 @@ class PresenceHandler(BaseHandler): now_online = state["presence"] != PresenceState.OFFLINE was_polling = target_user in self._user_cachemap - if now_online and not was_polling: - self.start_polling_presence(target_user, state=state) - elif not now_online and was_polling: - self.stop_polling_presence(target_user) + with PreserveLoggingContext(): + if now_online and not was_polling: + self.start_polling_presence(target_user, state=state) + elif not now_online and was_polling: + self.stop_polling_presence(target_user) - # TODO(paul): perform a presence push as part of start/stop poll so - # we don't have to do this all the time - self.changed_presencelike_data(target_user, state) + # TODO(paul): perform a presence push as part of start/stop poll so + # we don't have to do this all the time + self.changed_presencelike_data(target_user, state) def bump_presence_active_time(self, user, now=None): if now is None: @@ -277,7 +277,7 @@ class PresenceHandler(BaseHandler): self._user_cachemap_latest_serial += 1 statuscache.update(state, serial=self._user_cachemap_latest_serial) - self.push_presence(user, statuscache=statuscache) + return self.push_presence(user, statuscache=statuscache) @log_function def started_user_eventstream(self, user): @@ -381,8 +381,10 @@ class PresenceHandler(BaseHandler): yield self.store.set_presence_list_accepted( observer_user.localpart, observed_user.to_string() ) - - self.start_polling_presence(observer_user, target_user=observed_user) + with PreserveLoggingContext(): + self.start_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def deny_presence(self, observed_user, observer_user): @@ -401,7 +403,10 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - self.stop_polling_presence(observer_user, target_user=observed_user) + with PreserveLoggingContext(): + self.stop_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): @@ -710,7 +715,8 @@ class PresenceHandler(BaseHandler): if not self._remote_sendmap[user]: del self._remote_sendmap[user] - yield defer.DeferredList(deferreds) + with PreserveLoggingContext(): + yield defer.DeferredList(deferreds) @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 7853bf5098..814b3b68fe 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import Membership +from synapse.util.logcontext import PreserveLoggingContext from ._base import BaseHandler @@ -46,7 +47,7 @@ class ProfileHandler(BaseHandler): ) def registered_user(self, user): - self.store.create_profile(user.localpart) + return self.store.create_profile(user.localpart) @defer.inlineCallbacks def get_displayname(self, target_user): @@ -152,13 +153,14 @@ class ProfileHandler(BaseHandler): if not user.is_mine: defer.returnValue(None) - (displayname, avatar_url) = yield defer.gatherResults( - [ - self.store.get_profile_displayname(user.localpart), - self.store.get_profile_avatar_url(user.localpart), - ], - consumeErrors=True - ) + with PreserveLoggingContext(): + (displayname, avatar_url) = yield defer.gatherResults( + [ + self.store.get_profile_displayname(user.localpart), + self.store.get_profile_avatar_url(user.localpart), + ], + consumeErrors=True + ) state["displayname"] = displayname state["avatar_url"] = avatar_url diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index a39230bc76..122bf065c9 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -69,7 +69,7 @@ class RegistrationHandler(BaseHandler): password_hash=password_hash ) - self.distributor.fire("registered_user", user) + yield self.distributor.fire("registered_user", user) else: # autogen a random user ID attempts = 0 diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7d9458e1d0..88955160c5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -178,7 +178,9 @@ class RoomCreationHandler(BaseHandler): if room_alias: result["room_alias"] = room_alias.to_string() - directory_handler.send_room_alias_update_event(user_id, room_id) + yield directory_handler.send_room_alias_update_event( + user_id, room_id + ) defer.returnValue(result) @@ -211,7 +213,6 @@ class RoomCreationHandler(BaseHandler): **event_keys ) - power_levels_event = self.event_factory.create_event( etype=RoomPowerLevelsEvent.TYPE, content={ @@ -480,7 +481,7 @@ class RoomMemberHandler(BaseHandler): ) user = self.hs.parse_userid(event.user_id) - self.distributor.fire( + yield self.distributor.fire( "user_joined_room", user=user, room_id=room_id ) diff --git a/synapse/http/content_repository.py b/synapse/http/content_repository.py index 1306b35271..7e046dfe49 100644 --- a/synapse/http/content_repository.py +++ b/synapse/http/content_repository.py @@ -131,12 +131,14 @@ class ContentRepoResource(resource.Resource): request.setHeader('Content-Type', content_type) # cache for at least a day. - # XXX: we might want to turn this off for data we don't want to recommend - # caching as it's sensitive or private - or at least select private. - # don't bother setting Expires as all our matrix clients are smart enough to - # be happy with Cache-Control (right?) - request.setHeader('Cache-Control', 'public,max-age=86400,s-maxage=86400') - + # XXX: we might want to turn this off for data we don't want to + # recommend caching as it's sensitive or private - or at least + # select private. don't bother setting Expires as all our matrix + # clients are smart enough to be happy with Cache-Control (right?) + request.setHeader( + "Cache-Control", "public,max-age=86400,s-maxage=86400" + ) + d = FileSender().beginFileTransfer(f, request) # after the file has been sent, clean up and finish the request diff --git a/synapse/http/server.py b/synapse/http/server.py index ed1f1170cb..8024ff5bde 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -138,8 +138,7 @@ class JsonResource(HttpServer, resource.Resource): ) except CodeMessageException as e: if isinstance(e, SynapseError): - logger.error("%s SynapseError: %s - %s", request, e.code, - e.msg) + logger.info("%s SynapseError: %s - %s", request, e.code, e.msg) else: logger.exception(e) self._send_response( diff --git a/synapse/notifier.py b/synapse/notifier.py index c310a9fed6..5e14950449 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.async import run_on_reactor import logging @@ -96,6 +97,7 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ + yield run_on_reactor() room_id = event.room_id room_source = self.event_sources.sources["room"] @@ -143,6 +145,7 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ + yield run_on_reactor() presence_source = self.event_sources.sources["presence"] listeners = set() @@ -211,6 +214,7 @@ class Notifier(object): timeout, deferred, ) + def _timeout_listener(): # TODO (erikj): We should probably set to_token to the current # max rather than reusing from_token. diff --git a/synapse/rest/events.py b/synapse/rest/events.py index 92ff5e5ca7..3c1b041bfe 100644 --- a/synapse/rest/events.py +++ b/synapse/rest/events.py @@ -26,7 +26,6 @@ import logging logger = logging.getLogger(__name__) - class EventStreamRestServlet(RestServlet): PATTERN = client_path_pattern("/events$") diff --git a/synapse/rest/presence.py b/synapse/rest/presence.py index 138cc88a05..502ed0d4ca 100644 --- a/synapse/rest/presence.py +++ b/synapse/rest/presence.py @@ -117,8 +117,6 @@ class PresenceListRestServlet(RestServlet): logger.exception("JSON parse error") raise SynapseError(400, "Unable to parse content") - deferreds = [] - if "invite" in content: for u in content["invite"]: if not isinstance(u, basestring): @@ -126,8 +124,9 @@ class PresenceListRestServlet(RestServlet): if len(u) == 0: continue invited_user = self.hs.parse_userid(u) - deferreds.append(self.handlers.presence_handler.send_invite( - observer_user=user, observed_user=invited_user)) + yield self.handlers.presence_handler.send_invite( + observer_user=user, observed_user=invited_user + ) if "drop" in content: for u in content["drop"]: @@ -136,10 +135,9 @@ class PresenceListRestServlet(RestServlet): if len(u) == 0: continue dropped_user = self.hs.parse_userid(u) - deferreds.append(self.handlers.presence_handler.drop( - observer_user=user, observed_user=dropped_user)) - - yield defer.DeferredList(deferreds) + yield self.handlers.presence_handler.drop( + observer_user=user, observed_user=dropped_user + ) defer.returnValue((200, {})) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 5957f938a4..e91fcc9789 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -510,7 +510,7 @@ def prepare_database(db_conn): "new for the server to understand" ) elif user_version < SCHEMA_VERSION: - logging.info( + logger.info( "Upgrading database from version %d", user_version ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5d4be09a82..fd5b2affad 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -57,7 +57,7 @@ class LoggingTransaction(object): if args and args[0]: values = args[0] sql_logger.debug( - "[SQL values] {%s} " + ", ".join(("<%s>",) * len(values)), + "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)), self.name, *values ) @@ -91,6 +91,7 @@ class SQLBaseStore(object): def runInteraction(self, desc, func, *args, **kwargs): """Wraps the .runInteraction() method on the underlying db_pool.""" current_context = LoggingContext.current_context() + def inner_func(txn, *args, **kwargs): with LoggingContext("runInteraction") as context: current_context.copy_to(context) @@ -115,7 +116,6 @@ class SQLBaseStore(object): "[TXN END] {%s} %f", name, end - start ) - with PreserveLoggingContext(): result = yield self._db_pool.runInteraction( inner_func, *args, **kwargs diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 1f89d77344..4d15005c9e 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -75,7 +75,9 @@ class RegistrationStore(SQLBaseStore): "VALUES (?,?,?)", [user_id, password_hash, now]) except IntegrityError: - raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE) + raise StoreError( + 400, "User ID already taken.", errcode=Codes.USER_IN_USE + ) # it's possible for this to get a conflict, but only for a single user # since tokens are namespaced based on their user ID @@ -83,8 +85,8 @@ class RegistrationStore(SQLBaseStore): "VALUES (?,?)", [txn.lastrowid, token]) def get_user_by_id(self, user_id): - query = ("SELECT users.name, users.password_hash FROM users " - "WHERE users.name = ?") + query = ("SELECT users.name, users.password_hash FROM users" + " WHERE users.name = ?") return self._execute( self.cursor_to_dict, query, user_id @@ -120,10 +122,10 @@ class RegistrationStore(SQLBaseStore): def _query_for_auth(self, txn, token): sql = ( - "SELECT users.name, users.admin, access_tokens.device_id " - "FROM users " - "INNER JOIN access_tokens on users.id = access_tokens.user_id " - "WHERE token = ?" + "SELECT users.name, users.admin, access_tokens.device_id" + " FROM users" + " INNER JOIN access_tokens on users.id = access_tokens.user_id" + " WHERE token = ?" ) cursor = txn.execute(sql, (token,)) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index cc0513b8d2..2378d65943 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -27,7 +27,9 @@ import logging logger = logging.getLogger(__name__) -OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level", "redact_level")) +OpsLevel = collections.namedtuple("OpsLevel", ( + "ban_level", "kick_level", "redact_level") +) class RoomStore(SQLBaseStore): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 93329703a2..c37df59d45 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -177,8 +177,8 @@ class RoomMemberStore(SQLBaseStore): return self._get_members_query(clause, vals) def _get_members_query(self, where_clause, where_values): - return self._db_pool.runInteraction( - self._get_members_query_txn, + return self.runInteraction( + "get_members_query", self._get_members_query_txn, where_clause, where_values ) diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index d90e08fff1..eea4f21065 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -36,7 +36,7 @@ class SignatureStore(SQLBaseStore): return dict(txn.fetchall()) def _store_event_content_hash_txn(self, txn, event_id, algorithm, - hash_bytes): + hash_bytes): """Store a hash for a Event Args: txn (cursor): @@ -84,7 +84,7 @@ class SignatureStore(SQLBaseStore): return dict(txn.fetchall()) def _store_event_reference_hash_txn(self, txn, event_id, algorithm, - hash_bytes): + hash_bytes): """Store a hash for a PDU Args: txn (cursor): @@ -127,7 +127,7 @@ class SignatureStore(SQLBaseStore): return res def _store_event_signature_txn(self, txn, event_id, signature_name, key_id, - signature_bytes): + signature_bytes): """Store a signature from the origin server for a PDU. Args: txn (cursor): @@ -169,7 +169,7 @@ class SignatureStore(SQLBaseStore): return results def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id, - algorithm, hash_bytes): + algorithm, hash_bytes): self._simple_insert_txn( txn, "event_edge_hashes", @@ -180,4 +180,4 @@ class SignatureStore(SQLBaseStore): "hash": buffer(hash_bytes), }, or_ignore=True, - ) \ No newline at end of file + ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a954024678..b84735e61c 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -213,8 +213,8 @@ class StreamStore(SQLBaseStore): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. - from_comp = '<=' if direction =='b' else '>' - to_comp = '>' if direction =='b' else '<=' + from_comp = '<=' if direction == 'b' else '>' + to_comp = '>' if direction == 'b' else '<=' order = "DESC" if direction == 'b' else "ASC" args = [room_id] @@ -235,9 +235,10 @@ class StreamStore(SQLBaseStore): ) sql = ( - "SELECT *, (%(redacted)s) AS redacted FROM events " - "WHERE outlier = 0 AND room_id = ? AND %(bounds)s " - "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s " + "SELECT *, (%(redacted)s) AS redacted FROM events" + " WHERE outlier = 0 AND room_id = ? AND %(bounds)s" + " ORDER BY topological_ordering %(order)s," + " stream_ordering %(order)s %(limit)s" ) % { "redacted": del_sql, "bounds": bounds, diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index e57fb0e914..7ec5033ceb 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -37,6 +37,7 @@ class Clock(object): def call_later(self, delay, callback): current_context = LoggingContext.current_context() + def wrapped_callback(): LoggingContext.thread_local.current_context = current_context callback() diff --git a/synapse/util/async.py b/synapse/util/async.py index 1219d927db..7dd3ec3a72 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -18,6 +18,7 @@ from twisted.internet import defer, reactor from .logcontext import PreserveLoggingContext + @defer.inlineCallbacks def sleep(seconds): d = defer.Deferred() @@ -25,6 +26,7 @@ def sleep(seconds): with PreserveLoggingContext(): yield d + def run_on_reactor(): """ This will cause the rest of the function to be invoked upon the next iteration of the main loop diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index eddbe5837f..701ccdb781 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.util.logcontext import PreserveLoggingContext + from twisted.internet import defer import logging @@ -91,6 +93,7 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) + @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -98,22 +101,24 @@ class Signal(object): Returns a Deferred that will complete when all the observers have completed.""" - deferreds = [] - for observer in self.observers: - d = defer.maybeDeferred(observer, *args, **kwargs) - - def eb(failure): - logger.warning( - "%s signal observer %s failed: %r", - self.name, observer, failure, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject())) - if not self.suppress_failures: - raise failure - deferreds.append(d.addErrback(eb)) - - return defer.DeferredList( - deferreds, fireOnOneErrback=not self.suppress_failures - ) + with PreserveLoggingContext(): + deferreds = [] + for observer in self.observers: + d = defer.maybeDeferred(observer, *args, **kwargs) + + def eb(failure): + logger.warning( + "%s signal observer %s failed: %r", + self.name, observer, failure, + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject())) + if not self.suppress_failures: + raise failure + deferreds.append(d.addErrback(eb)) + + result = yield defer.DeferredList( + deferreds, fireOnOneErrback=not self.suppress_failures + ) + defer.returnValue(result) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 2f430a0f19..23b3decb45 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -1,6 +1,8 @@ import threading import logging +logger = logging.getLogger(__name__) + class LoggingContext(object): """Additional context for log formatting. Contexts are scoped within a @@ -53,7 +55,7 @@ class LoggingContext(object): None to avoid suppressing any exeptions that were thrown. """ if self.thread_local.current_context is not self: - logging.error( + logger.error( "Current logging context %s is not the expected context %s", self.thread_local.current_context, self |