-rw-r--r--  docs/specification.rst              | 101
-rw-r--r--  synapse/federation/replication.py   |   2
-rw-r--r--  synapse/handlers/events.py          |   6
-rw-r--r--  synapse/handlers/presence.py        | 216
-rw-r--r--  synapse/notifier.py                 |   1
-rw-r--r--  synapse/streams/config.py           |   2
-rw-r--r--  synapse/util/logutils.py            |  53
-rw-r--r--  tests/handlers/test_presence.py     | 138
-rw-r--r--  tests/handlers/test_presencelike.py |  54
-rw-r--r--  tests/utils.py                      |  37
-rw-r--r--  webclient/room/room-controller.js   |  19
11 files changed, 423 insertions(+), 206 deletions(-)
diff --git a/docs/specification.rst b/docs/specification.rst
index d650683efc..c1559c886c 100644
--- a/docs/specification.rst
+++ b/docs/specification.rst
@@ -9,7 +9,9 @@ TODO(Introduction) : Matthew
 Architecture
 ============
 
-- Sending a message from A to B
+
+Clients transmit data to other clients through home servers (HSes). Clients do not communicate with each
+other directly.
 
 ::
 
@@ -26,39 +28,42 @@ Architecture
   |                  |<--------( HTTP )-----------|                  |
   +------------------+        Federation          +------------------+
 
-- Client is an end-user (web app, mobile app) which uses C-S APIs to talk to the home server.
-  A given client is typically responsible for a single user.
-- A single user is represented by a User ID, scoped to the home server which allocated the account.
-  User IDs MUST have @ prefix; looks like @foo:domain - domain indicates the user's home
-  server.
-- Home server provides C-S APIs and has the ability to federate with other HSes.
-  Typically responsible for N clients.
-- Federation's purpose is to share content between interested HSes; no SPOF.
-- Events are actions within the system. Typically each action (e.g. sending a message)
-  correlates with exactly one event. Each event has a ``type`` string.
-- ``type`` values SHOULD be namespaced according to standard Java package naming conventions,
-  with a ``.`` delimiter e.g. ``com.example.myapp.event``
-- Events are typically send in the context of a room.
+A "Client" is an end-user, typically a human using a web application or mobile app. Clients use the
+"Client-to-Server" (C-S) API to communicate with their home server. A single Client is usually
+responsible for a single user account. A user account is represented by their "User ID". This ID is
+namespaced to the home server which allocated the account and looks like::
+
+  @localpart:domain
+
+The ``localpart`` of a user ID may be a user name, or an opaque ID identifying this user.
+
+A "Home Server" is a server which provides C-S APIs and has the ability to federate with other HSes.
+It is typically responsible for multiple clients. "Federation" is the term used to describe the
+sharing of data between two or more home servers.
+
+Data in Matrix is encapsulated in an "Event". An event is an action within the system. Typically each
+action (e.g. sending a message) correlates with exactly one event. Each event has a ``type`` which is
+used to differentiate different kinds of data. ``type`` values SHOULD be namespaced according to standard
+Java package naming conventions, e.g. ``com.example.myapp.event``. Events are usually sent in the context
+of a "Room".
 
 Room structure
 --------------
 
-A room is a conceptual place where users can send and receive messages. Rooms
-can be created, joined and left. Messages are sent to a room, and all
-participants in that room will receive the message. Rooms are uniquely
-identified via a room ID. There is exactly one room ID for each room. Each
-room can also have an alias. Each room can have many aliases.
+A room is a conceptual place where users can send and receive events. Rooms
+can be created, joined and left. Events are sent to a room, and all
+participants in that room will receive the event. Rooms are uniquely
+identified via a "Room ID", which looks like::
 
-- Room IDs MUST have ! prefix; looks like !foo:domain - domain is simply for namespacing,
-  the room does NOT reside on any one domain. NOT human readable.
+
+  !opaque_id:domain
 
-- Room Aliases MUST have # prefix; looks like #foo:domain - domain indicates where this
-  alias can be mapped to a room ID. Key point: human readable / friendly.
+There is exactly one room ID for each room. Whilst the room ID does contain a
+domain, it is simply for namespacing room IDs. The room does NOT reside on the
+domain specified. Room IDs are not meant to be human readable.
 
-- Aliases can be queried on the domain they specify, which will return a room ID if a
-  mapping exists. These mappings can change.
-
-::
+The following diagram shows an ``m.room.message`` event being sent in the room
+``!qporfwt:matrix.org``::
 
        { @alice:matrix.org }                             { @bob:domain.com }
               |                                                  ^
               |                                                  |
@@ -73,18 +78,48 @@ room can also have an alias. Each room can have many aliases.
   | matrix.org       |<-------Federation------->| domain.com       |
   +------------------+                          +------------------+
          |            .................................           |
-  |______|            Shared State                      |_________|
-         | Room ID: !qporfwt:matrix.org                 |
+  |______|            Partially Shared State            |_________|
+         | Room ID: !qporfwt:matrix.org                 |
         | Servers: matrix.org, domain.com              |
         | Members:                                     |
         |  - @alice:matrix.org                         |
         |  - @bob:domain.com                           |
         |...............................................|
 
-- Federation's goal is to maintain the shared state. Don't need FULL state in order
-  to be a part of a room.
-- Introduce the DAG.
-- Events are wrapped in PDUs.
+Federation maintains shared state between multiple home servers, such that when an event is
+sent to a room, the home server knows where to forward the event on to, and how to process
+the event. Home servers do not need to have completely shared state in order to participate
+in a room. State is scoped to a single room, and federation ensures that all home servers
+have the information they need, even if that means the home server has to request more
+information from another home server before processing the event.
+
+Room Aliases
+------------
+
+Each room can also have multiple "Room Aliases", which look like::
+
+  #room_alias:domain
+
+A room alias "points" to a room ID. The room ID the alias is pointing to can be obtained
+by visiting the domain specified. Room aliases are designed to be human readable strings
+which can be used to publicise rooms. Note that the mapping from a room alias to a
+room ID is not fixed, and may change over time to point to a different room ID. For this
+reason, Clients SHOULD resolve the room alias to a room ID once and then use that ID on
+subsequent requests.
+
+::
+
+        GET
+   #matrix:domain.com      !aaabaa:matrix.org
+          |                    ^
+          |                    |
+   _______V____________________|____
+  |          domain.com            |
+  | Mappings:                      |
+  | #matrix >> !aaabaa:matrix.org  |
+  | #golf   >> !wfeiofh:sport.com  |
+  | #bike   >> !4rguxf:matrix.org  |
+  |________________________________|
 
 Identity
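The alias lookup described in the new "Room Aliases" section can be exercised directly. A minimal client-side sketch, assuming the v1 directory endpoint (``/matrix/client/api/v1/directory/room/{room_alias}``) and the third-party ``requests`` library; both are assumptions, not part of this commit::

    # Sketch: resolve a room alias to its current room ID, then reuse the ID.
    import urllib

    import requests  # assumption: not a synapse dependency


    def resolve_room_alias(hs_base_url, room_alias):
        """Ask the alias's domain which room ID the alias currently maps to."""
        resp = requests.get(
            "%s/matrix/client/api/v1/directory/room/%s"
            % (hs_base_url, urllib.quote(room_alias))
        )
        resp.raise_for_status()
        # Assumed response shape: {"room_id": "!aaabaa:matrix.org", ...}
        return resp.json()["room_id"]

    # Per the spec text above: resolve once, then use the room ID on
    # subsequent requests, since the alias mapping may change over time.
    room_id = resolve_room_alias("https://matrix.org", "#matrix:domain.com")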
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 7868575a2e..cadf574b3b 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -543,6 +543,8 @@ class _TransactionQueue(object):
         def eb(failure):
             if not deferred.called:
                 deferred.errback(failure)
+            else:
+                logger.exception("Failed to send edu", failure)
 
         self._attempt_new_transaction(destination).addErrback(eb)
         return deferred
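One hazard in the new ``else`` branch above: ``logger.exception("Failed to send edu", failure)`` hands ``failure`` to stdlib logging as a %-format argument with no placeholder in the message, so the log call itself errors at emit time, and ``logger.exception`` only attaches a traceback when called from inside an ``except`` block. A minimal sketch of the likely intent::

    def eb(failure):
        if not deferred.called:
            deferred.errback(failure)
        else:
            # Give the message a placeholder for the extra argument.
            logger.error("Failed to send edu: %s", failure)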
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index e08231406d..980a169b25 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.events import SynapseEvent
+from synapse.util.logutils import log_function
 
 from ._base import BaseHandler
 
@@ -44,6 +45,7 @@ class EventStreamHandler(BaseHandler):
         self.notifier = hs.get_notifier()
 
     @defer.inlineCallbacks
+    @log_function
     def get_stream(self, auth_user_id, pagin_config, timeout=0):
         auth_user = self.hs.parse_userid(auth_user_id)
 
@@ -90,13 +92,15 @@ class EventStreamHandler(BaseHandler):
             # 10 seconds of grace to allow the client to reconnect again
             #   before we think they're gone
             def _later():
+                logger.debug("_later stopped_user_eventstream %s", auth_user)
                 self.distributor.fire(
                     "stopped_user_eventstream", auth_user
                 )
                 del self._stop_timer_per_user[auth_user]
 
+            logger.debug("Scheduling _later: for %s", auth_user)
             self._stop_timer_per_user[auth_user] = (
-                self.clock.call_later(5, _later)
+                self.clock.call_later(30, _later)
             )
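The grace period before firing ``stopped_user_eventstream`` is raised from 5 to 30 seconds here. For context, a sketch of the schedule-and-cancel pattern this relies on; the cancel side is outside this hunk, and the method names below are assumptions, not code from this commit::

    def stream_ended(self, user):
        # Give the client 30s to reconnect before declaring them gone.
        def _later():
            self.distributor.fire("stopped_user_eventstream", user)
            del self._stop_timer_per_user[user]

        self._stop_timer_per_user[user] = self.clock.call_later(30, _later)

    def stream_started(self, user):
        # A reconnect within the grace period cancels the pending timer
        # (assumed: the Clock wrapper exposes a cancel for call_later).
        timer = self._stop_timer_per_user.pop(user, None)
        if timer is not None:
            self.clock.cancel_call_later(timer)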
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index bef1508892..7731de85c0 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, AuthError
 from synapse.api.constants import PresenceState
 
+from synapse.util.logutils import log_function
+
 from ._base import BaseHandler
 
 import logging
@@ -141,6 +143,10 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def is_presence_visible(self, observer_user, observed_user):
+        defer.returnValue(True)
+        # return
+        # FIXME (erikj): This code path absolutely kills the database.
+
         assert(observed_user.is_mine)
 
         if observer_user == observed_user:
@@ -184,7 +190,12 @@ class PresenceHandler(BaseHandler):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
+    @log_function
     def set_state(self, target_user, auth_user, state):
+        # return
+        # TODO (erikj): Turn this back on. Why did we end up sending EDUs
+        # everywhere?
+
         if not target_user.is_mine:
             raise SynapseError(400, "User is not hosted on this Home Server")
 
@@ -237,33 +248,42 @@ class PresenceHandler(BaseHandler):
 
         self.push_presence(user, statuscache=statuscache)
 
+    @log_function
     def started_user_eventstream(self, user):
         # TODO(paul): Use "last online" state
         self.set_state(user, user, {"state": PresenceState.ONLINE})
 
+    @log_function
     def stopped_user_eventstream(self, user):
         # TODO(paul): Save current state as "last online" state
         self.set_state(user, user, {"state": PresenceState.OFFLINE})
 
     @defer.inlineCallbacks
     def user_joined_room(self, user, room_id):
-        localusers = set()
-        remotedomains = set()
-
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(room_id,
-                localusers=localusers, remotedomains=remotedomains,
-                ignore_user=user)
 
         if user.is_mine:
-            yield self._send_presence_to_distribution(srcuser=user,
-                localusers=localusers, remotedomains=remotedomains,
+            self.push_update_to_local_and_remote(
+                observed_user=user,
+                room_ids=[room_id],
                 statuscache=self._get_or_offline_usercache(user),
             )
-
-        for srcuser in localusers:
-            yield self._send_presence(srcuser=srcuser, destuser=user,
-                statuscache=self._get_or_offline_usercache(srcuser),
+        else:
+            self.push_update_to_clients(
+                observed_user=user,
+                room_ids=[room_id],
+                statuscache=self._get_or_offline_usercache(user),
+            )
+
+        # We also want to tell them about current presence of people.
+        rm_handler = self.homeserver.get_handlers().room_member_handler
+        curr_users = yield rm_handler.get_room_members(room_id)
+
+        for local_user in [c for c in curr_users if c.is_mine]:
+            self.push_update_to_local_and_remote(
+                observed_user=local_user,
+                users_to_push=[user],
+                statuscache=self._get_or_offline_usercache(local_user),
             )
 
     @defer.inlineCallbacks
@@ -374,11 +394,13 @@ class PresenceHandler(BaseHandler):
         defer.returnValue(presence)
 
     @defer.inlineCallbacks
+    @log_function
     def start_polling_presence(self, user, target_user=None, state=None):
         logger.debug("Start polling for presence from %s", user)
 
         if target_user:
             target_users = set([target_user])
+            room_ids = []
         else:
             presence = yield self.store.get_presence_list(
                 user.localpart, accepted=True
@@ -392,23 +414,37 @@ class PresenceHandler(BaseHandler):
             rm_handler = self.homeserver.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(user)
 
-        for room_id in room_ids:
-            for member in (yield rm_handler.get_room_members(room_id)):
-                target_users.add(member)
-
         if state is None:
             state = yield self.store.get_presence_state(user.localpart)
-
-        localusers, remoteusers = partitionbool(
-            target_users,
-            lambda u: u.is_mine
+        else:
+#            statuscache = self._get_or_make_usercache(user)
+#            self._user_cachemap_latest_serial += 1
+#            statuscache.update(state, self._user_cachemap_latest_serial)
+            pass
+
+        yield self.push_update_to_local_and_remote(
+            observed_user=user,
+            users_to_push=target_users,
+            room_ids=room_ids,
+            statuscache=self._get_or_make_usercache(user),
         )
 
-        for target_user in localusers:
-            self._start_polling_local(user, target_user)
+        for target_user in target_users:
+            if target_user.is_mine:
+                self._start_polling_local(user, target_user)
+
+                # We want to tell the person that just came online
+                # presence state of people they are interested in?
+                self.push_update_to_clients(
+                    observed_user=target_user,
+                    users_to_push=[user],
+                    statuscache=self._get_or_offline_usercache(target_user),
+                )
 
         deferreds = []
-        remoteusers_by_domain = partition(remoteusers, lambda u: u.domain)
+        remote_users = [u for u in target_users if not u.is_mine]
+        remoteusers_by_domain = partition(remote_users, lambda u: u.domain)
+        # Only poll for people in our get_presence_list
         for domain in remoteusers_by_domain:
             remoteusers = remoteusers_by_domain[domain]
 
@@ -430,12 +466,6 @@ class PresenceHandler(BaseHandler):
 
         self._local_pushmap[target_localpart].add(user)
 
-        self.push_update_to_clients(
-            observer_user=user,
-            observed_user=target_user,
-            statuscache=self._get_or_offline_usercache(target_user),
-        )
-
     def _start_polling_remote(self, user, domain, remoteusers):
         to_poll = set()
 
@@ -455,6 +485,7 @@ class PresenceHandler(BaseHandler):
             content={"poll": [u.to_string() for u in to_poll]}
         )
 
+    @log_function
     def stop_polling_presence(self, user, target_user=None):
         logger.debug("Stop polling for presence from %s", user)
 
@@ -494,6 +525,7 @@ class PresenceHandler(BaseHandler):
         if not self._local_pushmap[localpart]:
             del self._local_pushmap[localpart]
 
+    @log_function
     def _stop_polling_remote(self, user, domain, remoteusers):
         to_unpoll = set()
 
@@ -514,6 +546,7 @@ class PresenceHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
+    @log_function
     def push_presence(self, user, statuscache):
         assert(user.is_mine)
 
@@ -529,53 +562,17 @@ class PresenceHandler(BaseHandler):
         rm_handler = self.homeserver.get_handlers().room_member_handler
         room_ids = yield rm_handler.get_rooms_for_user(user)
 
-        for room_id in room_ids:
-            yield rm_handler.fetch_room_distributions_into(
-                room_id, localusers=localusers, remotedomains=remotedomains,
-                ignore_user=user,
-            )
-
-        if not localusers and not remotedomains:
+        if not localusers and not room_ids:
             defer.returnValue(None)
 
-        yield self._send_presence_to_distribution(user,
-            localusers=localusers, remotedomains=remotedomains,
-            statuscache=statuscache
+        yield self.push_update_to_local_and_remote(
+            observed_user=user,
+            users_to_push=localusers,
+            remote_domains=remotedomains,
+            room_ids=room_ids,
+            statuscache=statuscache,
         )
 
-    def _send_presence(self, srcuser, destuser, statuscache):
-        if destuser.is_mine:
-            self.push_update_to_clients(
-                observer_user=destuser,
-                observed_user=srcuser,
-                statuscache=statuscache)
-            return defer.succeed(None)
-        else:
-            return self._push_presence_remote(srcuser, destuser.domain,
-                state=statuscache.get_state()
-            )
-
-    @defer.inlineCallbacks
-    def _send_presence_to_distribution(self, srcuser, localusers=set(),
-            remotedomains=set(), statuscache=None):
-
-        for u in localusers:
-            logger.debug(" | push to local user %s", u)
-            self.push_update_to_clients(
-                observer_user=u,
-                observed_user=srcuser,
-                statuscache=statuscache,
-            )
-
-        deferreds = []
-        for domain in remotedomains:
-            logger.debug(" | push to remote domain %s", domain)
-            deferreds.append(self._push_presence_remote(srcuser, domain,
-                state=statuscache.get_state())
-            )
-
-        yield defer.DeferredList(deferreds)
-
     @defer.inlineCallbacks
     def _push_presence_remote(self, user, destination, state=None):
         if state is None:
@@ -591,12 +588,17 @@ class PresenceHandler(BaseHandler):
             self.clock.time_msec() - state.pop("mtime")
         )
 
+        user_state = {
+            "user_id": user.to_string(),
+        }
+        user_state.update(**state)
+
         yield self.federation.send_edu(
             destination=destination,
             edu_type="m.presence",
             content={
                 "push": [
-                    dict(user_id=user.to_string(), **state),
+                    user_state,
                 ],
             }
        )
@@ -615,12 +617,7 @@ class PresenceHandler(BaseHandler):
             rm_handler = self.homeserver.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(user)
 
-            for room_id in room_ids:
-                yield rm_handler.fetch_room_distributions_into(
-                    room_id, localusers=observers, ignore_user=user
-                )
-
-            if not observers:
+            if not observers and not room_ids:
                 break
 
             state = dict(push)
@@ -636,12 +633,12 @@ class PresenceHandler(BaseHandler):
             self._user_cachemap_latest_serial += 1
             statuscache.update(state, serial=self._user_cachemap_latest_serial)
 
-            for observer_user in observers:
-                self.push_update_to_clients(
-                    observer_user=observer_user,
-                    observed_user=user,
-                    statuscache=statuscache,
-                )
+            self.push_update_to_clients(
+                observed_user=user,
+                users_to_push=observers,
+                room_ids=room_ids,
+                statuscache=statuscache,
+            )
 
             if state["state"] == PresenceState.OFFLINE:
                 del self._user_cachemap[user]
@@ -675,12 +672,53 @@ class PresenceHandler(BaseHandler):
 
         yield defer.DeferredList(deferreds)
 
-    def push_update_to_clients(self, observer_user, observed_user,
-                               statuscache):
-        statuscache.make_event(user=observed_user, clock=self.clock)
-
+    @defer.inlineCallbacks
+    def push_update_to_local_and_remote(self, observed_user,
+                                        users_to_push=[], room_ids=[],
+                                        remote_domains=[],
+                                        statuscache=None):
+
+        localusers, remoteusers = partitionbool(
+            users_to_push,
+            lambda u: u.is_mine
+        )
+
+        localusers = set(localusers)
+
+        self.push_update_to_clients(
+            observed_user=observed_user,
+            users_to_push=localusers,
+            room_ids=room_ids,
+            statuscache=statuscache,
+        )
+
+        remote_domains = set(remote_domains)
+        remote_domains |= set([r.domain for r in remoteusers])
+        for room_id in room_ids:
+            remote_domains.update(
+                (yield self.store.get_joined_hosts_for_room(room_id))
+            )
+
+        remote_domains.discard(self.hs.hostname)
+
+        deferreds = []
+        for domain in remote_domains:
+            logger.debug(" | push to remote domain %s", domain)
+            deferreds.append(
+                self._push_presence_remote(
+                    observed_user, domain, state=statuscache.get_state()
+                )
+            )
+
+        yield defer.DeferredList(deferreds)
+
+        defer.returnValue((localusers, remote_domains))
+
+    def push_update_to_clients(self, observed_user, users_to_push=[],
+                               room_ids=[], statuscache=None):
         self.notifier.on_new_user_event(
-            [observer_user],
+            users_to_push,
+            room_ids,
         )
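For reference, the ``m.presence`` push EDU that ``_push_presence_remote`` assembles above now carries one dict per user, the user ID merged with the cached state, with ``mtime`` rewritten as a relative ``mtime_age``. The values below are taken from the test expectations further down::

    edu_content = {
        "push": [
            {
                "user_id": "@apple:test",
                "state": "online",
                "mtime_age": 0,  # milliseconds since the state changed
            },
        ],
    }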
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3d3fcdabdb..b6d5ec4820 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -119,6 +119,7 @@ class Notifier(object):
         )
 
     @defer.inlineCallbacks
+    @log_function
     def on_new_user_event(self, users=[], rooms=[]):
         """ Used to inform listeners that something has happend
         presence/user event wise.
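While touching these signatures, note that ``users=[], rooms=[]`` (also used by the new ``push_update_to_*`` methods above) binds one shared list per function definition; that is only safe while nothing mutates the defaults. A sketch of the conventional defensive idiom, offered as a suggestion rather than something this commit does::

    def on_new_user_event(self, users=None, rooms=None):
        # Fresh lists per call; mutable defaults are evaluated only once.
        users = users if users is not None else []
        rooms = rooms if rooms is not None else []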
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 2434844d80..01bab568ff 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -81,4 +81,4 @@ class PaginationConfig(object):
         return (
             "<PaginationConfig from_tok=%s, to_tok=%s, "
             "direction=%s, limit=%s>"
-        ) % (self.from_tok, self.to_tok, self.direction, self.limit)
+        ) % (self.from_token, self.to_token, self.direction, self.limit)
diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py
index 021649071b..b94a749786 100644
--- a/synapse/util/logutils.py
+++ b/synapse/util/logutils.py
@@ -18,6 +18,8 @@ from inspect import getcallargs
 from functools import wraps
 
 import logging
+import inspect
+import traceback
 
 
 def log_function(f):
@@ -65,4 +67,55 @@ def log_function(f):
 
         return f(*args, **kwargs)
 
+    wrapped.__name__ = func_name
+    return wrapped
+
+
+def trace_function(f):
+    func_name = f.__name__
+    linenum = f.func_code.co_firstlineno
+    pathname = f.func_code.co_filename
+
+    def wrapped(*args, **kwargs):
+        name = f.__module__
+        logger = logging.getLogger(name)
+        level = logging.DEBUG
+
+        s = inspect.currentframe().f_back
+
+        to_print = [
+            "\t%s:%s %s. Args: args=%s, kwargs=%s" % (
+                pathname, linenum, func_name, args, kwargs
+            )
+        ]
+        while s:
+            if True or s.f_globals["__name__"].startswith("synapse"):
+                filename, lineno, function, _, _ = inspect.getframeinfo(s)
+                args_string = inspect.formatargvalues(*inspect.getargvalues(s))
+
+                to_print.append(
+                    "\t%s:%d %s. Args: %s" % (
+                        filename, lineno, function, args_string
+                    )
+                )
+
+            s = s.f_back
+
+        msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print)
+
+        record = logging.LogRecord(
+            name=name,
+            level=level,
+            pathname=pathname,
+            lineno=lineno,
+            msg=msg,
+            args=None,
+            exc_info=None
+        )
+
+        logger.handle(record)
+
+        return f(*args, **kwargs)
+
+    wrapped.__name__ = func_name
     return wrapped
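Two small observations on the new ``trace_function``: the added ``import traceback`` is unused, and ``if True or s.f_globals[...]`` short-circuits the intended synapse-only frame filter, so every caller frame is logged. Usage is decorator-style, the same as ``log_function``; the decorated function below is hypothetical::

    from synapse.util.logutils import trace_function

    @trace_function
    def handle_edu(origin, content):
        # Each call emits one DEBUG record containing this call's args
        # followed by a walk of every frame on the caller's stack.
        pass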
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 8d094fd1f9..fcd7a784cd 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -193,6 +193,8 @@ class PresenceStateTestCase(unittest.TestCase):
             SynapseError
         )
 
+    test_get_disallowed_state.skip = "Presence permissions are disabled"
+
     @defer.inlineCallbacks
     def test_set_my_state(self):
         mocked_set = self.datastore.set_presence_state
@@ -497,6 +499,7 @@ class PresencePushTestCase(unittest.TestCase):
             db_pool=None,
             datastore=Mock(spec=[
                 "set_presence_state",
+                "get_joined_hosts_for_room",
 
                 # Bits that Federation needs
                 "prep_send_transaction",
@@ -511,8 +514,12 @@ class PresencePushTestCase(unittest.TestCase):
         )
         hs.handlers = JustPresenceHandlers(hs)
 
+        def update(*args, **kwargs):
+            # print "mock_update_client: Args=%s, kwargs=%s" % (args, kwargs,)
+            return defer.succeed(None)
+
         self.mock_update_client = Mock()
-        self.mock_update_client.return_value = defer.succeed(None)
+        self.mock_update_client.side_effect = update
 
         self.datastore = hs.get_datastore()
 
@@ -546,6 +553,14 @@ class PresencePushTestCase(unittest.TestCase):
                 return defer.succeed([])
         self.room_member_handler.get_room_members = get_room_members
 
+        def get_room_hosts(room_id):
+            if room_id == "a-room":
+                hosts = set([u.domain for u in self.room_members])
+                return defer.succeed(hosts)
+            else:
+                return defer.succeed([])
+        self.datastore.get_joined_hosts_for_room = get_room_hosts
+
         @defer.inlineCallbacks
         def fetch_room_distributions_into(room_id, localusers=None,
                 remotedomains=None, ignore_user=None):
@@ -611,18 +626,10 @@ class PresencePushTestCase(unittest.TestCase):
             {"state": ONLINE})
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
+                    room_ids=["a-room"],
                     observed_user=self.u_apple,
                     statuscache=ANY),  # self-reflection
-                call(observer_user=self.u_banana,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
-                call(observer_user=self.u_clementine,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
-                call(observer_user=self.u_elderberry,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
         ], any_order=True)
 
         self.mock_update_client.reset_mock()
@@ -651,7 +658,8 @@ class PresencePushTestCase(unittest.TestCase):
         ], presence)
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_banana,
+                call(users_to_push=set([self.u_banana]),
+                    room_ids=[],
                     observed_user=self.u_banana,
                     statuscache=ANY),  # self-reflection
         ])  # and no others...
@@ -659,21 +667,21 @@ class PresencePushTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_push_remote(self):
         put_json = self.mock_http_client.put_json
-        put_json.expect_call_and_return(
-            call("remote",
-                path=ANY,  # Can't guarantee which txn ID will be which
-                data=_expect_edu("remote", "m.presence",
-                    content={
-                        "push": [
-                            {"user_id": "@apple:test",
-                             "state": "online",
-                             "mtime_age": 0},
-                        ],
-                    }
-                )
-            ),
-            defer.succeed((200, "OK"))
-        )
+#        put_json.expect_call_and_return(
+#            call("remote",
+#                path=ANY,  # Can't guarantee which txn ID will be which
+#                data=_expect_edu("remote", "m.presence",
+#                    content={
+#                        "push": [
+#                            {"user_id": "@apple:test",
+#                             "state": "online",
+#                             "mtime_age": 0},
+#                        ],
+#                    }
+#                )
+#            ),
+#            defer.succeed((200, "OK"))
+#        )
         put_json.expect_call_and_return(
             call("farm",
                 path=ANY,  # Can't guarantee which txn ID will be which
@@ -681,7 +689,7 @@ class PresencePushTestCase(unittest.TestCase):
                     content={
                         "push": [
                             {"user_id": "@apple:test",
-                             "state": "online",
+                             "state": u"online",
                              "mtime_age": 0},
                         ],
                     }
@@ -730,10 +738,8 @@ class PresencePushTestCase(unittest.TestCase):
         )
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
-                    observed_user=self.u_potato,
-                    statuscache=ANY),
-                call(observer_user=self.u_banana,
+                call(users_to_push=set([self.u_apple]),
+                    room_ids=["a-room"],
                     observed_user=self.u_potato,
                     statuscache=ANY),
         ], any_order=True)
@@ -753,19 +759,17 @@ class PresencePushTestCase(unittest.TestCase):
         )
 
         self.mock_update_client.assert_has_calls([
-                # Apple and Elderberry see each other
-                call(observer_user=self.u_apple,
+                call(room_ids=["a-room"],
                     observed_user=self.u_elderberry,
+                    users_to_push=set(),
                     statuscache=ANY),
-                call(observer_user=self.u_elderberry,
+                call(users_to_push=set([self.u_elderberry]),
                     observed_user=self.u_apple,
+                    room_ids=[],
                     statuscache=ANY),
-                # Banana and Elderberry see each other
-                call(observer_user=self.u_banana,
-                    observed_user=self.u_elderberry,
-                    statuscache=ANY),
-                call(observer_user=self.u_elderberry,
+                call(users_to_push=set([self.u_elderberry]),
                     observed_user=self.u_banana,
+                    room_ids=[],
                     statuscache=ANY),
         ], any_order=True)
@@ -887,7 +891,12 @@ class PresencePollingTestCase(unittest.TestCase):
         self.datastore.get_received_txn_response = get_received_txn_response
 
         self.mock_update_client = Mock()
-        self.mock_update_client.return_value = defer.succeed(None)
+
+        def update(*args, **kwargs):
+            # print "mock_update_client: Args=%s, kwargs=%s" % (args, kwargs,)
+            return defer.succeed(None)
+
+        self.mock_update_client.side_effect = update
 
         self.handler = hs.get_handlers().presence_handler
         self.handler.push_update_to_clients = self.mock_update_client
@@ -951,10 +960,10 @@ class PresencePollingTestCase(unittest.TestCase):
 
         # apple should see both banana and clementine currently offline
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=[self.u_apple],
                     observed_user=self.u_banana,
                     statuscache=ANY),
-                call(observer_user=self.u_apple,
+                call(users_to_push=[self.u_apple],
                     observed_user=self.u_clementine,
                     statuscache=ANY),
         ], any_order=True)
@@ -974,10 +983,11 @@ class PresencePollingTestCase(unittest.TestCase):
 
         # apple and banana should now both see each other online
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=set([self.u_apple]),
                     observed_user=self.u_banana,
+                    room_ids=[],
                     statuscache=ANY),
-                call(observer_user=self.u_banana,
+                call(users_to_push=[self.u_banana],
                     observed_user=self.u_apple,
                     statuscache=ANY),
         ], any_order=True)
@@ -994,8 +1004,9 @@ class PresencePollingTestCase(unittest.TestCase):
 
         # banana should now be told apple is offline
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_banana,
+                call(users_to_push=set([self.u_banana, self.u_apple]),
                     observed_user=self.u_apple,
+                    room_ids=[],
                     statuscache=ANY),
         ], any_order=True)
@@ -1008,7 +1019,7 @@ class PresencePollingTestCase(unittest.TestCase):
         put_json = self.mock_http_client.put_json
         put_json.expect_call_and_return(
             call("remote",
-                path="/matrix/federation/v1/send/1000000/",
+                path=ANY,
                 data=_expect_edu("remote", "m.presence",
                     content={
                         "poll": [ "@potato:remote" ],
@@ -1018,6 +1029,18 @@ class PresencePollingTestCase(unittest.TestCase):
             defer.succeed((200, "OK"))
         )
 
+        put_json.expect_call_and_return(
+            call("remote",
+                path=ANY,
+                data=_expect_edu("remote", "m.presence",
+                    content={
+                        "push": [ {"user_id": "@clementine:test" }],
+                    },
+                ),
+            ),
+            defer.succeed((200, "OK"))
+        )
+
         # clementine goes online
         yield self.handler.set_state(
             target_user=self.u_clementine, auth_user=self.u_clementine,
@@ -1032,15 +1055,28 @@ class PresencePollingTestCase(unittest.TestCase):
         self.assertTrue(self.u_clementine in
                         self.handler._remote_recvmap[self.u_potato])
 
+
+        put_json.expect_call_and_return(
+            call("remote",
+                path=ANY,
+                data=_expect_edu("remote", "m.presence",
+                    content={
+                        "push": [ {"user_id": "@fig:test" }],
+                    },
+                ),
+            ),
+            defer.succeed((200, "OK"))
+        )
+
         # fig goes online; shouldn't send a second poll
         yield self.handler.set_state(
             target_user=self.u_fig, auth_user=self.u_fig,
             state={"state": ONLINE}
         )
 
-        reactor.iterate(delay=0)
+        # reactor.iterate(delay=0)
 
-        put_json.assert_had_no_calls()
+        yield put_json.await_calls()
 
         # fig goes offline
         yield self.handler.set_state(
@@ -1054,7 +1090,7 @@ class PresencePollingTestCase(unittest.TestCase):
         put_json.expect_call_and_return(
             call("remote",
-                path="/matrix/federation/v1/send/1000001/",
+                path=ANY,
                 data=_expect_edu("remote", "m.presence",
                     content={
                         "unpoll": [ "@potato:remote" ],
@@ -1069,7 +1105,7 @@ class PresencePollingTestCase(unittest.TestCase):
             target_user=self.u_clementine, auth_user=self.u_clementine,
             state={"state": OFFLINE})
 
-        put_json.await_calls()
+        yield put_json.await_calls()
 
         self.assertFalse(self.u_potato in self.handler._remote_recvmap,
                          msg="expected potato not to be in _remote_recvmap"
diff --git a/tests/handlers/test_presencelike.py b/tests/handlers/test_presencelike.py
index da06a06647..e81d7ce101 100644
--- a/tests/handlers/test_presencelike.py
+++ b/tests/handlers/test_presencelike.py
@@ -81,7 +81,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
         self.replication = hs.get_replication_layer()
         self.replication.send_edu = Mock()
-        self.replication.send_edu.return_value = defer.succeed((200, "OK"))
+
+        def send_edu(*args, **kwargs):
+            # print "send_edu: %s, %s" % (args, kwargs)
+            return defer.succeed((200, "OK"))
+        self.replication.send_edu.side_effect = send_edu
 
         def get_profile_displayname(user_localpart):
             return defer.succeed("Frank")
@@ -95,11 +99,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
             return defer.succeed("http://foo")
         self.datastore.get_profile_avatar_url = get_profile_avatar_url
 
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
         def get_presence_list(user_localpart, accepted=None):
-            return defer.succeed([
-                {"observed_user_id": "@banana:test"},
-                {"observed_user_id": "@clementine:test"},
-            ])
+            return defer.succeed(self.presence_list)
         self.datastore.get_presence_list = get_presence_list
 
         def do_users_share_a_room(userlist):
@@ -109,7 +114,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
         self.handlers = hs.get_handlers()
 
         self.mock_update_client = Mock()
-        self.mock_update_client.return_value = defer.succeed(None)
+        def update(*args, **kwargs):
+            # print "mock_update_client: %s, %s" % (args, kwargs)
+            return defer.succeed(None)
+        self.mock_update_client.side_effect = update
 
         self.handlers.presence_handler.push_update_to_clients = (
             self.mock_update_client)
@@ -130,6 +138,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_set_my_state(self):
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
+
         mocked_set = self.datastore.set_presence_state
         mocked_set.return_value = defer.succeed({"state": OFFLINE})
 
@@ -142,6 +155,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_push_local(self):
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
+
         self.datastore.set_presence_state.return_value = defer.succeed(
             {"state": ONLINE})
 
@@ -173,12 +191,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
             presence)
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
+                    room_ids=[],
                     observed_user=self.u_apple,
                     statuscache=ANY),  # self-reflection
-                call(observer_user=self.u_banana,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
         ], any_order=True)
 
         statuscache = self.mock_update_client.call_args[1]["statuscache"]
@@ -198,12 +214,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
             self.u_apple, "I am an Apple")
 
         self.mock_update_client.assert_has_calls([
-                call(observer_user=self.u_apple,
+                call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
+                    room_ids=[],
                     observed_user=self.u_apple,
                     statuscache=ANY),  # self-reflection
-                call(observer_user=self.u_banana,
-                    observed_user=self.u_apple,
-                    statuscache=ANY),
         ], any_order=True)
 
         statuscache = self.mock_update_client.call_args[1]["statuscache"]
@@ -217,6 +231,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_push_remote(self):
+        self.presence_list = [
+            {"observed_user_id": "@potato:remote"},
+        ]
+
         self.datastore.set_presence_state.return_value = defer.succeed(
             {"state": ONLINE})
 
@@ -247,6 +265,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_recv_remote(self):
+        self.presence_list = [
+            {"observed_user_id": "@banana:test"},
+            {"observed_user_id": "@clementine:test"},
+        ]
+
         # TODO(paul): Gut-wrenching
         potato_set = self.handlers.presence_handler._remote_recvmap.setdefault(
             self.u_potato, set())
@@ -264,7 +287,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
         )
 
         self.mock_update_client.assert_called_with(
-            observer_user=self.u_apple,
+            users_to_push=set([self.u_apple]),
+            room_ids=[],
             observed_user=self.u_potato,
             statuscache=ANY)
diff --git a/tests/utils.py b/tests/utils.py
index 98d4f9ed58..37b759febc 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -21,7 +21,7 @@ from synapse.api.events.room import (
     RoomMemberEvent, MessageEvent
 )
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 
 from collections import namedtuple
 from mock import patch, Mock
@@ -263,18 +263,43 @@ class DeferredMockCallable(object):
                 d.callback(None)
                 return result
 
-        raise AssertionError("Was not expecting call(%s)" %
+        failure = AssertionError("Was not expecting call(%s)" %
             _format_call(args, kwargs)
         )
 
+        for _, _, d in self.expectations:
+            try:
+                d.errback(failure)
+            except:
+                pass
+
+        raise failure
+
     def expect_call_and_return(self, call, result):
         self.expectations.append((call, result, defer.Deferred()))
 
     @defer.inlineCallbacks
-    def await_calls(self):
-        while self.expectations:
-            (_, _, d) = self.expectations.pop(0)
-            yield d
+    def await_calls(self, timeout=1000):
+        deferred = defer.DeferredList(
+            [d for _, _, d in self.expectations],
+            fireOnOneErrback=True
+        )
+
+        timer = reactor.callLater(
+            timeout/1000,
+            deferred.errback,
+            AssertionError(
+                "%d pending calls left: %s" % (
+                    len([e for e in self.expectations if not e[2].called]),
+                    [e for e in self.expectations if not e[2].called]
+                )
+            )
+        )
+
+        yield deferred
+
+        timer.cancel()
+
+        self.calls = []
 
     def assert_had_no_calls(self):
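The reworked ``await_calls`` gathers every expected deferred into a ``DeferredList`` and arms a reactor timer that errbacks it after ``timeout`` milliseconds, so a missing call now fails the test instead of hanging it. Note that callers must ``yield`` the result, which the test changes above also fix. A usage sketch inside a hypothetical test method::

    from mock import ANY, call
    from twisted.internet import defer

    from tests.utils import DeferredMockCallable

    @defer.inlineCallbacks
    def test_remote_poll(self):
        put_json = DeferredMockCallable()
        put_json.expect_call_and_return(
            call("remote", path=ANY, data={"poll": ["@potato:remote"]}),
            defer.succeed((200, "OK")),
        )

        # ... exercise code that invokes put_json(...) ...

        # Errors with "1 pending calls left: ..." after 1s if never called.
        yield put_json.await_calls(timeout=1000)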
{"observed_user_id": "@banana:test"}, + {"observed_user_id": "@clementine:test"}, + ] + # TODO(paul): Gut-wrenching potato_set = self.handlers.presence_handler._remote_recvmap.setdefault( self.u_potato, set()) @@ -264,7 +287,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): ) self.mock_update_client.assert_called_with( - observer_user=self.u_apple, + users_to_push=set([self.u_apple]), + room_ids=[], observed_user=self.u_potato, statuscache=ANY) diff --git a/tests/utils.py b/tests/utils.py index 98d4f9ed58..37b759febc 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -21,7 +21,7 @@ from synapse.api.events.room import ( RoomMemberEvent, MessageEvent ) -from twisted.internet import defer +from twisted.internet import defer, reactor from collections import namedtuple from mock import patch, Mock @@ -263,18 +263,43 @@ class DeferredMockCallable(object): d.callback(None) return result - raise AssertionError("Was not expecting call(%s)" % + failure = AssertionError("Was not expecting call(%s)" % _format_call(args, kwargs) ) + for _, _, d in self.expectations: + try: + d.errback(failure) + except: + pass + + raise failure + def expect_call_and_return(self, call, result): self.expectations.append((call, result, defer.Deferred())) @defer.inlineCallbacks - def await_calls(self): - while self.expectations: - (_, _, d) = self.expectations.pop(0) - yield d + def await_calls(self, timeout=1000): + deferred = defer.DeferredList( + [d for _, _, d in self.expectations], + fireOnOneErrback=True + ) + + timer = reactor.callLater( + timeout/1000, + deferred.errback, + AssertionError( + "%d pending calls left: %s"% ( + len([e for e in self.expectations if not e[2].called]), + [e for e in self.expectations if not e[2].called] + ) + ) + ) + + yield deferred + + timer.cancel() + self.calls = [] def assert_had_no_calls(self): diff --git a/webclient/room/room-controller.js b/webclient/room/room-controller.js index 8bb48b3692..6232ce8ed3 100644 --- a/webclient/room/room-controller.js +++ b/webclient/room/room-controller.js @@ -15,8 +15,8 @@ limitations under the License. 
  */
 angular.module('RoomController', ['ngSanitize', 'mFileInput'])
-.controller('RoomController', ['$scope', '$timeout', '$routeParams', '$location', '$rootScope', 'matrixService', 'eventHandlerService', 'mFileUpload', 'matrixPhoneService', 'MatrixCall',
-               function($scope, $timeout, $routeParams, $location, $rootScope, matrixService, eventHandlerService, mFileUpload, matrixPhoneService, MatrixCall) {
+.controller('RoomController', ['$scope', '$timeout', '$routeParams', '$location', '$rootScope', 'matrixService', 'eventHandlerService', 'mFileUpload', 'mPresence', 'matrixPhoneService', 'MatrixCall',
+               function($scope, $timeout, $routeParams, $location, $rootScope, matrixService, eventHandlerService, mFileUpload, mPresence, matrixPhoneService, MatrixCall) {
    'use strict';
     var MESSAGES_PER_PAGINATION = 30;
     var THUMBNAIL_SIZE = 320;
@@ -57,15 +57,14 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
                 scrollToBottom();
 
                 if (window.Notification) {
-                    // FIXME: we should also notify based on a timer or other heuristics
-                    //        rather than the window being minimised
-                    if (document.hidden) {
+                    // Show notification when the user is idle
+                    if (matrixService.presence.offline === mPresence.getState()) {
                         var notification = new window.Notification(
                             ($scope.members[event.user_id].displayname || event.user_id) +
                             " (" + ($scope.room_alias || $scope.room_id) + ")",  // FIXME: don't leak room_ids here
                         {
                             "body": event.content.body,
-                            "icon": $scope.members[event.user_id].avatar_url,
+                            "icon": $scope.members[event.user_id].avatar_url
                         });
                         $timeout(function() {
                             notification.close();
@@ -230,7 +229,7 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
                 var member = $scope.members[target_user_id];
                 member.content.membership = chunk.content.membership;
             }
-        }
+        };
 
         var updatePresence = function(chunk) {
             if (!(chunk.content.user_id in $scope.members)) {
@@ -257,10 +256,10 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
             if ("avatar_url" in chunk.content) {
                 member.avatar_url = chunk.content.avatar_url;
             }
-        }
+        };
 
         $scope.send = function() {
-            if ($scope.textInput == "") {
+            if ($scope.textInput === "") {
                 return;
             }
 
@@ -269,7 +268,7 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
             // Send the text message
             var promise;
             // FIXME: handle other commands too
-            if ($scope.textInput.indexOf("/me") == 0) {
+            if ($scope.textInput.indexOf("/me") === 0) {
                 promise = matrixService.sendEmoteMessage($scope.room_id, $scope.textInput.substr(4));
             } else {