diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2aaab0d02c..2f482af3a1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ import logging
from collections import OrderedDict, deque, namedtuple
from functools import wraps
-from six import iteritems, itervalues
+from six import iteritems
from six.moves import range
from canonicaljson import json
@@ -33,6 +33,7 @@ from synapse.api.errors import SynapseError
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
@@ -141,25 +142,22 @@ class _EventPeristenceQueue(object):
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
- # handle_queue_loop runs in the sentinel logcontext, so
- # there is no need to preserve_fn when running the
- # callbacks on the deferred.
try:
ret = yield per_item_callback(item)
- item.deferred.callback(ret)
except Exception:
- item.deferred.errback()
+ with PreserveLoggingContext():
+ item.deferred.errback()
+ else:
+ with PreserveLoggingContext():
+ item.deferred.callback(ret)
finally:
queue = self._event_persist_queues.pop(room_id, None)
if queue:
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
- # set handle_queue_loop off on the background. We don't want to
- # attribute work done in it to the current request, so we drop the
- # logcontext altogether.
- with PreserveLoggingContext():
- handle_queue_loop()
+ # set handle_queue_loop off in the background
+ run_as_background_process("persist_events", handle_queue_loop)
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -345,11 +343,14 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties = {}
# map room_id->(type,state_key)->event_id tracking the full
- # state in each room after adding these events
+ # state in each room after adding these events.
+ # This is simply used to prefill the get_current_state_ids
+ # cache
current_state_for_room = {}
- # map room_id->(to_delete, to_insert) where each entry is
- # a map (type,key)->event_id giving the state delta in each
+ # map room_id->(to_delete, to_insert) where to_delete is a list
+ # of type/state keys to remove from current state, and to_insert
+ # is a map (type,key)->event_id giving the state delta in each
# room
state_delta_for_room = {}
@@ -419,19 +420,40 @@ class EventsStore(EventsWorkerStore):
logger.info(
"Calculating state delta for room %s", room_id,
)
- current_state = yield self._get_new_state_after_events(
- room_id,
- ev_ctx_rm,
- latest_event_ids,
- new_latest_event_ids,
- )
+ with Measure(
+ self._clock,
+ "persist_events.get_new_state_after_events",
+ ):
+ res = yield self._get_new_state_after_events(
+ room_id,
+ ev_ctx_rm,
+ latest_event_ids,
+ new_latest_event_ids,
+ )
+ current_state, delta_ids = res
+
+ # If either are not None then there has been a change,
+ # and we need to work out the delta (or use that
+ # given)
+ if delta_ids is not None:
+ # If there is a delta we know that we've
+ # only added or replaced state, never
+ # removed keys entirely.
+ state_delta_for_room[room_id] = ([], delta_ids)
+ elif current_state is not None:
+ with Measure(
+ self._clock,
+ "persist_events.calculate_state_delta",
+ ):
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
+ state_delta_for_room[room_id] = delta
+
+ # If we have the current_state then lets prefill
+ # the cache with it.
if current_state is not None:
current_state_for_room[room_id] = current_state
- delta = yield self._calculate_state_delta(
- room_id, current_state,
- )
- if delta is not None:
- state_delta_for_room[room_id] = delta
yield self.runInteraction(
"persist_events",
@@ -498,7 +520,6 @@ class EventsStore(EventsWorkerStore):
iterable=list(new_latest_event_ids),
retcols=["prev_event_id"],
keyvalues={
- "room_id": room_id,
"is_state": False,
},
desc="_calculate_new_extremeties",
@@ -530,9 +551,15 @@ class EventsStore(EventsWorkerStore):
the new forward extremities for the room.
Returns:
- Deferred[dict[(str,str), str]|None]:
- None if there are no changes to the room state, or
- a dict of (type, state_key) -> event_id].
+ Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
+ Returns a tuple of two state maps, the first being the full new current
+ state and the second being the delta to the existing current state.
+ If both are None then there has been no change.
+
+ If there has been a change then we only return the delta if its
+ already been calculated. Conversely if we do know the delta then
+ the new current state is only returned if we've already calculated
+ it.
"""
if not new_latest_event_ids:
@@ -540,18 +567,32 @@ class EventsStore(EventsWorkerStore):
# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}
+
+ # Map from (prev state group, new state group) -> delta state dict
+ state_group_deltas = {}
+
for ev, ctx in events_context:
if ctx.state_group is None:
- # I don't think this can happen, but let's double-check
- raise Exception(
- "Context for new extremity event %s has no state "
- "group" % (ev.event_id, ),
- )
+ # This should only happen for outlier events.
+ if not ev.internal_metadata.is_outlier():
+ raise Exception(
+ "Context for new event %s has no state "
+ "group" % (ev.event_id, ),
+ )
+ continue
if ctx.state_group in state_groups_map:
continue
- state_groups_map[ctx.state_group] = ctx.current_state_ids
+ # We're only interested in pulling out state that has already
+ # been cached in the context. We'll pull stuff out of the DB later
+ # if necessary.
+ current_state_ids = ctx.get_cached_current_state_ids()
+ if current_state_ids is not None:
+ state_groups_map[ctx.state_group] = current_state_ids
+
+ if ctx.prev_group:
+ state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can
@@ -566,7 +607,7 @@ class EventsStore(EventsWorkerStore):
for event_id in new_latest_event_ids:
# First search in the list of new events we're adding.
for ev, ctx in events_context:
- if event_id == ev.event_id:
+ if event_id == ev.event_id and ctx.state_group is not None:
event_id_to_state_group[event_id] = ctx.state_group
break
else:
@@ -594,7 +635,26 @@ class EventsStore(EventsWorkerStore):
# If they old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
- return
+ defer.returnValue((None, None))
+
+ if len(new_state_groups) == 1 and len(old_state_groups) == 1:
+ # If we're going from one state group to another, lets check if
+ # we have a delta for that transition. If we do then we can just
+ # return that.
+
+ new_state_group = next(iter(new_state_groups))
+ old_state_group = next(iter(old_state_groups))
+
+ delta_ids = state_group_deltas.get(
+ (old_state_group, new_state_group,), None
+ )
+ if delta_ids is not None:
+ # We have a delta from the existing to new current state,
+ # so lets just return that. If we happen to already have
+ # the current state in memory then lets also return that,
+ # but it doesn't matter if we don't.
+ new_state = state_groups_map.get(new_state_group)
+ defer.returnValue((new_state, delta_ids))
# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
@@ -606,7 +666,7 @@ class EventsStore(EventsWorkerStore):
if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
- defer.returnValue(state_groups_map[new_state_groups.pop()])
+ defer.returnValue((state_groups_map[new_state_groups.pop()], None))
# Ok, we need to defer to the state handler to resolve our state sets.
@@ -625,7 +685,7 @@ class EventsStore(EventsWorkerStore):
room_id, state_groups, events_map, get_events
)
- defer.returnValue(res.state)
+ defer.returnValue((res.state, None))
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
@@ -634,28 +694,20 @@ class EventsStore(EventsWorkerStore):
Assumes that we are only persisting events for one room at a time.
Returns:
- 2-tuple (to_delete, to_insert) where both are state dicts,
- i.e. (type, state_key) -> event_id. `to_delete` are the entries to
- first be deleted from current_state_events, `to_insert` are entries
- to insert.
+ tuple[list, dict] (to_delete, to_insert): where to_delete are the
+ type/state_keys to remove from current_state_events and `to_insert`
+ are the updates to current_state_events.
"""
existing_state = yield self.get_current_state_ids(room_id)
- existing_events = set(itervalues(existing_state))
- new_events = set(ev_id for ev_id in itervalues(current_state))
- changed_events = existing_events ^ new_events
-
- if not changed_events:
- return
+ to_delete = [
+ key for key in existing_state
+ if key not in current_state
+ ]
- to_delete = {
- key: ev_id for key, ev_id in iteritems(existing_state)
- if ev_id in changed_events
- }
- events_to_insert = (new_events - existing_events)
to_insert = {
key: ev_id for key, ev_id in iteritems(current_state)
- if ev_id in events_to_insert
+ if ev_id != existing_state.get(key)
}
defer.returnValue((to_delete, to_insert))
@@ -678,10 +730,10 @@ class EventsStore(EventsWorkerStore):
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
IntegrityError.
- state_delta_for_room (dict[str, (list[str], list[str])]):
+ state_delta_for_room (dict[str, (list, dict)]):
The current-state delta for each room. For each room, a tuple
- (to_delete, to_insert), being a list of event ids to be removed
- from the current state, and a list of event ids to be added to
+ (to_delete, to_insert), being a list of type/state keys to be
+ removed from the current state, and a state set to be added to
the current state.
new_forward_extremeties (dict[str, list[str]]):
The new forward extremities for each room. For each room, a
@@ -759,9 +811,46 @@ class EventsStore(EventsWorkerStore):
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
+
+ # First we add entries to the current_state_delta_stream. We
+ # do this before updating the current_state_events table so
+ # that we can use it to calculate the `prev_event_id`. (This
+ # allows us to not have to pull out the existing state
+ # unnecessarily).
+ sql = """
+ INSERT INTO current_state_delta_stream
+ (stream_id, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, ?, ?, ?, (
+ SELECT event_id FROM current_state_events
+ WHERE room_id = ? AND type = ? AND state_key = ?
+ )
+ """
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, None,
+ room_id, etype, state_key,
+ )
+ for etype, state_key in to_delete
+ # We sanity check that we're deleting rather than updating
+ if (etype, state_key) not in to_insert
+ ))
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, ev_id,
+ room_id, etype, state_key,
+ )
+ for (etype, state_key), ev_id in iteritems(to_insert)
+ ))
+
+ # Now we actually update the current_state_events table
+
txn.executemany(
- "DELETE FROM current_state_events WHERE event_id = ?",
- [(ev_id,) for ev_id in itervalues(to_delete)],
+ "DELETE FROM current_state_events"
+ " WHERE room_id = ? AND type = ? AND state_key = ?",
+ (
+ (room_id, etype, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
)
self._simple_insert_many_txn(
@@ -778,25 +867,6 @@ class EventsStore(EventsWorkerStore):
],
)
- state_deltas = {key: None for key in to_delete}
- state_deltas.update(to_insert)
-
- self._simple_insert_many_txn(
- txn,
- table="current_state_delta_stream",
- values=[
- {
- "stream_id": max_stream_order,
- "room_id": room_id,
- "type": key[0],
- "state_key": key[1],
- "event_id": ev_id,
- "prev_event_id": to_delete.get(key, None),
- }
- for key, ev_id in iteritems(state_deltas)
- ]
- )
-
txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order,
@@ -810,7 +880,8 @@ class EventsStore(EventsWorkerStore):
# and which we have added, then we invlidate the caches for all
# those users.
members_changed = set(
- state_key for ev_type, state_key in state_deltas
+ state_key
+ for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
)
@@ -1066,7 +1137,7 @@ class EventsStore(EventsWorkerStore):
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
- [(ev.event_id,) for ev, _ in events_and_contexts]
+ [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
)
def _store_event_txn(self, txn, events_and_contexts):
@@ -1117,7 +1188,6 @@ class EventsStore(EventsWorkerStore):
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
- "content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
"received_ts": self._clock.time_msec(),
"sender": event.sender,
|