From 7213588083dd9a721b0cd623fe22b308f25f19a5 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 22 Sep 2015 12:57:40 +0100 Subject: Implement configurable stats reporting SYN-287 This requires that HS owners either opt in or out of stats reporting. When --generate-config is passed, --report-stats must be specified If an already-generated config is used, and doesn't have the report_stats key, it is requested to be set. --- synapse/storage/__init__.py | 20 +++++++- synapse/storage/events.py | 58 +++++++++++++++++++++- synapse/storage/registration.py | 12 +++++ .../storage/schema/delta/24/stats_reporting.sql | 22 ++++++++ 4 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 synapse/storage/schema/delta/24/stats_reporting.sql (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 77cb1dbd81..b64c90d631 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -54,7 +54,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 23 +SCHEMA_VERSION = 24 dir_path = os.path.abspath(os.path.dirname(__file__)) @@ -126,6 +126,24 @@ class DataStore(RoomMemberStore, RoomStore, lock=False, ) + @defer.inlineCallbacks + def count_daily_users(self): + def _count_users(txn): + txn.execute( + "SELECT COUNT(DISTINCT user_id) AS users" + " FROM user_ips" + " WHERE last_seen > ?", + # This is close enough to a day for our purposes. + (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),) + ) + rows = self.cursor_to_dict(txn) + if rows: + return rows[0]["users"] + return 0 + + ret = yield self.runInteraction("count_users", _count_users) + defer.returnValue(ret) + def get_user_ip_and_agents(self, user): return self._simple_select_list( table="user_ips", diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0a477e3122..2b51db9940 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from _base import SQLBaseStore, _RollbackButIsFineException from twisted.internet import defer, reactor @@ -28,6 +27,7 @@ from canonicaljson import encode_canonical_json from contextlib import contextmanager import logging +import math import ujson as json logger = logging.getLogger(__name__) @@ -905,3 +905,59 @@ class EventsStore(SQLBaseStore): txn.execute(sql, (event.event_id,)) result = txn.fetchone() return result[0] if result else None + + @defer.inlineCallbacks + def count_daily_messages(self): + def _count_messages(txn): + now = self.hs.get_clock().time() + + txn.execute( + "SELECT reported_stream_token, reported_time FROM stats_reporting" + ) + last_reported = self.cursor_to_dict(txn) + + txn.execute( + "SELECT stream_ordering" + " FROM events" + " ORDER BY stream_ordering DESC" + " LIMIT 1" + ) + now_reporting = self.cursor_to_dict(txn) + if not now_reporting: + return None + now_reporting = now_reporting[0]["stream_ordering"] + + txn.execute("DELETE FROM stats_reporting") + txn.execute( + "INSERT INTO stats_reporting" + " (reported_stream_token, reported_time)" + " VALUES (?, ?)", + (now_reporting, now,) + ) + + if not last_reported: + return None + + # Close enough to correct for our purposes. + yesterday = (now - 24 * 60 * 60) + if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60: + return None + + txn.execute( + "SELECT COUNT(*) as messages" + " FROM events NATURAL JOIN event_json" + " WHERE json like '%m.room.message%'" + " AND stream_ordering > ?" + " AND stream_ordering <= ?", + ( + last_reported[0]["reported_stream_token"], + now_reporting, + ) + ) + rows = self.cursor_to_dict(txn) + if not rows: + return None + return rows[0]["messages"] + + ret = yield self.runInteraction("count_messages", _count_messages) + defer.returnValue(ret) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index c9ceb132ae..6d76237658 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -289,3 +289,15 @@ class RegistrationStore(SQLBaseStore): if ret: defer.returnValue(ret['user_id']) defer.returnValue(None) + + @defer.inlineCallbacks + def count_all_users(self): + def _count_users(txn): + txn.execute("SELECT COUNT(*) AS users FROM users") + rows = self.cursor_to_dict(txn) + if rows: + return rows[0]["users"] + return 0 + + ret = yield self.runInteraction("count_users", _count_users) + defer.returnValue(ret) diff --git a/synapse/storage/schema/delta/24/stats_reporting.sql b/synapse/storage/schema/delta/24/stats_reporting.sql new file mode 100644 index 0000000000..e9165d2917 --- /dev/null +++ b/synapse/storage/schema/delta/24/stats_reporting.sql @@ -0,0 +1,22 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Should only ever contain one row +CREATE TABLE IF NOT EXISTS stats_reporting( + -- The stream ordering token which was most recently reported as stats + reported_stream_token INTEGER, + -- The time (seconds since epoch) stats were most recently reported + reported_time BIGINT +); -- cgit 1.4.1