diff options
Diffstat (limited to 'synapse/handlers/room.py')
-rw-r--r-- | synapse/handlers/room.py | 60 |
1 files changed, 37 insertions, 23 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a4569ac95..faea30b44e 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,8 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, EventsStreamData -from synapse.handlers.presence import PresenceStreamData +from synapse.streams.config import PaginationConfig from synapse.util import stringutils from ._base import BaseHandler @@ -115,13 +114,24 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [ - EventsStreamData(self.hs, room_id=room_id, feedback=feedback) - ] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) + data_source = self.hs.get_event_sources().sources["room"] + + if not pagin_config.from_token: + pagin_config.from_token = yield self.hs.get_event_sources().get_current_token() + + user = self.hs.parse_userid(user_id) + + events, next_token = yield data_source.get_pagination_rows( + user, pagin_config, room_id + ) + + chunk = { + "chunk": [e.get_dict() for e in events], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + + defer.returnValue(chunk) @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): @@ -256,20 +266,20 @@ class MessageHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) + user = self.hs.parse_userid(user_id) + rooms_ret = [] - now_rooms_token = yield self.store.get_room_events_max_id() + # FIXME (erikj): We need to not generate this token, + now_token = yield self.hs.get_event_sources().get_current_token() # FIXME (erikj): Fix this. - presence_stream = PresenceStreamData(self.hs) - now_presence_token = yield presence_stream.max_token() - presence = yield presence_stream.get_rows( - user_id, 0, now_presence_token, None, None + presence_stream = self.hs.get_event_sources().sources["presence"] + pagination_config = PaginationConfig(from_token=now_token) + presence, _ = yield presence_stream.get_pagination_rows( + user, pagination_config, None ) - # FIXME (erikj): We need to not generate this token, - now_token = "%s_%s" % (now_rooms_token, now_presence_token) - limit = pagin_config.limit if not limit: limit = 10 @@ -291,7 +301,7 @@ class MessageHandler(BaseHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_rooms_token, + end_token=now_token.events_key, ) d["messages"] = { @@ -300,14 +310,18 @@ class MessageHandler(BaseHandler): "end": token[1], } - current_state = yield self.store.get_current_state(event.room_id) + current_state = yield self.store.get_current_state( + event.room_id + ) d["state"] = [c.get_dict() for c in current_state] except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} - - # logger.debug("snapshot_all_rooms returning: %s", ret) + ret = { + "rooms": rooms_ret, + "presence": presence, + "end": now_token.to_string() + } defer.returnValue(ret) @@ -490,7 +504,7 @@ class RoomMemberHandler(BaseHandler): for entry in member_list ] chunk_data = { - "start": "START", # FIXME (erikj): START is no longer a valid value + "start": "START", # FIXME (erikj): START is no longer valid "end": "END", "chunk": event_list } |