From 63878c03794d33a8767425e114845159e5c1cb9a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 13:42:21 +0100 Subject: Don't bother checking for updates if the stream token hasn't advanced for a user --- tests/rest/client/v1/test_presence.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'tests/rest/client/v1/test_presence.py') diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 8e0c5fa630..c0c52796ad 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -27,6 +27,7 @@ from synapse.handlers.presence import PresenceHandler from synapse.rest.client.v1 import presence from synapse.rest.client.v1 import events from synapse.types import UserID +from synapse.util.async import run_on_reactor OFFLINE = PresenceState.OFFLINE @@ -264,6 +265,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): datastore=Mock(spec=[ "set_presence_state", "get_presence_list", + "get_rooms_for_user", ]), clock=Mock(spec=[ "call_later", @@ -298,6 +300,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore.get_app_service_by_user_id = Mock( return_value=defer.succeed(None) ) + self.mock_datastore.get_rooms_for_user = ( + lambda u: get_rooms_for_user(UserID.from_string(u)) + ) def get_profile_displayname(user_id): return defer.succeed("Frank") @@ -350,19 +355,19 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore.set_presence_state.return_value = defer.succeed( {"state": ONLINE} ) - self.mock_datastore.get_presence_list.return_value = defer.succeed( - [] - ) + self.mock_datastore.get_presence_list.return_value = defer.succeed([]) yield self.presence.set_state(self.u_banana, self.u_banana, state={"presence": ONLINE} ) + yield run_on_reactor() + (code, response) = yield self.mock_resource.trigger("GET", - "/events?from=0_1_0&timeout=0", None) + "/events?from=s0_1_0&timeout=0", None) self.assertEquals(200, code) - self.assertEquals({"start": "0_1_0", "end": "0_2_0", "chunk": [ + self.assertEquals({"start": "s0_1_0", "end": "s0_2_0", "chunk": [ {"type": "m.presence", "content": { "user_id": "@banana:test", -- cgit 1.5.1 From f1b83d88a3d3ad596631e51852a9802d0a7270a0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 13 May 2015 16:54:02 +0100 Subject: Discard unused NotifierUserStreams --- synapse/notifier.py | 50 ++++++++++++++++++++++++----------- tests/rest/client/v1/test_presence.py | 1 + tests/utils.py | 3 +++ 3 files changed, 38 insertions(+), 16 deletions(-) (limited to 'tests/rest/client/v1/test_presence.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 6fcb7767a0..344dd03172 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -71,14 +71,17 @@ class _NotifierUserStream(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, current_token, appservice=None): + def __init__(self, user, rooms, current_token, time_now_ms, + appservice=None): self.user = str(user) self.appservice = appservice self.listeners = set() self.rooms = set(rooms) self.current_token = current_token + self.last_notified_ms = time_now_ms - def notify(self, stream_key, stream_id): + def notify(self, stream_key, stream_id, time_now_ms): + self.last_notified_ms = time_now_ms self.current_token = self.current_token.copy_and_replace( stream_key, stream_id ) @@ -96,7 +99,7 @@ class _NotifierUserStream(object): lst = notifier.room_to_user_streams.get(room, set()) lst.discard(self) - notifier.user_to_user_streams.get(self.user, set()).discard(self) + notifier.user_to_user_stream.pop(self.user) if self.appservice: notifier.appservice_to_user_streams.get( @@ -111,6 +114,8 @@ class Notifier(object): Primarily used from the /events stream. """ + UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 + def __init__(self, hs): self.hs = hs @@ -128,6 +133,10 @@ class Notifier(object): "user_joined_room", self._user_joined_room ) + self.clock.looping_call( + self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS + ) + # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at # most when scraping it. @@ -221,9 +230,12 @@ class Notifier(object): logger.debug("on_new_room_event listeners %s", user_streams) + time_now_ms = self.clock.time_msec() for user_stream in user_streams: try: - user_stream.notify("room_key", "s%d" % (room_stream_id,)) + user_stream.notify( + "room_key", "s%d" % (room_stream_id,), time_now_ms + ) except: logger.exception("Failed to notify listener") @@ -246,9 +258,10 @@ class Notifier(object): for room in rooms: user_streams |= self.room_to_user_streams.get(room, set()) + time_now_ms = self.clock.time_msec() for user_stream in user_streams: try: - user_stream.notify(stream_key, new_token) + user_stream.notify(stream_key, new_token, time_now_ms) except: logger.exception("Failed to notify listener") @@ -260,6 +273,7 @@ class Notifier(object): """ deferred = defer.Deferred() + time_now_ms = self.clock.time_msec() user = str(user) user_stream = self.user_to_user_stream.get(user) @@ -272,6 +286,7 @@ class Notifier(object): rooms=rooms, appservice=appservice, current_token=current_token, + time_now_ms=time_now_ms, ) self._register_with_keys(user_stream) else: @@ -365,6 +380,20 @@ class Notifier(object): defer.returnValue(result) + @log_function + def remove_expired_streams(self): + time_now_ms = self.clock.time_msec() + expired_streams = [] + expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS + for stream in self.user_to_user_stream.values(): + if stream.listeners: + continue + if stream.last_notified_ms < expire_before_ts: + expired_streams.append(stream) + + for expired_stream in expired_streams: + expired_stream.remove(self) + @log_function def _register_with_keys(self, user_stream): self.user_to_user_stream[user_stream.user] = user_stream @@ -385,14 +414,3 @@ class Notifier(object): room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams.add(new_user_stream) new_user_stream.rooms.add(room_id) - - -def _discard_if_notified(listener_set): - """Remove any 'stale' listeners from the given set. - """ - to_discard = set() - for l in listener_set: - if l.notified(): - to_discard.add(l) - - listener_set -= to_discard diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index c0c52796ad..29c0038f06 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -271,6 +271,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): "call_later", "cancel_call_later", "time_msec", + "looping_call", ]), ) diff --git a/tests/utils.py b/tests/utils.py index a67530bd63..3b5c335911 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -197,6 +197,9 @@ class MockClock(object): return t + def looping_call(self, function, interval): + pass + def cancel_call_later(self, timer): if timer[2]: raise Exception("Cannot cancel an expired timer") -- cgit 1.5.1 From 8eca5bd50abc9deb3cd428f3f6b3b8fbeb8bdee1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 20 May 2015 13:22:18 +0100 Subject: Fix the presence tests --- tests/handlers/test_presence.py | 13 +++---------- tests/rest/client/v1/test_presence.py | 3 +++ 2 files changed, 6 insertions(+), 10 deletions(-) (limited to 'tests/rest/client/v1/test_presence.py') diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index ee773797e7..12cf5747a2 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -624,6 +624,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): """ PRESENCE_LIST = { 'apple': [ "@banana:test", "@clementine:test" ], + 'banana': [ "@apple:test" ], } @defer.inlineCallbacks @@ -836,12 +837,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): @defer.inlineCallbacks def test_recv_remote(self): - # TODO(paul): Gut-wrenching - potato_set = self.handler._remote_recvmap.setdefault(self.u_potato, - set()) - potato_set.add(self.u_apple) - - self.room_members = [self.u_banana, self.u_potato] + self.room_members = [self.u_apple, self.u_banana, self.u_potato] self.assertEquals(self.event_source.get_current_key(), 0) @@ -886,11 +882,8 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): @defer.inlineCallbacks def test_recv_remote_offline(self): """ Various tests relating to SYN-261 """ - potato_set = self.handler._remote_recvmap.setdefault(self.u_potato, - set()) - potato_set.add(self.u_apple) - self.room_members = [self.u_banana, self.u_potato] + self.room_members = [self.u_apple, self.u_banana, self.u_potato] self.assertEquals(self.event_source.get_current_key(), 0) diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 29c0038f06..8f3df92418 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -295,6 +295,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): else: return [] hs.handlers.room_member_handler.get_joined_rooms_for_user = get_rooms_for_user + hs.handlers.room_member_handler.get_room_members = ( + lambda r: self.room_members if r == "a-room" else [] + ) self.mock_datastore = hs.get_datastore() self.mock_datastore.get_app_service_by_token = Mock(return_value=None) -- cgit 1.5.1 From 88f1ea36cedff158e7a595c84bf949286fa38532 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 May 2015 15:23:40 +0100 Subject: Oops, get_rooms_for_user returns a namedtuple, not a room_id --- synapse/notifier.py | 1 + tests/rest/client/v1/test_presence.py | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) (limited to 'tests/rest/client/v1/test_presence.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 1e73d52c4d..4f47f88df8 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -296,6 +296,7 @@ class Notifier(object): appservice = yield self.store.get_app_service_by_user_id(user) current_token = yield self.event_sources.get_current_token() rooms = yield self.store.get_rooms_for_user(user) + rooms = [room.room_id for room in rooms] user_stream = _NotifierUserStream( user=user, rooms=rooms, diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 29c0038f06..21f42b3d3e 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -29,6 +29,8 @@ from synapse.rest.client.v1 import events from synapse.types import UserID from synapse.util.async import run_on_reactor +from collections import namedtuple + OFFLINE = PresenceState.OFFLINE UNAVAILABLE = PresenceState.UNAVAILABLE @@ -302,7 +304,10 @@ class PresenceEventStreamTestCase(unittest.TestCase): return_value=defer.succeed(None) ) self.mock_datastore.get_rooms_for_user = ( - lambda u: get_rooms_for_user(UserID.from_string(u)) + lambda u: [ + namedtuple("Room", "room_id")(r) + for r in get_rooms_for_user(UserID.from_string(u)) + ] ) def get_profile_displayname(user_id): -- cgit 1.5.1 From 17167898c816c95db8a3f4661d955f43ad6a87ce Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 16:22:54 +0100 Subject: Fix the presence tests --- tests/handlers/test_presence.py | 16 ++++++++++------ tests/handlers/test_presencelike.py | 20 +++++++++++--------- tests/rest/client/v1/test_presence.py | 4 ++-- 3 files changed, 23 insertions(+), 17 deletions(-) (limited to 'tests/rest/client/v1/test_presence.py') diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 12cf5747a2..29372d488a 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -233,7 +233,7 @@ class MockedDatastorePresenceTestCase(PresenceTestCase): if not user_localpart in self.PRESENCE_LIST: return defer.succeed([]) return defer.succeed([ - {"observed_user_id": u} for u in + {"observed_user_id": u, "accepted": accepted} for u in self.PRESENCE_LIST[user_localpart]]) datastore.get_presence_list = get_presence_list @@ -734,10 +734,12 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): self.assertEquals( [ - {"observed_user": self.u_banana, - "presence": OFFLINE}, + {"observed_user": self.u_banana, + "presence": OFFLINE, + "accepted": True}, {"observed_user": self.u_clementine, - "presence": OFFLINE}, + "presence": OFFLINE, + "accepted": True}, ], presence ) @@ -758,9 +760,11 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): self.assertEquals([ {"observed_user": self.u_banana, "presence": ONLINE, - "last_active_ago": 2000}, + "last_active_ago": 2000, + "accepted": True}, {"observed_user": self.u_clementine, - "presence": OFFLINE}, + "presence": OFFLINE, + "accepted": True}, ], presence) (events, _) = yield self.event_source.get_new_events_for_user( diff --git a/tests/handlers/test_presencelike.py b/tests/handlers/test_presencelike.py index 1f2e66ac11..19107caeee 100644 --- a/tests/handlers/test_presencelike.py +++ b/tests/handlers/test_presencelike.py @@ -101,8 +101,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): self.datastore.get_profile_avatar_url = get_profile_avatar_url self.presence_list = [ - {"observed_user_id": "@banana:test"}, - {"observed_user_id": "@clementine:test"}, + {"observed_user_id": "@banana:test", "accepted": True}, + {"observed_user_id": "@clementine:test", "accepted": True}, ] def get_presence_list(user_localpart, accepted=None): return defer.succeed(self.presence_list) @@ -144,8 +144,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): @defer.inlineCallbacks def test_set_my_state(self): self.presence_list = [ - {"observed_user_id": "@banana:test"}, - {"observed_user_id": "@clementine:test"}, + {"observed_user_id": "@banana:test", "accepted": True}, + {"observed_user_id": "@clementine:test", "accepted": True}, ] mocked_set = self.datastore.set_presence_state @@ -167,8 +167,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): self.mock_get_joined.side_effect = get_joined self.presence_list = [ - {"observed_user_id": "@banana:test"}, - {"observed_user_id": "@clementine:test"}, + {"observed_user_id": "@banana:test", "accepted": True}, + {"observed_user_id": "@clementine:test", "accepted": True}, ] self.datastore.set_presence_state.return_value = defer.succeed( @@ -203,9 +203,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): "presence": ONLINE, "last_active_ago": 0, "displayname": "Frank", - "avatar_url": "http://foo"}, + "avatar_url": "http://foo", + "accepted": True}, {"observed_user": self.u_clementine, - "presence": OFFLINE} + "presence": OFFLINE, + "accepted": True} ], presence) self.mock_update_client.assert_has_calls([ @@ -233,7 +235,7 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): @defer.inlineCallbacks def test_push_remote(self): self.presence_list = [ - {"observed_user_id": "@potato:remote"}, + {"observed_user_id": "@potato:remote", "accepted": True}, ] self.datastore.set_presence_state.return_value = defer.succeed( diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 523b30cf8a..4b32c7a203 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -183,7 +183,7 @@ class PresenceListTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_my_list(self): self.datastore.get_presence_list.return_value = defer.succeed( - [{"observed_user_id": "@banana:test"}], + [{"observed_user_id": "@banana:test", "accepted": True}], ) (code, response) = yield self.mock_resource.trigger("GET", @@ -191,7 +191,7 @@ class PresenceListTestCase(unittest.TestCase): self.assertEquals(200, code) self.assertEquals([ - {"user_id": "@banana:test", "presence": OFFLINE}, + {"user_id": "@banana:test", "presence": OFFLINE, "accepted": True}, ], response) self.datastore.get_presence_list.assert_called_with( -- cgit 1.5.1