diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/event_push_actions.py | 24 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 5 | ||||
-rw-r--r-- | synapse/storage/registration.py | 4 | ||||
-rw-r--r-- | synapse/storage/room.py | 6 | ||||
-rw-r--r-- | synapse/storage/schema/delta/30/as_users.py | 4 | ||||
-rw-r--r-- | synapse/storage/search.py | 2 | ||||
-rw-r--r-- | synapse/storage/stream.py | 11 | ||||
-rw-r--r-- | synapse/storage/tags.py | 4 |
8 files changed, 40 insertions, 20 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index e78f8d0114..c22762eb5c 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): "add_push_actions_to_staging", _add_push_actions_to_staging_txn ) + @defer.inlineCallbacks def remove_push_actions_from_staging(self, event_id): """Called if we failed to persist the event to ensure that stale push actions don't build up in the DB @@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore): event_id (str) """ - return self._simple_delete( - table="event_push_actions_staging", - keyvalues={ - "event_id": event_id, - }, - desc="remove_push_actions_from_staging", - ) + try: + res = yield self._simple_delete( + table="event_push_actions_staging", + keyvalues={ + "event_id": event_id, + }, + desc="remove_push_actions_from_staging", + ) + defer.returnValue(res) + except Exception: + # this method is called from an exception handler, so propagating + # another exception here really isn't helpful - there's nothing + # the caller can do about it. Just log the exception and move on. + logger.exception( + "Error removing push actions after event persistence failure", + ) @defer.inlineCallbacks def _find_stream_orderings_for_times(self): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index a937b9bceb..ba834854e1 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -20,7 +20,7 @@ from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, make_deferred_yieldable + PreserveLoggingContext, make_deferred_yieldable, run_in_background, ) from synapse.util.metrics import Measure from synapse.api.errors import SynapseError @@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore): res = yield make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._get_event_from_row)( + run_in_background( + self._get_event_from_row, row["internal_metadata"], row["json"], row["redacts"], rejected_reason=row["rejects"], ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 6b557ca0cf..a50717db2d 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -22,6 +22,8 @@ from synapse.storage import background_updates from synapse.storage._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from six.moves import range + class RegistrationWorkerStore(SQLBaseStore): @cached() @@ -469,7 +471,7 @@ class RegistrationStore(RegistrationWorkerStore, match = regex.search(user_id) if match: found.add(int(match.group(1))) - for i in xrange(len(found) + 1): + for i in range(len(found) + 1): if i not in found: return i diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 740c036975..ea6a189185 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore): # Convert the IDs to MXC URIs for media_id in local_mxcs: - local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id)) + local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id)) for hostname, media_id in remote_mxcs: remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id)) @@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore): while next_token: sql = """ SELECT stream_ordering, json FROM events - JOIN event_json USING (event_id) + JOIN event_json USING (room_id, event_id) WHERE room_id = ? AND stream_ordering < ? AND contains_url = ? AND outlier = ? @@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore): if matches: hostname = matches.group(1) media_id = matches.group(2) - if hostname == self.hostname: + if hostname == self.hs.hostname: local_media_mxcs.append(media_id) else: remote_media_mxcs.append((hostname, media_id)) diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py index c53e53c94f..85bd1a2006 100644 --- a/synapse/storage/schema/delta/30/as_users.py +++ b/synapse/storage/schema/delta/30/as_users.py @@ -14,6 +14,8 @@ import logging from synapse.config.appservice import load_appservices +from six.moves import range + logger = logging.getLogger(__name__) @@ -58,7 +60,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): for as_id, user_ids in owned.items(): n = 100 - user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n)) + user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n)) for chunk in user_chunks: cur.execute( database_engine.convert_param_style( diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 426cbe6e1a..6ba3e59889 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore): sql = ( "SELECT stream_ordering, event_id, room_id, type, json, " " origin_server_ts FROM events" - " JOIN event_json USING (event_id)" + " JOIN event_json USING (room_id, event_id)" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2956c3b3e0..f0784ba137 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -41,12 +41,14 @@ from synapse.storage.events import EventsWorkerStore from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.storage.engines import PostgresEngine, Sqlite3Engine import abc import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -196,13 +198,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results = {} room_ids = list(room_ids) - for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): + for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)): res = yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(self.get_room_events_stream_for_room)( + run_in_background( + self.get_room_events_stream_for_room, room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ])) + ], consumeErrors=True)) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 13bff9f055..6671d3cfca 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -22,6 +22,8 @@ from twisted.internet import defer import simplejson as json import logging +from six.moves import range + logger = logging.getLogger(__name__) @@ -98,7 +100,7 @@ class TagsWorkerStore(AccountDataWorkerStore): batch_size = 50 results = [] - for i in xrange(0, len(tag_ids), batch_size): + for i in range(0, len(tag_ids), batch_size): tags = yield self.runInteraction( "get_all_updated_tag_content", get_tag_content, |