diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 552e7ca35b..ecb79c07ef 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,12 +19,22 @@ from twisted.internet import defer, reactor
from synapse.events import FrozenEvent, USE_FROZEN_DICTS
from synapse.events.utils import prune_event
-from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
+from synapse.util.async import ObservableDeferred
+from synapse.util.logcontext import (
+ preserve_fn, PreserveLoggingContext, preserve_context_over_deferred
+)
from synapse.util.logutils import log_function
+from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
from canonicaljson import encode_canonical_json
-from contextlib import contextmanager
+from collections import deque, namedtuple, OrderedDict
+from functools import wraps
+
+import synapse
+import synapse.metrics
+
import logging
import math
@@ -33,6 +43,10 @@ import ujson as json
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+persist_event_counter = metrics.register_counter("persisted_events")
+
+
def encode_json(json_object):
if USE_FROZEN_DICTS:
# ujson doesn't like frozen_dicts
@@ -40,6 +54,7 @@ def encode_json(json_object):
else:
return json.dumps(json_object, ensure_ascii=False)
+
# These values are used in the `enqueus_event` and `_do_fetch` methods to
# control how we batch/bulk fetch events from the database.
# The values are plucked out of thing air to make initial sync run faster
@@ -50,37 +65,225 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events
EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
+class _EventPeristenceQueue(object):
+ """Queues up events so that they can be persisted in bulk with only one
+ concurrent transaction per room.
+ """
+
+ _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", (
+ "events_and_contexts", "current_state", "backfilled", "deferred",
+ ))
+
+ def __init__(self):
+ self._event_persist_queues = {}
+ self._currently_persisting_rooms = set()
+
+ def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state):
+ """Add events to the queue, with the given persist_event options.
+ """
+ queue = self._event_persist_queues.setdefault(room_id, deque())
+ if queue:
+ end_item = queue[-1]
+ if end_item.current_state or current_state:
+ # We perist events with current_state set to True one at a time
+ pass
+ if end_item.backfilled == backfilled:
+ end_item.events_and_contexts.extend(events_and_contexts)
+ return end_item.deferred.observe()
+
+ deferred = ObservableDeferred(defer.Deferred())
+
+ queue.append(self._EventPersistQueueItem(
+ events_and_contexts=events_and_contexts,
+ backfilled=backfilled,
+ current_state=current_state,
+ deferred=deferred,
+ ))
+
+ return deferred.observe()
+
+ def handle_queue(self, room_id, per_item_callback):
+ """Attempts to handle the queue for a room if not already being handled.
+
+ The given callback will be invoked with for each item in the queue,1
+ of type _EventPersistQueueItem. The per_item_callback will continuously
+ be called with new items, unless the queue becomnes empty. The return
+ value of the function will be given to the deferreds waiting on the item,
+ exceptions will be passed to the deferres as well.
+
+ This function should therefore be called whenever anything is added
+ to the queue.
+
+ If another callback is currently handling the queue then it will not be
+ invoked.
+ """
+
+ if room_id in self._currently_persisting_rooms:
+ return
+
+ self._currently_persisting_rooms.add(room_id)
+
+ @defer.inlineCallbacks
+ def handle_queue_loop():
+ try:
+ queue = self._get_drainining_queue(room_id)
+ for item in queue:
+ try:
+ ret = yield per_item_callback(item)
+ item.deferred.callback(ret)
+ except Exception as e:
+ item.deferred.errback(e)
+ finally:
+ queue = self._event_persist_queues.pop(room_id, None)
+ if queue:
+ self._event_persist_queues[room_id] = queue
+ self._currently_persisting_rooms.discard(room_id)
+
+ preserve_fn(handle_queue_loop)()
+
+ def _get_drainining_queue(self, room_id):
+ queue = self._event_persist_queues.setdefault(room_id, deque())
+
+ try:
+ while True:
+ yield queue.popleft()
+ except IndexError:
+ # Queue has been drained.
+ pass
+
+
+_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
+
+
+def _retry_on_integrity_error(func):
+ """Wraps a database function so that it gets retried on IntegrityError,
+ with `delete_existing=True` passed in.
+
+ Args:
+ func: function that returns a Deferred and accepts a `delete_existing` arg
+ """
+ @wraps(func)
+ @defer.inlineCallbacks
+ def f(self, *args, **kwargs):
+ try:
+ res = yield func(self, *args, **kwargs)
+ except self.database_engine.module.IntegrityError:
+ logger.exception("IntegrityError, retrying.")
+ res = yield func(self, *args, delete_existing=True, **kwargs)
+ defer.returnValue(res)
+
+ return f
+
+
class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+ EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
def __init__(self, hs):
super(EventsStore, self).__init__(hs)
+ self._clock = hs.get_clock()
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
+ self.register_background_update_handler(
+ self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
+ self._background_reindex_fields_sender,
+ )
+
+ self.register_background_index_update(
+ "event_contains_url_index",
+ index_name="event_contains_url_index",
+ table="events",
+ columns=["room_id", "topological_ordering", "stream_ordering"],
+ where_clause="contains_url = true AND outlier = false",
+ )
+
+ self._event_persist_queue = _EventPeristenceQueue()
+
+ def persist_events(self, events_and_contexts, backfilled=False):
+ """
+ Write events to the database
+ Args:
+ events_and_contexts: list of tuples of (event, context)
+ backfilled: ?
+ """
+ partitioned = {}
+ for event, ctx in events_and_contexts:
+ partitioned.setdefault(event.room_id, []).append((event, ctx))
+
+ deferreds = []
+ for room_id, evs_ctxs in partitioned.items():
+ d = preserve_fn(self._event_persist_queue.add_to_queue)(
+ room_id, evs_ctxs,
+ backfilled=backfilled,
+ current_state=None,
+ )
+ deferreds.append(d)
+
+ for room_id in partitioned.keys():
+ self._maybe_start_persisting(room_id)
+
+ return preserve_context_over_deferred(
+ defer.gatherResults(deferreds, consumeErrors=True)
+ )
+
+ @defer.inlineCallbacks
+ @log_function
+ def persist_event(self, event, context, current_state=None, backfilled=False):
+ deferred = self._event_persist_queue.add_to_queue(
+ event.room_id, [(event, context)],
+ backfilled=backfilled,
+ current_state=current_state,
+ )
+
+ self._maybe_start_persisting(event.room_id)
+
+ yield preserve_context_over_deferred(deferred)
+
+ max_persisted_id = yield self._stream_id_gen.get_current_token()
+ defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
+
+ def _maybe_start_persisting(self, room_id):
+ @defer.inlineCallbacks
+ def persisting_queue(item):
+ if item.current_state:
+ for event, context in item.events_and_contexts:
+ # There should only ever be one item in
+ # events_and_contexts when current_state is
+ # not None
+ yield self._persist_event(
+ event, context,
+ current_state=item.current_state,
+ backfilled=item.backfilled,
+ )
+ else:
+ yield self._persist_events(
+ item.events_and_contexts,
+ backfilled=item.backfilled,
+ )
+ self._event_persist_queue.handle_queue(room_id, persisting_queue)
+
+ @_retry_on_integrity_error
@defer.inlineCallbacks
- def persist_events(self, events_and_contexts, backfilled=False,
- is_new_state=True):
+ def _persist_events(self, events_and_contexts, backfilled=False,
+ delete_existing=False):
if not events_and_contexts:
return
if backfilled:
- start = self.min_stream_token - 1
- self.min_stream_token -= len(events_and_contexts) + 1
- stream_orderings = range(start, self.min_stream_token, -1)
-
- @contextmanager
- def stream_ordering_manager():
- yield stream_orderings
- stream_ordering_manager = stream_ordering_manager()
+ stream_ordering_manager = self._backfill_id_gen.get_next_mult(
+ len(events_and_contexts)
+ )
else:
stream_ordering_manager = self._stream_id_gen.get_next_mult(
len(events_and_contexts)
)
with stream_ordering_manager as stream_orderings:
- for (event, _), stream in zip(events_and_contexts, stream_orderings):
+ for (event, context), stream, in zip(
+ events_and_contexts, stream_orderings
+ ):
event.internal_metadata.stream_ordering = stream
chunks = [
@@ -96,44 +299,31 @@ class EventsStore(SQLBaseStore):
self._persist_events_txn,
events_and_contexts=chunk,
backfilled=backfilled,
- is_new_state=is_new_state,
+ delete_existing=delete_existing,
)
+ persist_event_counter.inc_by(len(chunk))
+ @_retry_on_integrity_error
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, context, backfilled=False,
- is_new_state=True, current_state=None):
- stream_ordering = None
- if backfilled:
- self.min_stream_token -= 1
- stream_ordering = self.min_stream_token
-
- if stream_ordering is None:
- stream_ordering_manager = self._stream_id_gen.get_next()
- else:
- @contextmanager
- def stream_ordering_manager():
- yield stream_ordering
- stream_ordering_manager = stream_ordering_manager()
-
+ def _persist_event(self, event, context, current_state=None, backfilled=False,
+ delete_existing=False):
try:
- with stream_ordering_manager as stream_ordering:
+ with self._stream_id_gen.get_next() as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
- backfilled=backfilled,
- is_new_state=is_new_state,
current_state=current_state,
+ backfilled=backfilled,
+ delete_existing=delete_existing,
)
+ persist_event_counter.inc()
except _RollbackButIsFineException:
pass
- max_persisted_id = yield self._stream_id_gen.get_max_token()
- defer.returnValue((stream_ordering, max_persisted_id))
-
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
@@ -161,21 +351,53 @@ class EventsStore(SQLBaseStore):
)
if not events and not allow_none:
- raise RuntimeError("Could not find event %s" % (event_id,))
+ raise SynapseError(404, "Could not find event %s" % (event_id,))
defer.returnValue(events[0] if events else None)
+ @defer.inlineCallbacks
+ def get_events(self, event_ids, check_redacted=True,
+ get_prev_content=False, allow_rejected=False):
+ """Get events from the database
+
+ Args:
+ event_ids (list): The event_ids of the events to fetch
+ check_redacted (bool): If True, check if event has been redacted
+ and redact it.
+ get_prev_content (bool): If True and event is a state event,
+ include the previous states content in the unsigned field.
+ allow_rejected (bool): If True return rejected events.
+
+ Returns:
+ Deferred : Dict from event_id to event.
+ """
+ events = yield self._get_events(
+ event_ids,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ allow_rejected=allow_rejected,
+ )
+
+ defer.returnValue({e.event_id: e for e in events})
+
@log_function
- def _persist_event_txn(self, txn, event, context, backfilled,
- is_new_state=True, current_state=None):
+ def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
+ delete_existing=False):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
- txn.call_after(self.get_current_state_for_key.invalidate_all)
+ txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
- txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
- txn.call_after(self.get_room_name_and_aliases, event.room_id)
+
+ # Add an entry to the current_state_resets table to record the point
+ # where we clobbered the current state
+ stream_order = event.internal_metadata.stream_ordering
+ self._simple_insert_txn(
+ txn,
+ table="current_state_resets",
+ values={"event_stream_ordering": stream_order}
+ )
self._simple_delete_txn(
txn,
@@ -199,12 +421,38 @@ class EventsStore(SQLBaseStore):
txn,
[(event, context)],
backfilled=backfilled,
- is_new_state=is_new_state,
+ delete_existing=delete_existing,
)
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
- is_new_state=True):
+ delete_existing=False):
+ """Insert some number of room events into the necessary database tables.
+
+ Rejected events are only inserted into the events table, the events_json table,
+ and the rejections table. Things reading from those table will need to check
+ whether the event was rejected.
+
+ If delete_existing is True then existing events will be purged from the
+ database before insertion. This is useful when retrying due to IntegrityError.
+ """
+ # Ensure that we don't have the same event twice.
+ # Pick the earliest non-outlier if there is one, else the earliest one.
+ new_events_and_contexts = OrderedDict()
+ for event, context in events_and_contexts:
+ prev_event_context = new_events_and_contexts.get(event.event_id)
+ if prev_event_context:
+ if not event.internal_metadata.is_outlier():
+ if prev_event_context[0].internal_metadata.is_outlier():
+ # To ensure correct ordering we pop, as OrderedDict is
+ # ordered by first insertion.
+ new_events_and_contexts.pop(event.event_id, None)
+ new_events_and_contexts[event.event_id] = (event, context)
+ else:
+ new_events_and_contexts[event.event_id] = (event, context)
+
+ events_and_contexts = new_events_and_contexts.values()
+
depth_updates = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
@@ -215,21 +463,11 @@ class EventsStore(SQLBaseStore):
event.room_id, event.internal_metadata.stream_ordering,
)
- if not event.internal_metadata.is_outlier():
+ if not event.internal_metadata.is_outlier() and not context.rejected:
depth_updates[event.room_id] = max(
event.depth, depth_updates.get(event.room_id, event.depth)
)
- if context.push_actions:
- self._set_push_actions_for_event_and_users_txn(
- txn, event, context.push_actions
- )
-
- if event.type == EventTypes.Redaction and event.redacts is not None:
- self._remove_push_actions_for_event_id_txn(
- txn, event.room_id, event.redacts
- )
-
for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth)
@@ -239,30 +477,21 @@ class EventsStore(SQLBaseStore):
),
[event.event_id for event, _ in events_and_contexts]
)
+
have_persisted = {
event_id: outlier
for event_id, outlier in txn.fetchall()
}
- event_map = {}
to_remove = set()
for event, context in events_and_contexts:
- # Handle the case of the list including the same event multiple
- # times. The tricky thing here is when they differ by whether
- # they are an outlier.
- if event.event_id in event_map:
- other = event_map[event.event_id]
-
- if not other.internal_metadata.is_outlier():
+ if context.rejected:
+ # If the event is rejected then we don't care if the event
+ # was an outlier or not.
+ if event.event_id in have_persisted:
+ # If we have already seen the event then ignore it.
to_remove.add(event)
- continue
- elif not event.internal_metadata.is_outlier():
- to_remove.add(event)
- continue
- else:
- to_remove.add(other)
-
- event_map[event.event_id] = event
+ continue
if event.event_id not in have_persisted:
continue
@@ -271,9 +500,17 @@ class EventsStore(SQLBaseStore):
outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
- self._store_state_groups_txn(
- txn, event, context,
- )
+ # We received a copy of an event that we had already stored as
+ # an outlier in the database. We now have some state at that
+ # so we need to update the state_groups table with that state.
+
+ # insert into the state_group, state_groups_state and
+ # event_to_state_groups tables.
+ try:
+ self._store_mult_state_groups_txn(txn, ((event, context),))
+ except Exception:
+ logger.exception("")
+ raise
metadata_json = encode_json(
event.internal_metadata.get_dict()
@@ -288,6 +525,20 @@ class EventsStore(SQLBaseStore):
(metadata_json, event.event_id,)
)
+ # Add an entry to the ex_outlier_stream table to replicate the
+ # change in outlier status to our workers.
+ stream_order = event.internal_metadata.stream_ordering
+ state_group_id = context.state_group
+ self._simple_insert_txn(
+ txn,
+ table="ex_outlier_stream",
+ values={
+ "event_stream_ordering": stream_order,
+ "event_id": event.event_id,
+ "state_group": state_group_id,
+ }
+ )
+
sql = (
"UPDATE events SET outlier = ?"
" WHERE event_id = ?"
@@ -297,49 +548,21 @@ class EventsStore(SQLBaseStore):
(False, event.event_id,)
)
+ # Update the event_backward_extremities table now that this
+ # event isn't an outlier any more.
self._update_extremeties(txn, [event])
- events_and_contexts = filter(
- lambda ec: ec[0] not in to_remove,
- events_and_contexts
- )
+ events_and_contexts = [
+ ec for ec in events_and_contexts if ec[0] not in to_remove
+ ]
if not events_and_contexts:
+ # Make sure we don't pass an empty list to functions that expect to
+ # be storing at least one element.
return
- self._store_mult_state_groups_txn(txn, [
- (event, context)
- for event, context in events_and_contexts
- if not event.internal_metadata.is_outlier()
- ])
-
- self._handle_mult_prev_events(
- txn,
- events=[event for event, _ in events_and_contexts],
- )
-
- for event, _ in events_and_contexts:
- if event.type == EventTypes.Name:
- self._store_room_name_txn(txn, event)
- elif event.type == EventTypes.Topic:
- self._store_room_topic_txn(txn, event)
- elif event.type == EventTypes.Message:
- self._store_room_message_txn(txn, event)
- elif event.type == EventTypes.Redaction:
- self._store_redaction(txn, event)
- elif event.type == EventTypes.RoomHistoryVisibility:
- self._store_history_visibility_txn(txn, event)
- elif event.type == EventTypes.GuestAccess:
- self._store_guest_access_txn(txn, event)
-
- self._store_room_members_txn(
- txn,
- [
- event
- for event, _ in events_and_contexts
- if event.type == EventTypes.Member
- ]
- )
+ # From this point onwards the events are only events that we haven't
+ # seen before.
def event_dict(event):
return {
@@ -351,6 +574,43 @@ class EventsStore(SQLBaseStore):
]
}
+ if delete_existing:
+ # For paranoia reasons, we go and delete all the existing entries
+ # for these events so we can reinsert them.
+ # This gets around any problems with some tables already having
+ # entries.
+
+ logger.info("Deleting existing")
+
+ for table in (
+ "events",
+ "event_auth",
+ "event_json",
+ "event_content_hashes",
+ "event_destinations",
+ "event_edge_hashes",
+ "event_edges",
+ "event_forward_extremities",
+ "event_push_actions",
+ "event_reference_hashes",
+ "event_search",
+ "event_signatures",
+ "event_to_state_groups",
+ "guest_access",
+ "history_visibility",
+ "local_invites",
+ "room_names",
+ "state_events",
+ "rejections",
+ "redactions",
+ "room_memberships",
+ "topics"
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE event_id = ?" % (table,),
+ [(ev.event_id,) for ev, _ in events_and_contexts]
+ )
+
self._simple_insert_many_txn(
txn,
table="event_json",
@@ -382,15 +642,52 @@ class EventsStore(SQLBaseStore):
"outlier": event.internal_metadata.is_outlier(),
"content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
+ "received_ts": self._clock.time_msec(),
+ "sender": event.sender,
+ "contains_url": (
+ "url" in event.content
+ and isinstance(event.content["url"], basestring)
+ ),
}
for event, _ in events_and_contexts
],
)
- if context.rejected:
- self._store_rejections_txn(
- txn, event.event_id, context.rejected
- )
+ # Remove the rejected events from the list now that we've added them
+ # to the events table and the events_json table.
+ to_remove = set()
+ for event, context in events_and_contexts:
+ if context.rejected:
+ # Insert the event_id into the rejections table
+ self._store_rejections_txn(
+ txn, event.event_id, context.rejected
+ )
+ to_remove.add(event)
+
+ events_and_contexts = [
+ ec for ec in events_and_contexts if ec[0] not in to_remove
+ ]
+
+ if not events_and_contexts:
+ # Make sure we don't pass an empty list to functions that expect to
+ # be storing at least one element.
+ return
+
+ # From this point onwards the events are only ones that weren't rejected.
+
+ for event, context in events_and_contexts:
+ # Insert all the push actions into the event_push_actions table.
+ if context.push_actions:
+ self._set_push_actions_for_event_and_users_txn(
+ txn, event, context.push_actions
+ )
+
+ if event.type == EventTypes.Redaction and event.redacts is not None:
+ # Remove the entries in the event_push_actions table for the
+ # redacted event.
+ self._remove_push_actions_for_event_id_txn(
+ txn, event.room_id, event.redacts
+ )
self._simple_insert_many_txn(
txn,
@@ -406,14 +703,56 @@ class EventsStore(SQLBaseStore):
],
)
+ # Insert into the state_groups, state_groups_state, and
+ # event_to_state_groups tables.
+ self._store_mult_state_groups_txn(txn, events_and_contexts)
+
+ # Update the event_forward_extremities, event_backward_extremities and
+ # event_edges tables.
+ self._handle_mult_prev_events(
+ txn,
+ events=[event for event, _ in events_and_contexts],
+ )
+
+ for event, _ in events_and_contexts:
+ if event.type == EventTypes.Name:
+ # Insert into the room_names and event_search tables.
+ self._store_room_name_txn(txn, event)
+ elif event.type == EventTypes.Topic:
+ # Insert into the topics table and event_search table.
+ self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Message:
+ # Insert into the event_search table.
+ self._store_room_message_txn(txn, event)
+ elif event.type == EventTypes.Redaction:
+ # Insert into the redactions table.
+ self._store_redaction(txn, event)
+ elif event.type == EventTypes.RoomHistoryVisibility:
+ # Insert into the event_search table.
+ self._store_history_visibility_txn(txn, event)
+ elif event.type == EventTypes.GuestAccess:
+ # Insert into the event_search table.
+ self._store_guest_access_txn(txn, event)
+
+ # Insert into the room_memberships table.
+ self._store_room_members_txn(
+ txn,
+ [
+ event
+ for event, _ in events_and_contexts
+ if event.type == EventTypes.Member
+ ],
+ backfilled=backfilled,
+ )
+
+ # Insert event_reference_hashes table.
self._store_event_reference_hashes_txn(
txn, [event for event, _ in events_and_contexts]
)
- state_events_and_contexts = filter(
- lambda i: i[0].is_state(),
- events_and_contexts,
- )
+ state_events_and_contexts = [
+ ec for ec in events_and_contexts if ec[0].is_state()
+ ]
state_values = []
for event, context in state_events_and_contexts:
@@ -451,35 +790,78 @@ class EventsStore(SQLBaseStore):
],
)
- if is_new_state:
- for event, _ in state_events_and_contexts:
- if not context.rejected:
- txn.call_after(
- self.get_current_state_for_key.invalidate,
- (event.room_id, event.type, event.state_key,)
- )
+ # Prefill the event cache
+ self._add_to_cache(txn, events_and_contexts)
- if event.type in [EventTypes.Name, EventTypes.Aliases]:
- txn.call_after(
- self.get_room_name_and_aliases.invalidate,
- (event.room_id,)
- )
+ if backfilled:
+ # Backfilled events come before the current state so we don't need
+ # to update the current state table
+ return
- self._simple_upsert_txn(
- txn,
- "current_state_events",
- keyvalues={
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- values={
- "event_id": event.event_id,
- }
- )
+ for event, _ in state_events_and_contexts:
+ if event.internal_metadata.is_outlier():
+ # Outlier events shouldn't clobber the current state.
+ continue
+
+ txn.call_after(
+ self._get_current_state_for_key.invalidate,
+ (event.room_id, event.type, event.state_key,)
+ )
+
+ self._simple_upsert_txn(
+ txn,
+ "current_state_events",
+ keyvalues={
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ values={
+ "event_id": event.event_id,
+ }
+ )
return
+ def _add_to_cache(self, txn, events_and_contexts):
+ to_prefill = []
+
+ rows = []
+ N = 200
+ for i in range(0, len(events_and_contexts), N):
+ ev_map = {
+ e[0].event_id: e[0]
+ for e in events_and_contexts[i:i + N]
+ }
+ if not ev_map:
+ break
+
+ sql = (
+ "SELECT "
+ " e.event_id as event_id, "
+ " r.redacts as redacts,"
+ " rej.event_id as rejects "
+ " FROM events as e"
+ " LEFT JOIN rejections as rej USING (event_id)"
+ " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+ " WHERE e.event_id IN (%s)"
+ ) % (",".join(["?"] * len(ev_map)),)
+
+ txn.execute(sql, ev_map.keys())
+ rows = self.cursor_to_dict(txn)
+ for row in rows:
+ event = ev_map[row["event_id"]]
+ if not row["rejects"] and not row["redacts"]:
+ to_prefill.append(_EventCacheEntry(
+ event=event,
+ redacted_event=None,
+ ))
+
+ def prefill():
+ for cache_entry in to_prefill:
+ self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
+ txn.call_after(prefill)
+
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
@@ -488,6 +870,22 @@ class EventsStore(SQLBaseStore):
(event.event_id, event.redacts)
)
+ @defer.inlineCallbacks
+ def have_events_in_timeline(self, event_ids):
+ """Given a list of event ids, check if we have already processed and
+ stored them as non outliers.
+ """
+ rows = yield self._simple_select_many_batch(
+ table="events",
+ retcols=("event_id",),
+ column="event_id",
+ iterable=list(event_ids),
+ keyvalues={"outlier": False},
+ desc="have_events_in_timeline",
+ )
+
+ defer.returnValue(set(r["event_id"] for r in rows))
+
def have_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
@@ -526,105 +924,68 @@ class EventsStore(SQLBaseStore):
if not event_ids:
defer.returnValue([])
- event_map = self._get_events_from_cache(
- event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- missing_events_ids = [e for e in event_ids if e not in event_map]
-
- if not missing_events_ids:
- defer.returnValue([
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ])
-
- missing_events = yield self._enqueue_events(
- missing_events_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- event_map.update(missing_events)
-
- defer.returnValue([
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ])
+ event_id_list = event_ids
+ event_ids = set(event_ids)
- def _get_events_txn(self, txn, event_ids, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not event_ids:
- return []
-
- event_map = self._get_events_from_cache(
+ event_entry_map = self._get_events_from_cache(
event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
- missing_events_ids = [e for e in event_ids if e not in event_map]
+ missing_events_ids = [e for e in event_ids if e not in event_entry_map]
- if not missing_events_ids:
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
+ if missing_events_ids:
+ missing_events = yield self._enqueue_events(
+ missing_events_ids,
+ check_redacted=check_redacted,
+ allow_rejected=allow_rejected,
+ )
- missing_events = self._fetch_events_txn(
- txn,
- missing_events_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
+ event_entry_map.update(missing_events)
- event_map.update(missing_events)
+ events = []
+ for event_id in event_id_list:
+ entry = event_entry_map.get(event_id, None)
+ if not entry:
+ continue
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
+ if allow_rejected or not entry.event.rejected_reason:
+ if check_redacted and entry.redacted_event:
+ event = entry.redacted_event
+ else:
+ event = entry.event
- def _invalidate_get_event_cache(self, event_id):
- for check_redacted in (False, True):
- for get_prev_content in (False, True):
- self._get_event_cache.invalidate(
- (event_id, check_redacted, get_prev_content)
- )
+ events.append(event)
- def _get_event_txn(self, txn, event_id, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
+ if get_prev_content:
+ if "replaces_state" in event.unsigned:
+ prev = yield self.get_event(
+ event.unsigned["replaces_state"],
+ get_prev_content=False,
+ allow_none=True,
+ )
+ if prev:
+ event.unsigned = dict(event.unsigned)
+ event.unsigned["prev_content"] = prev.content
+ event.unsigned["prev_sender"] = prev.sender
- events = self._get_events_txn(
- txn, [event_id],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
+ defer.returnValue(events)
- return events[0] if events else None
+ def _invalidate_get_event_cache(self, event_id):
+ self._get_event_cache.invalidate((event_id,))
- def _get_events_from_cache(self, events, check_redacted, get_prev_content,
- allow_rejected):
+ def _get_events_from_cache(self, events, allow_rejected):
event_map = {}
for event_id in events:
- try:
- ret = self._get_event_cache.get(
- (event_id, check_redacted, get_prev_content,)
- )
+ ret = self._get_event_cache.get((event_id,), None)
+ if not ret:
+ continue
- if allow_rejected or not ret.rejected_reason:
- event_map[event_id] = ret
- else:
- event_map[event_id] = None
- except KeyError:
- pass
+ if allow_rejected or not ret.event.rejected_reason:
+ event_map[event_id] = ret
+ else:
+ event_map[event_id] = None
return event_map
@@ -695,8 +1056,7 @@ class EventsStore(SQLBaseStore):
reactor.callFromThread(fire, event_list)
@defer.inlineCallbacks
- def _enqueue_events(self, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
+ def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
"""Fetches events from the database using the _event_fetch_list. This
allows batch and bulk fetching of events - it allows us to fetch events
without having to create a new transaction for each request for events.
@@ -730,21 +1090,19 @@ class EventsStore(SQLBaseStore):
if not allow_rejected:
rows[:] = [r for r in rows if not r["rejects"]]
- res = yield defer.gatherResults(
+ res = yield preserve_context_over_deferred(defer.gatherResults(
[
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
rejected_reason=row["rejects"],
)
for row in rows
],
consumeErrors=True
- )
+ ))
defer.returnValue({
- e.event_id: e
+ e.event.event_id: e
for e in res if e
})
@@ -774,157 +1132,60 @@ class EventsStore(SQLBaseStore):
return rows
- def _fetch_events_txn(self, txn, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not events:
- return {}
-
- rows = self._fetch_event_rows(
- txn, events,
- )
-
- if not allow_rejected:
- rows[:] = [r for r in rows if not r["rejects"]]
-
- res = [
- self._get_event_from_row_txn(
- txn,
- row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- rejected_reason=row["rejects"],
- )
- for row in rows
- ]
-
- return {
- r.event_id: r
- for r in res
- }
-
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
rejected_reason=None):
- d = json.loads(js)
- internal_metadata = json.loads(internal_metadata)
-
- if rejected_reason:
- rejected_reason = yield self._simple_select_one_onecol(
- table="rejections",
- keyvalues={"event_id": rejected_reason},
- retcol="reason",
- desc="_get_event_from_row",
- )
-
- ev = FrozenEvent(
- d,
- internal_metadata_dict=internal_metadata,
- rejected_reason=rejected_reason,
- )
-
- if check_redacted and redacted:
- ev = prune_event(ev)
-
- redaction_id = yield self._simple_select_one_onecol(
- table="redactions",
- keyvalues={"redacts": ev.event_id},
- retcol="event_id",
- desc="_get_event_from_row",
- )
-
- ev.unsigned["redacted_by"] = redaction_id
- # Get the redaction event.
-
- because = yield self.get_event(
- redaction_id,
- check_redacted=False,
- allow_none=True,
- )
-
- if because:
- # It's fine to do add the event directly, since get_pdu_json
- # will serialise this field correctly
- ev.unsigned["redacted_because"] = because
-
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = yield self.get_event(
- ev.unsigned["replaces_state"],
- get_prev_content=False,
- allow_none=True,
- )
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
-
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
- )
-
- defer.returnValue(ev)
-
- def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
- rejected_reason=None):
- d = json.loads(js)
- internal_metadata = json.loads(internal_metadata)
+ with Measure(self._clock, "_get_event_from_row"):
+ d = json.loads(js)
+ internal_metadata = json.loads(internal_metadata)
+
+ if rejected_reason:
+ rejected_reason = yield self._simple_select_one_onecol(
+ table="rejections",
+ keyvalues={"event_id": rejected_reason},
+ retcol="reason",
+ desc="_get_event_from_row_rejected_reason",
+ )
- if rejected_reason:
- rejected_reason = self._simple_select_one_onecol_txn(
- txn,
- table="rejections",
- keyvalues={"event_id": rejected_reason},
- retcol="reason",
+ original_ev = FrozenEvent(
+ d,
+ internal_metadata_dict=internal_metadata,
+ rejected_reason=rejected_reason,
)
- ev = FrozenEvent(
- d,
- internal_metadata_dict=internal_metadata,
- rejected_reason=rejected_reason,
- )
-
- if check_redacted and redacted:
- ev = prune_event(ev)
+ redacted_event = None
+ if redacted:
+ redacted_event = prune_event(original_ev)
- redaction_id = self._simple_select_one_onecol_txn(
- txn,
- table="redactions",
- keyvalues={"redacts": ev.event_id},
- retcol="event_id",
- )
+ redaction_id = yield self._simple_select_one_onecol(
+ table="redactions",
+ keyvalues={"redacts": redacted_event.event_id},
+ retcol="event_id",
+ desc="_get_event_from_row_redactions",
+ )
- ev.unsigned["redacted_by"] = redaction_id
- # Get the redaction event.
+ redacted_event.unsigned["redacted_by"] = redaction_id
+ # Get the redaction event.
- because = self._get_event_txn(
- txn,
- redaction_id,
- check_redacted=False
- )
+ because = yield self.get_event(
+ redaction_id,
+ check_redacted=False,
+ allow_none=True,
+ )
- if because:
- ev.unsigned["redacted_because"] = because
+ if because:
+ # It's fine to do add the event directly, since get_pdu_json
+ # will serialise this field correctly
+ redacted_event.unsigned["redacted_because"] = because
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = self._get_event_txn(
- txn,
- ev.unsigned["replaces_state"],
- get_prev_content=False,
+ cache_entry = _EventCacheEntry(
+ event=original_ev,
+ redacted_event=redacted_event,
)
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
- )
-
- return ev
+ self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
- def _parse_events_txn(self, txn, rows):
- event_ids = [r["event_id"] for r in rows]
-
- return self._get_events_txn(txn, event_ids)
+ defer.returnValue(cache_entry)
@defer.inlineCallbacks
def count_daily_messages(self):
@@ -998,16 +1259,17 @@ class EventsStore(SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
- def _background_reindex_origin_server_ts(self, progress, batch_size):
+ def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
INSERT_CLUMP_SIZE = 1000
- def reindex_search_txn(txn):
+ def reindex_txn(txn):
sql = (
- "SELECT stream_ordering, event_id FROM events"
+ "SELECT stream_ordering, event_id, json FROM events"
+ " INNER JOIN event_json USING (event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" ORDER BY stream_ordering DESC"
" LIMIT ?"
@@ -1020,28 +1282,31 @@ class EventsStore(SQLBaseStore):
return 0
min_stream_id = rows[-1][0]
- event_ids = [row[1] for row in rows]
-
- events = self._get_events_txn(txn, event_ids)
- rows = []
- for event in events:
+ update_rows = []
+ for row in rows:
try:
- event_id = event.event_id
- origin_server_ts = event.origin_server_ts
+ event_id = row[1]
+ event_json = json.loads(row[2])
+ sender = event_json["sender"]
+ content = event_json["content"]
+
+ contains_url = "url" in content
+ if contains_url:
+ contains_url &= isinstance(content["url"], basestring)
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
continue
- rows.append((origin_server_ts, event_id))
+ update_rows.append((sender, contains_url, event_id))
sql = (
- "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
+ "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
)
- for index in range(0, len(rows), INSERT_CLUMP_SIZE):
- clump = rows[index:index + INSERT_CLUMP_SIZE]
+ for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
+ clump = update_rows[index:index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = {
@@ -1051,12 +1316,94 @@ class EventsStore(SQLBaseStore):
}
self._background_update_progress_txn(
- txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+ txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
)
return len(rows)
result = yield self.runInteraction(
+ self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
+ )
+
+ if not result:
+ yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _background_reindex_origin_server_ts(self, progress, batch_size):
+ target_min_stream_id = progress["target_min_stream_id_inclusive"]
+ max_stream_id = progress["max_stream_id_exclusive"]
+ rows_inserted = progress.get("rows_inserted", 0)
+
+ INSERT_CLUMP_SIZE = 1000
+
+ def reindex_search_txn(txn):
+ sql = (
+ "SELECT stream_ordering, event_id FROM events"
+ " WHERE ? <= stream_ordering AND stream_ordering < ?"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ min_stream_id = rows[-1][0]
+ event_ids = [row[1] for row in rows]
+
+ rows_to_update = []
+
+ chunks = [
+ event_ids[i:i + 100]
+ for i in xrange(0, len(event_ids), 100)
+ ]
+ for chunk in chunks:
+ ev_rows = self._simple_select_many_txn(
+ txn,
+ table="event_json",
+ column="event_id",
+ iterable=chunk,
+ retcols=["event_id", "json"],
+ keyvalues={},
+ )
+
+ for row in ev_rows:
+ event_id = row["event_id"]
+ event_json = json.loads(row["json"])
+ try:
+ origin_server_ts = event_json["origin_server_ts"]
+ except (KeyError, AttributeError):
+ # If the event is missing a necessary field then
+ # skip over it.
+ continue
+
+ rows_to_update.append((origin_server_ts, event_id))
+
+ sql = (
+ "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
+ )
+
+ for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
+ clump = rows_to_update[index:index + INSERT_CLUMP_SIZE]
+ txn.executemany(sql, clump)
+
+ progress = {
+ "target_min_stream_id_inclusive": target_min_stream_id,
+ "max_stream_id_exclusive": min_stream_id,
+ "rows_inserted": rows_inserted + len(rows_to_update)
+ }
+
+ self._background_update_progress_txn(
+ txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+ )
+
+ return len(rows_to_update)
+
+ result = yield self.runInteraction(
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
)
@@ -1067,45 +1414,324 @@ class EventsStore(SQLBaseStore):
def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
-
- # TODO: Fix race with the persit_event txn by using one of the
- # stream id managers
- return -self.min_stream_token
+ return -self._backfill_id_gen.get_current_token()
def get_all_new_events(self, last_backfill_id, last_forward_id,
current_backfill_id, current_forward_id, limit):
"""Get all the new events that have arrived at the server either as
new events or as backfilled events"""
+ have_backfill_events = last_backfill_id != current_backfill_id
+ have_forward_events = last_forward_id != current_forward_id
+
+ if not have_backfill_events and not have_forward_events:
+ return defer.succeed(AllNewEventsResult([], [], [], [], []))
+
def get_all_new_events_txn(txn):
sql = (
- "SELECT e.stream_ordering, ej.internal_metadata, ej.json"
+ "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group"
" FROM events as e"
" JOIN event_json as ej"
" ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+ " LEFT JOIN event_to_state_groups as eg"
+ " ON e.event_id = eg.event_id"
" WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
" ORDER BY e.stream_ordering ASC"
" LIMIT ?"
)
- if last_forward_id != current_forward_id:
+ if have_forward_events:
txn.execute(sql, (last_forward_id, current_forward_id, limit))
new_forward_events = txn.fetchall()
+
+ if len(new_forward_events) == limit:
+ upper_bound = new_forward_events[-1][0]
+ else:
+ upper_bound = current_forward_id
+
+ sql = (
+ "SELECT event_stream_ordering FROM current_state_resets"
+ " WHERE ? < event_stream_ordering"
+ " AND event_stream_ordering <= ?"
+ " ORDER BY event_stream_ordering ASC"
+ )
+ txn.execute(sql, (last_forward_id, upper_bound))
+ state_resets = txn.fetchall()
+
+ sql = (
+ "SELECT event_stream_ordering, event_id, state_group"
+ " FROM ex_outlier_stream"
+ " WHERE ? > event_stream_ordering"
+ " AND event_stream_ordering >= ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+ txn.execute(sql, (last_forward_id, upper_bound))
+ forward_ex_outliers = txn.fetchall()
else:
new_forward_events = []
+ state_resets = []
+ forward_ex_outliers = []
sql = (
- "SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
+ "SELECT -e.stream_ordering, ej.internal_metadata, ej.json,"
+ " eg.state_group"
" FROM events as e"
" JOIN event_json as ej"
" ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+ " LEFT JOIN event_to_state_groups as eg"
+ " ON e.event_id = eg.event_id"
" WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
" ORDER BY e.stream_ordering DESC"
" LIMIT ?"
)
- if last_backfill_id != current_backfill_id:
+ if have_backfill_events:
txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
new_backfill_events = txn.fetchall()
+
+ if len(new_backfill_events) == limit:
+ upper_bound = new_backfill_events[-1][0]
+ else:
+ upper_bound = current_backfill_id
+
+ sql = (
+ "SELECT -event_stream_ordering, event_id, state_group"
+ " FROM ex_outlier_stream"
+ " WHERE ? > event_stream_ordering"
+ " AND event_stream_ordering >= ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+ txn.execute(sql, (-last_backfill_id, -upper_bound))
+ backward_ex_outliers = txn.fetchall()
else:
new_backfill_events = []
+ backward_ex_outliers = []
- return (new_forward_events, new_backfill_events)
+ return AllNewEventsResult(
+ new_forward_events, new_backfill_events,
+ forward_ex_outliers, backward_ex_outliers,
+ state_resets,
+ )
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
+
+ def delete_old_state(self, room_id, topological_ordering):
+ return self.runInteraction(
+ "delete_old_state",
+ self._delete_old_state_txn, room_id, topological_ordering
+ )
+
+ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
+ """Deletes old room state
+ """
+
+ # Tables that should be pruned:
+ # event_auth
+ # event_backward_extremities
+ # event_content_hashes
+ # event_destinations
+ # event_edge_hashes
+ # event_edges
+ # event_forward_extremities
+ # event_json
+ # event_push_actions
+ # event_reference_hashes
+ # event_search
+ # event_signatures
+ # event_to_state_groups
+ # events
+ # rejections
+ # room_depth
+ # state_groups
+ # state_groups_state
+
+ # First ensure that we're not about to delete all the forward extremeties
+ txn.execute(
+ "SELECT e.event_id, e.depth FROM events as e "
+ "INNER JOIN event_forward_extremities as f "
+ "ON e.event_id = f.event_id "
+ "AND e.room_id = f.room_id "
+ "WHERE f.room_id = ?",
+ (room_id,)
+ )
+ rows = txn.fetchall()
+ max_depth = max(row[0] for row in rows)
+
+ if max_depth <= topological_ordering:
+ # We need to ensure we don't delete all the events from the datanase
+ # otherwise we wouldn't be able to send any events (due to not
+ # having any backwards extremeties)
+ raise SynapseError(
+ 400, "topological_ordering is greater than forward extremeties"
+ )
+
+ txn.execute(
+ "SELECT event_id, state_key FROM events"
+ " LEFT JOIN state_events USING (room_id, event_id)"
+ " WHERE room_id = ? AND topological_ordering < ?",
+ (room_id, topological_ordering,)
+ )
+ event_rows = txn.fetchall()
+
+ for event_id, state_key in event_rows:
+ txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+
+ # We calculate the new entries for the backward extremeties by finding
+ # all events that point to events that are to be purged
+ txn.execute(
+ "SELECT DISTINCT e.event_id FROM events as e"
+ " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
+ " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
+ " WHERE e.room_id = ? AND e.topological_ordering < ?"
+ " AND e2.topological_ordering >= ?",
+ (room_id, topological_ordering, topological_ordering)
+ )
+ new_backwards_extrems = txn.fetchall()
+
+ txn.execute(
+ "DELETE FROM event_backward_extremities WHERE room_id = ?",
+ (room_id,)
+ )
+
+ # Update backward extremeties
+ txn.executemany(
+ "INSERT INTO event_backward_extremities (room_id, event_id)"
+ " VALUES (?, ?)",
+ [
+ (room_id, event_id) for event_id, in new_backwards_extrems
+ ]
+ )
+
+ # Get all state groups that are only referenced by events that are
+ # to be deleted.
+ txn.execute(
+ "SELECT state_group FROM event_to_state_groups"
+ " INNER JOIN events USING (event_id)"
+ " WHERE state_group IN ("
+ " SELECT DISTINCT state_group FROM events"
+ " INNER JOIN event_to_state_groups USING (event_id)"
+ " WHERE room_id = ? AND topological_ordering < ?"
+ " )"
+ " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
+ (room_id, topological_ordering, topological_ordering)
+ )
+
+ state_rows = txn.fetchall()
+ state_groups_to_delete = [sg for sg, in state_rows]
+
+ # Now we get all the state groups that rely on these state groups
+ new_state_edges = []
+ chunks = [
+ state_groups_to_delete[i:i + 100]
+ for i in xrange(0, len(state_groups_to_delete), 100)
+ ]
+ for chunk in chunks:
+ rows = self._simple_select_many_txn(
+ txn,
+ table="state_group_edges",
+ column="prev_state_group",
+ iterable=chunk,
+ retcols=["state_group"],
+ keyvalues={},
+ )
+ new_state_edges.extend(row["state_group"] for row in rows)
+
+ # Now we turn the state groups that reference to-be-deleted state groups
+ # to non delta versions.
+ for new_state_edge in new_state_edges:
+ curr_state = self._get_state_groups_from_groups_txn(
+ txn, [new_state_edge], types=None
+ )
+ curr_state = curr_state[new_state_edge]
+
+ self._simple_delete_txn(
+ txn,
+ table="state_groups_state",
+ keyvalues={
+ "state_group": new_state_edge,
+ }
+ )
+
+ self._simple_delete_txn(
+ txn,
+ table="state_group_edges",
+ keyvalues={
+ "state_group": new_state_edge,
+ }
+ )
+
+ self._simple_insert_many_txn(
+ txn,
+ table="state_groups_state",
+ values=[
+ {
+ "state_group": new_state_edge,
+ "room_id": room_id,
+ "type": key[0],
+ "state_key": key[1],
+ "event_id": state_id,
+ }
+ for key, state_id in curr_state.items()
+ ],
+ )
+
+ txn.executemany(
+ "DELETE FROM state_groups_state WHERE state_group = ?",
+ state_rows
+ )
+ txn.executemany(
+ "DELETE FROM state_groups WHERE id = ?",
+ state_rows
+ )
+ # Delete all non-state
+ txn.executemany(
+ "DELETE FROM event_to_state_groups WHERE event_id = ?",
+ [(event_id,) for event_id, _ in event_rows]
+ )
+
+ txn.execute(
+ "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+ (topological_ordering, room_id,)
+ )
+
+ # Delete all remote non-state events
+ to_delete = [
+ (event_id,) for event_id, state_key in event_rows
+ if state_key is None and not self.hs.is_mine_id(event_id)
+ ]
+ for table in (
+ "events",
+ "event_json",
+ "event_auth",
+ "event_content_hashes",
+ "event_destinations",
+ "event_edge_hashes",
+ "event_edges",
+ "event_forward_extremities",
+ "event_push_actions",
+ "event_reference_hashes",
+ "event_search",
+ "event_signatures",
+ "rejections",
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE event_id = ?" % (table,),
+ to_delete
+ )
+
+ txn.executemany(
+ "DELETE FROM events WHERE event_id = ?",
+ to_delete
+ )
+ # Mark all state and own events as outliers
+ txn.executemany(
+ "UPDATE events SET outlier = ?"
+ " WHERE event_id = ?",
+ [
+ (True, event_id,) for event_id, state_key in event_rows
+ if state_key is not None or self.hs.is_mine_id(event_id)
+ ]
+ )
+
+
+AllNewEventsResult = namedtuple("AllNewEventsResult", [
+ "new_forward_events", "new_backfill_events",
+ "forward_ex_outliers", "backward_ex_outliers",
+ "state_resets"
+])
|