summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--docs/admin_api/purge_history_api.rst14
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/http/servlet.py18
-rw-r--r--synapse/python_dependencies.py2
-rw-r--r--synapse/rest/client/v1/admin.py9
-rw-r--r--synapse/rest/media/v1/media_storage.py6
-rw-r--r--synapse/state.py34
-rw-r--r--synapse/storage/events.py189
8 files changed, 174 insertions, 102 deletions
diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst
index 08b3306366..a3a17e9f9f 100644
--- a/docs/admin_api/purge_history_api.rst
+++ b/docs/admin_api/purge_history_api.rst
@@ -4,8 +4,6 @@ Purge History API
 The purge history API allows server admins to purge historic events from their
 database, reclaiming disk space.
 
-**NB!** This will not delete local events (locally sent messages content etc) from the database, but will remove lots of the metadata about them and does dramatically reduce the on disk space usage
-
 Depending on the amount of history being purged a call to the API may take
 several minutes or longer. During this period users will not be able to
 paginate further back in the room from the point being purged from.
@@ -15,3 +13,15 @@ The API is simply:
 ``POST /_matrix/client/r0/admin/purge_history/<room_id>/<event_id>``
 
 including an ``access_token`` of a server admin.
+
+By default, events sent by local users are not deleted, as they may represent
+the only copies of this content in existence. (Events sent by remote users are
+deleted, and room state data before the cutoff is always removed).
+
+To delete local events as well, set ``delete_local_events`` in the body:
+
+.. code:: json
+
+   {
+       "delete_local_events": true
+   }
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6be3f4d770..1c3ac03f20 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -52,7 +52,7 @@ class MessageHandler(BaseHandler):
         self.pagination_lock = ReadWriteLock()
 
     @defer.inlineCallbacks
-    def purge_history(self, room_id, event_id):
+    def purge_history(self, room_id, event_id, delete_local_events=False):
         event = yield self.store.get_event(event_id)
 
         if event.room_id != room_id:
@@ -61,7 +61,7 @@ class MessageHandler(BaseHandler):
         depth = event.depth
 
         with (yield self.pagination_lock.write(room_id)):
-            yield self.store.delete_old_state(room_id, depth)
+            yield self.store.purge_history(room_id, depth, delete_local_events)
 
     @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 71420e54db..ef8e62901b 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -148,11 +148,13 @@ def parse_string_from_args(args, name, default=None, required=False,
             return default
 
 
-def parse_json_value_from_request(request):
+def parse_json_value_from_request(request, allow_empty_body=False):
     """Parse a JSON value from the body of a twisted HTTP request.
 
     Args:
         request: the twisted HTTP request.
+        allow_empty_body (bool): if True, an empty body will be accepted and
+            turned into None
 
     Returns:
         The JSON value.
@@ -165,6 +167,9 @@ def parse_json_value_from_request(request):
     except Exception:
         raise SynapseError(400, "Error reading JSON content.")
 
+    if not content_bytes and allow_empty_body:
+        return None
+
     try:
         content = simplejson.loads(content_bytes)
     except Exception as e:
@@ -174,17 +179,24 @@ def parse_json_value_from_request(request):
     return content
 
 
-def parse_json_object_from_request(request):
+def parse_json_object_from_request(request, allow_empty_body=False):
     """Parse a JSON object from the body of a twisted HTTP request.
 
     Args:
         request: the twisted HTTP request.
+        allow_empty_body (bool): if True, an empty body will be accepted and
+            turned into an empty dict.
 
     Raises:
         SynapseError if the request body couldn't be decoded as JSON or
             if it wasn't a JSON object.
     """
-    content = parse_json_value_from_request(request)
+    content = parse_json_value_from_request(
+        request, allow_empty_body=allow_empty_body,
+    )
+
+    if allow_empty_body and content is None:
+        return {}
 
     if type(content) != dict:
         message = "Content must be a JSON object."
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 7052333c19..97b631e60d 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -36,7 +36,7 @@ REQUIREMENTS = {
     "pydenticon": ["pydenticon"],
     "ujson": ["ujson"],
     "blist": ["blist"],
-    "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
+    "pysaml2>=3.0.0": ["saml2>=3.0.0"],
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 20c5c66632..6073cc6fa2 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -129,7 +129,14 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
         if not is_admin:
             raise AuthError(403, "You are not a server admin")
 
-        yield self.handlers.message_handler.purge_history(room_id, event_id)
+        body = parse_json_object_from_request(request, allow_empty_body=True)
+
+        delete_local_events = bool(body.get("delete_local_events", False))
+
+        yield self.handlers.message_handler.purge_history(
+            room_id, event_id,
+            delete_local_events=delete_local_events,
+        )
 
         defer.returnValue((200, {}))
 
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index e8e8b3986d..3f8d4b9c22 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -70,6 +70,12 @@ class MediaStorage(object):
             _write_file_synchronously, source, fname,
         ))
 
+        # Tell the storage providers about the new file. They'll decide
+        # if they should upload it and whether to do so synchronously
+        # or not.
+        for provider in self.storage_providers:
+            yield provider.store_file(path, file_info)
+
         defer.returnValue(fname)
 
     @contextlib.contextmanager
diff --git a/synapse/state.py b/synapse/state.py
index cc93bbcb6b..932f602508 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -350,7 +350,7 @@ class StateHandler(object):
             ))
 
         result = yield self._state_resolution_handler.resolve_state_groups(
-            room_id, state_groups_ids, self._state_map_factory,
+            room_id, state_groups_ids, None, self._state_map_factory,
         )
         defer.returnValue(result)
 
@@ -413,7 +413,9 @@ class StateResolutionHandler(object):
 
     @defer.inlineCallbacks
     @log_function
-    def resolve_state_groups(self, room_id, state_groups_ids, state_map_factory):
+    def resolve_state_groups(
+        self, room_id, state_groups_ids, event_map, state_map_factory,
+    ):
         """Resolves conflicts between a set of state groups
 
         Always generates a new state group (unless we hit the cache), so should
@@ -425,6 +427,14 @@ class StateResolutionHandler(object):
                  map from state group id to the state in that state group
                 (where 'state' is a map from state key to event id)
 
+            event_map(dict[str,FrozenEvent]|None):
+                a dict from event_id to event, for any events that we happen to
+                have in flight (eg, those currently being persisted). This will be
+                used as a starting point fof finding the state we need; any missing
+                events will be requested via state_map_factory.
+
+                If None, all events will be fetched via state_map_factory.
+
         Returns:
             Deferred[_StateCacheEntry]: resolved state
         """
@@ -465,6 +475,7 @@ class StateResolutionHandler(object):
                 with Measure(self.clock, "state._resolve_events"):
                     new_state = yield resolve_events_with_factory(
                         state_groups_ids.values(),
+                        event_map=event_map,
                         state_map_factory=state_map_factory,
                     )
             else:
@@ -597,11 +608,20 @@ def _seperate(state_sets):
 
 
 @defer.inlineCallbacks
-def resolve_events_with_factory(state_sets, state_map_factory):
+def resolve_events_with_factory(state_sets, event_map, state_map_factory):
     """
     Args:
         state_sets(list): List of dicts of (type, state_key) -> event_id,
             which are the different state groups to resolve.
+
+        event_map(dict[str,FrozenEvent]|None):
+            a dict from event_id to event, for any events that we happen to
+            have in flight (eg, those currently being persisted). This will be
+            used as a starting point fof finding the state we need; any missing
+            events will be requested via state_map_factory.
+
+            If None, all events will be fetched via state_map_factory.
+
         state_map_factory(func): will be called
             with a list of event_ids that are needed, and should return with
             a Deferred of dict of event_id to event.
@@ -622,12 +642,16 @@ def resolve_events_with_factory(state_sets, state_map_factory):
         for event_ids in conflicted_state.itervalues()
         for event_id in event_ids
     )
+    if event_map is not None:
+        needed_events -= set(event_map.iterkeys())
 
     logger.info("Asking for %d conflicted events", len(needed_events))
 
     # dict[str, FrozenEvent]: a map from state event id to event. Only includes
-    # the state events which are in conflict.
+    # the state events which are in conflict (and those in event_map)
     state_map = yield state_map_factory(needed_events)
+    if event_map is not None:
+        state_map.update(event_map)
 
     # get the ids of the auth events which allow us to authenticate the
     # conflicted state, picking only from the unconflicting state.
@@ -639,6 +663,8 @@ def resolve_events_with_factory(state_sets, state_map_factory):
 
     new_needed_events = set(auth_events.itervalues())
     new_needed_events -= needed_events
+    if event_map is not None:
+        new_needed_events -= set(event_map.iterkeys())
 
     logger.info("Asking for %d auth events", len(new_needed_events))
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index af56f1ee57..bbb6aa992c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -27,7 +27,6 @@ from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
-from synapse.state import resolve_events_with_factory
 from synapse.util.caches.descriptors import cached
 from synapse.types import get_domain_from_id
 
@@ -237,6 +236,8 @@ class EventsStore(SQLBaseStore):
 
         self._event_persist_queue = _EventPeristenceQueue()
 
+        self._state_resolution_handler = hs.get_state_resolution_handler()
+
     def persist_events(self, events_and_contexts, backfilled=False):
         """
         Write events to the database
@@ -402,6 +403,7 @@ class EventsStore(SQLBaseStore):
                                 "Calculating state delta for room %s", room_id,
                             )
                             current_state = yield self._get_new_state_after_events(
+                                room_id,
                                 ev_ctx_rm, new_latest_event_ids,
                             )
                             if current_state is not None:
@@ -487,11 +489,14 @@ class EventsStore(SQLBaseStore):
         defer.returnValue(new_latest_event_ids)
 
     @defer.inlineCallbacks
-    def _get_new_state_after_events(self, events_context, new_latest_event_ids):
+    def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids):
         """Calculate the current state dict after adding some new events to
         a room
 
         Args:
+            room_id (str):
+                room to which the events are being added. Used for logging etc
+
             events_context (list[(EventBase, EventContext)]):
                 events and contexts which are being added to the room
 
@@ -503,8 +508,12 @@ class EventsStore(SQLBaseStore):
                 None if there are no changes to the room state, or
                 a dict of (type, state_key) -> event_id].
         """
-        state_sets = []
-        state_groups = set()
+
+        if not new_latest_event_ids:
+            defer.returnValue({})
+
+        # map from state_group to ((type, key) -> event_id) state map
+        state_groups = {}
         missing_event_ids = []
         was_updated = False
         for event_id in new_latest_event_ids:
@@ -515,16 +524,19 @@ class EventsStore(SQLBaseStore):
                     if ctx.current_state_ids is None:
                         raise Exception("Unknown current state")
 
+                    if ctx.state_group is None:
+                        # I don't think this can happen, but let's double-check
+                        raise Exception(
+                            "Context for new extremity event %s has no state "
+                            "group" % (event_id, ),
+                        )
+
                     # If we've already seen the state group don't bother adding
                     # it to the state sets again
                     if ctx.state_group not in state_groups:
-                        state_sets.append(ctx.current_state_ids)
+                        state_groups[ctx.state_group] = ctx.current_state_ids
                         if ctx.delta_ids or hasattr(ev, "state_key"):
                             was_updated = True
-                        if ctx.state_group:
-                            # Add this as a seen state group (if it has a state
-                            # group)
-                            state_groups.add(ctx.state_group)
                     break
             else:
                 # If we couldn't find it, then we'll need to pull
@@ -532,65 +544,37 @@ class EventsStore(SQLBaseStore):
                 was_updated = True
                 missing_event_ids.append(event_id)
 
+        if not was_updated:
+            return
+
         if missing_event_ids:
             # Now pull out the state for any missing events from DB
             event_to_groups = yield self._get_state_group_for_events(
                 missing_event_ids,
             )
 
-            groups = set(event_to_groups.itervalues()) - state_groups
+            groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())
 
             if groups:
                 group_to_state = yield self._get_state_for_groups(groups)
-                state_sets.extend(group_to_state.itervalues())
+                state_groups.update(group_to_state)
 
-        if not new_latest_event_ids:
-            defer.returnValue({})
-        elif was_updated:
-            if len(state_sets) == 1:
-                # If there is only one state set, then we know what the current
-                # state is.
-                defer.returnValue(state_sets[0])
-            else:
-                # We work out the current state by passing the state sets to the
-                # state resolution algorithm. It may ask for some events, including
-                # the events we have yet to persist, so we need a slightly more
-                # complicated event lookup function than simply looking the events
-                # up in the db.
-
-                logger.info(
-                    "Resolving state with %i state sets", len(state_sets),
-                )
+        if len(state_groups) == 1:
+            # If there is only one state group, then we know what the current
+            # state is.
+            defer.returnValue(state_groups.values()[0])
 
-                events_map = {ev.event_id: ev for ev, _ in events_context}
-
-                @defer.inlineCallbacks
-                def get_events(ev_ids):
-                    # We get the events by first looking at the list of events we
-                    # are trying to persist, and then fetching the rest from the DB.
-                    db = []
-                    to_return = {}
-                    for ev_id in ev_ids:
-                        ev = events_map.get(ev_id, None)
-                        if ev:
-                            to_return[ev_id] = ev
-                        else:
-                            db.append(ev_id)
-
-                    if db:
-                        evs = yield self.get_events(
-                            ev_ids, get_prev_content=False, check_redacted=False,
-                        )
-                        to_return.update(evs)
-                    defer.returnValue(to_return)
+        def get_events(ev_ids):
+            return self.get_events(
+                ev_ids, get_prev_content=False, check_redacted=False,
+            )
+        events_map = {ev.event_id: ev for ev, _ in events_context}
+        logger.debug("calling resolve_state_groups from preserve_events")
+        res = yield self._state_resolution_handler.resolve_state_groups(
+            room_id, state_groups, events_map, get_events
+        )
 
-                current_state = yield resolve_events_with_factory(
-                    state_sets,
-                    state_map_factory=get_events,
-                )
-                defer.returnValue(current_state)
-        else:
-            return
+        defer.returnValue(res.state)
 
     @defer.inlineCallbacks
     def _calculate_state_delta(self, room_id, current_state):
@@ -2063,16 +2047,32 @@ class EventsStore(SQLBaseStore):
             )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
-    def delete_old_state(self, room_id, topological_ordering):
-        return self.runInteraction(
-            "delete_old_state",
-            self._delete_old_state_txn, room_id, topological_ordering
-        )
+    def purge_history(
+        self, room_id, topological_ordering, delete_local_events,
+    ):
+        """Deletes room history before a certain point
+
+        Args:
+            room_id (str):
+
+            topological_ordering (int):
+                minimum topo ordering to preserve
 
-    def _delete_old_state_txn(self, txn, room_id, topological_ordering):
-        """Deletes old room state
+            delete_local_events (bool):
+                if True, we will delete local events as well as remote ones
+                (instead of just marking them as outliers and deleting their
+                state groups).
         """
 
+        return self.runInteraction(
+            "purge_history",
+            self._purge_history_txn, room_id, topological_ordering,
+            delete_local_events,
+        )
+
+    def _purge_history_txn(
+        self, txn, room_id, topological_ordering, delete_local_events,
+    ):
         # Tables that should be pruned:
         #     event_auth
         #     event_backward_extremities
@@ -2113,7 +2113,7 @@ class EventsStore(SQLBaseStore):
                 400, "topological_ordering is greater than forward extremeties"
             )
 
-        logger.debug("[purge] looking for events to delete")
+        logger.info("[purge] looking for events to delete")
 
         txn.execute(
             "SELECT event_id, state_key FROM events"
@@ -2125,16 +2125,16 @@ class EventsStore(SQLBaseStore):
 
         to_delete = [
             (event_id,) for event_id, state_key in event_rows
-            if state_key is None and not self.hs.is_mine_id(event_id)
+            if state_key is None and (
+                delete_local_events or not self.hs.is_mine_id(event_id)
+            )
         ]
         logger.info(
-            "[purge] found %i events before cutoff, of which %i are remote"
-            " non-state events to delete", len(event_rows), len(to_delete))
-
-        for event_id, state_key in event_rows:
-            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+            "[purge] found %i events before cutoff, of which %i can be deleted",
+            len(event_rows), len(to_delete),
+        )
 
-        logger.debug("[purge] Finding new backward extremities")
+        logger.info("[purge] Finding new backward extremities")
 
         # We calculate the new entries for the backward extremeties by finding
         # all events that point to events that are to be purged
@@ -2148,7 +2148,7 @@ class EventsStore(SQLBaseStore):
         )
         new_backwards_extrems = txn.fetchall()
 
-        logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
+        logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems)
 
         txn.execute(
             "DELETE FROM event_backward_extremities WHERE room_id = ?",
@@ -2164,7 +2164,7 @@ class EventsStore(SQLBaseStore):
             ]
         )
 
-        logger.debug("[purge] finding redundant state groups")
+        logger.info("[purge] finding redundant state groups")
 
         # Get all state groups that are only referenced by events that are
         # to be deleted.
@@ -2181,15 +2181,15 @@ class EventsStore(SQLBaseStore):
         )
 
         state_rows = txn.fetchall()
-        logger.debug("[purge] found %i redundant state groups", len(state_rows))
+        logger.info("[purge] found %i redundant state groups", len(state_rows))
 
         # make a set of the redundant state groups, so that we can look them up
         # efficiently
         state_groups_to_delete = set([sg for sg, in state_rows])
 
         # Now we get all the state groups that rely on these state groups
-        logger.debug("[purge] finding state groups which depend on redundant"
-                     " state groups")
+        logger.info("[purge] finding state groups which depend on redundant"
+                    " state groups")
         remaining_state_groups = []
         for i in xrange(0, len(state_rows), 100):
             chunk = [sg for sg, in state_rows[i:i + 100]]
@@ -2214,7 +2214,7 @@ class EventsStore(SQLBaseStore):
         # Now we turn the state groups that reference to-be-deleted state
         # groups to non delta versions.
         for sg in remaining_state_groups:
-            logger.debug("[purge] de-delta-ing remaining state group %s", sg)
+            logger.info("[purge] de-delta-ing remaining state group %s", sg)
             curr_state = self._get_state_groups_from_groups_txn(
                 txn, [sg], types=None
             )
@@ -2251,7 +2251,7 @@ class EventsStore(SQLBaseStore):
                 ],
             )
 
-        logger.debug("[purge] removing redundant state groups")
+        logger.info("[purge] removing redundant state groups")
         txn.executemany(
             "DELETE FROM state_groups_state WHERE state_group = ?",
             state_rows
@@ -2261,18 +2261,15 @@ class EventsStore(SQLBaseStore):
             state_rows
         )
 
-        # Delete all non-state
-        logger.debug("[purge] removing events from event_to_state_groups")
+        logger.info("[purge] removing events from event_to_state_groups")
         txn.executemany(
             "DELETE FROM event_to_state_groups WHERE event_id = ?",
             [(event_id,) for event_id, _ in event_rows]
         )
-
-        logger.debug("[purge] updating room_depth")
-        txn.execute(
-            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
-            (topological_ordering, room_id,)
-        )
+        for event_id, _ in event_rows:
+            txn.call_after(self._get_state_group_for_event.invalidate, (
+                event_id,
+            ))
 
         # Delete all remote non-state events
         for table in (
@@ -2290,7 +2287,7 @@ class EventsStore(SQLBaseStore):
             "event_signatures",
             "rejections",
         ):
-            logger.debug("[purge] removing remote non-state events from %s", table)
+            logger.info("[purge] removing events from %s", table)
 
             txn.executemany(
                 "DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -2298,16 +2295,30 @@ class EventsStore(SQLBaseStore):
             )
 
         # Mark all state and own events as outliers
-        logger.debug("[purge] marking remaining events as outliers")
+        logger.info("[purge] marking remaining events as outliers")
         txn.executemany(
             "UPDATE events SET outlier = ?"
             " WHERE event_id = ?",
             [
                 (True, event_id,) for event_id, state_key in event_rows
-                if state_key is not None or self.hs.is_mine_id(event_id)
+                if state_key is not None or (
+                    not delete_local_events and self.hs.is_mine_id(event_id)
+                )
             ]
         )
 
+        # synapse tries to take out an exclusive lock on room_depth whenever it
+        # persists events (because upsert), and once we run this update, we
+        # will block that for the rest of our transaction.
+        #
+        # So, let's stick it at the end so that we don't block event
+        # persistence.
+        logger.info("[purge] updating room_depth")
+        txn.execute(
+            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+            (topological_ordering, room_id,)
+        )
+
         logger.info("[purge] done")
 
     @defer.inlineCallbacks