diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index de0b26f437..a4d7430f9b 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -67,7 +67,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
"MembershipStreamChangeCache", events_max,
)
- self.stream_ordering_month_ago = 0
self._stream_order_on_start = self.get_room_max_stream_ordering()
# Cached functions can't be accessed through a class instance so we need
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0f136f8a06..b3cdcfdc21 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,7 +20,6 @@ from synapse.storage.devices import DeviceStore
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
-from ._base import LoggingTransaction
from .directory import DirectoryStore
from .events import EventsStore
from .presence import PresenceStore, UserPresenceState
@@ -228,20 +227,6 @@ class DataStore(RoomMemberStore, RoomStore,
prefilled_cache=_group_updates_prefill,
)
- cur = LoggingTransaction(
- db_conn.cursor(),
- name="_find_stream_orderings_for_times_txn",
- database_engine=self.database_engine,
- after_callbacks=[],
- final_callbacks=[],
- )
- self._find_stream_orderings_for_times_txn(cur)
- cur.close()
-
- self.find_stream_orderings_looping_call = self._clock.looping_call(
- self._find_stream_orderings_for_times, 10 * 60 * 1000
- )
-
self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 6454045c2d..c08bebe112 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -64,6 +64,27 @@ def _deserialize_action(actions, is_highlight):
class EventPushActionsWorkerStore(SQLBaseStore):
+ def __init__(self, db_conn, hs):
+ super(EventPushActionsWorkerStore, self).__init__(db_conn, hs)
+
+ # These get correctly ste by _find_stream_orderings_for_times_txn
+ self.stream_ordering_month_ago = 0
+ self.stream_ordering_day_ago = 0
+
+ cur = LoggingTransaction(
+ db_conn.cursor(),
+ name="_find_stream_orderings_for_times_txn",
+ database_engine=self.database_engine,
+ after_callbacks=[],
+ final_callbacks=[],
+ )
+ self._find_stream_orderings_for_times_txn(cur)
+ cur.close()
+
+ self.find_stream_orderings_looping_call = self._clock.looping_call(
+ self._find_stream_orderings_for_times, 10 * 60 * 1000
+ )
+
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
@@ -443,6 +464,69 @@ class EventPushActionsWorkerStore(SQLBaseStore):
desc="remove_push_actions_from_staging",
)
+ @defer.inlineCallbacks
+ def _find_stream_orderings_for_times(self):
+ yield self.runInteraction(
+ "_find_stream_orderings_for_times",
+ self._find_stream_orderings_for_times_txn
+ )
+
+ def _find_stream_orderings_for_times_txn(self, txn):
+ logger.info("Searching for stream ordering 1 month ago")
+ self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 month ago: it's %d",
+ self.stream_ordering_month_ago
+ )
+ logger.info("Searching for stream ordering 1 day ago")
+ self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
+ txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
+ )
+ logger.info(
+ "Found stream ordering 1 day ago: it's %d",
+ self.stream_ordering_day_ago
+ )
+
+ def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+ """
+ Find the stream_ordering of the first event that was received after
+ a given timestamp. This is relatively slow as there is no index on
+ received_ts but we can then use this to delete push actions before
+ this.
+
+ received_ts must necessarily be in the same order as stream_ordering
+ and stream_ordering is indexed, so we manually binary search using
+ stream_ordering
+ """
+ txn.execute("SELECT MAX(stream_ordering) FROM events")
+ max_stream_ordering = txn.fetchone()[0]
+
+ if max_stream_ordering is None:
+ return 0
+
+ range_start = 0
+ range_end = max_stream_ordering
+
+ sql = (
+ "SELECT received_ts FROM events"
+ " WHERE stream_ordering > ?"
+ " ORDER BY stream_ordering"
+ " LIMIT 1"
+ )
+
+ while range_end - range_start > 1:
+ middle = int((range_end + range_start) / 2)
+ txn.execute(sql, (middle,))
+ middle_ts = txn.fetchone()[0]
+ if ts > middle_ts:
+ range_start = middle
+ else:
+ range_end = middle
+
+ return range_end
+
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -651,69 +735,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
""", (room_id, user_id, stream_ordering))
@defer.inlineCallbacks
- def _find_stream_orderings_for_times(self):
- yield self.runInteraction(
- "_find_stream_orderings_for_times",
- self._find_stream_orderings_for_times_txn
- )
-
- def _find_stream_orderings_for_times_txn(self, txn):
- logger.info("Searching for stream ordering 1 month ago")
- self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
- txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
- )
- logger.info(
- "Found stream ordering 1 month ago: it's %d",
- self.stream_ordering_month_ago
- )
- logger.info("Searching for stream ordering 1 day ago")
- self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
- txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
- )
- logger.info(
- "Found stream ordering 1 day ago: it's %d",
- self.stream_ordering_day_ago
- )
-
- def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
- """
- Find the stream_ordering of the first event that was received after
- a given timestamp. This is relatively slow as there is no index on
- received_ts but we can then use this to delete push actions before
- this.
-
- received_ts must necessarily be in the same order as stream_ordering
- and stream_ordering is indexed, so we manually binary search using
- stream_ordering
- """
- txn.execute("SELECT MAX(stream_ordering) FROM events")
- max_stream_ordering = txn.fetchone()[0]
-
- if max_stream_ordering is None:
- return 0
-
- range_start = 0
- range_end = max_stream_ordering
-
- sql = (
- "SELECT received_ts FROM events"
- " WHERE stream_ordering > ?"
- " ORDER BY stream_ordering"
- " LIMIT 1"
- )
-
- while range_end - range_start > 1:
- middle = int((range_end + range_start) / 2)
- txn.execute(sql, (middle,))
- middle_ts = txn.fetchone()[0]
- if ts > middle_ts:
- range_start = middle
- else:
- range_end = middle
-
- return range_end
-
- @defer.inlineCallbacks
def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
return
|