diff options
author | Erik Johnston <erik@matrix.org> | 2015-06-25 11:45:05 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-06-25 11:45:05 +0100 |
commit | e2a241af08e6c6d8217d03a1a74d42da2354abfa (patch) | |
tree | 5d335d965802f3a8bc9f0657d839843f32feb36b | |
parent | Typo (diff) | |
download | synapse-e2a241af08e6c6d8217d03a1a74d42da2354abfa.tar.xz |
Make _persist_event_txn call _persist_events_txn
-rw-r--r-- | synapse/storage/events.py | 252 |
1 files changed, 11 insertions, 241 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2e7ef9a93b..24770882d2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -112,7 +112,6 @@ class EventsStore(SQLBaseStore): event=event, context=context, backfilled=backfilled, - stream_ordering=stream_ordering, is_new_state=is_new_state, current_state=current_state, ) @@ -155,12 +154,7 @@ class EventsStore(SQLBaseStore): @log_function def _persist_event_txn(self, txn, event, context, backfilled, - stream_ordering=None, is_new_state=True, - current_state=None): - - # Remove the any existing cache entries for the event_id - txn.call_after(self._invalidate_get_event_cache, event.event_id) - + is_new_state=True, current_state=None): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: @@ -188,236 +182,13 @@ class EventsStore(SQLBaseStore): } ) - outlier = event.internal_metadata.is_outlier() - - if not outlier: - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - - have_persisted = self._simple_select_one_txn( - txn, - table="events", - keyvalues={"event_id": event.event_id}, - retcols=["event_id", "outlier"], - allow_none=True, - ) - - metadata_json = encode_json( - event.internal_metadata.get_dict(), - using_frozen_dicts=USE_FROZEN_DICTS - ).decode("UTF-8") - - # If we have already persisted this event, we don't need to do any - # more processing. - # The processing above must be done on every call to persist event, - # since they might not have happened on previous calls. For example, - # if we are persisting an event that we had persisted as an outlier, - # but is no longer one. - if have_persisted: - if not outlier and have_persisted["outlier"]: - self._store_state_groups_txn(txn, event, context) - - sql = ( - "UPDATE event_json SET internal_metadata = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (metadata_json, event.event_id,) - ) - - sql = ( - "UPDATE events SET outlier = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (False, event.event_id,) - ) - return - - if not outlier: - self._store_state_groups_txn(txn, event, context) - - self._handle_mult_prev_events( - txn, - events=[event] - ) - - if event.type == EventTypes.Member: - self._store_room_members_txn(txn, [event]) - elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) - - event_dict = { - k: v - for k, v in event.get_dict().items() - if k not in [ - "redacted", - "redacted_because", - ] - } - - self._simple_insert_txn( + return self._persist_events_txn( txn, - table="event_json", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "internal_metadata": metadata_json, - "json": encode_json( - event_dict, using_frozen_dicts=USE_FROZEN_DICTS - ).decode("UTF-8"), - }, + [(event, context)], + backfilled=backfilled, + is_new_state=is_new_state, ) - content = encode_json( - event.content, using_frozen_dicts=USE_FROZEN_DICTS - ).decode("UTF-8") - - vals = { - "topological_ordering": event.depth, - "event_id": event.event_id, - "type": event.type, - "room_id": event.room_id, - "content": content, - "processed": True, - "outlier": outlier, - "depth": event.depth, - } - - unrec = { - k: v - for k, v in event.get_dict().items() - if k not in vals.keys() and k not in [ - "redacted", - "redacted_because", - "signatures", - "hashes", - "prev_events", - ] - } - - vals["unrecognized_keys"] = encode_json( - unrec, using_frozen_dicts=USE_FROZEN_DICTS - ).decode("UTF-8") - - sql = ( - "INSERT INTO events" - " (stream_ordering, topological_ordering, event_id, type," - " room_id, content, processed, outlier, depth)" - " VALUES (?,?,?,?,?,?,?,?,?)" - ) - - txn.execute( - sql, - ( - stream_ordering, event.depth, event.event_id, event.type, - event.room_id, content, True, outlier, event.depth - ) - ) - - if context.rejected: - self._store_rejections_txn( - txn, event.event_id, context.rejected - ) - - # for hash_alg, hash_base64 in event.hashes.items(): - # hash_bytes = decode_base64(hash_base64) - # self._store_event_content_hash_txn( - # txn, event.event_id, hash_alg, hash_bytes, - # ) - - # for prev_event_id, prev_hashes in event.prev_events: - # for alg, hash_base64 in prev_hashes.items(): - # hash_bytes = decode_base64(hash_base64) - # self._store_prev_event_hash_txn( - # txn, event.event_id, prev_event_id, alg, - # hash_bytes - # ) - - self._simple_insert_many_txn( - txn, - table="event_auth", - values=[ - { - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - } - for auth_id, _ in event.auth_events - ], - ) - - self._store_event_reference_hashes_txn(txn, [event]) - - if event.is_state(): - vals = { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } - - # TODO: How does this work with backfilling? - if hasattr(event, "replaces_state"): - vals["prev_state"] = event.replaces_state - - self._simple_insert_txn( - txn, - "state_events", - vals, - ) - - self._simple_insert_many_txn( - txn, - table="event_edges", - values=[ - { - "event_id": event.event_id, - "prev_event_id": e_id, - "room_id": event.room_id, - "is_state": True, - } - for e_id, h in event.prev_state - ], - ) - - if is_new_state and not context.rejected: - txn.call_after( - self.get_current_state_for_key.invalidate, - event.room_id, event.type, event.state_key - ) - - if (event.type == EventTypes.Name - or event.type == EventTypes.Aliases): - txn.call_after( - self.get_room_name_and_aliases.invalidate, - event.room_id - ) - - self._simple_upsert_txn( - txn, - "current_state_events", - keyvalues={ - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - values={ - "event_id": event.event_id, - } - ) - - return - @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, is_new_state=True): @@ -507,10 +278,13 @@ class EventsStore(SQLBaseStore): events_and_contexts ) + if not events_and_contexts: + return + self._store_mult_state_groups_txn(txn, [ (event, context) for event, context in events_and_contexts - if not event.internal_metadata.is_outlier + if not event.internal_metadata.is_outlier() ]) self._handle_mult_prev_events( @@ -627,9 +401,6 @@ class EventsStore(SQLBaseStore): events_and_contexts, ) - for event, context in state_events_and_contexts: - pass - state_values = [] for event, context in state_events_and_contexts: vals = { @@ -668,14 +439,13 @@ class EventsStore(SQLBaseStore): if is_new_state: for event, _ in state_events_and_contexts: - if not context.rejected and not event.internal_metadata.is_outlier(): + if not context.rejected: txn.call_after( self.get_current_state_for_key.invalidate, event.room_id, event.type, event.state_key ) - if (event.type == EventTypes.Name - or event.type == EventTypes.Aliases): + if event.type in [EventTypes.Name, EventTypes.Aliases]: txn.call_after( self.get_room_name_and_aliases.invalidate, event.room_id |