summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-05-15 16:06:30 +0100
committerErik Johnston <erik@matrix.org>2018-05-15 16:23:50 +0100
commit5f27ed75ad804ab9b5287f0deb8fd24b9a3a6232 (patch)
tree70cd3d0742c5f9a408c1da0abc82f10784e42550
parentUse events_to_purge table rather than token (diff)
downloadsynapse-5f27ed75ad804ab9b5287f0deb8fd24b9a3a6232.tar.xz
Make purge_history operate on tokens
As we're soon going to change how topological_ordering works
-rw-r--r--synapse/handlers/message.py12
-rw-r--r--synapse/rest/client/v1/admin.py17
-rw-r--r--synapse/storage/events.py17
3 files changed, 25 insertions, 21 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index b793fc4df7..8343b5839d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -86,14 +86,14 @@ class MessageHandler(BaseHandler):
         # map from purge id to PurgeStatus
         self._purges_by_id = {}
 
-    def start_purge_history(self, room_id, topological_ordering,
+    def start_purge_history(self, room_id, token,
                             delete_local_events=False):
         """Start off a history purge on a room.
 
         Args:
             room_id (str): The room to purge from
 
-            topological_ordering (int): minimum topo ordering to preserve
+            token (str): topological token to delete events before
             delete_local_events (bool): True to delete local events as well as
                 remote ones
 
@@ -115,19 +115,19 @@ class MessageHandler(BaseHandler):
         self._purges_by_id[purge_id] = PurgeStatus()
         run_in_background(
             self._purge_history,
-            purge_id, room_id, topological_ordering, delete_local_events,
+            purge_id, room_id, token, delete_local_events,
         )
         return purge_id
 
     @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, topological_ordering,
+    def _purge_history(self, purge_id, room_id, token,
                        delete_local_events):
         """Carry out a history purge on a room.
 
         Args:
             purge_id (str): The id for this purge
             room_id (str): The room to purge from
-            topological_ordering (int): minimum topo ordering to preserve
+            token (str): topological token to delete events before
             delete_local_events (bool): True to delete local events as well as
                 remote ones
 
@@ -138,7 +138,7 @@ class MessageHandler(BaseHandler):
         try:
             with (yield self.pagination_lock.write(room_id)):
                 yield self.store.purge_history(
-                    room_id, topological_ordering, delete_local_events,
+                    room_id, token, delete_local_events,
                 )
             logger.info("[purge] complete")
             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index efd5c9873d..282ce6be42 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -151,10 +151,11 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
             if event.room_id != room_id:
                 raise SynapseError(400, "Event is for wrong room.")
 
-            depth = event.depth
+            token = yield self.store.get_topological_token_for_event(event_id)
+
             logger.info(
-                "[purge] purging up to depth %i (event_id %s)",
-                depth, event_id,
+                "[purge] purging up to token %s (event_id %s)",
+                token, event_id,
             )
         elif 'purge_up_to_ts' in body:
             ts = body['purge_up_to_ts']
@@ -174,7 +175,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                 )
             )
             if room_event_after_stream_ordering:
-                (_, depth, _) = room_event_after_stream_ordering
+                token = yield self.store.get_topological_token_for_event(
+                    room_event_after_stream_ordering,
+                )
             else:
                 logger.warn(
                     "[purge] purging events not possible: No event found "
@@ -187,9 +190,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
                     errcode=Codes.NOT_FOUND,
                 )
             logger.info(
-                "[purge] purging up to depth %i (received_ts %i => "
+                "[purge] purging up to token %d (received_ts %i => "
                 "stream_ordering %i)",
-                depth, ts, stream_ordering,
+                token, ts, stream_ordering,
             )
         else:
             raise SynapseError(
@@ -199,7 +202,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
             )
 
         purge_id = yield self.handlers.message_handler.start_purge_history(
-            room_id, depth,
+            room_id, token,
             delete_local_events=delete_local_events,
         )
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b4ae6664f0..f65e18c1ee 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -33,7 +33,7 @@ from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id
+from synapse.types import get_domain_from_id, RoomStreamToken
 import synapse.metrics
 
 # these are only included to make the type annotations work
@@ -1803,15 +1803,14 @@ class EventsStore(EventsWorkerStore):
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
     def purge_history(
-        self, room_id, topological_ordering, delete_local_events,
+        self, room_id, token, delete_local_events,
     ):
         """Deletes room history before a certain point
 
         Args:
             room_id (str):
 
-            topological_ordering (int):
-                minimum topo ordering to preserve
+            token (str): A topological token to delete events before
 
             delete_local_events (bool):
                 if True, we will delete local events as well as remote ones
@@ -1821,12 +1820,12 @@ class EventsStore(EventsWorkerStore):
 
         return self.runInteraction(
             "purge_history",
-            self._purge_history_txn, room_id, topological_ordering,
+            self._purge_history_txn, room_id, token,
             delete_local_events,
         )
 
     def _purge_history_txn(
-        self, txn, room_id, topological_ordering, delete_local_events,
+        self, txn, room_id, token, delete_local_events,
     ):
         # Tables that should be pruned:
         #     event_auth
@@ -1856,6 +1855,8 @@ class EventsStore(EventsWorkerStore):
         # furthermore, we might already have the table from a previous (failed)
         # purge attempt, so let's drop the table first.
 
+        token = RoomStreamToken.parse(token)
+
         txn.execute("DROP TABLE IF EXISTS events_to_purge")
 
         txn.execute(
@@ -1888,7 +1889,7 @@ class EventsStore(EventsWorkerStore):
         rows = txn.fetchall()
         max_depth = max(row[0] for row in rows)
 
-        if max_depth <= topological_ordering:
+        if max_depth <= token.topological:
             # We need to ensure we don't delete all the events from the datanase
             # otherwise we wouldn't be able to send any events (due to not
             # having any backwards extremeties)
@@ -1904,7 +1905,7 @@ class EventsStore(EventsWorkerStore):
             should_delete_expr += " AND event_id NOT LIKE ?"
             should_delete_params += ("%:" + self.hs.hostname, )
 
-        should_delete_params += (room_id, topological_ordering)
+        should_delete_params += (room_id, token.topological)
 
         txn.execute(
             "INSERT INTO events_to_purge"