diff options
Diffstat (limited to 'synapse/handlers/room.py')
-rw-r--r-- | synapse/handlers/room.py | 69 |
1 files changed, 41 insertions, 28 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7b4b051888..f01349b339 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,8 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, EventsStreamData -from synapse.handlers.presence import PresenceStreamData +from synapse.streams.config import PaginationConfig from synapse.util import stringutils from ._base import BaseRoomHandler @@ -107,13 +106,24 @@ class MessageHandler(BaseRoomHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [ - EventsStreamData(self.hs, room_id=room_id, feedback=feedback) - ] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) + data_source = self.hs.get_event_sources().sources["room"] + + if not pagin_config.from_token: + pagin_config.from_token = yield self.hs.get_event_sources().get_current_token() + + user = self.hs.parse_userid(user_id) + + events, next_token = yield data_source.get_pagination_rows( + user, pagin_config, room_id + ) + + chunk = { + "chunk": [e.get_dict() for e in events], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + + defer.returnValue(chunk) @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): @@ -235,20 +245,18 @@ class MessageHandler(BaseRoomHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) + user = self.hs.parse_userid(user_id) + rooms_ret = [] - now_rooms_token = yield self.store.get_room_events_max_id() + now_token = yield self.hs.get_event_sources().get_current_token() - # FIXME (erikj): Fix this. - presence_stream = PresenceStreamData(self.hs) - now_presence_token = yield presence_stream.max_token() - presence = yield presence_stream.get_rows( - user_id, 0, now_presence_token, None, None + presence_stream = self.hs.get_event_sources().sources["presence"] + pagination_config = PaginationConfig(from_token=now_token) + presence, _ = yield presence_stream.get_pagination_rows( + user, pagination_config, None ) - # FIXME (erikj): We need to not generate this token, - now_token = "%s_%s" % (now_rooms_token, now_presence_token) - limit = pagin_config.limit if not limit: limit = 10 @@ -270,7 +278,7 @@ class MessageHandler(BaseRoomHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_rooms_token, + end_token=now_token.events_key, ) d["messages"] = { @@ -279,14 +287,18 @@ class MessageHandler(BaseRoomHandler): "end": token[1], } - current_state = yield self.store.get_current_state(event.room_id) + current_state = yield self.store.get_current_state( + event.room_id + ) d["state"] = [c.get_dict() for c in current_state] except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} - - # logger.debug("snapshot_all_rooms returning: %s", ret) + ret = { + "rooms": rooms_ret, + "presence": presence, + "end": now_token.to_string() + } defer.returnValue(ret) @@ -381,7 +393,6 @@ class RoomCreationHandler(BaseRoomHandler): federation_handler = self.hs.get_handlers().federation_handler yield federation_handler.handle_new_event(config_event, snapshot) - # self.notifier.on_new_room_event(event, store_id) content = {"membership": Membership.JOIN} join_event = self.event_factory.create_event( @@ -477,7 +488,7 @@ class RoomMemberHandler(BaseRoomHandler): for entry in member_list ] chunk_data = { - "start": "START", # FIXME (erikj): START is no longer a valid value + "start": "START", # FIXME (erikj): START is no longer valid "end": "END", "chunk": event_list } @@ -701,17 +712,19 @@ class RoomMemberHandler(BaseRoomHandler): # If we're inviting someone, then we should also send it to that # HS. target_user_id = event.state_key + target_user = self.hs.parse_userid(target_user_id) if membership == Membership.INVITE: - host = UserID.from_string(target_user_id, self.hs).domain + host = target_user.domain destinations.append(host) # If we are joining a remote HS, include that. if membership == Membership.JOIN: - host = UserID.from_string(target_user_id, self.hs).domain + host = target_user.domain destinations.append(host) return self._on_new_room_event( - event, snapshot, extra_destinations=destinations + event, snapshot, extra_destinations=destinations, + extra_users=[target_user] ) class RoomListHandler(BaseRoomHandler): |