diff options
author | Erik Johnston <erik@matrix.org> | 2014-08-26 18:57:46 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-08-26 18:57:46 +0100 |
commit | 3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 (patch) | |
tree | 5f5b515cdaa586f1f7f92337471dfdc2e0d4683d /synapse/handlers/room.py | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into stream_refactor (diff) | |
download | synapse-3a2a5b959cb1f56b26af32e1ad4c1db424279eb7.tar.xz |
WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model.
Diffstat (limited to 'synapse/handlers/room.py')
-rw-r--r-- | synapse/handlers/room.py | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a4569ac95..20b4bbb665 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,8 +22,6 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, EventsStreamData -from synapse.handlers.presence import PresenceStreamData from synapse.util import stringutils from ._base import BaseHandler @@ -115,13 +113,24 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [ - EventsStreamData(self.hs, room_id=room_id, feedback=feedback) - ] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) + data_source = self.hs.get_event_sources().sources[0] + + if pagin_config.from_token: + from_token = pagin_config.from_token + else: + from_token = yield self.hs.get_event_sources().get_current_token() + + events, next_token = yield data_source.get_pagination_rows( + from_token, pagin_config.to_token, pagin_config.limit, room_id + ) + + chunk = { + "chunk": [e.get_dict() for e in events], + "start_token": from_token.to_string(), + "end_token": next_token.to_string(), + } + + defer.returnValue(chunk) @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): @@ -258,18 +267,15 @@ class MessageHandler(BaseHandler): rooms_ret = [] - now_rooms_token = yield self.store.get_room_events_max_id() + # FIXME (erikj): We need to not generate this token, + now_token = yield self.hs.get_event_sources().get_current_token() # FIXME (erikj): Fix this. - presence_stream = PresenceStreamData(self.hs) - now_presence_token = yield presence_stream.max_token() - presence = yield presence_stream.get_rows( - user_id, 0, now_presence_token, None, None + presence_stream = self.hs.get_event_sources().sources[1] + presence = yield presence_stream.get_new_events_for_user( + user_id, now_token, None, None ) - # FIXME (erikj): We need to not generate this token, - now_token = "%s_%s" % (now_rooms_token, now_presence_token) - limit = pagin_config.limit if not limit: limit = 10 @@ -291,7 +297,7 @@ class MessageHandler(BaseHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_rooms_token, + end_token=now_token.events_key.to_string(), ) d["messages"] = { @@ -305,9 +311,7 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} - - # logger.debug("snapshot_all_rooms returning: %s", ret) + ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token.to_string()} defer.returnValue(ret) |