summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/__init__.py9
-rw-r--r--synapse/storage/_base.py1
-rw-r--r--synapse/storage/event_push_actions.py71
3 files changed, 68 insertions, 13 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d970fde9e8..49feb77779 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
     def __init__(self, db_conn, hs):
         self.hs = hs
+        self._clock = hs.get_clock()
         self.database_engine = hs.database_engine
 
         self.client_ip_last_seen = Cache(
@@ -173,6 +174,14 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=push_rules_prefill,
         )
 
+        cur = db_conn.cursor()
+        self._find_stream_orderings_for_times_txn(cur)
+        cur.close()
+
+        self.find_stream_orderings_looping_call = self._clock.looping_call(
+            self._find_stream_orderings_for_times, 60 * 60 * 1000
+        )
+
         super(DataStore, self).__init__(hs)
 
     def take_presence_startup_info(self):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e0d7098692..56a0dd80f3 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -153,7 +153,6 @@ class SQLBaseStore(object):
     def __init__(self, hs):
         self.hs = hs
         self._db_pool = hs.get_db_pool()
-        self._clock = hs.get_clock()
 
         self._previous_txn_total_time = 0
         self._current_txn_total_time = 0
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 336c03c68a..4425d4bce5 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -22,10 +22,12 @@ import ujson as json
 
 logger = logging.getLogger(__name__)
 
-KEEP_PUSH_ACTIONS_FOR_MS = 30 * 24 * 60 * 60 * 1000
-
 
 class EventPushActionsStore(SQLBaseStore):
+    def __init__(self, hs):
+        self.stream_ordering_month_ago = None
+        super(EventPushActionsStore, self).__init__(hs)
+
     def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
         Args:
@@ -237,9 +239,6 @@ class EventPushActionsStore(SQLBaseStore):
             user_id: user ID to delete for
             topological_ordering: The lowest topological ordering which will
                                   not be deleted.
-
-        Returns:
-
         """
         txn.call_after(
             self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
@@ -259,15 +258,63 @@ class EventPushActionsStore(SQLBaseStore):
         txn.execute(
             "DELETE FROM event_push_actions "
             " WHERE user_id = ? AND room_id = ? AND "
-            " topological_ordering < ? AND stream_ordering < ("
-            "  SELECT stream_ordering FROM events"
-            "  WHERE room_id = ? AND received_ts < ?"
-            "  ORDER BY stream_ordering DESC"
-            "  LIMIT 1"
-            ")",
-            (user_id, room_id, topological_ordering, room_id, threshold)
+            " topological_ordering < ? AND stream_ordering < ?"
+            (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+        )
+
+    @defer.inlineCallbacks
+    def _find_stream_orderings_for_times(self):
+        yield self.runInteraction(
+            "_find_stream_orderings_for_times",
+            self._find_stream_orderings_for_times_txn
+        )
+
+    def _find_stream_orderings_for_times_txn(self, txn):
+        logger.info("Searching for stream ordering 1 month ago")
+        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 month ago: it's %d",
+            self.stream_ordering_month_ago
         )
 
+    def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+        """
+        Find the stream_ordering of the first event that was received after
+        a given timestamp. This is relatively slow as there is no index on
+        received_ts but we can then use this to delete push actions before
+        this.
+
+        received_ts must necessarily be in the same order as stream_ordering
+        and stream_ordering is indexed, so we manually binary search using
+        stream_ordering
+        """
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        max_stream_ordering = txn.fetchone()[0]
+
+        range_start = 0
+        range_end = max_stream_ordering
+
+        sql = (
+            "SELECT received_ts FROM events"
+            " WHERE stream_ordering > ?"
+            " ORDER BY stream_ordering"
+            " LIMIT 1"
+        )
+
+        while range_end - range_start > 1:
+            middle = int((range_end + range_start) / 2)
+            txn.execute(sql, (middle,))
+            middle_ts = txn.fetchone()[0]
+            if ts > middle_ts:
+                range_start = middle
+            else:
+                range_end = middle
+        logger.info("done: picking %d from %d and %d", range_end, range_start, range_end)
+
+        return range_end
+
 
 def _action_has_highlight(actions):
     for action in actions: