diff options
author | Erik Johnston <erik@matrix.org> | 2015-06-22 14:09:39 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-06-22 14:09:39 +0100 |
commit | 03a9e4436b9e619d6e94b53fbce99f21197269a8 (patch) | |
tree | 74d348f6f991b1a85553174e6e65328ef0e532de | |
parent | Add a many version of store_state_groups_txn (diff) | |
download | synapse-03a9e4436b9e619d6e94b53fbce99f21197269a8.tar.xz |
Add a many version of _handle_prev_events
-rw-r--r-- | synapse/storage/event_federation.py | 87 | ||||
-rw-r--r-- | synapse/storage/events.py | 253 |
2 files changed, 340 insertions, 0 deletions
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 1ba073884b..e5ad63cf9c 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -351,6 +351,93 @@ class EventFederationStore(SQLBaseStore): self.get_latest_event_ids_in_room.invalidate, room_id ) + def _handle_mult_prev_events(self, txn, events): + """ + For the given event, update the event edges table and forward and + backward extremities tables. + """ + self._simple_insert_many_txn( + txn, + table="event_edges", + values=[ + { + "event_id": ev.event_id, + "prev_event_id": e_id, + "room_id": ev.room_id, + "is_state": False, + } + for e_id in [e_id for ev in events for e_id, _ in ev.prev_events] + ], + ) + + events_by_room = {} + for ev in events: + events_by_room.setdefault(ev.room_id, []).append(ev) + + for room_id, room_events in events_by_room.items(): + prevs = [ + e_id for ev in room_events for e_id, _ in ev.prev_events + if not ev.internal_metadata.is_outlier() + ] + if prevs: + txn.execute( + "DELETE FROM event_forward_extremities" + " WHERE room_id = ?" + " AND event_id in (%s)" % ( + ",".join(["?"] * len(prevs)), + ), + [room_id] + prevs, + ) + + query = ( + "INSERT INTO event_forward_extremities (event_id, room_id)" + " SELECT ?, ? WHERE NOT EXISTS (" + " SELECT 1 FROM event_edges WHERE prev_event_id = ?" + " )" + ) + + txn.executemany( + query, + [(ev.event_id, ev.room_id, ev.event_id) for ev in events] + ) + + query = ( + "INSERT INTO event_backward_extremities (event_id, room_id)" + " SELECT ?, ? WHERE NOT EXISTS (" + " SELECT 1 FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" + " )" + " AND NOT EXISTS (" + " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? " + " AND outlier = ?" + " )" + ) + + prev_events = [ + e_id for ev in events for e_id, _ in ev.prev_events + if not ev.internal_metadata.is_outlier() + ] + + txn.executemany(query, [ + (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) + for ev in events for e_id, _ in ev.prev_events + if not ev.internal_metadata.is_outlier() + ]) + + query = ( + "DELETE FROM event_backward_extremities" + " WHERE event_id = ? AND room_id = ?" + ) + txn.executemany( + query, + [(ev.event_id, ev.room_id) for ev in events] + ) + + for room_id in events_by_room: + txn.call_after( + self.get_latest_event_ids_in_room.invalidate, room_id + ) + def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and including) the events in event_list. Return a list of max size `limit` diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e02a8066d6..b9a2319115 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -213,6 +213,259 @@ class EventsStore(SQLBaseStore): if not outlier: self._store_state_groups_txn(txn, event, context) + self._handle_mult_prev_events( + txn, + events=[event] + ) + + if event.type == EventTypes.Member: + self._store_room_member_txn(txn, event) + elif event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + + event_dict = { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } + + self._simple_insert_txn( + txn, + table="event_json", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "internal_metadata": metadata_json, + "json": encode_json( + event_dict, using_frozen_dicts=USE_FROZEN_DICTS + ).decode("UTF-8"), + }, + ) + + content = encode_json( + event.content, using_frozen_dicts=USE_FROZEN_DICTS + ).decode("UTF-8") + + vals = { + "topological_ordering": event.depth, + "event_id": event.event_id, + "type": event.type, + "room_id": event.room_id, + "content": content, + "processed": True, + "outlier": outlier, + "depth": event.depth, + } + + unrec = { + k: v + for k, v in event.get_dict().items() + if k not in vals.keys() and k not in [ + "redacted", + "redacted_because", + "signatures", + "hashes", + "prev_events", + ] + } + + vals["unrecognized_keys"] = encode_json( + unrec, using_frozen_dicts=USE_FROZEN_DICTS + ).decode("UTF-8") + + sql = ( + "INSERT INTO events" + " (stream_ordering, topological_ordering, event_id, type," + " room_id, content, processed, outlier, depth)" + " VALUES (?,?,?,?,?,?,?,?,?)" + ) + + txn.execute( + sql, + ( + stream_ordering, event.depth, event.event_id, event.type, + event.room_id, content, True, outlier, event.depth + ) + ) + + if context.rejected: + self._store_rejections_txn( + txn, event.event_id, context.rejected + ) + + # for hash_alg, hash_base64 in event.hashes.items(): + # hash_bytes = decode_base64(hash_base64) + # self._store_event_content_hash_txn( + # txn, event.event_id, hash_alg, hash_bytes, + # ) + + # for prev_event_id, prev_hashes in event.prev_events: + # for alg, hash_base64 in prev_hashes.items(): + # hash_bytes = decode_base64(hash_base64) + # self._store_prev_event_hash_txn( + # txn, event.event_id, prev_event_id, alg, + # hash_bytes + # ) + + self._simple_insert_many_txn( + txn, + table="event_auth", + values=[ + { + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + } + for auth_id, _ in event.auth_events + ], + ) + + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) + self._store_event_reference_hash_txn( + txn, event.event_id, ref_alg, ref_hash_bytes + ) + + if event.is_state(): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + # TODO: How does this work with backfilling? + if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state + + self._simple_insert_txn( + txn, + "state_events", + vals, + ) + + self._simple_insert_many_txn( + txn, + table="event_edges", + values=[ + { + "event_id": event.event_id, + "prev_event_id": e_id, + "room_id": event.room_id, + "is_state": True, + } + for e_id, h in event.prev_state + ], + ) + + if is_new_state and not context.rejected: + txn.call_after( + self.get_current_state_for_key.invalidate, + event.room_id, event.type, event.state_key + ) + + if (event.type == EventTypes.Name + or event.type == EventTypes.Aliases): + txn.call_after( + self.get_room_name_and_aliases.invalidate, + event.room_id + ) + + self._simple_upsert_txn( + txn, + "current_state_events", + keyvalues={ + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + values={ + "event_id": event.event_id, + } + ) + + return + + @log_function + def _persist_events_txn(self, txn, event_and_contexts, backfilled, + stream_ordering, is_new_state=True): + + # Remove the any existing cache entries for the event_ids + for event, _ in event_and_contexts: + txn.call_after(self._invalidate_get_event_cache, event.event_id) + + depth_updates = {} + for event, _ in event_and_contexts: + if event.internal_metadata.is_outlier(): + continue + depth_updates[event.room_id] = max( + event.depth, depth_updates.get(event.room_id, event.depth) + ) + + for room_id, depth in depth_updates: + self._update_min_depth_for_room_txn(txn, room_id, depth) + + txn.execute( + "SELECT event_id, outlier FROM events WHERE event_id in %s" % ( + ",".join(["?"] * len(event_and_contexts)), + ), + [event.event_id for event, _ in event_and_contexts] + ) + have_persisted = { + event_id: outlier + for event_id, outlier in txn.fetchall() + } + + + # metadata_json = encode_json( + # event.internal_metadata.get_dict(), + # using_frozen_dicts=USE_FROZEN_DICTS + # ).decode("UTF-8") + + for event, context in event_and_contexts: + if event.event_id not in have_persisted: + continue + outlier_persisted = have_persisted[event.event_id] + if not event.internal_metadata.is_outlier() and outlier_persisted: + self._store_state_groups_txn( + txn, event, context, + ) + + metadata_json = encode_json( + event.internal_metadata.get_dict(), + using_frozen_dicts=USE_FROZEN_DICTS + ).decode("UTF-8") + + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json, event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (False, event.event_id,) + ) + + self._store_mult_state_groups_txn(txn, [ + (event, context) + for event, context in event_and_contexts + if not event.internal_metadata.is_outlier + ]) + self._handle_prev_events( txn, outlier=outlier, |