diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index d5ddc583ad..8cd7eb22a3 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StreamToken, UserID
+from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -42,7 +42,7 @@ logger = logging.getLogger(__name__)
class InitialSyncHandler(BaseHandler):
def __init__(self, hs: "HomeServer"):
- super(InitialSyncHandler, self).__init__(hs)
+ super().__init__(hs)
self.hs = hs
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
@@ -116,14 +116,13 @@ class InitialSyncHandler(BaseHandler):
now_token = self.hs.get_event_sources().get_current_token()
presence_stream = self.hs.get_event_sources().sources["presence"]
- pagination_config = PaginationConfig(from_token=now_token)
- presence, _ = await presence_stream.get_pagination_rows(
- user, pagination_config.get_source_config("presence"), None
+ presence, _ = await presence_stream.get_new_events(
+ user, from_key=None, include_offline=False
)
- receipt_stream = self.hs.get_event_sources().sources["receipt"]
- receipt, _ = await receipt_stream.get_pagination_rows(
- user, pagination_config.get_source_config("receipt"), None
+ joined_rooms = [r.room_id for r in room_list if r.membership == Membership.JOIN]
+ receipt = await self.store.get_linearized_receipts_for_rooms(
+ joined_rooms, to_key=int(now_token.receipt_key),
)
tags_by_room = await self.store.get_tags_for_user(user_id)
@@ -168,7 +167,7 @@ class InitialSyncHandler(BaseHandler):
self.state_handler.get_current_state, event.room_id
)
elif event.membership == Membership.LEAVE:
- room_end_token = "s%d" % (event.stream_ordering,)
+ room_end_token = RoomStreamToken(None, event.stream_ordering,)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
)
|