diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a29305f655..d5f7c78edf 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -70,7 +70,7 @@ FIVE_MINUTES_IN_MS = 5 * 60 * 1000
class RoomCreationHandler(BaseHandler):
def __init__(self, hs: "HomeServer"):
- super(RoomCreationHandler, self).__init__(hs)
+ super().__init__(hs)
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
@@ -804,7 +804,9 @@ class RoomCreationHandler(BaseHandler):
# Always wait for room creation to progate before returning
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", last_stream_id
+ self.hs.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ last_stream_id,
)
return result, last_stream_id
@@ -1075,11 +1077,13 @@ class RoomContextHandler:
# the token, which we replace.
token = StreamToken.START
- results["start"] = token.copy_and_replace(
+ results["start"] = await token.copy_and_replace(
"room_key", results["start"]
- ).to_string()
+ ).to_string(self.store)
- results["end"] = token.copy_and_replace("room_key", results["end"]).to_string()
+ results["end"] = await token.copy_and_replace(
+ "room_key", results["end"]
+ ).to_string(self.store)
return results
@@ -1091,20 +1095,19 @@ class RoomEventSource:
async def get_new_events(
self,
user: UserID,
- from_key: str,
+ from_key: RoomStreamToken,
limit: int,
room_ids: List[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
- ) -> Tuple[List[EventBase], str]:
+ ) -> Tuple[List[EventBase], RoomStreamToken]:
# We just ignore the key for now.
to_key = self.get_current_key()
- from_token = RoomStreamToken.parse(from_key)
- if from_token.topological:
+ if from_key.topological:
logger.warning("Stream has topological part!!!! %r", from_key)
- from_key = "s%s" % (from_token.stream,)
+ from_key = RoomStreamToken(None, from_key.stream)
app_service = self.store.get_app_service_by_user_id(user.to_string())
if app_service:
@@ -1139,8 +1142,8 @@ class RoomEventSource:
return (events, end_key)
- def get_current_key(self) -> str:
- return "s%d" % (self.store.get_room_max_stream_ordering(),)
+ def get_current_key(self) -> RoomStreamToken:
+ return self.store.get_room_max_token()
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
@@ -1260,10 +1263,10 @@ class RoomShutdownHandler:
# We now wait for the create room to come back in via replication so
# that we can assume that all the joins/invites have propogated before
# we try and auto join below.
- #
- # TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", stream_id
+ self.hs.config.worker.events_shard_config.get_instance(new_room_id),
+ "events",
+ stream_id,
)
else:
new_room_id = None
@@ -1293,7 +1296,9 @@ class RoomShutdownHandler:
# Wait for leave to come in over replication before trying to forget.
await self._replication.wait_for_stream_position(
- self.hs.config.worker.writers.events, "events", stream_id
+ self.hs.config.worker.events_shard_config.get_instance(room_id),
+ "events",
+ stream_id,
)
await self.room_member_handler.forget(target_requester.user, room_id)
|