summary refs log tree commit diff
path: root/synapse/handlers/room.py
diff options
context:
space:
mode:
authorBen Banfield-Zanin <benbz@matrix.org>2020-10-15 14:48:13 +0100
committerBen Banfield-Zanin <benbz@matrix.org>2020-10-15 14:48:13 +0100
commit8d9ae573f33110e0420204bceb111fd8df649e7c (patch)
treec8113c67df9769a14e8bb0a03620026dbe9aa0ba /synapse/handlers/room.py
parentMerge remote-tracking branch 'origin/anoa/3pid_check_invite_exemption' into b... (diff)
parentRemove racey assertion in MultiWriterIDGenerator (#8530) (diff)
downloadsynapse-8d9ae573f33110e0420204bceb111fd8df649e7c.tar.xz
Merge remote-tracking branch 'origin/release-v1.21.2' into bbz/info-mainline-1.21.2 github/bbz/info-mainline-1.21.2 bbz/info-mainline-1.21.2
Diffstat (limited to 'synapse/handlers/room.py')
-rw-r--r--synapse/handlers/room.py37
1 files changed, 21 insertions, 16 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a29305f655..d5f7c78edf 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -70,7 +70,7 @@ FIVE_MINUTES_IN_MS = 5 * 60 * 1000
 
 class RoomCreationHandler(BaseHandler):
     def __init__(self, hs: "HomeServer"):
-        super(RoomCreationHandler, self).__init__(hs)
+        super().__init__(hs)
 
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
@@ -804,7 +804,9 @@ class RoomCreationHandler(BaseHandler):
 
         # Always wait for room creation to progate before returning
         await self._replication.wait_for_stream_position(
-            self.hs.config.worker.writers.events, "events", last_stream_id
+            self.hs.config.worker.events_shard_config.get_instance(room_id),
+            "events",
+            last_stream_id,
         )
 
         return result, last_stream_id
@@ -1075,11 +1077,13 @@ class RoomContextHandler:
         # the token, which we replace.
         token = StreamToken.START
 
-        results["start"] = token.copy_and_replace(
+        results["start"] = await token.copy_and_replace(
             "room_key", results["start"]
-        ).to_string()
+        ).to_string(self.store)
 
-        results["end"] = token.copy_and_replace("room_key", results["end"]).to_string()
+        results["end"] = await token.copy_and_replace(
+            "room_key", results["end"]
+        ).to_string(self.store)
 
         return results
 
@@ -1091,20 +1095,19 @@ class RoomEventSource:
     async def get_new_events(
         self,
         user: UserID,
-        from_key: str,
+        from_key: RoomStreamToken,
         limit: int,
         room_ids: List[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
-    ) -> Tuple[List[EventBase], str]:
+    ) -> Tuple[List[EventBase], RoomStreamToken]:
         # We just ignore the key for now.
 
         to_key = self.get_current_key()
 
-        from_token = RoomStreamToken.parse(from_key)
-        if from_token.topological:
+        if from_key.topological:
             logger.warning("Stream has topological part!!!! %r", from_key)
-            from_key = "s%s" % (from_token.stream,)
+            from_key = RoomStreamToken(None, from_key.stream)
 
         app_service = self.store.get_app_service_by_user_id(user.to_string())
         if app_service:
@@ -1139,8 +1142,8 @@ class RoomEventSource:
 
         return (events, end_key)
 
-    def get_current_key(self) -> str:
-        return "s%d" % (self.store.get_room_max_stream_ordering(),)
+    def get_current_key(self) -> RoomStreamToken:
+        return self.store.get_room_max_token()
 
     def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
         return self.store.get_room_events_max_id(room_id)
@@ -1260,10 +1263,10 @@ class RoomShutdownHandler:
             # We now wait for the create room to come back in via replication so
             # that we can assume that all the joins/invites have propogated before
             # we try and auto join below.
-            #
-            # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.hs.config.worker.writers.events, "events", stream_id
+                self.hs.config.worker.events_shard_config.get_instance(new_room_id),
+                "events",
+                stream_id,
             )
         else:
             new_room_id = None
@@ -1293,7 +1296,9 @@ class RoomShutdownHandler:
 
                 # Wait for leave to come in over replication before trying to forget.
                 await self._replication.wait_for_stream_position(
-                    self.hs.config.worker.writers.events, "events", stream_id
+                    self.hs.config.worker.events_shard_config.get_instance(room_id),
+                    "events",
+                    stream_id,
                 )
 
                 await self.room_member_handler.forget(target_requester.user, room_id)