diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 20a8d81794..5b64918024 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,9 +23,7 @@ from synapse.events.utils import prune_event
from synapse.util.logcontext import preserve_context_over_deferred
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
-from synapse.crypto.event_signing import compute_event_reference_hash
-from syutil.base64util import decode_base64
from syutil.jsonutil import encode_json
from contextlib import contextmanager
@@ -47,6 +45,48 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
+ def persist_events(self, events_and_contexts, backfilled=False,
+ is_new_state=True):
+ if not events_and_contexts:
+ return
+
+ if backfilled:
+ if not self.min_token_deferred.called:
+ yield self.min_token_deferred
+ start = self.min_token - 1
+ self.min_token -= len(events_and_contexts) + 1
+ stream_orderings = range(start, self.min_token, -1)
+
+ @contextmanager
+ def stream_ordering_manager():
+ yield stream_orderings
+ stream_ordering_manager = stream_ordering_manager()
+ else:
+ stream_ordering_manager = yield self._stream_id_gen.get_next_mult(
+ self, len(events_and_contexts)
+ )
+
+ with stream_ordering_manager as stream_orderings:
+ for (event, _), stream in zip(events_and_contexts, stream_orderings):
+ event.internal_metadata.stream_ordering = stream
+
+ chunks = [
+ events_and_contexts[x:x+100]
+ for x in xrange(0, len(events_and_contexts), 100)
+ ]
+
+ for chunk in chunks:
+ # We can't easily parallelize these since different chunks
+ # might contain the same event. :(
+ yield self.runInteraction(
+ "persist_events",
+ self._persist_events_txn,
+ events_and_contexts=chunk,
+ backfilled=backfilled,
+ is_new_state=is_new_state,
+ )
+
+ @defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False,
is_new_state=True, current_state=None):
@@ -67,13 +107,13 @@ class EventsStore(SQLBaseStore):
try:
with stream_ordering_manager as stream_ordering:
+ event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
backfilled=backfilled,
- stream_ordering=stream_ordering,
is_new_state=is_new_state,
current_state=current_state,
)
@@ -116,19 +156,14 @@ class EventsStore(SQLBaseStore):
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
- stream_ordering=None, is_new_state=True,
- current_state=None):
-
- # Remove the any existing cache entries for the event_id
- txn.call_after(self._invalidate_get_event_cache, event.event_id)
-
+ is_new_state=True, current_state=None):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
txn.call_after(self.get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
- txn.call_after(self.get_users_in_room.invalidate, event.room_id)
- txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
+ txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
+ txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
txn.call_after(self.get_room_name_and_aliases, event.room_id)
self._simple_delete_txn(
@@ -149,37 +184,78 @@ class EventsStore(SQLBaseStore):
}
)
- outlier = event.internal_metadata.is_outlier()
+ return self._persist_events_txn(
+ txn,
+ [(event, context)],
+ backfilled=backfilled,
+ is_new_state=is_new_state,
+ )
- if not outlier:
- self._update_min_depth_for_room_txn(
- txn,
- event.room_id,
- event.depth
+ @log_function
+ def _persist_events_txn(self, txn, events_and_contexts, backfilled,
+ is_new_state=True):
+
+        # Remove any existing cache entries for the event_ids
+ for event, _ in events_and_contexts:
+ txn.call_after(self._invalidate_get_event_cache, event.event_id)
+
+ depth_updates = {}
+ for event, _ in events_and_contexts:
+ if event.internal_metadata.is_outlier():
+ continue
+ depth_updates[event.room_id] = max(
+ event.depth, depth_updates.get(event.room_id, event.depth)
)
- have_persisted = self._simple_select_one_txn(
- txn,
- table="events",
- keyvalues={"event_id": event.event_id},
- retcols=["event_id", "outlier"],
- allow_none=True,
+ for room_id, depth in depth_updates.items():
+ self._update_min_depth_for_room_txn(txn, room_id, depth)
+
+ txn.execute(
+ "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
+ ",".join(["?"] * len(events_and_contexts)),
+ ),
+ [event.event_id for event, _ in events_and_contexts]
)
+ have_persisted = {
+ event_id: outlier
+ for event_id, outlier in txn.fetchall()
+ }
+
+ event_map = {}
+ to_remove = set()
+ for event, context in events_and_contexts:
+ # Handle the case of the list including the same event multiple
+ # times. The tricky thing here is when they differ by whether
+ # they are an outlier.
+ if event.event_id in event_map:
+ other = event_map[event.event_id]
+
+ if not other.internal_metadata.is_outlier():
+ to_remove.add(event)
+ continue
+ elif not event.internal_metadata.is_outlier():
+ to_remove.add(event)
+ continue
+ else:
+ to_remove.add(other)
- metadata_json = encode_json(
- event.internal_metadata.get_dict(),
- using_frozen_dicts=USE_FROZEN_DICTS
- ).decode("UTF-8")
-
- # If we have already persisted this event, we don't need to do any
- # more processing.
- # The processing above must be done on every call to persist event,
- # since they might not have happened on previous calls. For example,
- # if we are persisting an event that we had persisted as an outlier,
- # but is no longer one.
- if have_persisted:
- if not outlier and have_persisted["outlier"]:
- self._store_state_groups_txn(txn, event, context)
+ event_map[event.event_id] = event
+
+ if event.event_id not in have_persisted:
+ continue
+
+ to_remove.add(event)
+
+ outlier_persisted = have_persisted[event.event_id]
+ if not event.internal_metadata.is_outlier() and outlier_persisted:
+ self._store_state_groups_txn(
+ txn, event, context,
+ )
+
+ metadata_json = encode_json(
+ event.internal_metadata.get_dict(),
+ using_frozen_dicts=USE_FROZEN_DICTS
+ ).decode("UTF-8")
sql = (
"UPDATE event_json SET internal_metadata = ?"
@@ -198,94 +274,91 @@ class EventsStore(SQLBaseStore):
sql,
(False, event.event_id,)
)
- return
-
- if not outlier:
- self._store_state_groups_txn(txn, event, context)
- self._handle_prev_events(
- txn,
- outlier=outlier,
- event_id=event.event_id,
- prev_events=event.prev_events,
- room_id=event.room_id,
+ events_and_contexts = filter(
+ lambda ec: ec[0] not in to_remove,
+ events_and_contexts
)
- if event.type == EventTypes.Member:
- self._store_room_member_txn(txn, event)
- elif event.type == EventTypes.Name:
- self._store_room_name_txn(txn, event)
- elif event.type == EventTypes.Topic:
- self._store_room_topic_txn(txn, event)
- elif event.type == EventTypes.Redaction:
- self._store_redaction(txn, event)
-
- event_dict = {
- k: v
- for k, v in event.get_dict().items()
- if k not in [
- "redacted",
- "redacted_because",
- ]
- }
+ if not events_and_contexts:
+ return
- self._simple_insert_txn(
+ self._store_mult_state_groups_txn(txn, [
+ (event, context)
+ for event, context in events_and_contexts
+ if not event.internal_metadata.is_outlier()
+ ])
+
+ self._handle_mult_prev_events(
txn,
- table="event_json",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "internal_metadata": metadata_json,
- "json": encode_json(
- event_dict, using_frozen_dicts=USE_FROZEN_DICTS
- ).decode("UTF-8"),
- },
+ events=[event for event, _ in events_and_contexts],
)
- content = encode_json(
- event.content, using_frozen_dicts=USE_FROZEN_DICTS
- ).decode("UTF-8")
-
- vals = {
- "topological_ordering": event.depth,
- "event_id": event.event_id,
- "type": event.type,
- "room_id": event.room_id,
- "content": content,
- "processed": True,
- "outlier": outlier,
- "depth": event.depth,
- }
+ for event, _ in events_and_contexts:
+ if event.type == EventTypes.Name:
+ self._store_room_name_txn(txn, event)
+ elif event.type == EventTypes.Topic:
+ self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Redaction:
+ self._store_redaction(txn, event)
- unrec = {
- k: v
- for k, v in event.get_dict().items()
- if k not in vals.keys() and k not in [
- "redacted",
- "redacted_because",
- "signatures",
- "hashes",
- "prev_events",
+ self._store_room_members_txn(
+ txn,
+ [
+ event
+ for event, _ in events_and_contexts
+ if event.type == EventTypes.Member
]
- }
+ )
- vals["unrecognized_keys"] = encode_json(
- unrec, using_frozen_dicts=USE_FROZEN_DICTS
- ).decode("UTF-8")
+ def event_dict(event):
+ return {
+ k: v
+ for k, v in event.get_dict().items()
+ if k not in [
+ "redacted",
+ "redacted_because",
+ ]
+ }
- sql = (
- "INSERT INTO events"
- " (stream_ordering, topological_ordering, event_id, type,"
- " room_id, content, processed, outlier, depth)"
- " VALUES (?,?,?,?,?,?,?,?,?)"
+ self._simple_insert_many_txn(
+ txn,
+ table="event_json",
+ values=[
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "internal_metadata": encode_json(
+ event.internal_metadata.get_dict(),
+ using_frozen_dicts=USE_FROZEN_DICTS
+ ).decode("UTF-8"),
+ "json": encode_json(
+ event_dict(event), using_frozen_dicts=USE_FROZEN_DICTS
+ ).decode("UTF-8"),
+ }
+ for event, _ in events_and_contexts
+ ],
)
- txn.execute(
- sql,
- (
- stream_ordering, event.depth, event.event_id, event.type,
- event.room_id, content, True, outlier, event.depth
- )
+ self._simple_insert_many_txn(
+ txn,
+ table="events",
+ values=[
+ {
+ "stream_ordering": event.internal_metadata.stream_ordering,
+ "topological_ordering": event.depth,
+ "depth": event.depth,
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "processed": True,
+ "outlier": event.internal_metadata.is_outlier(),
+ "content": encode_json(
+ event.content, using_frozen_dicts=USE_FROZEN_DICTS
+ ).decode("UTF-8"),
+ }
+ for event, _ in events_and_contexts
+ ],
)
if context.rejected:
@@ -293,20 +366,6 @@ class EventsStore(SQLBaseStore):
txn, event.event_id, context.rejected
)
- for hash_alg, hash_base64 in event.hashes.items():
- hash_bytes = decode_base64(hash_base64)
- self._store_event_content_hash_txn(
- txn, event.event_id, hash_alg, hash_bytes,
- )
-
- for prev_event_id, prev_hashes in event.prev_events:
- for alg, hash_base64 in prev_hashes.items():
- hash_bytes = decode_base64(hash_base64)
- self._store_prev_event_hash_txn(
- txn, event.event_id, prev_event_id, alg,
- hash_bytes
- )
-
self._simple_insert_many_txn(
txn,
table="event_auth",
@@ -316,16 +375,22 @@ class EventsStore(SQLBaseStore):
"room_id": event.room_id,
"auth_id": auth_id,
}
+ for event, _ in events_and_contexts
for auth_id, _ in event.auth_events
],
)
- (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
- self._store_event_reference_hash_txn(
- txn, event.event_id, ref_alg, ref_hash_bytes
+ self._store_event_reference_hashes_txn(
+ txn, [event for event, _ in events_and_contexts]
)
- if event.is_state():
+ state_events_and_contexts = filter(
+ lambda i: i[0].is_state(),
+ events_and_contexts,
+ )
+
+ state_values = []
+ for event, context in state_events_and_contexts:
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@@ -337,51 +402,55 @@ class EventsStore(SQLBaseStore):
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
- self._simple_insert_txn(
- txn,
- "state_events",
- vals,
- )
+ state_values.append(vals)
- self._simple_insert_many_txn(
- txn,
- table="event_edges",
- values=[
- {
- "event_id": event.event_id,
- "prev_event_id": e_id,
- "room_id": event.room_id,
- "is_state": True,
- }
- for e_id, h in event.prev_state
- ],
- )
+ self._simple_insert_many_txn(
+ txn,
+ table="state_events",
+ values=state_values,
+ )
- if is_new_state and not context.rejected:
- txn.call_after(
- self.get_current_state_for_key.invalidate,
- event.room_id, event.type, event.state_key
- )
+ self._simple_insert_many_txn(
+ txn,
+ table="event_edges",
+ values=[
+ {
+ "event_id": event.event_id,
+ "prev_event_id": prev_id,
+ "room_id": event.room_id,
+ "is_state": True,
+ }
+ for event, _ in state_events_and_contexts
+ for prev_id, _ in event.prev_state
+ ],
+ )
- if (event.type == EventTypes.Name
- or event.type == EventTypes.Aliases):
+ if is_new_state:
+ for event, _ in state_events_and_contexts:
+ if not context.rejected:
txn.call_after(
- self.get_room_name_and_aliases.invalidate,
- event.room_id
+ self.get_current_state_for_key.invalidate,
+ (event.room_id, event.type, event.state_key,)
)
- self._simple_upsert_txn(
- txn,
- "current_state_events",
- keyvalues={
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- values={
- "event_id": event.event_id,
- }
- )
+ if event.type in [EventTypes.Name, EventTypes.Aliases]:
+ txn.call_after(
+ self.get_room_name_and_aliases.invalidate,
+ (event.room_id,)
+ )
+
+ self._simple_upsert_txn(
+ txn,
+ "current_state_events",
+ keyvalues={
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ values={
+ "event_id": event.event_id,
+ }
+ )
return
@@ -498,8 +567,9 @@ class EventsStore(SQLBaseStore):
def _invalidate_get_event_cache(self, event_id):
for check_redacted in (False, True):
for get_prev_content in (False, True):
- self._get_event_cache.invalidate(event_id, check_redacted,
- get_prev_content)
+ self._get_event_cache.invalidate(
+ (event_id, check_redacted, get_prev_content)
+ )
def _get_event_txn(self, txn, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False):
@@ -520,7 +590,7 @@ class EventsStore(SQLBaseStore):
for event_id in events:
try:
ret = self._get_event_cache.get(
- event_id, check_redacted, get_prev_content
+ (event_id, check_redacted, get_prev_content,)
)
if allow_rejected or not ret.rejected_reason:
@@ -736,7 +806,8 @@ class EventsStore(SQLBaseStore):
because = yield self.get_event(
redaction_id,
- check_redacted=False
+ check_redacted=False,
+ allow_none=True,
)
if because:
@@ -746,12 +817,13 @@ class EventsStore(SQLBaseStore):
prev = yield self.get_event(
ev.unsigned["replaces_state"],
get_prev_content=False,
+ allow_none=True,
)
if prev:
ev.unsigned["prev_content"] = prev.get_dict()["content"]
self._get_event_cache.prefill(
- ev.event_id, check_redacted, get_prev_content, ev
+ (ev.event_id, check_redacted, get_prev_content), ev
)
defer.returnValue(ev)
@@ -808,7 +880,7 @@ class EventsStore(SQLBaseStore):
ev.unsigned["prev_content"] = prev.get_dict()["content"]
self._get_event_cache.prefill(
- ev.event_id, check_redacted, get_prev_content, ev
+ (ev.event_id, check_redacted, get_prev_content), ev
)
return ev
|