diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d604e7668f..2970df138b 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -231,7 +231,7 @@ class DataStore(RoomMemberStore, RoomStore,
cur.close()
self.find_stream_orderings_looping_call = self._clock.looping_call(
- self._find_stream_orderings_for_times, 60 * 60 * 1000
+ self._find_stream_orderings_for_times, 10 * 60 * 1000
)
self._stream_order_on_start = self.get_room_max_stream_ordering()
@@ -272,17 +272,19 @@ class DataStore(RoomMemberStore, RoomStore,
Counts the number of users who used this homeserver in the last 24 hours.
"""
def _count_users(txn):
- txn.execute(
- "SELECT COUNT(DISTINCT user_id) AS users"
- " FROM user_ips"
- " WHERE last_seen > ?",
- # This is close enough to a day for our purposes.
- (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),)
- )
- rows = self.cursor_to_dict(txn)
- if rows:
- return rows[0]["users"]
- return 0
+ yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
+
+ sql = """
+ SELECT COALESCE(count(*), 0) FROM (
+ SELECT user_id FROM user_ips
+ WHERE last_seen > ?
+ GROUP BY user_id
+ ) u
+ """
+
+ txn.execute(sql, (yesterday,))
+ count, = txn.fetchone()
+ return count
ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c4aeb48800..8e7ae73a7d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -38,7 +38,6 @@ from functools import wraps
import synapse.metrics
import logging
-import math
import ujson as json
# these are only included to make the type annotations work
@@ -1599,69 +1598,40 @@ class EventsStore(SQLBaseStore):
call to this function, it will return None.
"""
def _count_messages(txn):
- now = self.hs.get_clock().time()
-
- txn.execute(
- "SELECT reported_stream_token, reported_time FROM stats_reporting"
- )
- last_reported = self.cursor_to_dict(txn)
-
- txn.execute(
- "SELECT stream_ordering"
- " FROM events"
- " ORDER BY stream_ordering DESC"
- " LIMIT 1"
- )
- now_reporting = self.cursor_to_dict(txn)
- if not now_reporting:
- logger.info("Calculating daily messages skipped; no now_reporting")
- return None
- now_reporting = now_reporting[0]["stream_ordering"]
-
- txn.execute("DELETE FROM stats_reporting")
- txn.execute(
- "INSERT INTO stats_reporting"
- " (reported_stream_token, reported_time)"
- " VALUES (?, ?)",
- (now_reporting, now,)
- )
-
- if not last_reported:
- logger.info("Calculating daily messages skipped; no last_reported")
- return None
-
- # Close enough to correct for our purposes.
- yesterday = (now - 24 * 60 * 60)
- since_yesterday_seconds = yesterday - last_reported[0]["reported_time"]
- any_since_yesterday = math.fabs(since_yesterday_seconds) > 60 * 60
- if any_since_yesterday:
- logger.info(
- "Calculating daily messages skipped; since_yesterday_seconds: %d" %
- (since_yesterday_seconds,)
- )
- return None
-
- txn.execute(
- "SELECT COUNT(*) as messages"
- " FROM events NATURAL JOIN event_json"
- " WHERE json like '%m.room.message%'"
- " AND stream_ordering > ?"
- " AND stream_ordering <= ?",
- (
- last_reported[0]["reported_stream_token"],
- now_reporting,
- )
- )
- rows = self.cursor_to_dict(txn)
- if not rows:
- logger.info("Calculating daily messages skipped; messages count missing")
- return None
- return rows[0]["messages"]
+ sql = """
+ SELECT COALESCE(COUNT(*), 0) FROM events
+ WHERE type = 'm.room.message'
+ AND stream_ordering > ?
+ """
+ txn.execute(sql, (self.stream_ordering_day_ago,))
+ count, = txn.fetchone()
+ return count
ret = yield self.runInteraction("count_messages", _count_messages)
defer.returnValue(ret)
@defer.inlineCallbacks
+ def count_daily_sent_messages(self):
+ def _count_messages(txn):
+ # This is good enough as if you have silly characters in your own
+ # hostname then thats your own fault.
+ like_clause = "%:" + self.hs.hostname
+
+ sql = """
+ SELECT COALESCE(COUNT(*), 0) FROM events
+ WHERE type = 'm.room.message'
+ AND sender LIKE ?
+ AND stream_ordering > ?
+ """
+
+ txn.execute(sql, (like_clause, self.stream_ordering_day_ago,))
+ count, = txn.fetchone()
+ return count
+
+ ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
|