summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2021-11-18 18:43:08 -0600
committerEric Eastwood <erice@element.io>2021-11-18 19:37:39 -0600
commitd203d22a6f07bcd2c445419caae6911f4b2d3687 (patch)
tree487bfad0b7beae76ec227cd9285df223593769a1
parentPrevent historical state from being pushed to an application service via `/tr... (diff)
downloadsynapse-d203d22a6f07bcd2c445419caae6911f4b2d3687.tar.xz
Refactor backfilled behavior into specific function parameters
Part of https://github.com/matrix-org/synapse/issues/11300
-rw-r--r--synapse/handlers/federation_event.py12
-rw-r--r--synapse/replication/http/federation.py17
-rw-r--r--synapse/storage/databases/main/events.py39
-rw-r--r--synapse/storage/persist_events.py10
4 files changed, 54 insertions, 24 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9917613298..ea625692a3 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1821,7 +1821,8 @@ class FederationEventHandler:
         self,
         room_id: str,
         event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
-        backfilled: bool = False,
+        *,
+        inhibit_push_notifications: bool = False,
     ) -> int:
         """Persists events and tells the notifier/pushers about them, if
         necessary.
@@ -1831,8 +1832,9 @@ class FederationEventHandler:
             event_and_contexts: Sequence of events with their associated
                 context that should be persisted. All events must belong to
                 the same room.
-            backfilled: Whether these events are a result of
-                backfilling or not
+            inhibit_push_notifications: Whether to stop the notifiers/pushers
+                from knowing about the event. Usually this is done for any backfilled
+                event.
 
         Returns:
             The stream ID after which all events have been persisted.
@@ -1850,7 +1852,7 @@ class FederationEventHandler:
                     store=self._store,
                     room_id=room_id,
                     event_and_contexts=batch,
-                    backfilled=backfilled,
+                    inhibit_push_notifications=inhibit_push_notifications,
                 )
             return result["max_stream_id"]
         else:
@@ -1867,7 +1869,7 @@ class FederationEventHandler:
                     # If there's an expiry timestamp on the event, schedule its expiry.
                     self._message_handler.maybe_schedule_expiry(event)
 
-            if not backfilled:  # Never notify for backfilled events
+            if not inhibit_push_notifications:  # Never notify for backfilled events
                 for event in events:
                     await self._notify_persisted_event(event, max_stream_token)
 
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 5ed535c90d..d6be52ab71 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -47,7 +47,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
                 "rejected_reason": ..,   // The event.rejected_reason field
                 "context": { .. serialized event context .. },
             }],
-            "backfilled": false
+            "inhibit_push_notifications": false
         }
 
         200 OK
@@ -69,14 +69,17 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         self.federation_event_handler = hs.get_federation_event_handler()
 
     @staticmethod
-    async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
+    async def _serialize_payload(
+        store, room_id, event_and_contexts, inhibit_push_notifications: bool = False
+    ):
         """
         Args:
             store
             room_id (str)
             event_and_contexts (list[tuple[FrozenEvent, EventContext]])
-            backfilled (bool): Whether or not the events are the result of
-                backfilling
+            inhibit_push_notifications (bool): Whether to stop the notifiers/pushers
+                from knowing about the event. Usually this is done for any backfilled
+                event.
         """
         event_payloads = []
         for event, context in event_and_contexts:
@@ -96,7 +99,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
         payload = {
             "events": event_payloads,
-            "backfilled": backfilled,
+            "inhibit_push_notifications": inhibit_push_notifications,
             "room_id": room_id,
         }
 
@@ -107,7 +110,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
             content = parse_json_object_from_request(request)
 
             room_id = content["room_id"]
-            backfilled = content["backfilled"]
+            inhibit_push_notifications = content["inhibit_push_notifications"]
 
             event_payloads = content["events"]
 
@@ -132,7 +135,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         logger.info("Got %d events from federation", len(event_and_contexts))
 
         max_stream_id = await self.federation_event_handler.persist_events_and_notify(
-            room_id, event_and_contexts, backfilled
+            room_id, event_and_contexts, inhibit_push_notifications
         )
 
         return 200, {"max_stream_id": max_stream_id}
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 120e4807d1..65ba2963d8 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -121,10 +121,11 @@ class PersistEventsStore:
     async def _persist_events_and_state_updates(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
+        *,
         current_state_for_room: Dict[str, StateMap[str]],
         state_delta_for_room: Dict[str, DeltaState],
         new_forward_extremeties: Dict[str, List[str]],
-        backfilled: bool = False,
+        use_negative_stream_ordering: bool = False,
     ) -> None:
         """Persist a set of events alongside updates to the current state and
         forward extremities tables.
@@ -137,7 +138,9 @@ class PersistEventsStore:
                 room state
             new_forward_extremities: Map from room_id to list of event IDs
                 that are the new forward extremities of the room.
-            backfilled
+            use_negative_stream_ordering: Whether to start stream_ordering on
+                the negative side and decrement. Usually this is done for any
+                backfilled event.
 
         Returns:
             Resolves when the events have been persisted
@@ -159,7 +162,7 @@ class PersistEventsStore:
         #
         # Note: Multiple instances of this function cannot be in flight at
         # the same time for the same room.
-        if backfilled:
+        if use_negative_stream_ordering:
             stream_ordering_manager = self._backfill_id_gen.get_next_mult(
                 len(events_and_contexts)
             )
@@ -182,7 +185,8 @@ class PersistEventsStore:
             )
             persist_event_counter.inc(len(events_and_contexts))
 
-            if not backfilled:
+            # TODO: test that this actuall works
+            if stream < 0:
                 # backfilled events have negative stream orderings, so we don't
                 # want to set the event_persisted_position to that.
                 synapse.metrics.event_persisted_position.set(
@@ -1200,7 +1204,8 @@ class PersistEventsStore:
         self,
         txn,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
-        backfilled: bool,
+        *,
+        update_room_forward_stream_ordering: bool = True,
     ):
         """Update min_depth for each room
 
@@ -1208,13 +1213,16 @@ class PersistEventsStore:
             txn (twisted.enterprise.adbapi.Connection): db connection
             events_and_contexts (list[(EventBase, EventContext)]): events
                 we are persisting
-            backfilled (bool): True if the events were backfilled
+            update_room_forward_stream_ordering (bool): Whether to update the
+                stream_ordering position to mark the latest event as the front
+                of the room. This should only be set as false for backfilled
+                events.
         """
         depth_updates: Dict[str, int] = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
             txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
-            if not backfilled:
+            if update_room_forward_stream_ordering:
                 txn.call_after(
                     self.store._events_stream_cache.entity_has_changed,
                     event.room_id,
@@ -1638,8 +1646,19 @@ class PersistEventsStore:
             txn, table="event_reference_hashes", values=vals
         )
 
-    def _store_room_members_txn(self, txn, events, backfilled):
-        """Store a room member in the database."""
+    def _store_room_members_txn(
+        self, txn, events, *, inhibit_local_membership_updates: bool = False
+    ):
+        """
+        Store a room member in the database.
+
+        Args:
+            txn: The transaction to use.
+            events: List of events to store.
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. Usually this is done for
+                backfilled events.
+        """
 
         def non_null_str_or_none(val: Any) -> Optional[str]:
             return val if isinstance(val, str) and "\u0000" not in val else None
@@ -1682,7 +1701,7 @@ class PersistEventsStore:
             # band membership", like a remote invite or a rejection of a remote invite.
             if (
                 self.is_mine_id(event.state_key)
-                and not backfilled
+                and not inhibit_local_membership_updates
                 and event.internal_metadata.is_outlier()
                 and event.internal_metadata.is_out_of_band_membership()
             ):
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 402f134d89..dc8d3e8eff 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -379,13 +379,19 @@ class EventsPersistenceStorage:
     async def _persist_event_batch(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
-        backfilled: bool = False,
+        should_calculate_state_and_forward_extrems: bool = True,
     ) -> Dict[str, str]:
         """Callback for the _event_persist_queue
 
         Calculates the change to current state and forward extremities, and
         persists the given events and with those updates.
 
+        Args:
+            events_and_contexts:
+            should_calculate_state_and_forward_extrems: Determines whether we
+                need to calculate the state and new forward extremities for the
+                room. This should be set to false for backfilled events.
+
         Returns:
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.
@@ -448,7 +454,7 @@ class EventsPersistenceStorage:
             # device lists as stale.
             potentially_left_users: Set[str] = set()
 
-            if not backfilled:
+            if should_calculate_state_and_forward_extrems:
                 with Measure(self._clock, "_calculate_state_and_extrem"):
                     # Work out the new "current state" for each room.
                     # We do this by working out what the new extremities are and then