diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7002b3752e..906a405031 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,64 +13,59 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from twisted.internet import defer, reactor
+import itertools
+import logging
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
-from synapse.events import FrozenEvent, USE_FROZEN_DICTS
-from synapse.events.utils import prune_event
+from six import iteritems
+from six.moves import range
-from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import (
- preserve_fn, PreserveLoggingContext, preserve_context_over_deferred
-)
-from synapse.util.logutils import log_function
-from synapse.util.metrics import Measure
-from synapse.api.constants import EventTypes
-from synapse.api.errors import SynapseError
-from synapse.state import resolve_events
-from synapse.util.caches.descriptors import cached
-from synapse.types import get_domain_from_id
+from canonicaljson import json
+from prometheus_client import Counter
-from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple, OrderedDict
-from functools import wraps
+from twisted.internet import defer
import synapse.metrics
-
-import logging
-import ujson as json
-
+from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
-from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.events import EventBase # noqa: F401
+from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.events_worker import EventsWorkerStore
+from synapse.types import RoomStreamToken, get_domain_from_id
+from synapse.util.async import ObservableDeferred
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.frozenutils import frozendict_json_encoder
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
+from synapse.util.logutils import log_function
+from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
+persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
+event_counter = Counter("synapse_storage_events_persisted_events_sep", "",
+ ["type", "origin_type", "origin_entity"])
-metrics = synapse.metrics.get_metrics_for(__name__)
-persist_event_counter = metrics.register_counter("persisted_events")
-event_counter = metrics.register_counter(
- "persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
-)
+# The number of times we are recalculating the current state
+state_delta_counter = Counter("synapse_storage_events_state_delta", "")
+# The number of times we are recalculating state when there is only a
+# single forward extremity
+state_delta_single_event_counter = Counter(
+ "synapse_storage_events_state_delta_single_event", "")
-def encode_json(json_object):
- if USE_FROZEN_DICTS:
- # ujson doesn't like frozen_dicts
- return encode_canonical_json(json_object)
- else:
- return json.dumps(json_object, ensure_ascii=False)
+# The number of times we are reculating state when we could have resonably
+# calculated the delta when we calculated the state for an event we were
+# persisting.
+state_delta_reuse_delta_counter = Counter(
+ "synapse_storage_events_state_delta_reuse_delta", "")
-# These values are used in the `enqueus_event` and `_do_fetch` methods to
-# control how we batch/bulk fetch events from the database.
-# The values are plucked out of thing air to make initial sync run faster
-# on jki.re
-# TODO: Make these configurable.
-EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events
-EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events
-EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
+def encode_json(json_object):
+ return frozendict_json_encoder.encode(json_object)
class _EventPeristenceQueue(object):
@@ -88,19 +84,29 @@ class _EventPeristenceQueue(object):
def add_to_queue(self, room_id, events_and_contexts, backfilled):
"""Add events to the queue, with the given persist_event options.
+ NB: due to the normal usage pattern of this method, it does *not*
+ follow the synapse logcontext rules, and leaves the logcontext in
+ place whether or not the returned deferred is ready.
+
Args:
room_id (str):
events_and_contexts (list[(EventBase, EventContext)]):
backfilled (bool):
+
+ Returns:
+ defer.Deferred: a deferred which will resolve once the events are
+ persisted. Runs its callbacks *without* a logcontext.
"""
queue = self._event_persist_queues.setdefault(room_id, deque())
if queue:
+ # if the last item in the queue has the same `backfilled` setting,
+ # we can just add these new events to that item.
end_item = queue[-1]
if end_item.backfilled == backfilled:
end_item.events_and_contexts.extend(events_and_contexts)
return end_item.deferred.observe()
- deferred = ObservableDeferred(defer.Deferred())
+ deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
queue.append(self._EventPersistQueueItem(
events_and_contexts=events_and_contexts,
@@ -113,11 +119,11 @@ class _EventPeristenceQueue(object):
def handle_queue(self, room_id, per_item_callback):
"""Attempts to handle the queue for a room if not already being handled.
- The given callback will be invoked with for each item in the queue,1
+ The given callback will be invoked with for each item in the queue,
of type _EventPersistQueueItem. The per_item_callback will continuously
be called with new items, unless the queue becomnes empty. The return
value of the function will be given to the deferreds waiting on the item,
- exceptions will be passed to the deferres as well.
+ exceptions will be passed to the deferreds as well.
This function should therefore be called whenever anything is added
to the queue.
@@ -136,18 +142,23 @@ class _EventPeristenceQueue(object):
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
+ # handle_queue_loop runs in the sentinel logcontext, so
+ # there is no need to preserve_fn when running the
+ # callbacks on the deferred.
try:
ret = yield per_item_callback(item)
- item.deferred.callback(ret)
- except Exception as e:
- item.deferred.errback(e)
+ with PreserveLoggingContext():
+ item.deferred.callback(ret)
+ except Exception:
+ item.deferred.errback()
finally:
queue = self._event_persist_queues.pop(room_id, None)
if queue:
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
- preserve_fn(handle_queue_loop)()
+ # set handle_queue_loop off in the background
+ run_as_background_process("persist_events", handle_queue_loop)
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -183,13 +194,12 @@ def _retry_on_integrity_error(func):
return f
-class EventsStore(SQLBaseStore):
+class EventsStore(EventsWorkerStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
- def __init__(self, hs):
- super(EventsStore, self).__init__(hs)
- self._clock = hs.get_clock()
+ def __init__(self, db_conn, hs):
+ super(EventsStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
@@ -220,6 +230,8 @@ class EventsStore(SQLBaseStore):
self._event_persist_queue = _EventPeristenceQueue()
+ self._state_resolution_handler = hs.get_state_resolution_handler()
+
def persist_events(self, events_and_contexts, backfilled=False):
"""
Write events to the database
@@ -232,8 +244,8 @@ class EventsStore(SQLBaseStore):
partitioned.setdefault(event.room_id, []).append((event, ctx))
deferreds = []
- for room_id, evs_ctxs in partitioned.iteritems():
- d = preserve_fn(self._event_persist_queue.add_to_queue)(
+ for room_id, evs_ctxs in iteritems(partitioned):
+ d = self._event_persist_queue.add_to_queue(
room_id, evs_ctxs,
backfilled=backfilled,
)
@@ -242,7 +254,7 @@ class EventsStore(SQLBaseStore):
for room_id in partitioned:
self._maybe_start_persisting(room_id)
- return preserve_context_over_deferred(
+ return make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
@@ -267,7 +279,7 @@ class EventsStore(SQLBaseStore):
self._maybe_start_persisting(event.room_id)
- yield preserve_context_over_deferred(deferred)
+ yield make_deferred_yieldable(deferred)
max_persisted_id = yield self._stream_id_gen.get_current_token()
defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
@@ -275,10 +287,11 @@ class EventsStore(SQLBaseStore):
def _maybe_start_persisting(self, room_id):
@defer.inlineCallbacks
def persisting_queue(item):
- yield self._persist_events(
- item.events_and_contexts,
- backfilled=item.backfilled,
- )
+ with Measure(self._clock, "persist_events"):
+ yield self._persist_events(
+ item.events_and_contexts,
+ backfilled=item.backfilled,
+ )
self._event_persist_queue.handle_queue(room_id, persisting_queue)
@@ -316,7 +329,7 @@ class EventsStore(SQLBaseStore):
chunks = [
events_and_contexts[x:x + 100]
- for x in xrange(0, len(events_and_contexts), 100)
+ for x in range(0, len(events_and_contexts), 100)
]
for chunk in chunks:
@@ -325,8 +338,23 @@ class EventsStore(SQLBaseStore):
# NB: Assumes that we are only persisting events for one room
# at a time.
+
+ # map room_id->list[event_ids] giving the new forward
+ # extremities in each room
new_forward_extremeties = {}
+
+ # map room_id->(type,state_key)->event_id tracking the full
+ # state in each room after adding these events.
+ # This is simply used to prefill the get_current_state_ids
+ # cache
current_state_for_room = {}
+
+ # map room_id->(to_delete, to_insert) where to_delete is a list
+ # of type/state keys to remove from current state, and to_insert
+ # is a map (type,key)->event_id giving the state delta in each
+ # room
+ state_delta_for_room = {}
+
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
@@ -338,7 +366,7 @@ class EventsStore(SQLBaseStore):
(event, context)
)
- for room_id, ev_ctx_rm in events_by_room.iteritems():
+ for room_id, ev_ctx_rm in iteritems(events_by_room):
# Work out new extremities by recursively adding and removing
# the new events.
latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -348,7 +376,8 @@ class EventsStore(SQLBaseStore):
room_id, ev_ctx_rm, latest_event_ids
)
- if new_latest_event_ids == set(latest_event_ids):
+ latest_event_ids = set(latest_event_ids)
+ if new_latest_event_ids == latest_event_ids:
# No change in extremities, so no change in state
continue
@@ -369,11 +398,63 @@ class EventsStore(SQLBaseStore):
if all_single_prev_not_state:
continue
- state = yield self._calculate_state_delta(
- room_id, ev_ctx_rm, new_latest_event_ids
+ state_delta_counter.inc()
+ if len(new_latest_event_ids) == 1:
+ state_delta_single_event_counter.inc()
+
+ # This is a fairly handwavey check to see if we could
+ # have guessed what the delta would have been when
+ # processing one of these events.
+ # What we're interested in is if the latest extremities
+ # were the same when we created the event as they are
+ # now. When this server creates a new event (as opposed
+ # to receiving it over federation) it will use the
+ # forward extremities as the prev_events, so we can
+ # guess this by looking at the prev_events and checking
+ # if they match the current forward extremities.
+ for ev, _ in ev_ctx_rm:
+ prev_event_ids = set(e for e, _ in ev.prev_events)
+ if latest_event_ids == prev_event_ids:
+ state_delta_reuse_delta_counter.inc()
+ break
+
+ logger.info(
+ "Calculating state delta for room %s", room_id,
)
- if state:
- current_state_for_room[room_id] = state
+ with Measure(
+ self._clock,
+ "persist_events.get_new_state_after_events",
+ ):
+ res = yield self._get_new_state_after_events(
+ room_id,
+ ev_ctx_rm,
+ latest_event_ids,
+ new_latest_event_ids,
+ )
+ current_state, delta_ids = res
+
+ # If either are not None then there has been a change,
+ # and we need to work out the delta (or use that
+ # given)
+ if delta_ids is not None:
+ # If there is a delta we know that we've
+ # only added or replaced state, never
+ # removed keys entirely.
+ state_delta_for_room[room_id] = ([], delta_ids)
+ elif current_state is not None:
+ with Measure(
+ self._clock,
+ "persist_events.calculate_state_delta",
+ ):
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
+ state_delta_for_room[room_id] = delta
+
+ # If we have the current_state then lets prefill
+ # the cache with it.
+ if current_state is not None:
+ current_state_for_room[room_id] = current_state
yield self.runInteraction(
"persist_events",
@@ -381,10 +462,13 @@ class EventsStore(SQLBaseStore):
events_and_contexts=chunk,
backfilled=backfilled,
delete_existing=delete_existing,
- current_state_for_room=current_state_for_room,
+ state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
- persist_event_counter.inc_by(len(chunk))
+ persist_event_counter.inc(len(chunk))
+ synapse.metrics.event_persisted_position.set(
+ chunk[-1][0].internal_metadata.stream_ordering,
+ )
for event, context in chunk:
if context.app_service:
origin_type = "local"
@@ -396,14 +480,14 @@ class EventsStore(SQLBaseStore):
origin_type = "remote"
origin_entity = get_domain_from_id(event.sender)
- event_counter.inc(event.type, origin_type, origin_entity)
+ event_counter.labels(event.type, origin_type, origin_entity).inc()
- for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+ for room_id, new_state in iteritems(current_state_for_room):
self.get_current_state_ids.prefill(
(room_id, ), new_state
)
- for room_id, latest_event_ids in new_forward_extremeties.iteritems():
+ for room_id, latest_event_ids in iteritems(new_forward_extremeties):
self.get_latest_event_ids_in_room.prefill(
(room_id,), list(latest_event_ids)
)
@@ -450,183 +534,187 @@ class EventsStore(SQLBaseStore):
defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
- def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
- """Calculate the new state deltas for a room.
+ def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
+ new_latest_event_ids):
+ """Calculate the current state dict after adding some new events to
+ a room
- Assumes that we are only persisting events for one room at a time.
+ Args:
+ room_id (str):
+ room to which the events are being added. Used for logging etc
+
+ events_context (list[(EventBase, EventContext)]):
+ events and contexts which are being added to the room
+
+ old_latest_event_ids (iterable[str]):
+ the old forward extremities for the room.
+
+ new_latest_event_ids (iterable[str]):
+ the new forward extremities for the room.
Returns:
- 3-tuple (to_delete, to_insert, new_state) where both are state dicts,
- i.e. (type, state_key) -> event_id. `to_delete` are the entries to
- first be deleted from current_state_events, `to_insert` are entries
- to insert. `new_state` is the full set of state.
- May return None if there are no changes to be applied.
+ Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
+ Returns a tuple of two state maps, the first being the full new current
+ state and the second being the delta to the existing current state.
+ If both are None then there has been no change.
+
+ If there has been a change then we only return the delta if its
+ already been calculated. Conversely if we do know the delta then
+ the new current state is only returned if we've already calculated
+ it.
"""
- # Now we need to work out the different state sets for
- # each state extremities
- state_sets = []
- state_groups = set()
- missing_event_ids = []
- was_updated = False
+
+ if not new_latest_event_ids:
+ return
+
+ # map from state_group to ((type, key) -> event_id) state map
+ state_groups_map = {}
+
+ # Map from (prev state group, new state group) -> delta state dict
+ state_group_deltas = {}
+
+ for ev, ctx in events_context:
+ if ctx.state_group is None:
+ # I don't think this can happen, but let's double-check
+ raise Exception(
+ "Context for new extremity event %s has no state "
+ "group" % (ev.event_id, ),
+ )
+
+ if ctx.state_group in state_groups_map:
+ continue
+
+ # We're only interested in pulling out state that has already
+ # been cached in the context. We'll pull stuff out of the DB later
+ # if necessary.
+ current_state_ids = ctx.get_cached_current_state_ids()
+ if current_state_ids is not None:
+ state_groups_map[ctx.state_group] = current_state_ids
+
+ if ctx.prev_group:
+ state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
+
+ # We need to map the event_ids to their state groups. First, let's
+ # check if the event is one we're persisting, in which case we can
+ # pull the state group from its context.
+ # Otherwise we need to pull the state group from the database.
+
+ # Set of events we need to fetch groups for. (We know none of the old
+ # extremities are going to be in events_context).
+ missing_event_ids = set(old_latest_event_ids)
+
+ event_id_to_state_group = {}
for event_id in new_latest_event_ids:
- # First search in the list of new events we're adding,
- # and then use the current state from that
+ # First search in the list of new events we're adding.
for ev, ctx in events_context:
if event_id == ev.event_id:
- if ctx.current_state_ids is None:
- raise Exception("Unknown current state")
-
- # If we've already seen the state group don't bother adding
- # it to the state sets again
- if ctx.state_group not in state_groups:
- state_sets.append(ctx.current_state_ids)
- if ctx.delta_ids or hasattr(ev, "state_key"):
- was_updated = True
- if ctx.state_group:
- # Add this as a seen state group (if it has a state
- # group)
- state_groups.add(ctx.state_group)
+ event_id_to_state_group[event_id] = ctx.state_group
break
else:
# If we couldn't find it, then we'll need to pull
# the state from the database
- was_updated = True
- missing_event_ids.append(event_id)
+ missing_event_ids.add(event_id)
if missing_event_ids:
- # Now pull out the state for any missing events from DB
+ # Now pull out the state groups for any missing events from DB
event_to_groups = yield self._get_state_group_for_events(
missing_event_ids,
)
+ event_id_to_state_group.update(event_to_groups)
- groups = set(event_to_groups.itervalues()) - state_groups
+ # State groups of old_latest_event_ids
+ old_state_groups = set(
+ event_id_to_state_group[evid] for evid in old_latest_event_ids
+ )
- if groups:
- group_to_state = yield self._get_state_for_groups(groups)
- state_sets.extend(group_to_state.itervalues())
+ # State groups of new_latest_event_ids
+ new_state_groups = set(
+ event_id_to_state_group[evid] for evid in new_latest_event_ids
+ )
- if not new_latest_event_ids:
- current_state = {}
- elif was_updated:
- if len(state_sets) == 1:
- # If there is only one state set, then we know what the current
- # state is.
- current_state = state_sets[0]
- else:
- # We work out the current state by passing the state sets to the
- # state resolution algorithm. It may ask for some events, including
- # the events we have yet to persist, so we need a slightly more
- # complicated event lookup function than simply looking the events
- # up in the db.
- events_map = {ev.event_id: ev for ev, _ in events_context}
-
- @defer.inlineCallbacks
- def get_events(ev_ids):
- # We get the events by first looking at the list of events we
- # are trying to persist, and then fetching the rest from the DB.
- db = []
- to_return = {}
- for ev_id in ev_ids:
- ev = events_map.get(ev_id, None)
- if ev:
- to_return[ev_id] = ev
- else:
- db.append(ev_id)
-
- if db:
- evs = yield self.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
- to_return.update(evs)
- defer.returnValue(to_return)
-
- current_state = yield resolve_events(
- state_sets,
- state_map_factory=get_events,
- )
- else:
- return
+ # If they old and new groups are the same then we don't need to do
+ # anything.
+ if old_state_groups == new_state_groups:
+ defer.returnValue((None, None))
- existing_state = yield self.get_current_state_ids(room_id)
+ if len(new_state_groups) == 1 and len(old_state_groups) == 1:
+ # If we're going from one state group to another, lets check if
+ # we have a delta for that transition. If we do then we can just
+ # return that.
- existing_events = set(existing_state.itervalues())
- new_events = set(ev_id for ev_id in current_state.itervalues())
- changed_events = existing_events ^ new_events
+ new_state_group = next(iter(new_state_groups))
+ old_state_group = next(iter(old_state_groups))
- if not changed_events:
- return
+ delta_ids = state_group_deltas.get(
+ (old_state_group, new_state_group,), None
+ )
+ if delta_ids is not None:
+ # We have a delta from the existing to new current state,
+ # so lets just return that. If we happen to already have
+ # the current state in memory then lets also return that,
+ # but it doesn't matter if we don't.
+ new_state = state_groups_map.get(new_state_group)
+ defer.returnValue((new_state, delta_ids))
+
+ # Now that we have calculated new_state_groups we need to get
+ # their state IDs so we can resolve to a single state set.
+ missing_state = new_state_groups - set(state_groups_map)
+ if missing_state:
+ group_to_state = yield self._get_state_for_groups(missing_state)
+ state_groups_map.update(group_to_state)
+
+ if len(new_state_groups) == 1:
+ # If there is only one state group, then we know what the current
+ # state is.
+ defer.returnValue((state_groups_map[new_state_groups.pop()], None))
+
+ # Ok, we need to defer to the state handler to resolve our state sets.
+
+ def get_events(ev_ids):
+ return self.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False,
+ )
- to_delete = {
- key: ev_id for key, ev_id in existing_state.iteritems()
- if ev_id in changed_events
+ state_groups = {
+ sg: state_groups_map[sg] for sg in new_state_groups
}
- events_to_insert = (new_events - existing_events)
- to_insert = {
- key: ev_id for key, ev_id in current_state.iteritems()
- if ev_id in events_to_insert
- }
-
- defer.returnValue((to_delete, to_insert, current_state))
-
- @defer.inlineCallbacks
- def get_event(self, event_id, check_redacted=True,
- get_prev_content=False, allow_rejected=False,
- allow_none=False):
- """Get an event from the database by event_id.
-
- Args:
- event_id (str): The event_id of the event to fetch
- check_redacted (bool): If True, check if event has been redacted
- and redact it.
- get_prev_content (bool): If True and event is a state event,
- include the previous states content in the unsigned field.
- allow_rejected (bool): If True return rejected events.
- allow_none (bool): If True, return None if no event found, if
- False throw an exception.
- Returns:
- Deferred : A FrozenEvent.
- """
- events = yield self._get_events(
- [event_id],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
+ events_map = {ev.event_id: ev for ev, _ in events_context}
+ logger.debug("calling resolve_state_groups from preserve_events")
+ res = yield self._state_resolution_handler.resolve_state_groups(
+ room_id, state_groups, events_map, get_events
)
- if not events and not allow_none:
- raise SynapseError(404, "Could not find event %s" % (event_id,))
-
- defer.returnValue(events[0] if events else None)
+ defer.returnValue((res.state, None))
@defer.inlineCallbacks
- def get_events(self, event_ids, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- """Get events from the database
+ def _calculate_state_delta(self, room_id, current_state):
+ """Calculate the new state deltas for a room.
- Args:
- event_ids (list): The event_ids of the events to fetch
- check_redacted (bool): If True, check if event has been redacted
- and redact it.
- get_prev_content (bool): If True and event is a state event,
- include the previous states content in the unsigned field.
- allow_rejected (bool): If True return rejected events.
+ Assumes that we are only persisting events for one room at a time.
Returns:
- Deferred : Dict from event_id to event.
+ tuple[list, dict] (to_delete, to_insert): where to_delete are the
+ type/state_keys to remove from current_state_events and `to_insert`
+ are the updates to current_state_events.
"""
- events = yield self._get_events(
- event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
+ existing_state = yield self.get_current_state_ids(room_id)
+
+ to_delete = [
+ key for key in existing_state
+ if key not in current_state
+ ]
+
+ to_insert = {
+ key: ev_id for key, ev_id in iteritems(current_state)
+ if ev_id != existing_state.get(key)
+ }
- defer.returnValue({e.event_id: e for e in events})
+ defer.returnValue((to_delete, to_insert))
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
- delete_existing=False, current_state_for_room={},
+ delete_existing=False, state_delta_for_room={},
new_forward_extremeties={}):
"""Insert some number of room events into the necessary database tables.
@@ -642,19 +730,21 @@ class EventsStore(SQLBaseStore):
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
IntegrityError.
- current_state_for_room (dict[str, (list[str], list[str])]):
+ state_delta_for_room (dict[str, (list, dict)]):
The current-state delta for each room. For each room, a tuple
- (to_delete, to_insert), being a list of event ids to be removed
- from the current state, and a list of event ids to be added to
+ (to_delete, to_insert), being a list of type/state keys to be
+ removed from the current state, and a state set to be added to
the current state.
new_forward_extremeties (dict[str, list[str]]):
The new forward extremities for each room. For each room, a
list of the event ids which are the forward extremities.
"""
+ all_events_and_contexts = events_and_contexts
+
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
- self._update_current_state_txn(txn, current_state_for_room, max_stream_order)
+ self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
self._update_forward_extremities_txn(
txn,
@@ -698,9 +788,8 @@ class EventsStore(SQLBaseStore):
events_and_contexts=events_and_contexts,
)
- # Insert into the state_groups, state_groups_state, and
- # event_to_state_groups tables.
- self._store_mult_state_groups_txn(txn, events_and_contexts)
+ # Insert into event_to_state_groups.
+ self._store_event_state_mappings_txn(txn, events_and_contexts)
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
@@ -715,15 +804,53 @@ class EventsStore(SQLBaseStore):
self._update_metadata_tables_txn(
txn,
events_and_contexts=events_and_contexts,
+ all_events_and_contexts=all_events_and_contexts,
backfilled=backfilled,
)
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
- for room_id, current_state_tuple in state_delta_by_room.iteritems():
- to_delete, to_insert, _ = current_state_tuple
+ for room_id, current_state_tuple in iteritems(state_delta_by_room):
+ to_delete, to_insert = current_state_tuple
+
+ # First we add entries to the current_state_delta_stream. We
+ # do this before updating the current_state_events table so
+ # that we can use it to calculate the `prev_event_id`. (This
+ # allows us to not have to pull out the existing state
+ # unnecessarily).
+ sql = """
+ INSERT INTO current_state_delta_stream
+ (stream_id, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, ?, ?, ?, (
+ SELECT event_id FROM current_state_events
+ WHERE room_id = ? AND type = ? AND state_key = ?
+ )
+ """
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, None,
+ room_id, etype, state_key,
+ )
+ for etype, state_key in to_delete
+ # We sanity check that we're deleting rather than updating
+ if (etype, state_key) not in to_insert
+ ))
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, ev_id,
+ room_id, etype, state_key,
+ )
+ for (etype, state_key), ev_id in iteritems(to_insert)
+ ))
+
+ # Now we actually update the current_state_events table
+
txn.executemany(
- "DELETE FROM current_state_events WHERE event_id = ?",
- [(ev_id,) for ev_id in to_delete.itervalues()],
+ "DELETE FROM current_state_events"
+ " WHERE room_id = ? AND type = ? AND state_key = ?",
+ (
+ (room_id, etype, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
)
self._simple_insert_many_txn(
@@ -736,30 +863,12 @@ class EventsStore(SQLBaseStore):
"type": key[0],
"state_key": key[1],
}
- for key, ev_id in to_insert.iteritems()
+ for key, ev_id in iteritems(to_insert)
],
)
- state_deltas = {key: None for key in to_delete}
- state_deltas.update(to_insert)
-
- self._simple_insert_many_txn(
- txn,
- table="current_state_delta_stream",
- values=[
- {
- "stream_id": max_stream_order,
- "room_id": room_id,
- "type": key[0],
- "state_key": key[1],
- "event_id": ev_id,
- "prev_event_id": to_delete.get(key, None),
- }
- for key, ev_id in state_deltas.iteritems()
- ]
- )
-
- self._curr_state_delta_stream_cache.entity_has_changed(
+ txn.call_after(
+ self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order,
)
@@ -771,19 +880,23 @@ class EventsStore(SQLBaseStore):
# and which we have added, then we invlidate the caches for all
# those users.
members_changed = set(
- state_key for ev_type, state_key in state_deltas
+ state_key
+ for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
)
for member in members_changed:
self._invalidate_cache_and_stream(
- txn, self.get_rooms_for_user, (member,)
+ txn, self.get_rooms_for_user_with_stream_ordering, (member,)
)
for host in set(get_domain_from_id(u) for u in members_changed):
self._invalidate_cache_and_stream(
txn, self.is_host_joined, (room_id, host)
)
+ self._invalidate_cache_and_stream(
+ txn, self.was_host_joined, (room_id, host)
+ )
self._invalidate_cache_and_stream(
txn, self.get_users_in_room, (room_id,)
@@ -795,7 +908,7 @@ class EventsStore(SQLBaseStore):
def _update_forward_extremities_txn(self, txn, new_forward_extremities,
max_stream_order):
- for room_id, new_extrem in new_forward_extremities.iteritems():
+ for room_id, new_extrem in iteritems(new_forward_extremities):
self._simple_delete_txn(
txn,
table="event_forward_extremities",
@@ -813,7 +926,7 @@ class EventsStore(SQLBaseStore):
"event_id": ev_id,
"room_id": room_id,
}
- for room_id, new_extrem in new_forward_extremities.iteritems()
+ for room_id, new_extrem in iteritems(new_forward_extremities)
for ev_id in new_extrem
],
)
@@ -830,7 +943,7 @@ class EventsStore(SQLBaseStore):
"event_id": event_id,
"stream_ordering": max_stream_order,
}
- for room_id, new_extrem in new_forward_extremities.iteritems()
+ for room_id, new_extrem in iteritems(new_forward_extremities)
for event_id in new_extrem
]
)
@@ -858,7 +971,7 @@ class EventsStore(SQLBaseStore):
new_events_and_contexts[event.event_id] = (event, context)
else:
new_events_and_contexts[event.event_id] = (event, context)
- return new_events_and_contexts.values()
+ return list(new_events_and_contexts.values())
def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
"""Update min_depth for each room
@@ -884,7 +997,7 @@ class EventsStore(SQLBaseStore):
event.depth, depth_updates.get(event.room_id, event.depth)
)
- for room_id, depth in depth_updates.iteritems():
+ for room_id, depth in iteritems(depth_updates):
self._update_min_depth_for_room_txn(txn, room_id, depth)
def _update_outliers_txn(self, txn, events_and_contexts):
@@ -932,10 +1045,9 @@ class EventsStore(SQLBaseStore):
# an outlier in the database. We now have some state at that
# so we need to update the state_groups table with that state.
- # insert into the state_group, state_groups_state and
- # event_to_state_groups tables.
+ # insert into event_to_state_groups.
try:
- self._store_mult_state_groups_txn(txn, ((event, context),))
+ self._store_event_state_mappings_txn(txn, ((event, context),))
except Exception:
logger.exception("")
raise
@@ -1001,7 +1113,6 @@ class EventsStore(SQLBaseStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
- "event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@@ -1021,6 +1132,14 @@ class EventsStore(SQLBaseStore):
[(ev.event_id,) for ev, _ in events_and_contexts]
)
+ for table in (
+ "event_push_actions",
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
+ [(ev.event_id,) for ev, _ in events_and_contexts]
+ )
+
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables
@@ -1110,27 +1229,33 @@ class EventsStore(SQLBaseStore):
ec for ec in events_and_contexts if ec[0] not in to_remove
]
- def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+ def _update_metadata_tables_txn(self, txn, events_and_contexts,
+ all_events_and_contexts, backfilled):
"""Update all the miscellaneous tables for new events
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
+ all_events_and_contexts (list[(EventBase, EventContext)]): all
+ events that we were going to persist. This includes events
+ we've already persisted, etc, that wouldn't appear in
+ events_and_context.
backfilled (bool): True if the events were backfilled
"""
+ # Insert all the push actions into the event_push_actions table.
+ self._set_push_actions_for_event_and_users_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ all_events_and_contexts=all_events_and_contexts,
+ )
+
if not events_and_contexts:
# nothing to do here
return
for event, context in events_and_contexts:
- # Insert all the push actions into the event_push_actions table.
- if context.push_actions:
- self._set_push_actions_for_event_and_users_txn(
- txn, event, context.push_actions
- )
-
if event.type == EventTypes.Redaction and event.redacts is not None:
# Remove the entries in the event_push_actions table for the
# redacted event.
@@ -1263,7 +1388,7 @@ class EventsStore(SQLBaseStore):
" WHERE e.event_id IN (%s)"
) % (",".join(["?"] * len(ev_map)),)
- txn.execute(sql, ev_map.keys())
+ txn.execute(sql, list(ev_map))
rows = self.cursor_to_dict(txn)
for row in rows:
event = ev_map[row["event_id"]]
@@ -1302,13 +1427,49 @@ class EventsStore(SQLBaseStore):
defer.returnValue(set(r["event_id"] for r in rows))
- def have_events(self, event_ids):
+ @defer.inlineCallbacks
+ def have_seen_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
+ Args:
+ event_ids (iterable[str]):
+
Returns:
- dict: Has an entry for each event id we already have seen. Maps to
- the rejected reason string if we rejected the event, else maps to
- None.
+ Deferred[set[str]]: The events we have already seen.
+ """
+ results = set()
+
+ def have_seen_events_txn(txn, chunk):
+ sql = (
+ "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+ % (",".join("?" * len(chunk)), )
+ )
+ txn.execute(sql, chunk)
+ for (event_id, ) in txn:
+ results.add(event_id)
+
+ # break the input up into chunks of 100
+ input_iterator = iter(event_ids)
+ for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+ []):
+ yield self.runInteraction(
+ "have_seen_events",
+ have_seen_events_txn,
+ chunk,
+ )
+ defer.returnValue(results)
+
+ def get_seen_events_with_rejections(self, event_ids):
+ """Given a list of event ids, check if we rejected them.
+
+ Args:
+ event_ids (list[str])
+
+ Returns:
+ Deferred[dict[str, str|None):
+ Has an entry for each event id we already have seen. Maps to
+ the rejected reason string if we rejected the event, else maps
+ to None.
"""
if not event_ids:
return defer.succeed({})
@@ -1330,295 +1491,7 @@ class EventsStore(SQLBaseStore):
return res
- return self.runInteraction(
- "have_events", f,
- )
-
- @defer.inlineCallbacks
- def _get_events(self, event_ids, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not event_ids:
- defer.returnValue([])
-
- event_id_list = event_ids
- event_ids = set(event_ids)
-
- event_entry_map = self._get_events_from_cache(
- event_ids,
- allow_rejected=allow_rejected,
- )
-
- missing_events_ids = [e for e in event_ids if e not in event_entry_map]
-
- if missing_events_ids:
- missing_events = yield self._enqueue_events(
- missing_events_ids,
- check_redacted=check_redacted,
- allow_rejected=allow_rejected,
- )
-
- event_entry_map.update(missing_events)
-
- events = []
- for event_id in event_id_list:
- entry = event_entry_map.get(event_id, None)
- if not entry:
- continue
-
- if allow_rejected or not entry.event.rejected_reason:
- if check_redacted and entry.redacted_event:
- event = entry.redacted_event
- else:
- event = entry.event
-
- events.append(event)
-
- if get_prev_content:
- if "replaces_state" in event.unsigned:
- prev = yield self.get_event(
- event.unsigned["replaces_state"],
- get_prev_content=False,
- allow_none=True,
- )
- if prev:
- event.unsigned = dict(event.unsigned)
- event.unsigned["prev_content"] = prev.content
- event.unsigned["prev_sender"] = prev.sender
-
- defer.returnValue(events)
-
- def _invalidate_get_event_cache(self, event_id):
- self._get_event_cache.invalidate((event_id,))
-
- def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
- """Fetch events from the caches
-
- Args:
- events (list(str)): list of event_ids to fetch
- allow_rejected (bool): Whether to teturn events that were rejected
- update_metrics (bool): Whether to update the cache hit ratio metrics
-
- Returns:
- dict of event_id -> _EventCacheEntry for each event_id in cache. If
- allow_rejected is `False` then there will still be an entry but it
- will be `None`
- """
- event_map = {}
-
- for event_id in events:
- ret = self._get_event_cache.get(
- (event_id,), None,
- update_metrics=update_metrics,
- )
- if not ret:
- continue
-
- if allow_rejected or not ret.event.rejected_reason:
- event_map[event_id] = ret
- else:
- event_map[event_id] = None
-
- return event_map
-
- def _do_fetch(self, conn):
- """Takes a database connection and waits for requests for events from
- the _event_fetch_list queue.
- """
- event_list = []
- i = 0
- while True:
- try:
- with self._event_fetch_lock:
- event_list = self._event_fetch_list
- self._event_fetch_list = []
-
- if not event_list:
- single_threaded = self.database_engine.single_threaded
- if single_threaded or i > EVENT_QUEUE_ITERATIONS:
- self._event_fetch_ongoing -= 1
- return
- else:
- self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
- i += 1
- continue
- i = 0
-
- event_id_lists = zip(*event_list)[0]
- event_ids = [
- item for sublist in event_id_lists for item in sublist
- ]
-
- rows = self._new_transaction(
- conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
- )
-
- row_dict = {
- r["event_id"]: r
- for r in rows
- }
-
- # We only want to resolve deferreds from the main thread
- def fire(lst, res):
- for ids, d in lst:
- if not d.called:
- try:
- with PreserveLoggingContext():
- d.callback([
- res[i]
- for i in ids
- if i in res
- ])
- except:
- logger.exception("Failed to callback")
- with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list, row_dict)
- except Exception as e:
- logger.exception("do_fetch")
-
- # We only want to resolve deferreds from the main thread
- def fire(evs):
- for _, d in evs:
- if not d.called:
- with PreserveLoggingContext():
- d.errback(e)
-
- if event_list:
- with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list)
-
- @defer.inlineCallbacks
- def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
- """Fetches events from the database using the _event_fetch_list. This
- allows batch and bulk fetching of events - it allows us to fetch events
- without having to create a new transaction for each request for events.
- """
- if not events:
- defer.returnValue({})
-
- events_d = defer.Deferred()
- with self._event_fetch_lock:
- self._event_fetch_list.append(
- (events, events_d)
- )
-
- self._event_fetch_lock.notify()
-
- if self._event_fetch_ongoing < EVENT_QUEUE_THREADS:
- self._event_fetch_ongoing += 1
- should_start = True
- else:
- should_start = False
-
- if should_start:
- with PreserveLoggingContext():
- self.runWithConnection(
- self._do_fetch
- )
-
- logger.debug("Loading %d events", len(events))
- with PreserveLoggingContext():
- rows = yield events_d
- logger.debug("Loaded %d events (%d rows)", len(events), len(rows))
-
- if not allow_rejected:
- rows[:] = [r for r in rows if not r["rejects"]]
-
- res = yield preserve_context_over_deferred(defer.gatherResults(
- [
- preserve_fn(self._get_event_from_row)(
- row["internal_metadata"], row["json"], row["redacts"],
- rejected_reason=row["rejects"],
- )
- for row in rows
- ],
- consumeErrors=True
- ))
-
- defer.returnValue({
- e.event.event_id: e
- for e in res if e
- })
-
- def _fetch_event_rows(self, txn, events):
- rows = []
- N = 200
- for i in range(1 + len(events) / N):
- evs = events[i * N:(i + 1) * N]
- if not evs:
- break
-
- sql = (
- "SELECT "
- " e.event_id as event_id, "
- " e.internal_metadata,"
- " e.json,"
- " r.redacts as redacts,"
- " rej.event_id as rejects "
- " FROM event_json as e"
- " LEFT JOIN rejections as rej USING (event_id)"
- " LEFT JOIN redactions as r ON e.event_id = r.redacts"
- " WHERE e.event_id IN (%s)"
- ) % (",".join(["?"] * len(evs)),)
-
- txn.execute(sql, evs)
- rows.extend(self.cursor_to_dict(txn))
-
- return rows
-
- @defer.inlineCallbacks
- def _get_event_from_row(self, internal_metadata, js, redacted,
- rejected_reason=None):
- with Measure(self._clock, "_get_event_from_row"):
- d = json.loads(js)
- internal_metadata = json.loads(internal_metadata)
-
- if rejected_reason:
- rejected_reason = yield self._simple_select_one_onecol(
- table="rejections",
- keyvalues={"event_id": rejected_reason},
- retcol="reason",
- desc="_get_event_from_row_rejected_reason",
- )
-
- original_ev = FrozenEvent(
- d,
- internal_metadata_dict=internal_metadata,
- rejected_reason=rejected_reason,
- )
-
- redacted_event = None
- if redacted:
- redacted_event = prune_event(original_ev)
-
- redaction_id = yield self._simple_select_one_onecol(
- table="redactions",
- keyvalues={"redacts": redacted_event.event_id},
- retcol="event_id",
- desc="_get_event_from_row_redactions",
- )
-
- redacted_event.unsigned["redacted_by"] = redaction_id
- # Get the redaction event.
-
- because = yield self.get_event(
- redaction_id,
- check_redacted=False,
- allow_none=True,
- )
-
- if because:
- # It's fine to do add the event directly, since get_pdu_json
- # will serialise this field correctly
- redacted_event.unsigned["redacted_because"] = because
-
- cache_entry = _EventCacheEntry(
- event=original_ev,
- redacted_event=redacted_event,
- )
-
- self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
-
- defer.returnValue(cache_entry)
+ return self.runInteraction("get_rejection_reasons", f)
@defer.inlineCallbacks
def count_daily_messages(self):
@@ -1778,7 +1651,7 @@ class EventsStore(SQLBaseStore):
chunks = [
event_ids[i:i + 100]
- for i in xrange(0, len(event_ids), 100)
+ for i in range(0, len(event_ids), 100)
]
for chunk in chunks:
ev_rows = self._simple_select_many_txn(
@@ -2005,15 +1878,32 @@ class EventsStore(SQLBaseStore):
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
- def delete_old_state(self, room_id, topological_ordering):
+ def purge_history(
+ self, room_id, token, delete_local_events,
+ ):
+ """Deletes room history before a certain point
+
+ Args:
+ room_id (str):
+
+ token (str): A topological token to delete events before
+
+ delete_local_events (bool):
+ if True, we will delete local events as well as remote ones
+ (instead of just marking them as outliers and deleting their
+ state groups).
+ """
+
return self.runInteraction(
- "delete_old_state",
- self._delete_old_state_txn, room_id, topological_ordering
+ "purge_history",
+ self._purge_history_txn, room_id, token,
+ delete_local_events,
)
- def _delete_old_state_txn(self, txn, room_id, topological_ordering):
- """Deletes old room state
- """
+ def _purge_history_txn(
+ self, txn, room_id, token_str, delete_local_events,
+ ):
+ token = RoomStreamToken.parse(token_str)
# Tables that should be pruned:
# event_auth
@@ -2035,6 +1925,37 @@ class EventsStore(SQLBaseStore):
# state_groups
# state_groups_state
+ # we will build a temporary table listing the events so that we don't
+ # have to keep shovelling the list back and forth across the
+ # connection. Annoyingly the python sqlite driver commits the
+ # transaction on CREATE, so let's do this first.
+ #
+ # furthermore, we might already have the table from a previous (failed)
+ # purge attempt, so let's drop the table first.
+
+ txn.execute("DROP TABLE IF EXISTS events_to_purge")
+
+ txn.execute(
+ "CREATE TEMPORARY TABLE events_to_purge ("
+ " event_id TEXT NOT NULL,"
+ " should_delete BOOLEAN NOT NULL"
+ ")"
+ )
+
+ # create an index on should_delete because later we'll be looking for
+ # the should_delete / shouldn't_delete subsets
+ txn.execute(
+ "CREATE INDEX events_to_purge_should_delete"
+ " ON events_to_purge(should_delete)",
+ )
+
+ # We do joins against events_to_purge for e.g. calculating state
+ # groups to purge, etc., so lets make an index.
+ txn.execute(
+ "CREATE INDEX events_to_purge_id"
+ " ON events_to_purge(event_id)",
+ )
+
# First ensure that we're not about to delete all the forward extremeties
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
@@ -2047,7 +1968,7 @@ class EventsStore(SQLBaseStore):
rows = txn.fetchall()
max_depth = max(row[0] for row in rows)
- if max_depth <= topological_ordering:
+ if max_depth <= token.topological:
# We need to ensure we don't delete all the events from the datanase
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
@@ -2055,42 +1976,48 @@ class EventsStore(SQLBaseStore):
400, "topological_ordering is greater than forward extremeties"
)
- logger.debug("[purge] looking for events to delete")
+ logger.info("[purge] looking for events to delete")
+
+ should_delete_expr = "state_key IS NULL"
+ should_delete_params = ()
+ if not delete_local_events:
+ should_delete_expr += " AND event_id NOT LIKE ?"
+ should_delete_params += ("%:" + self.hs.hostname, )
+
+ should_delete_params += (room_id, token.topological)
txn.execute(
- "SELECT event_id, state_key FROM events"
- " LEFT JOIN state_events USING (room_id, event_id)"
- " WHERE room_id = ? AND topological_ordering < ?",
- (room_id, topological_ordering,)
+ "INSERT INTO events_to_purge"
+ " SELECT event_id, %s"
+ " FROM events AS e LEFT JOIN state_events USING (event_id)"
+ " WHERE e.room_id = ? AND topological_ordering < ?" % (
+ should_delete_expr,
+ ),
+ should_delete_params,
+ )
+ txn.execute(
+ "SELECT event_id, should_delete FROM events_to_purge"
)
event_rows = txn.fetchall()
-
- to_delete = [
- (event_id,) for event_id, state_key in event_rows
- if state_key is None and not self.hs.is_mine_id(event_id)
- ]
logger.info(
- "[purge] found %i events before cutoff, of which %i are remote"
- " non-state events to delete", len(event_rows), len(to_delete))
-
- for event_id, state_key in event_rows:
- txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+ "[purge] found %i events before cutoff, of which %i can be deleted",
+ len(event_rows), sum(1 for e in event_rows if e[1]),
+ )
- logger.debug("[purge] Finding new backward extremities")
+ logger.info("[purge] Finding new backward extremities")
# We calculate the new entries for the backward extremeties by finding
- # all events that point to events that are to be purged
+ # events to be purged that are pointed to by events we're not going to
+ # purge.
txn.execute(
- "SELECT DISTINCT e.event_id FROM events as e"
- " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
- " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
- " WHERE e.room_id = ? AND e.topological_ordering < ?"
- " AND e2.topological_ordering >= ?",
- (room_id, topological_ordering, topological_ordering)
+ "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
+ " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
+ " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
+ " WHERE ep2.event_id IS NULL",
)
new_backwards_extrems = txn.fetchall()
- logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
+ logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems)
txn.execute(
"DELETE FROM event_backward_extremities WHERE room_id = ?",
@@ -2106,34 +2033,39 @@ class EventsStore(SQLBaseStore):
]
)
- logger.debug("[purge] finding redundant state groups")
+ logger.info("[purge] finding redundant state groups")
# Get all state groups that are only referenced by events that are
# to be deleted.
- txn.execute(
- "SELECT state_group FROM event_to_state_groups"
- " INNER JOIN events USING (event_id)"
- " WHERE state_group IN ("
- " SELECT DISTINCT state_group FROM events"
- " INNER JOIN event_to_state_groups USING (event_id)"
- " WHERE room_id = ? AND topological_ordering < ?"
- " )"
- " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
- (room_id, topological_ordering, topological_ordering)
- )
+ # This works by first getting state groups that we may want to delete,
+ # joining against event_to_state_groups to get events that use that
+ # state group, then left joining against events_to_purge again. Any
+ # state group where the left join produce *no nulls* are referenced
+ # only by events that are going to be purged.
+ txn.execute("""
+ SELECT state_group FROM
+ (
+ SELECT DISTINCT state_group FROM events_to_purge
+ INNER JOIN event_to_state_groups USING (event_id)
+ ) AS sp
+ INNER JOIN event_to_state_groups USING (state_group)
+ LEFT JOIN events_to_purge AS ep USING (event_id)
+ GROUP BY state_group
+ HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+ """)
state_rows = txn.fetchall()
- logger.debug("[purge] found %i redundant state groups", len(state_rows))
+ logger.info("[purge] found %i redundant state groups", len(state_rows))
# make a set of the redundant state groups, so that we can look them up
# efficiently
state_groups_to_delete = set([sg for sg, in state_rows])
# Now we get all the state groups that rely on these state groups
- logger.debug("[purge] finding state groups which depend on redundant"
- " state groups")
+ logger.info("[purge] finding state groups which depend on redundant"
+ " state groups")
remaining_state_groups = []
- for i in xrange(0, len(state_rows), 100):
+ for i in range(0, len(state_rows), 100):
chunk = [sg for sg, in state_rows[i:i + 100]]
# look for state groups whose prev_state_group is one we are about
# to delete
@@ -2156,7 +2088,7 @@ class EventsStore(SQLBaseStore):
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
- logger.debug("[purge] de-delta-ing remaining state group %s", sg)
+ logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(
txn, [sg], types=None
)
@@ -2189,11 +2121,11 @@ class EventsStore(SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in curr_state.iteritems()
+ for key, state_id in iteritems(curr_state)
],
)
- logger.debug("[purge] removing redundant state groups")
+ logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
@@ -2203,18 +2135,15 @@ class EventsStore(SQLBaseStore):
state_rows
)
- # Delete all non-state
- logger.debug("[purge] removing events from event_to_state_groups")
- txn.executemany(
- "DELETE FROM event_to_state_groups WHERE event_id = ?",
- [(event_id,) for event_id, _ in event_rows]
- )
-
- logger.debug("[purge] updating room_depth")
+ logger.info("[purge] removing events from event_to_state_groups")
txn.execute(
- "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
- (topological_ordering, room_id,)
+ "DELETE FROM event_to_state_groups "
+ "WHERE event_id IN (SELECT event_id from events_to_purge)"
)
+ for event_id, _ in event_rows:
+ txn.call_after(self._get_state_group_for_event.invalidate, (
+ event_id,
+ ))
# Delete all remote non-state events
for table in (
@@ -2226,28 +2155,75 @@ class EventsStore(SQLBaseStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
- "event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
"rejections",
):
- logger.debug("[purge] removing remote non-state events from %s", table)
+ logger.info("[purge] removing events from %s", table)
- txn.executemany(
- "DELETE FROM %s WHERE event_id = ?" % (table,),
- to_delete
+ txn.execute(
+ "DELETE FROM %s WHERE event_id IN ("
+ " SELECT event_id FROM events_to_purge WHERE should_delete"
+ ")" % (table,),
+ )
+
+ # event_push_actions lacks an index on event_id, and has one on
+ # (room_id, event_id) instead.
+ for table in (
+ "event_push_actions",
+ ):
+ logger.info("[purge] removing events from %s", table)
+
+ txn.execute(
+ "DELETE FROM %s WHERE room_id = ? AND event_id IN ("
+ " SELECT event_id FROM events_to_purge WHERE should_delete"
+ ")" % (table,),
+ (room_id, )
)
# Mark all state and own events as outliers
- logger.debug("[purge] marking remaining events as outliers")
- txn.executemany(
+ logger.info("[purge] marking remaining events as outliers")
+ txn.execute(
"UPDATE events SET outlier = ?"
- " WHERE event_id = ?",
- [
- (True, event_id,) for event_id, state_key in event_rows
- if state_key is not None or self.hs.is_mine_id(event_id)
- ]
+ " WHERE event_id IN ("
+ " SELECT event_id FROM events_to_purge "
+ " WHERE NOT should_delete"
+ ")",
+ (True,),
+ )
+
+ # synapse tries to take out an exclusive lock on room_depth whenever it
+ # persists events (because upsert), and once we run this update, we
+ # will block that for the rest of our transaction.
+ #
+ # So, let's stick it at the end so that we don't block event
+ # persistence.
+ #
+ # We do this by calculating the minimum depth of the backwards
+ # extremities. However, the events in event_backward_extremities
+ # are ones we don't have yet so we need to look at the events that
+ # point to it via event_edges table.
+ txn.execute("""
+ SELECT COALESCE(MIN(depth), 0)
+ FROM event_backward_extremities AS eb
+ INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
+ INNER JOIN events AS e ON e.event_id = eg.event_id
+ WHERE eb.room_id = ?
+ """, (room_id,))
+ min_depth, = txn.fetchone()
+
+ logger.info("[purge] updating room_depth to %d", min_depth)
+
+ txn.execute(
+ "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+ (min_depth, room_id,)
+ )
+
+ # finally, drop the temp table. this will commit the txn in sqlite,
+ # so make sure to keep this actually last.
+ txn.execute(
+ "DROP TABLE events_to_purge"
)
logger.info("[purge] done")
@@ -2260,7 +2236,7 @@ class EventsStore(SQLBaseStore):
to_2, so_2 = yield self._get_event_ordering(event_id2)
defer.returnValue((to_1, so_1) > (to_2, so_2))
- @defer.inlineCallbacks
+ @cachedInlineCallbacks(max_entries=5000)
def _get_event_ordering(self, event_id):
res = yield self._simple_select_one(
table="events",
|