diff options
author | Erik Johnston <erik@matrix.org> | 2014-08-29 12:10:00 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-08-29 12:10:00 +0100 |
commit | 47fb286184dd27d8098af2fd24d22dd82205d3d5 (patch) | |
tree | 3e186226e18131b7c5d9ca89fdb4075667110c6d /synapse | |
parent | Expand architecture section to introduce room IDs, room aliases, user IDs, ev... (diff) | |
parent | Fix a couple of bugs in presence handler related to pushing updatesto the cor... (diff) | |
download | synapse-47fb286184dd27d8098af2fd24d22dd82205d3d5.tar.xz |
Merge branch 'presence_logging' into develop
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/federation/replication.py | 2 | ||||
-rw-r--r-- | synapse/handlers/events.py | 6 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 216 | ||||
-rw-r--r-- | synapse/notifier.py | 1 | ||||
-rw-r--r-- | synapse/streams/config.py | 2 | ||||
-rw-r--r-- | synapse/util/logutils.py | 53 |
6 files changed, 189 insertions, 91 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 7868575a2e..cadf574b3b 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -543,6 +543,8 @@ class _TransactionQueue(object): def eb(failure): if not deferred.called: deferred.errback(failure) + else: + logger.exception("Failed to send edu", failure) self._attempt_new_transaction(destination).addErrback(eb) return deferred diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index e08231406d..980a169b25 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.api.events import SynapseEvent +from synapse.util.logutils import log_function from ._base import BaseHandler @@ -44,6 +45,7 @@ class EventStreamHandler(BaseHandler): self.notifier = hs.get_notifier() @defer.inlineCallbacks + @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0): auth_user = self.hs.parse_userid(auth_user_id) @@ -90,13 +92,15 @@ class EventStreamHandler(BaseHandler): # 10 seconds of grace to allow the client to reconnect again # before we think they're gone def _later(): + logger.debug("_later stopped_user_eventstream %s", auth_user) self.distributor.fire( "stopped_user_eventstream", auth_user ) del self._stop_timer_per_user[auth_user] + logger.debug("Scheduling _later: for %s", auth_user) self._stop_timer_per_user[auth_user] = ( - self.clock.call_later(5, _later) + self.clock.call_later(30, _later) ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index bef1508892..7731de85c0 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -18,6 +18,8 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState +from synapse.util.logutils import log_function + from ._base import BaseHandler import logging @@ -141,6 +143,10 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): + defer.returnValue(True) + # return + # FIXME (erikj): This code path absolutely kills the database. + assert(observed_user.is_mine) if observer_user == observed_user: @@ -184,7 +190,12 @@ class PresenceHandler(BaseHandler): defer.returnValue(state) @defer.inlineCallbacks + @log_function def set_state(self, target_user, auth_user, state): + # return + # TODO (erikj): Turn this back on. Why did we end up sending EDUs + # everywhere? + if not target_user.is_mine: raise SynapseError(400, "User is not hosted on this Home Server") @@ -237,33 +248,42 @@ class PresenceHandler(BaseHandler): self.push_presence(user, statuscache=statuscache) + @log_function def started_user_eventstream(self, user): # TODO(paul): Use "last online" state self.set_state(user, user, {"state": PresenceState.ONLINE}) + @log_function def stopped_user_eventstream(self, user): # TODO(paul): Save current state as "last online" state self.set_state(user, user, {"state": PresenceState.OFFLINE}) @defer.inlineCallbacks def user_joined_room(self, user, room_id): - localusers = set() - remotedomains = set() - - rm_handler = self.homeserver.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into(room_id, - localusers=localusers, remotedomains=remotedomains, - ignore_user=user) if user.is_mine: - yield self._send_presence_to_distribution(srcuser=user, - localusers=localusers, remotedomains=remotedomains, + self.push_update_to_local_and_remote( + observed_user=user, + room_ids=[room_id], statuscache=self._get_or_offline_usercache(user), ) - for srcuser in localusers: - yield self._send_presence(srcuser=srcuser, destuser=user, - statuscache=self._get_or_offline_usercache(srcuser), + else: + self.push_update_to_clients( + observed_user=user, + room_ids=[room_id], + statuscache=self._get_or_offline_usercache(user), + ) + + # We also want to tell them about current presence of people. + rm_handler = self.homeserver.get_handlers().room_member_handler + curr_users = yield rm_handler.get_room_members(room_id) + + for local_user in [c for c in curr_users if c.is_mine]: + self.push_update_to_local_and_remote( + observed_user=local_user, + users_to_push=[user], + statuscache=self._get_or_offline_usercache(local_user), ) @defer.inlineCallbacks @@ -374,11 +394,13 @@ class PresenceHandler(BaseHandler): defer.returnValue(presence) @defer.inlineCallbacks + @log_function def start_polling_presence(self, user, target_user=None, state=None): logger.debug("Start polling for presence from %s", user) if target_user: target_users = set([target_user]) + room_ids = [] else: presence = yield self.store.get_presence_list( user.localpart, accepted=True @@ -392,23 +414,37 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(user) - for room_id in room_ids: - for member in (yield rm_handler.get_room_members(room_id)): - target_users.add(member) - if state is None: state = yield self.store.get_presence_state(user.localpart) - - localusers, remoteusers = partitionbool( - target_users, - lambda u: u.is_mine + else: +# statuscache = self._get_or_make_usercache(user) +# self._user_cachemap_latest_serial += 1 +# statuscache.update(state, self._user_cachemap_latest_serial) + pass + + yield self.push_update_to_local_and_remote( + observed_user=user, + users_to_push=target_users, + room_ids=room_ids, + statuscache=self._get_or_make_usercache(user), ) - for target_user in localusers: - self._start_polling_local(user, target_user) + for target_user in target_users: + if target_user.is_mine: + self._start_polling_local(user, target_user) + + # We want to tell the person that just came online + # presence state of people they are interested in? + self.push_update_to_clients( + observed_user=target_user, + users_to_push=[user], + statuscache=self._get_or_offline_usercache(target_user), + ) deferreds = [] - remoteusers_by_domain = partition(remoteusers, lambda u: u.domain) + remote_users = [u for u in target_users if not u.is_mine] + remoteusers_by_domain = partition(remote_users, lambda u: u.domain) + # Only poll for people in our get_presence_list for domain in remoteusers_by_domain: remoteusers = remoteusers_by_domain[domain] @@ -430,12 +466,6 @@ class PresenceHandler(BaseHandler): self._local_pushmap[target_localpart].add(user) - self.push_update_to_clients( - observer_user=user, - observed_user=target_user, - statuscache=self._get_or_offline_usercache(target_user), - ) - def _start_polling_remote(self, user, domain, remoteusers): to_poll = set() @@ -455,6 +485,7 @@ class PresenceHandler(BaseHandler): content={"poll": [u.to_string() for u in to_poll]} ) + @log_function def stop_polling_presence(self, user, target_user=None): logger.debug("Stop polling for presence from %s", user) @@ -494,6 +525,7 @@ class PresenceHandler(BaseHandler): if not self._local_pushmap[localpart]: del self._local_pushmap[localpart] + @log_function def _stop_polling_remote(self, user, domain, remoteusers): to_unpoll = set() @@ -514,6 +546,7 @@ class PresenceHandler(BaseHandler): ) @defer.inlineCallbacks + @log_function def push_presence(self, user, statuscache): assert(user.is_mine) @@ -529,53 +562,17 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(user) - for room_id in room_ids: - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=localusers, remotedomains=remotedomains, - ignore_user=user, - ) - - if not localusers and not remotedomains: + if not localusers and not room_ids: defer.returnValue(None) - yield self._send_presence_to_distribution(user, - localusers=localusers, remotedomains=remotedomains, - statuscache=statuscache + yield self.push_update_to_local_and_remote( + observed_user=user, + users_to_push=localusers, + remote_domains=remotedomains, + room_ids=room_ids, + statuscache=statuscache, ) - def _send_presence(self, srcuser, destuser, statuscache): - if destuser.is_mine: - self.push_update_to_clients( - observer_user=destuser, - observed_user=srcuser, - statuscache=statuscache) - return defer.succeed(None) - else: - return self._push_presence_remote(srcuser, destuser.domain, - state=statuscache.get_state() - ) - - @defer.inlineCallbacks - def _send_presence_to_distribution(self, srcuser, localusers=set(), - remotedomains=set(), statuscache=None): - - for u in localusers: - logger.debug(" | push to local user %s", u) - self.push_update_to_clients( - observer_user=u, - observed_user=srcuser, - statuscache=statuscache, - ) - - deferreds = [] - for domain in remotedomains: - logger.debug(" | push to remote domain %s", domain) - deferreds.append(self._push_presence_remote(srcuser, domain, - state=statuscache.get_state()) - ) - - yield defer.DeferredList(deferreds) - @defer.inlineCallbacks def _push_presence_remote(self, user, destination, state=None): if state is None: @@ -591,12 +588,17 @@ class PresenceHandler(BaseHandler): self.clock.time_msec() - state.pop("mtime") ) + user_state = { + "user_id": user.to_string(), + } + user_state.update(**state) + yield self.federation.send_edu( destination=destination, edu_type="m.presence", content={ "push": [ - dict(user_id=user.to_string(), **state), + user_state, ], } ) @@ -615,12 +617,7 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(user) - for room_id in room_ids: - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=observers, ignore_user=user - ) - - if not observers: + if not observers and not room_ids: break state = dict(push) @@ -636,12 +633,12 @@ class PresenceHandler(BaseHandler): self._user_cachemap_latest_serial += 1 statuscache.update(state, serial=self._user_cachemap_latest_serial) - for observer_user in observers: - self.push_update_to_clients( - observer_user=observer_user, - observed_user=user, - statuscache=statuscache, - ) + self.push_update_to_clients( + observed_user=user, + users_to_push=observers, + room_ids=room_ids, + statuscache=statuscache, + ) if state["state"] == PresenceState.OFFLINE: del self._user_cachemap[user] @@ -675,12 +672,53 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds) - def push_update_to_clients(self, observer_user, observed_user, - statuscache): - statuscache.make_event(user=observed_user, clock=self.clock) + @defer.inlineCallbacks + def push_update_to_local_and_remote(self, observed_user, + users_to_push=[], room_ids=[], + remote_domains=[], + statuscache=None): + + localusers, remoteusers = partitionbool( + users_to_push, + lambda u: u.is_mine + ) + + localusers = set(localusers) + + self.push_update_to_clients( + observed_user=observed_user, + users_to_push=localusers, + room_ids=room_ids, + statuscache=statuscache, + ) + + remote_domains = set(remote_domains) + remote_domains |= set([r.domain for r in remoteusers]) + for room_id in room_ids: + remote_domains.update( + (yield self.store.get_joined_hosts_for_room(room_id)) + ) + + remote_domains.discard(self.hs.hostname) + + deferreds = [] + for domain in remote_domains: + logger.debug(" | push to remote domain %s", domain) + deferreds.append( + self._push_presence_remote( + observed_user, domain, state=statuscache.get_state() + ) + ) + + yield defer.DeferredList(deferreds) + + defer.returnValue((localusers, remote_domains)) + def push_update_to_clients(self, observed_user, users_to_push=[], + room_ids=[], statuscache=None): self.notifier.on_new_user_event( - [observer_user], + users_to_push, + room_ids, ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 3d3fcdabdb..b6d5ec4820 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -119,6 +119,7 @@ class Notifier(object): ) @defer.inlineCallbacks + @log_function def on_new_user_event(self, users=[], rooms=[]): """ Used to inform listeners that something has happend presence/user event wise. diff --git a/synapse/streams/config.py b/synapse/streams/config.py index 2434844d80..01bab568ff 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -81,4 +81,4 @@ class PaginationConfig(object): return ( "<PaginationConfig from_tok=%s, to_tok=%s, " "direction=%s, limit=%s>" - ) % (self.from_tok, self.to_tok, self.direction, self.limit) + ) % (self.from_token, self.to_token, self.direction, self.limit) diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index 021649071b..b94a749786 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -18,6 +18,8 @@ from inspect import getcallargs from functools import wraps import logging +import inspect +import traceback def log_function(f): @@ -65,4 +67,55 @@ def log_function(f): return f(*args, **kwargs) + wrapped.__name__ = func_name + return wrapped + + +def trace_function(f): + func_name = f.__name__ + linenum = f.func_code.co_firstlineno + pathname = f.func_code.co_filename + + def wrapped(*args, **kwargs): + name = f.__module__ + logger = logging.getLogger(name) + level = logging.DEBUG + + s = inspect.currentframe().f_back + + to_print = [ + "\t%s:%s %s. Args: args=%s, kwargs=%s" % ( + pathname, linenum, func_name, args, kwargs + ) + ] + while s: + if True or s.f_globals["__name__"].startswith("synapse"): + filename, lineno, function, _, _ = inspect.getframeinfo(s) + args_string = inspect.formatargvalues(*inspect.getargvalues(s)) + + to_print.append( + "\t%s:%d %s. Args: %s" % ( + filename, lineno, function, args_string + ) + ) + + s = s.f_back + + msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print) + + record = logging.LogRecord( + name=name, + level=level, + pathname=pathname, + lineno=lineno, + msg=msg, + args=None, + exc_info=None + ) + + logger.handle(record) + + return f(*args, **kwargs) + + wrapped.__name__ = func_name return wrapped |