diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 6454045c2d..01f8339825 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -64,6 +64,27 @@ def _deserialize_action(actions, is_highlight):
class EventPushActionsWorkerStore(SQLBaseStore):
+ def __init__(self, db_conn, hs):
+ super(EventPushActionsWorkerStore, self).__init__(db_conn, hs)
+
+ # These get correctly set by _find_stream_orderings_for_times_txn
+ self.stream_ordering_month_ago = None
+ self.stream_ordering_day_ago = None
+
+ cur = LoggingTransaction(
+ db_conn.cursor(),
+ name="_find_stream_orderings_for_times_txn",
+ database_engine=self.database_engine,
+ after_callbacks=[],
+ exception_callbacks=[],
+ )
+ self._find_stream_orderings_for_times_txn(cur)
+ cur.close()
+
+ self.find_stream_orderings_looping_call = self._clock.looping_call(
+ self._find_stream_orderings_for_times, 10 * 60 * 1000
+ )
+
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
@@ -443,6 +464,128 @@ class EventPushActionsWorkerStore(SQLBaseStore):
desc="remove_push_actions_from_staging",
)
+ @defer.inlineCallbacks
+ def _find_stream_orderings_for_times(self):
+ yield self.runInteraction(
+ "_find_stream_orderings_for_times",
+ self._find_stream_orderings_for_times_txn
+ )
+
+ def _find_stream_orderings_for_times_txn(self, txn):
+ logger.info("Searching for stream ordering 1 month ago")
+ self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 month ago: it's %d",
+ self.stream_ordering_month_ago
+ )
+ logger.info("Searching for stream ordering 1 day ago")
+ self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 day ago: it's %d",
+ self.stream_ordering_day_ago
+ )
+
+ def find_first_stream_ordering_after_ts(self, ts):
+ """Gets the stream ordering corresponding to a given timestamp.
+
+ Specifically, finds the stream_ordering of the first event that was
+ received on or after the timestamp. This is done by a binary search on
+ the events table, since there is no index on received_ts, so is
+ relatively slow.
+
+ Args:
+ ts (int): timestamp in millis
+
+ Returns:
+ Deferred[int]: stream ordering of the first event received on/after
+ the timestamp
+ """
+ return self.runInteraction(
+ "_find_first_stream_ordering_after_ts_txn",
+ self._find_first_stream_ordering_after_ts_txn,
+ ts,
+ )
+
+ @staticmethod
+ def _find_first_stream_ordering_after_ts_txn(txn, ts):
+ """
+ Find the stream_ordering of the first event that was received on or
+ after a given timestamp. This is relatively slow as there is no index
+ on received_ts but we can then use this to delete push actions before
+ this.
+
+ received_ts must necessarily be in the same order as stream_ordering
+ and stream_ordering is indexed, so we manually binary search using
+ stream_ordering
+
+ Args:
+ txn (twisted.enterprise.adbapi.Transaction):
+ ts (int): timestamp to search for
+
+ Returns:
+ int: stream ordering
+ """
+ txn.execute("SELECT MAX(stream_ordering) FROM events")
+ max_stream_ordering = txn.fetchone()[0]
+
+ if max_stream_ordering is None:
+ return 0
+
+ # We want the first stream_ordering in which received_ts is greater
+ # than or equal to ts. Call this point X.
+ #
+ # We maintain the invariants:
+ #
+ # range_start <= X <= range_end
+ #
+ range_start = 0
+ range_end = max_stream_ordering + 1
+
+ # Given a stream_ordering, look up the timestamp at that
+ # stream_ordering.
+ #
+ # The array may be sparse (we may be missing some stream_orderings).
+ # We treat the gaps as the same as having the same value as the
+ # preceding entry, because we will pick the lowest stream_ordering
+ # which satisfies our requirement of received_ts >= ts.
+ #
+ # For example, if our array of events indexed by stream_ordering is
+ # [10, <none>, 20], we should treat this as being equivalent to
+ # [10, 10, 20].
+ #
+ sql = (
+ "SELECT received_ts FROM events"
+ " WHERE stream_ordering <= ?"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT 1"
+ )
+
+ while range_end - range_start > 0:
+ middle = (range_end + range_start) // 2
+ txn.execute(sql, (middle,))
+ row = txn.fetchone()
+ if row is None:
+ # no rows with stream_ordering<=middle
+ range_start = middle + 1
+ continue
+
+ middle_ts = row[0]
+ if ts > middle_ts:
+ # we got a timestamp lower than the one we were looking for.
+ # definitely need to look higher: X > middle.
+ range_start = middle + 1
+ else:
+ # we got a timestamp higher than (or the same as) the one we
+ # were looking for. We aren't yet sure about the point we
+ # looked up, but we can be sure that X <= middle.
+ range_end = middle
+
+ return range_end
+
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -651,69 +794,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
""", (room_id, user_id, stream_ordering))
@defer.inlineCallbacks
- def _find_stream_orderings_for_times(self):
- yield self.runInteraction(
- "_find_stream_orderings_for_times",
- self._find_stream_orderings_for_times_txn
- )
-
- def _find_stream_orderings_for_times_txn(self, txn):
- logger.info("Searching for stream ordering 1 month ago")
- self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
- txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
- )
- logger.info(
- "Found stream ordering 1 month ago: it's %d",
- self.stream_ordering_month_ago
- )
- logger.info("Searching for stream ordering 1 day ago")
- self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
- txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
- )
- logger.info(
- "Found stream ordering 1 day ago: it's %d",
- self.stream_ordering_day_ago
- )
-
- def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
- """
- Find the stream_ordering of the first event that was received after
- a given timestamp. This is relatively slow as there is no index on
- received_ts but we can then use this to delete push actions before
- this.
-
- received_ts must necessarily be in the same order as stream_ordering
- and stream_ordering is indexed, so we manually binary search using
- stream_ordering
- """
- txn.execute("SELECT MAX(stream_ordering) FROM events")
- max_stream_ordering = txn.fetchone()[0]
-
- if max_stream_ordering is None:
- return 0
-
- range_start = 0
- range_end = max_stream_ordering
-
- sql = (
- "SELECT received_ts FROM events"
- " WHERE stream_ordering > ?"
- " ORDER BY stream_ordering"
- " LIMIT 1"
- )
-
- while range_end - range_start > 1:
- middle = int((range_end + range_start) / 2)
- txn.execute(sql, (middle,))
- middle_ts = txn.fetchone()[0]
- if ts > middle_ts:
- range_start = middle
- else:
- range_end = middle
-
- return range_end
-
- @defer.inlineCallbacks
def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
return
|