diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst
index acf1bc5749..ea2922da5c 100644
--- a/docs/admin_api/purge_history_api.rst
+++ b/docs/admin_api/purge_history_api.rst
@@ -32,3 +32,30 @@ specified by including an event_id in the URI, or by setting a
id is given, that event (and others at the same graph depth) will be retained.
If ``purge_up_to_ts`` is given, it should be a timestamp since the unix epoch,
in milliseconds.
+
+The API starts the purge running, and returns immediately with a JSON body with
+a purge id:
+
+.. code:: json
+
+ {
+ "purge_id": "<opaque id>"
+ }
+
+Purge status query
+------------------
+
+It is possible to poll for updates on recent purges with a second API;
+
+``GET /_matrix/client/r0/admin/purge_history_status/<purge_id>``
+
+(again, with a suitable ``access_token``). This API returns a JSON body like
+the following:
+
+.. code:: json
+
+ {
+ "status": "active"
+ }
+
+The status will be one of ``active``, ``complete``, or ``failed``.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dd00d8a86c..42aab91c50 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,7 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
@@ -24,9 +25,10 @@ from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import unfreeze
+from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
from synapse.replication.http.send_event import send_event_to_master
@@ -41,6 +43,36 @@ import ujson
logger = logging.getLogger(__name__)
+class PurgeStatus(object):
+ """Object tracking the status of a purge request
+
+ This class contains information on the progress of a purge request, for
+ return by get_purge_status.
+
+ Attributes:
+ status (int): Tracks whether this request has completed. One of
+ STATUS_{ACTIVE,COMPLETE,FAILED}
+ """
+
+ STATUS_ACTIVE = 0
+ STATUS_COMPLETE = 1
+ STATUS_FAILED = 2
+
+ STATUS_TEXT = {
+ STATUS_ACTIVE: "active",
+ STATUS_COMPLETE: "complete",
+ STATUS_FAILED: "failed",
+ }
+
+ def __init__(self):
+ self.status = PurgeStatus.STATUS_ACTIVE
+
+ def asdict(self):
+ return {
+ "status": PurgeStatus.STATUS_TEXT[self.status]
+ }
+
+
class MessageHandler(BaseHandler):
def __init__(self, hs):
@@ -50,15 +82,88 @@ class MessageHandler(BaseHandler):
self.clock = hs.get_clock()
self.pagination_lock = ReadWriteLock()
+ self._purges_in_progress_by_room = set()
+ # map from purge id to PurgeStatus
+ self._purges_by_id = {}
- @defer.inlineCallbacks
- def purge_history(self, room_id, topological_ordering,
- delete_local_events=False):
- with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(
- room_id, topological_ordering, delete_local_events,
+ def start_purge_history(self, room_id, topological_ordering,
+ delete_local_events=False):
+ """Start off a history purge on a room.
+
+ Args:
+ room_id (str): The room to purge from
+
+ topological_ordering (int): minimum topo ordering to preserve
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ str: unique ID for this purge transaction.
+ """
+ if room_id in self._purges_in_progress_by_room:
+ raise SynapseError(
+ 400,
+ "History purge already in progress for %s" % (room_id, ),
)
+ purge_id = random_string(16)
+
+ # we log the purge_id here so that it can be tied back to the
+ # request id in the log lines.
+ logger.info("[purge] starting purge_id %s", purge_id)
+
+ self._purges_by_id[purge_id] = PurgeStatus()
+ run_in_background(
+ self._purge_history,
+ purge_id, room_id, topological_ordering, delete_local_events,
+ )
+ return purge_id
+
+ @defer.inlineCallbacks
+ def _purge_history(self, purge_id, room_id, topological_ordering,
+ delete_local_events):
+ """Carry out a history purge on a room.
+
+ Args:
+ purge_id (str): The id for this purge
+ room_id (str): The room to purge from
+ topological_ordering (int): minimum topo ordering to preserve
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ Deferred
+ """
+ self._purges_in_progress_by_room.add(room_id)
+ try:
+ with (yield self.pagination_lock.write(room_id)):
+ yield self.store.purge_history(
+ room_id, topological_ordering, delete_local_events,
+ )
+ logger.info("[purge] complete")
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+ except Exception:
+ logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+ finally:
+ self._purges_in_progress_by_room.discard(room_id)
+
+ # remove the purge from the list 24 hours after it completes
+ def clear_purge():
+ del self._purges_by_id[purge_id]
+ reactor.callLater(24 * 3600, clear_purge)
+
+ def get_purge_status(self, purge_id):
+ """Get the current status of an active purge
+
+ Args:
+ purge_id (str): purge_id returned by start_purge_history
+
+ Returns:
+ PurgeStatus|None
+ """
+ return self._purges_by_id.get(purge_id)
+
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True, event_filter=None):
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index dcf6215dad..303419d281 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -17,7 +17,7 @@
from twisted.internet import defer
from synapse.api.constants import Membership
-from synapse.api.errors import AuthError, SynapseError, Codes
+from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError
from synapse.types import UserID, create_requester
from synapse.http.servlet import parse_json_object_from_request
@@ -185,12 +185,43 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
errcode=Codes.BAD_JSON,
)
- yield self.handlers.message_handler.purge_history(
+ purge_id = yield self.handlers.message_handler.start_purge_history(
room_id, depth,
delete_local_events=delete_local_events,
)
- defer.returnValue((200, {}))
+ defer.returnValue((200, {
+ "purge_id": purge_id,
+ }))
+
+
+class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
+ PATTERNS = client_path_patterns(
+ "/admin/purge_history_status/(?P<purge_id>[^/]+)"
+ )
+
+ def __init__(self, hs):
+ """
+
+ Args:
+ hs (synapse.server.HomeServer)
+ """
+ super(PurgeHistoryStatusRestServlet, self).__init__(hs)
+ self.handlers = hs.get_handlers()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, purge_id):
+ requester = yield self.auth.get_user_by_req(request)
+ is_admin = yield self.auth.is_server_admin(requester.user)
+
+ if not is_admin:
+ raise AuthError(403, "You are not a server admin")
+
+ purge_status = self.handlers.message_handler.get_purge_status(purge_id)
+ if purge_status is None:
+ raise NotFoundError("purge id '%s' not found" % purge_id)
+
+ defer.returnValue((200, purge_status.asdict()))
class DeactivateAccountRestServlet(ClientV1RestServlet):
@@ -561,6 +592,7 @@ class SearchUsersRestServlet(ClientV1RestServlet):
def register_servlets(hs, http_server):
WhoisRestServlet(hs).register(http_server)
PurgeMediaCacheRestServlet(hs).register(http_server)
+ PurgeHistoryStatusRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
PurgeHistoryRestServlet(hs).register(http_server)
UsersRestServlet(hs).register(http_server)
|