diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 1aaef3f493..15919eb580 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -24,6 +24,7 @@ from synapse.api.events.room import (
RoomAddStateLevelEvent,
RoomSendEventLevelEvent,
RoomOpsPowerLevelsEvent,
+ RoomRedactionEvent,
)
from synapse.util.logutils import log_function
@@ -56,12 +57,13 @@ SCHEMAS = [
"presence",
"im",
"room_aliases",
+ "redactions",
]
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 3
+SCHEMA_VERSION = 4
class _RollbackButIsFineException(Exception):
@@ -182,6 +184,8 @@ class DataStore(RoomMemberStore, RoomStore,
self._store_send_event_level(txn, event)
elif event.type == RoomOpsPowerLevelsEvent.TYPE:
self._store_ops_level(txn, event)
+ elif event.type == RoomRedactionEvent.TYPE:
+ self._store_redaction(txn, event)
vals = {
"topological_ordering": event.depth,
@@ -203,7 +207,7 @@ class DataStore(RoomMemberStore, RoomStore,
unrec = {
k: v
for k, v in event.get_full_dict().items()
- if k not in vals.keys()
+ if k not in vals.keys() and k not in ["redacted", "redacted_because"]
}
vals["unrecognized_keys"] = json.dumps(unrec)
@@ -242,14 +246,28 @@ class DataStore(RoomMemberStore, RoomStore,
}
)
+ def _store_redaction(self, txn, event):
+ txn.execute(
+ "INSERT OR IGNORE INTO redactions "
+ "(event_id, redacts) VALUES (?,?)",
+ (event.event_id, event.redacts)
+ )
+
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
+ del_sql = (
+ "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+ "LIMIT 1"
+ )
+
sql = (
- "SELECT e.* FROM events as e "
+ "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
"INNER JOIN current_state_events as c ON e.event_id = c.event_id "
"INNER JOIN state_events as s ON e.event_id = s.event_id "
"WHERE c.room_id = ? "
- )
+ ) % {
+ "redacted": del_sql,
+ }
if event_type:
sql += " AND s.type = ? AND s.state_key = ? "
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 76ed7d06fb..889de2bedc 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -17,6 +17,7 @@ import logging
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.api.events.utils import prune_event
from synapse.util.logutils import log_function
import collections
@@ -345,7 +346,7 @@ class SQLBaseStore(object):
return self.runInteraction(func)
def _parse_event_from_row(self, row_dict):
- d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+ d = copy.deepcopy({k: v for k, v in row_dict.items()})
d.pop("stream_ordering", None)
d.pop("topological_ordering", None)
@@ -373,8 +374,8 @@ class SQLBaseStore(object):
sql = "SELECT * FROM events WHERE event_id = ?"
for ev in events:
- if hasattr(ev, "prev_state"):
- # Load previous state_content.
+ if hasattr(ev, "prev_state"):
+ # Load previous state_content.
# TODO: Should we be pulling this out above?
cursor = txn.execute(sql, (ev.prev_state,))
prevs = self.cursor_to_dict(cursor)
@@ -382,8 +383,32 @@ class SQLBaseStore(object):
prev = self._parse_event_from_row(prevs[0])
ev.prev_content = prev.content
+ if not hasattr(ev, "redacted"):
+ logger.debug("Doesn't have redacted key: %s", ev)
+ ev.redacted = self._has_been_redacted_txn(txn, ev)
+
+ if ev.redacted:
+ # Get the redaction event.
+ sql = "SELECT * FROM events WHERE event_id = ?"
+ txn.execute(sql, (ev.redacted,))
+
+ del_evs = self._parse_events_txn(
+ txn, self.cursor_to_dict(txn)
+ )
+
+ if del_evs:
+ prune_event(ev)
+ ev.redacted_because = del_evs[0]
+
return events
+ def _has_been_redacted_txn(self, txn, event):
+ sql = "SELECT event_id FROM redactions WHERE redacts = ?"
+ txn.execute(sql, (event.event_id,))
+ result = txn.fetchone()
+ return result[0] if result else None
+
+
class Table(object):
""" A base class used to store information about a particular table.
"""
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 5adf8cdf1b..8cd46334cf 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -27,7 +27,7 @@ import logging
logger = logging.getLogger(__name__)
-OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level"))
+OpsLevel = collections.namedtuple("OpsLevel", ("ban_level", "kick_level", "redact_level"))
class RoomStore(SQLBaseStore):
@@ -189,7 +189,8 @@ class RoomStore(SQLBaseStore):
def _get_ops_levels(self, txn, room_id):
sql = (
- "SELECT ban_level, kick_level FROM room_ops_levels as r "
+ "SELECT ban_level, kick_level, redact_level "
+ "FROM room_ops_levels as r "
"INNER JOIN current_state_events as c "
"ON r.event_id = c.event_id "
"WHERE c.room_id = ? "
@@ -198,7 +199,7 @@ class RoomStore(SQLBaseStore):
rows = txn.execute(sql, (room_id,)).fetchall()
if len(rows) == 1:
- return OpsLevel(rows[0][0], rows[0][1])
+ return OpsLevel(rows[0][0], rows[0][1], rows[0][2])
else:
return OpsLevel(None, None)
@@ -326,6 +327,9 @@ class RoomStore(SQLBaseStore):
if "ban_level" in event.content:
content["ban_level"] = event.content["ban_level"]
+ if "redact_level" in event.content:
+ content["redact_level"] = event.content["redact_level"]
+
self._simple_insert_txn(
txn,
"room_ops_levels",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 04b4067d03..958e730591 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -182,14 +182,22 @@ class RoomMemberStore(SQLBaseStore):
)
def _get_members_query_txn(self, txn, where_clause, where_values):
+ del_sql = (
+ "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+ "LIMIT 1"
+ )
+
sql = (
- "SELECT e.* FROM events as e "
+ "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
"INNER JOIN room_memberships as m "
"ON e.event_id = m.event_id "
"INNER JOIN current_state_events as c "
"ON m.event_id = c.event_id "
- "WHERE %s "
- ) % (where_clause,)
+ "WHERE %(where)s "
+ ) % {
+ "redacted": del_sql,
+ "where": where_clause,
+ }
txn.execute(sql, where_values)
rows = self.cursor_to_dict(txn)
diff --git a/synapse/storage/schema/delta/v4.sql b/synapse/storage/schema/delta/v4.sql
new file mode 100644
index 0000000000..25d2ead450
--- /dev/null
+++ b/synapse/storage/schema/delta/v4.sql
@@ -0,0 +1,12 @@
+CREATE TABLE IF NOT EXISTS redactions (
+ event_id TEXT NOT NULL,
+ redacts TEXT NOT NULL,
+ CONSTRAINT ev_uniq UNIQUE (event_id)
+);
+
+CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
+CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
+
+ALTER TABLE room_ops_levels ADD COLUMN redact_level INTEGER;
+
+PRAGMA user_version = 4;
diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql
index 6ffea51310..3aa83f5c8c 100644
--- a/synapse/storage/schema/im.sql
+++ b/synapse/storage/schema/im.sql
@@ -150,7 +150,8 @@ CREATE TABLE IF NOT EXISTS room_ops_levels(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
ban_level INTEGER,
- kick_level INTEGER
+ kick_level INTEGER,
+ redact_level INTEGER
);
CREATE INDEX IF NOT EXISTS room_ops_levels_event_id ON room_ops_levels(event_id);
diff --git a/synapse/storage/schema/redactions.sql b/synapse/storage/schema/redactions.sql
new file mode 100644
index 0000000000..4c2829d05d
--- /dev/null
+++ b/synapse/storage/schema/redactions.sql
@@ -0,0 +1,8 @@
+CREATE TABLE IF NOT EXISTS redactions (
+ event_id TEXT NOT NULL,
+ redacts TEXT NOT NULL,
+ CONSTRAINT ev_uniq UNIQUE (event_id)
+);
+
+CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
+CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a76fecf24f..d61f909939 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -157,6 +157,11 @@ class StreamStore(SQLBaseStore):
"WHERE m.user_id = ? "
)
+ del_sql = (
+ "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+ "LIMIT 1"
+ )
+
if limit:
limit = max(limit, MAX_STREAM_SIZE)
else:
@@ -171,13 +176,14 @@ class StreamStore(SQLBaseStore):
return
sql = (
- "SELECT * FROM events as e WHERE "
+ "SELECT *, (%(redacted)s) AS redacted FROM events AS e WHERE "
"((room_id IN (%(current)s)) OR "
"(event_id IN (%(invites)s))) "
"AND e.stream_ordering > ? AND e.stream_ordering <= ? "
"AND e.outlier = 0 "
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
) % {
+ "redacted": del_sql,
"current": current_room_membership_sql,
"invites": membership_sql,
"limit": limit
@@ -224,11 +230,21 @@ class StreamStore(SQLBaseStore):
else:
limit_str = ""
+ del_sql = (
+ "SELECT event_id FROM redactions WHERE redacts = events.event_id "
+ "LIMIT 1"
+ )
+
sql = (
- "SELECT * FROM events "
+ "SELECT *, (%(redacted)s) AS redacted FROM events "
"WHERE outlier = 0 AND room_id = ? AND %(bounds)s "
"ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
- ) % {"bounds": bounds, "order": order, "limit": limit_str}
+ ) % {
+ "redacted": del_sql,
+ "bounds": bounds,
+ "order": order,
+ "limit": limit_str
+ }
rows = yield self._execute_and_decode(
sql,
@@ -257,11 +273,18 @@ class StreamStore(SQLBaseStore):
with_feedback=False):
# TODO (erikj): Handle compressed feedback
+ del_sql = (
+ "SELECT event_id FROM redactions WHERE redacts = events.event_id "
+ "LIMIT 1"
+ )
+
sql = (
- "SELECT * FROM events "
+ "SELECT *, (%(redacted)s) AS redacted FROM events "
"WHERE room_id = ? AND stream_ordering <= ? "
"ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
- )
+ ) % {
+ "redacted": del_sql,
+ }
rows = yield self._execute_and_decode(
sql,
|