diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/event_push_actions.py | 24 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 5 | ||||
-rw-r--r-- | synapse/storage/registration.py | 4 | ||||
-rw-r--r-- | synapse/storage/schema/delta/30/as_users.py | 4 | ||||
-rw-r--r-- | synapse/storage/schema/delta/48/group_unique_indexes.py | 57 | ||||
-rw-r--r-- | synapse/storage/stream.py | 11 | ||||
-rw-r--r-- | synapse/storage/tags.py | 4 |
7 files changed, 93 insertions, 16 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index e78f8d0114..c22762eb5c 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): "add_push_actions_to_staging", _add_push_actions_to_staging_txn ) + @defer.inlineCallbacks def remove_push_actions_from_staging(self, event_id): """Called if we failed to persist the event to ensure that stale push actions don't build up in the DB @@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore): event_id (str) """ - return self._simple_delete( - table="event_push_actions_staging", - keyvalues={ - "event_id": event_id, - }, - desc="remove_push_actions_from_staging", - ) + try: + res = yield self._simple_delete( + table="event_push_actions_staging", + keyvalues={ + "event_id": event_id, + }, + desc="remove_push_actions_from_staging", + ) + defer.returnValue(res) + except Exception: + # this method is called from an exception handler, so propagating + # another exception here really isn't helpful - there's nothing + # the caller can do about it. Just log the exception and move on. + logger.exception( + "Error removing push actions after event persistence failure", + ) @defer.inlineCallbacks def _find_stream_orderings_for_times(self): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index a937b9bceb..ba834854e1 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -20,7 +20,7 @@ from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, make_deferred_yieldable + PreserveLoggingContext, make_deferred_yieldable, run_in_background, ) from synapse.util.metrics import Measure from synapse.api.errors import SynapseError @@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore): res = yield make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._get_event_from_row)( + run_in_background( + self._get_event_from_row, row["internal_metadata"], row["json"], row["redacts"], rejected_reason=row["rejects"], ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 6b557ca0cf..a50717db2d 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -22,6 +22,8 @@ from synapse.storage import background_updates from synapse.storage._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from six.moves import range + class RegistrationWorkerStore(SQLBaseStore): @cached() @@ -469,7 +471,7 @@ class RegistrationStore(RegistrationWorkerStore, match = regex.search(user_id) if match: found.add(int(match.group(1))) - for i in xrange(len(found) + 1): + for i in range(len(found) + 1): if i not in found: return i diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py index c53e53c94f..85bd1a2006 100644 --- a/synapse/storage/schema/delta/30/as_users.py +++ b/synapse/storage/schema/delta/30/as_users.py @@ -14,6 +14,8 @@ import logging from synapse.config.appservice import load_appservices +from six.moves import range + logger = logging.getLogger(__name__) @@ -58,7 +60,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): for as_id, user_ids in owned.items(): n = 100 - user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n)) + user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n)) for chunk in user_chunks: cur.execute( database_engine.convert_param_style( diff --git a/synapse/storage/schema/delta/48/group_unique_indexes.py b/synapse/storage/schema/delta/48/group_unique_indexes.py new file mode 100644 index 0000000000..2233af87d7 --- /dev/null +++ b/synapse/storage/schema/delta/48/group_unique_indexes.py @@ -0,0 +1,57 @@ +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.engines import PostgresEngine +from synapse.storage.prepare_database import get_statements + +FIX_INDEXES = """ +-- rebuild indexes as uniques +DROP INDEX groups_invites_g_idx; +CREATE UNIQUE INDEX group_invites_g_idx ON group_invites(group_id, user_id); +DROP INDEX groups_users_g_idx; +CREATE UNIQUE INDEX group_users_g_idx ON group_users(group_id, user_id); + +-- rename other indexes to actually match their table names.. +DROP INDEX groups_users_u_idx; +CREATE INDEX group_users_u_idx ON group_users(user_id); +DROP INDEX groups_invites_u_idx; +CREATE INDEX group_invites_u_idx ON group_invites(user_id); +DROP INDEX groups_rooms_g_idx; +CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id); +DROP INDEX groups_rooms_r_idx; +CREATE INDEX group_rooms_r_idx ON group_rooms(room_id); +""" + + +def run_create(cur, database_engine, *args, **kwargs): + rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" + + # remove duplicates from group_users & group_invites tables + cur.execute(""" + DELETE FROM group_users WHERE %s NOT IN ( + SELECT min(%s) FROM group_users GROUP BY group_id, user_id + ); + """ % (rowid, rowid)) + cur.execute(""" + DELETE FROM group_invites WHERE %s NOT IN ( + SELECT min(%s) FROM group_invites GROUP BY group_id, user_id + ); + """ % (rowid, rowid)) + + for statement in get_statements(FIX_INDEXES.splitlines()): + cur.execute(statement) + + +def run_upgrade(*args, **kwargs): + pass diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2956c3b3e0..f0784ba137 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -41,12 +41,14 @@ from synapse.storage.events import EventsWorkerStore from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.storage.engines import PostgresEngine, Sqlite3Engine import abc import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -196,13 +198,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results = {} room_ids = list(room_ids) - for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): + for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)): res = yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(self.get_room_events_stream_for_room)( + run_in_background( + self.get_room_events_stream_for_room, room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ])) + ], consumeErrors=True)) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 13bff9f055..6671d3cfca 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -22,6 +22,8 @@ from twisted.internet import defer import simplejson as json import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -98,7 +100,7 @@ class TagsWorkerStore(AccountDataWorkerStore): batch_size = 50 results = [] - for i in xrange(0, len(tag_ids), batch_size): + for i in range(0, len(tag_ids), batch_size): tags = yield self.runInteraction( "get_all_updated_tag_content", get_tag_content, |