summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2021-11-18 20:09:38 -0600
committerEric Eastwood <erice@element.io>2021-11-18 20:09:38 -0600
commitb8c60b9ad2d13815e868b6ea6159d5b77621d861 (patch)
tree4c24a3d552257c78612e9c94e2ea8a6055182f96
parentRefactor backfilled behavior into specific function parameters (diff)
downloadsynapse-b8c60b9ad2d13815e868b6ea6159d5b77621d861.tar.xz
Pipe arguments across the function stack
-rw-r--r--synapse/handlers/federation_event.py42
-rw-r--r--synapse/handlers/message.py18
-rw-r--r--synapse/replication/http/federation.py53
-rw-r--r--synapse/storage/databases/main/events.py44
-rw-r--r--synapse/storage/persist_events.py82
-rw-r--r--tests/replication/slave/storage/test_events.py7
6 files changed, 217 insertions, 29 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index ea625692a3..46606775d3 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1809,7 +1809,24 @@ class FederationEventHandler:
 
         try:
             await self.persist_events_and_notify(
-                event.room_id, [(event, context)], backfilled=backfilled
+                event.room_id,
+                [(event, context)],
+                # We should not send notifications about backfilled events.
+                inhibit_push_notifications=backfilled,
+                # We don't need to calculate the state for backfilled events and
+                # we there is no need to update the forward extrems because we
+                # already know this event happened in the past if it was
+                # backfilled.
+                should_calculate_state_and_forward_extrems=not backfilled,
+                # Backfilled events get a negative stream ordering so they don't
+                # come down incremental `/sync`
+                use_negative_stream_ordering=backfilled,
+                # Backfilled events do not affect the current local state
+                inhibit_local_membership_updates=backfilled,
+                # Backfilled events have negative stream ordering and happened
+                # in the past so we know that we don't need to update the
+                # stream_ordering tip for the room.
+                update_room_forward_stream_ordering=not backfilled,
             )
         except Exception:
             run_in_background(
@@ -1823,6 +1840,10 @@ class FederationEventHandler:
         event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
         *,
         inhibit_push_notifications: bool = False,
+        should_calculate_state_and_forward_extrems: bool = True,
+        use_negative_stream_ordering: bool = False,
+        inhibit_local_membership_updates: bool = False,
+        update_room_forward_stream_ordering: bool = True,
     ) -> int:
         """Persists events and tells the notifier/pushers about them, if
         necessary.
@@ -1835,6 +1856,19 @@ class FederationEventHandler:
             inhibit_push_notifications: Whether to stop the notifiers/pushers
                 from knowing about the event. Usually this is done for any backfilled
                 event.
+            should_calculate_state_and_forward_extrems: Determines whether we
+                need to calculate the state and new forward extremities for the
+                room. This should be set to false for backfilled events.
+            use_negative_stream_ordering: Whether to start stream_ordering on
+                the negative side and decrement. Usually this is done for any
+                backfilled event.
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. Usually this is done for
+                backfilled events.
+            update_room_forward_stream_ordering: Whether to update the
+                stream_ordering position to mark the latest event as the front
+                of the room. This should only be set as false for backfilled
+                events.
 
         Returns:
             The stream ID after which all events have been persisted.
@@ -1861,7 +1895,11 @@ class FederationEventHandler:
             # Note that this returns the events that were persisted, which may not be
             # the same as were passed in if some were deduplicated due to transaction IDs.
             events, max_stream_token = await self._storage.persistence.persist_events(
-                event_and_contexts, backfilled=backfilled
+                event_and_contexts,
+                should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
+                use_negative_stream_ordering=use_negative_stream_ordering,
+                inhibit_local_membership_updates=inhibit_local_membership_updates,
+                update_room_forward_stream_ordering=update_room_forward_stream_ordering,
             )
 
             if self._ephemeral_messages_enabled:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 22dd4cf5fd..cd3c64d2d6 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1565,12 +1565,6 @@ class EventCreationHandler:
                         errcode=Codes.INVALID_PARAM,
                     )
 
-        # Mark any `m.historical` messages as backfilled so they don't appear
-        # in `/sync` and have the proper decrementing `stream_ordering` as we import
-        backfilled = False
-        if event.internal_metadata.is_historical():
-            backfilled = True
-
         # Note that this returns the event that was persisted, which may not be
         # the same as we passed in if it was deduplicated due transaction IDs.
         (
@@ -1578,7 +1572,17 @@ class EventCreationHandler:
             event_pos,
             max_stream_token,
         ) = await self.storage.persistence.persist_event(
-            event, context=context, backfilled=backfilled
+            event,
+            context=context,
+            # Make any historical messages behave like backfilled events
+            should_calculate_state_and_forward_extrems=not event.internal_metadata.is_historical(),
+            # We use a negative `stream_ordering`` for historical messages so
+            # they don't come down an incremental `/sync` and have the proper
+            # decrementing `stream_ordering` as we import so they sort
+            # as expected between two depths.
+            use_negative_stream_ordering=event.internal_metadata.is_historical(),
+            inhibit_local_membership_updates=event.internal_metadata.is_historical(),
+            update_room_forward_stream_ordering=not event.internal_metadata.is_historical(),
         )
 
         if self._ephemeral_events_enabled:
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index d6be52ab71..68c6a470f7 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -70,16 +70,37 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
     @staticmethod
     async def _serialize_payload(
-        store, room_id, event_and_contexts, inhibit_push_notifications: bool = False
+        store,
+        room_id,
+        event_and_contexts,
+        *,
+        inhibit_push_notifications: bool = False,
+        should_calculate_state_and_forward_extrems: bool = True,
+        use_negative_stream_ordering: bool = False,
+        inhibit_local_membership_updates: bool = False,
+        update_room_forward_stream_ordering: bool = True,
     ):
         """
         Args:
             store
             room_id (str)
             event_and_contexts (list[tuple[FrozenEvent, EventContext]])
-            inhibit_push_notifications (bool): Whether to stop the notifiers/pushers
+            inhibit_push_notifications: Whether to stop the notifiers/pushers
                 from knowing about the event. Usually this is done for any backfilled
                 event.
+            should_calculate_state_and_forward_extrems: Determines whether we
+                need to calculate the state and new forward extremities for the
+                room. This should be set to false for backfilled events.
+            use_negative_stream_ordering: Whether to start stream_ordering on
+                the negative side and decrement. Usually this is done for any
+                backfilled event.
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. Usually this is done for
+                backfilled events.
+            update_room_forward_stream_ordering: Whether to update the
+                stream_ordering position to mark the latest event as the front
+                of the room. This should only be set as false for backfilled
+                events.
         """
         event_payloads = []
         for event, context in event_and_contexts:
@@ -100,6 +121,10 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         payload = {
             "events": event_payloads,
             "inhibit_push_notifications": inhibit_push_notifications,
+            "should_calculate_state_and_forward_extrems": should_calculate_state_and_forward_extrems,
+            "use_negative_stream_ordering": use_negative_stream_ordering,
+            "inhibit_local_membership_updates": inhibit_local_membership_updates,
+            "update_room_forward_stream_ordering": update_room_forward_stream_ordering,
             "room_id": room_id,
         }
 
@@ -111,6 +136,22 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
             room_id = content["room_id"]
             inhibit_push_notifications = content["inhibit_push_notifications"]
+            should_calculate_state_and_forward_extrems = content[
+                "should_calculate_state_and_forward_extrems"
+            ]
+            use_negative_stream_ordering = content["use_negative_stream_ordering"]
+            inhibit_local_membership_updates = content[
+                "inhibit_local_membership_updates"
+            ]
+            update_room_forward_stream_ordering = content[
+                "update_room_forward_stream_ordering"
+            ]
+
+            assert inhibit_push_notifications is not None
+            assert should_calculate_state_and_forward_extrems is not None
+            assert use_negative_stream_ordering is not None
+            assert inhibit_local_membership_updates is not None
+            assert update_room_forward_stream_ordering is not None
 
             event_payloads = content["events"]
 
@@ -135,7 +176,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         logger.info("Got %d events from federation", len(event_and_contexts))
 
         max_stream_id = await self.federation_event_handler.persist_events_and_notify(
-            room_id, event_and_contexts, inhibit_push_notifications
+            room_id,
+            event_and_contexts,
+            inhibit_push_notifications=inhibit_push_notifications,
+            should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
+            use_negative_stream_ordering=use_negative_stream_ordering,
+            inhibit_local_membership_updates=inhibit_local_membership_updates,
+            update_room_forward_stream_ordering=update_room_forward_stream_ordering,
         )
 
         return 200, {"max_stream_id": max_stream_id}
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 65ba2963d8..1e1fd8e425 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -126,6 +126,8 @@ class PersistEventsStore:
         state_delta_for_room: Dict[str, DeltaState],
         new_forward_extremeties: Dict[str, List[str]],
         use_negative_stream_ordering: bool = False,
+        inhibit_local_membership_updates: bool = False,
+        update_room_forward_stream_ordering: bool = True,
     ) -> None:
         """Persist a set of events alongside updates to the current state and
         forward extremities tables.
@@ -141,6 +143,13 @@ class PersistEventsStore:
             use_negative_stream_ordering: Whether to start stream_ordering on
                 the negative side and decrement. Usually this is done for any
                 backfilled event.
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. Usually this is done for
+                backfilled events.
+            update_room_forward_stream_ordering: Whether to update the
+                stream_ordering position to mark the latest event as the front
+                of the room. This should only be set as false for backfilled
+                events.
 
         Returns:
             Resolves when the events have been persisted
@@ -179,7 +188,8 @@ class PersistEventsStore:
                 "persist_events",
                 self._persist_events_txn,
                 events_and_contexts=events_and_contexts,
-                backfilled=backfilled,
+                inhibit_local_membership_updates=inhibit_local_membership_updates,
+                update_room_forward_stream_ordering=update_room_forward_stream_ordering,
                 state_delta_for_room=state_delta_for_room,
                 new_forward_extremeties=new_forward_extremeties,
             )
@@ -320,8 +330,10 @@ class PersistEventsStore:
     def _persist_events_txn(
         self,
         txn: LoggingTransaction,
+        *,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
-        backfilled: bool,
+        inhibit_local_membership_updates: bool = False,
+        update_room_forward_stream_ordering: bool = True,
         state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
         new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
     ):
@@ -335,11 +347,18 @@ class PersistEventsStore:
             txn
             events_and_contexts: events to persist
             backfilled: True if the events were backfilled
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. Usually this is done for
+                backfilled events.
+            update_room_forward_stream_ordering: Whether to update the
+                stream_ordering position to mark the latest event as the front
+                of the room. This should only be set as false for backfilled
+                events.
             delete_existing True to purge existing table rows for the events
                 from the database. This is useful when retrying due to
                 IntegrityError.
             state_delta_for_room: The current-state delta for each room.
-            new_forward_extremetie: The new forward extremities for each room.
+            new_forward_extremeties: The new forward extremities for each room.
                 For each room, a list of the event ids which are the forward
                 extremities.
 
@@ -368,7 +387,9 @@ class PersistEventsStore:
         )
 
         self._update_room_depths_txn(
-            txn, events_and_contexts=events_and_contexts, backfilled=backfilled
+            txn,
+            events_and_contexts=events_and_contexts,
+            update_room_forward_stream_ordering=update_room_forward_stream_ordering,
         )
 
         # _update_outliers_txn filters out any events which have already been
@@ -402,7 +423,7 @@ class PersistEventsStore:
             txn,
             events_and_contexts=events_and_contexts,
             all_events_and_contexts=all_events_and_contexts,
-            backfilled=backfilled,
+            inhibit_local_membership_updates=inhibit_local_membership_updates,
         )
 
         # We call this last as it assumes we've inserted the events into
@@ -1435,7 +1456,12 @@ class PersistEventsStore:
         return [ec for ec in events_and_contexts if ec[0] not in to_remove]
 
     def _update_metadata_tables_txn(
-        self, txn, events_and_contexts, all_events_and_contexts, backfilled
+        self,
+        txn,
+        *,
+        events_and_contexts,
+        all_events_and_contexts,
+        inhibit_local_membership_updates: bool = False,
     ):
         """Update all the miscellaneous tables for new events
 
@@ -1447,7 +1473,9 @@ class PersistEventsStore:
                 events that we were going to persist. This includes events
                 we've already persisted, etc, that wouldn't appear in
                 events_and_context.
-            backfilled (bool): True if the events were backfilled
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. Usually this is done for
+                backfilled events.
         """
 
         # Insert all the push actions into the event_push_actions table.
@@ -1521,7 +1549,7 @@ class PersistEventsStore:
                 for event, _ in events_and_contexts
                 if event.type == EventTypes.Member
             ],
-            backfilled=backfilled,
+            inhibit_local_membership_updates=inhibit_local_membership_updates,
         )
 
         # Insert event_reference_hashes table.
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index dc8d3e8eff..101b469bb4 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -296,15 +296,29 @@ class EventsPersistenceStorage:
     async def persist_events(
         self,
         events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
-        backfilled: bool = False,
+        *,
+        should_calculate_state_and_forward_extrems: bool = True,
+        use_negative_stream_ordering: bool = False,
+        inhibit_local_membership_updates: bool = False,
+        update_room_forward_stream_ordering: bool = True,
     ) -> Tuple[List[EventBase], RoomStreamToken]:
         """
         Write events to the database
         Args:
             events_and_contexts: list of tuples of (event, context)
-            backfilled: Whether the results are retrieved from federation
-                via backfill or not. Used to determine if they're "new" events
-                which might update the current state etc.
+            should_calculate_state_and_forward_extrems: Determines whether we
+                need to calculate the state and new forward extremities for the
+                room. This should be set to false for backfilled events.
+            use_negative_stream_ordering: Whether to start stream_ordering on
+                the negative side and decrement. Usually this is done for any
+                backfilled event.
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. Usually this is done for
+                backfilled events.
+            update_room_forward_stream_ordering: Whether to update the
+                stream_ordering position to mark the latest event as the front
+                of the room. This should only be set as false for backfilled
+                events.
 
         Returns:
             List of events persisted, the current position room stream position.
@@ -320,7 +334,12 @@ class EventsPersistenceStorage:
         async def enqueue(item):
             room_id, evs_ctxs = item
             return await self._event_persist_queue.add_to_queue(
-                room_id, evs_ctxs, backfilled=backfilled
+                room_id,
+                evs_ctxs,
+                should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
+                use_negative_stream_ordering=use_negative_stream_ordering,
+                inhibit_local_membership_updates=inhibit_local_membership_updates,
+                update_room_forward_stream_ordering=update_room_forward_stream_ordering,
             )
 
         ret_vals = await yieldable_gather_results(enqueue, partitioned.items())
@@ -350,9 +369,35 @@ class EventsPersistenceStorage:
 
     @opentracing.trace
     async def persist_event(
-        self, event: EventBase, context: EventContext, backfilled: bool = False
+        self,
+        event: EventBase,
+        context: EventContext,
+        *,
+        should_calculate_state_and_forward_extrems: bool = True,
+        use_negative_stream_ordering: bool = False,
+        inhibit_local_membership_updates: bool = False,
+        update_room_forward_stream_ordering: bool = True,
     ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
         """
+        Write a single event to the database.
+
+        Args:
+            event:
+            context:
+            should_calculate_state_and_forward_extrems: Determines whether we
+                need to calculate the state and new forward extremities for the
+                room. This should be set to false for backfilled events.
+            use_negative_stream_ordering: Whether to start stream_ordering on
+                the negative side and decrement. Usually this is done for any
+                backfilled event.
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. Usually this is done for
+                backfilled events.
+            update_room_forward_stream_ordering: Whether to update the
+                stream_ordering position to mark the latest event as the front
+                of the room. This should only be set as false for backfilled
+                events.
+
         Returns:
             The event, stream ordering of `event`, and the stream ordering of the
             latest persisted event. The returned event may not match the given
@@ -363,7 +408,12 @@ class EventsPersistenceStorage:
         # event was deduplicated. (The dict may also include other entries if
         # the event was persisted in a batch with other events.)
         replaced_events = await self._event_persist_queue.add_to_queue(
-            event.room_id, [(event, context)], backfilled=backfilled
+            event.room_id,
+            [(event, context)],
+            should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
+            use_negative_stream_ordering=use_negative_stream_ordering,
+            inhibit_local_membership_updates=inhibit_local_membership_updates,
+            update_room_forward_stream_ordering=update_room_forward_stream_ordering,
         )
         replaced_event = replaced_events.get(event.event_id)
         if replaced_event:
@@ -379,7 +429,11 @@ class EventsPersistenceStorage:
     async def _persist_event_batch(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
+        *,
         should_calculate_state_and_forward_extrems: bool = True,
+        use_negative_stream_ordering: bool = False,
+        inhibit_local_membership_updates: bool = False,
+        update_room_forward_stream_ordering: bool = True,
     ) -> Dict[str, str]:
         """Callback for the _event_persist_queue
 
@@ -391,6 +445,16 @@ class EventsPersistenceStorage:
             should_calculate_state_and_forward_extrems: Determines whether we
                 need to calculate the state and new forward extremities for the
                 room. This should be set to false for backfilled events.
+            use_negative_stream_ordering: Whether to start stream_ordering on
+                the negative side and decrement. Usually this is done for any
+                backfilled event.
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. Usually this is done for
+                backfilled events.
+            update_room_forward_stream_ordering: Whether to update the
+                stream_ordering position to mark the latest event as the front
+                of the room. This should only be set as false for backfilled
+                events.
 
         Returns:
             A dictionary of event ID to event ID we didn't persist as we already
@@ -589,7 +653,9 @@ class EventsPersistenceStorage:
                 current_state_for_room=current_state_for_room,
                 state_delta_for_room=state_delta_for_room,
                 new_forward_extremeties=new_forward_extremeties,
-                backfilled=backfilled,
+                use_negative_stream_ordering=use_negative_stream_ordering,
+                inhibit_local_membership_updates=inhibit_local_membership_updates,
+                update_room_forward_stream_ordering=update_room_forward_stream_ordering,
             )
 
             await self._handle_potentially_left_users(potentially_left_users)
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index b25a06b427..dd54d89081 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -323,7 +323,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         if backfill:
             self.get_success(
                 self.storage.persistence.persist_events(
-                    [(event, context)], backfilled=True
+                    [(event, context)],
+                    # Backfilled event
+                    should_calculate_state_and_forward_extrems=False,
+                    use_negative_stream_ordering=True,
+                    inhibit_local_membership_updates=True,
+                    update_room_forward_stream_ordering=False,
                 )
             )
         else: