summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorRichard van der Hoff <github@rvanderhoff.org.uk>2018-03-12 16:32:18 +0000
committerGitHub <noreply@github.com>2018-03-12 16:32:18 +0000
commitd65ceb4b484dc317f2def3df6f9e92564c997cd4 (patch)
tree1363290a8c789ce70b7ac0e25dd31e6a5ecc2a53 /synapse/handlers
parentMerge pull request #2961 from matrix-org/rav/run_in_background (diff)
parentAdd transactional API to history purge (diff)
downloadsynapse-d65ceb4b484dc317f2def3df6f9e92564c997cd4.tar.xz
Merge pull request #2962 from matrix-org/rav/purge_history_txns
Add transactional API to history purge
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/message.py121
1 files changed, 113 insertions, 8 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dd00d8a86c..42aab91c50 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,7 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
@@ -24,9 +25,10 @@ from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import unfreeze
+from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 from synapse.replication.http.send_event import send_event_to_master
 
@@ -41,6 +43,36 @@ import ujson
 logger = logging.getLogger(__name__)
 
 
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
 class MessageHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -50,15 +82,88 @@ class MessageHandler(BaseHandler):
         self.clock = hs.get_clock()
 
         self.pagination_lock = ReadWriteLock()
+        self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
 
-    @defer.inlineCallbacks
-    def purge_history(self, room_id, topological_ordering,
-                      delete_local_events=False):
-        with (yield self.pagination_lock.write(room_id)):
-            yield self.store.purge_history(
-                room_id, topological_ordering, delete_local_events,
+    def start_purge_history(self, room_id, topological_ordering,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
+
+        Args:
+            room_id (str): The room to purge from
+
+            topological_ordering (int): minimum topo ordering to preserve
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            str: unique ID for this purge transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400,
+                "History purge already in progress for %s" % (room_id, ),
             )
 
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, topological_ordering, delete_local_events,
+        )
+        return purge_id
+
+    @defer.inlineCallbacks
+    def _purge_history(self, purge_id, room_id, topological_ordering,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            topological_ordering (int): minimum topo ordering to preserve
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with (yield self.pagination_lock.write(room_id)):
+                yield self.store.purge_history(
+                    room_id, topological_ordering, delete_local_events,
+                )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
+
+            # remove the purge from the list 24 hours after it completes
+            def clear_purge():
+                del self._purges_by_id[purge_id]
+            reactor.callLater(24 * 3600, clear_purge)
+
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
+
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
+
     @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
                      as_client_event=True, event_filter=None):