summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/8387.feature1
-rwxr-xr-xscripts/synapse_port_db24
2 files changed, 25 insertions, 0 deletions
diff --git a/changelog.d/8387.feature b/changelog.d/8387.feature
new file mode 100644
index 0000000000..b363e929ea
--- /dev/null
+++ b/changelog.d/8387.feature
@@ -0,0 +1 @@
+Add experimental support for sharding event persister.
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index ecca8b6e8f..684a518b8e 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -628,6 +628,7 @@ class Porter(object):
             self.progress.set_state("Setting up sequence generators")
             await self._setup_state_group_id_seq()
             await self._setup_user_id_seq()
+            await self._setup_events_stream_seqs()
 
             self.progress.done()
         except Exception as e:
@@ -804,6 +805,29 @@ class Porter(object):
 
         return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
 
+    def _setup_events_stream_seqs(self):
+        def r(txn):
+            txn.execute("SELECT MAX(stream_ordering) FROM events")
+            curr_id = txn.fetchone()[0]
+            if curr_id:
+                next_id = curr_id + 1
+                txn.execute(
+                    "ALTER SEQUENCE events_stream_seq RESTART WITH %s", (next_id,)
+                )
+
+            txn.execute("SELECT -MIN(stream_ordering) FROM events")
+            curr_id = txn.fetchone()[0]
+            if curr_id:
+                next_id = curr_id + 1
+                txn.execute(
+                    "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
+                    (next_id,),
+                )
+
+        return self.postgres_store.db_pool.runInteraction(
+            "_setup_events_stream_seqs", r
+        )
+
 
 ##############################################
 # The following is simply UI stuff