summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-09-23 15:28:32 +0100
committerErik Johnston <erik@matrix.org>2014-09-23 15:28:32 +0100
commit78af6bbb981c41e5509c99454deb7205c31bf964 (patch)
tree50dcd73e748b2807e22752f59bb940dd420078cb /synapse
parentAdd a deletions table (diff)
downloadsynapse-78af6bbb981c41e5509c99454deb7205c31bf964.tar.xz
Add m.room.deletion. If an event is deleted it will be returned to clients 'pruned', i.e. all client specified keys will be removed.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/events/__init__.py6
-rw-r--r--synapse/api/events/factory.py4
-rw-r--r--synapse/api/events/room.py9
-rw-r--r--synapse/rest/room.py38
-rw-r--r--synapse/storage/__init__.py29
-rw-r--r--synapse/storage/_base.py30
-rw-r--r--synapse/storage/roommember.py13
-rw-r--r--synapse/storage/schema/delta/v4.sql6
-rw-r--r--synapse/storage/stream.py30
9 files changed, 144 insertions, 21 deletions
diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
index 0cee196851..910c990b33 100644
--- a/synapse/api/events/__init__.py
+++ b/synapse/api/events/__init__.py
@@ -22,7 +22,7 @@ def serialize_event(hs, e):
     if not isinstance(e, SynapseEvent):
         return e
 
-    d = e.get_dict()
+    d = {k: v for k, v in e.get_dict().items() if v is not None or v is not False}
     if "age_ts" in d:
         d["age"] = int(hs.get_clock().time_msec()) - d["age_ts"]
         del d["age_ts"]
@@ -58,17 +58,19 @@ class SynapseEvent(JsonEncodedObject):
         "required_power_level",
         "age_ts",
         "prev_content",
+        "prev_state",
+        "pruned",
     ]
 
     internal_keys = [
         "is_state",
         "prev_events",
-        "prev_state",
         "depth",
         "destinations",
         "origin",
         "outlier",
         "power_level",
+        "deleted",
     ]
 
     required_keys = [
diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py
index d3d96d73eb..c65ea8372b 100644
--- a/synapse/api/events/factory.py
+++ b/synapse/api/events/factory.py
@@ -17,7 +17,8 @@ from synapse.api.events.room import (
     RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent,
     InviteJoinEvent, RoomConfigEvent, RoomNameEvent, GenericEvent,
     RoomPowerLevelsEvent, RoomJoinRulesEvent, RoomOpsPowerLevelsEvent,
-    RoomCreateEvent, RoomAddStateLevelEvent, RoomSendEventLevelEvent
+    RoomCreateEvent, RoomAddStateLevelEvent, RoomSendEventLevelEvent,
+    RoomDeletionEvent,
 )
 
 from synapse.util.stringutils import random_string
@@ -39,6 +40,7 @@ class EventFactory(object):
         RoomAddStateLevelEvent,
         RoomSendEventLevelEvent,
         RoomOpsPowerLevelsEvent,
+        RoomDeletionEvent,
     ]
 
     def __init__(self, hs):
diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py
index 3a4dbc58ce..9861395556 100644
--- a/synapse/api/events/room.py
+++ b/synapse/api/events/room.py
@@ -180,3 +180,12 @@ class RoomAliasesEvent(SynapseStateEvent):
 
     def get_content_template(self):
         return {}
+
+
+class RoomDeletionEvent(SynapseEvent):
+    TYPE = "m.room.deletion"
+
+    valid_keys = SynapseEvent.valid_keys + ["deletes"]
+
+    def get_content_template(self):
+        return {}
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index ecb1e346d9..85a1d2eae3 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -19,7 +19,7 @@ from twisted.internet import defer
 from base import RestServlet, client_path_pattern
 from synapse.api.errors import SynapseError, Codes
 from synapse.streams.config import PaginationConfig
-from synapse.api.events.room import RoomMemberEvent
+from synapse.api.events.room import RoomMemberEvent, RoomDeletionEvent
 from synapse.api.constants import Membership
 
 import json
@@ -430,6 +430,41 @@ class RoomMembershipRestServlet(RestServlet):
         self.txns.store_client_transaction(request, txn_id, response)
         defer.returnValue(response)
 
+class RoomDeleteEventRestServlet(RestServlet):
+    def register(self, http_server):
+        PATTERN = ("/rooms/(?P<room_id>[^/]*)/delete/(?P<event_id>[^/]*)")
+        register_txn_path(self, PATTERN, http_server)
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, room_id, event_id):
+        user = yield self.auth.get_user_by_req(request)
+        content = _parse_json(request)
+
+        event = self.event_factory.create_event(
+            etype=RoomDeletionEvent.TYPE,
+            room_id=urllib.unquote(room_id),
+            user_id=user.to_string(),
+            content=content,
+            deletes=event_id,
+        )
+
+        msg_handler = self.handlers.message_handler
+        yield msg_handler.send_message(event)
+
+        defer.returnValue((200, {"event_id": event.event_id}))
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, room_id, event_id, txn_id):
+        try:
+            defer.returnValue(self.txns.get_client_transaction(request, txn_id))
+        except KeyError:
+            pass
+
+        response = yield self.on_POST(request, room_id, event_id)
+
+        self.txns.store_client_transaction(request, txn_id, response)
+        defer.returnValue(response)
+
 
 def _parse_json(request):
     try:
@@ -485,3 +520,4 @@ def register_servlets(hs, http_server):
     PublicRoomListRestServlet(hs).register(http_server)
     RoomStateRestServlet(hs).register(http_server)
     RoomInitialSyncRestServlet(hs).register(http_server)
+    RoomDeleteEventRestServlet(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 66658f6721..672ed6971e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -24,6 +24,7 @@ from synapse.api.events.room import (
     RoomAddStateLevelEvent,
     RoomSendEventLevelEvent,
     RoomOpsPowerLevelsEvent,
+    RoomDeletionEvent,
 )
 
 from synapse.util.logutils import log_function
@@ -61,7 +62,7 @@ SCHEMAS = [
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 3
+SCHEMA_VERSION = 4
 
 
 class _RollbackButIsFineException(Exception):
@@ -182,6 +183,8 @@ class DataStore(RoomMemberStore, RoomStore,
             self._store_send_event_level(txn, event)
         elif event.type == RoomOpsPowerLevelsEvent.TYPE:
             self._store_ops_level(txn, event)
+        elif event.type == RoomDeletionEvent.TYPE:
+            self._store_deletion(txn, event)
 
         vals = {
             "topological_ordering": event.depth,
@@ -203,7 +206,7 @@ class DataStore(RoomMemberStore, RoomStore,
         unrec = {
             k: v
             for k, v in event.get_full_dict().items()
-            if k not in vals.keys()
+            if k not in vals.keys() and k is not "deleted"
         }
         vals["unrecognized_keys"] = json.dumps(unrec)
 
@@ -241,14 +244,32 @@ class DataStore(RoomMemberStore, RoomStore,
                 }
             )
 
+    def _store_deletion(self, txn, event):
+        event_id = event.event_id
+        deletes = event.deletes
+
+        # We check if this new delete deletes an old delete or has been
+        # deleted by a previous delete that we received out of order.
+        sql = "SELECT * FROM deletions WHERE event_id = ? OR deletes = ?"
+        txn.execute(sql, (deletes, event_id))
+
+        if txn.fetchall():
+            sql = "DELETE FROM deletions WHERE event_id = ? OR deletes = ?"
+            txn.execute(sql, (deletes, event_id, ))
+        else:
+            sql = "INSERT INTO deletions (event_id, deletes) VALUES (?,?)"
+            txn.execute(sql, (event_id, deletes))
+
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
         sql = (
-            "SELECT e.* FROM events as e "
+            "SELECT e.*, (%(deleted)s) AS deleted FROM events as e "
             "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
             "INNER JOIN state_events as s ON e.event_id = s.event_id "
             "WHERE c.room_id = ? "
-        )
+        ) % {
+            "deleted": "e.event_id IN (SELECT deletes FROM deletions)",
+        }
 
         if event_type:
             sql += " AND s.type = ? AND s.state_key = ? "
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 76ed7d06fb..3aa610c85c 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -17,6 +17,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.api.events.utils import prune_event
 from synapse.util.logutils import log_function
 
 import collections
@@ -345,7 +346,7 @@ class SQLBaseStore(object):
         return self.runInteraction(func)
 
     def _parse_event_from_row(self, row_dict):
-        d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
+        d = copy.deepcopy({k: v for k, v in row_dict.items()})
 
         d.pop("stream_ordering", None)
         d.pop("topological_ordering", None)
@@ -373,8 +374,8 @@ class SQLBaseStore(object):
         sql = "SELECT * FROM events WHERE event_id = ?"
 
         for ev in events:
-           if hasattr(ev, "prev_state"):
-                # Load previous state_content. 
+            if hasattr(ev, "prev_state"):
+                # Load previous state_content.
                 # TODO: Should we be pulling this out above?
                 cursor = txn.execute(sql, (ev.prev_state,))
                 prevs = self.cursor_to_dict(cursor)
@@ -382,8 +383,31 @@ class SQLBaseStore(object):
                     prev = self._parse_event_from_row(prevs[0])
                     ev.prev_content = prev.content
 
+            if not hasattr(ev, "deleted"):
+                logger.debug("Doesn't have deleted key: %s", ev)
+                ev.deleted = self._has_been_deleted_txn(txn, ev)
+
+            if ev.deleted:
+                # Get the deletion event.
+                sql = "SELECT * FROM events WHERE event_id = ?"
+                txn.execute(sql, (ev.deleted,))
+
+                del_evs = self._parse_events_txn(
+                    txn, self.cursor_to_dict(txn)
+                )
+
+                if del_evs:
+                    prune_event(ev)
+                    ev.pruned = del_evs[0]
+
         return events
 
+    def _has_been_deleted_txn(self, txn, event):
+        sql = "SELECT * FROM deletions WHERE deletes = ?"
+        txn.execute(sql, (event.event_id,))
+        return len(txn.fetchall()) > 0
+
+
 class Table(object):
     """ A base class used to store information about a particular table.
     """
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 04b4067d03..97222da571 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -182,14 +182,21 @@ class RoomMemberStore(SQLBaseStore):
         )
 
     def _get_members_query_txn(self, txn, where_clause, where_values):
+        del_sql = (
+            "SELECT event_id FROM deletions WHERE deletes = e.event_id"
+        )
+
         sql = (
-            "SELECT e.* FROM events as e "
+            "SELECT e.*, (%(deleted)s) AS deleted FROM events as e "
             "INNER JOIN room_memberships as m "
             "ON e.event_id = m.event_id "
             "INNER JOIN current_state_events as c "
             "ON m.event_id = c.event_id "
-            "WHERE %s "
-        ) % (where_clause,)
+            "WHERE %(where)s "
+        ) % {
+            "deleted": del_sql,
+            "where": where_clause,
+        }
 
         txn.execute(sql, where_values)
         rows = self.cursor_to_dict(txn)
diff --git a/synapse/storage/schema/delta/v4.sql b/synapse/storage/schema/delta/v4.sql
index 1652ef2921..2e2635317a 100644
--- a/synapse/storage/schema/delta/v4.sql
+++ b/synapse/storage/schema/delta/v4.sql
@@ -1,5 +1,7 @@
 CREATE TABLE IF NOT EXISTS deletions (
     event_id TEXT NOT NULL,
-    deletes TEXT NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
+    deletes TEXT NOT NULL
 );
+
+CREATE INDEX IF NOT EXISTS deletions_event_id ON deletions (event_id);
+CREATE INDEX IF NOT EXISTS deletions_deletes ON deletions (deletes);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a76fecf24f..aaac0aae30 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -157,6 +157,10 @@ class StreamStore(SQLBaseStore):
             "WHERE m.user_id = ? "
         )
 
+        del_sql = (
+            "SELECT event_id FROM deletions WHERE deletes = e.event_id"
+        )
+
         if limit:
             limit = max(limit, MAX_STREAM_SIZE)
         else:
@@ -171,13 +175,14 @@ class StreamStore(SQLBaseStore):
             return
 
         sql = (
-            "SELECT * FROM events as e WHERE "
+            "SELECT *, (%(deleted)s) AS deleted FROM events AS e WHERE "
             "((room_id IN (%(current)s)) OR "
             "(event_id IN (%(invites)s))) "
             "AND e.stream_ordering > ? AND e.stream_ordering <= ? "
             "AND e.outlier = 0 "
             "ORDER BY stream_ordering ASC LIMIT %(limit)d "
         ) % {
+            "deleted": del_sql,
             "current": current_room_membership_sql,
             "invites": membership_sql,
             "limit": limit
@@ -224,11 +229,20 @@ class StreamStore(SQLBaseStore):
         else:
             limit_str = ""
 
+        del_sql = (
+            "SELECT event_id FROM deletions WHERE deletes = events.event_id"
+        )
+
         sql = (
-            "SELECT * FROM events "
+            "SELECT *, (%(deleted)s) AS deleted FROM events "
             "WHERE outlier = 0 AND room_id = ? AND %(bounds)s "
             "ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
-        ) % {"bounds": bounds, "order": order, "limit": limit_str}
+        ) % {
+            "deleted": del_sql,
+            "bounds": bounds,
+            "order": order,
+            "limit": limit_str
+        }
 
         rows = yield self._execute_and_decode(
             sql,
@@ -257,11 +271,17 @@ class StreamStore(SQLBaseStore):
                                    with_feedback=False):
         # TODO (erikj): Handle compressed feedback
 
+        del_sql = (
+            "SELECT event_id FROM deletions WHERE deletes = events.event_id"
+        )
+
         sql = (
-            "SELECT * FROM events "
+            "SELECT *, (%(deleted)s) AS deleted FROM events "
             "WHERE room_id = ? AND stream_ordering <= ? "
             "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
-        )
+        ) % {
+            "deleted": del_sql,
+        }
 
         rows = yield self._execute_and_decode(
             sql,