summary refs log tree commit diff
diff options
context:
space:
mode:
author: Richard van der Hoff <github@rvanderhoff.org.uk>, 2018-03-12 16:32:18 +0000
committer: GitHub <noreply@github.com>, 2018-03-12 16:32:18 +0000
commit: d65ceb4b484dc317f2def3df6f9e92564c997cd4 (patch)
tree: 1363290a8c789ce70b7ac0e25dd31e6a5ecc2a53
parent: Merge pull request #2961 from matrix-org/rav/run_in_background (diff)
parent: Add transactional API to history purge (diff)
download: synapse-d65ceb4b484dc317f2def3df6f9e92564c997cd4.tar.xz
Merge pull request #2962 from matrix-org/rav/purge_history_txns
Add transactional API to history purge
-rw-r--r--  docs/admin_api/purge_history_api.rst  27
-rw-r--r--  synapse/handlers/message.py  121
-rw-r--r--  synapse/rest/client/v1/admin.py  38
3 files changed, 175 insertions, 11 deletions
diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst
index acf1bc5749..ea2922da5c 100644
--- a/docs/admin_api/purge_history_api.rst
+++ b/docs/admin_api/purge_history_api.rst
@@ -32,3 +32,30 @@ specified by including an event_id in the URI, or by setting a
 id is given, that event (and others at the same graph depth) will be retained.
 If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch,
 in milliseconds.
+
+The API starts the purge running, and returns immediately with a JSON body
+containing a purge id:
+
+.. code:: json
+
+    {
+        "purge_id": "<opaque id>"
+    }
+
+Purge status query
+------------------
+
+It is possible to poll for updates on recent purges with a second API:
+
+``GET /_matrix/client/r0/admin/purge_history_status/<purge_id>``
+
+(again, with a suitable ``access_token``). This API returns a JSON body like
+the following:
+
+.. code:: json
+
+    {
+        "status": "active"
+    }
+
+The status will be one of ``active``, ``complete``, or ``failed``.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dd00d8a86c..42aab91c50 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,7 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
@@ -24,9 +25,10 @@ from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import unfreeze
+from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 from synapse.replication.http.send_event import send_event_to_master
 
@@ -41,6 +43,36 @@ import ujson
 logger = logging.getLogger(__name__)
 
 
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
 class MessageHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -50,15 +82,88 @@ class MessageHandler(BaseHandler):
         self.clock = hs.get_clock()
 
         self.pagination_lock = ReadWriteLock()
+        self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
 
-    @defer.inlineCallbacks
-    def purge_history(self, room_id, topological_ordering,
-                      delete_local_events=False):
-        with (yield self.pagination_lock.write(room_id)):
-            yield self.store.purge_history(
-                room_id, topological_ordering, delete_local_events,
+    def start_purge_history(self, room_id, topological_ordering,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
+
+        Args:
+            room_id (str): The room to purge from
+
+            topological_ordering (int): minimum topo ordering to preserve
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            str: unique ID for this purge transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400,
+                "History purge already in progress for %s" % (room_id, ),
             )
 
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, topological_ordering, delete_local_events,
+        )
+        return purge_id
+
+    @defer.inlineCallbacks
+    def _purge_history(self, purge_id, room_id, topological_ordering,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            topological_ordering (int): minimum topo ordering to preserve
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with (yield self.pagination_lock.write(room_id)):
+                yield self.store.purge_history(
+                    room_id, topological_ordering, delete_local_events,
+                )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
+
+            # remove the purge from the list 24 hours after it completes
+            def clear_purge():
+                del self._purges_by_id[purge_id]
+            reactor.callLater(24 * 3600, clear_purge)
+
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
+
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
+
     @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
                      as_client_event=True, event_filter=None):
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index dcf6215dad..303419d281 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -17,7 +17,7 @@
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
-from synapse.api.errors import AuthError, SynapseError, Codes
+from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError
 from synapse.types import UserID, create_requester
 from synapse.http.servlet import parse_json_object_from_request
 
@@ -185,12 +185,43 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                 errcode=Codes.BAD_JSON,
             )
 
-        yield self.handlers.message_handler.purge_history(
+        purge_id = yield self.handlers.message_handler.start_purge_history(
             room_id, depth,
             delete_local_events=delete_local_events,
         )
 
-        defer.returnValue((200, {}))
+        defer.returnValue((200, {
+            "purge_id": purge_id,
+        }))
+
+
+class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
+    PATTERNS = client_path_patterns(
+        "/admin/purge_history_status/(?P<purge_id>[^/]+)"
+    )
+
+    def __init__(self, hs):
+        """
+
+        Args:
+            hs (synapse.server.HomeServer)
+        """
+        super(PurgeHistoryStatusRestServlet, self).__init__(hs)
+        self.handlers = hs.get_handlers()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, purge_id):
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        purge_status = self.handlers.message_handler.get_purge_status(purge_id)
+        if purge_status is None:
+            raise NotFoundError("purge id '%s' not found" % purge_id)
+
+        defer.returnValue((200, purge_status.asdict()))
 
 
 class DeactivateAccountRestServlet(ClientV1RestServlet):
@@ -561,6 +592,7 @@ class SearchUsersRestServlet(ClientV1RestServlet):
 def register_servlets(hs, http_server):
     WhoisRestServlet(hs).register(http_server)
     PurgeMediaCacheRestServlet(hs).register(http_server)
+    PurgeHistoryStatusRestServlet(hs).register(http_server)
     DeactivateAccountRestServlet(hs).register(http_server)
     PurgeHistoryRestServlet(hs).register(http_server)
     UsersRestServlet(hs).register(http_server)