diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 29 | ||||
-rw-r--r-- | synapse/storage/_base.py | 30 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 13 | ||||
-rw-r--r-- | synapse/storage/schema/delta/v4.sql | 6 | ||||
-rw-r--r-- | synapse/storage/stream.py | 30 |
5 files changed, 91 insertions, 17 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 66658f6721..672ed6971e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -24,6 +24,7 @@ from synapse.api.events.room import ( RoomAddStateLevelEvent, RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, + RoomDeletionEvent, ) from synapse.util.logutils import log_function @@ -61,7 +62,7 @@ SCHEMAS = [ # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 3 +SCHEMA_VERSION = 4 class _RollbackButIsFineException(Exception): @@ -182,6 +183,8 @@ class DataStore(RoomMemberStore, RoomStore, self._store_send_event_level(txn, event) elif event.type == RoomOpsPowerLevelsEvent.TYPE: self._store_ops_level(txn, event) + elif event.type == RoomDeletionEvent.TYPE: + self._store_deletion(txn, event) vals = { "topological_ordering": event.depth, @@ -203,7 +206,7 @@ class DataStore(RoomMemberStore, RoomStore, unrec = { k: v for k, v in event.get_full_dict().items() - if k not in vals.keys() + if k not in vals.keys() and k is not "deleted" } vals["unrecognized_keys"] = json.dumps(unrec) @@ -241,14 +244,32 @@ class DataStore(RoomMemberStore, RoomStore, } ) + def _store_deletion(self, txn, event): + event_id = event.event_id + deletes = event.deletes + + # We check if this new delete deletes an old delete or has been + # deleted by a previous delete that we received out of order. + sql = "SELECT * FROM deletions WHERE event_id = ? OR deletes = ?" + txn.execute(sql, (deletes, event_id)) + + if txn.fetchall(): + sql = "DELETE FROM deletions WHERE event_id = ? OR deletes = ?" + txn.execute(sql, (deletes, event_id, )) + else: + sql = "INSERT INTO deletions (event_id, deletes) VALUES (?,?)" + txn.execute(sql, (event_id, deletes)) + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): sql = ( - "SELECT e.* FROM events as e " + "SELECT e.*, (%(deleted)s) AS deleted FROM events as e " "INNER JOIN current_state_events as c ON e.event_id = c.event_id " "INNER JOIN state_events as s ON e.event_id = s.event_id " "WHERE c.room_id = ? " - ) + ) % { + "deleted": "e.event_id IN (SELECT deletes FROM deletions)", + } if event_type: sql += " AND s.type = ? AND s.state_key = ? " diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 76ed7d06fb..3aa610c85c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,6 +17,7 @@ import logging from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.api.events.utils import prune_event from synapse.util.logutils import log_function import collections @@ -345,7 +346,7 @@ class SQLBaseStore(object): return self.runInteraction(func) def _parse_event_from_row(self, row_dict): - d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) + d = copy.deepcopy({k: v for k, v in row_dict.items()}) d.pop("stream_ordering", None) d.pop("topological_ordering", None) @@ -373,8 +374,8 @@ class SQLBaseStore(object): sql = "SELECT * FROM events WHERE event_id = ?" for ev in events: - if hasattr(ev, "prev_state"): - # Load previous state_content. + if hasattr(ev, "prev_state"): + # Load previous state_content. # TODO: Should we be pulling this out above? cursor = txn.execute(sql, (ev.prev_state,)) prevs = self.cursor_to_dict(cursor) @@ -382,8 +383,31 @@ class SQLBaseStore(object): prev = self._parse_event_from_row(prevs[0]) ev.prev_content = prev.content + if not hasattr(ev, "deleted"): + logger.debug("Doesn't have deleted key: %s", ev) + ev.deleted = self._has_been_deleted_txn(txn, ev) + + if ev.deleted: + # Get the deletion event. + sql = "SELECT * FROM events WHERE event_id = ?" + txn.execute(sql, (ev.deleted,)) + + del_evs = self._parse_events_txn( + txn, self.cursor_to_dict(txn) + ) + + if del_evs: + prune_event(ev) + ev.pruned = del_evs[0] + return events + def _has_been_deleted_txn(self, txn, event): + sql = "SELECT * FROM deletions WHERE deletes = ?" + txn.execute(sql, (event.event_id,)) + return len(txn.fetchall()) > 0 + + class Table(object): """ A base class used to store information about a particular table. """ diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 04b4067d03..97222da571 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -182,14 +182,21 @@ class RoomMemberStore(SQLBaseStore): ) def _get_members_query_txn(self, txn, where_clause, where_values): + del_sql = ( + "SELECT event_id FROM deletions WHERE deletes = e.event_id" + ) + sql = ( - "SELECT e.* FROM events as e " + "SELECT e.*, (%(deleted)s) AS deleted FROM events as e " "INNER JOIN room_memberships as m " "ON e.event_id = m.event_id " "INNER JOIN current_state_events as c " "ON m.event_id = c.event_id " - "WHERE %s " - ) % (where_clause,) + "WHERE %(where)s " + ) % { + "deleted": del_sql, + "where": where_clause, + } txn.execute(sql, where_values) rows = self.cursor_to_dict(txn) diff --git a/synapse/storage/schema/delta/v4.sql b/synapse/storage/schema/delta/v4.sql index 1652ef2921..2e2635317a 100644 --- a/synapse/storage/schema/delta/v4.sql +++ b/synapse/storage/schema/delta/v4.sql @@ -1,5 +1,7 @@ CREATE TABLE IF NOT EXISTS deletions ( event_id TEXT NOT NULL, - deletes TEXT NOT NULL, - CONSTRAINT ev_uniq UNIQUE (event_id) + deletes TEXT NOT NULL ); + +CREATE INDEX IF NOT EXISTS deletions_event_id ON deletions (event_id); +CREATE INDEX IF NOT EXISTS deletions_deletes ON deletions (deletes); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a76fecf24f..aaac0aae30 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -157,6 +157,10 @@ class StreamStore(SQLBaseStore): "WHERE m.user_id = ? " ) + del_sql = ( + "SELECT event_id FROM deletions WHERE deletes = e.event_id" + ) + if limit: limit = max(limit, MAX_STREAM_SIZE) else: @@ -171,13 +175,14 @@ class StreamStore(SQLBaseStore): return sql = ( - "SELECT * FROM events as e WHERE " + "SELECT *, (%(deleted)s) AS deleted FROM events AS e WHERE " "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering <= ? " "AND e.outlier = 0 " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { + "deleted": del_sql, "current": current_room_membership_sql, "invites": membership_sql, "limit": limit @@ -224,11 +229,20 @@ class StreamStore(SQLBaseStore): else: limit_str = "" + del_sql = ( + "SELECT event_id FROM deletions WHERE deletes = events.event_id" + ) + sql = ( - "SELECT * FROM events " + "SELECT *, (%(deleted)s) AS deleted FROM events " "WHERE outlier = 0 AND room_id = ? AND %(bounds)s " "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s " - ) % {"bounds": bounds, "order": order, "limit": limit_str} + ) % { + "deleted": del_sql, + "bounds": bounds, + "order": order, + "limit": limit_str + } rows = yield self._execute_and_decode( sql, @@ -257,11 +271,17 @@ class StreamStore(SQLBaseStore): with_feedback=False): # TODO (erikj): Handle compressed feedback + del_sql = ( + "SELECT event_id FROM deletions WHERE deletes = events.event_id" + ) + sql = ( - "SELECT * FROM events " + "SELECT *, (%(deleted)s) AS deleted FROM events " "WHERE room_id = ? AND stream_ordering <= ? " "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " - ) + ) % { + "deleted": del_sql, + } rows = yield self._execute_and_decode( sql, |