summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7848.misc1
-rw-r--r--synapse/storage/data_stores/main/events.py67
2 files changed, 1 insertions, 67 deletions
diff --git a/changelog.d/7848.misc b/changelog.d/7848.misc
new file mode 100644
index 0000000000..d9db1d8357
--- /dev/null
+++ b/changelog.d/7848.misc
@@ -0,0 +1 @@
+Remove redundant `retry_on_integrity_error` wrapper for event persistence code.
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 230fb5cd7f..66f01aad84 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -17,7 +17,6 @@
 import itertools
 import logging
 from collections import OrderedDict, namedtuple
-from functools import wraps
 from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
 
 import attr
@@ -69,27 +68,6 @@ def encode_json(json_object):
 _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
 
 
-def _retry_on_integrity_error(func):
-    """Wraps a database function so that it gets retried on IntegrityError,
-    with `delete_existing=True` passed in.
-
-    Args:
-        func: function that returns a Deferred and accepts a `delete_existing` arg
-    """
-
-    @wraps(func)
-    @defer.inlineCallbacks
-    def f(self, *args, **kwargs):
-        try:
-            res = yield func(self, *args, delete_existing=False, **kwargs)
-        except self.database_engine.module.IntegrityError:
-            logger.exception("IntegrityError, retrying.")
-            res = yield func(self, *args, delete_existing=True, **kwargs)
-        return res
-
-    return f
-
-
 @attr.s(slots=True)
 class DeltaState:
     """Deltas to use to update the `current_state_events` table.
@@ -134,7 +112,6 @@ class PersistEventsStore:
             hs.config.worker.writers.events == hs.get_instance_name()
         ), "Can only instantiate EventsStore on master"
 
-    @_retry_on_integrity_error
     @defer.inlineCallbacks
     def _persist_events_and_state_updates(
         self,
@@ -143,7 +120,6 @@ class PersistEventsStore:
         state_delta_for_room: Dict[str, DeltaState],
         new_forward_extremeties: Dict[str, List[str]],
         backfilled: bool = False,
-        delete_existing: bool = False,
     ):
         """Persist a set of events alongside updates to the current state and
         forward extremities tables.
@@ -157,7 +133,6 @@ class PersistEventsStore:
             new_forward_extremities: Map from room_id to list of event IDs
                 that are the new forward extremities of the room.
             backfilled
-            delete_existing
 
         Returns:
             Deferred: resolves when the events have been persisted
@@ -197,7 +172,6 @@ class PersistEventsStore:
                 self._persist_events_txn,
                 events_and_contexts=events_and_contexts,
                 backfilled=backfilled,
-                delete_existing=delete_existing,
                 state_delta_for_room=state_delta_for_room,
                 new_forward_extremeties=new_forward_extremeties,
             )
@@ -341,7 +315,6 @@ class PersistEventsStore:
         txn: LoggingTransaction,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
         backfilled: bool,
-        delete_existing: bool = False,
         state_delta_for_room: Dict[str, DeltaState] = {},
         new_forward_extremeties: Dict[str, List[str]] = {},
     ):
@@ -393,13 +366,6 @@ class PersistEventsStore:
         # From this point onwards the events are only events that we haven't
         # seen before.
 
-        if delete_existing:
-            # For paranoia reasons, we go and delete all the existing entries
-            # for these events so we can reinsert them.
-            # This gets around any problems with some tables already having
-            # entries.
-            self._delete_existing_rows_txn(txn, events_and_contexts=events_and_contexts)
-
         self._store_event_txn(txn, events_and_contexts=events_and_contexts)
 
         # Insert into event_to_state_groups.
@@ -797,39 +763,6 @@ class PersistEventsStore:
 
         return [ec for ec in events_and_contexts if ec[0] not in to_remove]
 
-    @classmethod
-    def _delete_existing_rows_txn(cls, txn, events_and_contexts):
-        if not events_and_contexts:
-            # nothing to do here
-            return
-
-        logger.info("Deleting existing")
-
-        for table in (
-            "events",
-            "event_auth",
-            "event_json",
-            "event_edges",
-            "event_forward_extremities",
-            "event_reference_hashes",
-            "event_search",
-            "event_to_state_groups",
-            "state_events",
-            "rejections",
-            "redactions",
-            "room_memberships",
-        ):
-            txn.executemany(
-                "DELETE FROM %s WHERE event_id = ?" % (table,),
-                [(ev.event_id,) for ev, _ in events_and_contexts],
-            )
-
-        for table in ("event_push_actions",):
-            txn.executemany(
-                "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
-                [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts],
-            )
-
     def _store_event_txn(self, txn, events_and_contexts):
         """Insert new events into the event and event_json tables