diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 1ba073884b..e5ad63cf9c 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -351,6 +351,93 @@ class EventFederationStore(SQLBaseStore):
self.get_latest_event_ids_in_room.invalidate, room_id
)
+ def _handle_mult_prev_events(self, txn, events):
+ """
+ For the given event, update the event edges table and forward and
+ backward extremities tables.
+ """
+ self._simple_insert_many_txn(
+ txn,
+ table="event_edges",
+ values=[
+ {
+ "event_id": ev.event_id,
+ "prev_event_id": e_id,
+ "room_id": ev.room_id,
+ "is_state": False,
+ }
+ for e_id in [e_id for ev in events for e_id, _ in ev.prev_events]
+ ],
+ )
+
+ events_by_room = {}
+ for ev in events:
+ events_by_room.setdefault(ev.room_id, []).append(ev)
+
+ for room_id, room_events in events_by_room.items():
+ prevs = [
+ e_id for ev in room_events for e_id, _ in ev.prev_events
+ if not ev.internal_metadata.is_outlier()
+ ]
+ if prevs:
+ txn.execute(
+ "DELETE FROM event_forward_extremities"
+ " WHERE room_id = ?"
+ " AND event_id in (%s)" % (
+ ",".join(["?"] * len(prevs)),
+ ),
+ [room_id] + prevs,
+ )
+
+ query = (
+ "INSERT INTO event_forward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+ " )"
+ )
+
+ txn.executemany(
+ query,
+ [(ev.event_id, ev.room_id, ev.event_id) for ev in events]
+ )
+
+ query = (
+ "INSERT INTO event_backward_extremities (event_id, room_id)"
+ " SELECT ?, ? WHERE NOT EXISTS ("
+ " SELECT 1 FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ " )"
+ " AND NOT EXISTS ("
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
+ " )"
+ )
+
+ prev_events = [
+ e_id for ev in events for e_id, _ in ev.prev_events
+ if not ev.internal_metadata.is_outlier()
+ ]
+
+ txn.executemany(query, [
+ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+ for ev in events for e_id, _ in ev.prev_events
+ if not ev.internal_metadata.is_outlier()
+ ])
+
+ query = (
+ "DELETE FROM event_backward_extremities"
+ " WHERE event_id = ? AND room_id = ?"
+ )
+ txn.executemany(
+ query,
+ [(ev.event_id, ev.room_id) for ev in events]
+ )
+
+ for room_id in events_by_room:
+ txn.call_after(
+ self.get_latest_event_ids_in_room.invalidate, room_id
+ )
+
def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occurred before (and
including) the events in event_list. Return a list of max size `limit`
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e02a8066d6..b9a2319115 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -213,6 +213,259 @@ class EventsStore(SQLBaseStore):
if not outlier:
self._store_state_groups_txn(txn, event, context)
+ self._handle_mult_prev_events(
+ txn,
+ events=[event]
+ )
+
+ if event.type == EventTypes.Member:
+ self._store_room_member_txn(txn, event)
+ elif event.type == EventTypes.Name:
+ self._store_room_name_txn(txn, event)
+ elif event.type == EventTypes.Topic:
+ self._store_room_topic_txn(txn, event)
+ elif event.type == EventTypes.Redaction:
+ self._store_redaction(txn, event)
+
+ event_dict = {
+ k: v
+ for k, v in event.get_dict().items()
+ if k not in [
+ "redacted",
+ "redacted_because",
+ ]
+ }
+
+ self._simple_insert_txn(
+ txn,
+ table="event_json",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "internal_metadata": metadata_json,
+ "json": encode_json(
+ event_dict, using_frozen_dicts=USE_FROZEN_DICTS
+ ).decode("UTF-8"),
+ },
+ )
+
+ content = encode_json(
+ event.content, using_frozen_dicts=USE_FROZEN_DICTS
+ ).decode("UTF-8")
+
+ vals = {
+ "topological_ordering": event.depth,
+ "event_id": event.event_id,
+ "type": event.type,
+ "room_id": event.room_id,
+ "content": content,
+ "processed": True,
+ "outlier": outlier,
+ "depth": event.depth,
+ }
+
+ unrec = {
+ k: v
+ for k, v in event.get_dict().items()
+ if k not in vals.keys() and k not in [
+ "redacted",
+ "redacted_because",
+ "signatures",
+ "hashes",
+ "prev_events",
+ ]
+ }
+
+ vals["unrecognized_keys"] = encode_json(
+ unrec, using_frozen_dicts=USE_FROZEN_DICTS
+ ).decode("UTF-8")
+
+ sql = (
+ "INSERT INTO events"
+ " (stream_ordering, topological_ordering, event_id, type,"
+ " room_id, content, processed, outlier, depth)"
+ " VALUES (?,?,?,?,?,?,?,?,?)"
+ )
+
+ txn.execute(
+ sql,
+ (
+ stream_ordering, event.depth, event.event_id, event.type,
+ event.room_id, content, True, outlier, event.depth
+ )
+ )
+
+ if context.rejected:
+ self._store_rejections_txn(
+ txn, event.event_id, context.rejected
+ )
+
+ # for hash_alg, hash_base64 in event.hashes.items():
+ # hash_bytes = decode_base64(hash_base64)
+ # self._store_event_content_hash_txn(
+ # txn, event.event_id, hash_alg, hash_bytes,
+ # )
+
+ # for prev_event_id, prev_hashes in event.prev_events:
+ # for alg, hash_base64 in prev_hashes.items():
+ # hash_bytes = decode_base64(hash_base64)
+ # self._store_prev_event_hash_txn(
+ # txn, event.event_id, prev_event_id, alg,
+ # hash_bytes
+ # )
+
+ self._simple_insert_many_txn(
+ txn,
+ table="event_auth",
+ values=[
+ {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "auth_id": auth_id,
+ }
+ for auth_id, _ in event.auth_events
+ ],
+ )
+
+ (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+ self._store_event_reference_hash_txn(
+ txn, event.event_id, ref_alg, ref_hash_bytes
+ )
+
+ if event.is_state():
+ vals = {
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ }
+
+ # TODO: How does this work with backfilling?
+ if hasattr(event, "replaces_state"):
+ vals["prev_state"] = event.replaces_state
+
+ self._simple_insert_txn(
+ txn,
+ "state_events",
+ vals,
+ )
+
+ self._simple_insert_many_txn(
+ txn,
+ table="event_edges",
+ values=[
+ {
+ "event_id": event.event_id,
+ "prev_event_id": e_id,
+ "room_id": event.room_id,
+ "is_state": True,
+ }
+ for e_id, h in event.prev_state
+ ],
+ )
+
+ if is_new_state and not context.rejected:
+ txn.call_after(
+ self.get_current_state_for_key.invalidate,
+ event.room_id, event.type, event.state_key
+ )
+
+ if (event.type == EventTypes.Name
+ or event.type == EventTypes.Aliases):
+ txn.call_after(
+ self.get_room_name_and_aliases.invalidate,
+ event.room_id
+ )
+
+ self._simple_upsert_txn(
+ txn,
+ "current_state_events",
+ keyvalues={
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ values={
+ "event_id": event.event_id,
+ }
+ )
+
+ return
+
+ @log_function
+ def _persist_events_txn(self, txn, event_and_contexts, backfilled,
+ stream_ordering, is_new_state=True):
+
+ # Remove the any existing cache entries for the event_ids
+ for event, _ in event_and_contexts:
+ txn.call_after(self._invalidate_get_event_cache, event.event_id)
+
+ depth_updates = {}
+ for event, _ in event_and_contexts:
+ if event.internal_metadata.is_outlier():
+ continue
+ depth_updates[event.room_id] = max(
+ event.depth, depth_updates.get(event.room_id, event.depth)
+ )
+
+ for room_id, depth in depth_updates:
+ self._update_min_depth_for_room_txn(txn, room_id, depth)
+
+ txn.execute(
+ "SELECT event_id, outlier FROM events WHERE event_id in %s" % (
+ ",".join(["?"] * len(event_and_contexts)),
+ ),
+ [event.event_id for event, _ in event_and_contexts]
+ )
+ have_persisted = {
+ event_id: outlier
+ for event_id, outlier in txn.fetchall()
+ }
+
+
+ # metadata_json = encode_json(
+ # event.internal_metadata.get_dict(),
+ # using_frozen_dicts=USE_FROZEN_DICTS
+ # ).decode("UTF-8")
+
+ for event, context in event_and_contexts:
+ if event.event_id not in have_persisted:
+ continue
+ outlier_persisted = have_persisted[event.event_id]
+ if not event.internal_metadata.is_outlier() and outlier_persisted:
+ self._store_state_groups_txn(
+ txn, event, context,
+ )
+
+ metadata_json = encode_json(
+ event.internal_metadata.get_dict(),
+ using_frozen_dicts=USE_FROZEN_DICTS
+ ).decode("UTF-8")
+
+ sql = (
+ "UPDATE event_json SET internal_metadata = ?"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (metadata_json, event.event_id,)
+ )
+
+ sql = (
+ "UPDATE events SET outlier = ?"
+ " WHERE event_id = ?"
+ )
+ txn.execute(
+ sql,
+ (False, event.event_id,)
+ )
+
+ self._store_mult_state_groups_txn(txn, [
+ (event, context)
+ for event, context in event_and_contexts
+ if not event.internal_metadata.is_outlier
+ ])
+
self._handle_prev_events(
txn,
outlier=outlier,
|