1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
|
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
import threading
from typing import Callable, List, Optional
from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.engines import (
BaseDatabaseEngine,
IncorrectDatabaseSetup,
PostgresEngine,
)
from synapse.storage.types import Connection, Cursor
logger = logging.getLogger(__name__)
_INCONSISTENT_SEQUENCE_ERROR = """
Postgres sequence '%(seq)s' is inconsistent with associated
table '%(table)s'. This can happen if Synapse has been downgraded and
then upgraded again, or due to a bad migration.
To fix this error, shut down Synapse (including any and all workers)
and run the following SQL:
SELECT setval('%(seq)s', (
%(max_id_sql)s
));
See docs/postgres.md for more information.
"""
class SequenceGenerator(metaclass=abc.ABCMeta):
"""A class which generates a unique sequence of integers"""
@abc.abstractmethod
def get_next_id_txn(self, txn: Cursor) -> int:
"""Gets the next ID in the sequence"""
...
@abc.abstractmethod
def check_consistency(
self,
db_conn: LoggingDatabaseConnection,
table: str,
id_column: str,
positive: bool = True,
):
"""Should be called during start up to test that the current value of
the sequence is greater than or equal to the maximum ID in the table.
This is to handle various cases where the sequence value can get out
of sync with the table, e.g. if Synapse gets rolled back to a previous
version and the rolled forwards again.
"""
...
class PostgresSequenceGenerator(SequenceGenerator):
"""An implementation of SequenceGenerator which uses a postgres sequence"""
def __init__(self, sequence_name: str):
self._sequence_name = sequence_name
def get_next_id_txn(self, txn: Cursor) -> int:
txn.execute("SELECT nextval(?)", (self._sequence_name,))
return txn.fetchone()[0]
def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
txn.execute(
"SELECT nextval(?) FROM generate_series(1, ?)", (self._sequence_name, n)
)
return [i for (i,) in txn]
def check_consistency(
self,
db_conn: LoggingDatabaseConnection,
table: str,
id_column: str,
positive: bool = True,
):
txn = db_conn.cursor(txn_name="sequence.check_consistency")
# First we get the current max ID from the table.
table_sql = "SELECT GREATEST(%(agg)s(%(id)s), 0) FROM %(table)s" % {
"id": id_column,
"table": table,
"agg": "MAX" if positive else "-MIN",
}
txn.execute(table_sql)
row = txn.fetchone()
if not row:
# Table is empty, so nothing to do.
txn.close()
return
# Now we fetch the current value from the sequence and compare with the
# above.
max_stream_id = row[0]
txn.execute(
"SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name}
)
last_value, is_called = txn.fetchone()
txn.close()
# If `is_called` is False then `last_value` is actually the value that
# will be generated next, so we decrement to get the true "last value".
if not is_called:
last_value -= 1
if max_stream_id > last_value:
logger.warning(
"Postgres sequence %s is behind table %s: %d < %d",
last_value,
max_stream_id,
)
raise IncorrectDatabaseSetup(
_INCONSISTENT_SEQUENCE_ERROR
% {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
)
GetFirstCallbackType = Callable[[Cursor], int]
class LocalSequenceGenerator(SequenceGenerator):
"""An implementation of SequenceGenerator which uses local locking
This only works reliably if there are no other worker processes generating IDs at
the same time.
"""
def __init__(self, get_first_callback: GetFirstCallbackType):
"""
Args:
get_first_callback: a callback which is called on the first call to
get_next_id_txn; should return the curreent maximum id
"""
# the callback. this is cleared after it is called, so that it can be GCed.
self._callback = get_first_callback # type: Optional[GetFirstCallbackType]
# The current max value, or None if we haven't looked in the DB yet.
self._current_max_id = None # type: Optional[int]
self._lock = threading.Lock()
def get_next_id_txn(self, txn: Cursor) -> int:
# We do application locking here since if we're using sqlite then
# we are a single process synapse.
with self._lock:
if self._current_max_id is None:
assert self._callback is not None
self._current_max_id = self._callback(txn)
self._callback = None
self._current_max_id += 1
return self._current_max_id
def check_consistency(
self, db_conn: Connection, table: str, id_column: str, positive: bool = True
):
# There is nothing to do for in memory sequences
pass
def build_sequence_generator(
database_engine: BaseDatabaseEngine,
get_first_callback: GetFirstCallbackType,
sequence_name: str,
) -> SequenceGenerator:
"""Get the best impl of SequenceGenerator available
This uses PostgresSequenceGenerator on postgres, and a locally-locked impl on
sqlite.
Args:
database_engine: the database engine we are connected to
get_first_callback: a callback which gets the next sequence ID. Used if
we're on sqlite.
sequence_name: the name of a postgres sequence to use.
"""
if isinstance(database_engine, PostgresEngine):
return PostgresSequenceGenerator(sequence_name)
else:
return LocalSequenceGenerator(get_first_callback)
|