diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 73658a9927..85ce6bea1a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,23 +13,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
-from twisted.internet import defer, reactor
+from synapse.storage.events_worker import EventsWorkerStore
-from synapse.events import FrozenEvent, USE_FROZEN_DICTS
-from synapse.events.utils import prune_event
+from twisted.internet import defer
+
+from synapse.events import USE_FROZEN_DICTS
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import (
- preserve_fn, PreserveLoggingContext, make_deferred_yieldable
+ PreserveLoggingContext, make_deferred_yieldable
)
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
-from synapse.state import resolve_events
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.types import get_domain_from_id
from canonicaljson import encode_canonical_json
@@ -61,16 +61,6 @@ def encode_json(json_object):
return json.dumps(json_object, ensure_ascii=False)
-# These values are used in the `enqueus_event` and `_do_fetch` methods to
-# control how we batch/bulk fetch events from the database.
-# The values are plucked out of thing air to make initial sync run faster
-# on jki.re
-# TODO: Make these configurable.
-EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events
-EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events
-EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
-
-
class _EventPeristenceQueue(object):
"""Queues up events so that they can be persisted in bulk with only one
concurrent transaction per room.
@@ -109,7 +99,7 @@ class _EventPeristenceQueue(object):
end_item.events_and_contexts.extend(events_and_contexts)
return end_item.deferred.observe()
- deferred = ObservableDeferred(defer.Deferred())
+ deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
queue.append(self._EventPersistQueueItem(
events_and_contexts=events_and_contexts,
@@ -145,18 +135,25 @@ class _EventPeristenceQueue(object):
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
+ # handle_queue_loop runs in the sentinel logcontext, so
+ # there is no need to preserve_fn when running the
+ # callbacks on the deferred.
try:
ret = yield per_item_callback(item)
item.deferred.callback(ret)
- except Exception as e:
- item.deferred.errback(e)
+ except Exception:
+ item.deferred.errback()
finally:
queue = self._event_persist_queues.pop(room_id, None)
if queue:
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
- preserve_fn(handle_queue_loop)()
+ # set handle_queue_loop off on the background. We don't want to
+ # attribute work done in it to the current request, so we drop the
+ # logcontext altogether.
+ with PreserveLoggingContext():
+ handle_queue_loop()
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -192,13 +189,12 @@ def _retry_on_integrity_error(func):
return f
-class EventsStore(SQLBaseStore):
+class EventsStore(EventsWorkerStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
- self._clock = hs.get_clock()
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
@@ -229,6 +225,8 @@ class EventsStore(SQLBaseStore):
self._event_persist_queue = _EventPeristenceQueue()
+ self._state_resolution_handler = hs.get_state_resolution_handler()
+
def persist_events(self, events_and_contexts, backfilled=False):
"""
Write events to the database
@@ -284,10 +282,11 @@ class EventsStore(SQLBaseStore):
def _maybe_start_persisting(self, room_id):
@defer.inlineCallbacks
def persisting_queue(item):
- yield self._persist_events(
- item.events_and_contexts,
- backfilled=item.backfilled,
- )
+ with Measure(self._clock, "persist_events"):
+ yield self._persist_events(
+ item.events_and_contexts,
+ backfilled=item.backfilled,
+ )
self._event_persist_queue.handle_queue(room_id, persisting_queue)
@@ -334,8 +333,20 @@ class EventsStore(SQLBaseStore):
# NB: Assumes that we are only persisting events for one room
# at a time.
+
+ # map room_id->list[event_ids] giving the new forward
+ # extremities in each room
new_forward_extremeties = {}
+
+ # map room_id->(type,state_key)->event_id tracking the full
+ # state in each room after adding these events
current_state_for_room = {}
+
+ # map room_id->(to_delete, to_insert) where each entry is
+ # a map (type,key)->event_id giving the state delta in each
+ # room
+ state_delta_for_room = {}
+
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
@@ -378,11 +389,20 @@ class EventsStore(SQLBaseStore):
if all_single_prev_not_state:
continue
- state = yield self._calculate_state_delta(
- room_id, ev_ctx_rm, new_latest_event_ids
+ logger.info(
+ "Calculating state delta for room %s", room_id,
)
- if state:
- current_state_for_room[room_id] = state
+ current_state = yield self._get_new_state_after_events(
+ room_id,
+ ev_ctx_rm, new_latest_event_ids,
+ )
+ if current_state is not None:
+ current_state_for_room[room_id] = current_state
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
+ if delta is not None:
+ state_delta_for_room[room_id] = delta
yield self.runInteraction(
"persist_events",
@@ -390,7 +410,7 @@ class EventsStore(SQLBaseStore):
events_and_contexts=chunk,
backfilled=backfilled,
delete_existing=delete_existing,
- current_state_for_room=current_state_for_room,
+ state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
@@ -407,7 +427,7 @@ class EventsStore(SQLBaseStore):
event_counter.inc(event.type, origin_type, origin_entity)
- for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+ for room_id, new_state in current_state_for_room.iteritems():
self.get_current_state_ids.prefill(
(room_id, ), new_state
)
@@ -459,22 +479,31 @@ class EventsStore(SQLBaseStore):
defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
- def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
- """Calculate the new state deltas for a room.
+ def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids):
+ """Calculate the current state dict after adding some new events to
+ a room
- Assumes that we are only persisting events for one room at a time.
+ Args:
+ room_id (str):
+ room to which the events are being added. Used for logging etc
+
+ events_context (list[(EventBase, EventContext)]):
+ events and contexts which are being added to the room
+
+ new_latest_event_ids (iterable[str]):
+ the new forward extremities for the room.
Returns:
- 3-tuple (to_delete, to_insert, new_state) where both are state dicts,
- i.e. (type, state_key) -> event_id. `to_delete` are the entries to
- first be deleted from current_state_events, `to_insert` are entries
- to insert. `new_state` is the full set of state.
- May return None if there are no changes to be applied.
+ Deferred[dict[(str,str), str]|None]:
+ None if there are no changes to the room state, or
+ a dict of (type, state_key) -> event_id].
"""
- # Now we need to work out the different state sets for
- # each state extremities
- state_sets = []
- state_groups = set()
+
+ if not new_latest_event_ids:
+ defer.returnValue({})
+
+ # map from state_group to ((type, key) -> event_id) state map
+ state_groups = {}
missing_event_ids = []
was_updated = False
for event_id in new_latest_event_ids:
@@ -485,16 +514,19 @@ class EventsStore(SQLBaseStore):
if ctx.current_state_ids is None:
raise Exception("Unknown current state")
+ if ctx.state_group is None:
+ # I don't think this can happen, but let's double-check
+ raise Exception(
+ "Context for new extremity event %s has no state "
+ "group" % (event_id, ),
+ )
+
# If we've already seen the state group don't bother adding
# it to the state sets again
if ctx.state_group not in state_groups:
- state_sets.append(ctx.current_state_ids)
+ state_groups[ctx.state_group] = ctx.current_state_ids
if ctx.delta_ids or hasattr(ev, "state_key"):
was_updated = True
- if ctx.state_group:
- # Add this as a seen state group (if it has a state
- # group)
- state_groups.add(ctx.state_group)
break
else:
# If we couldn't find it, then we'll need to pull
@@ -502,60 +534,50 @@ class EventsStore(SQLBaseStore):
was_updated = True
missing_event_ids.append(event_id)
+ if not was_updated:
+ return
+
if missing_event_ids:
# Now pull out the state for any missing events from DB
event_to_groups = yield self._get_state_group_for_events(
missing_event_ids,
)
- groups = set(event_to_groups.itervalues()) - state_groups
+ groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())
if groups:
group_to_state = yield self._get_state_for_groups(groups)
- state_sets.extend(group_to_state.itervalues())
+ state_groups.update(group_to_state)
- if not new_latest_event_ids:
- current_state = {}
- elif was_updated:
- if len(state_sets) == 1:
- # If there is only one state set, then we know what the current
- # state is.
- current_state = state_sets[0]
- else:
- # We work out the current state by passing the state sets to the
- # state resolution algorithm. It may ask for some events, including
- # the events we have yet to persist, so we need a slightly more
- # complicated event lookup function than simply looking the events
- # up in the db.
- events_map = {ev.event_id: ev for ev, _ in events_context}
-
- @defer.inlineCallbacks
- def get_events(ev_ids):
- # We get the events by first looking at the list of events we
- # are trying to persist, and then fetching the rest from the DB.
- db = []
- to_return = {}
- for ev_id in ev_ids:
- ev = events_map.get(ev_id, None)
- if ev:
- to_return[ev_id] = ev
- else:
- db.append(ev_id)
-
- if db:
- evs = yield self.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
- to_return.update(evs)
- defer.returnValue(to_return)
+ if len(state_groups) == 1:
+ # If there is only one state group, then we know what the current
+ # state is.
+ defer.returnValue(state_groups.values()[0])
- current_state = yield resolve_events(
- state_sets,
- state_map_factory=get_events,
- )
- else:
- return
+ def get_events(ev_ids):
+ return self.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False,
+ )
+ events_map = {ev.event_id: ev for ev, _ in events_context}
+ logger.debug("calling resolve_state_groups from preserve_events")
+ res = yield self._state_resolution_handler.resolve_state_groups(
+ room_id, state_groups, events_map, get_events
+ )
+
+ defer.returnValue(res.state)
+
+ @defer.inlineCallbacks
+ def _calculate_state_delta(self, room_id, current_state):
+ """Calculate the new state deltas for a room.
+ Assumes that we are only persisting events for one room at a time.
+
+ Returns:
+ 2-tuple (to_delete, to_insert) where both are state dicts,
+ i.e. (type, state_key) -> event_id. `to_delete` are the entries to
+ first be deleted from current_state_events, `to_insert` are entries
+ to insert.
+ """
existing_state = yield self.get_current_state_ids(room_id)
existing_events = set(existing_state.itervalues())
@@ -575,67 +597,11 @@ class EventsStore(SQLBaseStore):
if ev_id in events_to_insert
}
- defer.returnValue((to_delete, to_insert, current_state))
-
- @defer.inlineCallbacks
- def get_event(self, event_id, check_redacted=True,
- get_prev_content=False, allow_rejected=False,
- allow_none=False):
- """Get an event from the database by event_id.
-
- Args:
- event_id (str): The event_id of the event to fetch
- check_redacted (bool): If True, check if event has been redacted
- and redact it.
- get_prev_content (bool): If True and event is a state event,
- include the previous states content in the unsigned field.
- allow_rejected (bool): If True return rejected events.
- allow_none (bool): If True, return None if no event found, if
- False throw an exception.
-
- Returns:
- Deferred : A FrozenEvent.
- """
- events = yield self._get_events(
- [event_id],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- if not events and not allow_none:
- raise SynapseError(404, "Could not find event %s" % (event_id,))
-
- defer.returnValue(events[0] if events else None)
-
- @defer.inlineCallbacks
- def get_events(self, event_ids, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- """Get events from the database
-
- Args:
- event_ids (list): The event_ids of the events to fetch
- check_redacted (bool): If True, check if event has been redacted
- and redact it.
- get_prev_content (bool): If True and event is a state event,
- include the previous states content in the unsigned field.
- allow_rejected (bool): If True return rejected events.
-
- Returns:
- Deferred : Dict from event_id to event.
- """
- events = yield self._get_events(
- event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- defer.returnValue({e.event_id: e for e in events})
+ defer.returnValue((to_delete, to_insert))
@log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled,
- delete_existing=False, current_state_for_room={},
+ delete_existing=False, state_delta_for_room={},
new_forward_extremeties={}):
"""Insert some number of room events into the necessary database tables.
@@ -651,7 +617,7 @@ class EventsStore(SQLBaseStore):
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
IntegrityError.
- current_state_for_room (dict[str, (list[str], list[str])]):
+ state_delta_for_room (dict[str, (list[str], list[str])]):
The current-state delta for each room. For each room, a tuple
(to_delete, to_insert), being a list of event ids to be removed
from the current state, and a list of event ids to be added to
@@ -661,9 +627,11 @@ class EventsStore(SQLBaseStore):
list of the event ids which are the forward extremities.
"""
+ all_events_and_contexts = events_and_contexts
+
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
- self._update_current_state_txn(txn, current_state_for_room, max_stream_order)
+ self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
self._update_forward_extremities_txn(
txn,
@@ -707,9 +675,8 @@ class EventsStore(SQLBaseStore):
events_and_contexts=events_and_contexts,
)
- # Insert into the state_groups, state_groups_state, and
- # event_to_state_groups tables.
- self._store_mult_state_groups_txn(txn, events_and_contexts)
+ # Insert into event_to_state_groups.
+ self._store_event_state_mappings_txn(txn, events_and_contexts)
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
@@ -724,12 +691,13 @@ class EventsStore(SQLBaseStore):
self._update_metadata_tables_txn(
txn,
events_and_contexts=events_and_contexts,
+ all_events_and_contexts=all_events_and_contexts,
backfilled=backfilled,
)
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in state_delta_by_room.iteritems():
- to_delete, to_insert, _ = current_state_tuple
+ to_delete, to_insert = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()],
@@ -786,7 +754,7 @@ class EventsStore(SQLBaseStore):
for member in members_changed:
self._invalidate_cache_and_stream(
- txn, self.get_rooms_for_user, (member,)
+ txn, self.get_rooms_for_user_with_stream_ordering, (member,)
)
for host in set(get_domain_from_id(u) for u in members_changed):
@@ -944,10 +912,9 @@ class EventsStore(SQLBaseStore):
# an outlier in the database. We now have some state at that
# so we need to update the state_groups table with that state.
- # insert into the state_group, state_groups_state and
- # event_to_state_groups tables.
+ # insert into event_to_state_groups.
try:
- self._store_mult_state_groups_txn(txn, ((event, context),))
+ self._store_event_state_mappings_txn(txn, ((event, context),))
except Exception:
logger.exception("")
raise
@@ -1122,27 +1089,33 @@ class EventsStore(SQLBaseStore):
ec for ec in events_and_contexts if ec[0] not in to_remove
]
- def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+ def _update_metadata_tables_txn(self, txn, events_and_contexts,
+ all_events_and_contexts, backfilled):
"""Update all the miscellaneous tables for new events
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
+ all_events_and_contexts (list[(EventBase, EventContext)]): all
+ events that we were going to persist. This includes events
+ we've already persisted, etc, that wouldn't appear in
+ events_and_context.
backfilled (bool): True if the events were backfilled
"""
+ # Insert all the push actions into the event_push_actions table.
+ self._set_push_actions_for_event_and_users_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ all_events_and_contexts=all_events_and_contexts,
+ )
+
if not events_and_contexts:
# nothing to do here
return
for event, context in events_and_contexts:
- # Insert all the push actions into the event_push_actions table.
- if context.push_actions:
- self._set_push_actions_for_event_and_users_txn(
- txn, event, context.push_actions
- )
-
if event.type == EventTypes.Redaction and event.redacts is not None:
# Remove the entries in the event_push_actions table for the
# redacted event.
@@ -1347,292 +1320,6 @@ class EventsStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def _get_events(self, event_ids, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not event_ids:
- defer.returnValue([])
-
- event_id_list = event_ids
- event_ids = set(event_ids)
-
- event_entry_map = self._get_events_from_cache(
- event_ids,
- allow_rejected=allow_rejected,
- )
-
- missing_events_ids = [e for e in event_ids if e not in event_entry_map]
-
- if missing_events_ids:
- missing_events = yield self._enqueue_events(
- missing_events_ids,
- check_redacted=check_redacted,
- allow_rejected=allow_rejected,
- )
-
- event_entry_map.update(missing_events)
-
- events = []
- for event_id in event_id_list:
- entry = event_entry_map.get(event_id, None)
- if not entry:
- continue
-
- if allow_rejected or not entry.event.rejected_reason:
- if check_redacted and entry.redacted_event:
- event = entry.redacted_event
- else:
- event = entry.event
-
- events.append(event)
-
- if get_prev_content:
- if "replaces_state" in event.unsigned:
- prev = yield self.get_event(
- event.unsigned["replaces_state"],
- get_prev_content=False,
- allow_none=True,
- )
- if prev:
- event.unsigned = dict(event.unsigned)
- event.unsigned["prev_content"] = prev.content
- event.unsigned["prev_sender"] = prev.sender
-
- defer.returnValue(events)
-
- def _invalidate_get_event_cache(self, event_id):
- self._get_event_cache.invalidate((event_id,))
-
- def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
- """Fetch events from the caches
-
- Args:
- events (list(str)): list of event_ids to fetch
- allow_rejected (bool): Whether to teturn events that were rejected
- update_metrics (bool): Whether to update the cache hit ratio metrics
-
- Returns:
- dict of event_id -> _EventCacheEntry for each event_id in cache. If
- allow_rejected is `False` then there will still be an entry but it
- will be `None`
- """
- event_map = {}
-
- for event_id in events:
- ret = self._get_event_cache.get(
- (event_id,), None,
- update_metrics=update_metrics,
- )
- if not ret:
- continue
-
- if allow_rejected or not ret.event.rejected_reason:
- event_map[event_id] = ret
- else:
- event_map[event_id] = None
-
- return event_map
-
- def _do_fetch(self, conn):
- """Takes a database connection and waits for requests for events from
- the _event_fetch_list queue.
- """
- event_list = []
- i = 0
- while True:
- try:
- with self._event_fetch_lock:
- event_list = self._event_fetch_list
- self._event_fetch_list = []
-
- if not event_list:
- single_threaded = self.database_engine.single_threaded
- if single_threaded or i > EVENT_QUEUE_ITERATIONS:
- self._event_fetch_ongoing -= 1
- return
- else:
- self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
- i += 1
- continue
- i = 0
-
- event_id_lists = zip(*event_list)[0]
- event_ids = [
- item for sublist in event_id_lists for item in sublist
- ]
-
- rows = self._new_transaction(
- conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
- )
-
- row_dict = {
- r["event_id"]: r
- for r in rows
- }
-
- # We only want to resolve deferreds from the main thread
- def fire(lst, res):
- for ids, d in lst:
- if not d.called:
- try:
- with PreserveLoggingContext():
- d.callback([
- res[i]
- for i in ids
- if i in res
- ])
- except Exception:
- logger.exception("Failed to callback")
- with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list, row_dict)
- except Exception as e:
- logger.exception("do_fetch")
-
- # We only want to resolve deferreds from the main thread
- def fire(evs):
- for _, d in evs:
- if not d.called:
- with PreserveLoggingContext():
- d.errback(e)
-
- if event_list:
- with PreserveLoggingContext():
- reactor.callFromThread(fire, event_list)
-
- @defer.inlineCallbacks
- def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
- """Fetches events from the database using the _event_fetch_list. This
- allows batch and bulk fetching of events - it allows us to fetch events
- without having to create a new transaction for each request for events.
- """
- if not events:
- defer.returnValue({})
-
- events_d = defer.Deferred()
- with self._event_fetch_lock:
- self._event_fetch_list.append(
- (events, events_d)
- )
-
- self._event_fetch_lock.notify()
-
- if self._event_fetch_ongoing < EVENT_QUEUE_THREADS:
- self._event_fetch_ongoing += 1
- should_start = True
- else:
- should_start = False
-
- if should_start:
- with PreserveLoggingContext():
- self.runWithConnection(
- self._do_fetch
- )
-
- logger.debug("Loading %d events", len(events))
- with PreserveLoggingContext():
- rows = yield events_d
- logger.debug("Loaded %d events (%d rows)", len(events), len(rows))
-
- if not allow_rejected:
- rows[:] = [r for r in rows if not r["rejects"]]
-
- res = yield make_deferred_yieldable(defer.gatherResults(
- [
- preserve_fn(self._get_event_from_row)(
- row["internal_metadata"], row["json"], row["redacts"],
- rejected_reason=row["rejects"],
- )
- for row in rows
- ],
- consumeErrors=True
- ))
-
- defer.returnValue({
- e.event.event_id: e
- for e in res if e
- })
-
- def _fetch_event_rows(self, txn, events):
- rows = []
- N = 200
- for i in range(1 + len(events) / N):
- evs = events[i * N:(i + 1) * N]
- if not evs:
- break
-
- sql = (
- "SELECT "
- " e.event_id as event_id, "
- " e.internal_metadata,"
- " e.json,"
- " r.redacts as redacts,"
- " rej.event_id as rejects "
- " FROM event_json as e"
- " LEFT JOIN rejections as rej USING (event_id)"
- " LEFT JOIN redactions as r ON e.event_id = r.redacts"
- " WHERE e.event_id IN (%s)"
- ) % (",".join(["?"] * len(evs)),)
-
- txn.execute(sql, evs)
- rows.extend(self.cursor_to_dict(txn))
-
- return rows
-
- @defer.inlineCallbacks
- def _get_event_from_row(self, internal_metadata, js, redacted,
- rejected_reason=None):
- with Measure(self._clock, "_get_event_from_row"):
- d = json.loads(js)
- internal_metadata = json.loads(internal_metadata)
-
- if rejected_reason:
- rejected_reason = yield self._simple_select_one_onecol(
- table="rejections",
- keyvalues={"event_id": rejected_reason},
- retcol="reason",
- desc="_get_event_from_row_rejected_reason",
- )
-
- original_ev = FrozenEvent(
- d,
- internal_metadata_dict=internal_metadata,
- rejected_reason=rejected_reason,
- )
-
- redacted_event = None
- if redacted:
- redacted_event = prune_event(original_ev)
-
- redaction_id = yield self._simple_select_one_onecol(
- table="redactions",
- keyvalues={"redacts": redacted_event.event_id},
- retcol="event_id",
- desc="_get_event_from_row_redactions",
- )
-
- redacted_event.unsigned["redacted_by"] = redaction_id
- # Get the redaction event.
-
- because = yield self.get_event(
- redaction_id,
- check_redacted=False,
- allow_none=True,
- )
-
- if because:
- # It's fine to do add the event directly, since get_pdu_json
- # will serialise this field correctly
- redacted_event.unsigned["redacted_because"] = because
-
- cache_entry = _EventCacheEntry(
- event=original_ev,
- redacted_event=redacted_event,
- )
-
- self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
-
- defer.returnValue(cache_entry)
-
- @defer.inlineCallbacks
def count_daily_messages(self):
"""
Returns an estimate of the number of messages sent in the last day.
@@ -2017,16 +1704,32 @@ class EventsStore(SQLBaseStore):
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
- def delete_old_state(self, room_id, topological_ordering):
- return self.runInteraction(
- "delete_old_state",
- self._delete_old_state_txn, room_id, topological_ordering
- )
+ def purge_history(
+ self, room_id, topological_ordering, delete_local_events,
+ ):
+ """Deletes room history before a certain point
+
+ Args:
+ room_id (str):
+
+ topological_ordering (int):
+ minimum topo ordering to preserve
- def _delete_old_state_txn(self, txn, room_id, topological_ordering):
- """Deletes old room state
+ delete_local_events (bool):
+ if True, we will delete local events as well as remote ones
+ (instead of just marking them as outliers and deleting their
+ state groups).
"""
+ return self.runInteraction(
+ "purge_history",
+ self._purge_history_txn, room_id, topological_ordering,
+ delete_local_events,
+ )
+
+ def _purge_history_txn(
+ self, txn, room_id, topological_ordering, delete_local_events,
+ ):
# Tables that should be pruned:
# event_auth
# event_backward_extremities
@@ -2047,6 +1750,30 @@ class EventsStore(SQLBaseStore):
# state_groups
# state_groups_state
+ # we will build a temporary table listing the events so that we don't
+ # have to keep shovelling the list back and forth across the
+ # connection. Annoyingly the python sqlite driver commits the
+ # transaction on CREATE, so let's do this first.
+ #
+ # furthermore, we might already have the table from a previous (failed)
+ # purge attempt, so let's drop the table first.
+
+ txn.execute("DROP TABLE IF EXISTS events_to_purge")
+
+ txn.execute(
+ "CREATE TEMPORARY TABLE events_to_purge ("
+ " event_id TEXT NOT NULL,"
+ " should_delete BOOLEAN NOT NULL"
+ ")"
+ )
+
+ # create an index on should_delete because later we'll be looking for
+ # the should_delete / shouldn't_delete subsets
+ txn.execute(
+ "CREATE INDEX events_to_purge_should_delete"
+ " ON events_to_purge(should_delete)",
+ )
+
# First ensure that we're not about to delete all the forward extremeties
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
@@ -2067,42 +1794,48 @@ class EventsStore(SQLBaseStore):
400, "topological_ordering is greater than forward extremeties"
)
- logger.debug("[purge] looking for events to delete")
+ logger.info("[purge] looking for events to delete")
+
+ should_delete_expr = "state_key IS NULL"
+ should_delete_params = ()
+ if not delete_local_events:
+ should_delete_expr += " AND event_id NOT LIKE ?"
+ should_delete_params += ("%:" + self.hs.hostname, )
+
+ should_delete_params += (room_id, topological_ordering)
txn.execute(
- "SELECT event_id, state_key FROM events"
- " LEFT JOIN state_events USING (room_id, event_id)"
- " WHERE room_id = ? AND topological_ordering < ?",
- (room_id, topological_ordering,)
+ "INSERT INTO events_to_purge"
+ " SELECT event_id, %s"
+ " FROM events AS e LEFT JOIN state_events USING (event_id)"
+ " WHERE e.room_id = ? AND topological_ordering < ?" % (
+ should_delete_expr,
+ ),
+ should_delete_params,
+ )
+ txn.execute(
+ "SELECT event_id, should_delete FROM events_to_purge"
)
event_rows = txn.fetchall()
-
- to_delete = [
- (event_id,) for event_id, state_key in event_rows
- if state_key is None and not self.hs.is_mine_id(event_id)
- ]
logger.info(
- "[purge] found %i events before cutoff, of which %i are remote"
- " non-state events to delete", len(event_rows), len(to_delete))
-
- for event_id, state_key in event_rows:
- txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+ "[purge] found %i events before cutoff, of which %i can be deleted",
+ len(event_rows), sum(1 for e in event_rows if e[1]),
+ )
- logger.debug("[purge] Finding new backward extremities")
+ logger.info("[purge] Finding new backward extremities")
# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
txn.execute(
- "SELECT DISTINCT e.event_id FROM events as e"
- " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
- " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
- " WHERE e.room_id = ? AND e.topological_ordering < ?"
- " AND e2.topological_ordering >= ?",
- (room_id, topological_ordering, topological_ordering)
+ "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
+ " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
+ " INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
+ " WHERE e2.topological_ordering >= ?",
+ (topological_ordering, )
)
new_backwards_extrems = txn.fetchall()
- logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
+ logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems)
txn.execute(
"DELETE FROM event_backward_extremities WHERE room_id = ?",
@@ -2118,7 +1851,7 @@ class EventsStore(SQLBaseStore):
]
)
- logger.debug("[purge] finding redundant state groups")
+ logger.info("[purge] finding redundant state groups")
# Get all state groups that are only referenced by events that are
# to be deleted.
@@ -2126,24 +1859,23 @@ class EventsStore(SQLBaseStore):
"SELECT state_group FROM event_to_state_groups"
" INNER JOIN events USING (event_id)"
" WHERE state_group IN ("
- " SELECT DISTINCT state_group FROM events"
+ " SELECT DISTINCT state_group FROM events_to_purge"
" INNER JOIN event_to_state_groups USING (event_id)"
- " WHERE room_id = ? AND topological_ordering < ?"
" )"
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
- (room_id, topological_ordering, topological_ordering)
+ (topological_ordering, )
)
state_rows = txn.fetchall()
- logger.debug("[purge] found %i redundant state groups", len(state_rows))
+ logger.info("[purge] found %i redundant state groups", len(state_rows))
# make a set of the redundant state groups, so that we can look them up
# efficiently
state_groups_to_delete = set([sg for sg, in state_rows])
# Now we get all the state groups that rely on these state groups
- logger.debug("[purge] finding state groups which depend on redundant"
- " state groups")
+ logger.info("[purge] finding state groups which depend on redundant"
+ " state groups")
remaining_state_groups = []
for i in xrange(0, len(state_rows), 100):
chunk = [sg for sg, in state_rows[i:i + 100]]
@@ -2168,7 +1900,7 @@ class EventsStore(SQLBaseStore):
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
- logger.debug("[purge] de-delta-ing remaining state group %s", sg)
+ logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(
txn, [sg], types=None
)
@@ -2205,7 +1937,7 @@ class EventsStore(SQLBaseStore):
],
)
- logger.debug("[purge] removing redundant state groups")
+ logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
@@ -2215,18 +1947,15 @@ class EventsStore(SQLBaseStore):
state_rows
)
- # Delete all non-state
- logger.debug("[purge] removing events from event_to_state_groups")
- txn.executemany(
- "DELETE FROM event_to_state_groups WHERE event_id = ?",
- [(event_id,) for event_id, _ in event_rows]
- )
-
- logger.debug("[purge] updating room_depth")
+ logger.info("[purge] removing events from event_to_state_groups")
txn.execute(
- "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
- (topological_ordering, room_id,)
+ "DELETE FROM event_to_state_groups "
+ "WHERE event_id IN (SELECT event_id from events_to_purge)"
)
+ for event_id, _ in event_rows:
+ txn.call_after(self._get_state_group_for_event.invalidate, (
+ event_id,
+ ))
# Delete all remote non-state events
for table in (
@@ -2238,28 +1967,60 @@ class EventsStore(SQLBaseStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
- "event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
"rejections",
):
- logger.debug("[purge] removing remote non-state events from %s", table)
+ logger.info("[purge] removing events from %s", table)
- txn.executemany(
- "DELETE FROM %s WHERE event_id = ?" % (table,),
- to_delete
+ txn.execute(
+ "DELETE FROM %s WHERE event_id IN ("
+ " SELECT event_id FROM events_to_purge WHERE should_delete"
+ ")" % (table,),
+ )
+
+ # event_push_actions lacks an index on event_id, and has one on
+ # (room_id, event_id) instead.
+ for table in (
+ "event_push_actions",
+ ):
+ logger.info("[purge] removing events from %s", table)
+
+ txn.execute(
+ "DELETE FROM %s WHERE room_id = ? AND event_id IN ("
+ " SELECT event_id FROM events_to_purge WHERE should_delete"
+ ")" % (table,),
+ (room_id, )
)
# Mark all state and own events as outliers
- logger.debug("[purge] marking remaining events as outliers")
- txn.executemany(
+ logger.info("[purge] marking remaining events as outliers")
+ txn.execute(
"UPDATE events SET outlier = ?"
- " WHERE event_id = ?",
- [
- (True, event_id,) for event_id, state_key in event_rows
- if state_key is not None or self.hs.is_mine_id(event_id)
- ]
+ " WHERE event_id IN ("
+ " SELECT event_id FROM events_to_purge "
+ " WHERE NOT should_delete"
+ ")",
+ (True,),
+ )
+
+ # synapse tries to take out an exclusive lock on room_depth whenever it
+ # persists events (because upsert), and once we run this update, we
+ # will block that for the rest of our transaction.
+ #
+ # So, let's stick it at the end so that we don't block event
+ # persistence.
+ logger.info("[purge] updating room_depth")
+ txn.execute(
+ "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+ (topological_ordering, room_id,)
+ )
+
+ # finally, drop the temp table. this will commit the txn in sqlite,
+ # so make sure to keep this actually last.
+ txn.execute(
+ "DROP TABLE events_to_purge"
)
logger.info("[purge] done")
@@ -2272,7 +2033,7 @@ class EventsStore(SQLBaseStore):
to_2, so_2 = yield self._get_event_ordering(event_id2)
defer.returnValue((to_1, so_1) > (to_2, so_2))
- @defer.inlineCallbacks
+ @cachedInlineCallbacks(max_entries=5000)
def _get_event_ordering(self, event_id):
res = yield self._simple_select_one(
table="events",
|