summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py58
1 files changed, 57 insertions, 1 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0a477e3122..2b51db9940 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from _base import SQLBaseStore, _RollbackButIsFineException
 
 from twisted.internet import defer, reactor
@@ -28,6 +27,7 @@ from canonicaljson import encode_canonical_json
 from contextlib import contextmanager
 
 import logging
+import math
 import ujson as json
 
 logger = logging.getLogger(__name__)
@@ -905,3 +905,59 @@ class EventsStore(SQLBaseStore):
         txn.execute(sql, (event.event_id,))
         result = txn.fetchone()
         return result[0] if result else None
+
+    @defer.inlineCallbacks
+    def count_daily_messages(self):
+        def _count_messages(txn):
+            now = self.hs.get_clock().time()
+
+            txn.execute(
+                "SELECT reported_stream_token, reported_time FROM stats_reporting"
+            )
+            last_reported = self.cursor_to_dict(txn)
+
+            txn.execute(
+                "SELECT stream_ordering"
+                " FROM events"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT 1"
+            )
+            now_reporting = self.cursor_to_dict(txn)
+            if not now_reporting:
+                return None
+            now_reporting = now_reporting[0]["stream_ordering"]
+
+            txn.execute("DELETE FROM stats_reporting")
+            txn.execute(
+                "INSERT INTO stats_reporting"
+                " (reported_stream_token, reported_time)"
+                " VALUES (?, ?)",
+                (now_reporting, now,)
+            )
+
+            if not last_reported:
+                return None
+
+            # Close enough to correct for our purposes.
+            yesterday = (now - 24 * 60 * 60)
+            if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60:
+                return None
+
+            txn.execute(
+                "SELECT COUNT(*) as messages"
+                " FROM events NATURAL JOIN event_json"
+                " WHERE json like '%m.room.message%'"
+                " AND stream_ordering > ?"
+                " AND stream_ordering <= ?",
+                (
+                    last_reported[0]["reported_stream_token"],
+                    now_reporting,
+                )
+            )
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return None
+            return rows[0]["messages"]
+
+        ret = yield self.runInteraction("count_messages", _count_messages)
+        defer.returnValue(ret)