diff options
author | Erik Johnston <erik@matrix.org> | 2015-04-28 14:24:08 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-04-28 14:24:08 +0100 |
commit | 9fbcf191889b655d5c93d01a738e5137aa94a354 (patch) | |
tree | 8a403e4b27825447695fe09b54bb2440df734ed5 /synapse/storage/events.py | |
parent | Merge pull request #126 from matrix-org/csauth (diff) | |
parent | Remove unused imports (diff) | |
download | synapse-9fbcf191889b655d5c93d01a738e5137aa94a354.tar.xz |
Merge pull request #123 from matrix-org/postgres
Postgres
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 160 |
1 files changed, 81 insertions, 79 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2425f57f5f..a3c260ddc4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore): is_new_state=is_new_state, current_state=current_state, ) - self.get_room_events_max_id.invalidate() except _RollbackButIsFineException: pass @@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore): # Remove the any existing cache entries for the event_id self._invalidate_get_event_cache(event.event_id) + if stream_ordering is None: + with self._stream_id_gen.get_next_txn(txn) as stream_ordering: + return self._persist_event_txn( + txn, event, context, backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + ) + # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) + self._simple_delete_txn( + txn, + table="current_state_events", + keyvalues={"room_id": event.room_id}, ) for s in current_state: @@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore): "type": s.type, "state_key": s.state_key, }, - or_replace=True, ) if event.is_state() and is_new_state: @@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore): "type": event.type, "state_key": event.state_key, }, - or_replace=True, ) for prev_state_id, _ in event.prev_state: @@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore): event.depth ) - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - have_persisted = self._simple_select_one_onecol_txn( txn, table="event_json", @@ -169,7 +168,7 @@ class EventsStore(SQLBaseStore): metadata_json = encode_canonical_json( event.internal_metadata.get_dict() - ) + ).decode("UTF-8") # If we have already persisted this event, we don't need to do any # more processing. @@ -185,23 +184,29 @@ class EventsStore(SQLBaseStore): ) txn.execute( sql, - (metadata_json.decode("UTF-8"), event.event_id,) + (metadata_json, event.event_id,) ) sql = ( - "UPDATE events SET outlier = 0" + "UPDATE events SET outlier = ?" " WHERE event_id = ?" ) txn.execute( sql, - (event.event_id,) + (False, event.event_id,) ) return + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + if event.type == EventTypes.Member: self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) elif event.type == EventTypes.Name: self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: @@ -224,10 +229,9 @@ class EventsStore(SQLBaseStore): values={ "event_id": event.event_id, "room_id": event.room_id, - "internal_metadata": metadata_json.decode("UTF-8"), + "internal_metadata": metadata_json, "json": encode_canonical_json(event_dict).decode("UTF-8"), }, - or_replace=True, ) content = encode_canonical_json( @@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore): "depth": event.depth, } - if stream_ordering is not None: - vals["stream_ordering"] = stream_ordering - unrec = { k: v for k, v in event.get_dict().items() @@ -264,25 +265,53 @@ class EventsStore(SQLBaseStore): unrec ).decode("UTF-8") - try: - self._simple_insert_txn( - txn, - "events", - vals, - or_replace=(not outlier), - or_ignore=bool(outlier), - ) - except: - logger.warn( - "Failed to persist, probably duplicate: %s", - event.event_id, - exc_info=True, + sql = ( + "INSERT INTO events" + " (stream_ordering, topological_ordering, event_id, type," + " room_id, content, processed, outlier, depth)" + " VALUES (?,?,?,?,?,?,?,?,?)" + ) + + txn.execute( + sql, + ( + stream_ordering, event.depth, event.event_id, event.type, + event.room_id, content, True, outlier, event.depth ) - raise _RollbackButIsFineException("_persist_event") + ) if context.rejected: self._store_rejections_txn(txn, event.event_id, context.rejected) + for hash_alg, hash_base64 in event.hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_event_content_hash_txn( + txn, event.event_id, hash_alg, hash_bytes, + ) + + for prev_event_id, prev_hashes in event.prev_events: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_event_hash_txn( + txn, event.event_id, prev_event_id, alg, hash_bytes + ) + + for auth_id, _ in event.auth_events: + self._simple_insert_txn( + txn, + table="event_auth", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + }, + ) + + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) + self._store_event_reference_hash_txn( + txn, event.event_id, ref_alg, ref_hash_bytes + ) + if event.is_state(): vals = { "event_id": event.event_id, @@ -301,18 +330,6 @@ class EventsStore(SQLBaseStore): vals, ) - if is_new_state and not context.rejected: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - ) - for e_id, h in event.prev_state: self._simple_insert_txn( txn, @@ -321,39 +338,24 @@ class EventsStore(SQLBaseStore): "event_id": event.event_id, "prev_event_id": e_id, "room_id": event.room_id, - "is_state": 1, + "is_state": True, }, ) - for hash_alg, hash_base64 in event.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, - ) - - for prev_event_id, prev_hashes in event.prev_events: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, hash_bytes + if is_new_state and not context.rejected: + self._simple_upsert_txn( + txn, + "current_state_events", + keyvalues={ + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + values={ + "event_id": event.event_id, + } ) - for auth_id, _ in event.auth_events: - self._simple_insert_txn( - txn, - table="event_auth", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - }, - ) - - (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes - ) - def _store_redaction(self, txn, event): # invalidate the cache for the redacted event self._invalidate_get_event_cache(event.redacts) |