diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index e71c23541d..a97f8b3934 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -17,39 +17,44 @@
import itertools
import logging
-from collections import Counter as c_counter, OrderedDict, namedtuple
+from collections import OrderedDict, namedtuple
from functools import wraps
-from typing import Dict, List, Tuple
+from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
-from six import iteritems, text_type
+from six import integer_types, iteritems, text_type
from six.moves import range
+import attr
from canonicaljson import json
from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics
-from synapse.api.constants import EventContentFields, EventTypes
-from synapse.api.errors import SynapseError
+from synapse.api.constants import (
+ EventContentFields,
+ EventTypes,
+ Membership,
+ RelationTypes,
+)
from synapse.api.room_versions import RoomVersions
+from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
-from synapse.events.utils import prune_event_dict
from synapse.logging.utils import log_function
-from synapse.metrics import BucketCollector
-from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import make_in_list_sql_clause
-from synapse.storage.data_stores.main.event_federation import EventFederationStore
-from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
-from synapse.storage.data_stores.main.state import StateGroupWorkerStore
+from synapse.storage.data_stores.main.search import SearchEntry
from synapse.storage.database import Database, LoggingTransaction
-from synapse.storage.persist_events import DeltaState
-from synapse.types import RoomStreamToken, StateMap, get_domain_from_id
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.types import StateMap, get_domain_from_id
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.iterutils import batch_iter
+if TYPE_CHECKING:
+ from synapse.storage.data_stores.main import DataStore
+ from synapse.server import HomeServer
+
+
logger = logging.getLogger(__name__)
persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
@@ -94,58 +99,49 @@ def _retry_on_integrity_error(func):
return f
-# inherits from EventFederationStore so that we can call _update_backward_extremities
-# and _handle_mult_prev_events (though arguably those could both be moved in here)
-class EventsStore(
- StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
-):
- def __init__(self, database: Database, db_conn, hs):
- super(EventsStore, self).__init__(database, db_conn, hs)
+@attr.s(slots=True)
+class DeltaState:
+ """Deltas to use to update the `current_state_events` table.
- # Collect metrics on the number of forward extremities that exist.
- # Counter of number of extremities to count
- self._current_forward_extremities_amount = c_counter()
+ Attributes:
+ to_delete: List of type/state_keys to delete from current state
+ to_insert: Map of state to upsert into current state
+ no_longer_in_room: The server is no longer in the room, so the room
+ should e.g. be removed from the `current_state_events` table.
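+
+ For illustration, a delta that drops the room topic and upserts a new
+ room name might look like this (the event ID is made up)::
+
+ DeltaState(
+ to_delete=[("m.room.topic", "")],
+ to_insert={("m.room.name", ""): "$new_name_event_id"},
+ )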
+ """
- BucketCollector(
- "synapse_forward_extremities",
- lambda: self._current_forward_extremities_amount,
- buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
- )
+ to_delete = attr.ib(type=List[Tuple[str, str]])
+ to_insert = attr.ib(type=StateMap[str])
+ no_longer_in_room = attr.ib(type=bool, default=False)
- # Read the extrems every 60 minutes
- def read_forward_extremities():
- # run as a background process to make sure that the database transactions
- # have a logcontext to report to
- return run_as_background_process(
- "read_forward_extremities", self._read_forward_extremities
- )
- hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
+class PersistEventsStore:
+ """Contains all the functions for writing events to the database.
- def _censor_redactions():
- return run_as_background_process(
- "_censor_redactions", self._censor_redactions
- )
+ Should only be instantiated on one process (when using a worker mode setup).
+
+ Note: This is not part of the `DataStore` mixin.
+ """
- if self.hs.config.redaction_retention_period is not None:
- hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
+ def __init__(self, hs: "HomeServer", db: Database, main_data_store: "DataStore"):
+ self.hs = hs
+ self.db = db
+ self.store = main_data_store
+ self.database_engine = db.engine
+ self._clock = hs.get_clock()
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
self.is_mine_id = hs.is_mine_id
- @defer.inlineCallbacks
- def _read_forward_extremities(self):
- def fetch(txn):
- txn.execute(
- """
- select count(*) c from event_forward_extremities
- group by room_id
- """
- )
- return txn.fetchall()
+ # Ideally we'd move these ID gens here, but unfortunately some other ID
+ # generators are chained off them, so doing so is a bit of a PITA.
+ self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator
+ self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator
- res = yield self.db.runInteraction("read_forward_extremities", fetch)
- self._current_forward_extremities_amount = c_counter([x[0] for x in res])
+ # This should only exist on master for now
+ assert (
+ hs.config.worker.worker_app is None
+ ), "Can only instantiate PersistEventsStore on master"
@_retry_on_integrity_error
@defer.inlineCallbacks
@@ -237,10 +233,10 @@ class EventsStore(
event_counter.labels(event.type, origin_type, origin_entity).inc()
for room_id, new_state in iteritems(current_state_for_room):
- self.get_current_state_ids.prefill((room_id,), new_state)
+ self.store.get_current_state_ids.prefill((room_id,), new_state)
for room_id, latest_event_ids in iteritems(new_forward_extremeties):
- self.get_latest_event_ids_in_room.prefill(
+ self.store.get_latest_event_ids_in_room.prefill(
(room_id,), list(latest_event_ids)
)
@@ -586,7 +582,7 @@ class EventsStore(
)
txn.call_after(
- self._curr_state_delta_stream_cache.entity_has_changed,
+ self.store._curr_state_delta_stream_cache.entity_has_changed,
room_id,
stream_id,
)
@@ -606,10 +602,13 @@ class EventsStore(
for member in members_changed:
txn.call_after(
- self.get_rooms_for_user_with_stream_ordering.invalidate, (member,)
+ self.store.get_rooms_for_user_with_stream_ordering.invalidate,
+ (member,),
)
- self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
+ self.store._invalidate_state_caches_and_stream(
+ txn, room_id, members_changed
+ )
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
"""Update the room version in the database based off current state
@@ -647,7 +646,9 @@ class EventsStore(
self.db.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
- txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,))
+ txn.call_after(
+ self.store.get_latest_event_ids_in_room.invalidate, (room_id,)
+ )
self.db.simple_insert_many_txn(
txn,
@@ -713,10 +714,10 @@ class EventsStore(
depth_updates = {}
for event, context in events_and_contexts:
# Remove any existing cache entries for the event_ids
- txn.call_after(self._invalidate_get_event_cache, event.event_id)
+ txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
if not backfilled:
txn.call_after(
- self._events_stream_cache.entity_has_changed,
+ self.store._events_stream_cache.entity_has_changed,
event.room_id,
event.internal_metadata.stream_ordering,
)
@@ -1088,13 +1089,15 @@ class EventsStore(
def prefill():
for cache_entry in to_prefill:
- self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
+ self.store._get_event_cache.prefill(
+ (cache_entry[0].event_id,), cache_entry
+ )
txn.call_after(prefill)
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
- txn.call_after(self._invalidate_get_event_cache, event.redacts)
+ txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
self.db.simple_insert_txn(
txn,
@@ -1106,783 +1109,484 @@ class EventsStore(
},
)
- async def _censor_redactions(self):
- """Censors all redactions older than the configured period that haven't
- been censored yet.
+ def insert_labels_for_event_txn(
+ self, txn, event_id, labels, room_id, topological_ordering
+ ):
+ """Store the mapping between an event's ID and its labels, with one row per
+ (event_id, label) tuple.
- By censor we mean update the event_json table with the redacted event.
+ Args:
+ txn (LoggingTransaction): The transaction to execute.
+ event_id (str): The event's ID.
+ labels (list[str]): A list of text labels.
+ room_id (str): The ID of the room the event was sent to.
+ topological_ordering (int): The position of the event in the room's topology.
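+
+ For example, to label an event with two labels (hypothetical
+ identifiers)::
+
+ self.insert_labels_for_event_txn(
+ txn, "$event1:example.com", ["#work", "#fun"], "!room:example.com", 5
+ )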
"""
+ return self.db.simple_insert_many_txn(
+ txn=txn,
+ table="event_labels",
+ values=[
+ {
+ "event_id": event_id,
+ "label": label,
+ "room_id": room_id,
+ "topological_ordering": topological_ordering,
+ }
+ for label in labels
+ ],
+ )
- if self.hs.config.redaction_retention_period is None:
- return
-
- if not (
- await self.db.updates.has_completed_background_update(
- "redactions_have_censored_ts_idx"
- )
- ):
- # We don't want to run this until the appropriate index has been
- # created.
- return
-
- before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period
+ def _insert_event_expiry_txn(self, txn, event_id, expiry_ts):
+ """Save the expiry timestamp associated with a given event ID.
- # We fetch all redactions that:
- # 1. point to an event we have,
- # 2. has a received_ts from before the cut off, and
- # 3. we haven't yet censored.
- #
- # This is limited to 100 events to ensure that we don't try and do too
- # much at once. We'll get called again so this should eventually catch
- # up.
- sql = """
- SELECT redactions.event_id, redacts FROM redactions
- LEFT JOIN events AS original_event ON (
- redacts = original_event.event_id
- )
- WHERE NOT have_censored
- AND redactions.received_ts <= ?
- ORDER BY redactions.received_ts ASC
- LIMIT ?
+ Args:
+ txn (LoggingTransaction): The database transaction to use.
+ event_id (str): The event ID the expiry timestamp is associated with.
+ expiry_ts (int): The timestamp at which to expire (delete) the event.
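+
+ For example (illustrative values)::
+
+ self._insert_event_expiry_txn(txn, "$event1:example.com", 1580000000000)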
"""
-
- rows = await self.db.execute(
- "_censor_redactions_fetch", None, sql, before_ts, 100
+ return self.db.simple_insert_txn(
+ txn=txn,
+ table="event_expiry",
+ values={"event_id": event_id, "expiry_ts": expiry_ts},
)
- updates = []
+ def _store_event_reference_hashes_txn(self, txn, events):
+ """Store a hash for a PDU
+ Args:
+ txn (cursor):
+ events (list): list of Events.
+ """
- for redaction_id, event_id in rows:
- redaction_event = await self.get_event(redaction_id, allow_none=True)
- original_event = await self.get_event(
- event_id, allow_rejected=True, allow_none=True
+ vals = []
+ for event in events:
+ ref_alg, ref_hash_bytes = compute_event_reference_hash(event)
+ vals.append(
+ {
+ "event_id": event.event_id,
+ "algorithm": ref_alg,
+ "hash": memoryview(ref_hash_bytes),
+ }
)
- # The SQL above ensures that we have both the redaction and
- # original event, so if the `get_event` calls return None it
- # means that the redaction wasn't allowed. Either way we know that
- # the result won't change so we mark the fact that we've checked.
- if (
- redaction_event
- and original_event
- and original_event.internal_metadata.is_redacted()
- ):
- # Redaction was allowed
- pruned_json = encode_json(
- prune_event_dict(
- original_event.room_version, original_event.get_dict()
- )
- )
- else:
- # Redaction wasn't allowed
- pruned_json = None
-
- updates.append((redaction_id, event_id, pruned_json))
-
- def _update_censor_txn(txn):
- for redaction_id, event_id, pruned_json in updates:
- if pruned_json:
- self._censor_event_txn(txn, event_id, pruned_json)
-
- self.db.simple_update_one_txn(
- txn,
- table="redactions",
- keyvalues={"event_id": redaction_id},
- updatevalues={"have_censored": True},
- )
-
- await self.db.runInteraction("_update_censor_txn", _update_censor_txn)
+ self.db.simple_insert_many_txn(txn, table="event_reference_hashes", values=vals)
- def _censor_event_txn(self, txn, event_id, pruned_json):
- """Censor an event by replacing its JSON in the event_json table with the
- provided pruned JSON.
-
- Args:
- txn (LoggingTransaction): The database transaction.
- event_id (str): The ID of the event to censor.
- pruned_json (str): The pruned JSON
+ def _store_room_members_txn(self, txn, events, backfilled):
+ """Store a room member in the database.
"""
- self.db.simple_update_one_txn(
+ self.db.simple_insert_many_txn(
txn,
- table="event_json",
- keyvalues={"event_id": event_id},
- updatevalues={"json": pruned_json},
+ table="room_memberships",
+ values=[
+ {
+ "event_id": event.event_id,
+ "user_id": event.state_key,
+ "sender": event.user_id,
+ "room_id": event.room_id,
+ "membership": event.membership,
+ "display_name": event.content.get("displayname", None),
+ "avatar_url": event.content.get("avatar_url", None),
+ }
+ for event in events
+ ],
)
- @defer.inlineCallbacks
- def count_daily_messages(self):
- """
- Returns an estimate of the number of messages sent in the last day.
-
- If it has been significantly less or more than one day since the last
- call to this function, it will return None.
- """
-
- def _count_messages(txn):
- sql = """
- SELECT COALESCE(COUNT(*), 0) FROM events
- WHERE type = 'm.room.message'
- AND stream_ordering > ?
- """
- txn.execute(sql, (self.stream_ordering_day_ago,))
- (count,) = txn.fetchone()
- return count
-
- ret = yield self.db.runInteraction("count_messages", _count_messages)
- return ret
-
- @defer.inlineCallbacks
- def count_daily_sent_messages(self):
- def _count_messages(txn):
- # This is good enough as if you have silly characters in your own
- # hostname then thats your own fault.
- like_clause = "%:" + self.hs.hostname
-
- sql = """
- SELECT COALESCE(COUNT(*), 0) FROM events
- WHERE type = 'm.room.message'
- AND sender LIKE ?
- AND stream_ordering > ?
- """
-
- txn.execute(sql, (like_clause, self.stream_ordering_day_ago))
- (count,) = txn.fetchone()
- return count
-
- ret = yield self.db.runInteraction("count_daily_sent_messages", _count_messages)
- return ret
-
- @defer.inlineCallbacks
- def count_daily_active_rooms(self):
- def _count(txn):
- sql = """
- SELECT COALESCE(COUNT(DISTINCT room_id), 0) FROM events
- WHERE type = 'm.room.message'
- AND stream_ordering > ?
- """
- txn.execute(sql, (self.stream_ordering_day_ago,))
- (count,) = txn.fetchone()
- return count
-
- ret = yield self.db.runInteraction("count_daily_active_rooms", _count)
- return ret
-
- @cached(num_args=5, max_entries=10)
- def get_all_new_events(
- self,
- last_backfill_id,
- last_forward_id,
- current_backfill_id,
- current_forward_id,
- limit,
- ):
- """Get all the new events that have arrived at the server either as
- new events or as backfilled events"""
- have_backfill_events = last_backfill_id != current_backfill_id
- have_forward_events = last_forward_id != current_forward_id
-
- if not have_backfill_events and not have_forward_events:
- return defer.succeed(AllNewEventsResult([], [], [], [], []))
-
- def get_all_new_events_txn(txn):
- sql = (
- "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts"
- " FROM events AS e"
- " LEFT JOIN redactions USING (event_id)"
- " LEFT JOIN state_events USING (event_id)"
- " WHERE ? < stream_ordering AND stream_ordering <= ?"
- " ORDER BY stream_ordering ASC"
- " LIMIT ?"
+ for event in events:
+ txn.call_after(
+ self.store._membership_stream_cache.entity_has_changed,
+ event.state_key,
+ event.internal_metadata.stream_ordering,
)
- if have_forward_events:
- txn.execute(sql, (last_forward_id, current_forward_id, limit))
- new_forward_events = txn.fetchall()
-
- if len(new_forward_events) == limit:
- upper_bound = new_forward_events[-1][0]
- else:
- upper_bound = current_forward_id
-
- sql = (
- "SELECT event_stream_ordering, event_id, state_group"
- " FROM ex_outlier_stream"
- " WHERE ? > event_stream_ordering"
- " AND event_stream_ordering >= ?"
- " ORDER BY event_stream_ordering DESC"
- )
- txn.execute(sql, (last_forward_id, upper_bound))
- forward_ex_outliers = txn.fetchall()
- else:
- new_forward_events = []
- forward_ex_outliers = []
-
- sql = (
- "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts"
- " FROM events AS e"
- " LEFT JOIN redactions USING (event_id)"
- " LEFT JOIN state_events USING (event_id)"
- " WHERE ? > stream_ordering AND stream_ordering >= ?"
- " ORDER BY stream_ordering DESC"
- " LIMIT ?"
+ txn.call_after(
+ self.store.get_invited_rooms_for_local_user.invalidate,
+ (event.state_key,),
)
- if have_backfill_events:
- txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
- new_backfill_events = txn.fetchall()
- if len(new_backfill_events) == limit:
- upper_bound = new_backfill_events[-1][0]
- else:
- upper_bound = current_backfill_id
-
- sql = (
- "SELECT -event_stream_ordering, event_id, state_group"
- " FROM ex_outlier_stream"
- " WHERE ? > event_stream_ordering"
- " AND event_stream_ordering >= ?"
- " ORDER BY event_stream_ordering DESC"
- )
- txn.execute(sql, (-last_backfill_id, -upper_bound))
- backward_ex_outliers = txn.fetchall()
- else:
- new_backfill_events = []
- backward_ex_outliers = []
-
- return AllNewEventsResult(
- new_forward_events,
- new_backfill_events,
- forward_ex_outliers,
- backward_ex_outliers,
+ # We update the local_invites table only if the event is "current",
+ # i.e. it's something that has just happened. If the event is an
+ # outlier it is only current if it's an "out of band membership",
+ # like a remote invite or a rejection of a remote invite.
+ is_new_state = not backfilled and (
+ not event.internal_metadata.is_outlier()
+ or event.internal_metadata.is_out_of_band_membership()
)
+ is_mine = self.is_mine_id(event.state_key)
+ if is_new_state and is_mine:
+ if event.membership == Membership.INVITE:
+ self.db.simple_insert_txn(
+ txn,
+ table="local_invites",
+ values={
+ "event_id": event.event_id,
+ "invitee": event.state_key,
+ "inviter": event.sender,
+ "room_id": event.room_id,
+ "stream_id": event.internal_metadata.stream_ordering,
+ },
+ )
+ else:
+ sql = (
+ "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
+ " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+ " AND replaced_by is NULL"
+ )
- return self.db.runInteraction("get_all_new_events", get_all_new_events_txn)
-
- def purge_history(self, room_id, token, delete_local_events):
- """Deletes room history before a certain point
-
- Args:
- room_id (str):
+ txn.execute(
+ sql,
+ (
+ event.internal_metadata.stream_ordering,
+ event.event_id,
+ event.room_id,
+ event.state_key,
+ ),
+ )
- token (str): A topological token to delete events before
+ # We also update the `local_current_membership` table with
+ # the latest invite info. This will usually get updated by the
+ # `current_state_events` handling, unless it's an outlier.
+ if event.internal_metadata.is_outlier():
+ # This should only happen for out of band memberships, so
+ # we add a paranoia check.
+ assert event.internal_metadata.is_out_of_band_membership()
+
+ self.db.simple_upsert_txn(
+ txn,
+ table="local_current_membership",
+ keyvalues={
+ "room_id": event.room_id,
+ "user_id": event.state_key,
+ },
+ values={
+ "event_id": event.event_id,
+ "membership": event.membership,
+ },
+ )
- delete_local_events (bool):
- if True, we will delete local events as well as remote ones
- (instead of just marking them as outliers and deleting their
- state groups).
+ def _handle_event_relations(self, txn, event):
+ """Handles inserting relation data during peristence of events
- Returns:
- Deferred[set[int]]: The set of state groups that are referenced by
- deleted events.
+ Args:
+ txn (LoggingTransaction): The transaction to use.
+ event (EventBase): The event which may contain a relation.
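+
+ A relation lives under the ``m.relates_to`` key of the event's
+ content, e.g. (illustrative)::
+
+ "m.relates_to": {
+ "rel_type": "m.annotation",
+ "event_id": "$parent_event_id",
+ "key": "👍"
+ }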
"""
+ relation = event.content.get("m.relates_to")
+ if not relation:
+ # No relations
+ return
- return self.db.runInteraction(
- "purge_history",
- self._purge_history_txn,
- room_id,
- token,
- delete_local_events,
- )
+ rel_type = relation.get("rel_type")
+ if rel_type not in (
+ RelationTypes.ANNOTATION,
+ RelationTypes.REFERENCE,
+ RelationTypes.REPLACE,
+ ):
+ # Unknown relation type
+ return
- def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
- token = RoomStreamToken.parse(token_str)
-
- # Tables that should be pruned:
- # event_auth
- # event_backward_extremities
- # event_edges
- # event_forward_extremities
- # event_json
- # event_push_actions
- # event_reference_hashes
- # event_search
- # event_to_state_groups
- # events
- # rejections
- # room_depth
- # state_groups
- # state_groups_state
-
- # we will build a temporary table listing the events so that we don't
- # have to keep shovelling the list back and forth across the
- # connection. Annoyingly the python sqlite driver commits the
- # transaction on CREATE, so let's do this first.
- #
- # furthermore, we might already have the table from a previous (failed)
- # purge attempt, so let's drop the table first.
+ parent_id = relation.get("event_id")
+ if not parent_id:
+ # Invalid relation
+ return
- txn.execute("DROP TABLE IF EXISTS events_to_purge")
+ aggregation_key = relation.get("key")
- txn.execute(
- "CREATE TEMPORARY TABLE events_to_purge ("
- " event_id TEXT NOT NULL,"
- " should_delete BOOLEAN NOT NULL"
- ")"
+ self.db.simple_insert_txn(
+ txn,
+ table="event_relations",
+ values={
+ "event_id": event.event_id,
+ "relates_to_id": parent_id,
+ "relation_type": rel_type,
+ "aggregation_key": aggregation_key,
+ },
)
- # First ensure that we're not about to delete all the forward extremeties
- txn.execute(
- "SELECT e.event_id, e.depth FROM events as e "
- "INNER JOIN event_forward_extremities as f "
- "ON e.event_id = f.event_id "
- "AND e.room_id = f.room_id "
- "WHERE f.room_id = ?",
- (room_id,),
+ txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,))
+ txn.call_after(
+ self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
)
- rows = txn.fetchall()
- max_depth = max(row[1] for row in rows)
-
- if max_depth < token.topological:
- # We need to ensure we don't delete all the events from the database
- # otherwise we wouldn't be able to send any events (due to not
- # having any backwards extremeties)
- raise SynapseError(
- 400, "topological_ordering is greater than forward extremeties"
- )
-
- logger.info("[purge] looking for events to delete")
-
- should_delete_expr = "state_key IS NULL"
- should_delete_params = ()
- if not delete_local_events:
- should_delete_expr += " AND event_id NOT LIKE ?"
-
- # We include the parameter twice since we use the expression twice
- should_delete_params += ("%:" + self.hs.hostname, "%:" + self.hs.hostname)
- should_delete_params += (room_id, token.topological)
+ if rel_type == RelationTypes.REPLACE:
+ txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
- # Note that we insert events that are outliers and aren't going to be
- # deleted, as nothing will happen to them.
- txn.execute(
- "INSERT INTO events_to_purge"
- " SELECT event_id, %s"
- " FROM events AS e LEFT JOIN state_events USING (event_id)"
- " WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?"
- % (should_delete_expr, should_delete_expr),
- should_delete_params,
- )
+ def _handle_redaction(self, txn, redacted_event_id):
+ """Handles receiving a redaction and checking whether we need to remove
+ any redacted relations from the database.
- # We create the indices *after* insertion as that's a lot faster.
+ Args:
+ txn (LoggingTransaction): The transaction to use.
+ redacted_event_id (str): The ID of the event that was redacted.
+ """
- # create an index on should_delete because later we'll be looking for
- # the should_delete / shouldn't_delete subsets
- txn.execute(
- "CREATE INDEX events_to_purge_should_delete"
- " ON events_to_purge(should_delete)"
+ self.db.simple_delete_txn(
+ txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
)
- # We do joins against events_to_purge for e.g. calculating state
- # groups to purge, etc., so lets make an index.
- txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
-
- txn.execute("SELECT event_id, should_delete FROM events_to_purge")
- event_rows = txn.fetchall()
- logger.info(
- "[purge] found %i events before cutoff, of which %i can be deleted",
- len(event_rows),
- sum(1 for e in event_rows if e[1]),
- )
+ def _store_room_topic_txn(self, txn, event):
+ if hasattr(event, "content") and "topic" in event.content:
+ self.store_event_search_txn(
+ txn, event, "content.topic", event.content["topic"]
+ )
- logger.info("[purge] Finding new backward extremities")
+ def _store_room_name_txn(self, txn, event):
+ if hasattr(event, "content") and "name" in event.content:
+ self.store_event_search_txn(
+ txn, event, "content.name", event.content["name"]
+ )
- # We calculate the new entries for the backward extremeties by finding
- # events to be purged that are pointed to by events we're not going to
- # purge.
- txn.execute(
- "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
- " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
- " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
- " WHERE ep2.event_id IS NULL"
- )
- new_backwards_extrems = txn.fetchall()
+ def _store_room_message_txn(self, txn, event):
+ if hasattr(event, "content") and "body" in event.content:
+ self.store_event_search_txn(
+ txn, event, "content.body", event.content["body"]
+ )
- logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems)
+ def _store_retention_policy_for_room_txn(self, txn, event):
+ if hasattr(event, "content") and (
+ "min_lifetime" in event.content or "max_lifetime" in event.content
+ ):
+ if (
+ "min_lifetime" in event.content
+ and not isinstance(event.content.get("min_lifetime"), integer_types)
+ ) or (
+ "max_lifetime" in event.content
+ and not isinstance(event.content.get("max_lifetime"), integer_types)
+ ):
+ # Ignore the event if one of the values isn't an integer.
+ return
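+
+ # For reference, an m.room.retention event content looks roughly
+ # like this (illustrative values, in milliseconds):
+ # {"min_lifetime": 86400000, "max_lifetime": 2592000000}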
- txn.execute(
- "DELETE FROM event_backward_extremities WHERE room_id = ?", (room_id,)
- )
+ self.db.simple_insert_txn(
+ txn=txn,
+ table="room_retention",
+ values={
+ "room_id": event.room_id,
+ "event_id": event.event_id,
+ "min_lifetime": event.content.get("min_lifetime"),
+ "max_lifetime": event.content.get("max_lifetime"),
+ },
+ )
- # Update backward extremeties
- txn.executemany(
- "INSERT INTO event_backward_extremities (room_id, event_id)"
- " VALUES (?, ?)",
- [(room_id, event_id) for event_id, in new_backwards_extrems],
- )
+ self.store._invalidate_cache_and_stream(
+ txn, self.store.get_retention_policy_for_room, (event.room_id,)
+ )
- logger.info("[purge] finding state groups referenced by deleted events")
+ def store_event_search_txn(self, txn, event, key, value):
+ """Add event to the search table
- # Get all state groups that are referenced by events that are to be
- # deleted.
- txn.execute(
- """
- SELECT DISTINCT state_group FROM events_to_purge
- INNER JOIN event_to_state_groups USING (event_id)
+ Args:
+ txn (cursor): The database transaction.
+ event (EventBase): The event being indexed.
+ key (str): The attribute of the event being indexed, e.g. "content.body".
+ value (str): The text to index.
"""
+ self.store.store_search_entries_txn(
+ txn,
+ (
+ SearchEntry(
+ key=key,
+ value=value,
+ event_id=event.event_id,
+ room_id=event.room_id,
+ stream_ordering=event.internal_metadata.stream_ordering,
+ origin_server_ts=event.origin_server_ts,
+ ),
+ ),
)
- referenced_state_groups = {sg for sg, in txn}
- logger.info(
- "[purge] found %i referenced state groups", len(referenced_state_groups)
- )
+ def _set_push_actions_for_event_and_users_txn(
+ self, txn, events_and_contexts, all_events_and_contexts
+ ):
+ """Handles moving push actions from staging table to main
+ event_push_actions table for all events in `events_and_contexts`.
- logger.info("[purge] removing events from event_to_state_groups")
- txn.execute(
- "DELETE FROM event_to_state_groups "
- "WHERE event_id IN (SELECT event_id from events_to_purge)"
- )
- for event_id, _ in event_rows:
- txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+ Also ensures that all events in `all_events_and_contexts` are removed
+ from the push action staging area.
- # Delete all remote non-state events
- for table in (
- "events",
- "event_json",
- "event_auth",
- "event_edges",
- "event_forward_extremities",
- "event_reference_hashes",
- "event_search",
- "rejections",
- ):
- logger.info("[purge] removing events from %s", table)
+ Args:
+ events_and_contexts (list[(EventBase, EventContext)]): events
+ we are persisting
+ all_events_and_contexts (list[(EventBase, EventContext)]): all
+ events that we were going to persist. This includes events
+ we've already persisted, etc., that wouldn't appear in
+ events_and_contexts.
+ """
- txn.execute(
- "DELETE FROM %s WHERE event_id IN ("
- " SELECT event_id FROM events_to_purge WHERE should_delete"
- ")" % (table,)
+ sql = """
+ INSERT INTO event_push_actions (
+ room_id, event_id, user_id, actions, stream_ordering,
+ topological_ordering, notif, highlight
)
+ SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
+ FROM event_push_actions_staging
+ WHERE event_id = ?
+ """
- # event_push_actions lacks an index on event_id, and has one on
- # (room_id, event_id) instead.
- for table in ("event_push_actions",):
- logger.info("[purge] removing events from %s", table)
+ if events_and_contexts:
+ txn.executemany(
+ sql,
+ (
+ (
+ event.room_id,
+ event.internal_metadata.stream_ordering,
+ event.depth,
+ event.event_id,
+ )
+ for event, _ in events_and_contexts
+ ),
+ )
- txn.execute(
- "DELETE FROM %s WHERE room_id = ? AND event_id IN ("
- " SELECT event_id FROM events_to_purge WHERE should_delete"
- ")" % (table,),
- (room_id,),
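+ # For each event, look up the users who had push actions staged for
+ # it and invalidate their unread-push-action caches.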
+ for event, _ in events_and_contexts:
+ user_ids = self.db.simple_select_onecol_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={"event_id": event.event_id},
+ retcol="user_id",
)
- # Mark all state and own events as outliers
- logger.info("[purge] marking remaining events as outliers")
- txn.execute(
- "UPDATE events SET outlier = ?"
- " WHERE event_id IN ("
- " SELECT event_id FROM events_to_purge "
- " WHERE NOT should_delete"
- ")",
- (True,),
+ for uid in user_ids:
+ txn.call_after(
+ self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (event.room_id, uid),
+ )
+
+ # Now we delete the staging area for *all* events that were being
+ # persisted.
+ txn.executemany(
+ "DELETE FROM event_push_actions_staging WHERE event_id = ?",
+ ((event.event_id,) for event, _ in all_events_and_contexts),
)
- # synapse tries to take out an exclusive lock on room_depth whenever it
- # persists events (because upsert), and once we run this update, we
- # will block that for the rest of our transaction.
- #
- # So, let's stick it at the end so that we don't block event
- # persistence.
- #
- # We do this by calculating the minimum depth of the backwards
- # extremities. However, the events in event_backward_extremities
- # are ones we don't have yet so we need to look at the events that
- # point to it via event_edges table.
- txn.execute(
- """
- SELECT COALESCE(MIN(depth), 0)
- FROM event_backward_extremities AS eb
- INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
- INNER JOIN events AS e ON e.event_id = eg.event_id
- WHERE eb.room_id = ?
- """,
+ def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
+ # Sad that we have to blow away the cache for the whole room here
+ txn.call_after(
+ self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id,),
)
- (min_depth,) = txn.fetchone()
-
- logger.info("[purge] updating room_depth to %d", min_depth)
-
txn.execute(
- "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
- (min_depth, room_id),
+ "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
+ (room_id, event_id),
)
- # finally, drop the temp table. this will commit the txn in sqlite,
- # so make sure to keep this actually last.
- txn.execute("DROP TABLE events_to_purge")
-
- logger.info("[purge] done")
-
- return referenced_state_groups
-
- def purge_room(self, room_id):
- """Deletes all record of a room
+ def _store_rejections_txn(self, txn, event_id, reason):
+ self.db.simple_insert_txn(
+ txn,
+ table="rejections",
+ values={
+ "event_id": event_id,
+ "reason": reason,
+ "last_check": self._clock.time_msec(),
+ },
+ )
- Args:
- room_id (str)
+ def _store_event_state_mappings_txn(
+ self, txn, events_and_contexts: Iterable[Tuple[EventBase, EventContext]]
+ ):
+ state_groups = {}
+ for event, context in events_and_contexts:
+ if event.internal_metadata.is_outlier():
+ continue
- Returns:
- Deferred[List[int]]: The list of state groups to delete.
- """
+ # if the event was rejected, just give it the same state as its
+ # predecessor.
+ if context.rejected:
+ state_groups[event.event_id] = context.state_group_before_event
+ continue
- return self.db.runInteraction("purge_room", self._purge_room_txn, room_id)
+ state_groups[event.event_id] = context.state_group
- def _purge_room_txn(self, txn, room_id):
- # First we fetch all the state groups that should be deleted, before
- # we delete that information.
- txn.execute(
- """
- SELECT DISTINCT state_group FROM events
- INNER JOIN event_to_state_groups USING(event_id)
- WHERE events.room_id = ?
- """,
- (room_id,),
+ self.db.simple_insert_many_txn(
+ txn,
+ table="event_to_state_groups",
+ values=[
+ {"state_group": state_group_id, "event_id": event_id}
+ for event_id, state_group_id in iteritems(state_groups)
+ ],
)
- state_groups = [row[0] for row in txn]
-
- # Now we delete tables which lack an index on room_id but have one on event_id
- for table in (
- "event_auth",
- "event_edges",
- "event_push_actions_staging",
- "event_reference_hashes",
- "event_relations",
- "event_to_state_groups",
- "redactions",
- "rejections",
- "state_events",
- ):
- logger.info("[purge] removing %s from %s", room_id, table)
-
- txn.execute(
- """
- DELETE FROM %s WHERE event_id IN (
- SELECT event_id FROM events WHERE room_id=?
- )
- """
- % (table,),
- (room_id,),
+ for event_id, state_group_id in iteritems(state_groups):
+ txn.call_after(
+ self.store._get_state_group_for_event.prefill,
+ (event_id,),
+ state_group_id,
)
- # and finally, the tables with an index on room_id (or no useful index)
- for table in (
- "current_state_events",
- "event_backward_extremities",
- "event_forward_extremities",
- "event_json",
- "event_push_actions",
- "event_search",
- "events",
- "group_rooms",
- "public_room_list_stream",
- "receipts_graph",
- "receipts_linearized",
- "room_aliases",
- "room_depth",
- "room_memberships",
- "room_stats_state",
- "room_stats_current",
- "room_stats_historical",
- "room_stats_earliest_token",
- "rooms",
- "stream_ordering_to_exterm",
- "users_in_public_rooms",
- "users_who_share_private_rooms",
- # no useful index, but let's clear them anyway
- "appservice_room_list",
- "e2e_room_keys",
- "event_push_summary",
- "pusher_throttle",
- "group_summary_rooms",
- "local_invites",
- "room_account_data",
- "room_tags",
- "local_current_membership",
- ):
- logger.info("[purge] removing %s from %s", room_id, table)
- txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
-
- # Other tables we do NOT need to clear out:
- #
- # - blocked_rooms
- # This is important, to make sure that we don't accidentally rejoin a blocked
- # room after it was purged
- #
- # - user_directory
- # This has a room_id column, but it is unused
- #
-
- # Other tables that we might want to consider clearing out include:
- #
- # - event_reports
- # Given that these are intended for abuse management my initial
- # inclination is to leave them in place.
- #
- # - current_state_delta_stream
- # - ex_outlier_stream
- # - room_tags_revisions
- # The problem with these is that they are largeish and there is no room_id
- # index on them. In any case we should be clearing out 'stream' tables
- # periodically anyway (#5888)
-
- # TODO: we could probably usefully do a bunch of cache invalidation here
+ def _update_min_depth_for_room_txn(self, txn, room_id, depth):
+ min_depth = self.store._get_min_depth_interaction(txn, room_id)
- logger.info("[purge] done")
-
- return state_groups
-
- async def is_event_after(self, event_id1, event_id2):
- """Returns True if event_id1 is after event_id2 in the stream
- """
- to_1, so_1 = await self._get_event_ordering(event_id1)
- to_2, so_2 = await self._get_event_ordering(event_id2)
- return (to_1, so_1) > (to_2, so_2)
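+ # Only update the room's depth if this event is shallower than the
+ # current minimum.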
+ if min_depth is not None and depth >= min_depth:
+ return
- @cachedInlineCallbacks(max_entries=5000)
- def _get_event_ordering(self, event_id):
- res = yield self.db.simple_select_one(
- table="events",
- retcols=["topological_ordering", "stream_ordering"],
- keyvalues={"event_id": event_id},
- allow_none=True,
+ self.db.simple_upsert_txn(
+ txn,
+ table="room_depth",
+ keyvalues={"room_id": room_id},
+ values={"min_depth": depth},
)
- if not res:
- raise SynapseError(404, "Could not find event %s" % (event_id,))
-
- return (int(res["topological_ordering"]), int(res["stream_ordering"]))
-
- def insert_labels_for_event_txn(
- self, txn, event_id, labels, room_id, topological_ordering
- ):
- """Store the mapping between an event's ID and its labels, with one row per
- (event_id, label) tuple.
-
- Args:
- txn (LoggingTransaction): The transaction to execute.
- event_id (str): The event's ID.
- labels (list[str]): A list of text labels.
- room_id (str): The ID of the room the event was sent to.
- topological_ordering (int): The position of the event in the room's topology.
+ def _handle_mult_prev_events(self, txn, events):
"""
- return self.db.simple_insert_many_txn(
- txn=txn,
- table="event_labels",
+ For the given events, update the event edges table and the backward
+ extremities table (forward extremities are handled when we first start
+ persisting the events).
+ """
+ self.db.simple_insert_many_txn(
+ txn,
+ table="event_edges",
values=[
{
- "event_id": event_id,
- "label": label,
- "room_id": room_id,
- "topological_ordering": topological_ordering,
+ "event_id": ev.event_id,
+ "prev_event_id": e_id,
+ "room_id": ev.room_id,
+ "is_state": False,
}
- for label in labels
+ for ev in events
+ for e_id in ev.prev_event_ids()
],
)
- def _insert_event_expiry_txn(self, txn, event_id, expiry_ts):
- """Save the expiry timestamp associated with a given event ID.
-
- Args:
- txn (LoggingTransaction): The database transaction to use.
- event_id (str): The event ID the expiry timestamp is associated with.
- expiry_ts (int): The timestamp at which to expire (delete) the event.
- """
- return self.db.simple_insert_txn(
- txn=txn,
- table="event_expiry",
- values={"event_id": event_id, "expiry_ts": expiry_ts},
- )
-
- @defer.inlineCallbacks
- def expire_event(self, event_id):
- """Retrieve and expire an event that has expired, and delete its associated
- expiry timestamp. If the event can't be retrieved, delete its associated
- timestamp so we don't try to expire it again in the future.
-
- Args:
- event_id (str): The ID of the event to delete.
- """
- # Try to retrieve the event's content from the database or the event cache.
- event = yield self.get_event(event_id)
-
- def delete_expired_event_txn(txn):
- # Delete the expiry timestamp associated with this event from the database.
- self._delete_event_expiry_txn(txn, event_id)
-
- if not event:
- # If we can't find the event, log a warning and delete the expiry date
- # from the database so that we don't try to expire it again in the
- # future.
- logger.warning(
- "Can't expire event %s because we don't have it.", event_id
- )
- return
-
- # Prune the event's dict then convert it to JSON.
- pruned_json = encode_json(
- prune_event_dict(event.room_version, event.get_dict())
- )
-
- # Update the event_json table to replace the event's JSON with the pruned
- # JSON.
- self._censor_event_txn(txn, event.event_id, pruned_json)
-
- # We need to invalidate the event cache entry for this event because we
- # changed its content in the database. We can't call
- # self._invalidate_cache_and_stream because self.get_event_cache isn't of the
- # right type.
- txn.call_after(self._get_event_cache.invalidate, (event.event_id,))
- # Send that invalidation to replication so that other workers also invalidate
- # the event cache.
- self._send_invalidation_to_replication(
- txn, "_get_event_cache", (event.event_id,)
- )
+ self._update_backward_extremeties(txn, events)
- yield self.db.runInteraction("delete_expired_event", delete_expired_event_txn)
+ def _update_backward_extremeties(self, txn, events):
+ """Updates the event_backward_extremities tables based on the new/updated
+ events being persisted.
- def _delete_event_expiry_txn(self, txn, event_id):
- """Delete the expiry timestamp associated with an event ID without deleting the
- actual event.
+ This is called for new events *and* for events that were outliers, but
+ are now being persisted as non-outliers.
- Args:
- txn (LoggingTransaction): The transaction to use to perform the deletion.
- event_id (str): The event ID to delete the associated expiry timestamp of.
+ Forward extremities are handled when we first start persisting the events.
"""
- return self.db.simple_delete_txn(
- txn=txn, table="event_expiry", keyvalues={"event_id": event_id}
+ events_by_room = {}
+ for ev in events:
+ events_by_room.setdefault(ev.room_id, []).append(ev)
+
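+ # Insert each prev_event as a backward extremity, unless it is
+ # already recorded as one, or we already have the event itself as a
+ # non-outlier.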
+ query = (
+ "INSERT INTO event_backward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ " )"
+ " AND NOT EXISTS ("
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
+ " )"
)
- def get_next_event_to_expire(self):
- """Retrieve the entry with the lowest expiry timestamp in the event_expiry
- table, or None if there's no more event to expire.
-
- Returns: Deferred[Optional[Tuple[str, int]]]
- A tuple containing the event ID as its first element and an expiry timestamp
- as its second one, if there's at least one row in the event_expiry table.
- None otherwise.
- """
-
- def get_next_event_to_expire_txn(txn):
- txn.execute(
- """
- SELECT event_id, expiry_ts FROM event_expiry
- ORDER BY expiry_ts ASC LIMIT 1
- """
- )
-
- return txn.fetchone()
-
- return self.db.runInteraction(
- desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
+ txn.executemany(
+ query,
+ [
+ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+ for ev in events
+ for e_id in ev.prev_event_ids()
+ if not ev.internal_metadata.is_outlier()
+ ],
)
-
-AllNewEventsResult = namedtuple(
- "AllNewEventsResult",
- [
- "new_forward_events",
- "new_backfill_events",
- "forward_ex_outliers",
- "backward_ex_outliers",
- ],
-)
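+ # The events being persisted are now known, so they are no longer
+ # backward extremities; remove them.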
+ query = (
+ "DELETE FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ )
+ txn.executemany(
+ query,
+ [
+ (ev.event_id, ev.room_id)
+ for ev in events
+ if not ev.internal_metadata.is_outlier()
+ ],
+ )
|