diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index c3440de2cb..4171b904eb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -124,10 +124,12 @@ class PersistEventsStore:
async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
+ *,
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
- backfilled: bool = False,
+ use_negative_stream_ordering: bool = False,
+ inhibit_local_membership_updates: bool = False,
) -> None:
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
@@ -140,7 +142,14 @@ class PersistEventsStore:
room state
new_forward_extremities: Map from room_id to list of event IDs
that are the new forward extremities of the room.
- backfilled
+ use_negative_stream_ordering: Whether to start stream_ordering on
+ the negative side and decrement. This should be set as True
+ for backfilled events because backfilled events get a negative
+ stream ordering so they don't come down incremental `/sync`.
+ inhibit_local_membership_updates: Stop the local_current_membership
+ from being updated by these events. This should be set to True
+ for backfilled events because backfilled events in the past do
+ not affect the current local state.
Returns:
Resolves when the events have been persisted
@@ -162,7 +171,7 @@ class PersistEventsStore:
#
# Note: Multiple instances of this function cannot be in flight at
# the same time for the same room.
- if backfilled:
+ if use_negative_stream_ordering:
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(events_and_contexts)
)
@@ -179,13 +188,13 @@ class PersistEventsStore:
"persist_events",
self._persist_events_txn,
events_and_contexts=events_and_contexts,
- backfilled=backfilled,
+ inhibit_local_membership_updates=inhibit_local_membership_updates,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc(len(events_and_contexts))
- if not backfilled:
+ if stream < 0:
# backfilled events have negative stream orderings, so we don't
# want to set the event_persisted_position to that.
synapse.metrics.event_persisted_position.set(
@@ -319,8 +328,9 @@ class PersistEventsStore:
def _persist_events_txn(
self,
txn: LoggingTransaction,
+ *,
events_and_contexts: List[Tuple[EventBase, EventContext]],
- backfilled: bool,
+ inhibit_local_membership_updates: bool = False,
state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
):
@@ -333,7 +343,10 @@ class PersistEventsStore:
Args:
txn
events_and_contexts: events to persist
- backfilled: True if the events were backfilled
+ inhibit_local_membership_updates: Stop the local_current_membership
+ from being updated by these events. This should be set to True
+ for backfilled events because backfilled events in the past do
+ not affect the current local state.
delete_existing True to purge existing table rows for the events
from the database. This is useful when retrying due to
IntegrityError.
@@ -366,9 +379,7 @@ class PersistEventsStore:
events_and_contexts
)
- self._update_room_depths_txn(
- txn, events_and_contexts=events_and_contexts, backfilled=backfilled
- )
+ self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts)
# _update_outliers_txn filters out any events which have already been
# persisted, and returns the filtered list.
@@ -401,7 +412,7 @@ class PersistEventsStore:
txn,
events_and_contexts=events_and_contexts,
all_events_and_contexts=all_events_and_contexts,
- backfilled=backfilled,
+ inhibit_local_membership_updates=inhibit_local_membership_updates,
)
# We call this last as it assumes we've inserted the events into
@@ -1203,7 +1214,6 @@ class PersistEventsStore:
self,
txn,
events_and_contexts: List[Tuple[EventBase, EventContext]],
- backfilled: bool,
):
"""Update min_depth for each room
@@ -1211,13 +1221,18 @@ class PersistEventsStore:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
- backfilled (bool): True if the events were backfilled
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
- if not backfilled:
+ # Then update the `stream_ordering` position to mark the latest
+ # event as the front of the room. This should not be done for
+ # backfilled events because backfilled events have negative
+ # stream_ordering and happened in the past so we know that we don't
+ # need to update the stream_ordering tip/front for the room.
+ assert event.internal_metadata.stream_ordering is not None
+ if event.internal_metadata.stream_ordering >= 0:
txn.call_after(
self.store._events_stream_cache.entity_has_changed,
event.room_id,
@@ -1430,7 +1445,12 @@ class PersistEventsStore:
return [ec for ec in events_and_contexts if ec[0] not in to_remove]
def _update_metadata_tables_txn(
- self, txn, events_and_contexts, all_events_and_contexts, backfilled
+ self,
+ txn,
+ *,
+ events_and_contexts,
+ all_events_and_contexts,
+ inhibit_local_membership_updates: bool = False,
):
"""Update all the miscellaneous tables for new events
@@ -1442,7 +1462,10 @@ class PersistEventsStore:
events that we were going to persist. This includes events
we've already persisted, etc, that wouldn't appear in
events_and_context.
- backfilled (bool): True if the events were backfilled
+ inhibit_local_membership_updates: Stop the local_current_membership
+ from being updated by these events. This should be set to True
+ for backfilled events because backfilled events in the past do
+ not affect the current local state.
"""
# Insert all the push actions into the event_push_actions table.
@@ -1516,7 +1539,7 @@ class PersistEventsStore:
for event, _ in events_and_contexts
if event.type == EventTypes.Member
],
- backfilled=backfilled,
+ inhibit_local_membership_updates=inhibit_local_membership_updates,
)
# Insert event_reference_hashes table.
@@ -1643,8 +1666,19 @@ class PersistEventsStore:
txn, table="event_reference_hashes", values=vals
)
- def _store_room_members_txn(self, txn, events, backfilled):
- """Store a room member in the database."""
+ def _store_room_members_txn(
+ self, txn, events, *, inhibit_local_membership_updates: bool = False
+ ):
+ """
+ Store a room member in the database.
+ Args:
+ txn: The transaction to use.
+ events: List of events to store.
+ inhibit_local_membership_updates: Stop the local_current_membership
+ from being updated by these events. This should be set to True
+ for backfilled events because backfilled events in the past do
+ not affect the current local state.
+ """
def non_null_str_or_none(val: Any) -> Optional[str]:
return val if isinstance(val, str) and "\u0000" not in val else None
@@ -1687,7 +1721,7 @@ class PersistEventsStore:
# band membership", like a remote invite or a rejection of a remote invite.
if (
self.is_mine_id(event.state_key)
- and not backfilled
+ and not inhibit_local_membership_updates
and event.internal_metadata.is_outlier()
and event.internal_metadata.is_out_of_band_membership()
):
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 402f134d89..428d66a617 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -583,7 +583,8 @@ class EventsPersistenceStorage:
current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
- backfilled=backfilled,
+ use_negative_stream_ordering=backfilled,
+ inhibit_local_membership_updates=backfilled,
)
await self._handle_potentially_left_users(potentially_left_users)
|