summary refs log tree commit diff
path: root/synapse/storage/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/util')
-rw-r--r--synapse/storage/util/id_generators.py6
-rw-r--r--synapse/storage/util/sequence.py56
2 files changed, 58 insertions, 4 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 39a3ab1162..bb84c0d792 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -261,7 +261,11 @@ class MultiWriterIdGenerator:
         # We check that the table and sequence haven't diverged.
         for table, _, id_column in tables:
             self._sequence_gen.check_consistency(
-                db_conn, table=table, id_column=id_column, positive=positive
+                db_conn,
+                table=table,
+                id_column=id_column,
+                stream_name=stream_name,
+                positive=positive,
             )
 
         # This goes and fills out the above state from the database.
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 412df6b8ef..b6fe136fb7 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -45,6 +45,21 @@ and run the following SQL:
 See docs/postgres.md for more information.
 """
 
+_INCONSISTENT_STREAM_ERROR = """
+Postgres sequence '%(seq)s' is inconsistent with associated stream position
+of '%(stream_name)s' in the 'stream_positions' table.
+
+This is likely a programming error and should be reported at
+https://github.com/matrix-org/synapse.
+
+A temporary workaround to fix this error is to shut down Synapse (including
+any and all workers) and run the following SQL:
+
+    DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s';
+
+This will need to be done every time the server is restarted.
+"""
+
 
 class SequenceGenerator(metaclass=abc.ABCMeta):
     """A class which generates a unique sequence of integers"""
@@ -60,14 +75,20 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
         db_conn: "LoggingDatabaseConnection",
         table: str,
         id_column: str,
+        stream_name: Optional[str] = None,
         positive: bool = True,
     ):
         """Should be called during start up to test that the current value of
         the sequence is greater than or equal to the maximum ID in the table.
 
-        This is to handle various cases where the sequence value can get out
-        of sync with the table, e.g. if Synapse gets rolled back to a previous
+        This is to handle various cases where the sequence value can get out of
+        sync with the table, e.g. if Synapse gets rolled back to a previous
         version and the rolled forwards again.
+
+        If a stream name is given then this will check that any value in the
+        `stream_positions` table is less than or equal to the current sequence
+        value. If it isn't then it's likely that streams have been crossed
+        somewhere (e.g. two ID generators have the same stream name).
         """
         ...
 
@@ -93,8 +114,12 @@ class PostgresSequenceGenerator(SequenceGenerator):
         db_conn: "LoggingDatabaseConnection",
         table: str,
         id_column: str,
+        stream_name: Optional[str] = None,
         positive: bool = True,
     ):
+        """See SequenceGenerator.check_consistency for docstring.
+        """
+
         txn = db_conn.cursor(txn_name="sequence.check_consistency")
 
         # First we get the current max ID from the table.
@@ -118,6 +143,18 @@ class PostgresSequenceGenerator(SequenceGenerator):
             "SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name}
         )
         last_value, is_called = txn.fetchone()
+
+        # If we have an associated stream check the stream_positions table.
+        max_in_stream_positions = None
+        if stream_name:
+            txn.execute(
+                "SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?",
+                (stream_name,),
+            )
+            row = txn.fetchone()
+            if row:
+                max_in_stream_positions = row[0]
+
         txn.close()
 
         # If `is_called` is False then `last_value` is actually the value that
@@ -138,6 +175,14 @@ class PostgresSequenceGenerator(SequenceGenerator):
                 % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
             )
 
+        # If we have values in the stream positions table then they have to be
+        # less than or equal to `last_value`
+        if max_in_stream_positions and max_in_stream_positions > last_value:
+            raise IncorrectDatabaseSetup(
+                _INCONSISTENT_STREAM_ERROR
+                % {"seq": self._sequence_name, "stream": stream_name}
+            )
+
 
 GetFirstCallbackType = Callable[[Cursor], int]
 
@@ -175,7 +220,12 @@ class LocalSequenceGenerator(SequenceGenerator):
             return self._current_max_id
 
     def check_consistency(
-        self, db_conn: Connection, table: str, id_column: str, positive: bool = True
+        self,
+        db_conn: Connection,
+        table: str,
+        id_column: str,
+        stream_name: Optional[str] = None,
+        positive: bool = True,
     ):
         # There is nothing to do for in memory sequences
         pass