diff options
author | Richard van der Hoff <github@rvanderhoff.org.uk> | 2018-03-12 16:32:18 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-12 16:32:18 +0000 |
commit | d65ceb4b484dc317f2def3df6f9e92564c997cd4 (patch) | |
tree | 1363290a8c789ce70b7ac0e25dd31e6a5ecc2a53 /synapse | |
parent | Merge pull request #2961 from matrix-org/rav/run_in_background (diff) | |
parent | Add transactional API to history purge (diff) | |
download | synapse-d65ceb4b484dc317f2def3df6f9e92564c997cd4.tar.xz |
Merge pull request #2962 from matrix-org/rav/purge_history_txns
Add transactional API to history purge
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/message.py | 121 | ||||
-rw-r--r-- | synapse/rest/client/v1/admin.py | 38 |
2 files changed, 148 insertions, 11 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index dd00d8a86c..42aab91c50 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -13,7 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor +from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError @@ -24,9 +25,10 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, run_in_background from synapse.util.metrics import measure_func from synapse.util.frozenutils import unfreeze +from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client from synapse.replication.http.send_event import send_event_to_master @@ -41,6 +43,36 @@ import ujson logger = logging.getLogger(__name__) +class PurgeStatus(object): + """Object tracking the status of a purge request + + This class contains information on the progress of a purge request, for + return by get_purge_status. + + Attributes: + status (int): Tracks whether this request has completed. One of + STATUS_{ACTIVE,COMPLETE,FAILED} + """ + + STATUS_ACTIVE = 0 + STATUS_COMPLETE = 1 + STATUS_FAILED = 2 + + STATUS_TEXT = { + STATUS_ACTIVE: "active", + STATUS_COMPLETE: "complete", + STATUS_FAILED: "failed", + } + + def __init__(self): + self.status = PurgeStatus.STATUS_ACTIVE + + def asdict(self): + return { + "status": PurgeStatus.STATUS_TEXT[self.status] + } + + class MessageHandler(BaseHandler): def __init__(self, hs): @@ -50,15 +82,88 @@ class MessageHandler(BaseHandler): self.clock = hs.get_clock() self.pagination_lock = ReadWriteLock() + self._purges_in_progress_by_room = set() + # map from purge id to PurgeStatus + self._purges_by_id = {} - @defer.inlineCallbacks - def purge_history(self, room_id, topological_ordering, - delete_local_events=False): - with (yield self.pagination_lock.write(room_id)): - yield self.store.purge_history( - room_id, topological_ordering, delete_local_events, + def start_purge_history(self, room_id, topological_ordering, + delete_local_events=False): + """Start off a history purge on a room. + + Args: + room_id (str): The room to purge from + + topological_ordering (int): minimum topo ordering to preserve + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + str: unique ID for this purge transaction. + """ + if room_id in self._purges_in_progress_by_room: + raise SynapseError( + 400, + "History purge already in progress for %s" % (room_id, ), ) + purge_id = random_string(16) + + # we log the purge_id here so that it can be tied back to the + # request id in the log lines. + logger.info("[purge] starting purge_id %s", purge_id) + + self._purges_by_id[purge_id] = PurgeStatus() + run_in_background( + self._purge_history, + purge_id, room_id, topological_ordering, delete_local_events, + ) + return purge_id + + @defer.inlineCallbacks + def _purge_history(self, purge_id, room_id, topological_ordering, + delete_local_events): + """Carry out a history purge on a room. + + Args: + purge_id (str): The id for this purge + room_id (str): The room to purge from + topological_ordering (int): minimum topo ordering to preserve + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + Deferred + """ + self._purges_in_progress_by_room.add(room_id) + try: + with (yield self.pagination_lock.write(room_id)): + yield self.store.purge_history( + room_id, topological_ordering, delete_local_events, + ) + logger.info("[purge] complete") + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + except Exception: + logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED + finally: + self._purges_in_progress_by_room.discard(room_id) + + # remove the purge from the list 24 hours after it completes + def clear_purge(): + del self._purges_by_id[purge_id] + reactor.callLater(24 * 3600, clear_purge) + + def get_purge_status(self, purge_id): + """Get the current status of an active purge + + Args: + purge_id (str): purge_id returned by start_purge_history + + Returns: + PurgeStatus|None + """ + return self._purges_by_id.get(purge_id) + @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, as_client_event=True, event_filter=None): diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index dcf6215dad..303419d281 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership -from synapse.api.errors import AuthError, SynapseError, Codes +from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError from synapse.types import UserID, create_requester from synapse.http.servlet import parse_json_object_from_request @@ -185,12 +185,43 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): errcode=Codes.BAD_JSON, ) - yield self.handlers.message_handler.purge_history( + purge_id = yield self.handlers.message_handler.start_purge_history( room_id, depth, delete_local_events=delete_local_events, ) - defer.returnValue((200, {})) + defer.returnValue((200, { + "purge_id": purge_id, + })) + + +class PurgeHistoryStatusRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns( + "/admin/purge_history_status/(?P<purge_id>[^/]+)" + ) + + def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer) + """ + super(PurgeHistoryStatusRestServlet, self).__init__(hs) + self.handlers = hs.get_handlers() + + @defer.inlineCallbacks + def on_GET(self, request, purge_id): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + + if not is_admin: + raise AuthError(403, "You are not a server admin") + + purge_status = self.handlers.message_handler.get_purge_status(purge_id) + if purge_status is None: + raise NotFoundError("purge id '%s' not found" % purge_id) + + defer.returnValue((200, purge_status.asdict())) class DeactivateAccountRestServlet(ClientV1RestServlet): @@ -561,6 +592,7 @@ class SearchUsersRestServlet(ClientV1RestServlet): def register_servlets(hs, http_server): WhoisRestServlet(hs).register(http_server) PurgeMediaCacheRestServlet(hs).register(http_server) + PurgeHistoryStatusRestServlet(hs).register(http_server) DeactivateAccountRestServlet(hs).register(http_server) PurgeHistoryRestServlet(hs).register(http_server) UsersRestServlet(hs).register(http_server) |