summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-04-28 14:24:08 +0100
committerErik Johnston <erik@matrix.org>2015-04-28 14:24:08 +0100
commit9fbcf191889b655d5c93d01a738e5137aa94a354 (patch)
tree8a403e4b27825447695fe09b54bb2440df734ed5 /synapse/storage/events.py
parentMerge pull request #126 from matrix-org/csauth (diff)
parentRemove unused imports (diff)
downloadsynapse-9fbcf191889b655d5c93d01a738e5137aa94a354.tar.xz
Merge pull request #123 from matrix-org/postgres
Postgres
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py160
1 files changed, 81 insertions, 79 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2425f57f5f..a3c260ddc4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore):
                 is_new_state=is_new_state,
                 current_state=current_state,
             )
-            self.get_room_events_max_id.invalidate()
         except _RollbackButIsFineException:
             pass
 
@@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore):
         # Remove the any existing cache entries for the event_id
         self._invalidate_get_event_cache(event.event_id)
 
+        if stream_ordering is None:
+            with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
+                return self._persist_event_txn(
+                    txn, event, context, backfilled,
+                    stream_ordering=stream_ordering,
+                    is_new_state=is_new_state,
+                    current_state=current_state,
+                )
+
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
+            self._simple_delete_txn(
+                txn,
+                table="current_state_events",
+                keyvalues={"room_id": event.room_id},
             )
 
             for s in current_state:
@@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore):
                         "type": s.type,
                         "state_key": s.state_key,
                     },
-                    or_replace=True,
                 )
 
         if event.is_state() and is_new_state:
@@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore):
                         "type": event.type,
                         "state_key": event.state_key,
                     },
-                    or_replace=True,
                 )
 
                 for prev_state_id, _ in event.prev_state:
@@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore):
                 event.depth
             )
 
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
         have_persisted = self._simple_select_one_onecol_txn(
             txn,
             table="event_json",
@@ -169,7 +168,7 @@ class EventsStore(SQLBaseStore):
 
         metadata_json = encode_canonical_json(
             event.internal_metadata.get_dict()
-        )
+        ).decode("UTF-8")
 
         # If we have already persisted this event, we don't need to do any
         # more processing.
@@ -185,23 +184,29 @@ class EventsStore(SQLBaseStore):
                 )
                 txn.execute(
                     sql,
-                    (metadata_json.decode("UTF-8"), event.event_id,)
+                    (metadata_json, event.event_id,)
                 )
 
                 sql = (
-                    "UPDATE events SET outlier = 0"
+                    "UPDATE events SET outlier = ?"
                     " WHERE event_id = ?"
                 )
                 txn.execute(
                     sql,
-                    (event.event_id,)
+                    (False, event.event_id,)
                 )
             return
 
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
         if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
-        elif event.type == EventTypes.Feedback:
-            self._store_feedback_txn(txn, event)
         elif event.type == EventTypes.Name:
             self._store_room_name_txn(txn, event)
         elif event.type == EventTypes.Topic:
@@ -224,10 +229,9 @@ class EventsStore(SQLBaseStore):
             values={
                 "event_id": event.event_id,
                 "room_id": event.room_id,
-                "internal_metadata": metadata_json.decode("UTF-8"),
+                "internal_metadata": metadata_json,
                 "json": encode_canonical_json(event_dict).decode("UTF-8"),
             },
-            or_replace=True,
         )
 
         content = encode_canonical_json(
@@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore):
             "depth": event.depth,
         }
 
-        if stream_ordering is not None:
-            vals["stream_ordering"] = stream_ordering
-
         unrec = {
             k: v
             for k, v in event.get_dict().items()
@@ -264,25 +265,53 @@ class EventsStore(SQLBaseStore):
             unrec
         ).decode("UTF-8")
 
-        try:
-            self._simple_insert_txn(
-                txn,
-                "events",
-                vals,
-                or_replace=(not outlier),
-                or_ignore=bool(outlier),
-            )
-        except:
-            logger.warn(
-                "Failed to persist, probably duplicate: %s",
-                event.event_id,
-                exc_info=True,
+        sql = (
+            "INSERT INTO events"
+            " (stream_ordering, topological_ordering, event_id, type,"
+            " room_id, content, processed, outlier, depth)"
+            " VALUES (?,?,?,?,?,?,?,?,?)"
+        )
+
+        txn.execute(
+            sql,
+            (
+                stream_ordering, event.depth, event.event_id, event.type,
+                event.room_id, content, True, outlier, event.depth
             )
-            raise _RollbackButIsFineException("_persist_event")
+        )
 
         if context.rejected:
             self._store_rejections_txn(txn, event.event_id, context.rejected)
 
+        for hash_alg, hash_base64 in event.hashes.items():
+            hash_bytes = decode_base64(hash_base64)
+            self._store_event_content_hash_txn(
+                txn, event.event_id, hash_alg, hash_bytes,
+            )
+
+        for prev_event_id, prev_hashes in event.prev_events:
+            for alg, hash_base64 in prev_hashes.items():
+                hash_bytes = decode_base64(hash_base64)
+                self._store_prev_event_hash_txn(
+                    txn, event.event_id, prev_event_id, alg, hash_bytes
+                )
+
+        for auth_id, _ in event.auth_events:
+            self._simple_insert_txn(
+                txn,
+                table="event_auth",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "auth_id": auth_id,
+                },
+            )
+
+        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+        self._store_event_reference_hash_txn(
+            txn, event.event_id, ref_alg, ref_hash_bytes
+        )
+
         if event.is_state():
             vals = {
                 "event_id": event.event_id,
@@ -301,18 +330,6 @@ class EventsStore(SQLBaseStore):
                 vals,
             )
 
-            if is_new_state and not context.rejected:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                )
-
             for e_id, h in event.prev_state:
                 self._simple_insert_txn(
                     txn,
@@ -321,39 +338,24 @@ class EventsStore(SQLBaseStore):
                         "event_id": event.event_id,
                         "prev_event_id": e_id,
                         "room_id": event.room_id,
-                        "is_state": 1,
+                        "is_state": True,
                     },
                 )
 
-        for hash_alg, hash_base64 in event.hashes.items():
-            hash_bytes = decode_base64(hash_base64)
-            self._store_event_content_hash_txn(
-                txn, event.event_id, hash_alg, hash_bytes,
-            )
-
-        for prev_event_id, prev_hashes in event.prev_events:
-            for alg, hash_base64 in prev_hashes.items():
-                hash_bytes = decode_base64(hash_base64)
-                self._store_prev_event_hash_txn(
-                    txn, event.event_id, prev_event_id, alg, hash_bytes
+            if is_new_state and not context.rejected:
+                self._simple_upsert_txn(
+                    txn,
+                    "current_state_events",
+                    keyvalues={
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    values={
+                        "event_id": event.event_id,
+                    }
                 )
 
-        for auth_id, _ in event.auth_events:
-            self._simple_insert_txn(
-                txn,
-                table="event_auth",
-                values={
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "auth_id": auth_id,
-                },
-            )
-
-        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
-        self._store_event_reference_hash_txn(
-            txn, event.event_id, ref_alg, ref_hash_bytes
-        )
-
     def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
         self._invalidate_get_event_cache(event.redacts)