summary refs log tree commit diff
diff options
context:
space:
mode:
authorPaul "LeoNerd" Evans <paul@matrix.org>2014-09-02 16:29:04 +0100
committerPaul "LeoNerd" Evans <paul@matrix.org>2014-09-03 13:46:52 +0100
commitda31b96b55c7ae54c0ec7708c40c7637287521b1 (patch)
tree13ab55bbfaf60897fb69373af11ab36b48993203
parentDon't eat federation transmit errors during unit tests; fix remote presence E... (diff)
downloadsynapse-da31b96b55c7ae54c0ec7708c40c7637287521b1.tar.xz
Implement presence state visibilty limiting when polling eventsource for stream
-rw-r--r--synapse/handlers/presence.py66
-rw-r--r--tests/handlers/test_presence.py68
-rw-r--r--tests/rest/test_presence.py28
3 files changed, 147 insertions, 15 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index beb5aa3a6a..ae4735898e 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -156,7 +156,7 @@ class PresenceHandler(BaseHandler):
             defer.returnValue(True)
 
         if (yield self.store.user_rooms_intersect(
-            [observer_user, observed_user]
+            [u.to_string() for u in observer_user, observed_user]
         )):
             defer.returnValue(True)
 
@@ -772,15 +772,52 @@ class PresenceEventSource(object):
         self.hs = hs
         self.clock = hs.get_clock()
 
+    @defer.inlineCallbacks
+    def is_visible(self, observer_user, observed_user):
+        if observer_user == observed_user:
+            defer.returnValue(True)
+
+        presence = self.hs.get_handlers().presence_handler
+
+        if (yield presence.store.user_rooms_intersect(
+            [u.to_string() for u in observer_user, observed_user]
+        )):
+            defer.returnValue(True)
+
+        if observed_user.is_mine:
+            pushmap = presence._local_pushmap
+
+            defer.returnValue(
+                observed_user.localpart in pushmap and
+                observer_user in pushmap[observed_user.localpart]
+            )
+        else:
+            recvmap = presence._remote_recvmap
+
+            defer.returnValue(
+                observed_user in recvmap and
+                observer_user in recvmap[observed_user]
+            )
+
+    @defer.inlineCallbacks
     def get_new_events_for_user(self, user, from_key, limit):
         from_key = int(from_key)
 
+        observer_user = user
+
         presence = self.hs.get_handlers().presence_handler
         cachemap = presence._user_cachemap
 
-        # TODO(paul): limit, and filter by visibility
-        updates = [(k, cachemap[k]) for k in cachemap
-                   if from_key < cachemap[k].serial]
+        updates = []
+        # TODO(paul): use a DeferredList ? How to limit concurrency.
+        for observed_user in cachemap.keys():
+            if not (from_key < cachemap[observed_user].serial):
+                continue
+
+            if (yield self.is_visible(observer_user, observed_user)):
+                updates.append((observed_user, cachemap[observed_user]))
+
+        # TODO(paul): limit
 
         if updates:
             clock = self.clock
@@ -788,14 +825,15 @@ class PresenceEventSource(object):
             latest_serial = max([x[1].serial for x in updates])
             data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
 
-            return ((data, latest_serial))
+            defer.returnValue((data, latest_serial))
         else:
-            return (([], presence._user_cachemap_latest_serial))
+            defer.returnValue(([], presence._user_cachemap_latest_serial))
 
     def get_current_key(self):
         presence = self.hs.get_handlers().presence_handler
         return presence._user_cachemap_latest_serial
 
+    @defer.inlineCallbacks
     def get_pagination_rows(self, user, pagination_config, key):
         # TODO (erikj): Does this make sense? Ordering?
 
@@ -812,7 +850,17 @@ class PresenceEventSource(object):
         presence = self.hs.get_handlers().presence_handler
         cachemap = presence._user_cachemap
 
-        # TODO(paul): limit, and filter by visibility
+        updates = []
+        # TODO(paul): use a DeferredList ? How to limit concurrency.
+        for observed_user in cachemap.keys():
+            if not (to_key < cachemap[observed_user].serial < from_key):
+                continue
+
+            if (yield self.is_visible(observer_user, observed_user)):
+                updates.append((observed_user, cachemap[observed_user]))
+
+        # TODO(paul): limit
+
         updates = [(k, cachemap[k]) for k in cachemap
                    if to_key < cachemap[k].serial < from_key]
 
@@ -830,13 +878,13 @@ class PresenceEventSource(object):
             next_token = next_token.copy_and_replace(
                 "presence_key", earliest_serial
             )
-            return ((data, next_token))
+            defer.returnValue((data, next_token))
         else:
             if not to_token:
                 to_token = from_token.copy_and_replace(
                     "presence_key", 0
                 )
-            return (([], to_token))
+            defer.returnValue(([], to_token))
 
 
 class UserPresenceCache(object):
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 4583ff8bc6..8506961d2d 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -118,7 +118,9 @@ class PresenceStateTestCase(unittest.TestCase):
         room_member_handler.get_room_members = get_room_members
 
         def user_rooms_intersect(userlist):
-            shared = all(map(lambda u: u in self.room_members, userlist))
+            room_member_ids = map(lambda u: u.to_string(), self.room_members)
+
+            shared = all(map(lambda i: i in room_member_ids, userlist))
             return defer.succeed(shared)
         self.datastore.user_rooms_intersect = user_rooms_intersect
 
@@ -562,6 +564,13 @@ class PresencePushTestCase(unittest.TestCase):
                 return defer.succeed([])
         self.datastore.get_joined_hosts_for_room = get_room_hosts
 
+        def user_rooms_intersect(userlist):
+            room_member_ids = map(lambda u: u.to_string(), self.room_members)
+
+            shared = all(map(lambda i: i in room_member_ids, userlist))
+            return defer.succeed(shared)
+        self.datastore.user_rooms_intersect = user_rooms_intersect
+
         @defer.inlineCallbacks
         def fetch_room_distributions_into(room_id, localusers=None,
                 remotedomains=None, ignore_user=None):
@@ -604,6 +613,7 @@ class PresencePushTestCase(unittest.TestCase):
         self.u_apple = hs.parse_userid("@apple:test")
         self.u_banana = hs.parse_userid("@banana:test")
         self.u_clementine = hs.parse_userid("@clementine:test")
+        self.u_durian = hs.parse_userid("@durian:test")
         self.u_elderberry = hs.parse_userid("@elderberry:test")
 
         # Remote user
@@ -632,6 +642,7 @@ class PresencePushTestCase(unittest.TestCase):
             {"presence": ONLINE}
         )
 
+        # Apple sees self-reflection
         (events, _) = yield self.event_source.get_new_events_for_user(
             self.u_apple, 0, None
         )
@@ -647,6 +658,55 @@ class PresencePushTestCase(unittest.TestCase):
                     "last_active_ago": 0,
                 }},
             ],
+            msg="Presence event should be visible to self-reflection"
+        )
+
+        # Banana sees it because of presence subscription
+        (events, _) = yield self.event_source.get_new_events_for_user(
+            self.u_banana, 0, None
+        )
+
+        self.assertEquals(self.event_source.get_current_key(), 1)
+        self.assertEquals(events,
+            [
+                {"type": "m.presence",
+                 "content": {
+                    "user_id": "@apple:test",
+                    "presence": ONLINE,
+                    "state": ONLINE,
+                    "last_active_ago": 0,
+                }},
+            ],
+            msg="Presence event should be visible to explicit subscribers"
+        )
+
+        # Elderberry sees it because of same room
+        (events, _) = yield self.event_source.get_new_events_for_user(
+            self.u_elderberry, 0, None
+        )
+
+        self.assertEquals(self.event_source.get_current_key(), 1)
+        self.assertEquals(events,
+            [
+                {"type": "m.presence",
+                 "content": {
+                    "user_id": "@apple:test",
+                    "presence": ONLINE,
+                    "state": ONLINE,
+                    "last_active_ago": 0,
+                }},
+            ],
+            msg="Presence event should be visible to other room members"
+        )
+
+        # Durian is not in the room, should not see this event
+        (events, _) = yield self.event_source.get_new_events_for_user(
+            self.u_durian, 0, None
+        )
+
+        self.assertEquals(self.event_source.get_current_key(), 1)
+        self.assertEquals(events, [],
+            msg="Presence event should not be visible to others"
         )
 
         presence = yield self.handler.get_presence_list(
@@ -664,6 +724,10 @@ class PresencePushTestCase(unittest.TestCase):
             presence
         )
 
+        # TODO(paul): Gut-wrenching
+        banana_set = self.handler._local_pushmap.setdefault("banana", set())
+        banana_set.add(self.u_apple)
+
         yield self.handler.set_state(self.u_banana, self.u_banana,
             {"presence": ONLINE}
         )
@@ -825,6 +889,8 @@ class PresencePushTestCase(unittest.TestCase):
             "a-room"
         )
 
+        self.room_members.append(self.u_clementine)
+
         (events, _) = yield self.event_source.get_new_events_for_user(
             self.u_apple, 0, None
         )
diff --git a/tests/rest/test_presence.py b/tests/rest/test_presence.py
index e2cdd80e07..df8dd74151 100644
--- a/tests/rest/test_presence.py
+++ b/tests/rest/test_presence.py
@@ -269,11 +269,16 @@ class PresenceEventStreamTestCase(unittest.TestCase):
 
         hs.register_servlets()
 
-        hs.handlers.room_member_handler = Mock(spec=[
-            "get_rooms_for_user",
-        ])
-        hs.handlers.room_member_handler.get_rooms_for_user = (
-                lambda u: defer.succeed([]))
+        hs.handlers.room_member_handler = Mock(spec=[])
+
+        self.room_members = []
+
+        def get_rooms_for_user(user):
+            if user in self.room_members:
+                return ["a-room"]
+            else:
+                return []
+        hs.handlers.room_member_handler.get_rooms_for_user = get_rooms_for_user
 
         self.mock_datastore = hs.get_datastore()
 
@@ -285,6 +290,17 @@ class PresenceEventStreamTestCase(unittest.TestCase):
             return defer.succeed(None)
         self.mock_datastore.get_profile_avatar_url = get_profile_avatar_url
 
+        def user_rooms_intersect(user_list):
+            room_member_ids = map(lambda u: u.to_string(), self.room_members)
+
+            shared = all(map(lambda i: i in room_member_ids, user_list))
+            return defer.succeed(shared)
+        self.mock_datastore.user_rooms_intersect = user_rooms_intersect
+
+        def get_joined_hosts_for_room(room_id):
+            return []
+        self.mock_datastore.get_joined_hosts_for_room = get_joined_hosts_for_room
+
         self.presence = hs.get_handlers().presence_handler
 
         self.u_apple = hs.parse_userid("@apple:test")
@@ -292,6 +308,8 @@ class PresenceEventStreamTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_shortpoll(self):
+        self.room_members = [self.u_apple, self.u_banana]
+
         self.mock_datastore.set_presence_state.return_value = defer.succeed(
                 {"state": ONLINE})
         self.mock_datastore.get_presence_list.return_value = defer.succeed(