summary refs log tree commit diff
path: root/synapse/handlers/room.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/room.py')
-rw-r--r--synapse/handlers/room.py69
1 files changed, 41 insertions, 28 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7b4b051888..f01349b339 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,8 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError
 from synapse.api.events.room import (
     RoomTopicEvent, RoomMemberEvent, RoomConfigEvent
 )
-from synapse.api.streams.event import EventStream, EventsStreamData
-from synapse.handlers.presence import PresenceStreamData
+from synapse.streams.config import PaginationConfig
 from synapse.util import stringutils
 from ._base import BaseRoomHandler
 
@@ -107,13 +106,24 @@ class MessageHandler(BaseRoomHandler):
         """
         yield self.auth.check_joined_room(room_id, user_id)
 
-        data_source = [
-            EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
-        ]
-        event_stream = EventStream(user_id, data_source)
-        pagin_config = yield event_stream.fix_tokens(pagin_config)
-        data_chunk = yield event_stream.get_chunk(config=pagin_config)
-        defer.returnValue(data_chunk)
+        data_source = self.hs.get_event_sources().sources["room"]
+
+        if not pagin_config.from_token:
+            pagin_config.from_token = yield self.hs.get_event_sources().get_current_token()
+
+        user = self.hs.parse_userid(user_id)
+
+        events, next_token = yield data_source.get_pagination_rows(
+            user, pagin_config, room_id
+        )
+
+        chunk = {
+            "chunk": [e.get_dict() for e in events],
+            "start": pagin_config.from_token.to_string(),
+            "end": next_token.to_string(),
+        }
+
+        defer.returnValue(chunk)
 
     @defer.inlineCallbacks
     def store_room_data(self, event=None, stamp_event=True):
@@ -235,20 +245,18 @@ class MessageHandler(BaseRoomHandler):
             membership_list=[Membership.INVITE, Membership.JOIN]
         )
 
+        user = self.hs.parse_userid(user_id)
+
         rooms_ret = []
 
-        now_rooms_token = yield self.store.get_room_events_max_id()
+        now_token = yield self.hs.get_event_sources().get_current_token()
 
-        # FIXME (erikj): Fix this.
-        presence_stream = PresenceStreamData(self.hs)
-        now_presence_token = yield presence_stream.max_token()
-        presence = yield presence_stream.get_rows(
-            user_id, 0, now_presence_token, None, None
+        presence_stream = self.hs.get_event_sources().sources["presence"]
+        pagination_config = PaginationConfig(from_token=now_token)
+        presence, _ = yield presence_stream.get_pagination_rows(
+            user, pagination_config, None
         )
 
-        # FIXME (erikj): We need to not generate this token,
-        now_token = "%s_%s" % (now_rooms_token, now_presence_token)
-
         limit = pagin_config.limit
         if not limit:
             limit = 10
@@ -270,7 +278,7 @@ class MessageHandler(BaseRoomHandler):
                 messages, token = yield self.store.get_recent_events_for_room(
                     event.room_id,
                     limit=limit,
-                    end_token=now_rooms_token,
+                    end_token=now_token.events_key,
                 )
 
                 d["messages"] = {
@@ -279,14 +287,18 @@ class MessageHandler(BaseRoomHandler):
                     "end": token[1],
                 }
 
-                current_state = yield self.store.get_current_state(event.room_id)
+                current_state = yield self.store.get_current_state(
+                    event.room_id
+                )
                 d["state"] = [c.get_dict() for c in current_state]
             except:
                 logger.exception("Failed to get snapshot")
 
-        ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token}
-
-        # logger.debug("snapshot_all_rooms returning: %s", ret)
+        ret = {
+            "rooms": rooms_ret,
+            "presence": presence,
+            "end": now_token.to_string()
+        }
 
         defer.returnValue(ret)
 
@@ -381,7 +393,6 @@ class RoomCreationHandler(BaseRoomHandler):
 
         federation_handler = self.hs.get_handlers().federation_handler
         yield federation_handler.handle_new_event(config_event, snapshot)
-        # self.notifier.on_new_room_event(event, store_id)
 
         content = {"membership": Membership.JOIN}
         join_event = self.event_factory.create_event(
@@ -477,7 +488,7 @@ class RoomMemberHandler(BaseRoomHandler):
             for entry in member_list
         ]
         chunk_data = {
-            "start": "START",  # FIXME (erikj): START is no longer a valid value
+            "start": "START",  # FIXME (erikj): START is no longer valid
             "end": "END",
             "chunk": event_list
         }
@@ -701,17 +712,19 @@ class RoomMemberHandler(BaseRoomHandler):
         # If we're inviting someone, then we should also send it to that
         # HS.
         target_user_id = event.state_key
+        target_user = self.hs.parse_userid(target_user_id)
         if membership == Membership.INVITE:
-            host = UserID.from_string(target_user_id, self.hs).domain
+            host = target_user.domain
             destinations.append(host)
 
         # If we are joining a remote HS, include that.
         if membership == Membership.JOIN:
-            host = UserID.from_string(target_user_id, self.hs).domain
+            host = target_user.domain
             destinations.append(host)
 
         return self._on_new_room_event(
-            event, snapshot, extra_destinations=destinations
+            event, snapshot, extra_destinations=destinations,
+            extra_users=[target_user]
         )
 
 class RoomListHandler(BaseRoomHandler):