summary refs log tree commit diff
path: root/synapse/handlers/room.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-14 10:16:41 +0100
committerGitHub <noreply@github.com>2020-09-14 10:16:41 +0100
commit04cc249b43e8716513f788b2a4eeb8ede24d19df (patch)
tree1f5e539b9db71f3d47980ce99c47e6f1d747352b /synapse/handlers/room.py
parentMerge tag 'v1.20.0rc3' into develop (diff)
downloadsynapse-04cc249b43e8716513f788b2a4eeb8ede24d19df.tar.xz
Add experimental support for sharding event persister. Again. (#8294)
This is *not* ready for production yet. Caveats:

1. We should write some tests...
2. The stream token that we use for events can get stalled at the minimum position of all writers. This means that new events may not be processed and e.g. sent down sync streams if a writer isn't writing or is slow.
Diffstat (limited to 'synapse/handlers/room.py')
-rw-r--r--synapse/handlers/room.py14
1 files changed, 9 insertions, 5 deletions
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 53d85ab97d..eeade6ad3f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -804,7 +804,9 @@ class RoomCreationHandler(BaseHandler):
 
         # Always wait for room creation to progate before returning
         await self._replication.wait_for_stream_position(
-            self.hs.config.worker.writers.events, "events", last_stream_id
+            self.hs.config.worker.events_shard_config.get_instance(room_id),
+            "events",
+            last_stream_id,
         )
 
         return result, last_stream_id
@@ -1259,10 +1261,10 @@ class RoomShutdownHandler:
             # We now wait for the create room to come back in via replication so
             # that we can assume that all the joins/invites have propogated before
             # we try and auto join below.
-            #
-            # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                self.hs.config.worker.writers.events, "events", stream_id
+                self.hs.config.worker.events_shard_config.get_instance(new_room_id),
+                "events",
+                stream_id,
             )
         else:
             new_room_id = None
@@ -1292,7 +1294,9 @@ class RoomShutdownHandler:
 
                 # Wait for leave to come in over replication before trying to forget.
                 await self._replication.wait_for_stream_position(
-                    self.hs.config.worker.writers.events, "events", stream_id
+                    self.hs.config.worker.events_shard_config.get_instance(room_id),
+                    "events",
+                    stream_id,
                 )
 
                 await self.room_member_handler.forget(target_requester.user, room_id)