diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index cb1082e864..8bf87f38f7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -14,36 +14,36 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import OrderedDict, deque, namedtuple
-from functools import wraps
import itertools
import logging
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
+
+from six import iteritems, text_type
+from six.moves import range
+
+from canonicaljson import json
+from prometheus_client import Counter
-import simplejson as json
from twisted.internet import defer
+import synapse.metrics
+from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
+# these are only included to make the type annotations work
+from synapse.events import EventBase # noqa: F401
+from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
-from synapse.util.async import ObservableDeferred
+from synapse.types import RoomStreamToken, get_domain_from_id
+from synapse.util.async_helpers import ObservableDeferred
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import (
- PreserveLoggingContext, make_deferred_yieldable,
-)
+from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
-from synapse.api.constants import EventTypes
-from synapse.api.errors import SynapseError
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id, RoomStreamToken
-import synapse.metrics
-
-# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
-from synapse.events.snapshot import EventContext # noqa: F401
-
-from six.moves import range
-from six import itervalues, iteritems
-
-from prometheus_client import Counter
logger = logging.getLogger(__name__)
@@ -67,7 +67,13 @@ state_delta_reuse_delta_counter = Counter(
def encode_json(json_object):
- return frozendict_json_encoder.encode(json_object)
+ """
+ Encode a Python object as JSON and return it in a Unicode string.
+ """
+ out = frozendict_json_encoder.encode(json_object)
+ if isinstance(out, bytes):
+ out = out.decode('utf8')
+ return out
class _EventPeristenceQueue(object):
@@ -144,25 +150,22 @@ class _EventPeristenceQueue(object):
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
- # handle_queue_loop runs in the sentinel logcontext, so
- # there is no need to preserve_fn when running the
- # callbacks on the deferred.
try:
ret = yield per_item_callback(item)
- item.deferred.callback(ret)
except Exception:
- item.deferred.errback()
+ with PreserveLoggingContext():
+ item.deferred.errback()
+ else:
+ with PreserveLoggingContext():
+ item.deferred.callback(ret)
finally:
queue = self._event_persist_queues.pop(room_id, None)
if queue:
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
- # set handle_queue_loop off on the background. We don't want to
- # attribute work done in it to the current request, so we drop the
- # logcontext altogether.
- with PreserveLoggingContext():
- handle_queue_loop()
+ # set handle_queue_loop off in the background
+ run_as_background_process("persist_events", handle_queue_loop)
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -198,7 +201,9 @@ def _retry_on_integrity_error(func):
return f
-class EventsStore(EventsWorkerStore):
+# inherits from EventFederationStore so that we can call _update_backward_extremities
+# and _handle_mult_prev_events (though arguably those could both be moved in here)
+class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@@ -236,12 +241,18 @@ class EventsStore(EventsWorkerStore):
self._state_resolution_handler = hs.get_state_resolution_handler()
+ @defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
Write events to the database
Args:
events_and_contexts: list of tuples of (event, context)
- backfilled: ?
+ backfilled (bool): Whether the results are retrieved from federation
+ via backfill or not. Used to determine if they're "new" events
+ which might update the current state etc.
+
+ Returns:
+ Deferred[int]: the stream ordering of the latest persisted event
"""
partitioned = {}
for event, ctx in events_and_contexts:
@@ -258,10 +269,14 @@ class EventsStore(EventsWorkerStore):
for room_id in partitioned:
self._maybe_start_persisting(room_id)
- return make_deferred_yieldable(
+ yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
+ max_persisted_id = yield self._stream_id_gen.get_current_token()
+
+ defer.returnValue(max_persisted_id)
+
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False):
@@ -348,11 +363,14 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties = {}
# map room_id->(type,state_key)->event_id tracking the full
- # state in each room after adding these events
+ # state in each room after adding these events.
+ # This is simply used to prefill the get_current_state_ids
+ # cache
current_state_for_room = {}
- # map room_id->(to_delete, to_insert) where each entry is
- # a map (type,key)->event_id giving the state delta in each
+ # map room_id->(to_delete, to_insert) where to_delete is a list
+ # of type/state keys to remove from current state, and to_insert
+ # is a map (type,key)->event_id giving the state delta in each
# room
state_delta_for_room = {}
@@ -422,19 +440,40 @@ class EventsStore(EventsWorkerStore):
logger.info(
"Calculating state delta for room %s", room_id,
)
- current_state = yield self._get_new_state_after_events(
- room_id,
- ev_ctx_rm,
- latest_event_ids,
- new_latest_event_ids,
- )
+ with Measure(
+ self._clock,
+ "persist_events.get_new_state_after_events",
+ ):
+ res = yield self._get_new_state_after_events(
+ room_id,
+ ev_ctx_rm,
+ latest_event_ids,
+ new_latest_event_ids,
+ )
+ current_state, delta_ids = res
+
+ # If either are not None then there has been a change,
+ # and we need to work out the delta (or use that
+ # given)
+ if delta_ids is not None:
+ # If there is a delta we know that we've
+ # only added or replaced state, never
+ # removed keys entirely.
+ state_delta_for_room[room_id] = ([], delta_ids)
+ elif current_state is not None:
+ with Measure(
+ self._clock,
+ "persist_events.calculate_state_delta",
+ ):
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
+ state_delta_for_room[room_id] = delta
+
+ # If we have the current_state then lets prefill
+ # the cache with it.
if current_state is not None:
current_state_for_room[room_id] = current_state
- delta = yield self._calculate_state_delta(
- room_id, current_state,
- )
- if delta is not None:
- state_delta_for_room[room_id] = delta
yield self.runInteraction(
"persist_events",
@@ -446,9 +485,14 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc(len(chunk))
- synapse.metrics.event_persisted_position.set(
- chunk[-1][0].internal_metadata.stream_ordering,
- )
+
+ if not backfilled:
+ # backfilled events have negative stream orderings, so we don't
+ # want to set the event_persisted_position to that.
+ synapse.metrics.event_persisted_position.set(
+ chunk[-1][0].internal_metadata.stream_ordering,
+ )
+
for event, context in chunk:
if context.app_service:
origin_type = "local"
@@ -501,7 +545,6 @@ class EventsStore(EventsWorkerStore):
iterable=list(new_latest_event_ids),
retcols=["prev_event_id"],
keyvalues={
- "room_id": room_id,
"is_state": False,
},
desc="_calculate_new_extremeties",
@@ -533,9 +576,15 @@ class EventsStore(EventsWorkerStore):
the new forward extremities for the room.
Returns:
- Deferred[dict[(str,str), str]|None]:
- None if there are no changes to the room state, or
- a dict of (type, state_key) -> event_id].
+ Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
+ Returns a tuple of two state maps, the first being the full new current
+ state and the second being the delta to the existing current state.
+ If both are None then there has been no change.
+
+ If there has been a change then we only return the delta if its
+ already been calculated. Conversely if we do know the delta then
+ the new current state is only returned if we've already calculated
+ it.
"""
if not new_latest_event_ids:
@@ -543,18 +592,32 @@ class EventsStore(EventsWorkerStore):
# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}
+
+ # Map from (prev state group, new state group) -> delta state dict
+ state_group_deltas = {}
+
for ev, ctx in events_context:
if ctx.state_group is None:
- # I don't think this can happen, but let's double-check
- raise Exception(
- "Context for new extremity event %s has no state "
- "group" % (ev.event_id, ),
- )
+ # This should only happen for outlier events.
+ if not ev.internal_metadata.is_outlier():
+ raise Exception(
+ "Context for new event %s has no state "
+ "group" % (ev.event_id, ),
+ )
+ continue
if ctx.state_group in state_groups_map:
continue
- state_groups_map[ctx.state_group] = ctx.current_state_ids
+ # We're only interested in pulling out state that has already
+ # been cached in the context. We'll pull stuff out of the DB later
+ # if necessary.
+ current_state_ids = ctx.get_cached_current_state_ids()
+ if current_state_ids is not None:
+ state_groups_map[ctx.state_group] = current_state_ids
+
+ if ctx.prev_group:
+ state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can
@@ -569,7 +632,7 @@ class EventsStore(EventsWorkerStore):
for event_id in new_latest_event_ids:
# First search in the list of new events we're adding.
for ev, ctx in events_context:
- if event_id == ev.event_id:
+ if event_id == ev.event_id and ctx.state_group is not None:
event_id_to_state_group[event_id] = ctx.state_group
break
else:
@@ -597,7 +660,26 @@ class EventsStore(EventsWorkerStore):
# If they old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
- return
+ defer.returnValue((None, None))
+
+ if len(new_state_groups) == 1 and len(old_state_groups) == 1:
+ # If we're going from one state group to another, lets check if
+ # we have a delta for that transition. If we do then we can just
+ # return that.
+
+ new_state_group = next(iter(new_state_groups))
+ old_state_group = next(iter(old_state_groups))
+
+ delta_ids = state_group_deltas.get(
+ (old_state_group, new_state_group,), None
+ )
+ if delta_ids is not None:
+ # We have a delta from the existing to new current state,
+ # so lets just return that. If we happen to already have
+ # the current state in memory then lets also return that,
+ # but it doesn't matter if we don't.
+ new_state = state_groups_map.get(new_state_group)
+ defer.returnValue((new_state, delta_ids))
# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
@@ -609,7 +691,7 @@ class EventsStore(EventsWorkerStore):
if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
- defer.returnValue(state_groups_map[new_state_groups.pop()])
+ defer.returnValue((state_groups_map[new_state_groups.pop()], None))
# Ok, we need to defer to the state handler to resolve our state sets.
@@ -623,12 +705,14 @@ class EventsStore(EventsWorkerStore):
}
events_map = {ev.event_id: ev for ev, _ in events_context}
+ room_version = yield self.get_room_version(room_id)
+
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
- room_id, state_groups, events_map, get_events
+ room_id, room_version, state_groups, events_map, get_events
)
- defer.returnValue(res.state)
+ defer.returnValue((res.state, None))
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
@@ -637,28 +721,20 @@ class EventsStore(EventsWorkerStore):
Assumes that we are only persisting events for one room at a time.
Returns:
- 2-tuple (to_delete, to_insert) where both are state dicts,
- i.e. (type, state_key) -> event_id. `to_delete` are the entries to
- first be deleted from current_state_events, `to_insert` are entries
- to insert.
+ tuple[list, dict] (to_delete, to_insert): where to_delete are the
+ type/state_keys to remove from current_state_events and `to_insert`
+ are the updates to current_state_events.
"""
existing_state = yield self.get_current_state_ids(room_id)
- existing_events = set(itervalues(existing_state))
- new_events = set(ev_id for ev_id in itervalues(current_state))
- changed_events = existing_events ^ new_events
-
- if not changed_events:
- return
+ to_delete = [
+ key for key in existing_state
+ if key not in current_state
+ ]
- to_delete = {
- key: ev_id for key, ev_id in iteritems(existing_state)
- if ev_id in changed_events
- }
- events_to_insert = (new_events - existing_events)
to_insert = {
key: ev_id for key, ev_id in iteritems(current_state)
- if ev_id in events_to_insert
+ if ev_id != existing_state.get(key)
}
defer.returnValue((to_delete, to_insert))
@@ -681,10 +757,10 @@ class EventsStore(EventsWorkerStore):
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
IntegrityError.
- state_delta_for_room (dict[str, (list[str], list[str])]):
+ state_delta_for_room (dict[str, (list, dict)]):
The current-state delta for each room. For each room, a tuple
- (to_delete, to_insert), being a list of event ids to be removed
- from the current state, and a list of event ids to be added to
+ (to_delete, to_insert), being a list of type/state keys to be
+ removed from the current state, and a state set to be added to
the current state.
new_forward_extremeties (dict[str, list[str]]):
The new forward extremities for each room. For each room, a
@@ -762,9 +838,46 @@ class EventsStore(EventsWorkerStore):
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
+
+ # First we add entries to the current_state_delta_stream. We
+ # do this before updating the current_state_events table so
+ # that we can use it to calculate the `prev_event_id`. (This
+ # allows us to not have to pull out the existing state
+ # unnecessarily).
+ sql = """
+ INSERT INTO current_state_delta_stream
+ (stream_id, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, ?, ?, ?, (
+ SELECT event_id FROM current_state_events
+ WHERE room_id = ? AND type = ? AND state_key = ?
+ )
+ """
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, None,
+ room_id, etype, state_key,
+ )
+ for etype, state_key in to_delete
+ # We sanity check that we're deleting rather than updating
+ if (etype, state_key) not in to_insert
+ ))
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, ev_id,
+ room_id, etype, state_key,
+ )
+ for (etype, state_key), ev_id in iteritems(to_insert)
+ ))
+
+ # Now we actually update the current_state_events table
+
txn.executemany(
- "DELETE FROM current_state_events WHERE event_id = ?",
- [(ev_id,) for ev_id in itervalues(to_delete)],
+ "DELETE FROM current_state_events"
+ " WHERE room_id = ? AND type = ? AND state_key = ?",
+ (
+ (room_id, etype, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
)
self._simple_insert_many_txn(
@@ -781,26 +894,8 @@ class EventsStore(EventsWorkerStore):
],
)
- state_deltas = {key: None for key in to_delete}
- state_deltas.update(to_insert)
-
- self._simple_insert_many_txn(
- txn,
- table="current_state_delta_stream",
- values=[
- {
- "stream_id": max_stream_order,
- "room_id": room_id,
- "type": key[0],
- "state_key": key[1],
- "event_id": ev_id,
- "prev_event_id": to_delete.get(key, None),
- }
- for key, ev_id in iteritems(state_deltas)
- ]
- )
-
- self._curr_state_delta_stream_cache.entity_has_changed(
+ txn.call_after(
+ self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order,
)
@@ -812,7 +907,8 @@ class EventsStore(EventsWorkerStore):
# and which we have added, then we invlidate the caches for all
# those users.
members_changed = set(
- state_key for ev_type, state_key in state_deltas
+ state_key
+ for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
)
@@ -985,7 +1081,7 @@ class EventsStore(EventsWorkerStore):
metadata_json = encode_json(
event.internal_metadata.get_dict()
- ).decode("UTF-8")
+ )
sql = (
"UPDATE event_json SET internal_metadata = ?"
@@ -1044,7 +1140,6 @@ class EventsStore(EventsWorkerStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
- "event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@@ -1064,6 +1159,14 @@ class EventsStore(EventsWorkerStore):
[(ev.event_id,) for ev, _ in events_and_contexts]
)
+ for table in (
+ "event_push_actions",
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
+ [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
+ )
+
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables
@@ -1092,8 +1195,8 @@ class EventsStore(EventsWorkerStore):
"room_id": event.room_id,
"internal_metadata": encode_json(
event.internal_metadata.get_dict()
- ).decode("UTF-8"),
- "json": encode_json(event_dict(event)).decode("UTF-8"),
+ ),
+ "json": encode_json(event_dict(event)),
}
for event, _ in events_and_contexts
],
@@ -1112,13 +1215,12 @@ class EventsStore(EventsWorkerStore):
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
- "content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
"received_ts": self._clock.time_msec(),
"sender": event.sender,
"contains_url": (
"url" in event.content
- and isinstance(event.content["url"], basestring)
+ and isinstance(event.content["url"], text_type)
),
}
for event, _ in events_and_contexts
@@ -1336,88 +1438,6 @@ class EventsStore(EventsWorkerStore):
)
@defer.inlineCallbacks
- def have_events_in_timeline(self, event_ids):
- """Given a list of event ids, check if we have already processed and
- stored them as non outliers.
- """
- rows = yield self._simple_select_many_batch(
- table="events",
- retcols=("event_id",),
- column="event_id",
- iterable=list(event_ids),
- keyvalues={"outlier": False},
- desc="have_events_in_timeline",
- )
-
- defer.returnValue(set(r["event_id"] for r in rows))
-
- @defer.inlineCallbacks
- def have_seen_events(self, event_ids):
- """Given a list of event ids, check if we have already processed them.
-
- Args:
- event_ids (iterable[str]):
-
- Returns:
- Deferred[set[str]]: The events we have already seen.
- """
- results = set()
-
- def have_seen_events_txn(txn, chunk):
- sql = (
- "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
- % (",".join("?" * len(chunk)), )
- )
- txn.execute(sql, chunk)
- for (event_id, ) in txn:
- results.add(event_id)
-
- # break the input up into chunks of 100
- input_iterator = iter(event_ids)
- for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
- []):
- yield self.runInteraction(
- "have_seen_events",
- have_seen_events_txn,
- chunk,
- )
- defer.returnValue(results)
-
- def get_seen_events_with_rejections(self, event_ids):
- """Given a list of event ids, check if we rejected them.
-
- Args:
- event_ids (list[str])
-
- Returns:
- Deferred[dict[str, str|None):
- Has an entry for each event id we already have seen. Maps to
- the rejected reason string if we rejected the event, else maps
- to None.
- """
- if not event_ids:
- return defer.succeed({})
-
- def f(txn):
- sql = (
- "SELECT e.event_id, reason FROM events as e "
- "LEFT JOIN rejections as r ON e.event_id = r.event_id "
- "WHERE e.event_id = ?"
- )
-
- res = {}
- for event_id in event_ids:
- txn.execute(sql, (event_id,))
- row = txn.fetchone()
- if row:
- _, rejected = row
- res[event_id] = rejected
-
- return res
-
- return self.runInteraction("get_rejection_reasons", f)
-
- @defer.inlineCallbacks
def count_daily_messages(self):
"""
Returns an estimate of the number of messages sent in the last day.
@@ -1509,7 +1529,7 @@ class EventsStore(EventsWorkerStore):
contains_url = "url" in content
if contains_url:
- contains_url &= isinstance(content["url"], basestring)
+ contains_url &= isinstance(content["url"], text_type)
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
@@ -1890,10 +1910,10 @@ class EventsStore(EventsWorkerStore):
(room_id,)
)
rows = txn.fetchall()
- max_depth = max(row[0] for row in rows)
+ max_depth = max(row[1] for row in rows)
- if max_depth <= token.topological:
- # We need to ensure we don't delete all the events from the datanase
+ if max_depth < token.topological:
+ # We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
raise SynapseError(
|