summary refs log tree commit diff
diff options
context:
space:
mode:
authorDaniel Wagner-Hall <dawagner@gmail.com>2015-09-22 13:59:37 +0100
committerDaniel Wagner-Hall <dawagner@gmail.com>2015-09-22 13:59:37 +0100
commitf17aadd1b52463b51b8532cda074094ff2d0339b (patch)
tree2324815f78ea2fe5eebe29472ca5d169dd274953
parentMerge pull request #276 from matrix-org/markjh/history_for_rooms_that_have_be... (diff)
parentAdd some docstrings (diff)
downloadsynapse-f17aadd1b52463b51b8532cda074094ff2d0339b.tar.xz
Merge pull request #285 from matrix-org/daniel/metrics-2
Implement configurable stats reporting
Diffstat (limited to '')
-rwxr-xr-xsynapse/app/homeserver.py38
-rwxr-xr-xsynapse/app/synctl.py12
-rw-r--r--synapse/config/_base.py45
-rw-r--r--synapse/config/appservice.py2
-rw-r--r--synapse/config/captcha.py2
-rw-r--r--synapse/config/database.py2
-rw-r--r--synapse/config/key.py2
-rw-r--r--synapse/config/logger.py2
-rw-r--r--synapse/config/metrics.py8
-rw-r--r--synapse/config/ratelimiting.py2
-rw-r--r--synapse/config/registration.py2
-rw-r--r--synapse/config/repository.py2
-rw-r--r--synapse/config/saml2.py2
-rw-r--r--synapse/config/server.py2
-rw-r--r--synapse/config/tls.py2
-rw-r--r--synapse/config/voip.py2
-rw-r--r--synapse/storage/__init__.py23
-rw-r--r--synapse/storage/events.py64
-rw-r--r--synapse/storage/registration.py13
-rw-r--r--synapse/storage/schema/delta/24/stats_reporting.sql22
-rw-r--r--tests/storage/event_injector.py81
-rw-r--r--tests/storage/test_events.py116
-rw-r--r--tests/storage/test_room.py2
-rw-r--r--tests/storage/test_stream.py68
24 files changed, 438 insertions, 78 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 15c0a4a003..21840e4a28 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -42,7 +42,7 @@ from synapse.storage import (
 from synapse.server import HomeServer
 
 
-from twisted.internet import reactor
+from twisted.internet import reactor, task, defer
 from twisted.application import service
 from twisted.enterprise import adbapi
 from twisted.web.resource import Resource, EncodingResourceWrapper
@@ -677,6 +677,42 @@ def run(hs):
         ThreadPool._worker = profile(ThreadPool._worker)
         reactor.run = profile(reactor.run)
 
+    start_time = hs.get_clock().time()
+
+    @defer.inlineCallbacks
+    def phone_stats_home():
+        now = int(hs.get_clock().time())
+        uptime = int(now - start_time)
+        if uptime < 0:
+            uptime = 0
+
+        stats = {}
+        stats["homeserver"] = hs.config.server_name
+        stats["timestamp"] = now
+        stats["uptime_seconds"] = uptime
+        stats["total_users"] = yield hs.get_datastore().count_all_users()
+
+        all_rooms = yield hs.get_datastore().get_rooms(False)
+        stats["total_room_count"] = len(all_rooms)
+
+        stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
+        daily_messages = yield hs.get_datastore().count_daily_messages()
+        if daily_messages is not None:
+            stats["daily_messages"] = daily_messages
+
+        logger.info("Reporting stats to matrix.org: %s" % (stats,))
+        try:
+            yield hs.get_simple_http_client().put_json(
+                "https://matrix.org/report-usage-stats/push",
+                stats
+            )
+        except Exception as e:
+            logger.warn("Error reporting stats: %s", e)
+
+    if hs.config.report_stats:
+        phone_home_task = task.LoopingCall(phone_stats_home)
+        phone_home_task.start(60 * 60 * 24, now=False)
+
     def in_thread():
         with LoggingContext("run"):
             change_resource_limit(hs.config.soft_file_limit)
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 1f7d543c31..6bcc437591 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -25,6 +25,7 @@ SYNAPSE = ["python", "-B", "-m", "synapse.app.homeserver"]
 CONFIGFILE = "homeserver.yaml"
 
 GREEN = "\x1b[1;32m"
+RED = "\x1b[1;31m"
 NORMAL = "\x1b[m"
 
 if not os.path.exists(CONFIGFILE):
@@ -45,8 +46,15 @@ def start():
     print "Starting ...",
     args = SYNAPSE
     args.extend(["--daemonize", "-c", CONFIGFILE])
-    subprocess.check_call(args)
-    print GREEN + "started" + NORMAL
+    try:
+        subprocess.check_call(args)
+        print GREEN + "started" + NORMAL
+    except subprocess.CalledProcessError as e:
+        print (
+            RED +
+            "error starting (exit code: %d); see above for logs" % e.returncode +
+            NORMAL
+        )
 
 
 def stop():
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 8a75c48733..ceef309afc 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -26,6 +26,16 @@ class ConfigError(Exception):
 
 class Config(object):
 
+    stats_reporting_begging_spiel = (
+        "We would really appreciate it if you could help our project out by"
+        " reporting anonymized usage statistics from your homeserver. Only very"
+        " basic aggregate data (e.g. number of users) will be reported, but it"
+        " helps us to track the growth of the Matrix community, and helps us to"
+        " make Matrix a success, as well as to convince other networks that they"
+        " should peer with us."
+        "\nThank you."
+    )
+
     @staticmethod
     def parse_size(value):
         if isinstance(value, int) or isinstance(value, long):
@@ -111,11 +121,14 @@ class Config(object):
                 results.append(getattr(cls, name)(self, *args, **kargs))
         return results
 
-    def generate_config(self, config_dir_path, server_name):
+    def generate_config(self, config_dir_path, server_name, report_stats=None):
         default_config = "# vim:ft=yaml\n"
 
         default_config += "\n\n".join(dedent(conf) for conf in self.invoke_all(
-            "default_config", config_dir_path, server_name
+            "default_config",
+            config_dir_path=config_dir_path,
+            server_name=server_name,
+            report_stats=report_stats,
         ))
 
         config = yaml.load(default_config)
@@ -140,6 +153,12 @@ class Config(object):
             help="Generate a config file for the server name"
         )
         config_parser.add_argument(
+            "--report-stats",
+            action="store",
+            help="Stuff",
+            choices=["yes", "no"]
+        )
+        config_parser.add_argument(
             "--generate-keys",
             action="store_true",
             help="Generate any missing key files then exit"
@@ -189,6 +208,11 @@ class Config(object):
                     config_files.append(config_path)
 
         if config_args.generate_config:
+            if config_args.report_stats is None:
+                config_parser.error(
+                    "Please specify either --report-stats=yes or --report-stats=no\n\n" +
+                    cls.stats_reporting_begging_spiel
+                )
             if not config_files:
                 config_parser.error(
                     "Must supply a config file.\nA config file can be automatically"
@@ -211,7 +235,9 @@ class Config(object):
                     os.makedirs(config_dir_path)
                 with open(config_path, "wb") as config_file:
                     config_bytes, config = obj.generate_config(
-                        config_dir_path, server_name
+                        config_dir_path=config_dir_path,
+                        server_name=server_name,
+                        report_stats=(config_args.report_stats == "yes"),
                     )
                     obj.invoke_all("generate_files", config)
                     config_file.write(config_bytes)
@@ -261,9 +287,20 @@ class Config(object):
             specified_config.update(yaml_config)
 
         server_name = specified_config["server_name"]
-        _, config = obj.generate_config(config_dir_path, server_name)
+        _, config = obj.generate_config(
+            config_dir_path=config_dir_path,
+            server_name=server_name
+        )
         config.pop("log_config")
         config.update(specified_config)
+        if "report_stats" not in config:
+            sys.stderr.write(
+                "Please opt in or out of reporting anonymized homeserver usage "
+                "statistics, by setting the report_stats key in your config file "
+                " ( " + config_path + " ) " +
+                "to either True or False.\n\n" +
+                Config.stats_reporting_begging_spiel + "\n")
+            sys.exit(1)
 
         if generate_keys:
             obj.invoke_all("generate_files", config)
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index 38f41933b7..b8d301995e 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -20,7 +20,7 @@ class AppServiceConfig(Config):
     def read_config(self, config):
         self.app_service_config_files = config.get("app_service_config_files", [])
 
-    def default_config(cls, config_dir_path, server_name):
+    def default_config(cls, **kwargs):
         return """\
         # A list of application service config file to use
         app_service_config_files: []
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
index 15a132b4e3..dd92fcd0dc 100644
--- a/synapse/config/captcha.py
+++ b/synapse/config/captcha.py
@@ -24,7 +24,7 @@ class CaptchaConfig(Config):
         self.captcha_bypass_secret = config.get("captcha_bypass_secret")
         self.recaptcha_siteverify_api = config["recaptcha_siteverify_api"]
 
-    def default_config(self, config_dir_path, server_name):
+    def default_config(self, **kwargs):
         return """\
         ## Captcha ##
 
diff --git a/synapse/config/database.py b/synapse/config/database.py
index f0611e8884..baeda8f300 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -45,7 +45,7 @@ class DatabaseConfig(Config):
 
         self.set_databasepath(config.get("database_path"))
 
-    def default_config(self, config, config_dir_path):
+    def default_config(self, **kwargs):
         database_path = self.abspath("homeserver.db")
         return """\
         # Database configuration
diff --git a/synapse/config/key.py b/synapse/config/key.py
index 23ac8a3fca..2c187065e5 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -40,7 +40,7 @@ class KeyConfig(Config):
             config["perspectives"]
         )
 
-    def default_config(self, config_dir_path, server_name):
+    def default_config(self, config_dir_path, server_name, **kwargs):
         base_key_name = os.path.join(config_dir_path, server_name)
         return """\
         ## Signing Keys ##
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index daca698d0c..bd0c17c861 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -70,7 +70,7 @@ class LoggingConfig(Config):
         self.log_config = self.abspath(config.get("log_config"))
         self.log_file = self.abspath(config.get("log_file"))
 
-    def default_config(self, config_dir_path, server_name):
+    def default_config(self, config_dir_path, server_name, **kwargs):
         log_file = self.abspath("homeserver.log")
         log_config = self.abspath(
             os.path.join(config_dir_path, server_name + ".log.config")
diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py
index ae5a691527..825fec9a38 100644
--- a/synapse/config/metrics.py
+++ b/synapse/config/metrics.py
@@ -19,13 +19,15 @@ from ._base import Config
 class MetricsConfig(Config):
     def read_config(self, config):
         self.enable_metrics = config["enable_metrics"]
+        self.report_stats = config.get("report_stats", None)
         self.metrics_port = config.get("metrics_port")
         self.metrics_bind_host = config.get("metrics_bind_host", "127.0.0.1")
 
-    def default_config(self, config_dir_path, server_name):
-        return """\
+    def default_config(self, report_stats=None, **kwargs):
+        suffix = "" if report_stats is None else "report_stats: %(report_stats)s\n"
+        return ("""\
         ## Metrics ###
 
         # Enable collection and rendering of performance metrics
         enable_metrics: False
-        """
+        """ + suffix) % locals()
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 76d9970e5b..611b598ec7 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -27,7 +27,7 @@ class RatelimitConfig(Config):
         self.federation_rc_reject_limit = config["federation_rc_reject_limit"]
         self.federation_rc_concurrent = config["federation_rc_concurrent"]
 
-    def default_config(self, config_dir_path, server_name):
+    def default_config(self, **kwargs):
         return """\
         ## Ratelimiting ##
 
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 62de4b399f..fa98eced34 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -34,7 +34,7 @@ class RegistrationConfig(Config):
         self.registration_shared_secret = config.get("registration_shared_secret")
         self.macaroon_secret_key = config.get("macaroon_secret_key")
 
-    def default_config(self, config_dir, server_name):
+    def default_config(self, **kwargs):
         registration_shared_secret = random_string_with_symbols(50)
         macaroon_secret_key = random_string_with_symbols(50)
         return """\
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 64644b9a7a..2fcf872449 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -60,7 +60,7 @@ class ContentRepositoryConfig(Config):
             config["thumbnail_sizes"]
         )
 
-    def default_config(self, config_dir_path, server_name):
+    def default_config(self, **kwargs):
         media_store = self.default_path("media_store")
         uploads_path = self.default_path("uploads")
         return """
diff --git a/synapse/config/saml2.py b/synapse/config/saml2.py
index 1532036876..4c6133cf22 100644
--- a/synapse/config/saml2.py
+++ b/synapse/config/saml2.py
@@ -41,7 +41,7 @@ class SAML2Config(Config):
             self.saml2_config_path = None
             self.saml2_idp_redirect_url = None
 
-    def default_config(self, config_dir_path, server_name):
+    def default_config(self, config_dir_path, server_name, **kwargs):
         return """
         # Enable SAML2 for registration and login. Uses pysaml2
         # config_path:      Path to the sp_conf.py configuration file
diff --git a/synapse/config/server.py b/synapse/config/server.py
index a03e55c223..4d12d49857 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -117,7 +117,7 @@ class ServerConfig(Config):
 
         self.content_addr = content_addr
 
-    def default_config(self, config_dir_path, server_name):
+    def default_config(self, server_name, **kwargs):
         if ":" in server_name:
             bind_port = int(server_name.split(":")[1])
             unsecure_port = bind_port - 400
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index e6023a718d..0ac2698293 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -50,7 +50,7 @@ class TlsConfig(Config):
             "use_insecure_ssl_client_just_for_testing_do_not_use"
         )
 
-    def default_config(self, config_dir_path, server_name):
+    def default_config(self, config_dir_path, server_name, **kwargs):
         base_key_name = os.path.join(config_dir_path, server_name)
 
         tls_certificate_path = base_key_name + ".tls.crt"
diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index a1707223d3..a093354ccd 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -22,7 +22,7 @@ class VoipConfig(Config):
         self.turn_shared_secret = config["turn_shared_secret"]
         self.turn_user_lifetime = self.parse_duration(config["turn_user_lifetime"])
 
-    def default_config(self, config_dir_path, server_name):
+    def default_config(self, **kwargs):
         return """\
         ## Turn ##
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 77cb1dbd81..340e59afcb 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -54,7 +54,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 23
+SCHEMA_VERSION = 24
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -126,6 +126,27 @@ class DataStore(RoomMemberStore, RoomStore,
             lock=False,
         )
 
+    @defer.inlineCallbacks
+    def count_daily_users(self):
+        """
+        Counts the number of users who used this homeserver in the last 24 hours.
+        """
+        def _count_users(txn):
+            txn.execute(
+                "SELECT COUNT(DISTINCT user_id) AS users"
+                " FROM user_ips"
+                " WHERE last_seen > ?",
+                # This is close enough to a day for our purposes.
+                (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),)
+            )
+            rows = self.cursor_to_dict(txn)
+            if rows:
+                return rows[0]["users"]
+            return 0
+
+        ret = yield self.runInteraction("count_users", _count_users)
+        defer.returnValue(ret)
+
     def get_user_ip_and_agents(self, user):
         return self._simple_select_list(
             table="user_ips",
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0a477e3122..46df6b4d6d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from _base import SQLBaseStore, _RollbackButIsFineException
 
 from twisted.internet import defer, reactor
@@ -28,6 +27,7 @@ from canonicaljson import encode_canonical_json
 from contextlib import contextmanager
 
 import logging
+import math
 import ujson as json
 
 logger = logging.getLogger(__name__)
@@ -905,3 +905,65 @@ class EventsStore(SQLBaseStore):
         txn.execute(sql, (event.event_id,))
         result = txn.fetchone()
         return result[0] if result else None
+
+    @defer.inlineCallbacks
+    def count_daily_messages(self):
+        """
+        Returns an estimate of the number of messages sent in the last day.
+
+        If it has been significantly less or more than one day since the last
+        call to this function, it will return None.
+        """
+        def _count_messages(txn):
+            now = self.hs.get_clock().time()
+
+            txn.execute(
+                "SELECT reported_stream_token, reported_time FROM stats_reporting"
+            )
+            last_reported = self.cursor_to_dict(txn)
+
+            txn.execute(
+                "SELECT stream_ordering"
+                " FROM events"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT 1"
+            )
+            now_reporting = self.cursor_to_dict(txn)
+            if not now_reporting:
+                return None
+            now_reporting = now_reporting[0]["stream_ordering"]
+
+            txn.execute("DELETE FROM stats_reporting")
+            txn.execute(
+                "INSERT INTO stats_reporting"
+                " (reported_stream_token, reported_time)"
+                " VALUES (?, ?)",
+                (now_reporting, now,)
+            )
+
+            if not last_reported:
+                return None
+
+            # Close enough to correct for our purposes.
+            yesterday = (now - 24 * 60 * 60)
+            if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60:
+                return None
+
+            txn.execute(
+                "SELECT COUNT(*) as messages"
+                " FROM events NATURAL JOIN event_json"
+                " WHERE json like '%m.room.message%'"
+                " AND stream_ordering > ?"
+                " AND stream_ordering <= ?",
+                (
+                    last_reported[0]["reported_stream_token"],
+                    now_reporting,
+                )
+            )
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return None
+            return rows[0]["messages"]
+
+        ret = yield self.runInteraction("count_messages", _count_messages)
+        defer.returnValue(ret)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c9ceb132ae..b454dd5b3a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -289,3 +289,16 @@ class RegistrationStore(SQLBaseStore):
         if ret:
             defer.returnValue(ret['user_id'])
         defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def count_all_users(self):
+        """Counts all users registered on the homeserver."""
+        def _count_users(txn):
+            txn.execute("SELECT COUNT(*) AS users FROM users")
+            rows = self.cursor_to_dict(txn)
+            if rows:
+                return rows[0]["users"]
+            return 0
+
+        ret = yield self.runInteraction("count_users", _count_users)
+        defer.returnValue(ret)
diff --git a/synapse/storage/schema/delta/24/stats_reporting.sql b/synapse/storage/schema/delta/24/stats_reporting.sql
new file mode 100644
index 0000000000..e9165d2917
--- /dev/null
+++ b/synapse/storage/schema/delta/24/stats_reporting.sql
@@ -0,0 +1,22 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Should only ever contain one row
+CREATE TABLE IF NOT EXISTS stats_reporting(
+  -- The stream ordering token which was most recently reported as stats
+  reported_stream_token INTEGER,
+  -- The time (seconds since epoch) stats were most recently reported
+  reported_time BIGINT
+);
diff --git a/tests/storage/event_injector.py b/tests/storage/event_injector.py
new file mode 100644
index 0000000000..42bd8928bd
--- /dev/null
+++ b/tests/storage/event_injector.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from tests import unittest
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.types import UserID, RoomID
+
+from tests.utils import setup_test_homeserver
+
+from mock import Mock
+
+
+class EventInjector:
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.message_handler = hs.get_handlers().message_handler
+        self.event_builder_factory = hs.get_event_builder_factory()
+
+    @defer.inlineCallbacks
+    def create_room(self, room):
+        builder = self.event_builder_factory.new({
+            "type": EventTypes.Create,
+            "room_id": room.to_string(),
+            "content": {},
+        })
+
+        event, context = yield self.message_handler._create_new_client_event(
+            builder
+        )
+
+        yield self.store.persist_event(event, context)
+
+    @defer.inlineCallbacks
+    def inject_room_member(self, room, user, membership):
+        builder = self.event_builder_factory.new({
+            "type": EventTypes.Member,
+            "sender": user.to_string(),
+            "state_key": user.to_string(),
+            "room_id": room.to_string(),
+            "content": {"membership": membership},
+        })
+
+        event, context = yield self.message_handler._create_new_client_event(
+            builder
+        )
+
+        yield self.store.persist_event(event, context)
+
+        defer.returnValue(event)
+
+    @defer.inlineCallbacks
+    def inject_message(self, room, user, body):
+        builder = self.event_builder_factory.new({
+            "type": EventTypes.Message,
+            "sender": user.to_string(),
+            "state_key": user.to_string(),
+            "room_id": room.to_string(),
+            "content": {"body": body, "msgtype": u"message"},
+        })
+
+        event, context = yield self.message_handler._create_new_client_event(
+            builder
+        )
+
+        yield self.store.persist_event(event, context)
diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py
new file mode 100644
index 0000000000..313013009e
--- /dev/null
+++ b/tests/storage/test_events.py
@@ -0,0 +1,116 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import uuid
+from mock.mock import Mock
+from synapse.types import RoomID, UserID
+
+from tests import unittest
+from twisted.internet import defer
+from tests.storage.event_injector import EventInjector
+
+from tests.utils import setup_test_homeserver
+
+
+class EventsStoreTestCase(unittest.TestCase):
+
+    @defer.inlineCallbacks
+    def setUp(self):
+        self.hs = yield setup_test_homeserver(
+            resource_for_federation=Mock(),
+            http_client=None,
+        )
+        self.store = self.hs.get_datastore()
+        self.db_pool = self.hs.get_db_pool()
+        self.message_handler = self.hs.get_handlers().message_handler
+        self.event_injector = EventInjector(self.hs)
+
+    @defer.inlineCallbacks
+    def test_count_daily_messages(self):
+        self.db_pool.runQuery("DELETE FROM stats_reporting")
+
+        self.hs.clock.now = 100
+
+        # Never reported before, and nothing which could be reported
+        count = yield self.store.count_daily_messages()
+        self.assertIsNone(count)
+        count = yield self.db_pool.runQuery("SELECT COUNT(*) FROM stats_reporting")
+        self.assertEqual([(0,)], count)
+
+        # Create something to report
+        room = RoomID.from_string("!abc123:test")
+        user = UserID.from_string("@raccoonlover:test")
+        yield self.event_injector.create_room(room)
+
+        self.base_event = yield self._get_last_stream_token()
+
+        yield self.event_injector.inject_message(room, user, "Raccoons are really cute")
+
+        # Never reported before, something could be reported, but isn't because
+        # it isn't old enough.
+        count = yield self.store.count_daily_messages()
+        self.assertIsNone(count)
+        self._assert_stats_reporting(1, self.hs.clock.now)
+
+        # Already reported yesterday, two new events from today.
+        yield self.event_injector.inject_message(room, user, "Yeah they are!")
+        yield self.event_injector.inject_message(room, user, "Incredibly!")
+        self.hs.clock.now += 60 * 60 * 24
+        count = yield self.store.count_daily_messages()
+        self.assertEqual(2, count)  # 2 since yesterday
+        self._assert_stats_reporting(3, self.hs.clock.now)  # 3 ever
+
+        # Last reported too recently.
+        yield self.event_injector.inject_message(room, user, "Who could disagree?")
+        self.hs.clock.now += 60 * 60 * 22
+        count = yield self.store.count_daily_messages()
+        self.assertIsNone(count)
+        self._assert_stats_reporting(4, self.hs.clock.now)
+
+        # Last reported too long ago
+        yield self.event_injector.inject_message(room, user, "No one.")
+        self.hs.clock.now += 60 * 60 * 26
+        count = yield self.store.count_daily_messages()
+        self.assertIsNone(count)
+        self._assert_stats_reporting(5, self.hs.clock.now)
+
+        # And now let's actually report something
+        yield self.event_injector.inject_message(room, user, "Indeed.")
+        yield self.event_injector.inject_message(room, user, "Indeed.")
+        yield self.event_injector.inject_message(room, user, "Indeed.")
+        # A little over 24 hours is fine :)
+        self.hs.clock.now += (60 * 60 * 24) + 50
+        count = yield self.store.count_daily_messages()
+        self.assertEqual(3, count)
+        self._assert_stats_reporting(8, self.hs.clock.now)
+
+    @defer.inlineCallbacks
+    def _get_last_stream_token(self):
+        rows = yield self.db_pool.runQuery(
+            "SELECT stream_ordering"
+            " FROM events"
+            " ORDER BY stream_ordering DESC"
+            " LIMIT 1"
+        )
+        if not rows:
+            defer.returnValue(0)
+        else:
+            defer.returnValue(rows[0][0])
+
+    @defer.inlineCallbacks
+    def _assert_stats_reporting(self, messages, time):
+        rows = yield self.db_pool.runQuery(
+            "SELECT reported_stream_token, reported_time FROM stats_reporting"
+        )
+        self.assertEqual([(self.base_event + messages, time,)], rows)
diff --git a/tests/storage/test_room.py b/tests/storage/test_room.py
index ab7625a3ca..caffce64e3 100644
--- a/tests/storage/test_room.py
+++ b/tests/storage/test_room.py
@@ -85,7 +85,7 @@ class RoomEventsStoreTestCase(unittest.TestCase):
         # Room events need the full datastore, for persist_event() and
         # get_room_state()
         self.store = hs.get_datastore()
-        self.event_factory = hs.get_event_factory();
+        self.event_factory = hs.get_event_factory()
 
         self.room = RoomID.from_string("!abcde:test")
 
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
index 0c9b89d765..a658a789aa 100644
--- a/tests/storage/test_stream.py
+++ b/tests/storage/test_stream.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.types import UserID, RoomID
+from tests.storage.event_injector import EventInjector
 
 from tests.utils import setup_test_homeserver
 
@@ -36,6 +37,7 @@ class StreamStoreTestCase(unittest.TestCase):
 
         self.store = hs.get_datastore()
         self.event_builder_factory = hs.get_event_builder_factory()
+        self.event_injector = EventInjector(hs)
         self.handlers = hs.get_handlers()
         self.message_handler = self.handlers.message_handler
 
@@ -45,60 +47,20 @@ class StreamStoreTestCase(unittest.TestCase):
         self.room1 = RoomID.from_string("!abc123:test")
         self.room2 = RoomID.from_string("!xyx987:test")
 
-        self.depth = 1
-
-    @defer.inlineCallbacks
-    def inject_room_member(self, room, user, membership):
-        self.depth += 1
-
-        builder = self.event_builder_factory.new({
-            "type": EventTypes.Member,
-            "sender": user.to_string(),
-            "state_key": user.to_string(),
-            "room_id": room.to_string(),
-            "content": {"membership": membership},
-        })
-
-        event, context = yield self.message_handler._create_new_client_event(
-            builder
-        )
-
-        yield self.store.persist_event(event, context)
-
-        defer.returnValue(event)
-
-    @defer.inlineCallbacks
-    def inject_message(self, room, user, body):
-        self.depth += 1
-
-        builder = self.event_builder_factory.new({
-            "type": EventTypes.Message,
-            "sender": user.to_string(),
-            "state_key": user.to_string(),
-            "room_id": room.to_string(),
-            "content": {"body": body, "msgtype": u"message"},
-        })
-
-        event, context = yield self.message_handler._create_new_client_event(
-            builder
-        )
-
-        yield self.store.persist_event(event, context)
-
     @defer.inlineCallbacks
     def test_event_stream_get_other(self):
         # Both bob and alice joins the room
-        yield self.inject_room_member(
+        yield self.event_injector.inject_room_member(
             self.room1, self.u_alice, Membership.JOIN
         )
-        yield self.inject_room_member(
+        yield self.event_injector.inject_room_member(
             self.room1, self.u_bob, Membership.JOIN
         )
 
         # Initial stream key:
         start = yield self.store.get_room_events_max_id()
 
-        yield self.inject_message(self.room1, self.u_alice, u"test")
+        yield self.event_injector.inject_message(self.room1, self.u_alice, u"test")
 
         end = yield self.store.get_room_events_max_id()
 
@@ -125,17 +87,17 @@ class StreamStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_event_stream_get_own(self):
         # Both bob and alice joins the room
-        yield self.inject_room_member(
+        yield self.event_injector.inject_room_member(
             self.room1, self.u_alice, Membership.JOIN
         )
-        yield self.inject_room_member(
+        yield self.event_injector.inject_room_member(
             self.room1, self.u_bob, Membership.JOIN
         )
 
         # Initial stream key:
         start = yield self.store.get_room_events_max_id()
 
-        yield self.inject_message(self.room1, self.u_alice, u"test")
+        yield self.event_injector.inject_message(self.room1, self.u_alice, u"test")
 
         end = yield self.store.get_room_events_max_id()
 
@@ -162,22 +124,22 @@ class StreamStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_event_stream_join_leave(self):
         # Both bob and alice joins the room
-        yield self.inject_room_member(
+        yield self.event_injector.inject_room_member(
             self.room1, self.u_alice, Membership.JOIN
         )
-        yield self.inject_room_member(
+        yield self.event_injector.inject_room_member(
             self.room1, self.u_bob, Membership.JOIN
         )
 
         # Then bob leaves again.
-        yield self.inject_room_member(
+        yield self.event_injector.inject_room_member(
             self.room1, self.u_bob, Membership.LEAVE
         )
 
         # Initial stream key:
         start = yield self.store.get_room_events_max_id()
 
-        yield self.inject_message(self.room1, self.u_alice, u"test")
+        yield self.event_injector.inject_message(self.room1, self.u_alice, u"test")
 
         end = yield self.store.get_room_events_max_id()
 
@@ -193,17 +155,17 @@ class StreamStoreTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_event_stream_prev_content(self):
-        yield self.inject_room_member(
+        yield self.event_injector.inject_room_member(
             self.room1, self.u_bob, Membership.JOIN
         )
 
-        event1 = yield self.inject_room_member(
+        event1 = yield self.event_injector.inject_room_member(
             self.room1, self.u_alice, Membership.JOIN
         )
 
         start = yield self.store.get_room_events_max_id()
 
-        event2 = yield self.inject_room_member(
+        event2 = yield self.event_injector.inject_room_member(
             self.room1, self.u_alice, Membership.JOIN,
         )