author | Travis Ralston <travpc@gmail.com> | 2021-02-02 07:26:36 -0700 |
committer | Travis Ralston <travpc@gmail.com> | 2021-02-02 07:26:36 -0700 |
commit | 6ec034411cdad6f9f77f7dab7749d426ffd5744b (patch) |
tree | d7935121ad99051eddf5769993bef674e618379e /synapse/storage/util |
parent | Appease the linters (diff) |
parent | Update changelog (diff) |
Merge branch 'develop' into travis/fosdem/admin-api-groups
Diffstat (limited to 'synapse/storage/util')
-rw-r--r-- | synapse/storage/util/id_generators.py | 27 |
-rw-r--r-- | synapse/storage/util/sequence.py | 72 |
2 files changed, 87 insertions, 12 deletions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 39a3ab1162..71ef5a72dc 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -15,12 +15,11 @@
 import heapq
 import logging
 import threading
-from collections import deque
+from collections import OrderedDict
 from contextlib import contextmanager
 from typing import Dict, List, Optional, Set, Tuple, Union

 import attr
-from typing_extensions import Deque

 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.database import DatabasePool, LoggingTransaction
@@ -101,7 +100,13 @@ class StreamIdGenerator:
         self._current = (max if step > 0 else min)(
             self._current, _load_current_id(db_conn, table, column, step)
         )
-        self._unfinished_ids = deque()  # type: Deque[int]
+
+        # We use this as an ordered set, as we want to efficiently append items,
+        # remove items and get the first item. Since we insert IDs in order, the
+        # insertion ordering will ensure its in the correct ordering.
+        #
+        # The key and values are the same, but we never look at the values.
+        self._unfinished_ids = OrderedDict()  # type: OrderedDict[int, int]

     def get_next(self):
         """
@@ -113,7 +118,7 @@ class StreamIdGenerator:
             self._current += self._step
             next_id = self._current

-            self._unfinished_ids.append(next_id)
+            self._unfinished_ids[next_id] = next_id

         @contextmanager
         def manager():
@@ -121,7 +126,7 @@ class StreamIdGenerator:
                 yield next_id
             finally:
                 with self._lock:
-                    self._unfinished_ids.remove(next_id)
+                    self._unfinished_ids.pop(next_id)

         return _AsyncCtxManagerWrapper(manager())

@@ -140,7 +145,7 @@ class StreamIdGenerator:
             self._current += n * self._step

             for next_id in next_ids:
-                self._unfinished_ids.append(next_id)
+                self._unfinished_ids[next_id] = next_id

         @contextmanager
         def manager():
@@ -149,7 +154,7 @@ class StreamIdGenerator:
             finally:
                 with self._lock:
                     for next_id in next_ids:
-                        self._unfinished_ids.remove(next_id)
+                        self._unfinished_ids.pop(next_id)

         return _AsyncCtxManagerWrapper(manager())

@@ -162,7 +167,7 @@ class StreamIdGenerator:
         """
         with self._lock:
             if self._unfinished_ids:
-                return self._unfinished_ids[0] - self._step
+                return next(iter(self._unfinished_ids)) - self._step

             return self._current

@@ -261,7 +266,11 @@ class MultiWriterIdGenerator:
         # We check that the table and sequence haven't diverged.
         for table, _, id_column in tables:
             self._sequence_gen.check_consistency(
-                db_conn, table=table, id_column=id_column, positive=positive
+                db_conn,
+                table=table,
+                id_column=id_column,
+                stream_name=stream_name,
+                positive=positive,
             )

         # This goes and fills out the above state from the database.
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 412df6b8ef..0ec4dc2918 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -45,6 +45,21 @@ and run the following SQL:
 See docs/postgres.md for more information.
 """

+_INCONSISTENT_STREAM_ERROR = """
+Postgres sequence '%(seq)s' is inconsistent with associated stream position
+of '%(stream_name)s' in the 'stream_positions' table.
+
+This is likely a programming error and should be reported at
+https://github.com/matrix-org/synapse.
+
+A temporary workaround to fix this error is to shut down Synapse (including
+any and all workers) and run the following SQL:
+
+    DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s';
+
+This will need to be done every time the server is restarted.
+"""
+

 class SequenceGenerator(metaclass=abc.ABCMeta):
     """A class which generates a unique sequence of integers"""
@@ -55,19 +70,30 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
         ...

     @abc.abstractmethod
+    def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
+        """Get the next `n` IDs in the sequence"""
+        ...
+
+    @abc.abstractmethod
     def check_consistency(
         self,
         db_conn: "LoggingDatabaseConnection",
         table: str,
         id_column: str,
+        stream_name: Optional[str] = None,
         positive: bool = True,
     ):
         """Should be called during start up to test that the current value of
         the sequence is greater than or equal to the maximum ID in the table.

-        This is to handle various cases where the sequence value can get out
-        of sync with the table, e.g. if Synapse gets rolled back to a previous
+        This is to handle various cases where the sequence value can get out of
+        sync with the table, e.g. if Synapse gets rolled back to a previous
         version and the rolled forwards again.
+
+        If a stream name is given then this will check that any value in the
+        `stream_positions` table is less than or equal to the current sequence
+        value. If it isn't then it's likely that streams have been crossed
+        somewhere (e.g. two ID generators have the same stream name).
         """
         ...

@@ -93,8 +119,12 @@ class PostgresSequenceGenerator(SequenceGenerator):
         db_conn: "LoggingDatabaseConnection",
         table: str,
         id_column: str,
+        stream_name: Optional[str] = None,
         positive: bool = True,
     ):
+        """See SequenceGenerator.check_consistency for docstring.
+        """
+
         txn = db_conn.cursor(txn_name="sequence.check_consistency")

         # First we get the current max ID from the table.
@@ -118,6 +148,18 @@ class PostgresSequenceGenerator(SequenceGenerator):
             "SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name}
         )
         last_value, is_called = txn.fetchone()
+
+        # If we have an associated stream check the stream_positions table.
+        max_in_stream_positions = None
+        if stream_name:
+            txn.execute(
+                "SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?",
+                (stream_name,),
+            )
+            row = txn.fetchone()
+            if row:
+                max_in_stream_positions = row[0]
+
         txn.close()

         # If `is_called` is False then `last_value` is actually the value that
@@ -138,6 +180,14 @@ class PostgresSequenceGenerator(SequenceGenerator):
             % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
         )

+        # If we have values in the stream positions table then they have to be
+        # less than or equal to `last_value`
+        if max_in_stream_positions and max_in_stream_positions > last_value:
+            raise IncorrectDatabaseSetup(
+                _INCONSISTENT_STREAM_ERROR
+                % {"seq": self._sequence_name, "stream_name": stream_name}
+            )
+

 GetFirstCallbackType = Callable[[Cursor], int]

@@ -174,8 +224,24 @@ class LocalSequenceGenerator(SequenceGenerator):
             self._current_max_id += 1
             return self._current_max_id

+    def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
+        with self._lock:
+            if self._current_max_id is None:
+                assert self._callback is not None
+                self._current_max_id = self._callback(txn)
+                self._callback = None
+
+            first_id = self._current_max_id + 1
+            self._current_max_id += n
+            return [first_id + i for i in range(n)]
+
     def check_consistency(
-        self, db_conn: Connection, table: str, id_column: str, positive: bool = True
+        self,
+        db_conn: Connection,
+        table: str,
+        id_column: str,
+        stream_name: Optional[str] = None,
+        positive: bool = True,
     ):
         # There is nothing to do for in memory sequences
         pass