summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2021-01-25 19:37:58 +0000
committerRichard van der Hoff <richard@matrix.org>2021-01-25 19:37:58 +0000
commit65fb3b2e254a2ec331ec46de470c9cebc9597b3a (patch)
tree7192abb4cd85142ebe38deb9ae6bf5540bfbbe5c /synapse/storage
parentAdd a check for duplicate IdP ids (#9184) (diff)
parentTweak changes. (diff)
downloadsynapse-65fb3b2e254a2ec331ec46de470c9cebc9597b3a.tar.xz
Merge tag 'v1.26.0rc2' into social_login
Synapse 1.26.0rc2 (2021-01-25)
==============================

Bugfixes
--------

- Fix receipts and account data not being sent down sync. Introduced in v1.26.0rc1. ([\#9193](https://github.com/matrix-org/synapse/issues/9193), [\#9195](https://github.com/matrix-org/synapse/issues/9195))
- Fix chain cover update to handle events with duplicate auth events. Introduced in v1.26.0rc1. ([\#9210](https://github.com/matrix-org/synapse/issues/9210))

Internal Changes
----------------

- Add an `oidc-` prefix to any `idp_id`s which are given in the `oidc_providers` configuration. ([\#9189](https://github.com/matrix-org/synapse/issues/9189))
- Bump minimum `psycopg2` version to v2.8. ([\#9204](https://github.com/matrix-org/synapse/issues/9204))
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/account_data.py2
-rw-r--r--synapse/storage/databases/main/receipts.py4
-rw-r--r--synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql18
-rw-r--r--synapse/storage/util/id_generators.py6
-rw-r--r--synapse/storage/util/sequence.py56
5 files changed, 79 insertions, 7 deletions
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 68896f34af..a277a1ef13 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -68,7 +68,7 @@ class AccountDataWorkerStore(SQLBaseStore):
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            if hs.get_instance_name() in hs.config.worker.writers.events:
+            if hs.get_instance_name() in hs.config.worker.writers.account_data:
                 self._account_data_id_gen = StreamIdGenerator(
                     db_conn,
                     "room_account_data",
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index e0e57f0578..e4843a202c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -45,7 +45,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             self._receipts_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
-                stream_name="account_data",
+                stream_name="receipts",
                 instance_name=self._instance_name,
                 tables=[("receipts_linearized", "instance_name", "stream_id")],
                 sequence_name="receipts_sequence",
@@ -61,7 +61,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            if hs.get_instance_name() in hs.config.worker.writers.events:
+            if hs.get_instance_name() in hs.config.worker.writers.receipts:
                 self._receipts_id_gen = StreamIdGenerator(
                     db_conn, "receipts_linearized", "stream_id"
                 )
diff --git a/synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql b/synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql
new file mode 100644
index 0000000000..9f2b5ebc5a
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- We incorrectly populated these, so we delete them and let the
+-- MultiWriterIdGenerator repopulate it.
+DELETE FROM stream_positions WHERE stream_name = 'receipts' OR stream_name = 'account_data';
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 39a3ab1162..bb84c0d792 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -261,7 +261,11 @@ class MultiWriterIdGenerator:
         # We check that the table and sequence haven't diverged.
         for table, _, id_column in tables:
             self._sequence_gen.check_consistency(
-                db_conn, table=table, id_column=id_column, positive=positive
+                db_conn,
+                table=table,
+                id_column=id_column,
+                stream_name=stream_name,
+                positive=positive,
             )
 
         # This goes and fills out the above state from the database.
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 412df6b8ef..c780ade077 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -45,6 +45,21 @@ and run the following SQL:
 See docs/postgres.md for more information.
 """
 
+_INCONSISTENT_STREAM_ERROR = """
+Postgres sequence '%(seq)s' is inconsistent with associated stream position
+of '%(stream_name)s' in the 'stream_positions' table.
+
+This is likely a programming error and should be reported at
+https://github.com/matrix-org/synapse.
+
+A temporary workaround to fix this error is to shut down Synapse (including
+any and all workers) and run the following SQL:
+
+    DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s';
+
+This will need to be done every time the server is restarted.
+"""
+
 
 class SequenceGenerator(metaclass=abc.ABCMeta):
     """A class which generates a unique sequence of integers"""
@@ -60,14 +75,20 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
         db_conn: "LoggingDatabaseConnection",
         table: str,
         id_column: str,
+        stream_name: Optional[str] = None,
         positive: bool = True,
     ):
         """Should be called during start up to test that the current value of
         the sequence is greater than or equal to the maximum ID in the table.
 
-        This is to handle various cases where the sequence value can get out
-        of sync with the table, e.g. if Synapse gets rolled back to a previous
+        This is to handle various cases where the sequence value can get out of
+        sync with the table, e.g. if Synapse gets rolled back to a previous
         version and the rolled forwards again.
+
+        If a stream name is given then this will check that any value in the
+        `stream_positions` table is less than or equal to the current sequence
+        value. If it isn't then it's likely that streams have been crossed
+        somewhere (e.g. two ID generators have the same stream name).
         """
         ...
 
@@ -93,8 +114,12 @@ class PostgresSequenceGenerator(SequenceGenerator):
         db_conn: "LoggingDatabaseConnection",
         table: str,
         id_column: str,
+        stream_name: Optional[str] = None,
         positive: bool = True,
     ):
+        """See SequenceGenerator.check_consistency for docstring.
+        """
+
         txn = db_conn.cursor(txn_name="sequence.check_consistency")
 
         # First we get the current max ID from the table.
@@ -118,6 +143,18 @@ class PostgresSequenceGenerator(SequenceGenerator):
             "SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name}
         )
         last_value, is_called = txn.fetchone()
+
+        # If we have an associated stream check the stream_positions table.
+        max_in_stream_positions = None
+        if stream_name:
+            txn.execute(
+                "SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?",
+                (stream_name,),
+            )
+            row = txn.fetchone()
+            if row:
+                max_in_stream_positions = row[0]
+
         txn.close()
 
         # If `is_called` is False then `last_value` is actually the value that
@@ -138,6 +175,14 @@ class PostgresSequenceGenerator(SequenceGenerator):
                 % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
             )
 
+        # If we have values in the stream positions table then they have to be
+        # less than or equal to `last_value`
+        if max_in_stream_positions and max_in_stream_positions > last_value:
+            raise IncorrectDatabaseSetup(
+                _INCONSISTENT_STREAM_ERROR
+                % {"seq": self._sequence_name, "stream_name": stream_name}
+            )
+
 
 GetFirstCallbackType = Callable[[Cursor], int]
 
@@ -175,7 +220,12 @@ class LocalSequenceGenerator(SequenceGenerator):
             return self._current_max_id
 
     def check_consistency(
-        self, db_conn: Connection, table: str, id_column: str, positive: bool = True
+        self,
+        db_conn: Connection,
+        table: str,
+        id_column: str,
+        stream_name: Optional[str] = None,
+        positive: bool = True,
     ):
         # There is nothing to do for in memory sequences
         pass