diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 211 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 14 | ||||
-rw-r--r-- | synapse/storage/schema/delta/v12.sql | 2 | ||||
-rw-r--r-- | synapse/storage/schema/pusher.sql | 2 |
4 files changed, 145 insertions, 84 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6ff0093136..2225be96be 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -131,21 +131,144 @@ class DataStore(RoomMemberStore, RoomStore, pass @defer.inlineCallbacks - def get_event(self, event_id, allow_none=False): - events = yield self._get_events([event_id]) + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + """Get an event from the database by event_id. + + Args: + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. - if not events: - if allow_none: - defer.returnValue(None) - else: - raise RuntimeError("Could not find event %s" % (event_id,)) + Returns: + Deferred : A FrozenEvent. + """ + event = yield self.runInteraction( + "get_event", self._get_event_txn, + event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - defer.returnValue(events[0]) + if not event and not allow_none: + raise RuntimeError("Could not find event %s" % (event_id,)) + + defer.returnValue(event) @log_function def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + txn.execute( + "DELETE FROM current_state_events WHERE room_id = ?", + (event.room_id,) + ) + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + or_replace=True, + ) + + if event.is_state() and is_new_state: + if not backfilled and not context.rejected: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) + + outlier = event.internal_metadata.is_outlier() + + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + + have_persisted = self._simple_select_one_onecol_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + retcol="event_id", + allow_none=True, + ) + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + # If we have already persisted this event, we don't need to do any + # more processing. + # The processing above must be done on every call to persist event, + # since they might not have happened on previous calls. For example, + # if we are persisting an event that we had persisted as an outlier, + # but is no longer one. + if have_persisted: + if not outlier: + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json.decode("UTF-8"), event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = 0" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (event.event_id,) + ) + return + if event.type == EventTypes.Member: self._store_room_member_txn(txn, event) elif event.type == EventTypes.Feedback: @@ -157,8 +280,6 @@ class DataStore(RoomMemberStore, RoomStore, elif event.type == EventTypes.Redaction: self._store_redaction(txn, event) - outlier = event.internal_metadata.is_outlier() - event_dict = { k: v for k, v in event.get_dict().items() @@ -168,10 +289,6 @@ class DataStore(RoomMemberStore, RoomStore, ] } - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() - ) - self._simple_insert_txn( txn, table="event_json", @@ -227,41 +344,10 @@ class DataStore(RoomMemberStore, RoomStore, ) raise _RollbackButIsFineException("_persist_event") - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - if not outlier: - self._store_state_groups_txn(txn, event, context) - if context.rejected: self._store_rejections_txn(txn, event.event_id, context.rejected) - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - or_replace=True, - ) - - is_state = hasattr(event, "state_key") and event.state_key is not None - if is_state: + if event.is_state(): vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -269,6 +355,7 @@ class DataStore(RoomMemberStore, RoomStore, "state_key": event.state_key, } + # TODO: How does this work with backfilling? if hasattr(event, "replaces_state"): vals["prev_state"] = event.replaces_state @@ -305,28 +392,6 @@ class DataStore(RoomMemberStore, RoomStore, or_ignore=True, ) - if not backfilled and not context.rejected: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( @@ -357,13 +422,6 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, ref_alg, ref_hash_bytes ) - if not outlier: - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - def _store_redaction(self, txn, event): txn.execute( "INSERT OR IGNORE INTO redactions " @@ -480,6 +538,9 @@ class DataStore(RoomMemberStore, RoomStore, the rejected reason string if we rejected the event, else maps to None. """ + if not event_ids: + return defer.succeed({}) + def f(txn): sql = ( "SELECT e.event_id, reason FROM events as e " diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index f253c9e2c3..e2a662a6c7 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -29,7 +29,7 @@ class PusherStore(SQLBaseStore): @defer.inlineCallbacks def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey): sql = ( - "SELECT id, user_name, kind, instance_handle, app_id," + "SELECT id, user_name, kind, profile_tag, app_id," "app_display_name, device_display_name, pushkey, ts, data, " "last_token, last_success, failing_since " "FROM pushers " @@ -45,7 +45,7 @@ class PusherStore(SQLBaseStore): "id": r[0], "user_name": r[1], "kind": r[2], - "instance_handle": r[3], + "profile_tag": r[3], "app_id": r[4], "app_display_name": r[5], "device_display_name": r[6], @@ -64,7 +64,7 @@ class PusherStore(SQLBaseStore): @defer.inlineCallbacks def get_all_pushers(self): sql = ( - "SELECT id, user_name, kind, instance_handle, app_id," + "SELECT id, user_name, kind, profile_tag, app_id," "app_display_name, device_display_name, pushkey, ts, data, " "last_token, last_success, failing_since " "FROM pushers" @@ -77,7 +77,7 @@ class PusherStore(SQLBaseStore): "id": r[0], "user_name": r[1], "kind": r[2], - "instance_handle": r[3], + "profile_tag": r[3], "app_id": r[4], "app_display_name": r[5], "device_display_name": r[6], @@ -94,7 +94,7 @@ class PusherStore(SQLBaseStore): defer.returnValue(ret) @defer.inlineCallbacks - def add_pusher(self, user_name, instance_handle, kind, app_id, + def add_pusher(self, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, pushkey_ts, lang, data): try: @@ -107,7 +107,7 @@ class PusherStore(SQLBaseStore): dict( user_name=user_name, kind=kind, - instance_handle=instance_handle, + profile_tag=profile_tag, app_display_name=app_display_name, device_display_name=device_display_name, ts=pushkey_ts, @@ -158,7 +158,7 @@ class PushersTable(Table): "id", "user_name", "kind", - "instance_handle", + "profile_tag", "app_id", "app_display_name", "device_display_name", diff --git a/synapse/storage/schema/delta/v12.sql b/synapse/storage/schema/delta/v12.sql index a6867cba62..16c2258ca4 100644 --- a/synapse/storage/schema/delta/v12.sql +++ b/synapse/storage/schema/delta/v12.sql @@ -24,7 +24,7 @@ CREATE TABLE IF NOT EXISTS rejections( CREATE TABLE IF NOT EXISTS pushers ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_name TEXT NOT NULL, - instance_handle varchar(32) NOT NULL, + profile_tag varchar(32) NOT NULL, kind varchar(8) NOT NULL, app_id varchar(64) NOT NULL, app_display_name varchar(64) NOT NULL, diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql index 8c4dfd5c1b..3735b11547 100644 --- a/synapse/storage/schema/pusher.sql +++ b/synapse/storage/schema/pusher.sql @@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS pushers ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_name TEXT NOT NULL, - instance_handle varchar(32) NOT NULL, + profile_tag varchar(32) NOT NULL, kind varchar(8) NOT NULL, app_id varchar(64) NOT NULL, app_display_name varchar(64) NOT NULL, |