diff options
Diffstat (limited to 'synapse/storage/databases/main/events.py')
-rw-r--r-- | synapse/storage/databases/main/events.py | 107 |
1 files changed, 73 insertions, 34 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 06832221ad..4e528612ea 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -15,7 +15,7 @@ # limitations under the License. import itertools import logging -from collections import OrderedDict, namedtuple +from collections import OrderedDict from typing import ( TYPE_CHECKING, Any, @@ -41,9 +41,10 @@ from synapse.events.snapshot import EventContext # noqa: F401 from synapse.logging.utils import log_function from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool, LoggingTransaction +from synapse.storage.databases.main.events_worker import EventCacheEntry from synapse.storage.databases.main.search import SearchEntry from synapse.storage.types import Connection -from synapse.storage.util.id_generators import MultiWriterIdGenerator +from synapse.storage.util.id_generators import AbstractStreamIdGenerator from synapse.storage.util.sequence import SequenceGenerator from synapse.types import StateMap, get_domain_from_id from synapse.util import json_encoder @@ -64,9 +65,6 @@ event_counter = Counter( ) -_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) - - @attr.s(slots=True) class DeltaState: """Deltas to use to update the `current_state_events` table. @@ -108,23 +106,30 @@ class PersistEventsStore: self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages self.is_mine_id = hs.is_mine_id - # Ideally we'd move these ID gens here, unfortunately some other ID - # generators are chained off them so doing so is a bit of a PITA. - self._backfill_id_gen: MultiWriterIdGenerator = self.store._backfill_id_gen - self._stream_id_gen: MultiWriterIdGenerator = self.store._stream_id_gen - # This should only exist on instances that are configured to write assert ( hs.get_instance_name() in hs.config.worker.writers.events ), "Can only instantiate EventsStore on master" + # Since we have been configured to write, we ought to have id generators, + # rather than id trackers. + assert isinstance(self.store._backfill_id_gen, AbstractStreamIdGenerator) + assert isinstance(self.store._stream_id_gen, AbstractStreamIdGenerator) + + # Ideally we'd move these ID gens here, unfortunately some other ID + # generators are chained off them so doing so is a bit of a PITA. + self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen + self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen + async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], + *, current_state_for_room: Dict[str, StateMap[str]], state_delta_for_room: Dict[str, DeltaState], new_forward_extremeties: Dict[str, List[str]], - backfilled: bool = False, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, ) -> None: """Persist a set of events alongside updates to the current state and forward extremities tables. @@ -137,7 +142,14 @@ class PersistEventsStore: room state new_forward_extremities: Map from room_id to list of event IDs that are the new forward extremities of the room. - backfilled + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. This should be set as True + for backfilled events because backfilled events get a negative + stream ordering so they don't come down incremental `/sync`. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. Returns: Resolves when the events have been persisted @@ -159,7 +171,7 @@ class PersistEventsStore: # # Note: Multiple instances of this function cannot be in flight at # the same time for the same room. - if backfilled: + if use_negative_stream_ordering: stream_ordering_manager = self._backfill_id_gen.get_next_mult( len(events_and_contexts) ) @@ -176,13 +188,13 @@ class PersistEventsStore: "persist_events", self._persist_events_txn, events_and_contexts=events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, state_delta_for_room=state_delta_for_room, new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc(len(events_and_contexts)) - if not backfilled: + if stream < 0: # backfilled events have negative stream orderings, so we don't # want to set the event_persisted_position to that. synapse.metrics.event_persisted_position.set( @@ -316,8 +328,9 @@ class PersistEventsStore: def _persist_events_txn( self, txn: LoggingTransaction, + *, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, + inhibit_local_membership_updates: bool = False, state_delta_for_room: Optional[Dict[str, DeltaState]] = None, new_forward_extremeties: Optional[Dict[str, List[str]]] = None, ): @@ -330,7 +343,10 @@ class PersistEventsStore: Args: txn events_and_contexts: events to persist - backfilled: True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. delete_existing True to purge existing table rows for the events from the database. This is useful when retrying due to IntegrityError. @@ -363,9 +379,7 @@ class PersistEventsStore: events_and_contexts ) - self._update_room_depths_txn( - txn, events_and_contexts=events_and_contexts, backfilled=backfilled - ) + self._update_room_depths_txn(txn, events_and_contexts=events_and_contexts) # _update_outliers_txn filters out any events which have already been # persisted, and returns the filtered list. @@ -398,7 +412,7 @@ class PersistEventsStore: txn, events_and_contexts=events_and_contexts, all_events_and_contexts=all_events_and_contexts, - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # We call this last as it assumes we've inserted the events into @@ -561,9 +575,9 @@ class PersistEventsStore: # fetch their auth event info. while missing_auth_chains: sql = """ - SELECT event_id, events.type, state_key, chain_id, sequence_number + SELECT event_id, events.type, se.state_key, chain_id, sequence_number FROM events - INNER JOIN state_events USING (event_id) + INNER JOIN state_events AS se USING (event_id) LEFT JOIN event_auth_chains USING (event_id) WHERE """ @@ -1200,7 +1214,6 @@ class PersistEventsStore: self, txn, events_and_contexts: List[Tuple[EventBase, EventContext]], - backfilled: bool, ): """Update min_depth for each room @@ -1208,13 +1221,18 @@ class PersistEventsStore: txn (twisted.enterprise.adbapi.Connection): db connection events_and_contexts (list[(EventBase, EventContext)]): events we are persisting - backfilled (bool): True if the events were backfilled """ depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids txn.call_after(self.store._invalidate_get_event_cache, event.event_id) - if not backfilled: + # Then update the `stream_ordering` position to mark the latest + # event as the front of the room. This should not be done for + # backfilled events because backfilled events have negative + # stream_ordering and happened in the past so we know that we don't + # need to update the stream_ordering tip/front for the room. + assert event.internal_metadata.stream_ordering is not None + if event.internal_metadata.stream_ordering >= 0: txn.call_after( self.store._events_stream_cache.entity_has_changed, event.room_id, @@ -1427,7 +1445,12 @@ class PersistEventsStore: return [ec for ec in events_and_contexts if ec[0] not in to_remove] def _update_metadata_tables_txn( - self, txn, events_and_contexts, all_events_and_contexts, backfilled + self, + txn, + *, + events_and_contexts, + all_events_and_contexts, + inhibit_local_membership_updates: bool = False, ): """Update all the miscellaneous tables for new events @@ -1439,7 +1462,10 @@ class PersistEventsStore: events that we were going to persist. This includes events we've already persisted, etc, that wouldn't appear in events_and_context. - backfilled (bool): True if the events were backfilled + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. """ # Insert all the push actions into the event_push_actions table. @@ -1513,7 +1539,7 @@ class PersistEventsStore: for event, _ in events_and_contexts if event.type == EventTypes.Member ], - backfilled=backfilled, + inhibit_local_membership_updates=inhibit_local_membership_updates, ) # Insert event_reference_hashes table. @@ -1553,11 +1579,13 @@ class PersistEventsStore: for row in rows: event = ev_map[row["event_id"]] if not row["rejects"] and not row["redacts"]: - to_prefill.append(_EventCacheEntry(event=event, redacted_event=None)) + to_prefill.append(EventCacheEntry(event=event, redacted_event=None)) def prefill(): for cache_entry in to_prefill: - self.store._get_event_cache.set((cache_entry[0].event_id,), cache_entry) + self.store._get_event_cache.set( + (cache_entry.event.event_id,), cache_entry + ) txn.call_after(prefill) @@ -1638,8 +1666,19 @@ class PersistEventsStore: txn, table="event_reference_hashes", values=vals ) - def _store_room_members_txn(self, txn, events, backfilled): - """Store a room member in the database.""" + def _store_room_members_txn( + self, txn, events, *, inhibit_local_membership_updates: bool = False + ): + """ + Store a room member in the database. + Args: + txn: The transaction to use. + events: List of events to store. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. This should be set to True + for backfilled events because backfilled events in the past do + not affect the current local state. + """ def non_null_str_or_none(val: Any) -> Optional[str]: return val if isinstance(val, str) and "\u0000" not in val else None @@ -1682,7 +1721,7 @@ class PersistEventsStore: # band membership", like a remote invite or a rejection of a remote invite. if ( self.is_mine_id(event.state_key) - and not backfilled + and not inhibit_local_membership_updates and event.internal_metadata.is_outlier() and event.internal_metadata.is_out_of_band_membership() ): |