summary refs log tree commit diff
path: root/synapse/storage/util/sequence.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-01-21 15:09:09 +0000
committerGitHub <noreply@github.com>2021-01-21 15:09:09 +0000
commit2506074ef0a880b527d61457c32cd397a0d3ab2d (patch)
tree5fd682065a7585ee24a423e89800bc208052d306 /synapse/storage/util/sequence.py
parentPrefix idp_id with "oidc-" (#9189) (diff)
downloadsynapse-2506074ef0a880b527d61457c32cd397a0d3ab2d.tar.xz
Fix receipts or account data not being sent down sync (#9193)
Introduced in #9104 

This wasn't picked up by the tests as this is all fine the first time you run Synapse (after upgrading), but then when you restart the wrong value is pulled from `stream_positions`. 
Diffstat (limited to 'synapse/storage/util/sequence.py')
-rw-r--r--synapse/storage/util/sequence.py56
1 files changed, 53 insertions, 3 deletions
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 412df6b8ef..b6fe136fb7 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -45,6 +45,21 @@ and run the following SQL:
 See docs/postgres.md for more information.
 """
 
+_INCONSISTENT_STREAM_ERROR = """
+Postgres sequence '%(seq)s' is inconsistent with associated stream position
+of '%(stream_name)s' in the 'stream_positions' table.
+
+This is likely a programming error and should be reported at
+https://github.com/matrix-org/synapse.
+
+A temporary workaround to fix this error is to shut down Synapse (including
+any and all workers) and run the following SQL:
+
+    DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s';
+
+This will need to be done every time the server is restarted.
+"""
+
 
 class SequenceGenerator(metaclass=abc.ABCMeta):
     """A class which generates a unique sequence of integers"""
@@ -60,14 +75,20 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
         db_conn: "LoggingDatabaseConnection",
         table: str,
         id_column: str,
+        stream_name: Optional[str] = None,
         positive: bool = True,
     ):
         """Should be called during start up to test that the current value of
         the sequence is greater than or equal to the maximum ID in the table.
 
-        This is to handle various cases where the sequence value can get out
-        of sync with the table, e.g. if Synapse gets rolled back to a previous
+        This is to handle various cases where the sequence value can get out of
+        sync with the table, e.g. if Synapse gets rolled back to a previous
         version and the rolled forwards again.
+
+        If a stream name is given then this will check that any value in the
+        `stream_positions` table is less than or equal to the current sequence
+        value. If it isn't then it's likely that streams have been crossed
+        somewhere (e.g. two ID generators have the same stream name).
         """
         ...
 
@@ -93,8 +114,12 @@ class PostgresSequenceGenerator(SequenceGenerator):
         db_conn: "LoggingDatabaseConnection",
         table: str,
         id_column: str,
+        stream_name: Optional[str] = None,
         positive: bool = True,
     ):
+        """See SequenceGenerator.check_consistency for docstring.
+        """
+
         txn = db_conn.cursor(txn_name="sequence.check_consistency")
 
         # First we get the current max ID from the table.
@@ -118,6 +143,18 @@ class PostgresSequenceGenerator(SequenceGenerator):
             "SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name}
         )
         last_value, is_called = txn.fetchone()
+
+        # If we have an associated stream check the stream_positions table.
+        max_in_stream_positions = None
+        if stream_name:
+            txn.execute(
+                "SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?",
+                (stream_name,),
+            )
+            row = txn.fetchone()
+            if row:
+                max_in_stream_positions = row[0]
+
         txn.close()
 
         # If `is_called` is False then `last_value` is actually the value that
@@ -138,6 +175,14 @@ class PostgresSequenceGenerator(SequenceGenerator):
                 % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
             )
 
+        # If we have values in the stream positions table then they have to be
+        # less than or equal to `last_value`
+        if max_in_stream_positions and max_in_stream_positions > last_value:
+            raise IncorrectDatabaseSetup(
+                _INCONSISTENT_STREAM_ERROR
+                % {"seq": self._sequence_name, "stream": stream_name}
+            )
+
 
 GetFirstCallbackType = Callable[[Cursor], int]
 
@@ -175,7 +220,12 @@ class LocalSequenceGenerator(SequenceGenerator):
             return self._current_max_id
 
     def check_consistency(
-        self, db_conn: Connection, table: str, id_column: str, positive: bool = True
+        self,
+        db_conn: Connection,
+        table: str,
+        id_column: str,
+        stream_name: Optional[str] = None,
+        positive: bool = True,
     ):
         # There is nothing to do for in memory sequences
         pass