diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 90 |
1 files changed, 45 insertions, 45 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2425f57f5f..a2e87c27ce 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - self.get_room_events_max_id.invalidate() except _RollbackButIsFineException: pass @@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore): # Remove the any existing cache entries for the event_id self._invalidate_get_event_cache(event.event_id) + if stream_ordering is None: + with self._stream_id_gen.get_next_txn(txn) as stream_ordering: + return self._persist_event_txn( + txn, event, context, backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + ) + # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) + self._simple_delete_txn( + txn, + table="current_state_events", + keyvalues={"room_id": event.room_id}, ) for s in current_state: @@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore): "type": s.type, "state_key": s.state_key, }, - or_replace=True, ) if event.is_state() and is_new_state: @@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore): "type": event.type, "state_key": event.state_key, }, - or_replace=True, ) for prev_state_id, _ in event.prev_state: @@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore): event.depth ) - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - have_persisted = self._simple_select_one_onecol_txn( txn, table="event_json", @@ -185,7 +184,7 @@ class EventsStore(SQLBaseStore): ) txn.execute( sql, - (metadata_json.decode("UTF-8"), event.event_id,) + (buffer(metadata_json), event.event_id,) ) sql = ( @@ -198,10 +197,16 @@ class EventsStore(SQLBaseStore): ) return + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + if event.type == EventTypes.Member: self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) elif event.type == EventTypes.Name: self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: @@ -224,15 +229,14 @@ class EventsStore(SQLBaseStore): values={ "event_id": event.event_id, "room_id": event.room_id, - "internal_metadata": metadata_json.decode("UTF-8"), - "json": encode_canonical_json(event_dict).decode("UTF-8"), + "internal_metadata": buffer(metadata_json), + "json": buffer(encode_canonical_json(event_dict)), }, - or_replace=True, ) - content = encode_canonical_json( + content = buffer(encode_canonical_json( event.content - ).decode("UTF-8") + )) vals = { "topological_ordering": event.depth, @@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore): "depth": event.depth, } - if stream_ordering is not None: - vals["stream_ordering"] = stream_ordering - unrec = { k: v for k, v in event.get_dict().items() @@ -260,25 +261,22 @@ class EventsStore(SQLBaseStore): ] } - vals["unrecognized_keys"] = encode_canonical_json( + vals["unrecognized_keys"] = buffer(encode_canonical_json( unrec - ).decode("UTF-8") + )) - try: - self._simple_insert_txn( - txn, - "events", - vals, - or_replace=(not outlier), - or_ignore=bool(outlier), - ) - except: - logger.warn( - "Failed to persist, probably duplicate: %s", - event.event_id, - exc_info=True, - ) - raise _RollbackButIsFineException("_persist_event") + sql = ( + "INSERT INTO events" + " (stream_ordering, topological_ordering, event_id, type," + " room_id, content, processed, outlier, depth)" + " VALUES (%s,?,?,?,?,?,?,?,?)" + ) % (stream_ordering,) + + txn.execute( + sql, + (event.depth, event.event_id, event.type, event.room_id, + content, True, outlier, event.depth) + ) if context.rejected: self._store_rejections_txn(txn, event.event_id, context.rejected) @@ -302,15 +300,17 @@ class EventsStore(SQLBaseStore): ) if is_new_state and not context.rejected: - self._simple_insert_txn( + self._simple_upsert_txn( txn, "current_state_events", - { - "event_id": event.event_id, + keyvalues={ "room_id": event.room_id, "type": event.type, "state_key": event.state_key, }, + values={ + "event_id": event.event_id, + } ) for e_id, h in event.prev_state: |