diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/__init__.py | 2 | ||||
-rw-r--r-- | synapse/python_dependencies.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/account_data.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/receipts.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql | 18 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 6 | ||||
-rw-r--r-- | synapse/storage/util/sequence.py | 56 | ||||
-rw-r--r-- | synapse/util/iterutils.py | 2 |
8 files changed, 83 insertions, 11 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index d423856d82..3cd682f9e7 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -48,7 +48,7 @@ try: except ImportError: pass -__version__ = "1.26.0rc1" +__version__ = "1.26.0rc2" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index c97e0df1f5..bfd46a3730 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -86,8 +86,8 @@ REQUIREMENTS = [ CONDITIONAL_REQUIREMENTS = { "matrix-synapse-ldap3": ["matrix-synapse-ldap3>=0.1"], - # we use execute_batch, which arrived in psycopg 2.7. - "postgres": ["psycopg2>=2.7"], + # we use execute_values with the fetch param, which arrived in psycopg 2.8. + "postgres": ["psycopg2>=2.8"], # ACME support is required to provision TLS certificates from authorities # that use the protocol, such as Let's Encrypt. "acme": [ diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 68896f34af..a277a1ef13 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -68,7 +68,7 @@ class AccountDataWorkerStore(SQLBaseStore): # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets # updated over replication. (Multiple writers are not supported for # SQLite). - if hs.get_instance_name() in hs.config.worker.writers.events: + if hs.get_instance_name() in hs.config.worker.writers.account_data: self._account_data_id_gen = StreamIdGenerator( db_conn, "room_account_data", diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index e0e57f0578..e4843a202c 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -45,7 +45,7 @@ class ReceiptsWorkerStore(SQLBaseStore): self._receipts_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, - stream_name="account_data", + stream_name="receipts", instance_name=self._instance_name, tables=[("receipts_linearized", "instance_name", "stream_id")], sequence_name="receipts_sequence", @@ -61,7 +61,7 @@ class ReceiptsWorkerStore(SQLBaseStore): # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets # updated over replication. (Multiple writers are not supported for # SQLite). - if hs.get_instance_name() in hs.config.worker.writers.events: + if hs.get_instance_name() in hs.config.worker.writers.receipts: self._receipts_id_gen = StreamIdGenerator( db_conn, "receipts_linearized", "stream_id" ) diff --git a/synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql b/synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql new file mode 100644 index 0000000000..9f2b5ebc5a --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- We incorrectly populated these, so we delete them and let the +-- MultiWriterIdGenerator repopulate it. +DELETE FROM stream_positions WHERE stream_name = 'receipts' OR stream_name = 'account_data'; diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 39a3ab1162..bb84c0d792 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -261,7 +261,11 @@ class MultiWriterIdGenerator: # We check that the table and sequence haven't diverged. for table, _, id_column in tables: self._sequence_gen.check_consistency( - db_conn, table=table, id_column=id_column, positive=positive + db_conn, + table=table, + id_column=id_column, + stream_name=stream_name, + positive=positive, ) # This goes and fills out the above state from the database. diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index 412df6b8ef..c780ade077 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -45,6 +45,21 @@ and run the following SQL: See docs/postgres.md for more information. """ +_INCONSISTENT_STREAM_ERROR = """ +Postgres sequence '%(seq)s' is inconsistent with associated stream position +of '%(stream_name)s' in the 'stream_positions' table. + +This is likely a programming error and should be reported at +https://github.com/matrix-org/synapse. + +A temporary workaround to fix this error is to shut down Synapse (including +any and all workers) and run the following SQL: + + DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s'; + +This will need to be done every time the server is restarted. +""" + class SequenceGenerator(metaclass=abc.ABCMeta): """A class which generates a unique sequence of integers""" @@ -60,14 +75,20 @@ class SequenceGenerator(metaclass=abc.ABCMeta): db_conn: "LoggingDatabaseConnection", table: str, id_column: str, + stream_name: Optional[str] = None, positive: bool = True, ): """Should be called during start up to test that the current value of the sequence is greater than or equal to the maximum ID in the table. - This is to handle various cases where the sequence value can get out - of sync with the table, e.g. if Synapse gets rolled back to a previous + This is to handle various cases where the sequence value can get out of + sync with the table, e.g. if Synapse gets rolled back to a previous version and the rolled forwards again. + + If a stream name is given then this will check that any value in the + `stream_positions` table is less than or equal to the current sequence + value. If it isn't then it's likely that streams have been crossed + somewhere (e.g. two ID generators have the same stream name). """ ... @@ -93,8 +114,12 @@ class PostgresSequenceGenerator(SequenceGenerator): db_conn: "LoggingDatabaseConnection", table: str, id_column: str, + stream_name: Optional[str] = None, positive: bool = True, ): + """See SequenceGenerator.check_consistency for docstring. + """ + txn = db_conn.cursor(txn_name="sequence.check_consistency") # First we get the current max ID from the table. @@ -118,6 +143,18 @@ class PostgresSequenceGenerator(SequenceGenerator): "SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name} ) last_value, is_called = txn.fetchone() + + # If we have an associated stream check the stream_positions table. + max_in_stream_positions = None + if stream_name: + txn.execute( + "SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?", + (stream_name,), + ) + row = txn.fetchone() + if row: + max_in_stream_positions = row[0] + txn.close() # If `is_called` is False then `last_value` is actually the value that @@ -138,6 +175,14 @@ class PostgresSequenceGenerator(SequenceGenerator): % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql} ) + # If we have values in the stream positions table then they have to be + # less than or equal to `last_value` + if max_in_stream_positions and max_in_stream_positions > last_value: + raise IncorrectDatabaseSetup( + _INCONSISTENT_STREAM_ERROR + % {"seq": self._sequence_name, "stream_name": stream_name} + ) + GetFirstCallbackType = Callable[[Cursor], int] @@ -175,7 +220,12 @@ class LocalSequenceGenerator(SequenceGenerator): return self._current_max_id def check_consistency( - self, db_conn: Connection, table: str, id_column: str, positive: bool = True + self, + db_conn: Connection, + table: str, + id_column: str, + stream_name: Optional[str] = None, + positive: bool = True, ): # There is nothing to do for in memory sequences pass diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index 6ef2b008a4..8d2411513f 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -78,7 +78,7 @@ def sorted_topologically( if node not in degree_map: continue - for edge in edges: + for edge in set(edges): if edge in degree_map: degree_map[node] += 1 |