diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index d5ddc583ad..39a85801c1 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StreamToken, UserID
+from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -42,7 +42,7 @@ logger = logging.getLogger(__name__)
class InitialSyncHandler(BaseHandler):
def __init__(self, hs: "HomeServer"):
- super(InitialSyncHandler, self).__init__(hs)
+ super().__init__(hs)
self.hs = hs
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
@@ -116,14 +116,13 @@ class InitialSyncHandler(BaseHandler):
now_token = self.hs.get_event_sources().get_current_token()
presence_stream = self.hs.get_event_sources().sources["presence"]
- pagination_config = PaginationConfig(from_token=now_token)
- presence, _ = await presence_stream.get_pagination_rows(
- user, pagination_config.get_source_config("presence"), None
+ presence, _ = await presence_stream.get_new_events(
+ user, from_key=None, include_offline=False
)
- receipt_stream = self.hs.get_event_sources().sources["receipt"]
- receipt, _ = await receipt_stream.get_pagination_rows(
- user, pagination_config.get_source_config("receipt"), None
+ joined_rooms = [r.room_id for r in room_list if r.membership == Membership.JOIN]
+ receipt = await self.store.get_linearized_receipts_for_rooms(
+ joined_rooms, to_key=int(now_token.receipt_key),
)
tags_by_room = await self.store.get_tags_for_user(user_id)
@@ -168,7 +167,7 @@ class InitialSyncHandler(BaseHandler):
self.state_handler.get_current_state, event.room_id
)
elif event.membership == Membership.LEAVE:
- room_end_token = "s%d" % (event.stream_ordering,)
+ room_end_token = RoomStreamToken(None, event.stream_ordering,)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
)
@@ -204,8 +203,8 @@ class InitialSyncHandler(BaseHandler):
messages, time_now=time_now, as_client_event=as_client_event
)
),
- "start": start_token.to_string(),
- "end": end_token.to_string(),
+ "start": await start_token.to_string(self.store),
+ "end": await end_token.to_string(self.store),
}
d["state"] = await self._event_serializer.serialize_events(
@@ -250,7 +249,7 @@ class InitialSyncHandler(BaseHandler):
],
"account_data": account_data_events,
"receipts": receipt,
- "end": now_token.to_string(),
+ "end": await now_token.to_string(self.store),
}
return ret
@@ -326,7 +325,8 @@ class InitialSyncHandler(BaseHandler):
if limit is None:
limit = 10
- stream_token = await self.store.get_stream_token_for_event(member_event_id)
+ leave_position = await self.store.get_position_for_event(member_event_id)
+ stream_token = leave_position.to_room_stream_token()
messages, token = await self.store.get_recent_events_for_room(
room_id, limit=limit, end_token=stream_token
@@ -348,8 +348,8 @@ class InitialSyncHandler(BaseHandler):
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
),
- "start": start_token.to_string(),
- "end": end_token.to_string(),
+ "start": await start_token.to_string(self.store),
+ "end": await end_token.to_string(self.store),
},
"state": (
await self._event_serializer.serialize_events(
@@ -447,8 +447,8 @@ class InitialSyncHandler(BaseHandler):
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
),
- "start": start_token.to_string(),
- "end": end_token.to_string(),
+ "start": await start_token.to_string(self.store),
+ "end": await end_token.to_string(self.store),
},
"state": state,
"presence": presence,
|