diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 17e35cf63e..a4010ee28d 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -16,6 +16,7 @@
import itertools
import logging
from collections import OrderedDict
+from http import HTTPStatus
from typing import (
TYPE_CHECKING,
Any,
@@ -35,9 +36,11 @@ from prometheus_client import Counter
import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
+from synapse.logging.opentracing import trace
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -46,7 +49,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventCacheEntry
from synapse.storage.databases.main.search import SearchEntry
-from synapse.storage.engines.postgres import PostgresEngine
+from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
from synapse.storage.util.sequence import SequenceGenerator
from synapse.types import JsonDict, StateMap, get_domain_from_id
@@ -69,6 +72,24 @@ event_counter = Counter(
)
+class PartialStateConflictError(SynapseError):
+ """An internal error raised when attempting to persist an event with partial state
+ after the room containing the event has been un-partial stated.
+
+ This error should be handled by recomputing the event context and trying again.
+
+ This error has an HTTP status code so that it can be transported over replication.
+ It should not be exposed to clients.
+ """
+
+ def __init__(self) -> None:
+ super().__init__(
+ HTTPStatus.CONFLICT,
+ msg="Cannot persist partial state event in un-partial stated room",
+ errcode=Codes.UNKNOWN,
+ )
+
+
@attr.s(slots=True, auto_attribs=True)
class DeltaState:
"""Deltas to use to update the `current_state_events` table.
@@ -125,6 +146,7 @@ class PersistEventsStore:
self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen
+ @trace
async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
@@ -154,6 +176,10 @@ class PersistEventsStore:
Returns:
Resolves when the events have been persisted
+
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
# We want to calculate the stream orderings as late as possible, as
@@ -354,6 +380,9 @@ class PersistEventsStore:
For each room, a list of the event ids which are the forward
extremities.
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
state_delta_for_room = state_delta_for_room or {}
new_forward_extremities = new_forward_extremities or {}
@@ -980,16 +1009,16 @@ class PersistEventsStore:
self,
room_id: str,
state_delta: DeltaState,
- stream_id: int,
) -> None:
"""Update the current state stored in the datatabase for the given room"""
- await self.db_pool.runInteraction(
- "update_current_state",
- self._update_current_state_txn,
- state_delta_by_room={room_id: state_delta},
- stream_id=stream_id,
- )
+ async with self._stream_id_gen.get_next() as stream_ordering:
+ await self.db_pool.runInteraction(
+ "update_current_state",
+ self._update_current_state_txn,
+ state_delta_by_room={room_id: state_delta},
+ stream_id=stream_ordering,
+ )
def _update_current_state_txn(
self,
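# The hunk above moves allocation of the stream ordering inside
# `_update_current_state`, using the id generator's async context manager.
# Below is a deliberately simplified toy of that pattern, not Synapse's
# real AbstractStreamIdGenerator (which also has to cope with gaps and
# out-of-order completion): `get_next()` hands out an ordering and only
# advances the persisted position once the `async with` block exits.
import contextlib
import itertools
from typing import AsyncIterator


class ToyStreamIdGenerator:
    def __init__(self, start: int = 0) -> None:
        self._counter = itertools.count(start + 1)
        self._persisted_upto = start

    @contextlib.asynccontextmanager
    async def get_next(self) -> AsyncIterator[int]:
        stream_id = next(self._counter)
        try:
            # The caller runs its database transaction while the id is
            # still "in flight".
            yield stream_id
        finally:
            self._persisted_upto = max(self._persisted_upto, stream_id)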
@@ -1266,7 +1295,7 @@ class PersistEventsStore:
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove any existing cache entries for the event_ids
- txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
+ self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
# Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
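# This hunk (and the one in `_store_redaction` below) switches event cache
# invalidation over to `invalidate_get_event_cache_after_txn`, which is
# defined elsewhere in this change set rather than in this file. Purely as
# an assumed sketch of the pattern, not the actual helper: the idea is to
# queue a possibly asynchronous invalidation so that it runs only after
# the transaction commits, via the `async_call_after` hook that appears
# later in this diff.
from typing import Awaitable, Callable


def _invalidate_event_cache_after_txn_sketch(
    txn,  # stands in for a LoggingTransaction
    invalidate: Callable[[str], Awaitable[None]],  # assumed async invalidation
    event_id: str,
) -> None:
    async def _invalidate() -> None:
        await invalidate(event_id)

    # Only evict after commit; a rolled-back transaction then never drops
    # a still-valid cache entry.
    txn.async_call_after(_invalidate)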
@@ -1304,6 +1333,10 @@ class PersistEventsStore:
Returns:
new list, without events which are already in the events table.
+
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
txn.execute(
"SELECT event_id, outlier FROM events WHERE event_id in (%s)"
@@ -1315,9 +1348,24 @@ class PersistEventsStore:
event_id: outlier for event_id, outlier in txn
}
+ logger.debug(
+ "_update_outliers_txn: events=%s have_persisted=%s",
+ [ev.event_id for ev, _ in events_and_contexts],
+ have_persisted,
+ )
+
to_remove = set()
for event, context in events_and_contexts:
- if event.event_id not in have_persisted:
+ outlier_persisted = have_persisted.get(event.event_id)
+ logger.debug(
+ "_update_outliers_txn: event=%s outlier=%s outlier_persisted=%s",
+ event.event_id,
+ event.internal_metadata.is_outlier(),
+ outlier_persisted,
+ )
+
+ # Ignore events which we haven't persisted at all
+ if outlier_persisted is None:
continue
to_remove.add(event)
@@ -1327,7 +1375,6 @@ class PersistEventsStore:
# was an outlier or not - what we have is at least as good.
continue
- outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as
# an outlier in the database. We now have some state at that event
@@ -1338,7 +1385,10 @@ class PersistEventsStore:
# events down /sync. In general they will be historical events, so that
# doesn't matter too much, but that is not always the case.
- logger.info("Updating state for ex-outlier event %s", event.event_id)
+ logger.info(
+ "_update_outliers_txn: Updating state for ex-outlier event %s",
+ event.event_id,
+ )
# insert into event_to_state_groups.
try:
@@ -1442,7 +1492,7 @@ class PersistEventsStore:
event.sender,
"url" in event.content and isinstance(event.content["url"], str),
event.get_state_key(),
- context.rejected or None,
+ context.rejected,
)
for event, context in events_and_contexts
),
@@ -1638,13 +1688,13 @@ class PersistEventsStore:
if not row["rejects"] and not row["redacts"]:
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
- def prefill() -> None:
+ async def prefill() -> None:
for cache_entry in to_prefill:
- self.store._get_event_cache.set(
+ await self.store._get_event_cache.set(
(cache_entry.event.event_id,), cache_entry
)
- txn.call_after(prefill)
+ txn.async_call_after(prefill)
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
"""Invalidate the caches for the redacted event.
@@ -1653,7 +1703,7 @@ class PersistEventsStore:
_invalidate_caches_for_event.
"""
assert event.redacts is not None
- txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
+ self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))
@@ -1766,6 +1816,18 @@ class PersistEventsStore:
self.store.get_invited_rooms_for_local_user.invalidate,
(event.state_key,),
)
+ txn.call_after(
+ self.store.get_local_users_in_room.invalidate,
+ (event.room_id,),
+ )
+ txn.call_after(
+ self.store.get_number_joined_users_in_room.invalidate,
+ (event.room_id,),
+ )
+ txn.call_after(
+ self.store.get_user_in_room_with_profile.invalidate,
+ (event.room_id, event.state_key),
+ )
# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
@@ -2215,6 +2277,11 @@ class PersistEventsStore:
txn: LoggingTransaction,
events_and_contexts: Collection[Tuple[EventBase, EventContext]],
) -> None:
+ """
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
+ """
state_groups = {}
for event, context in events_and_contexts:
if event.internal_metadata.is_outlier():
@@ -2239,19 +2306,37 @@ class PersistEventsStore:
# if we have partial state for these events, record the fact. (This happens
# here rather than in _store_event_txn because it also needs to happen when
# we de-outlier an event.)
- self.db_pool.simple_insert_many_txn(
- txn,
- table="partial_state_events",
- keys=("room_id", "event_id"),
- values=[
- (
- event.room_id,
- event.event_id,
- )
- for event, ctx in events_and_contexts
- if ctx.partial_state
- ],
- )
+ try:
+ self.db_pool.simple_insert_many_txn(
+ txn,
+ table="partial_state_events",
+ keys=("room_id", "event_id"),
+ values=[
+ (
+ event.room_id,
+ event.event_id,
+ )
+ for event, ctx in events_and_contexts
+ if ctx.partial_state
+ ],
+ )
+ except self.db_pool.engine.module.IntegrityError:
+ logger.info(
+ "Cannot persist events %s in rooms %s: room has been un-partial stated",
+ [
+ event.event_id
+ for event, ctx in events_and_contexts
+ if ctx.partial_state
+ ],
+ list(
+ {
+ event.room_id
+ for event, ctx in events_and_contexts
+ if ctx.partial_state
+ }
+ ),
+ )
+ raise PartialStateConflictError()
self.db_pool.simple_upsert_many_txn(
txn,
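# Catching `self.db_pool.engine.module.IntegrityError` works for both
# database drivers Synapse supports because PEP 249 requires every DB-API
# module (sqlite3, psycopg2) to expose an `IntegrityError`. The
# self-contained sketch below reproduces the kind of constraint failure
# that gets translated into PartialStateConflictError; the schema is an
# assumption for illustration, supposing `partial_state_events.room_id`
# references `partial_state_rooms`, so that deleting the room's
# partial-state row (un-partial stating it) makes later inserts fail.
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE partial_state_rooms (room_id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE partial_state_events ("
    " room_id TEXT NOT NULL REFERENCES partial_state_rooms(room_id),"
    " event_id TEXT NOT NULL UNIQUE)"
)
try:
    # There is no row in partial_state_rooms for this room, so the insert
    # violates the foreign key and the driver raises IntegrityError.
    conn.execute(
        "INSERT INTO partial_state_events (room_id, event_id) VALUES (?, ?)",
        ("!room:example.org", "$event_id"),
    )
except sqlite3.IntegrityError as exc:
    print("would be re-raised as PartialStateConflictError:", exc)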
@@ -2296,11 +2381,9 @@ class PersistEventsStore:
self.db_pool.simple_insert_many_txn(
txn,
table="event_edges",
- keys=("event_id", "prev_event_id", "room_id", "is_state"),
+ keys=("event_id", "prev_event_id"),
values=[
- (ev.event_id, e_id, ev.room_id, False)
- for ev in events
- for e_id in ev.prev_event_ids()
+ (ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids()
],
)