From 59dfbaba3b1ed236a832f8bfb2c6fc92d071f8b6 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 13 Aug 2014 18:14:37 +0100 Subject: when we're talking about backfilling data in federation, call it backfilling - not pagination. --- synapse/storage/pdu.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index 202d7f6cb6..13adc581e1 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -168,7 +168,7 @@ class PduStore(SQLBaseStore): return self._get_pdu_tuples(txn, txn.fetchall()) - def get_pagination(self, context, pdu_list, limit): + def get_backfill(self, context, pdu_list, limit): """Get a list of Pdus for a given topic that occured before (and including) the pdus in pdu_list. Return a list of max size `limit`. @@ -182,12 +182,12 @@ class PduStore(SQLBaseStore): list: A list of PduTuples """ return self._db_pool.runInteraction( - self._get_paginate, context, pdu_list, limit + self._get_backfill, context, pdu_list, limit ) - def _get_paginate(self, txn, context, pdu_list, limit): + def _get_backfill(self, txn, context, pdu_list, limit): logger.debug( - "paginate: %s, %s, %s", + "backfill: %s, %s, %s", context, repr(pdu_list), limit ) @@ -213,7 +213,7 @@ class PduStore(SQLBaseStore): new_front = [] for pdu_id, origin in front: logger.debug( - "_paginate_interaction: i=%s, o=%s", + "_backfill_interaction: i=%s, o=%s", pdu_id, origin ) @@ -224,7 +224,7 @@ class PduStore(SQLBaseStore): for row in txn.fetchall(): logger.debug( - "_paginate_interaction: got i=%s, o=%s", + "_backfill_interaction: got i=%s, o=%s", *row ) new_front.append(row) @@ -262,7 +262,7 @@ class PduStore(SQLBaseStore): def update_min_depth_for_context(self, context, depth): """Update the minimum `depth` of the given context, which is the line - where we stop paginating backwards on. + on which we stop backfilling backwards. Args: context (str) @@ -320,9 +320,9 @@ class PduStore(SQLBaseStore): return [(row[0], row[1], row[2]) for row in results] def get_oldest_pdus_in_context(self, context): - """Get a list of Pdus that we paginated beyond yet (and haven't seen). - This list is used when we want to paginate backwards and is the list we - send to the remote server. + """Get a list of Pdus that we haven't backfilled beyond yet (and haven't + seen). This list is used when we want to backfill backwards and is the + list we send to the remote server. Args: txn -- cgit 1.4.1 From baf04be5cfa6337dcc6041cdb67023aa7f950ee1 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 13 Aug 2014 18:15:23 +0100 Subject: Set datastore's .hs field in SQLBaseStore rather than in the toplevel DataStore mixed-in result class --- synapse/storage/__init__.py | 1 - synapse/storage/_base.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 3c27428c08..5d5b5f7c44 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,7 +44,6 @@ class DataStore(RoomDataStore, RoomMemberStore, MessageStore, RoomStore, def __init__(self, hs): super(DataStore, self).__init__(hs) self.event_factory = hs.get_event_factory() - self.hs = hs def persist_event(self, event): if event.type == MessageEvent.TYPE: diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 65f691ead4..1b98bdfcef 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -27,6 +27,7 @@ logger = logging.getLogger(__name__) class SQLBaseStore(object): def __init__(self, hs): + self.hs = hs self._db_pool = hs.get_db_pool() def cursor_to_dict(self, cursor): -- cgit 1.4.1 From d05aa651f80b604428c003a13a03c4f6f61c317d Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 13 Aug 2014 19:18:55 +0100 Subject: An initial hack at storing presence state-change mtimes in database and presenting age durations to clients/federation events --- synapse/handlers/presence.py | 41 ++++++++++++++++++++++++++++++------- synapse/storage/_base.py | 1 + synapse/storage/presence.py | 5 +++-- synapse/storage/schema/presence.sql | 1 + tests/handlers/test_presence.py | 36 ++++++++++++++++++++++++-------- tests/handlers/test_presencelike.py | 34 +++++++++++++++++++++--------- tests/rest/test_presence.py | 12 +++++++++-- 7 files changed, 101 insertions(+), 29 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1c24efd454..8bdb0fe5c7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -56,6 +56,8 @@ class PresenceHandler(BaseHandler): self.homeserver = hs + self.clock = hs.get_clock() + distributor = hs.get_distributor() distributor.observe("registered_user", self.registered_user) @@ -168,14 +170,15 @@ class PresenceHandler(BaseHandler): state = yield self.store.get_presence_state( target_user.localpart ) - defer.returnValue(state) else: raise SynapseError(404, "Presence information not visible") else: # TODO(paul): Have remote server send us permissions set - defer.returnValue( - self._get_or_offline_usercache(target_user).get_state() - ) + state = self._get_or_offline_usercache(target_user).get_state() + + if "mtime" in state: + state["mtime_age"] = self.clock.time_msec() - state.pop("mtime") + defer.returnValue(state) @defer.inlineCallbacks def set_state(self, target_user, auth_user, state): @@ -209,6 +212,8 @@ class PresenceHandler(BaseHandler): ), ]) + state["mtime"] = self.clock.time_msec() + now_online = state["state"] != PresenceState.OFFLINE was_polling = target_user in self._user_cachemap @@ -361,6 +366,8 @@ class PresenceHandler(BaseHandler): observed_user = self.hs.parse_userid(p.pop("observed_user_id")) p["observed_user"] = observed_user p.update(self._get_or_offline_usercache(observed_user).get_state()) + if "mtime" in p: + p["mtime_age"] = self.clock.time_msec() - p.pop("mtime") defer.returnValue(presence) @@ -546,10 +553,15 @@ class PresenceHandler(BaseHandler): def _push_presence_remote(self, user, destination, state=None): if state is None: state = yield self.store.get_presence_state(user.localpart) + yield self.distributor.fire( "collect_presencelike_data", user, state ) + if "mtime" in state: + state = dict(state) + state["mtime_age"] = self.clock.time_msec() - state.pop("mtime") + yield self.federation.send_edu( destination=destination, edu_type="m.presence", @@ -585,6 +597,9 @@ class PresenceHandler(BaseHandler): state = dict(push) del state["user_id"] + if "mtime_age" in state: + state["mtime"] = self.clock.time_msec() - state.pop("mtime_age") + statuscache = self._get_or_make_usercache(user) self._user_cachemap_latest_serial += 1 @@ -631,9 +646,14 @@ class PresenceHandler(BaseHandler): def push_update_to_clients(self, observer_user, observed_user, statuscache): + state = statuscache.make_event(user=observed_user, clock=self.clock) + self.notifier.on_new_user_event( observer_user.to_string(), - event_data=statuscache.make_event(user=observed_user), + event_data=statuscache.make_event( + user=observed_user, + clock=self.clock + ), stream_type=PresenceStreamData, store_id=statuscache.serial ) @@ -652,8 +672,10 @@ class PresenceStreamData(StreamData): if from_key < cachemap[k].serial <= to_key] if updates: + clock = self.presence.clock + latest_serial = max([x[1].serial for x in updates]) - data = [x[1].make_event(user=x[0]) for x in updates] + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] return ((data, latest_serial)) else: return (([], self.presence._user_cachemap_latest_serial)) @@ -674,6 +696,8 @@ class UserPresenceCache(object): self.serial = None def update(self, state, serial): + assert("mtime_age" not in state) + self.state.update(state) # Delete keys that are now 'None' for k in self.state.keys(): @@ -691,8 +715,11 @@ class UserPresenceCache(object): # clone it so caller can't break our cache return dict(self.state) - def make_event(self, user): + def make_event(self, user, clock): content = self.get_state() content["user_id"] = user.to_string() + if "mtime" in content: + content["mtime_age"] = clock.time_msec() - content.pop("mtime") + return {"type": "m.presence", "content": content} diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1b98bdfcef..bf1800f4bf 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -29,6 +29,7 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() + self._clock = hs.get_clock() def cursor_to_dict(self, cursor): """Converts a SQL cursor into an list of dicts. diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 6f5b042c25..23b6d1694e 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -35,7 +35,7 @@ class PresenceStore(SQLBaseStore): return self._simple_select_one( table="presence", keyvalues={"user_id": user_localpart}, - retcols=["state", "status_msg"], + retcols=["state", "status_msg", "mtime"], ) def set_presence_state(self, user_localpart, new_state): @@ -43,7 +43,8 @@ class PresenceStore(SQLBaseStore): table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], - "status_msg": new_state["status_msg"]}, + "status_msg": new_state["status_msg"], + "mtime": self._clock.time_msec()}, retcols=["state"], ) diff --git a/synapse/storage/schema/presence.sql b/synapse/storage/schema/presence.sql index b22e3ba863..b1081d3aab 100644 --- a/synapse/storage/schema/presence.sql +++ b/synapse/storage/schema/presence.sql @@ -16,6 +16,7 @@ CREATE TABLE IF NOT EXISTS presence( user_id INTEGER NOT NULL, state INTEGER, status_msg TEXT, + mtime INTEGER, -- miliseconds since last state change FOREIGN KEY(user_id) REFERENCES users(id) ); diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 2299a2a7ba..b365741d99 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -20,6 +20,8 @@ from twisted.internet import defer from mock import Mock, call, ANY import logging +from ..utils import MockClock + from synapse.server import HomeServer from synapse.api.constants import PresenceState from synapse.api.errors import SynapseError @@ -55,6 +57,7 @@ class PresenceStateTestCase(unittest.TestCase): def setUp(self): hs = HomeServer("test", + clock=MockClock(), db_pool=None, datastore=Mock(spec=[ "get_presence_state", @@ -154,7 +157,11 @@ class PresenceStateTestCase(unittest.TestCase): mocked_set.assert_called_with("apple", {"state": UNAVAILABLE, "status_msg": "Away"}) self.mock_start.assert_called_with(self.u_apple, - state={"state": UNAVAILABLE, "status_msg": "Away"}) + state={ + "state": UNAVAILABLE, + "status_msg": "Away", + "mtime": 1000000, # MockClock + }) yield self.handler.set_state( target_user=self.u_apple, auth_user=self.u_apple, @@ -386,7 +393,10 @@ class PresencePushTestCase(unittest.TestCase): self.replication.send_edu = Mock() self.replication.send_edu.return_value = defer.succeed((200, "OK")) + self.clock = MockClock() + hs = HomeServer("test", + clock=self.clock, db_pool=None, datastore=Mock(spec=[ "set_presence_state", @@ -519,13 +529,18 @@ class PresencePushTestCase(unittest.TestCase): yield self.handler.set_state(self.u_banana, self.u_banana, {"state": ONLINE}) + self.clock.advance_time(2) + presence = yield self.handler.get_presence_list( observer_user=self.u_apple, accepted=True) self.assertEquals([ - {"observed_user": self.u_banana, "state": ONLINE}, - {"observed_user": self.u_clementine, "state": OFFLINE}], - presence) + {"observed_user": self.u_banana, + "state": ONLINE, + "mtime_age": 2000}, + {"observed_user": self.u_clementine, + "state": OFFLINE}, + ], presence) self.mock_update_client.assert_has_calls([ call(observer_user=self.u_banana, @@ -555,7 +570,8 @@ class PresencePushTestCase(unittest.TestCase): content={ "push": [ {"user_id": "@apple:test", - "state": "online"}, + "state": "online", + "mtime_age": 0}, ], }), call( @@ -564,7 +580,8 @@ class PresencePushTestCase(unittest.TestCase): content={ "push": [ {"user_id": "@apple:test", - "state": "online"}, + "state": "online", + "mtime_age": 0}, ], }) ], any_order=True) @@ -582,7 +599,8 @@ class PresencePushTestCase(unittest.TestCase): "remote", "m.presence", { "push": [ {"user_id": "@potato:remote", - "state": "online"}, + "state": "online", + "mtime_age": 1000}, ], } ) @@ -596,9 +614,11 @@ class PresencePushTestCase(unittest.TestCase): statuscache=ANY), ], any_order=True) + self.clock.advance_time(2) + state = yield self.handler.get_state(self.u_potato, self.u_apple) - self.assertEquals({"state": ONLINE}, state) + self.assertEquals({"state": ONLINE, "mtime_age": 3000}, state) @defer.inlineCallbacks def test_join_room_local(self): diff --git a/tests/handlers/test_presencelike.py b/tests/handlers/test_presencelike.py index 54b92ba8e2..6eeb1bb522 100644 --- a/tests/handlers/test_presencelike.py +++ b/tests/handlers/test_presencelike.py @@ -22,6 +22,8 @@ from twisted.internet import defer from mock import Mock, call, ANY import logging +from ..utils import MockClock + from synapse.server import HomeServer from synapse.api.constants import PresenceState from synapse.handlers.presence import PresenceHandler @@ -60,6 +62,7 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): def setUp(self): hs = HomeServer("test", + clock=MockClock(), db_pool=None, datastore=Mock(spec=[ "set_presence_state", @@ -156,10 +159,14 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): observer_user=self.u_apple, accepted=True) self.assertEquals([ - {"observed_user": self.u_banana, "state": ONLINE, - "displayname": "Frank", "avatar_url": "http://foo"}, - {"observed_user": self.u_clementine, "state": OFFLINE}], - presence) + {"observed_user": self.u_banana, + "state": ONLINE, + "mtime_age": 0, + "displayname": "Frank", + "avatar_url": "http://foo"}, + {"observed_user": self.u_clementine, + "state": OFFLINE}], + presence) self.mock_update_client.assert_has_calls([ call(observer_user=self.u_apple, @@ -171,9 +178,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): ], any_order=True) statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({"state": ONLINE, - "displayname": "Frank", - "avatar_url": "http://foo"}, statuscache.state) + self.assertEquals({ + "state": ONLINE, + "mtime": 1000000, # MockClock + "displayname": "Frank", + "avatar_url": "http://foo", + }, statuscache.state) self.mock_update_client.reset_mock() @@ -193,9 +203,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): ], any_order=True) statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({"state": ONLINE, - "displayname": "I am an Apple", - "avatar_url": "http://foo"}, statuscache.state) + self.assertEquals({ + "state": ONLINE, + "mtime": 1000000, # MockClock + "displayname": "I am an Apple", + "avatar_url": "http://foo", + }, statuscache.state) @defer.inlineCallbacks def test_push_remote(self): @@ -220,6 +233,7 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): "push": [ {"user_id": "@apple:test", "state": "online", + "mtime_age": 0, "displayname": "Frank", "avatar_url": "http://foo"}, ], diff --git a/tests/rest/test_presence.py b/tests/rest/test_presence.py index 7c54e067c9..f013abbee4 100644 --- a/tests/rest/test_presence.py +++ b/tests/rest/test_presence.py @@ -234,7 +234,11 @@ class PresenceEventStreamTestCase(unittest.TestCase): # I'll already get my own presence state change self.assertEquals({"start": "0", "end": "1", "chunk": [ {"type": "m.presence", - "content": {"user_id": "@apple:test", "state": ONLINE}}, + "content": { + "user_id": "@apple:test", + "state": ONLINE, + "mtime_age": 0, + }}, ]}, response) self.mock_datastore.set_presence_state.return_value = defer.succeed( @@ -251,5 +255,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.assertEquals(200, code) self.assertEquals({"start": "1", "end": "2", "chunk": [ {"type": "m.presence", - "content": {"user_id": "@banana:test", "state": ONLINE}}, + "content": { + "user_id": "@banana:test", + "state": ONLINE, + "mtime_age": 0, + }}, ]}, response) -- cgit 1.4.1