summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2021-11-29 16:01:54 -0600
committerGitHub <noreply@github.com>2021-11-29 16:01:54 -0600
commitfb58611d212ea16be2b42d0e2441a6dc09f6f61d (patch)
treee992f380c02a9c3397da38931612fe0ca4f78358
parentSupport the stable /hierarchy endpoint from MSC2946 (#11329) (diff)
downloadsynapse-fb58611d212ea16be2b42d0e2441a6dc09f6f61d.tar.xz
Refactor `backfilled` into specific behavior function arguments (`_persist_events_and_state_updates`) (#11417)
Part of https://github.com/matrix-org/synapse/issues/11300

Call stack:

 - `_persist_events_and_state_updates` (added `use_negative_stream_ordering`)
    - `_persist_events_txn`
       - `_update_room_depths_txn` (added `update_room_forward_stream_ordering`)
       - `_update_metadata_tables_txn`
          - `_store_room_members_txn` (added `inhibit_local_membership_updates`)

Using keyword-only arguments (`*`) to reduce the mistakes from `backfilled` being left as a positional argument somewhere and being interpreted wrong by our new arguments.
-rw-r--r--changelog.d/11417.misc1
-rw-r--r--synapse/storage/databases/main/events.py74
-rw-r--r--synapse/storage/persist_events.py3
3 files changed, 57 insertions, 21 deletions
diff --git a/changelog.d/11417.misc b/changelog.d/11417.misc
new file mode 100644
index 0000000000..88dc4722da
--- /dev/null
+++ b/changelog.d/11417.misc
@@ -0,0 +1 @@
+Refactor `backfilled` into specific behavior function arguments (`_persist_events_and_state_updates` and downstream calls).
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index c3440de2cb..4171b904eb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -124,10 +124,12 @@ class PersistEventsStore:
     async def _persist_events_and_state_updates(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
+        *,
         current_state_for_room: Dict[str, StateMap[str]],
         state_delta_for_room: Dict[str, DeltaState],
         new_forward_extremeties: Dict[str, List[str]],
-        backfilled: bool = False,
+        use_negative_stream_ordering: bool = False,
+        inhibit_local_membership_updates: bool = False,
     ) -> None:
         """Persist a set of events alongside updates to the current state and
         forward extremities tables.
@@ -140,7 +142,14 @@ class PersistEventsStore:
                 room state
             new_forward_extremities: Map from room_id to list of event IDs
                 that are the new forward extremities of the room.
-            backfilled
+            use_negative_stream_ordering: Whether to start stream_ordering on
+                the negative side and decrement. This should be set as True
+                for backfilled events because backfilled events get a negative
+                stream ordering so they don't come down incremental `/sync`.
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. This should be set to True
+                for backfilled events because backfilled events in the past do
+                not affect the current local state.
 
         Returns:
             Resolves when the events have been persisted
@@ -162,7 +171,7 @@ class PersistEventsStore:
         #
         # Note: Multiple instances of this function cannot be in flight at
         # the same time for the same room.
-        if backfilled:
+        if use_negative_stream_ordering:
             stream_ordering_manager = self._backfill_id_gen.get_next_mult(
                 len(events_and_contexts)
             )
@@ -179,13 +188,13 @@ class PersistEventsStore:
                 "persist_events",
                 self._persist_events_txn,
                 events_and_contexts=events_and_contexts,
-                backfilled=backfilled,
+                inhibit_local_membership_updates=inhibit_local_membership_updates,
                 state_delta_for_room=state_delta_for_room,
                 new_forward_extremeties=new_forward_extremeties,
             )
             persist_event_counter.inc(len(events_and_contexts))
 
-            if not backfilled:
+            if stream < 0:
                 # backfilled events have negative stream orderings, so we don't
                 # want to set the event_persisted_position to that.
                 synapse.metrics.event_persisted_position.set(
@@ -319,8 +328,9 @@ class PersistEventsStore:
     def _persist_events_txn(
         self,
         txn: LoggingTransaction,
+        *,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
-        backfilled: bool,
+        inhibit_local_membership_updates: bool = False,
         state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
         new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
     ):
@@ -333,7 +343,10 @@ class PersistEventsStore:
         Args:
             txn
             events_and_contexts: events to persist
-            backfilled: True if the events were backfilled
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. This should be set to True
+                for backfilled events because backfilled events in the past do
+                not affect the current local state.
             delete_existing True to purge existing table rows for the events
                 from the database. This is useful when retrying due to
                 IntegrityError.
@@ -366,9 +379,7 @@ class PersistEventsStore:
             events_and_contexts
         )
 
-        self._update_room_depths_txn(
-            txn, events_and_contexts=events_and_contexts, backfilled=backfilled
-        )
+        self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts)
 
         # _update_outliers_txn filters out any events which have already been
         # persisted, and returns the filtered list.
@@ -401,7 +412,7 @@ class PersistEventsStore:
             txn,
             events_and_contexts=events_and_contexts,
             all_events_and_contexts=all_events_and_contexts,
-            backfilled=backfilled,
+            inhibit_local_membership_updates=inhibit_local_membership_updates,
         )
 
         # We call this last as it assumes we've inserted the events into
@@ -1203,7 +1214,6 @@ class PersistEventsStore:
         self,
         txn,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
-        backfilled: bool,
     ):
         """Update min_depth for each room
 
@@ -1211,13 +1221,18 @@ class PersistEventsStore:
             txn (twisted.enterprise.adbapi.Connection): db connection
             events_and_contexts (list[(EventBase, EventContext)]): events
                 we are persisting
-            backfilled (bool): True if the events were backfilled
         """
         depth_updates: Dict[str, int] = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
             txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
-            if not backfilled:
+            # Then update the `stream_ordering` position to mark the latest
+            # event as the front of the room. This should not be done for
+            # backfilled events because backfilled events have negative
+            # stream_ordering and happened in the past so we know that we don't
+            # need to update the stream_ordering tip/front for the room.
+            assert event.internal_metadata.stream_ordering is not None
+            if event.internal_metadata.stream_ordering >= 0:
                 txn.call_after(
                     self.store._events_stream_cache.entity_has_changed,
                     event.room_id,
@@ -1430,7 +1445,12 @@ class PersistEventsStore:
         return [ec for ec in events_and_contexts if ec[0] not in to_remove]
 
     def _update_metadata_tables_txn(
-        self, txn, events_and_contexts, all_events_and_contexts, backfilled
+        self,
+        txn,
+        *,
+        events_and_contexts,
+        all_events_and_contexts,
+        inhibit_local_membership_updates: bool = False,
     ):
         """Update all the miscellaneous tables for new events
 
@@ -1442,7 +1462,10 @@ class PersistEventsStore:
                 events that we were going to persist. This includes events
                 we've already persisted, etc, that wouldn't appear in
                 events_and_context.
-            backfilled (bool): True if the events were backfilled
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. This should be set to True
+                for backfilled events because backfilled events in the past do
+                not affect the current local state.
         """
 
         # Insert all the push actions into the event_push_actions table.
@@ -1516,7 +1539,7 @@ class PersistEventsStore:
                 for event, _ in events_and_contexts
                 if event.type == EventTypes.Member
             ],
-            backfilled=backfilled,
+            inhibit_local_membership_updates=inhibit_local_membership_updates,
         )
 
         # Insert event_reference_hashes table.
@@ -1643,8 +1666,19 @@ class PersistEventsStore:
             txn, table="event_reference_hashes", values=vals
         )
 
-    def _store_room_members_txn(self, txn, events, backfilled):
-        """Store a room member in the database."""
+    def _store_room_members_txn(
+        self, txn, events, *, inhibit_local_membership_updates: bool = False
+    ):
+        """
+        Store a room member in the database.
+        Args:
+            txn: The transaction to use.
+            events: List of events to store.
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. This should be set to True
+                for backfilled events because backfilled events in the past do
+                not affect the current local state.
+        """
 
         def non_null_str_or_none(val: Any) -> Optional[str]:
             return val if isinstance(val, str) and "\u0000" not in val else None
@@ -1687,7 +1721,7 @@ class PersistEventsStore:
             # band membership", like a remote invite or a rejection of a remote invite.
             if (
                 self.is_mine_id(event.state_key)
-                and not backfilled
+                and not inhibit_local_membership_updates
                 and event.internal_metadata.is_outlier()
                 and event.internal_metadata.is_out_of_band_membership()
             ):
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 402f134d89..428d66a617 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -583,7 +583,8 @@ class EventsPersistenceStorage:
                 current_state_for_room=current_state_for_room,
                 state_delta_for_room=state_delta_for_room,
                 new_forward_extremeties=new_forward_extremeties,
-                backfilled=backfilled,
+                use_negative_stream_ordering=backfilled,
+                inhibit_local_membership_updates=backfilled,
             )
 
             await self._handle_potentially_left_users(potentially_left_users)