diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 77cb1dbd81..b64c90d631 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -54,7 +54,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 23
+SCHEMA_VERSION = 24
dir_path = os.path.abspath(os.path.dirname(__file__))
@@ -126,6 +126,24 @@ class DataStore(RoomMemberStore, RoomStore,
lock=False,
)
+ @defer.inlineCallbacks
+ def count_daily_users(self):
+ def _count_users(txn):
+ txn.execute(
+ "SELECT COUNT(DISTINCT user_id) AS users"
+ " FROM user_ips"
+ " WHERE last_seen > ?",
+ # This is close enough to a day for our purposes.
+ (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),)
+ )
+ rows = self.cursor_to_dict(txn)
+ if rows:
+ return rows[0]["users"]
+ return 0
+
+ ret = yield self.runInteraction("count_users", _count_users)
+ defer.returnValue(ret)
+
def get_user_ip_and_agents(self, user):
return self._simple_select_list(
table="user_ips",
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0a477e3122..2b51db9940 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
from _base import SQLBaseStore, _RollbackButIsFineException
from twisted.internet import defer, reactor
@@ -28,6 +27,7 @@ from canonicaljson import encode_canonical_json
from contextlib import contextmanager
import logging
+import math
import ujson as json
logger = logging.getLogger(__name__)
@@ -905,3 +905,59 @@ class EventsStore(SQLBaseStore):
txn.execute(sql, (event.event_id,))
result = txn.fetchone()
return result[0] if result else None
+
+ @defer.inlineCallbacks
+ def count_daily_messages(self):
+ def _count_messages(txn):
+ now = self.hs.get_clock().time()
+
+ txn.execute(
+ "SELECT reported_stream_token, reported_time FROM stats_reporting"
+ )
+ last_reported = self.cursor_to_dict(txn)
+
+ txn.execute(
+ "SELECT stream_ordering"
+ " FROM events"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT 1"
+ )
+ now_reporting = self.cursor_to_dict(txn)
+ if not now_reporting:
+ return None
+ now_reporting = now_reporting[0]["stream_ordering"]
+
+ txn.execute("DELETE FROM stats_reporting")
+ txn.execute(
+ "INSERT INTO stats_reporting"
+ " (reported_stream_token, reported_time)"
+ " VALUES (?, ?)",
+ (now_reporting, now,)
+ )
+
+ if not last_reported:
+ return None
+
+ # Close enough to correct for our purposes.
+ yesterday = (now - 24 * 60 * 60)
+ if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60:
+ return None
+
+ txn.execute(
+ "SELECT COUNT(*) as messages"
+ " FROM events NATURAL JOIN event_json"
+ " WHERE json like '%m.room.message%'"
+ " AND stream_ordering > ?"
+ " AND stream_ordering <= ?",
+ (
+ last_reported[0]["reported_stream_token"],
+ now_reporting,
+ )
+ )
+ rows = self.cursor_to_dict(txn)
+ if not rows:
+ return None
+ return rows[0]["messages"]
+
+ ret = yield self.runInteraction("count_messages", _count_messages)
+ defer.returnValue(ret)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c9ceb132ae..6d76237658 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -289,3 +289,15 @@ class RegistrationStore(SQLBaseStore):
if ret:
defer.returnValue(ret['user_id'])
defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def count_all_users(self):
+ def _count_users(txn):
+ txn.execute("SELECT COUNT(*) AS users FROM users")
+ rows = self.cursor_to_dict(txn)
+ if rows:
+ return rows[0]["users"]
+ return 0
+
+ ret = yield self.runInteraction("count_users", _count_users)
+ defer.returnValue(ret)
diff --git a/synapse/storage/schema/delta/24/stats_reporting.sql b/synapse/storage/schema/delta/24/stats_reporting.sql
new file mode 100644
index 0000000000..e9165d2917
--- /dev/null
+++ b/synapse/storage/schema/delta/24/stats_reporting.sql
@@ -0,0 +1,22 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Should only ever contain one row
+CREATE TABLE IF NOT EXISTS stats_reporting(
+ -- The stream ordering token which was most recently reported as stats
+ reported_stream_token INTEGER,
+ -- The time (seconds since epoch) stats were most recently reported
+ reported_time BIGINT
+);
|