summary refs log tree commit diff
path: root/synapse/storage/databases/main/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events.py')
-rw-r--r--synapse/storage/databases/main/events.py151
1 files changed, 117 insertions, 34 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py

index 17e35cf63e..a4010ee28d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -16,6 +16,7 @@ import itertools import logging from collections import OrderedDict +from http import HTTPStatus from typing import ( TYPE_CHECKING, Any, @@ -35,9 +36,11 @@ from prometheus_client import Counter import synapse.metrics from synapse.api.constants import EventContentFields, EventTypes, RelationTypes +from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext +from synapse.logging.opentracing import trace from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -46,7 +49,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.events_worker import EventCacheEntry from synapse.storage.databases.main.search import SearchEntry -from synapse.storage.engines.postgres import PostgresEngine +from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import AbstractStreamIdGenerator from synapse.storage.util.sequence import SequenceGenerator from synapse.types import JsonDict, StateMap, get_domain_from_id @@ -69,6 +72,24 @@ event_counter = Counter( ) +class PartialStateConflictError(SynapseError): + """An internal error raised when attempting to persist an event with partial state + after the room containing the event has been un-partial stated. + + This error should be handled by recomputing the event context and trying again. + + This error has an HTTP status code so that it can be transported over replication. + It should not be exposed to clients. + """ + + def __init__(self) -> None: + super().__init__( + HTTPStatus.CONFLICT, + msg="Cannot persist partial state event in un-partial stated room", + errcode=Codes.UNKNOWN, + ) + + @attr.s(slots=True, auto_attribs=True) class DeltaState: """Deltas to use to update the `current_state_events` table. @@ -125,6 +146,7 @@ class PersistEventsStore: self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen + @trace async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], @@ -154,6 +176,10 @@ class PersistEventsStore: Returns: Resolves when the events have been persisted + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # We want to calculate the stream orderings as late as possible, as @@ -354,6 +380,9 @@ class PersistEventsStore: For each room, a list of the event ids which are the forward extremities. + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ state_delta_for_room = state_delta_for_room or {} new_forward_extremities = new_forward_extremities or {} @@ -980,16 +1009,16 @@ class PersistEventsStore: self, room_id: str, state_delta: DeltaState, - stream_id: int, ) -> None: """Update the current state stored in the datatabase for the given room""" - await self.db_pool.runInteraction( - "update_current_state", - self._update_current_state_txn, - state_delta_by_room={room_id: state_delta}, - stream_id=stream_id, - ) + async with self._stream_id_gen.get_next() as stream_ordering: + await self.db_pool.runInteraction( + "update_current_state", + self._update_current_state_txn, + state_delta_by_room={room_id: state_delta}, + stream_id=stream_ordering, + ) def _update_current_state_txn( self, @@ -1266,7 +1295,7 @@ class PersistEventsStore: depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids - txn.call_after(self.store._invalidate_get_event_cache, event.event_id) + self.store.invalidate_get_event_cache_after_txn(txn, event.event_id) # Then update the `stream_ordering` position to mark the latest # event as the front of the room. This should not be done for # backfilled events because backfilled events have negative @@ -1304,6 +1333,10 @@ class PersistEventsStore: Returns: new list, without events which are already in the events table. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ txn.execute( "SELECT event_id, outlier FROM events WHERE event_id in (%s)" @@ -1315,9 +1348,24 @@ class PersistEventsStore: event_id: outlier for event_id, outlier in txn } + logger.debug( + "_update_outliers_txn: events=%s have_persisted=%s", + [ev.event_id for ev, _ in events_and_contexts], + have_persisted, + ) + to_remove = set() for event, context in events_and_contexts: - if event.event_id not in have_persisted: + outlier_persisted = have_persisted.get(event.event_id) + logger.debug( + "_update_outliers_txn: event=%s outlier=%s outlier_persisted=%s", + event.event_id, + event.internal_metadata.is_outlier(), + outlier_persisted, + ) + + # Ignore events which we haven't persisted at all + if outlier_persisted is None: continue to_remove.add(event) @@ -1327,7 +1375,6 @@ class PersistEventsStore: # was an outlier or not - what we have is at least as good. continue - outlier_persisted = have_persisted[event.event_id] if not event.internal_metadata.is_outlier() and outlier_persisted: # We received a copy of an event that we had already stored as # an outlier in the database. We now have some state at that event @@ -1338,7 +1385,10 @@ class PersistEventsStore: # events down /sync. In general they will be historical events, so that # doesn't matter too much, but that is not always the case. - logger.info("Updating state for ex-outlier event %s", event.event_id) + logger.info( + "_update_outliers_txn: Updating state for ex-outlier event %s", + event.event_id, + ) # insert into event_to_state_groups. try: @@ -1442,7 +1492,7 @@ class PersistEventsStore: event.sender, "url" in event.content and isinstance(event.content["url"], str), event.get_state_key(), - context.rejected or None, + context.rejected, ) for event, context in events_and_contexts ), @@ -1638,13 +1688,13 @@ class PersistEventsStore: if not row["rejects"] and not row["redacts"]: to_prefill.append(EventCacheEntry(event=event, redacted_event=None)) - def prefill() -> None: + async def prefill() -> None: for cache_entry in to_prefill: - self.store._get_event_cache.set( + await self.store._get_event_cache.set( (cache_entry.event.event_id,), cache_entry ) - txn.call_after(prefill) + txn.async_call_after(prefill) def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None: """Invalidate the caches for the redacted event. @@ -1653,7 +1703,7 @@ class PersistEventsStore: _invalidate_caches_for_event. """ assert event.redacts is not None - txn.call_after(self.store._invalidate_get_event_cache, event.redacts) + self.store.invalidate_get_event_cache_after_txn(txn, event.redacts) txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,)) txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,)) @@ -1766,6 +1816,18 @@ class PersistEventsStore: self.store.get_invited_rooms_for_local_user.invalidate, (event.state_key,), ) + txn.call_after( + self.store.get_local_users_in_room.invalidate, + (event.room_id,), + ) + txn.call_after( + self.store.get_number_joined_users_in_room.invalidate, + (event.room_id,), + ) + txn.call_after( + self.store.get_user_in_room_with_profile.invalidate, + (event.room_id, event.state_key), + ) # The `_get_membership_from_event_id` is immutable, except for the # case where we look up an event *before* persisting it. @@ -2215,6 +2277,11 @@ class PersistEventsStore: txn: LoggingTransaction, events_and_contexts: Collection[Tuple[EventBase, EventContext]], ) -> None: + """ + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. + """ state_groups = {} for event, context in events_and_contexts: if event.internal_metadata.is_outlier(): @@ -2239,19 +2306,37 @@ class PersistEventsStore: # if we have partial state for these events, record the fact. (This happens # here rather than in _store_event_txn because it also needs to happen when # we de-outlier an event.) - self.db_pool.simple_insert_many_txn( - txn, - table="partial_state_events", - keys=("room_id", "event_id"), - values=[ - ( - event.room_id, - event.event_id, - ) - for event, ctx in events_and_contexts - if ctx.partial_state - ], - ) + try: + self.db_pool.simple_insert_many_txn( + txn, + table="partial_state_events", + keys=("room_id", "event_id"), + values=[ + ( + event.room_id, + event.event_id, + ) + for event, ctx in events_and_contexts + if ctx.partial_state + ], + ) + except self.db_pool.engine.module.IntegrityError: + logger.info( + "Cannot persist events %s in rooms %s: room has been un-partial stated", + [ + event.event_id + for event, ctx in events_and_contexts + if ctx.partial_state + ], + list( + { + event.room_id + for event, ctx in events_and_contexts + if ctx.partial_state + } + ), + ) + raise PartialStateConflictError() self.db_pool.simple_upsert_many_txn( txn, @@ -2296,11 +2381,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="event_edges", - keys=("event_id", "prev_event_id", "room_id", "is_state"), + keys=("event_id", "prev_event_id"), values=[ - (ev.event_id, e_id, ev.room_id, False) - for ev in events - for e_id in ev.prev_event_ids() + (ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids() ], )