summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-07-15 10:34:53 +0100
committerGitHub <noreply@github.com>2020-07-15 10:34:53 +0100
commit1d9dca02f94a121920f37d6956e9db93d36f1821 (patch)
tree771ce39d98038f66733e923f2414f554cc4a9ef1
parentFix bug in per-room message retention policies. (#7850) (diff)
downloadsynapse-1d9dca02f94a121920f37d6956e9db93d36f1821.tar.xz
remove `retry_on_integrity_error` wrapper for persist_events (#7848)
As far as I can tell from the sentry logs, the only time this has actually done
anything in the last two years is when we had two master workers running at
once, and even then, it made a bit of a mess of it (see
https://github.com/matrix-org/synapse/issues/7845#issuecomment-658238739).

Generally I feel like this code is doing more harm than good.
-rw-r--r--changelog.d/7848.misc1
-rw-r--r--synapse/storage/data_stores/main/events.py67
2 files changed, 1 insertions, 67 deletions
diff --git a/changelog.d/7848.misc b/changelog.d/7848.misc
new file mode 100644
index 0000000000..d9db1d8357
--- /dev/null
+++ b/changelog.d/7848.misc
@@ -0,0 +1 @@
+Remove redundant `retry_on_integrity_error` wrapper for event persistence code.
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 230fb5cd7f..66f01aad84 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -17,7 +17,6 @@
 import itertools
 import logging
 from collections import OrderedDict, namedtuple
-from functools import wraps
 from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
 
 import attr
@@ -69,27 +68,6 @@ def encode_json(json_object):
 _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
 
 
-def _retry_on_integrity_error(func):
-    """Wraps a database function so that it gets retried on IntegrityError,
-    with `delete_existing=True` passed in.
-
-    Args:
-        func: function that returns a Deferred and accepts a `delete_existing` arg
-    """
-
-    @wraps(func)
-    @defer.inlineCallbacks
-    def f(self, *args, **kwargs):
-        try:
-            res = yield func(self, *args, delete_existing=False, **kwargs)
-        except self.database_engine.module.IntegrityError:
-            logger.exception("IntegrityError, retrying.")
-            res = yield func(self, *args, delete_existing=True, **kwargs)
-        return res
-
-    return f
-
-
 @attr.s(slots=True)
 class DeltaState:
     """Deltas to use to update the `current_state_events` table.
@@ -134,7 +112,6 @@ class PersistEventsStore:
             hs.config.worker.writers.events == hs.get_instance_name()
         ), "Can only instantiate EventsStore on master"
 
-    @_retry_on_integrity_error
     @defer.inlineCallbacks
     def _persist_events_and_state_updates(
         self,
@@ -143,7 +120,6 @@ class PersistEventsStore:
         state_delta_for_room: Dict[str, DeltaState],
         new_forward_extremeties: Dict[str, List[str]],
         backfilled: bool = False,
-        delete_existing: bool = False,
     ):
         """Persist a set of events alongside updates to the current state and
         forward extremities tables.
@@ -157,7 +133,6 @@ class PersistEventsStore:
             new_forward_extremities: Map from room_id to list of event IDs
                 that are the new forward extremities of the room.
             backfilled
-            delete_existing
 
         Returns:
             Deferred: resolves when the events have been persisted
@@ -197,7 +172,6 @@ class PersistEventsStore:
                 self._persist_events_txn,
                 events_and_contexts=events_and_contexts,
                 backfilled=backfilled,
-                delete_existing=delete_existing,
                 state_delta_for_room=state_delta_for_room,
                 new_forward_extremeties=new_forward_extremeties,
             )
@@ -341,7 +315,6 @@ class PersistEventsStore:
         txn: LoggingTransaction,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
         backfilled: bool,
-        delete_existing: bool = False,
         state_delta_for_room: Dict[str, DeltaState] = {},
         new_forward_extremeties: Dict[str, List[str]] = {},
     ):
@@ -393,13 +366,6 @@ class PersistEventsStore:
         # From this point onwards the events are only events that we haven't
         # seen before.
 
-        if delete_existing:
-            # For paranoia reasons, we go and delete all the existing entries
-            # for these events so we can reinsert them.
-            # This gets around any problems with some tables already having
-            # entries.
-            self._delete_existing_rows_txn(txn, events_and_contexts=events_and_contexts)
-
         self._store_event_txn(txn, events_and_contexts=events_and_contexts)
 
         # Insert into event_to_state_groups.
@@ -797,39 +763,6 @@ class PersistEventsStore:
 
         return [ec for ec in events_and_contexts if ec[0] not in to_remove]
 
-    @classmethod
-    def _delete_existing_rows_txn(cls, txn, events_and_contexts):
-        if not events_and_contexts:
-            # nothing to do here
-            return
-
-        logger.info("Deleting existing")
-
-        for table in (
-            "events",
-            "event_auth",
-            "event_json",
-            "event_edges",
-            "event_forward_extremities",
-            "event_reference_hashes",
-            "event_search",
-            "event_to_state_groups",
-            "state_events",
-            "rejections",
-            "redactions",
-            "room_memberships",
-        ):
-            txn.executemany(
-                "DELETE FROM %s WHERE event_id = ?" % (table,),
-                [(ev.event_id,) for ev, _ in events_and_contexts],
-            )
-
-        for table in ("event_push_actions",):
-            txn.executemany(
-                "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
-                [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts],
-            )
-
     def _store_event_txn(self, txn, events_and_contexts):
         """Insert new events into the event and event_json tables