summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/events.py70
1 files changed, 66 insertions, 4 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4664cfe6d9..340c0621cc 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -27,6 +27,7 @@ from synapse.api.errors import SynapseError
 
 from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple, OrderedDict
+from functools import wraps
 
 import synapse
 import synapse.metrics
@@ -150,6 +151,26 @@ class _EventPeristenceQueue(object):
 _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
 
 
+def _retry_on_integrity_error(func):
+    """Wraps a database function so that it gets retried on IntegrityError,
+    with `delete_existing=True` passed in.
+
+    Args:
+        func: function that returns a Deferred and accepts a `delete_existing` arg
+    """
+    @wraps(func)
+    @defer.inlineCallbacks
+    def f(self, *args, **kwargs):
+        try:
+            res = yield func(self, *args, **kwargs)
+        except self.database_engine.module.IntegrityError:
+            logger.exception("IntegrityError, retrying.")
+            res = yield func(self, *args, delete_existing=True, **kwargs)
+        defer.returnValue(res)
+
+    return f
+
+
 class EventsStore(SQLBaseStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@@ -229,8 +250,10 @@ class EventsStore(SQLBaseStore):
 
         self._event_persist_queue.handle_queue(room_id, persisting_queue)
 
+    @_retry_on_integrity_error
     @defer.inlineCallbacks
-    def _persist_events(self, events_and_contexts, backfilled=False):
+    def _persist_events(self, events_and_contexts, backfilled=False,
+                        delete_existing=False):
         if not events_and_contexts:
             return
 
@@ -273,12 +296,15 @@ class EventsStore(SQLBaseStore):
                         self._persist_events_txn,
                         events_and_contexts=chunk,
                         backfilled=backfilled,
+                        delete_existing=delete_existing,
                     )
                     persist_event_counter.inc_by(len(chunk))
 
+    @_retry_on_integrity_error
     @defer.inlineCallbacks
     @log_function
-    def _persist_event(self, event, context, current_state=None, backfilled=False):
+    def _persist_event(self, event, context, current_state=None, backfilled=False,
+                       delete_existing=False):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
                 with self._state_groups_id_gen.get_next() as state_group_id:
@@ -291,6 +317,7 @@ class EventsStore(SQLBaseStore):
                         context=context,
                         current_state=current_state,
                         backfilled=backfilled,
+                        delete_existing=delete_existing,
                     )
                     persist_event_counter.inc()
         except _RollbackButIsFineException:
@@ -353,7 +380,8 @@ class EventsStore(SQLBaseStore):
         defer.returnValue({e.event_id: e for e in events})
 
     @log_function
-    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
+    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
+                           delete_existing=False):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
@@ -393,15 +421,20 @@ class EventsStore(SQLBaseStore):
             txn,
             [(event, context)],
             backfilled=backfilled,
+            delete_existing=delete_existing,
         )
 
     @log_function
-    def _persist_events_txn(self, txn, events_and_contexts, backfilled):
+    def _persist_events_txn(self, txn, events_and_contexts, backfilled,
+                            delete_existing=False):
         """Insert some number of room events into the necessary database tables.
 
         Rejected events are only inserted into the events table, the events_json table,
         and the rejections table. Things reading from those table will need to check
         whether the event was rejected.
+
+        If delete_existing is True then existing events will be purged from the
+        database before insertion. This is useful when retrying due to IntegrityError.
         """
         # Ensure that we don't have the same event twice.
         # Pick the earliest non-outlier if there is one, else the earliest one.
@@ -537,6 +570,35 @@ class EventsStore(SQLBaseStore):
                 ]
             }
 
+        if delete_existing:
+            # For paranoia reasons, we go and delete all the existing entries
+            # for these events so we can reinsert them.
+            # This gets around any problems with some tables already having
+            # entries.
+
+            logger.info("Deleting existing")
+
+            for table in (
+                "events",
+                "event_json",
+                "event_content_hashes",
+                "event_destinations",
+                "event_edge_hashes",
+                "event_edges",
+                "event_forward_extremities",
+                "event_push_actions",
+                "event_reference_hashes",
+                "event_search",
+                "event_signatures",
+                "event_to_state_groups",
+                "rejections",
+                "redactions",
+            ):
+                txn.executemany(
+                    "DELETE FROM %s WHERE event_id = ?" % (table,),
+                    [(ev.event_id,) for ev, _ in events_and_contexts]
+                )
+
         self._simple_insert_many_txn(
             txn,
             table="event_json",