summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-06-25 17:18:19 +0100
committerErik Johnston <erik@matrix.org>2015-06-25 17:29:34 +0100
commit5130d80d79fe1f95ce03b8f1cfd4fbf0a32f5ac8 (patch)
treee16d302641072f6a700d3d65e919c7cb01447e47 /synapse/storage/events.py
parentBatch SELECTs in _get_auth_chain_ids_txn (diff)
downloadsynapse-5130d80d79fe1f95ce03b8f1cfd4fbf0a32f5ac8.tar.xz
Add bulk insert events API
Diffstat (limited to '')
-rw-r--r--synapse/storage/events.py415
1 files changed, 249 insertions, 166 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2caf0aae80..d760acb878 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,9 +23,7 @@ from synapse.events.utils import prune_event
 from synapse.util.logcontext import preserve_context_over_deferred
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
-from synapse.crypto.event_signing import compute_event_reference_hash
 
-from syutil.base64util import decode_base64
 from syutil.jsonutil import encode_json
 from contextlib import contextmanager
 
@@ -47,6 +45,48 @@ EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
+    def persist_events(self, events_and_contexts, backfilled=False,
+                       is_new_state=True):
+        if not events_and_contexts:
+            return
+
+        if backfilled:
+            if not self.min_token_deferred.called:
+                yield self.min_token_deferred
+            start = self.min_token - 1
+            self.min_token -= len(events_and_contexts) + 1
+            stream_orderings = range(start, self.min_token, -1)
+
+            @contextmanager
+            def stream_ordering_manager():
+                yield stream_orderings
+            stream_ordering_manager = stream_ordering_manager()
+        else:
+            stream_ordering_manager = yield self._stream_id_gen.get_next_mult(
+                self, len(events_and_contexts)
+            )
+
+        with stream_ordering_manager as stream_orderings:
+            for (event, _), stream in zip(events_and_contexts, stream_orderings):
+                event.internal_metadata.stream_ordering = stream
+
+            chunks = [
+                events_and_contexts[x:x+100]
+                for x in xrange(0, len(events_and_contexts), 100)
+            ]
+
+            for chunk in chunks:
+                # We can't easily parallelize these since different chunks
+                # might contain the same event. :(
+                yield self.runInteraction(
+                    "persist_events",
+                    self._persist_events_txn,
+                    events_and_contexts=chunk,
+                    backfilled=backfilled,
+                    is_new_state=is_new_state,
+                )
+
+    @defer.inlineCallbacks
     @log_function
     def persist_event(self, event, context, backfilled=False,
                       is_new_state=True, current_state=None):
@@ -67,13 +107,13 @@ class EventsStore(SQLBaseStore):
 
         try:
             with stream_ordering_manager as stream_ordering:
+                event.internal_metadata.stream_ordering = stream_ordering
                 yield self.runInteraction(
                     "persist_event",
                     self._persist_event_txn,
                     event=event,
                     context=context,
                     backfilled=backfilled,
-                    stream_ordering=stream_ordering,
                     is_new_state=is_new_state,
                     current_state=current_state,
                 )
@@ -116,12 +156,7 @@ class EventsStore(SQLBaseStore):
 
     @log_function
     def _persist_event_txn(self, txn, event, context, backfilled,
-                           stream_ordering=None, is_new_state=True,
-                           current_state=None):
-
-        # Remove the any existing cache entries for the event_id
-        txn.call_after(self._invalidate_get_event_cache, event.event_id)
-
+                           is_new_state=True, current_state=None):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
@@ -149,37 +184,78 @@ class EventsStore(SQLBaseStore):
                     }
                 )
 
-        outlier = event.internal_metadata.is_outlier()
+        return self._persist_events_txn(
+            txn,
+            [(event, context)],
+            backfilled=backfilled,
+            is_new_state=is_new_state,
+        )
 
-        if not outlier:
-            self._update_min_depth_for_room_txn(
-                txn,
-                event.room_id,
-                event.depth
+    @log_function
+    def _persist_events_txn(self, txn, events_and_contexts, backfilled,
+                            is_new_state=True):
+
+        # Remove the any existing cache entries for the event_ids
+        for event, _ in events_and_contexts:
+            txn.call_after(self._invalidate_get_event_cache, event.event_id)
+
+        depth_updates = {}
+        for event, _ in events_and_contexts:
+            if event.internal_metadata.is_outlier():
+                continue
+            depth_updates[event.room_id] = max(
+                event.depth, depth_updates.get(event.room_id, event.depth)
             )
 
-        have_persisted = self._simple_select_one_txn(
-            txn,
-            table="events",
-            keyvalues={"event_id": event.event_id},
-            retcols=["event_id", "outlier"],
-            allow_none=True,
+        for room_id, depth in depth_updates.items():
+            self._update_min_depth_for_room_txn(txn, room_id, depth)
+
+        txn.execute(
+            "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
+                ",".join(["?"] * len(events_and_contexts)),
+            ),
+            [event.event_id for event, _ in events_and_contexts]
         )
+        have_persisted = {
+            event_id: outlier
+            for event_id, outlier in txn.fetchall()
+        }
+
+        event_map = {}
+        to_remove = set()
+        for event, context in events_and_contexts:
+            # Handle the case of the list including the same event multiple
+            # times. The tricky thing here is when they differ by whether
+            # they are an outlier.
+            if event.event_id in event_map:
+                other = event_map[event.event_id]
+
+                if not other.internal_metadata.is_outlier():
+                    to_remove.add(event)
+                    continue
+                elif not event.internal_metadata.is_outlier():
+                    to_remove.add(event)
+                    continue
+                else:
+                    to_remove.add(other)
+
+            event_map[event.event_id] = event
+
+            if event.event_id not in have_persisted:
+                continue
+
+            to_remove.add(event)
+
+            outlier_persisted = have_persisted[event.event_id]
+            if not event.internal_metadata.is_outlier() and outlier_persisted:
+                self._store_state_groups_txn(
+                    txn, event, context,
+                )
 
-        metadata_json = encode_json(
-            event.internal_metadata.get_dict(),
-            using_frozen_dicts=USE_FROZEN_DICTS
-        ).decode("UTF-8")
-
-        # If we have already persisted this event, we don't need to do any
-        # more processing.
-        # The processing above must be done on every call to persist event,
-        # since they might not have happened on previous calls. For example,
-        # if we are persisting an event that we had persisted as an outlier,
-        # but is no longer one.
-        if have_persisted:
-            if not outlier and have_persisted["outlier"]:
-                self._store_state_groups_txn(txn, event, context)
+                metadata_json = encode_json(
+                    event.internal_metadata.get_dict(),
+                    using_frozen_dicts=USE_FROZEN_DICTS
+                ).decode("UTF-8")
 
                 sql = (
                     "UPDATE event_json SET internal_metadata = ?"
@@ -198,94 +274,91 @@ class EventsStore(SQLBaseStore):
                     sql,
                     (False, event.event_id,)
                 )
-            return
-
-        if not outlier:
-            self._store_state_groups_txn(txn, event, context)
 
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
+        events_and_contexts = filter(
+            lambda ec: ec[0] not in to_remove,
+            events_and_contexts
         )
 
-        if event.type == EventTypes.Member:
-            self._store_room_member_txn(txn, event)
-        elif event.type == EventTypes.Name:
-            self._store_room_name_txn(txn, event)
-        elif event.type == EventTypes.Topic:
-            self._store_room_topic_txn(txn, event)
-        elif event.type == EventTypes.Redaction:
-            self._store_redaction(txn, event)
-
-        event_dict = {
-            k: v
-            for k, v in event.get_dict().items()
-            if k not in [
-                "redacted",
-                "redacted_because",
-            ]
-        }
+        if not events_and_contexts:
+            return
 
-        self._simple_insert_txn(
+        self._store_mult_state_groups_txn(txn, [
+            (event, context)
+            for event, context in events_and_contexts
+            if not event.internal_metadata.is_outlier()
+        ])
+
+        self._handle_mult_prev_events(
             txn,
-            table="event_json",
-            values={
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "internal_metadata": metadata_json,
-                "json": encode_json(
-                    event_dict, using_frozen_dicts=USE_FROZEN_DICTS
-                ).decode("UTF-8"),
-            },
+            events=[event for event, _ in events_and_contexts],
         )
 
-        content = encode_json(
-            event.content, using_frozen_dicts=USE_FROZEN_DICTS
-        ).decode("UTF-8")
-
-        vals = {
-            "topological_ordering": event.depth,
-            "event_id": event.event_id,
-            "type": event.type,
-            "room_id": event.room_id,
-            "content": content,
-            "processed": True,
-            "outlier": outlier,
-            "depth": event.depth,
-        }
+        for event, _ in events_and_contexts:
+            if event.type == EventTypes.Name:
+                self._store_room_name_txn(txn, event)
+            elif event.type == EventTypes.Topic:
+                self._store_room_topic_txn(txn, event)
+            elif event.type == EventTypes.Redaction:
+                self._store_redaction(txn, event)
 
-        unrec = {
-            k: v
-            for k, v in event.get_dict().items()
-            if k not in vals.keys() and k not in [
-                "redacted",
-                "redacted_because",
-                "signatures",
-                "hashes",
-                "prev_events",
+        self._store_room_members_txn(
+            txn,
+            [
+                event
+                for event, _ in events_and_contexts
+                if event.type == EventTypes.Member
             ]
-        }
+        )
 
-        vals["unrecognized_keys"] = encode_json(
-            unrec, using_frozen_dicts=USE_FROZEN_DICTS
-        ).decode("UTF-8")
+        def event_dict(event):
+            return {
+                k: v
+                for k, v in event.get_dict().items()
+                if k not in [
+                    "redacted",
+                    "redacted_because",
+                ]
+            }
 
-        sql = (
-            "INSERT INTO events"
-            " (stream_ordering, topological_ordering, event_id, type,"
-            " room_id, content, processed, outlier, depth)"
-            " VALUES (?,?,?,?,?,?,?,?,?)"
+        self._simple_insert_many_txn(
+            txn,
+            table="event_json",
+            values=[
+                {
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "internal_metadata": encode_json(
+                        event.internal_metadata.get_dict(),
+                        using_frozen_dicts=USE_FROZEN_DICTS
+                    ).decode("UTF-8"),
+                    "json": encode_json(
+                        event_dict(event), using_frozen_dicts=USE_FROZEN_DICTS
+                    ).decode("UTF-8"),
+                }
+                for event, _ in events_and_contexts
+            ],
         )
 
-        txn.execute(
-            sql,
-            (
-                stream_ordering, event.depth, event.event_id, event.type,
-                event.room_id, content, True, outlier, event.depth
-            )
+        self._simple_insert_many_txn(
+            txn,
+            table="events",
+            values=[
+                {
+                    "stream_ordering": event.internal_metadata.stream_ordering,
+                    "topological_ordering": event.depth,
+                    "depth": event.depth,
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "type": event.type,
+                    "processed": True,
+                    "outlier": event.internal_metadata.is_outlier(),
+                    "content": encode_json(
+                        event.content, using_frozen_dicts=USE_FROZEN_DICTS
+                    ).decode("UTF-8"),
+                }
+                for event, _ in events_and_contexts
+            ],
         )
 
         if context.rejected:
@@ -293,19 +366,19 @@ class EventsStore(SQLBaseStore):
                 txn, event.event_id, context.rejected
             )
 
-        for hash_alg, hash_base64 in event.hashes.items():
-            hash_bytes = decode_base64(hash_base64)
-            self._store_event_content_hash_txn(
-                txn, event.event_id, hash_alg, hash_bytes,
-            )
+        # for hash_alg, hash_base64 in event.hashes.items():
+        #     hash_bytes = decode_base64(hash_base64)
+        #     self._store_event_content_hash_txn(
+        #         txn, event.event_id, hash_alg, hash_bytes,
+        #     )
 
-        for prev_event_id, prev_hashes in event.prev_events:
-            for alg, hash_base64 in prev_hashes.items():
-                hash_bytes = decode_base64(hash_base64)
-                self._store_prev_event_hash_txn(
-                    txn, event.event_id, prev_event_id, alg,
-                    hash_bytes
-                )
+        # for prev_event_id, prev_hashes in event.prev_events:
+        #     for alg, hash_base64 in prev_hashes.items():
+        #         hash_bytes = decode_base64(hash_base64)
+        #         self._store_prev_event_hash_txn(
+        #             txn, event.event_id, prev_event_id, alg,
+        #             hash_bytes
+        #         )
 
         self._simple_insert_many_txn(
             txn,
@@ -316,16 +389,22 @@ class EventsStore(SQLBaseStore):
                     "room_id": event.room_id,
                     "auth_id": auth_id,
                 }
+                for event, _ in events_and_contexts
                 for auth_id, _ in event.auth_events
             ],
         )
 
-        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
-        self._store_event_reference_hash_txn(
-            txn, event.event_id, ref_alg, ref_hash_bytes
+        self._store_event_reference_hashes_txn(
+            txn, [event for event, _ in events_and_contexts]
         )
 
-        if event.is_state():
+        state_events_and_contexts = filter(
+            lambda i: i[0].is_state(),
+            events_and_contexts,
+        )
+
+        state_values = []
+        for event, context in state_events_and_contexts:
             vals = {
                 "event_id": event.event_id,
                 "room_id": event.room_id,
@@ -337,52 +416,56 @@ class EventsStore(SQLBaseStore):
             if hasattr(event, "replaces_state"):
                 vals["prev_state"] = event.replaces_state
 
-            self._simple_insert_txn(
-                txn,
-                "state_events",
-                vals,
-            )
+            state_values.append(vals)
 
-            self._simple_insert_many_txn(
-                txn,
-                table="event_edges",
-                values=[
-                    {
-                        "event_id": event.event_id,
-                        "prev_event_id": e_id,
-                        "room_id": event.room_id,
-                        "is_state": True,
-                    }
-                    for e_id, h in event.prev_state
-                ],
-            )
+        self._simple_insert_many_txn(
+            txn,
+            table="state_events",
+            values=state_values,
+        )
 
-            if is_new_state and not context.rejected:
-                txn.call_after(
-                    self.get_current_state_for_key.invalidate,
-                    event.room_id, event.type, event.state_key
-                )
+        self._simple_insert_many_txn(
+            txn,
+            table="event_edges",
+            values=[
+                {
+                    "event_id": event.event_id,
+                    "prev_event_id": prev_id,
+                    "room_id": event.room_id,
+                    "is_state": True,
+                }
+                for event, _ in state_events_and_contexts
+                for prev_id, _ in event.prev_state
+            ],
+        )
 
-                if (event.type == EventTypes.Name
-                        or event.type == EventTypes.Aliases):
+        if is_new_state:
+            for event, _ in state_events_and_contexts:
+                if not context.rejected:
                     txn.call_after(
-                        self.get_room_name_and_aliases.invalidate,
-                        event.room_id
+                        self.get_current_state_for_key.invalidate,
+                        event.room_id, event.type, event.state_key
+                        )
+
+                    if event.type in [EventTypes.Name, EventTypes.Aliases]:
+                        txn.call_after(
+                            self.get_room_name_and_aliases.invalidate,
+                            event.room_id
+                        )
+
+                    self._simple_upsert_txn(
+                        txn,
+                        "current_state_events",
+                        keyvalues={
+                            "room_id": event.room_id,
+                            "type": event.type,
+                            "state_key": event.state_key,
+                        },
+                        values={
+                            "event_id": event.event_id,
+                        }
                     )
 
-                self._simple_upsert_txn(
-                    txn,
-                    "current_state_events",
-                    keyvalues={
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                    values={
-                        "event_id": event.event_id,
-                    }
-                )
-
         return
 
     def _store_redaction(self, txn, event):