author | Eric Eastwood <erice@element.io> | 2021-11-18 18:43:08 -0600
committer | Eric Eastwood <erice@element.io> | 2021-11-18 19:37:39 -0600
commit | d203d22a6f07bcd2c445419caae6911f4b2d3687 (patch)
tree | 487bfad0b7beae76ec227cd9285df223593769a1
parent | Prevent historical state from being pushed to an application service via `/tr... (diff)
download | synapse-d203d22a6f07bcd2c445419caae6911f4b2d3687.tar.xz
Refactor backfilled behavior into specific function parameters
Part of https://github.com/matrix-org/synapse/issues/11300
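
The pattern applied here is to replace the single catch-all `backfilled` boolean, which implicitly toggled several unrelated behaviours, with keyword-only flags that each name exactly one behaviour. A minimal standalone sketch of that pattern (the `persist_events_old`, `persist_events`, and `notify_pushers` functions below are illustrative stand-ins, not code from the Synapse tree):

```python
# Before: one catch-all flag; call sites can't tell which behaviours it toggles.
def persist_events_old(events, backfilled=False):
    if not backfilled:  # "backfilled" implied "don't wake the pushers", among other things
        notify_pushers(events)


# After: a keyword-only flag that names the single behaviour it controls.
def persist_events(events, *, inhibit_push_notifications=False):
    if not inhibit_push_notifications:
        notify_pushers(events)


def notify_pushers(events):
    # Stand-in for Synapse's notifier/pusher machinery.
    for event in events:
        print("notifying about", event)


# A backfill call site now states its intent explicitly.
persist_events(["$event_a", "$event_b"], inhibit_push_notifications=True)
```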
-rw-r--r-- | synapse/handlers/federation_event.py | 12
-rw-r--r-- | synapse/replication/http/federation.py | 17
-rw-r--r-- | synapse/storage/databases/main/events.py | 39
-rw-r--r-- | synapse/storage/persist_events.py | 10
4 files changed, 54 insertions, 24 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9917613298..ea625692a3 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1821,7 +1821,8 @@ class FederationEventHandler:
         self,
         room_id: str,
         event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
-        backfilled: bool = False,
+        *,
+        inhibit_push_notifications: bool = False,
     ) -> int:
         """Persists events and tells the notifier/pushers about them, if
         necessary.
@@ -1831,8 +1832,9 @@ class FederationEventHandler:
             event_and_contexts: Sequence of events with their associated
                 context that should be persisted. All events must belong to
                 the same room.
-            backfilled: Whether these events are a result of
-                backfilling or not
+            inhibit_push_notifications: Whether to stop the notifiers/pushers
+                from knowing about the event. Usually this is done for any backfilled
+                event.
 
         Returns:
             The stream ID after which all events have been persisted.
@@ -1850,7 +1852,7 @@ class FederationEventHandler:
                     store=self._store,
                     room_id=room_id,
                     event_and_contexts=batch,
-                    backfilled=backfilled,
+                    inhibit_push_notifications=inhibit_push_notifications,
                 )
             return result["max_stream_id"]
         else:
@@ -1867,7 +1869,7 @@ class FederationEventHandler:
                 # If there's an expiry timestamp on the event, schedule its expiry.
                 self._message_handler.maybe_schedule_expiry(event)
 
-            if not backfilled:  # Never notify for backfilled events
+            if not inhibit_push_notifications:  # Never notify for backfilled events
                 for event in events:
                     await self._notify_persisted_event(event, max_stream_token)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 5ed535c90d..d6be52ab71 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -47,7 +47,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
             "rejected_reason": ..,   // The event.rejected_reason field
             "context": { .. serialized event context .. },
         }],
-        "backfilled": false
+        "inhibit_push_notifications": false
     }
 
     200 OK
@@ -69,14 +69,17 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         self.federation_event_handler = hs.get_federation_event_handler()
 
     @staticmethod
-    async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
+    async def _serialize_payload(
+        store, room_id, event_and_contexts, inhibit_push_notifications: bool = False
+    ):
         """
         Args:
             store
             room_id (str)
             event_and_contexts (list[tuple[FrozenEvent, EventContext]])
-            backfilled (bool): Whether or not the events are the result of
-                backfilling
+            inhibit_push_notifications (bool): Whether to stop the notifiers/pushers
+                from knowing about the event. Usually this is done for any backfilled
+                event.
         """
         event_payloads = []
         for event, context in event_and_contexts:
@@ -96,7 +99,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
         payload = {
             "events": event_payloads,
-            "backfilled": backfilled,
+            "inhibit_push_notifications": inhibit_push_notifications,
             "room_id": room_id,
         }
 
@@ -107,7 +110,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         content = parse_json_object_from_request(request)
 
         room_id = content["room_id"]
-        backfilled = content["backfilled"]
+        inhibit_push_notifications = content["inhibit_push_notifications"]
 
         event_payloads = content["events"]
 
@@ -132,7 +135,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         logger.info("Got %d events from federation", len(event_and_contexts))
 
         max_stream_id = await self.federation_event_handler.persist_events_and_notify(
-            room_id, event_and_contexts, backfilled
+            room_id, event_and_contexts, inhibit_push_notifications
         )
 
         return 200, {"max_stream_id": max_stream_id}
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 120e4807d1..65ba2963d8 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -121,10 +121,11 @@ class PersistEventsStore:
     async def _persist_events_and_state_updates(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
+        *,
         current_state_for_room: Dict[str, StateMap[str]],
         state_delta_for_room: Dict[str, DeltaState],
         new_forward_extremeties: Dict[str, List[str]],
-        backfilled: bool = False,
+        use_negative_stream_ordering: bool = False,
     ) -> None:
         """Persist a set of events alongside updates to the current state and
         forward extremities tables.
@@ -137,7 +138,9 @@ class PersistEventsStore:
                 room state
             new_forward_extremities: Map from room_id to list of event IDs
                 that are the new forward extremities of the room.
-            backfilled
+            use_negative_stream_ordering: Whether to start stream_ordering on
+                the negative side and decrement. Usually this is done for any
+                backfilled event.
 
         Returns:
             Resolves when the events have been persisted
@@ -159,7 +162,7 @@ class PersistEventsStore:
         #
         # Note: Multiple instances of this function cannot be in flight at
         # the same time for the same room.
-        if backfilled:
+        if use_negative_stream_ordering:
             stream_ordering_manager = self._backfill_id_gen.get_next_mult(
                 len(events_and_contexts)
             )
@@ -182,7 +185,8 @@ class PersistEventsStore:
             )
             persist_event_counter.inc(len(events_and_contexts))
 
-            if not backfilled:
+            # TODO: test that this actuall works
+            if stream < 0:
                 # backfilled events have negative stream orderings, so we don't
                 # want to set the event_persisted_position to that.
                 synapse.metrics.event_persisted_position.set(
@@ -1200,7 +1204,8 @@ class PersistEventsStore:
         self,
         txn,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
-        backfilled: bool,
+        *,
+        update_room_forward_stream_ordering: bool = True,
     ):
         """Update min_depth for each room
 
@@ -1208,13 +1213,16 @@
             txn (twisted.enterprise.adbapi.Connection): db connection
             events_and_contexts (list[(EventBase, EventContext)]): events
                 we are persisting
-            backfilled (bool): True if the events were backfilled
+            update_room_forward_stream_ordering (bool): Whether to update the
+                stream_ordering position to mark the latest event as the front
+                of the room. This should only be set as false for backfilled
+                events.
         """
         depth_updates: Dict[str, int] = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
             txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
-            if not backfilled:
+            if update_room_forward_stream_ordering:
                 txn.call_after(
                     self.store._events_stream_cache.entity_has_changed,
                     event.room_id,
@@ -1638,8 +1646,19 @@ class PersistEventsStore:
                 txn, table="event_reference_hashes", values=vals
             )
 
-    def _store_room_members_txn(self, txn, events, backfilled):
-        """Store a room member in the database."""
+    def _store_room_members_txn(
+        self, txn, events, *, inhibit_local_membership_updates: bool = False
+    ):
+        """
+        Store a room member in the database.
+
+        Args:
+            txn: The transaction to use.
+            events: List of events to store.
+            inhibit_local_membership_updates: Stop the local_current_membership
+                from being updated by these events. Usually this is done for
+                backfilled events.
+        """
 
         def non_null_str_or_none(val: Any) -> Optional[str]:
             return val if isinstance(val, str) and "\u0000" not in val else None
@@ -1682,7 +1701,7 @@ class PersistEventsStore:
             # band membership", like a remote invite or a rejection of a remote invite.
             if (
                 self.is_mine_id(event.state_key)
-                and not backfilled
+                and not inhibit_local_membership_updates
                 and event.internal_metadata.is_outlier()
                 and event.internal_metadata.is_out_of_band_membership()
             ):
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 402f134d89..dc8d3e8eff 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -379,13 +379,19 @@ class EventsPersistenceStorage:
     async def _persist_event_batch(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
-        backfilled: bool = False,
+        should_calculate_state_and_forward_extrems: bool = True,
     ) -> Dict[str, str]:
         """Callback for the _event_persist_queue
 
         Calculates the change to current state and forward extremities, and
         persists the given events and with those updates.
 
+        Args:
+            events_and_contexts:
+            should_calculate_state_and_forward_extrems: Determines whether we
+                need to calculate the state and new forward extremities for the
+                room. This should be set to false for backfilled events.
+
         Returns:
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.
@@ -448,7 +454,7 @@ class EventsPersistenceStorage:
         # device lists as stale.
         potentially_left_users: Set[str] = set()
 
-        if not backfilled:
+        if should_calculate_state_and_forward_extrems:
             with Measure(self._clock, "_calculate_state_and_extrem"):
                 # Work out the new "current state" for each room.
                 # We do this by working out what the new extremities are and then