author     Erik Johnston <erik@matrix.org>  2015-06-22 14:09:39 +0100
committer  Erik Johnston <erik@matrix.org>  2015-06-22 14:09:39 +0100
commit     03a9e4436b9e619d6e94b53fbce99f21197269a8 (patch)
tree       74d348f6f991b1a85553174e6e65328ef0e532de
parent     Add a many version of store_state_groups_txn (diff)
download   synapse-03a9e4436b9e619d6e94b53fbce99f21197269a8.tar.xz

Add a many version of _handle_prev_events

-rw-r--r--  synapse/storage/event_federation.py |  87
-rw-r--r--  synapse/storage/events.py           | 253
2 files changed, 340 insertions(+), 0 deletions(-)
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 1ba073884b..e5ad63cf9c 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -351,6 +351,93 @@ class EventFederationStore(SQLBaseStore):
                 self.get_latest_event_ids_in_room.invalidate, room_id
             )
 
+    def _handle_mult_prev_events(self, txn, events):
+        """
+        For each of the given events, update the event edges table and the
+        forward and backward extremities tables.
+        """
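+        # Insert one event_edges row for each (event, prev_event) pair.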
+        self._simple_insert_many_txn(
+            txn,
+            table="event_edges",
+            values=[
+                {
+                    "event_id": ev.event_id,
+                    "prev_event_id": e_id,
+                    "room_id": ev.room_id,
+                    "is_state": False,
+                }
+                for ev in events for e_id, _ in ev.prev_events
+            ],
+        )
+
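+        # Group the events by room so the extremity tables can be updated
+        # per room.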
+        events_by_room = {}
+        for ev in events:
+            events_by_room.setdefault(ev.room_id, []).append(ev)
+
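+        # A non-outlier event's prev events can no longer be forward
+        # extremities, so remove them.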
+        for room_id, room_events in events_by_room.items():
+            prevs = [
+                e_id
+                for ev in room_events
+                if not ev.internal_metadata.is_outlier()
+                for e_id, _ in ev.prev_events
+            ]
+            if prevs:
+                txn.execute(
+                    "DELETE FROM event_forward_extremities"
+                    " WHERE room_id = ?"
+                    " AND event_id in (%s)" % (
+                        ",".join(["?"] * len(prevs)),
+                    ),
+                    [room_id] + prevs,
+                )
+
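+        # Each new event becomes a forward extremity, unless some other
+        # event already lists it as a prev event.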
+        query = (
+            "INSERT INTO event_forward_extremities (event_id, room_id)"
+            " SELECT ?, ? WHERE NOT EXISTS ("
+            " SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+            " )"
+        )
+
+        txn.executemany(
+            query,
+            [(ev.event_id, ev.room_id, ev.event_id) for ev in events]
+        )
+
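+        # A prev event becomes a backward extremity unless it is already
+        # one, or we have already persisted it as a non-outlier.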
+        query = (
+            "INSERT INTO event_backward_extremities (event_id, room_id)"
+            " SELECT ?, ? WHERE NOT EXISTS ("
+            " SELECT 1 FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+            " )"
+            " AND NOT EXISTS ("
+            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            " AND outlier = ?"
+            " )"
+        )
+
+        prev_events = [
+            (e_id, ev.room_id)
+            for ev in events
+            if not ev.internal_metadata.is_outlier()
+            for e_id, _ in ev.prev_events
+        ]
+
+        txn.executemany(query, [
+            (e_id, room_id, e_id, room_id, e_id, room_id, False)
+            for e_id, room_id in prev_events
+        ])
+
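+        # The new events themselves stop being backward extremities, since
+        # we now have them.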
+        query = (
+            "DELETE FROM event_backward_extremities"
+            " WHERE event_id = ? AND room_id = ?"
+        )
+        txn.executemany(
+            query,
+            [(ev.event_id, ev.room_id) for ev in events]
+        )
+
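+        # Invalidate the cached latest event IDs for every affected room.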
+        for room_id in events_by_room:
+            txn.call_after(
+                self.get_latest_event_ids_in_room.invalidate, room_id
+            )
+
     def get_backfill_events(self, room_id, event_list, limit):
         """Get a list of Events for a given topic that occurred before (and
         including) the events in event_list. Return a list of max size `limit`
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e02a8066d6..b9a2319115 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -213,6 +213,259 @@ class EventsStore(SQLBaseStore):
         if not outlier:
             self._store_state_groups_txn(txn, event, context)
 
+        self._handle_mult_prev_events(
+            txn,
+            events=[event]
+        )
+
+        if event.type == EventTypes.Member:
+            self._store_room_member_txn(txn, event)
+        elif event.type == EventTypes.Name:
+            self._store_room_name_txn(txn, event)
+        elif event.type == EventTypes.Topic:
+            self._store_room_topic_txn(txn, event)
+        elif event.type == EventTypes.Redaction:
+            self._store_redaction(txn, event)
+
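+        # Strip keys that are added locally by redaction handling before
+        # persisting the raw event JSON.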
+        event_dict = {
+            k: v
+            for k, v in event.get_dict().items()
+            if k not in [
+                "redacted",
+                "redacted_because",
+            ]
+        }
+
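+        # Store the full event JSON together with its internal metadata.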
+        self._simple_insert_txn(
+            txn,
+            table="event_json",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "internal_metadata": metadata_json,
+                "json": encode_json(
+                    event_dict, using_frozen_dicts=USE_FROZEN_DICTS
+                ).decode("UTF-8"),
+            },
+        )
+
+        content = encode_json(
+            event.content, using_frozen_dicts=USE_FROZEN_DICTS
+        ).decode("UTF-8")
+
+        vals = {
+            "topological_ordering": event.depth,
+            "event_id": event.event_id,
+            "type": event.type,
+            "room_id": event.room_id,
+            "content": content,
+            "processed": True,
+            "outlier": outlier,
+            "depth": event.depth,
+        }
+
+        unrec = {
+            k: v
+            for k, v in event.get_dict().items()
+            if k not in vals.keys() and k not in [
+                "redacted",
+                "redacted_because",
+                "signatures",
+                "hashes",
+                "prev_events",
+            ]
+        }
+
+        vals["unrecognized_keys"] = encode_json(
+            unrec, using_frozen_dicts=USE_FROZEN_DICTS
+        ).decode("UTF-8")
+
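+        # Insert the event into the main events table.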
+        sql = (
+            "INSERT INTO events"
+            " (stream_ordering, topological_ordering, event_id, type,"
+            " room_id, content, processed, outlier, depth)"
+            " VALUES (?,?,?,?,?,?,?,?,?)"
+        )
+
+        txn.execute(
+            sql,
+            (
+                stream_ordering, event.depth, event.event_id, event.type,
+                event.room_id, content, True, outlier, event.depth
+            )
+        )
+
+        if context.rejected:
+            self._store_rejections_txn(
+                txn, event.event_id, context.rejected
+            )
+
+        # for hash_alg, hash_base64 in event.hashes.items():
+        #     hash_bytes = decode_base64(hash_base64)
+        #     self._store_event_content_hash_txn(
+        #         txn, event.event_id, hash_alg, hash_bytes,
+        #     )
+
+        # for prev_event_id, prev_hashes in event.prev_events:
+        #     for alg, hash_base64 in prev_hashes.items():
+        #         hash_bytes = decode_base64(hash_base64)
+        #         self._store_prev_event_hash_txn(
+        #             txn, event.event_id, prev_event_id, alg,
+        #             hash_bytes
+        #         )
+
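+        # Record which events this event cites as its auth events.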
+        self._simple_insert_many_txn(
+            txn,
+            table="event_auth",
+            values=[
+                {
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "auth_id": auth_id,
+                }
+                for auth_id, _ in event.auth_events
+            ],
+        )
+
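+        # Compute and store this event's reference hash.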
+        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+        self._store_event_reference_hash_txn(
+            txn, event.event_id, ref_alg, ref_hash_bytes
+        )
+
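+        # State events need extra bookkeeping: state_events, state edges
+        # and, if accepted, current_state_events.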
+        if event.is_state():
+            vals = {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "type": event.type,
+                "state_key": event.state_key,
+            }
+
+            # TODO: How does this work with backfilling?
+            if hasattr(event, "replaces_state"):
+                vals["prev_state"] = event.replaces_state
+
+            self._simple_insert_txn(
+                txn,
+                "state_events",
+                vals,
+            )
+
+            self._simple_insert_many_txn(
+                txn,
+                table="event_edges",
+                values=[
+                    {
+                        "event_id": event.event_id,
+                        "prev_event_id": e_id,
+                        "room_id": event.room_id,
+                        "is_state": True,
+                    }
+                    for e_id, h in event.prev_state
+                ],
+            )
+
+            if is_new_state and not context.rejected:
+                txn.call_after(
+                    self.get_current_state_for_key.invalidate,
+                    event.room_id, event.type, event.state_key
+                )
+
+                if (event.type == EventTypes.Name
+                        or event.type == EventTypes.Aliases):
+                    txn.call_after(
+                        self.get_room_name_and_aliases.invalidate,
+                        event.room_id
+                    )
+
+                self._simple_upsert_txn(
+                    txn,
+                    "current_state_events",
+                    keyvalues={
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    values={
+                        "event_id": event.event_id,
+                    }
+                )
+
+        return
+
+    @log_function
+    def _persist_events_txn(self, txn, event_and_contexts, backfilled,
+                            stream_ordering, is_new_state=True):
+
+        # Remove any existing cache entries for the event_ids
+        for event, _ in event_and_contexts:
+            txn.call_after(self._invalidate_get_event_cache, event.event_id)
+
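+        # Find the maximum depth of the new events in each room, for the
+        # room's min-depth bookkeeping.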
+        depth_updates = {}
+        for event, _ in event_and_contexts:
+            if event.internal_metadata.is_outlier():
+                continue
+            depth_updates[event.room_id] = max(
+                event.depth, depth_updates.get(event.room_id, event.depth)
+            )
+
+        for room_id, depth in depth_updates.items():
+            self._update_min_depth_for_room_txn(txn, room_id, depth)
+
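+        # Check which of these events have already been persisted, and
+        # whether they were persisted as outliers at the time.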
+        txn.execute(
+            "SELECT event_id, outlier FROM events WHERE event_id IN (%s)" % (
+                ",".join(["?"] * len(event_and_contexts)),
+            ),
+            [event.event_id for event, _ in event_and_contexts]
+        )
+        have_persisted = {
+            event_id: outlier
+            for event_id, outlier in txn.fetchall()
+        }
+
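+        # Events previously persisted as outliers that are no longer
+        # outliers need their state groups stored and their outlier flag
+        # cleared.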
+        for event, context in event_and_contexts:
+            if event.event_id not in have_persisted:
+                continue
+            outlier_persisted = have_persisted[event.event_id]
+            if not event.internal_metadata.is_outlier() and outlier_persisted:
+                self._store_state_groups_txn(
+                    txn, event, context,
+                )
+
+                metadata_json = encode_json(
+                    event.internal_metadata.get_dict(),
+                    using_frozen_dicts=USE_FROZEN_DICTS
+                ).decode("UTF-8")
+
+                sql = (
+                    "UPDATE event_json SET internal_metadata = ?"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (metadata_json, event.event_id,)
+                )
+
+                sql = (
+                    "UPDATE events SET outlier = ?"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (False, event.event_id,)
+                )
+
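+        # Store the state groups for all non-outlier events in one batch.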
+        self._store_mult_state_groups_txn(txn, [
+            (event, context)
+            for event, context in event_and_contexts
+            if not event.internal_metadata.is_outlier()
+        ])
+
         self._handle_prev_events(
             txn,
             outlier=outlier,