summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py90
1 files changed, 45 insertions, 45 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2425f57f5f..a2e87c27ce 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore):
                 is_new_state=is_new_state,
                 current_state=current_state,
             )
-            self.get_room_events_max_id.invalidate()
         except _RollbackButIsFineException:
             pass
 
@@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore):
         # Remove the any existing cache entries for the event_id
         self._invalidate_get_event_cache(event.event_id)
 
+        if stream_ordering is None:
+            with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
+                return self._persist_event_txn(
+                    txn, event, context, backfilled,
+                    stream_ordering=stream_ordering,
+                    is_new_state=is_new_state,
+                    current_state=current_state,
+                )
+
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
+            self._simple_delete_txn(
+                txn,
+                table="current_state_events",
+                keyvalues={"room_id": event.room_id},
             )
 
             for s in current_state:
@@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore):
                         "type": s.type,
                         "state_key": s.state_key,
                     },
-                    or_replace=True,
                 )
 
         if event.is_state() and is_new_state:
@@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore):
                         "type": event.type,
                         "state_key": event.state_key,
                     },
-                    or_replace=True,
                 )
 
                 for prev_state_id, _ in event.prev_state:
@@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore):
                 event.depth
             )
 
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
         have_persisted = self._simple_select_one_onecol_txn(
             txn,
             table="event_json",
@@ -185,7 +184,7 @@ class EventsStore(SQLBaseStore):
                 )
                 txn.execute(
                     sql,
-                    (metadata_json.decode("UTF-8"), event.event_id,)
+                    (buffer(metadata_json), event.event_id,)
                 )
 
                 sql = (
@@ -198,10 +197,16 @@ class EventsStore(SQLBaseStore):
                 )
             return
 
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
         if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
-        elif event.type == EventTypes.Feedback:
-            self._store_feedback_txn(txn, event)
         elif event.type == EventTypes.Name:
             self._store_room_name_txn(txn, event)
         elif event.type == EventTypes.Topic:
@@ -224,15 +229,14 @@ class EventsStore(SQLBaseStore):
             values={
                 "event_id": event.event_id,
                 "room_id": event.room_id,
-                "internal_metadata": metadata_json.decode("UTF-8"),
-                "json": encode_canonical_json(event_dict).decode("UTF-8"),
+                "internal_metadata": buffer(metadata_json),
+                "json": buffer(encode_canonical_json(event_dict)),
             },
-            or_replace=True,
         )
 
-        content = encode_canonical_json(
+        content = buffer(encode_canonical_json(
             event.content
-        ).decode("UTF-8")
+        ))
 
         vals = {
             "topological_ordering": event.depth,
@@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore):
             "depth": event.depth,
         }
 
-        if stream_ordering is not None:
-            vals["stream_ordering"] = stream_ordering
-
         unrec = {
             k: v
             for k, v in event.get_dict().items()
@@ -260,25 +261,22 @@ class EventsStore(SQLBaseStore):
             ]
         }
 
-        vals["unrecognized_keys"] = encode_canonical_json(
+        vals["unrecognized_keys"] = buffer(encode_canonical_json(
             unrec
-        ).decode("UTF-8")
+        ))
 
-        try:
-            self._simple_insert_txn(
-                txn,
-                "events",
-                vals,
-                or_replace=(not outlier),
-                or_ignore=bool(outlier),
-            )
-        except:
-            logger.warn(
-                "Failed to persist, probably duplicate: %s",
-                event.event_id,
-                exc_info=True,
-            )
-            raise _RollbackButIsFineException("_persist_event")
+        sql = (
+            "INSERT INTO events"
+            " (stream_ordering, topological_ordering, event_id, type,"
+            " room_id, content, processed, outlier, depth)"
+            " VALUES (%s,?,?,?,?,?,?,?,?)"
+        ) % (stream_ordering,)
+
+        txn.execute(
+            sql,
+            (event.depth, event.event_id, event.type, event.room_id,
+             content, True, outlier, event.depth)
+        )
 
         if context.rejected:
             self._store_rejections_txn(txn, event.event_id, context.rejected)
@@ -302,15 +300,17 @@ class EventsStore(SQLBaseStore):
             )
 
             if is_new_state and not context.rejected:
-                self._simple_insert_txn(
+                self._simple_upsert_txn(
                     txn,
                     "current_state_events",
-                    {
-                        "event_id": event.event_id,
+                    keyvalues={
                         "room_id": event.room_id,
                         "type": event.type,
                         "state_key": event.state_key,
                     },
+                    values={
+                        "event_id": event.event_id,
+                    }
                 )
 
             for e_id, h in event.prev_state: