diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 0fdf49a2fd..b971f0cb18 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -495,6 +495,7 @@ class SQLBaseStore(object):
Deferred(bool): True if a new entry was created, False if an
existing one was updated.
"""
+ attempts = 0
while True:
try:
result = yield self.runInteraction(
@@ -504,6 +505,12 @@ class SQLBaseStore(object):
)
defer.returnValue(result)
except self.database_engine.module.IntegrityError as e:
+ attempts += 1
+ if attempts >= 5:
+ # don't retry forever, because things other than races
+ # can cause IntegrityErrors
+ raise
+
# presumably we raced with another transaction: let's retry.
logger.warn(
"IntegrityError when upserting into %s; retrying: %s",
@@ -600,20 +607,18 @@ class SQLBaseStore(object):
@staticmethod
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
- if keyvalues:
- where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
- else:
- where = ""
-
sql = (
- "SELECT %(retcol)s FROM %(table)s %(where)s"
+ "SELECT %(retcol)s FROM %(table)s"
) % {
"retcol": retcol,
"table": table,
- "where": where,
}
- txn.execute(sql, keyvalues.values())
+ if keyvalues:
+ sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
+ txn.execute(sql, keyvalues.values())
+ else:
+ txn.execute(sql)
return [r[0] for r in txn]
@@ -624,7 +629,7 @@ class SQLBaseStore(object):
Args:
table (str): table name
- keyvalues (dict): column names and values to select the rows with
+ keyvalues (dict|None): column names and values to select the rows with
            retcol (str): column whose value we wish to retrieve.
Returns:
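The second hunk lets `_simple_select_onecol_txn` (and hence `_simple_select_onecol`) be called with `keyvalues=None`, which `has_completed_background_updates` below relies on. A standalone sketch of the two query shapes this produces (the tables and columns here are hypothetical call sites, chosen only for illustration):

```python
# Standalone illustration of the two query shapes built above.
def build_select_onecol(table, keyvalues, retcol):
    sql = "SELECT %(retcol)s FROM %(table)s" % {"retcol": retcol, "table": table}
    if keyvalues:
        sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues)
        return sql, list(keyvalues.values())
    return sql, []

# no keyvalues: no WHERE clause, select from every row
print(build_select_onecol("background_updates", None, "1"))
# ('SELECT 1 FROM background_updates', [])

# with keyvalues: filtered exactly as before
print(build_select_onecol("access_tokens", {"user_id": "@alice:example.com"}, "device_id"))
# ('SELECT device_id FROM access_tokens WHERE user_id = ?', ['@alice:example.com'])
```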
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index c8a1eb016b..56a0bde549 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore):
"""
content_json = json.dumps(content)
- def add_account_data_txn(txn, next_id):
- self._simple_upsert_txn(
- txn,
+ with self._account_data_id_gen.get_next() as next_id:
+ # no need to lock here as room_account_data has a unique constraint
+ # on (user_id, room_id, account_data_type) so _simple_upsert will
+ # retry if there is a conflict.
+ yield self._simple_upsert(
+ desc="add_room_account_data",
table="room_account_data",
keyvalues={
"user_id": user_id,
@@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore):
values={
"stream_id": next_id,
"content": content_json,
- }
- )
- txn.call_after(
- self._account_data_stream_cache.entity_has_changed,
- user_id, next_id,
+ },
+ lock=False,
)
- txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
- self._update_max_stream_id(txn, next_id)
- with self._account_data_id_gen.get_next() as next_id:
- yield self.runInteraction(
- "add_room_account_data", add_account_data_txn, next_id
- )
+ # it's theoretically possible for the above to succeed and the
+ # below to fail - in which case we might reuse a stream id on
+ # restart, and the above update might not get propagated. That
+ # doesn't sound any worse than the whole update getting lost,
+ # which is what would happen if we combined the two into one
+ # transaction.
+ yield self._update_max_stream_id(next_id)
+
+ self._account_data_stream_cache.entity_has_changed(user_id, next_id)
+ self.get_account_data_for_user.invalidate((user_id,))
result = self._account_data_id_gen.get_current_token()
defer.returnValue(result)
@@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore):
"""
content_json = json.dumps(content)
- def add_account_data_txn(txn, next_id):
- self._simple_upsert_txn(
- txn,
+ with self._account_data_id_gen.get_next() as next_id:
+ # no need to lock here as account_data has a unique constraint on
+ # (user_id, account_data_type) so _simple_upsert will retry if
+ # there is a conflict.
+ yield self._simple_upsert(
+ desc="add_user_account_data",
table="account_data",
keyvalues={
"user_id": user_id,
@@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore):
values={
"stream_id": next_id,
"content": content_json,
- }
+ },
+ lock=False,
)
- txn.call_after(
- self._account_data_stream_cache.entity_has_changed,
+
+ # it's theoretically possible for the above to succeed and the
+ # below to fail - in which case we might reuse a stream id on
+ # restart, and the above update might not get propagated. That
+ # doesn't sound any worse than the whole update getting lost,
+ # which is what would happen if we combined the two into one
+ # transaction.
+ yield self._update_max_stream_id(next_id)
+
+ self._account_data_stream_cache.entity_has_changed(
user_id, next_id,
)
- txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
- txn.call_after(
- self.get_global_account_data_by_type_for_user.invalidate,
+ self.get_account_data_for_user.invalidate((user_id,))
+ self.get_global_account_data_by_type_for_user.invalidate(
(account_data_type, user_id,)
)
- self._update_max_stream_id(txn, next_id)
-
- with self._account_data_id_gen.get_next() as next_id:
- yield self.runInteraction(
- "add_user_account_data", add_account_data_txn, next_id
- )
result = self._account_data_id_gen.get_current_token()
defer.returnValue(result)
- def _update_max_stream_id(self, txn, next_id):
+ def _update_max_stream_id(self, next_id):
"""Update the max stream_id
Args:
- txn: The database cursor
            next_id(int): The revision to advance to.
"""
- update_max_id_sql = (
- "UPDATE account_data_max_stream_id"
- " SET stream_id = ?"
- " WHERE stream_id < ?"
+ def _update(txn):
+ update_max_id_sql = (
+ "UPDATE account_data_max_stream_id"
+ " SET stream_id = ?"
+ " WHERE stream_id < ?"
+ )
+ txn.execute(update_max_id_sql, (next_id, next_id))
+ return self.runInteraction(
+ "update_account_data_max_stream_id",
+ _update,
)
- txn.execute(update_max_id_sql, (next_id, next_id))
@cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
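For readers following the account-data rewrite above: the single transaction that previously did the upsert, the max-stream-id bump and the cache invalidation is split into separate steps. A rough sketch of the resulting ordering and its failure window (illustrative pseudocode only; the store method names are hypothetical stand-ins, not the real API):

```python
from twisted.internet import defer

@defer.inlineCallbacks
def write_account_data(store, id_gen, user_id, keyvalues, content_json):
    with id_gen.get_next() as next_id:
        # 1. upsert the row itself; lock=False is safe because the table's
        #    unique constraint makes conflicting writers retry
        yield store.upsert_row(keyvalues, next_id, content_json)

        # 2. advance the max stream id in its own transaction.  If we die
        #    between steps 1 and 2, the stream id may be reused on restart
        #    and step 1's write may not be propagated - judged no worse
        #    than losing the whole update in one combined transaction.
        yield store.advance_max_stream_id(next_id)

    # 3. only after both writes succeed, poke caches and stream watchers
    store.invalidate_caches(user_id, next_id)
```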
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 6f235ac051..11a1b942f1 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -85,6 +85,7 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
+ self._all_done = False
@defer.inlineCallbacks
def start_doing_background_updates(self):
@@ -106,9 +107,41 @@ class BackgroundUpdateStore(SQLBaseStore):
"No more background updates to do."
" Unscheduling background update task."
)
+ self._all_done = True
defer.returnValue(None)
@defer.inlineCallbacks
+ def has_completed_background_updates(self):
+ """Check if all the background updates have completed
+
+ Returns:
+ Deferred[bool]: True if all background updates have completed
+ """
+ # if we've previously determined that there is nothing left to do, that
+ # is easy
+ if self._all_done:
+ defer.returnValue(True)
+
+ # obviously, if we have things in our queue, we're not done.
+ if self._background_update_queue:
+ defer.returnValue(False)
+
+ # otherwise, check if there are updates to be run. This is important,
+ # as we may be running on a worker which doesn't perform the bg updates
+ # itself, but still wants to wait for them to happen.
+ updates = yield self._simple_select_onecol(
+ "background_updates",
+ keyvalues=None,
+ retcol="1",
+ desc="check_background_updates",
+ )
+ if not updates:
+ self._all_done = True
+ defer.returnValue(True)
+
+ defer.returnValue(False)
+
+ @defer.inlineCallbacks
def do_next_background_update(self, desired_duration_ms):
"""Does some amount of work on the next queued background update
@@ -269,7 +302,7 @@ class BackgroundUpdateStore(SQLBaseStore):
# Sqlite doesn't support concurrent creation of indexes.
#
# We don't use partial indices on SQLite as it wasn't introduced
- # until 3.8, and wheezy has 3.7
+ # until 3.8, and wheezy and CentOS 7 have 3.7
#
# We assume that sqlite doesn't give us invalid indices; however
# we may still end up with the index existing but the
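The new `has_completed_background_updates` is aimed at callers (tests, or workers which don't run the updates themselves) that need to wait for the `background_updates` table to drain. A minimal sketch of driving updates to completion with the two methods on this class (the batch duration is arbitrary):

```python
from twisted.internet import defer

@defer.inlineCallbacks
def run_background_updates_to_completion(store, batch_duration_ms=100):
    # keep pulling work until both the in-memory queue and the
    # background_updates table are empty
    while not (yield store.has_completed_background_updates()):
        yield store.do_next_background_update(batch_duration_ms)
```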
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 52e5cdad70..a66ff7c1e0 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -12,13 +12,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.storage.background_updates import BackgroundUpdateStore
-from ._base import SQLBaseStore
-
-class MediaRepositoryStore(SQLBaseStore):
+class MediaRepositoryStore(BackgroundUpdateStore):
"""Persistence for attachments and avatars"""
+ def __init__(self, db_conn, hs):
+ super(MediaRepositoryStore, self).__init__(db_conn, hs)
+
+ self.register_background_index_update(
+ update_name='local_media_repository_url_idx',
+ index_name='local_media_repository_url_idx',
+ table='local_media_repository',
+ columns=['created_ts'],
+ where_clause='url_cache IS NOT NULL',
+ )
+
def get_default_thumbnails(self, top_level_type, sub_type):
return []
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 8b9544c209..3aa810981f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -254,8 +254,8 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
If None, tokens associated with any device (or no device) will
be deleted
Returns:
- defer.Deferred[list[str, str|None]]: a list of the deleted tokens
- and device IDs
+            defer.Deferred[list[(str, int, str|None)]]: a list of
+ (token, token id, device id) for each of the deleted tokens
"""
def f(txn):
keyvalues = {
@@ -272,12 +272,12 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
values.append(except_token_id)
txn.execute(
- "SELECT token, device_id FROM access_tokens WHERE %s" % where_clause,
+ "SELECT token, id, device_id FROM access_tokens WHERE %s" % where_clause,
values
)
- tokens_and_devices = [(r[0], r[1]) for r in txn]
+ tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
- for token, _ in tokens_and_devices:
+ for token, _, _ in tokens_and_devices:
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (token,)
)
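Because the query now also returns the access token's row id, callers receive 3-tuples instead of pairs. A hedged usage sketch (assuming the enclosing method is `RegistrationStore.user_delete_access_tokens`; the method name is not visible in the hunk header):

```python
from twisted.internet import defer

@defer.inlineCallbacks
def log_deleted_tokens(store, user_id):
    tokens_and_devices = yield store.user_delete_access_tokens(user_id)
    for token, token_id, device_id in tokens_and_devices:
        # token_id is the access_tokens row id added by this change
        print("deleted access token row %s for device %s" % (token_id, device_id))
```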
diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql
index e2b775f038..b12f9b2ebf 100644
--- a/synapse/storage/schema/delta/44/expire_url_cache.sql
+++ b/synapse/storage/schema/delta/44/expire_url_cache.sql
@@ -13,7 +13,10 @@
* limitations under the License.
*/
-CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
+-- this didn't work on SQLite 3.7 (because of lack of partial indexes), so was
+-- removed and replaced with 46/local_media_repository_url_idx.sql.
+--
+-- CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
-- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support
-- indices on expressions until 3.9.
diff --git a/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql
new file mode 100644
index 0000000000..bbfc7f5d1a
--- /dev/null
+++ b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql
@@ -0,0 +1,24 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- register a background update which will recreate the
+-- local_media_repository_url_idx index.
+--
+-- We do this as a bg update not because it is a particularly onerous
+-- operation, but because we'd like it to be a partial index if possible, and
+-- the background_index_update code will understand whether we are on
+-- postgres or sqlite and behave accordingly.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('local_media_repository_url_idx', '{}');
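To make the comment above concrete: registering the index as a background update lets the index-creation code choose different DDL per engine. A rough sketch of that decision (illustrative only, not the actual `register_background_index_update` implementation):

```python
def index_create_sql(is_postgres, table, index_name, columns, where_clause=None):
    sql = "CREATE INDEX %s ON %s (%s)" % (index_name, table, ", ".join(columns))
    if where_clause and is_postgres:
        # partial indexes are only used on postgres; SQLite 3.7 (wheezy,
        # CentOS 7) does not support them
        sql += " WHERE %s" % where_clause
    return sql

# postgres gets the partial index...
print(index_create_sql(
    True, "local_media_repository", "local_media_repository_url_idx",
    ["created_ts"], "url_cache IS NOT NULL",
))
# ...sqlite gets a plain index on created_ts
print(index_create_sql(
    False, "local_media_repository", "local_media_repository_url_idx",
    ["created_ts"], "url_cache IS NOT NULL",
))
```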