diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 29c99c6357..01e935edef 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -54,11 +54,11 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
+from synapse.logging.opentracing import start_active_span, tag_args, trace
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -80,6 +80,8 @@ from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import AsyncLruCache
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
@@ -210,26 +212,35 @@ class EventsWorkerStore(SQLBaseStore):
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
- if hs.get_instance_name() in hs.config.worker.writers.events:
- self._stream_id_gen = StreamIdGenerator(
- db_conn,
- "events",
- "stream_ordering",
- )
- self._backfill_id_gen = StreamIdGenerator(
- db_conn,
- "events",
- "stream_ordering",
- step=-1,
- extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
- )
- else:
- self._stream_id_gen = SlavedIdTracker(
- db_conn, "events", "stream_ordering"
- )
- self._backfill_id_gen = SlavedIdTracker(
- db_conn, "events", "stream_ordering", step=-1
- )
+ self._stream_id_gen = StreamIdGenerator(
+ db_conn,
+ "events",
+ "stream_ordering",
+ is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
+ )
+ self._backfill_id_gen = StreamIdGenerator(
+ db_conn,
+ "events",
+ "stream_ordering",
+ step=-1,
+ extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+ is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
+ )
+
+ events_max = self._stream_id_gen.get_current_token()
+ curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
+ db_conn,
+ "current_state_delta_stream",
+ entity_column="room_id",
+ stream_column="stream_id",
+ max_value=events_max, # As we share the stream id with events token
+ limit=1000,
+ )
+ self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache(
+ "_curr_state_delta_stream_cache",
+ min_curr_state_delta_id,
+ prefilled_cache=curr_state_delta_prefill,
+ )
if hs.config.worker.run_background_tasks:
# We periodically clean out old transaction ID mappings
@@ -338,6 +349,7 @@ class EventsWorkerStore(SQLBaseStore):
) -> Optional[EventBase]:
...
+ @cancellable
async def get_event(
self,
event_id: str,
@@ -371,7 +383,7 @@ class EventsWorkerStore(SQLBaseStore):
If there is a mismatch, behave as per allow_none.
Returns:
- The event, or None if the event was not found.
+ The event, or None if the event was not found and allow_none is `True`.
"""
if not isinstance(event_id, str):
raise TypeError("Invalid event event_id %r" % (event_id,))
@@ -430,6 +442,9 @@ class EventsWorkerStore(SQLBaseStore):
return {e.event_id: e for e in events}
+ @trace
+ @tag_args
+ @cancellable
async def get_events_as_list(
self,
event_ids: Collection[str],
@@ -468,7 +483,7 @@ class EventsWorkerStore(SQLBaseStore):
return []
# there may be duplicates so we cast the list to a set
- event_entry_map = await self._get_events_from_cache_or_db(
+ event_entry_map = await self.get_unredacted_events_from_cache_or_db(
set(event_ids), allow_rejected=allow_rejected
)
@@ -503,7 +518,9 @@ class EventsWorkerStore(SQLBaseStore):
continue
redacted_event_id = entry.event.redacts
- event_map = await self._get_events_from_cache_or_db([redacted_event_id])
+ event_map = await self.get_unredacted_events_from_cache_or_db(
+ [redacted_event_id]
+ )
original_event_entry = event_map.get(redacted_event_id)
if not original_event_entry:
# we don't have the redacted event (or it was rejected).
@@ -581,11 +598,17 @@ class EventsWorkerStore(SQLBaseStore):
return events
- async def _get_events_from_cache_or_db(
- self, event_ids: Iterable[str], allow_rejected: bool = False
+ @cancellable
+ async def get_unredacted_events_from_cache_or_db(
+ self,
+ event_ids: Iterable[str],
+ allow_rejected: bool = False,
) -> Dict[str, EventCacheEntry]:
"""Fetch a bunch of events from the cache or the database.
+ Note that the events pulled by this function will not have any redactions
+ applied, and no guarantee is made about the ordering of the events returned.
+
If events are pulled from the database, they will be cached for future lookups.
Unknown events are omitted from the response.
@@ -600,7 +623,11 @@ class EventsWorkerStore(SQLBaseStore):
Returns:
map from event id to result
"""
- event_entry_map = await self._get_events_from_cache(
+ # Shortcut: check if we have any events in the *in memory* cache - this function
+ # may be called repeatedly for the same event so at this point we cannot reach
+ # out to any external cache for performance reasons. The external cache is
+ # checked later on in the `get_missing_events_from_cache_or_db` function below.
+ event_entry_map = self._get_events_from_local_cache(
event_ids,
)
@@ -632,7 +659,9 @@ class EventsWorkerStore(SQLBaseStore):
if missing_events_ids:
- async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
+ async def get_missing_events_from_cache_or_db() -> Dict[
+ str, EventCacheEntry
+ ]:
"""Fetches the events in `missing_event_ids` from the database.
Also creates entries in `self._current_event_fetches` to allow
@@ -657,10 +686,18 @@ class EventsWorkerStore(SQLBaseStore):
# the events have been redacted, and if so pulling the redaction event
# out of the database to check it.
#
+ missing_events = {}
try:
- missing_events = await self._get_events_from_db(
+ # Try to fetch from any external cache. We already checked the
+ # in-memory cache above.
+ missing_events = await self._get_events_from_external_cache(
missing_events_ids,
)
+ # Now actually fetch any remaining events from the DB
+ db_missing_events = await self._get_events_from_db(
+ missing_events_ids - missing_events.keys(),
+ )
+ missing_events.update(db_missing_events)
except Exception as e:
with PreserveLoggingContext():
fetching_deferred.errback(e)
@@ -679,7 +716,7 @@ class EventsWorkerStore(SQLBaseStore):
# cancellations, since multiple `_get_events_from_cache_or_db` calls can
# reuse the same fetch.
missing_events: Dict[str, EventCacheEntry] = await delay_cancellation(
- get_missing_events_from_db()
+ get_missing_events_from_cache_or_db()
)
event_entry_map.update(missing_events)
@@ -754,7 +791,54 @@ class EventsWorkerStore(SQLBaseStore):
async def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
- """Fetch events from the caches.
+ """Fetch events from the caches, both in memory and any external.
+
+ May return rejected events.
+
+ Args:
+ events: list of event_ids to fetch
+ update_metrics: Whether to update the cache hit ratio metrics
+ """
+ event_map = self._get_events_from_local_cache(
+ events, update_metrics=update_metrics
+ )
+
+ missing_event_ids = (e for e in events if e not in event_map)
+ event_map.update(
+ await self._get_events_from_external_cache(
+ events=missing_event_ids,
+ update_metrics=update_metrics,
+ )
+ )
+
+ return event_map
+
+ async def _get_events_from_external_cache(
+ self, events: Iterable[str], update_metrics: bool = True
+ ) -> Dict[str, EventCacheEntry]:
+ """Fetch events from any configured external cache.
+
+ May return rejected events.
+
+ Args:
+ events: list of event_ids to fetch
+ update_metrics: Whether to update the cache hit ratio metrics
+ """
+ event_map = {}
+
+ for event_id in events:
+ ret = await self._get_event_cache.get_external(
+ (event_id,), None, update_metrics=update_metrics
+ )
+ if ret:
+ event_map[event_id] = ret
+
+ return event_map
+
+ def _get_events_from_local_cache(
+ self, events: Iterable[str], update_metrics: bool = True
+ ) -> Dict[str, EventCacheEntry]:
+ """Fetch events from the local, in memory, caches.
May return rejected events.
@@ -766,7 +850,7 @@ class EventsWorkerStore(SQLBaseStore):
for event_id in events:
# First check if it's in the event cache
- ret = await self._get_event_cache.get(
+ ret = self._get_event_cache.get_local(
(event_id,), None, update_metrics=update_metrics
)
if ret:
@@ -788,7 +872,7 @@ class EventsWorkerStore(SQLBaseStore):
# We add the entry back into the cache as we want to keep
# recently queried events in the cache.
- await self._get_event_cache.set((event_id,), cache_entry)
+ self._get_event_cache.set_local((event_id,), cache_entry)
return event_map
@@ -1029,23 +1113,42 @@ class EventsWorkerStore(SQLBaseStore):
"""
fetched_event_ids: Set[str] = set()
fetched_events: Dict[str, _EventRow] = {}
- events_to_fetch = event_ids
- while events_to_fetch:
- row_map = await self._enqueue_events(events_to_fetch)
+ async def _fetch_event_ids_and_get_outstanding_redactions(
+ event_ids_to_fetch: Collection[str],
+ ) -> Collection[str]:
+ """
+ Fetch all of the given event_ids and return any associated redaction event_ids
+ that we still need to fetch in the next iteration.
+ """
+ row_map = await self._enqueue_events(event_ids_to_fetch)
# we need to recursively fetch any redactions of those events
redaction_ids: Set[str] = set()
- for event_id in events_to_fetch:
+ for event_id in event_ids_to_fetch:
row = row_map.get(event_id)
fetched_event_ids.add(event_id)
if row:
fetched_events[event_id] = row
redaction_ids.update(row.redactions)
- events_to_fetch = redaction_ids.difference(fetched_event_ids)
- if events_to_fetch:
- logger.debug("Also fetching redaction events %s", events_to_fetch)
+ event_ids_to_fetch = redaction_ids.difference(fetched_event_ids)
+ return event_ids_to_fetch
+
+ # Grab the initial list of events requested
+ event_ids_to_fetch = await _fetch_event_ids_and_get_outstanding_redactions(
+ event_ids
+ )
+ # Then go and recursively find all of the associated redactions
+ with start_active_span("recursively fetching redactions"):
+ while event_ids_to_fetch:
+ logger.debug("Also fetching redaction events %s", event_ids_to_fetch)
+
+ event_ids_to_fetch = (
+ await _fetch_event_ids_and_get_outstanding_redactions(
+ event_ids_to_fetch
+ )
+ )
# build a map from event_id to EventBase
event_map: Dict[str, EventBase] = {}
@@ -1073,7 +1176,7 @@ class EventsWorkerStore(SQLBaseStore):
if format_version is None:
# This means that we stored the event before we had the concept
# of a event format version, so it must be a V1 event.
- format_version = EventFormatVersions.V1
+ format_version = EventFormatVersions.ROOM_V1_V2
room_version_id = row.room_version_id
@@ -1103,10 +1206,10 @@ class EventsWorkerStore(SQLBaseStore):
#
# So, the following approximations should be adequate.
- if format_version == EventFormatVersions.V1:
+ if format_version == EventFormatVersions.ROOM_V1_V2:
# if it's event format v1 then it must be room v1 or v2
room_version = RoomVersions.V1
- elif format_version == EventFormatVersions.V2:
+ elif format_version == EventFormatVersions.ROOM_V3:
# if it's event format v2 then it must be room v3
room_version = RoomVersions.V3
else:
@@ -1363,6 +1466,8 @@ class EventsWorkerStore(SQLBaseStore):
return {r["event_id"] for r in rows}
+ @trace
+ @tag_args
async def have_seen_events(
self, room_id: str, event_ids: Iterable[str]
) -> Set[str]:
@@ -1385,36 +1490,36 @@ class EventsWorkerStore(SQLBaseStore):
# the batches as big as possible.
results: Set[str] = set()
- for chunk in batch_iter(event_ids, 500):
- r = await self._have_seen_events_dict(
- [(room_id, event_id) for event_id in chunk]
+ for event_ids_chunk in batch_iter(event_ids, 500):
+ events_seen_dict = await self._have_seen_events_dict(
+ room_id, event_ids_chunk
+ )
+ results.update(
+ eid for (eid, have_event) in events_seen_dict.items() if have_event
)
- results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
return results
- @cachedList(cached_method_name="have_seen_event", list_name="keys")
+ @cachedList(cached_method_name="have_seen_event", list_name="event_ids")
async def _have_seen_events_dict(
- self, keys: Collection[Tuple[str, str]]
- ) -> Dict[Tuple[str, str], bool]:
+ self,
+ room_id: str,
+ event_ids: Collection[str],
+ ) -> Dict[str, bool]:
"""Helper for have_seen_events
Returns:
- a dict {(room_id, event_id)-> bool}
+ a dict {event_id -> bool}
"""
- # if the event cache contains the event, obviously we've seen it.
-
- cache_results = {
- (rid, eid)
- for (rid, eid) in keys
- if await self._get_event_cache.contains((eid,))
- }
- results = dict.fromkeys(cache_results, True)
- remaining = [k for k in keys if k not in cache_results]
- if not remaining:
- return results
-
- def have_seen_events_txn(txn: LoggingTransaction) -> None:
+ # TODO: We used to query the _get_event_cache here as a fast-path before
+ # hitting the database. For if an event were in the cache, we've presumably
+ # seen it before.
+ #
+ # But this is currently an invalid assumption due to the _get_event_cache
+ # not being invalidated when purging events from a room. The optimisation can
+ # be re-added after https://github.com/matrix-org/synapse/issues/13476
+
+ def have_seen_events_txn(txn: LoggingTransaction) -> Dict[str, bool]:
# we deliberately do *not* query the database for room_id, to make the
# query an index-only lookup on `events_event_id_key`.
#
@@ -1422,23 +1527,22 @@ class EventsWorkerStore(SQLBaseStore):
sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause(
- txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
+ txn.database_engine, "e.event_id", event_ids
)
txn.execute(sql + clause, args)
found_events = {eid for eid, in txn}
# ... and then we can update the results for each key
- results.update(
- {(rid, eid): (eid in found_events) for (rid, eid) in remaining}
- )
+ return {eid: (eid in found_events) for eid in event_ids}
- await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
- return results
+ return await self.db_pool.runInteraction(
+ "have_seen_events", have_seen_events_txn
+ )
@cached(max_entries=100000, tree=True)
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
- res = await self._have_seen_events_dict(((room_id, event_id),))
- return res[(room_id, event_id)]
+ res = await self._have_seen_events_dict(room_id, [event_id])
+ return res[event_id]
def _get_current_state_event_counts_txn(
self, txn: LoggingTransaction, room_id: str
@@ -1478,7 +1582,7 @@ class EventsWorkerStore(SQLBaseStore):
room_id: The room ID to query.
Returns:
- dict[str:float] of complexity version to complexity.
+ Map of complexity version to complexity.
"""
state_events = await self.get_current_state_event_counts(room_id)
@@ -1876,12 +1980,17 @@ class EventsWorkerStore(SQLBaseStore):
Args:
room_id: room where the event lives
- event_id: event to check
+ event: event to check (can't be an `outlier`)
Returns:
Boolean indicating whether it's an extremity
"""
+ assert not event.internal_metadata.is_outlier(), (
+ "is_event_next_to_backward_gap(...) can't be used with `outlier` events. "
+ "This function relies on `event_backward_extremities` which won't be filled in for `outliers`."
+ )
+
def is_event_next_to_backward_gap_txn(txn: LoggingTransaction) -> bool:
# If the event in question has any of its prev_events listed as a
# backward extremity, it's next to a gap.
@@ -1931,12 +2040,17 @@ class EventsWorkerStore(SQLBaseStore):
Args:
room_id: room where the event lives
- event_id: event to check
+ event: event to check (can't be an `outlier`)
Returns:
Boolean indicating whether it's an extremity
"""
+ assert not event.internal_metadata.is_outlier(), (
+ "is_event_next_to_forward_gap(...) can't be used with `outlier` events. "
+ "This function relies on `event_edges` and `event_forward_extremities` which won't be filled in for `outliers`."
+ )
+
def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool:
# If the event in question is a forward extremity, we will just
# consider any potential forward gap as not a gap since it's one of
@@ -2017,35 +2131,50 @@ class EventsWorkerStore(SQLBaseStore):
The closest event_id otherwise None if we can't find any event in
the given direction.
"""
+ if direction == "b":
+ # Find closest event *before* a given timestamp. We use descending
+ # (which gives values largest to smallest) because we want the
+ # largest possible timestamp *before* the given timestamp.
+ comparison_operator = "<="
+ order = "DESC"
+ else:
+ # Find closest event *after* a given timestamp. We use ascending
+ # (which gives values smallest to largest) because we want the
+ # closest possible timestamp *after* the given timestamp.
+ comparison_operator = ">="
+ order = "ASC"
- sql_template = """
+ sql_template = f"""
SELECT event_id FROM events
LEFT JOIN rejections USING (event_id)
WHERE
- origin_server_ts %s ?
- AND room_id = ?
+ room_id = ?
+ AND origin_server_ts {comparison_operator} ?
+ /**
+ * Make sure the event isn't an `outlier` because we have no way
+ * to later check whether it's next to a gap. `outliers` do not
+ * have entries in the `event_edges`, `event_forward_extremeties`,
+ * and `event_backward_extremities` tables to check against
+ * (used by `is_event_next_to_backward_gap` and `is_event_next_to_forward_gap`).
+ */
+ AND NOT outlier
/* Make sure event is not rejected */
AND rejections.event_id IS NULL
- ORDER BY origin_server_ts %s
+ /**
+ * First sort by the message timestamp. If the message timestamps are the
+ * same, we want the message that logically comes "next" (before/after
+ * the given timestamp) based on the DAG and its topological order (`depth`).
+ * Finally, we can tie-break based on when it was received on the server
+ * (`stream_ordering`).
+ */
+ ORDER BY origin_server_ts {order}, depth {order}, stream_ordering {order}
LIMIT 1;
"""
def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]:
- if direction == "b":
- # Find closest event *before* a given timestamp. We use descending
- # (which gives values largest to smallest) because we want the
- # largest possible timestamp *before* the given timestamp.
- comparison_operator = "<="
- order = "DESC"
- else:
- # Find closest event *after* a given timestamp. We use ascending
- # (which gives values smallest to largest) because we want the
- # closest possible timestamp *after* the given timestamp.
- comparison_operator = ">="
- order = "ASC"
-
txn.execute(
- sql_template % (comparison_operator, order), (timestamp, room_id)
+ sql_template,
+ (room_id, timestamp),
)
row = txn.fetchone()
if row:
@@ -2099,7 +2228,15 @@ class EventsWorkerStore(SQLBaseStore):
return result is not None
async def get_partial_state_events_batch(self, room_id: str) -> List[str]:
- """Get a list of events in the given room that have partial state"""
+ """
+ Get a list of events in the given room that:
+ - have partial state; and
+ - are ready to be resynced (because they have no prev_events that are
+ partial-stated)
+
+ See the docstring on `_get_partial_state_events_batch_txn` for more
+ information.
+ """
return await self.db_pool.runInteraction(
"get_partial_state_events_batch",
self._get_partial_state_events_batch_txn,
@@ -2139,3 +2276,63 @@ class EventsWorkerStore(SQLBaseStore):
(room_id,),
)
return [row[0] for row in txn]
+
+ def mark_event_rejected_txn(
+ self,
+ txn: LoggingTransaction,
+ event_id: str,
+ rejection_reason: Optional[str],
+ ) -> None:
+ """Mark an event that was previously accepted as rejected, or vice versa
+
+ This can happen, for example, when resyncing state during a faster join.
+
+ Args:
+ txn:
+ event_id: ID of event to update
+ rejection_reason: reason it has been rejected, or None if it is now accepted
+ """
+ if rejection_reason is None:
+ logger.info(
+ "Marking previously-processed event %s as accepted",
+ event_id,
+ )
+ self.db_pool.simple_delete_txn(
+ txn,
+ "rejections",
+ keyvalues={"event_id": event_id},
+ )
+ else:
+ logger.info(
+ "Marking previously-processed event %s as rejected(%s)",
+ event_id,
+ rejection_reason,
+ )
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="rejections",
+ keyvalues={"event_id": event_id},
+ values={
+ "reason": rejection_reason,
+ "last_check": self._clock.time_msec(),
+ },
+ )
+ self.db_pool.simple_update_txn(
+ txn,
+ table="events",
+ keyvalues={"event_id": event_id},
+ updatevalues={"rejection_reason": rejection_reason},
+ )
+
+ self.invalidate_get_event_cache_after_txn(txn, event_id)
+
+ # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
+ # call '_send_invalidation_to_replication', but we actually need the other
+ # end to call _invalidate_local_get_event_cache() rather than (just)
+ # _get_event_cache.invalidate().
+ #
+ # One solution might be to (somehow) get the workers to call
+ # _invalidate_caches_for_event() (though that will invalidate more than
+ # strictly necessary).
+ #
+ # https://github.com/matrix-org/synapse/issues/12994
|