summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-03-08 11:47:28 +0000
committerRichard van der Hoff <richard@matrix.org>2018-03-12 16:22:55 +0000
commite48c7aac4d827b66182adf80ab9804f42db186c9 (patch)
tree1363290a8c789ce70b7ac0e25dd31e6a5ecc2a53 /synapse/handlers/message.py
parentReturn an error when doing two purges on a room (diff)
downloadsynapse-e48c7aac4d827b66182adf80ab9804f42db186c9.tar.xz
Add transactional API to history purge
Make the purge request return quickly, and allow scripts to poll for updates.
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py104
1 files changed, 99 insertions, 5 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6eb8d19dc9..42aab91c50 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,7 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
@@ -24,9 +25,10 @@ from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import unfreeze
+from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 from synapse.replication.http.send_event import send_event_to_master
 
@@ -41,6 +43,36 @@ import ujson
 logger = logging.getLogger(__name__)
 
 
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
 class MessageHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -51,25 +83,87 @@ class MessageHandler(BaseHandler):
 
         self.pagination_lock = ReadWriteLock()
         self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
 
-    @defer.inlineCallbacks
-    def purge_history(self, room_id, topological_ordering,
-                      delete_local_events=False):
+    def start_purge_history(self, room_id, topological_ordering,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
+
+        Args:
+            room_id (str): The room to purge from
+
+            topological_ordering (int): minimum topo ordering to preserve
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            str: unique ID for this purge transaction.
+        """
         if room_id in self._purges_in_progress_by_room:
             raise SynapseError(
                 400,
                 "History purge already in progress for %s" % (room_id, ),
             )
 
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, topological_ordering, delete_local_events,
+        )
+        return purge_id
+
+    @defer.inlineCallbacks
+    def _purge_history(self, purge_id, room_id, topological_ordering,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            topological_ordering (int): minimum topo ordering to preserve
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
         self._purges_in_progress_by_room.add(room_id)
         try:
             with (yield self.pagination_lock.write(room_id)):
                 yield self.store.purge_history(
                     room_id, topological_ordering, delete_local_events,
                 )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
         finally:
             self._purges_in_progress_by_room.discard(room_id)
 
+            # remove the purge from the list 24 hours after it completes
+            def clear_purge():
+                del self._purges_by_id[purge_id]
+            reactor.callLater(24 * 3600, clear_purge)
+
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
+
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
+
     @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
                      as_client_event=True, event_filter=None):