diff options
author | Eric Eastwood <erice@element.io> | 2021-11-18 20:09:38 -0600 |
---|---|---|
committer | Eric Eastwood <erice@element.io> | 2021-11-18 20:09:38 -0600 |
commit | b8c60b9ad2d13815e868b6ea6159d5b77621d861 (patch) | |
tree | 4c24a3d552257c78612e9c94e2ea8a6055182f96 | |
parent | Refactor backfilled behavior into specific function parameters (diff) | |
download | synapse-b8c60b9ad2d13815e868b6ea6159d5b77621d861.tar.xz |
Pipe arguments across the function stack
-rw-r--r-- | synapse/handlers/federation_event.py | 42 | ||||
-rw-r--r-- | synapse/handlers/message.py | 18 | ||||
-rw-r--r-- | synapse/replication/http/federation.py | 53 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 44 | ||||
-rw-r--r-- | synapse/storage/persist_events.py | 82 | ||||
-rw-r--r-- | tests/replication/slave/storage/test_events.py | 7 |
6 files changed, 217 insertions, 29 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index ea625692a3..46606775d3 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1809,7 +1809,24 @@ class FederationEventHandler: try: await self.persist_events_and_notify( - event.room_id, [(event, context)], backfilled=backfilled + event.room_id, + [(event, context)], + # We should not send notifications about backfilled events. + inhibit_push_notifications=backfilled, + # We don't need to calculate the state for backfilled events and + # we there is no need to update the forward extrems because we + # already know this event happened in the past if it was + # backfilled. + should_calculate_state_and_forward_extrems=not backfilled, + # Backfilled events get a negative stream ordering so they don't + # come down incremental `/sync` + use_negative_stream_ordering=backfilled, + # Backfilled events do not affect the current local state + inhibit_local_membership_updates=backfilled, + # Backfilled events have negative stream ordering and happened + # in the past so we know that we don't need to update the + # stream_ordering tip for the room. + update_room_forward_stream_ordering=not backfilled, ) except Exception: run_in_background( @@ -1823,6 +1840,10 @@ class FederationEventHandler: event_and_contexts: Sequence[Tuple[EventBase, EventContext]], *, inhibit_push_notifications: bool = False, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> int: """Persists events and tells the notifier/pushers about them, if necessary. @@ -1835,6 +1856,19 @@ class FederationEventHandler: inhibit_push_notifications: Whether to stop the notifiers/pushers from knowing about the event. Usually this is done for any backfilled event. + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. Returns: The stream ID after which all events have been persisted. @@ -1861,7 +1895,11 @@ class FederationEventHandler: # Note that this returns the events that were persisted, which may not be # the same as were passed in if some were deduplicated due to transaction IDs. events, max_stream_token = await self._storage.persistence.persist_events( - event_and_contexts, backfilled=backfilled + event_and_contexts, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) if self._ephemeral_messages_enabled: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 22dd4cf5fd..cd3c64d2d6 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1565,12 +1565,6 @@ class EventCreationHandler: errcode=Codes.INVALID_PARAM, ) - # Mark any `m.historical` messages as backfilled so they don't appear - # in `/sync` and have the proper decrementing `stream_ordering` as we import - backfilled = False - if event.internal_metadata.is_historical(): - backfilled = True - # Note that this returns the event that was persisted, which may not be # the same as we passed in if it was deduplicated due transaction IDs. ( @@ -1578,7 +1572,17 @@ class EventCreationHandler: event_pos, max_stream_token, ) = await self.storage.persistence.persist_event( - event, context=context, backfilled=backfilled + event, + context=context, + # Make any historical messages behave like backfilled events + should_calculate_state_and_forward_extrems=not event.internal_metadata.is_historical(), + # We use a negative `stream_ordering`` for historical messages so + # they don't come down an incremental `/sync` and have the proper + # decrementing `stream_ordering` as we import so they sort + # as expected between two depths. + use_negative_stream_ordering=event.internal_metadata.is_historical(), + inhibit_local_membership_updates=event.internal_metadata.is_historical(), + update_room_forward_stream_ordering=not event.internal_metadata.is_historical(), ) if self._ephemeral_events_enabled: diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index d6be52ab71..68c6a470f7 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -70,16 +70,37 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): @staticmethod async def _serialize_payload( - store, room_id, event_and_contexts, inhibit_push_notifications: bool = False + store, + room_id, + event_and_contexts, + *, + inhibit_push_notifications: bool = False, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ): """ Args: store room_id (str) event_and_contexts (list[tuple[FrozenEvent, EventContext]]) - inhibit_push_notifications (bool): Whether to stop the notifiers/pushers + inhibit_push_notifications: Whether to stop the notifiers/pushers from knowing about the event. Usually this is done for any backfilled event. + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. """ event_payloads = [] for event, context in event_and_contexts: @@ -100,6 +121,10 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): payload = { "events": event_payloads, "inhibit_push_notifications": inhibit_push_notifications, + "should_calculate_state_and_forward_extrems": should_calculate_state_and_forward_extrems, + "use_negative_stream_ordering": use_negative_stream_ordering, + "inhibit_local_membership_updates": inhibit_local_membership_updates, + "update_room_forward_stream_ordering": update_room_forward_stream_ordering, "room_id": room_id, } @@ -111,6 +136,22 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): room_id = content["room_id"] inhibit_push_notifications = content["inhibit_push_notifications"] + should_calculate_state_and_forward_extrems = content[ + "should_calculate_state_and_forward_extrems" + ] + use_negative_stream_ordering = content["use_negative_stream_ordering"] + inhibit_local_membership_updates = content[ + "inhibit_local_membership_updates" + ] + update_room_forward_stream_ordering = content[ + "update_room_forward_stream_ordering" + ] + + assert inhibit_push_notifications is not None + assert should_calculate_state_and_forward_extrems is not None + assert use_negative_stream_ordering is not None + assert inhibit_local_membership_updates is not None + assert update_room_forward_stream_ordering is not None event_payloads = content["events"] @@ -135,7 +176,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): logger.info("Got %d events from federation", len(event_and_contexts)) max_stream_id = await self.federation_event_handler.persist_events_and_notify( - room_id, event_and_contexts, inhibit_push_notifications + room_id, + event_and_contexts, + inhibit_push_notifications=inhibit_push_notifications, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) return 200, {"max_stream_id": max_stream_id} diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 65ba2963d8..1e1fd8e425 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -126,6 +126,8 @@ class PersistEventsStore: state_delta_for_room: Dict[str, DeltaState], new_forward_extremeties: Dict[str, List[str]], use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> None: """Persist a set of events alongside updates to the current state and forward extremities tables. @@ -141,6 +143,13 @@ class PersistEventsStore: use_negative_stream_ordering: Whether to start stream_ordering on the negative side and decrement. Usually this is done for any backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. Returns: Resolves when the events have been persisted @@ -179,7 +188,8 @@ class PersistEventsStore: "persist_events", self._persist_events_txn, events_and_contexts=events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) @@ -320,8 +330,10 @@ class PersistEventsStore: def _persist_events_txn( self, txn: LoggingTransaction, + *, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, state_delta_for_room: Optional[Dict[str, DeltaState]] = None, new_forward_extremeties: Optional[Dict[str, List[str]]] = None, ): @@ -335,11 +347,18 @@ class PersistEventsStore: txn events_and_contexts: events to persist backfilled: True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. delete_existing True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. state_delta_for_room: The current-state delta for each room. - new_forward_extremetie: The new forward extremities for each room. + new_forward_extremeties: The new forward extremities for each room. For each room, a list of the event ids which are the forward extremities. @@ -368,7 +387,9 @@ class PersistEventsStore: ) self._update_room_depths_txn( - txn, events_and_contexts=events_and_contexts, backfilled=backfilled + txn, + events_and_contexts=events_and_contexts, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) # _update_outliers_txn filters out any events which have already been @@ -402,7 +423,7 @@ class PersistEventsStore: txn, events_and_contexts=events_and_contexts, all_events_and_contexts=all_events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # We call this last as it assumes we've inserted the events into @@ -1435,7 +1456,12 @@ class PersistEventsStore: return [ec for ec in events_and_contexts if ec[0] not in to_remove] def _update_metadata_tables_txn( - self, txn, events_and_contexts, all_events_and_contexts, backfilled + self, + txn, + *, + events_and_contexts, + all_events_and_contexts, + inhibit_local_membership_updates: bool = False, ): """Update all the miscellaneous tables for new events @@ -1447,7 +1473,9 @@ class PersistEventsStore: events that we were going to persist. This includes events we've already persisted, etc, that wouldn't appear in events_and_context. - backfilled (bool): True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. """ # Insert all the push actions into the event_push_actions table. @@ -1521,7 +1549,7 @@ class PersistEventsStore: for event, _ in events_and_contexts if event.type == EventTypes.Member ], - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # Insert event_reference_hashes table. diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index dc8d3e8eff..101b469bb4 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -296,15 +296,29 @@ class EventsPersistenceStorage: async def persist_events( self, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], - backfilled: bool = False, + *, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> Tuple[List[EventBase], RoomStreamToken]: """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) - backfilled: Whether the results are retrieved from federation - via backfill or not. Used to determine if they're "new" events - which might update the current state etc. + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. Returns: List of events persisted, the current position room stream position. @@ -320,7 +334,12 @@ class EventsPersistenceStorage: async def enqueue(item): room_id, evs_ctxs = item return await self._event_persist_queue.add_to_queue( - room_id, evs_ctxs, backfilled=backfilled + room_id, + evs_ctxs, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) ret_vals = await yieldable_gather_results(enqueue, partitioned.items()) @@ -350,9 +369,35 @@ class EventsPersistenceStorage: @opentracing.trace async def persist_event( - self, event: EventBase, context: EventContext, backfilled: bool = False + self, + event: EventBase, + context: EventContext, + *, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: """ + Write a single event to the database. + + Args: + event: + context: + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. + Returns: The event, stream ordering of `event`, and the stream ordering of the latest persisted event. The returned event may not match the given @@ -363,7 +408,12 @@ class EventsPersistenceStorage: # event was deduplicated. (The dict may also include other entries if # the event was persisted in a batch with other events.) replaced_events = await self._event_persist_queue.add_to_queue( - event.room_id, [(event, context)], backfilled=backfilled + event.room_id, + [(event, context)], + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) replaced_event = replaced_events.get(event.event_id) if replaced_event: @@ -379,7 +429,11 @@ class EventsPersistenceStorage: async def _persist_event_batch( self, events_and_contexts: List[Tuple[EventBase, EventContext]], + *, should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> Dict[str, str]: """Callback for the _event_persist_queue @@ -391,6 +445,16 @@ class EventsPersistenceStorage: should_calculate_state_and_forward_extrems: Determines whether we need to calculate the state and new forward extremities for the room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. Returns: A dictionary of event ID to event ID we didn't persist as we already @@ -589,7 +653,9 @@ class EventsPersistenceStorage: current_state_for_room=current_state_for_room, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, - backfilled=backfilled, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, ) await self._handle_potentially_left_users(potentially_left_users) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index b25a06b427..dd54d89081 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -323,7 +323,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): if backfill: self.get_success( self.storage.persistence.persist_events( - [(event, context)], backfilled=True + [(event, context)], + # Backfilled event + should_calculate_state_and_forward_extrems=False, + use_negative_stream_ordering=True, + inhibit_local_membership_updates=True, + update_room_forward_stream_ordering=False, ) ) else: |