diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5a4569ac95..faea30b44e 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,8 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError
from synapse.api.events.room import (
RoomTopicEvent, RoomMemberEvent, RoomConfigEvent
)
-from synapse.api.streams.event import EventStream, EventsStreamData
-from synapse.handlers.presence import PresenceStreamData
+from synapse.streams.config import PaginationConfig
from synapse.util import stringutils
from ._base import BaseHandler
@@ -115,13 +114,24 @@ class MessageHandler(BaseHandler):
"""
yield self.auth.check_joined_room(room_id, user_id)
- data_source = [
- EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
- ]
- event_stream = EventStream(user_id, data_source)
- pagin_config = yield event_stream.fix_tokens(pagin_config)
- data_chunk = yield event_stream.get_chunk(config=pagin_config)
- defer.returnValue(data_chunk)
+ data_source = self.hs.get_event_sources().sources["room"]
+
+ if not pagin_config.from_token:
+ pagin_config.from_token = yield self.hs.get_event_sources().get_current_token()
+
+ user = self.hs.parse_userid(user_id)
+
+ events, next_token = yield data_source.get_pagination_rows(
+ user, pagin_config, room_id
+ )
+
+ chunk = {
+ "chunk": [e.get_dict() for e in events],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ }
+
+ defer.returnValue(chunk)
@defer.inlineCallbacks
def store_room_data(self, event=None, stamp_event=True):
@@ -256,20 +266,20 @@ class MessageHandler(BaseHandler):
membership_list=[Membership.INVITE, Membership.JOIN]
)
+ user = self.hs.parse_userid(user_id)
+
rooms_ret = []
- now_rooms_token = yield self.store.get_room_events_max_id()
+ # FIXME (erikj): We need to not generate this token,
+ now_token = yield self.hs.get_event_sources().get_current_token()
# FIXME (erikj): Fix this.
- presence_stream = PresenceStreamData(self.hs)
- now_presence_token = yield presence_stream.max_token()
- presence = yield presence_stream.get_rows(
- user_id, 0, now_presence_token, None, None
+ presence_stream = self.hs.get_event_sources().sources["presence"]
+ pagination_config = PaginationConfig(from_token=now_token)
+ presence, _ = yield presence_stream.get_pagination_rows(
+ user, pagination_config, None
)
- # FIXME (erikj): We need to not generate this token,
- now_token = "%s_%s" % (now_rooms_token, now_presence_token)
-
limit = pagin_config.limit
if not limit:
limit = 10
@@ -291,7 +301,7 @@ class MessageHandler(BaseHandler):
messages, token = yield self.store.get_recent_events_for_room(
event.room_id,
limit=limit,
- end_token=now_rooms_token,
+ end_token=now_token.events_key,
)
d["messages"] = {
@@ -300,14 +310,18 @@ class MessageHandler(BaseHandler):
"end": token[1],
}
- current_state = yield self.store.get_current_state(event.room_id)
+ current_state = yield self.store.get_current_state(
+ event.room_id
+ )
d["state"] = [c.get_dict() for c in current_state]
except:
logger.exception("Failed to get snapshot")
- ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token}
-
- # logger.debug("snapshot_all_rooms returning: %s", ret)
+ ret = {
+ "rooms": rooms_ret,
+ "presence": presence,
+ "end": now_token.to_string()
+ }
defer.returnValue(ret)
@@ -490,7 +504,7 @@ class RoomMemberHandler(BaseHandler):
for entry in member_list
]
chunk_data = {
- "start": "START", # FIXME (erikj): START is no longer a valid value
+ "start": "START", # FIXME (erikj): START is no longer valid
"end": "END",
"chunk": event_list
}
|