diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 116 |
1 files changed, 92 insertions, 24 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c63ca36df6..e4dbaa3547 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -26,7 +26,8 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from canonicaljson import encode_canonical_json -from collections import deque, namedtuple +from collections import deque, namedtuple, OrderedDict +from functools import wraps import synapse import synapse.metrics @@ -150,6 +151,26 @@ class _EventPeristenceQueue(object): _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) +def _retry_on_integrity_error(func): + """Wraps a database function so that it gets retried on IntegrityError, + with `delete_existing=True` passed in. + + Args: + func: function that returns a Deferred and accepts a `delete_existing` arg + """ + @wraps(func) + @defer.inlineCallbacks + def f(self, *args, **kwargs): + try: + res = yield func(self, *args, **kwargs) + except self.database_engine.module.IntegrityError: + logger.exception("IntegrityError, retrying.") + res = yield func(self, *args, delete_existing=True, **kwargs) + defer.returnValue(res) + + return f + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" @@ -229,8 +250,10 @@ class EventsStore(SQLBaseStore): self._event_persist_queue.handle_queue(room_id, persisting_queue) + @_retry_on_integrity_error @defer.inlineCallbacks - def _persist_events(self, events_and_contexts, backfilled=False): + def _persist_events(self, events_and_contexts, backfilled=False, + delete_existing=False): if not events_and_contexts: return @@ -273,12 +296,15 @@ class EventsStore(SQLBaseStore): self._persist_events_txn, events_and_contexts=chunk, backfilled=backfilled, + delete_existing=delete_existing, ) persist_event_counter.inc_by(len(chunk)) + @_retry_on_integrity_error @defer.inlineCallbacks @log_function - def _persist_event(self, event, context, current_state=None, backfilled=False): + def _persist_event(self, event, context, current_state=None, backfilled=False, + delete_existing=False): try: with self._stream_id_gen.get_next() as stream_ordering: with self._state_groups_id_gen.get_next() as state_group_id: @@ -291,6 +317,7 @@ class EventsStore(SQLBaseStore): context=context, current_state=current_state, backfilled=backfilled, + delete_existing=delete_existing, ) persist_event_counter.inc() except _RollbackButIsFineException: @@ -353,7 +380,8 @@ class EventsStore(SQLBaseStore): defer.returnValue({e.event_id: e for e in events}) @log_function - def _persist_event_txn(self, txn, event, context, current_state, backfilled=False): + def _persist_event_txn(self, txn, event, context, current_state, backfilled=False, + delete_existing=False): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: @@ -393,16 +421,38 @@ class EventsStore(SQLBaseStore): txn, [(event, context)], backfilled=backfilled, + delete_existing=delete_existing, ) @log_function - def _persist_events_txn(self, txn, events_and_contexts, backfilled): + def _persist_events_txn(self, txn, events_and_contexts, backfilled, + delete_existing=False): """Insert some number of room events into the necessary database tables. Rejected events are only inserted into the events table, the events_json table, and the rejections table. Things reading from those table will need to check whether the event was rejected. + + If delete_existing is True then existing events will be purged from the + database before insertion. This is useful when retrying due to IntegrityError. """ + # Ensure that we don't have the same event twice. + # Pick the earliest non-outlier if there is one, else the earliest one. + new_events_and_contexts = OrderedDict() + for event, context in events_and_contexts: + prev_event_context = new_events_and_contexts.get(event.event_id) + if prev_event_context: + if not event.internal_metadata.is_outlier(): + if prev_event_context[0].internal_metadata.is_outlier(): + # To ensure correct ordering we pop, as OrderedDict is + # ordered by first insertion. + new_events_and_contexts.pop(event.event_id, None) + new_events_and_contexts[event.event_id] = (event, context) + else: + new_events_and_contexts[event.event_id] = (event, context) + + events_and_contexts = new_events_and_contexts.values() + depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids @@ -433,8 +483,6 @@ class EventsStore(SQLBaseStore): for event_id, outlier in txn.fetchall() } - # Remove the events that we've seen before. - event_map = {} to_remove = set() for event, context in events_and_contexts: if context.rejected: @@ -445,23 +493,6 @@ class EventsStore(SQLBaseStore): to_remove.add(event) continue - # Handle the case of the list including the same event multiple - # times. The tricky thing here is when they differ by whether - # they are an outlier. - if event.event_id in event_map: - other = event_map[event.event_id] - - if not other.internal_metadata.is_outlier(): - to_remove.add(event) - continue - elif not event.internal_metadata.is_outlier(): - to_remove.add(event) - continue - else: - to_remove.add(other) - - event_map[event.event_id] = event - if event.event_id not in have_persisted: continue @@ -539,6 +570,43 @@ class EventsStore(SQLBaseStore): ] } + if delete_existing: + # For paranoia reasons, we go and delete all the existing entries + # for these events so we can reinsert them. + # This gets around any problems with some tables already having + # entries. + + logger.info("Deleting existing") + + for table in ( + "events", + "event_auth", + "event_json", + "event_content_hashes", + "event_destinations", + "event_edge_hashes", + "event_edges", + "event_forward_extremities", + "event_push_actions", + "event_reference_hashes", + "event_search", + "event_signatures", + "event_to_state_groups", + "guest_access", + "history_visibility", + "local_invites", + "room_names", + "state_events", + "rejections", + "redactions", + "room_memberships", + "state_events" + ): + txn.executemany( + "DELETE FROM %s WHERE event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) + self._simple_insert_many_txn( txn, table="event_json", |