diff options
Diffstat (limited to 'synapse/handlers/room.py')
-rw-r--r-- | synapse/handlers/room.py | 79 |
1 files changed, 43 insertions, 36 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a4569ac95..760373344d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -22,8 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError from synapse.api.events.room import ( RoomTopicEvent, RoomMemberEvent, RoomConfigEvent ) -from synapse.api.streams.event import EventStream, EventsStreamData -from synapse.handlers.presence import PresenceStreamData +from synapse.streams.config import PaginationConfig from synapse.util import stringutils from ._base import BaseHandler @@ -95,7 +94,7 @@ class MessageHandler(BaseHandler): event.room_id ) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event) yield self.hs.get_federation().handle_new_event(event) @@ -115,13 +114,24 @@ class MessageHandler(BaseHandler): """ yield self.auth.check_joined_room(room_id, user_id) - data_source = [ - EventsStreamData(self.hs, room_id=room_id, feedback=feedback) - ] - event_stream = EventStream(user_id, data_source) - pagin_config = yield event_stream.fix_tokens(pagin_config) - data_chunk = yield event_stream.get_chunk(config=pagin_config) - defer.returnValue(data_chunk) + data_source = self.hs.get_event_sources().sources["room"] + + if not pagin_config.from_token: + pagin_config.from_token = yield self.hs.get_event_sources().get_current_token() + + user = self.hs.parse_userid(user_id) + + events, next_token = yield data_source.get_pagination_rows( + user, pagin_config, room_id + ) + + chunk = { + "chunk": [e.get_dict() for e in events], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + } + + defer.returnValue(chunk) @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): @@ -148,7 +158,7 @@ class MessageHandler(BaseHandler): event.destinations = yield self.store.get_joined_hosts_for_room( event.room_id ) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event) yield self.hs.get_federation().handle_new_event(event) @@ -230,7 +240,7 @@ class MessageHandler(BaseHandler): ) yield self.hs.get_federation().handle_new_event(event) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event) @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, @@ -256,20 +266,18 @@ class MessageHandler(BaseHandler): membership_list=[Membership.INVITE, Membership.JOIN] ) + user = self.hs.parse_userid(user_id) + rooms_ret = [] - now_rooms_token = yield self.store.get_room_events_max_id() + now_token = yield self.hs.get_event_sources().get_current_token() - # FIXME (erikj): Fix this. - presence_stream = PresenceStreamData(self.hs) - now_presence_token = yield presence_stream.max_token() - presence = yield presence_stream.get_rows( - user_id, 0, now_presence_token, None, None + presence_stream = self.hs.get_event_sources().sources["presence"] + pagination_config = PaginationConfig(from_token=now_token) + presence, _ = yield presence_stream.get_pagination_rows( + user, pagination_config, None ) - # FIXME (erikj): We need to not generate this token, - now_token = "%s_%s" % (now_rooms_token, now_presence_token) - limit = pagin_config.limit if not limit: limit = 10 @@ -291,7 +299,7 @@ class MessageHandler(BaseHandler): messages, token = yield self.store.get_recent_events_for_room( event.room_id, limit=limit, - end_token=now_rooms_token, + end_token=now_token.events_key, ) d["messages"] = { @@ -300,14 +308,18 @@ class MessageHandler(BaseHandler): "end": token[1], } - current_state = yield self.store.get_current_state(event.room_id) + current_state = yield self.store.get_current_state( + event.room_id + ) d["state"] = [c.get_dict() for c in current_state] except: logger.exception("Failed to get snapshot") - ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token} - - # logger.debug("snapshot_all_rooms returning: %s", ret) + ret = { + "rooms": rooms_ret, + "presence": presence, + "end": now_token.to_string() + } defer.returnValue(ret) @@ -391,10 +403,8 @@ class RoomCreationHandler(BaseHandler): ) yield self.state_handler.handle_new_event(config_event) - # store_id = persist... yield self.hs.get_federation().handle_new_event(config_event) - # self.notifier.on_new_room_event(event, store_id) content = {"membership": Membership.JOIN} join_event = self.event_factory.create_event( @@ -490,7 +500,7 @@ class RoomMemberHandler(BaseHandler): for entry in member_list ] chunk_data = { - "start": "START", # FIXME (erikj): START is no longer a valid value + "start": "START", # FIXME (erikj): START is no longer valid "end": "END", "chunk": event_list } @@ -712,23 +722,20 @@ class RoomMemberHandler(BaseHandler): # If we're inviting someone, then we should also send it to that # HS. target_user_id = event.state_key + target_user = self.hs.parse_userid(target_user_id) if membership == Membership.INVITE: - host = UserID.from_string( - target_user_id, self.hs - ).domain + host = target_user.domain destinations.append(host) # If we are joining a remote HS, include that. if membership == Membership.JOIN: - host = UserID.from_string( - target_user_id, self.hs - ).domain + host = target_user.domain destinations.append(host) event.destinations = list(set(destinations)) yield self.hs.get_federation().handle_new_event(event) - self.notifier.on_new_room_event(event, store_id) + self.notifier.on_new_room_event(event, extra_users=[target_user]) class RoomListHandler(BaseHandler): |