summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/constants.py1
-rw-r--r--synapse/config/homeserver.py42
-rw-r--r--synapse/config/stats.py60
-rw-r--r--synapse/handlers/stats.py325
-rw-r--r--synapse/server.py6
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/events_worker.py24
-rw-r--r--synapse/storage/roommember.py32
-rw-r--r--synapse/storage/schema/delta/54/stats.sql80
-rw-r--r--synapse/storage/state_deltas.py12
-rw-r--r--synapse/storage/stats.py450
11 files changed, 1021 insertions, 13 deletions
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 6b347b1749..ee129c8689 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -79,6 +79,7 @@ class EventTypes(object):
 
     RoomHistoryVisibility = "m.room.history_visibility"
     CanonicalAlias = "m.room.canonical_alias"
+    Encryption = "m.room.encryption"
     RoomAvatar = "m.room.avatar"
     RoomEncryption = "m.room.encryption"
     GuestAccess = "m.room.guest_access"
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 727fdc54d8..5c4fc8ff21 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -13,6 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from .api import ApiConfig
 from .appservice import AppServiceConfig
 from .captcha import CaptchaConfig
@@ -36,20 +37,41 @@ from .saml2_config import SAML2Config
 from .server import ServerConfig
 from .server_notices_config import ServerNoticesConfig
 from .spam_checker import SpamCheckerConfig
+from .stats import StatsConfig
 from .tls import TlsConfig
 from .user_directory import UserDirectoryConfig
 from .voip import VoipConfig
 from .workers import WorkerConfig
 
 
-class HomeServerConfig(ServerConfig, TlsConfig, DatabaseConfig, LoggingConfig,
-                       RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
-                       VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
-                       AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
-                       JWTConfig, PasswordConfig, EmailConfig,
-                       WorkerConfig, PasswordAuthProviderConfig, PushConfig,
-                       SpamCheckerConfig, GroupsConfig, UserDirectoryConfig,
-                       ConsentConfig,
-                       ServerNoticesConfig, RoomDirectoryConfig,
-                       ):
+class HomeServerConfig(
+    ServerConfig,
+    TlsConfig,
+    DatabaseConfig,
+    LoggingConfig,
+    RatelimitConfig,
+    ContentRepositoryConfig,
+    CaptchaConfig,
+    VoipConfig,
+    RegistrationConfig,
+    MetricsConfig,
+    ApiConfig,
+    AppServiceConfig,
+    KeyConfig,
+    SAML2Config,
+    CasConfig,
+    JWTConfig,
+    PasswordConfig,
+    EmailConfig,
+    WorkerConfig,
+    PasswordAuthProviderConfig,
+    PushConfig,
+    SpamCheckerConfig,
+    GroupsConfig,
+    UserDirectoryConfig,
+    ConsentConfig,
+    StatsConfig,
+    ServerNoticesConfig,
+    RoomDirectoryConfig,
+):
     pass
diff --git a/synapse/config/stats.py b/synapse/config/stats.py
new file mode 100644
index 0000000000..80fc1b9dd0
--- /dev/null
+++ b/synapse/config/stats.py
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import division
+
+import sys
+
+from ._base import Config
+
+
+class StatsConfig(Config):
+    """Stats Configuration
+    Configuration for the behaviour of synapse's stats engine
+    """
+
+    def read_config(self, config):
+        self.stats_enabled = True
+        self.stats_bucket_size = 86400
+        self.stats_retention = sys.maxsize
+        stats_config = config.get("stats", None)
+        if stats_config:
+            self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
+            self.stats_bucket_size = (
+                self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
+            )
+            self.stats_retention = (
+                self.parse_duration(
+                    stats_config.get("retention", "%ds" % (sys.maxsize,))
+                )
+                / 1000
+            )
+
+    def default_config(self, config_dir_path, server_name, **kwargs):
+        return """
+        # Local statistics collection. Used in populating the room directory.
+        #
+        # 'bucket_size' controls how large each statistics timeslice is. It can
+        # be defined in a human readable short form -- e.g. "1d", "1y".
+        #
+        # 'retention' controls how long historical statistics will be kept for.
+        # It can be defined in a human readable short form -- e.g. "1d", "1y".
+        #
+        #
+        #stats:
+        #   enabled: true
+        #   bucket_size: 1d
+        #   retention: 1y
+        """
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
new file mode 100644
index 0000000000..0e92b405ba
--- /dev/null
+++ b/synapse/handlers/stats.py
@@ -0,0 +1,325 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.handlers.state_deltas import StateDeltasHandler
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import UserID
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class StatsHandler(StateDeltasHandler):
+    """Handles keeping the *_stats tables updated with a simple time-series of
+    information about the users, rooms and media on the server, such that admins
+    have some idea of who is consuming their resources.
+
+    Heavily derived from UserDirectoryHandler
+    """
+
+    def __init__(self, hs):
+        super(StatsHandler, self).__init__(hs)
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self.server_name = hs.hostname
+        self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
+        self.stats_bucket_size = hs.config.stats_bucket_size
+
+        # The current position in the current_state_delta stream
+        self.pos = None
+
+        # Guard to ensure we only process deltas one at a time
+        self._is_processing = False
+
+        if hs.config.stats_enabled:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+            # We kick this off so that we don't have to wait for a change before
+            # we start populating stats
+            self.clock.call_later(0, self.notify_new_event)
+
+    def notify_new_event(self):
+        """Called when there may be more deltas to process
+        """
+        if not self.hs.config.stats_enabled:
+            return
+
+        if self._is_processing:
+            return
+
+        @defer.inlineCallbacks
+        def process():
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._is_processing = False
+
+        self._is_processing = True
+        run_as_background_process("stats.notify_new_event", process)
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        # If self.pos is None then means we haven't fetched it from DB
+        if self.pos is None:
+            self.pos = yield self.store.get_stats_stream_pos()
+
+        # If still None then the initial background update hasn't happened yet
+        if self.pos is None:
+            defer.returnValue(None)
+
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "stats_delta"):
+                deltas = yield self.store.get_current_state_deltas(self.pos)
+                if not deltas:
+                    return
+
+                logger.info("Handling %d state deltas", len(deltas))
+                yield self._handle_deltas(deltas)
+
+                self.pos = deltas[-1]["stream_id"]
+                yield self.store.update_stats_stream_pos(self.pos)
+
+                event_processing_positions.labels("stats").set(self.pos)
+
+    @defer.inlineCallbacks
+    def _handle_deltas(self, deltas):
+        """
+        Called with the state deltas to process
+        """
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            stream_id = delta["stream_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+
+            # If the earliest token to begin from is larger than our current
+            # stream ID, skip processing this delta.
+            if token is not None and token >= stream_id:
+                logger.debug(
+                    "Ignoring: %s as earlier than this room's initial ingestion event",
+                    event_id,
+                )
+                continue
+
+            if event_id is None and prev_event_id is None:
+                # Errr...
+                continue
+
+            event_content = {}
+
+            if event_id is not None:
+                event_content = (yield self.store.get_event(event_id)).content or {}
+
+            # quantise time to the nearest bucket
+            now = yield self.store.get_received_ts(event_id)
+            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+
+            if typ == EventTypes.Member:
+                # we could use _get_key_change here but it's a bit inefficient
+                # given we're not testing for a specific result; might as well
+                # just grab the prev_membership and membership strings and
+                # compare them.
+                prev_event_content = {}
+                if prev_event_id is not None:
+                    prev_event_content = (
+                        yield self.store.get_event(prev_event_id)
+                    ).content
+
+                membership = event_content.get("membership", Membership.LEAVE)
+                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
+
+                if prev_membership == membership:
+                    continue
+
+                if prev_membership == Membership.JOIN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "joined_members", -1
+                    )
+                elif prev_membership == Membership.INVITE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "invited_members", -1
+                    )
+                elif prev_membership == Membership.LEAVE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "left_members", -1
+                    )
+                elif prev_membership == Membership.BAN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "banned_members", -1
+                    )
+                else:
+                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
+                    logger.error(err)
+                    raise ValueError(err)
+
+                if membership == Membership.JOIN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "joined_members", +1
+                    )
+                elif membership == Membership.INVITE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "invited_members", +1
+                    )
+                elif membership == Membership.LEAVE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "left_members", +1
+                    )
+                elif membership == Membership.BAN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "banned_members", +1
+                    )
+                else:
+                    err = "%s is not a valid membership" % (repr(membership),)
+                    logger.error(err)
+                    raise ValueError(err)
+
+                user_id = state_key
+                if self.is_mine_id(user_id):
+                    # update user_stats as it's one of our users
+                    public = yield self._is_public_room(room_id)
+
+                    if membership == Membership.LEAVE:
+                        yield self.store.update_stats_delta(
+                            now,
+                            "user",
+                            user_id,
+                            "public_rooms" if public else "private_rooms",
+                            -1,
+                        )
+                    elif membership == Membership.JOIN:
+                        yield self.store.update_stats_delta(
+                            now,
+                            "user",
+                            user_id,
+                            "public_rooms" if public else "private_rooms",
+                            +1,
+                        )
+
+            elif typ == EventTypes.Create:
+                # Newly created room. Add it with all blank portions.
+                yield self.store.update_room_state(
+                    room_id,
+                    {
+                        "join_rules": None,
+                        "history_visibility": None,
+                        "encryption": None,
+                        "name": None,
+                        "topic": None,
+                        "avatar": None,
+                        "canonical_alias": None,
+                    },
+                )
+
+            elif typ == EventTypes.JoinRules:
+                yield self.store.update_room_state(
+                    room_id, {"join_rules": event_content.get("join_rule")}
+                )
+
+                is_public = yield self._get_key_change(
+                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+                )
+                if is_public is not None:
+                    yield self.update_public_room_stats(now, room_id, is_public)
+
+            elif typ == EventTypes.RoomHistoryVisibility:
+                yield self.store.update_room_state(
+                    room_id,
+                    {"history_visibility": event_content.get("history_visibility")},
+                )
+
+                is_public = yield self._get_key_change(
+                    prev_event_id, event_id, "history_visibility", "world_readable"
+                )
+                if is_public is not None:
+                    yield self.update_public_room_stats(now, room_id, is_public)
+
+            elif typ == EventTypes.Encryption:
+                yield self.store.update_room_state(
+                    room_id, {"encryption": event_content.get("algorithm")}
+                )
+            elif typ == EventTypes.Name:
+                yield self.store.update_room_state(
+                    room_id, {"name": event_content.get("name")}
+                )
+            elif typ == EventTypes.Topic:
+                yield self.store.update_room_state(
+                    room_id, {"topic": event_content.get("topic")}
+                )
+            elif typ == EventTypes.RoomAvatar:
+                yield self.store.update_room_state(
+                    room_id, {"avatar": event_content.get("url")}
+                )
+            elif typ == EventTypes.CanonicalAlias:
+                yield self.store.update_room_state(
+                    room_id, {"canonical_alias": event_content.get("alias")}
+                )
+
+    @defer.inlineCallbacks
+    def update_public_room_stats(self, ts, room_id, is_public):
+        """
+        Increment/decrement a user's number of public rooms when a room they are
+        in changes to/from public visibility.
+
+        Args:
+            ts (int): Timestamp in seconds
+            room_id (str)
+            is_public (bool)
+        """
+        # For now, blindly iterate over all local users in the room so that
+        # we can handle the whole problem of copying buckets over as needed
+        user_ids = yield self.store.get_users_in_room(room_id)
+
+        for user_id in user_ids:
+            if self.hs.is_mine(UserID.from_string(user_id)):
+                yield self.store.update_stats_delta(
+                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
+                )
+                yield self.store.update_stats_delta(
+                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
+                )
+
+    @defer.inlineCallbacks
+    def _is_public_room(self, room_id):
+        join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
+        history_visibility = yield self.state.get_current_state(
+            room_id, EventTypes.RoomHistoryVisibility
+        )
+
+        if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
+            (
+                history_visibility
+                and history_visibility.content.get("history_visibility")
+                == "world_readable"
+            )
+        ):
+            defer.returnValue(True)
+        else:
+            defer.returnValue(False)
diff --git a/synapse/server.py b/synapse/server.py
index 80d40b9272..9229a68a8d 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -72,6 +72,7 @@ from synapse.handlers.room_list import RoomListHandler
 from synapse.handlers.room_member import RoomMemberMasterHandler
 from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
 from synapse.handlers.set_password import SetPasswordHandler
+from synapse.handlers.stats import StatsHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
 from synapse.handlers.user_directory import UserDirectoryHandler
@@ -139,6 +140,7 @@ class HomeServer(object):
         'acme_handler',
         'auth_handler',
         'device_handler',
+        'stats_handler',
         'e2e_keys_handler',
         'e2e_room_keys_handler',
         'event_handler',
@@ -191,6 +193,7 @@ class HomeServer(object):
 
     REQUIRED_ON_MASTER_STARTUP = [
         "user_directory_handler",
+        "stats_handler"
     ]
 
     # This is overridden in derived application classes
@@ -474,6 +477,9 @@ class HomeServer(object):
     def build_secrets(self):
         return Secrets()
 
+    def build_stats_handler(self):
+        return StatsHandler(self)
+
     def build_spam_checker(self):
         return SpamChecker(self)
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 7522d3fd57..66675d08ae 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -55,6 +55,7 @@ from .roommember import RoomMemberStore
 from .search import SearchStore
 from .signatures import SignatureStore
 from .state import StateStore
+from .stats import StatsStore
 from .stream import StreamStore
 from .tags import TagsStore
 from .transactions import TransactionStore
@@ -100,6 +101,7 @@ class DataStore(
     GroupServerStore,
     UserErasureStore,
     MonthlyActiveUsersStore,
+    StatsStore,
     RelationsStore,
 ):
     def __init__(self, db_conn, hs):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index adc6cf26b5..83ffae2132 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -611,3 +611,27 @@ class EventsWorkerStore(SQLBaseStore):
             return res
 
         return self.runInteraction("get_rejection_reasons", f)
+
+    def _get_total_state_event_counts_txn(self, txn, room_id):
+        """
+        See get_state_event_counts.
+        """
+        sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
+        txn.execute(sql, (room_id,))
+        row = txn.fetchone()
+        return row[0] if row else 0
+
+    def get_total_state_event_counts(self, room_id):
+        """
+        Gets the total number of state events in a room.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[int]
+        """
+        return self.runInteraction(
+            "get_total_state_event_counts",
+            self._get_total_state_event_counts_txn, room_id
+        )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 57df17bcc2..4bd1669458 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -142,6 +142,38 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return self.runInteraction("get_room_summary", _get_room_summary_txn)
 
+    def _get_user_count_in_room_txn(self, txn, room_id, membership):
+        """
+        See get_user_count_in_room.
+        """
+        sql = (
+            "SELECT count(*) FROM room_memberships as m"
+            " INNER JOIN current_state_events as c"
+            " ON m.event_id = c.event_id "
+            " AND m.room_id = c.room_id "
+            " AND m.user_id = c.state_key"
+            " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
+        )
+
+        txn.execute(sql, (room_id, membership))
+        row = txn.fetchone()
+        return row[0]
+
+    def get_user_count_in_room(self, room_id, membership):
+        """
+        Get the user count in a room with a particular membership.
+
+        Args:
+            room_id (str)
+            membership (Membership)
+
+        Returns:
+            Deferred[int]
+        """
+        return self.runInteraction(
+            "get_users_in_room", self._get_user_count_in_room_txn, room_id, membership
+        )
+
     @cached()
     def get_invited_rooms_for_user(self, user_id):
         """ Get all the rooms the user is invited to
diff --git a/synapse/storage/schema/delta/54/stats.sql b/synapse/storage/schema/delta/54/stats.sql
new file mode 100644
index 0000000000..652e58308e
--- /dev/null
+++ b/synapse/storage/schema/delta/54/stats.sql
@@ -0,0 +1,80 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE stats_stream_pos (
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_id BIGINT,
+    CHECK (Lock='X')
+);
+
+INSERT INTO stats_stream_pos (stream_id) VALUES (null);
+
+CREATE TABLE user_stats (
+    user_id TEXT NOT NULL,
+    ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+    public_rooms INT NOT NULL,
+    private_rooms INT NOT NULL
+);
+
+CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts);
+
+CREATE TABLE room_stats (
+    room_id TEXT NOT NULL,
+    ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+    current_state_events INT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+    state_events INT NOT NULL
+);
+
+CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts);
+
+-- cache of current room state; useful for the publicRooms list
+CREATE TABLE room_state (
+    room_id TEXT NOT NULL,
+    join_rules TEXT,
+    history_visibility TEXT,
+    encryption TEXT,
+    name TEXT,
+    topic TEXT,
+    avatar TEXT,
+    canonical_alias TEXT
+    -- get aliases straight from the right table
+);
+
+CREATE UNIQUE INDEX room_state_room ON room_state(room_id);
+
+CREATE TABLE room_stats_earliest_token (
+    room_id TEXT NOT NULL,
+    token BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id);
+
+-- Set up staging tables
+INSERT INTO background_updates (update_name, progress_json) VALUES
+    ('populate_stats_createtables', '{}');
+
+-- Run through each room and update stats
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_rooms', '{}', 'populate_stats_createtables');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_cleanup', '{}', 'populate_stats_process_rooms');
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
index 31a0279b18..5fdb442104 100644
--- a/synapse/storage/state_deltas.py
+++ b/synapse/storage/state_deltas.py
@@ -84,10 +84,16 @@ class StateDeltasStore(SQLBaseStore):
             "get_current_state_deltas", get_current_state_deltas_txn
         )
 
-    def get_max_stream_id_in_current_state_deltas(self):
-        return self._simple_select_one_onecol(
+    def _get_max_stream_id_in_current_state_deltas_txn(self, txn):
+        return self._simple_select_one_onecol_txn(
+            txn,
             table="current_state_delta_stream",
             keyvalues={},
             retcol="COALESCE(MAX(stream_id), -1)",
-            desc="get_max_stream_id_in_current_state_deltas",
+        )
+
+    def get_max_stream_id_in_current_state_deltas(self):
+        return self.runInteraction(
+            "get_max_stream_id_in_current_state_deltas",
+            self._get_max_stream_id_in_current_state_deltas_txn,
         )
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
new file mode 100644
index 0000000000..71b80a891d
--- /dev/null
+++ b/synapse/storage/stats.py
@@ -0,0 +1,450 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018, 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.storage.state_deltas import StateDeltasStore
+from synapse.util.caches.descriptors import cached
+
+logger = logging.getLogger(__name__)
+
+# these fields track absolutes (e.g. total number of rooms on the server)
+ABSOLUTE_STATS_FIELDS = {
+    "room": (
+        "current_state_events",
+        "joined_members",
+        "invited_members",
+        "left_members",
+        "banned_members",
+        "state_events",
+    ),
+    "user": ("public_rooms", "private_rooms"),
+}
+
+TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+
+TEMP_TABLE = "_temp_populate_stats"
+
+
+class StatsStore(StateDeltasStore):
+    def __init__(self, db_conn, hs):
+        super(StatsStore, self).__init__(db_conn, hs)
+
+        self.server_name = hs.hostname
+        self.clock = self.hs.get_clock()
+        self.stats_enabled = hs.config.stats_enabled
+        self.stats_bucket_size = hs.config.stats_bucket_size
+
+        self.register_background_update_handler(
+            "populate_stats_createtables", self._populate_stats_createtables
+        )
+        self.register_background_update_handler(
+            "populate_stats_process_rooms", self._populate_stats_process_rooms
+        )
+        self.register_background_update_handler(
+            "populate_stats_cleanup", self._populate_stats_cleanup
+        )
+
+    @defer.inlineCallbacks
+    def _populate_stats_createtables(self, progress, batch_size):
+
+        if not self.stats_enabled:
+            yield self._end_background_update("populate_stats_createtables")
+            defer.returnValue(1)
+
+        # Get all the rooms that we want to process.
+        def _make_staging_area(txn):
+            sql = (
+                "CREATE TABLE IF NOT EXISTS "
+                + TEMP_TABLE
+                + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
+            )
+            txn.execute(sql)
+
+            sql = (
+                "CREATE TABLE IF NOT EXISTS "
+                + TEMP_TABLE
+                + "_position(position TEXT NOT NULL)"
+            )
+            txn.execute(sql)
+
+            # Get rooms we want to process from the database
+            sql = """
+                SELECT room_id, count(*) FROM current_state_events
+                GROUP BY room_id
+            """
+            txn.execute(sql)
+            rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
+            self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
+            del rooms
+
+        new_pos = yield self.get_max_stream_id_in_current_state_deltas()
+        yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
+        yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
+        self.get_earliest_token_for_room_stats.invalidate_all()
+
+        yield self._end_background_update("populate_stats_createtables")
+        defer.returnValue(1)
+
+    @defer.inlineCallbacks
+    def _populate_stats_cleanup(self, progress, batch_size):
+        """
+        Update the user directory stream position, then clean up the old tables.
+        """
+        if not self.stats_enabled:
+            yield self._end_background_update("populate_stats_cleanup")
+            defer.returnValue(1)
+
+        position = yield self._simple_select_one_onecol(
+            TEMP_TABLE + "_position", None, "position"
+        )
+        yield self.update_stats_stream_pos(position)
+
+        def _delete_staging_area(txn):
+            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
+            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+
+        yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
+
+        yield self._end_background_update("populate_stats_cleanup")
+        defer.returnValue(1)
+
+    @defer.inlineCallbacks
+    def _populate_stats_process_rooms(self, progress, batch_size):
+
+        if not self.stats_enabled:
+            yield self._end_background_update("populate_stats_process_rooms")
+            defer.returnValue(1)
+
+        # If we don't have progress filed, delete everything.
+        if not progress:
+            yield self.delete_all_stats()
+
+        def _get_next_batch(txn):
+            # Only fetch 250 rooms, so we don't fetch too many at once, even
+            # if those 250 rooms have less than batch_size state events.
+            sql = """
+                SELECT room_id, events FROM %s_rooms
+                ORDER BY events DESC
+                LIMIT 250
+            """ % (
+                TEMP_TABLE,
+            )
+            txn.execute(sql)
+            rooms_to_work_on = txn.fetchall()
+
+            if not rooms_to_work_on:
+                return None
+
+            # Get how many are left to process, so we can give status on how
+            # far we are in processing
+            txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+            progress["remaining"] = txn.fetchone()[0]
+
+            return rooms_to_work_on
+
+        rooms_to_work_on = yield self.runInteraction(
+            "populate_stats_temp_read", _get_next_batch
+        )
+
+        # No more rooms -- complete the transaction.
+        if not rooms_to_work_on:
+            yield self._end_background_update("populate_stats_process_rooms")
+            defer.returnValue(1)
+
+        logger.info(
+            "Processing the next %d rooms of %d remaining",
+            (len(rooms_to_work_on), progress["remaining"]),
+        )
+
+        # Number of state events we've processed by going through each room
+        processed_event_count = 0
+
+        for room_id, event_count in rooms_to_work_on:
+
+            current_state_ids = yield self.get_current_state_ids(room_id)
+
+            join_rules = yield self.get_event(
+                current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True
+            )
+            history_visibility = yield self.get_event(
+                current_state_ids.get((EventTypes.RoomHistoryVisibility, "")),
+                allow_none=True,
+            )
+            encryption = yield self.get_event(
+                current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True
+            )
+            name = yield self.get_event(
+                current_state_ids.get((EventTypes.Name, "")), allow_none=True
+            )
+            topic = yield self.get_event(
+                current_state_ids.get((EventTypes.Topic, "")), allow_none=True
+            )
+            avatar = yield self.get_event(
+                current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True
+            )
+            canonical_alias = yield self.get_event(
+                current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True
+            )
+
+            def _or_none(x, arg):
+                if x:
+                    return x.content.get(arg)
+                return None
+
+            yield self.update_room_state(
+                room_id,
+                {
+                    "join_rules": _or_none(join_rules, "join_rule"),
+                    "history_visibility": _or_none(
+                        history_visibility, "history_visibility"
+                    ),
+                    "encryption": _or_none(encryption, "algorithm"),
+                    "name": _or_none(name, "name"),
+                    "topic": _or_none(topic, "topic"),
+                    "avatar": _or_none(avatar, "url"),
+                    "canonical_alias": _or_none(canonical_alias, "alias"),
+                },
+            )
+
+            now = self.hs.get_reactor().seconds()
+
+            # quantise time to the nearest bucket
+            now = (now // self.stats_bucket_size) * self.stats_bucket_size
+
+            def _fetch_data(txn):
+
+                # Get the current token of the room
+                current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+                current_state_events = len(current_state_ids)
+                joined_members = self._get_user_count_in_room_txn(
+                    txn, room_id, Membership.JOIN
+                )
+                invited_members = self._get_user_count_in_room_txn(
+                    txn, room_id, Membership.INVITE
+                )
+                left_members = self._get_user_count_in_room_txn(
+                    txn, room_id, Membership.LEAVE
+                )
+                banned_members = self._get_user_count_in_room_txn(
+                    txn, room_id, Membership.BAN
+                )
+                total_state_events = self._get_total_state_event_counts_txn(
+                    txn, room_id
+                )
+
+                self._update_stats_txn(
+                    txn,
+                    "room",
+                    room_id,
+                    now,
+                    {
+                        "bucket_size": self.stats_bucket_size,
+                        "current_state_events": current_state_events,
+                        "joined_members": joined_members,
+                        "invited_members": invited_members,
+                        "left_members": left_members,
+                        "banned_members": banned_members,
+                        "state_events": total_state_events,
+                    },
+                )
+                self._simple_insert_txn(
+                    txn,
+                    "room_stats_earliest_token",
+                    {"room_id": room_id, "token": current_token},
+                )
+
+            yield self.runInteraction("update_room_stats", _fetch_data)
+
+            # We've finished a room. Delete it from the table.
+            yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
+            # Update the remaining counter.
+            progress["remaining"] -= 1
+            yield self.runInteraction(
+                "populate_stats",
+                self._background_update_progress_txn,
+                "populate_stats_process_rooms",
+                progress,
+            )
+
+            processed_event_count += event_count
+
+            if processed_event_count > batch_size:
+                # Don't process any more rooms, we've hit our batch size.
+                defer.returnValue(processed_event_count)
+
+        defer.returnValue(processed_event_count)
+
+    def delete_all_stats(self):
+        """
+        Delete all statistics records.
+        """
+
+        def _delete_all_stats_txn(txn):
+            txn.execute("DELETE FROM room_state")
+            txn.execute("DELETE FROM room_stats")
+            txn.execute("DELETE FROM room_stats_earliest_token")
+            txn.execute("DELETE FROM user_stats")
+
+        return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
+
+    def get_stats_stream_pos(self):
+        return self._simple_select_one_onecol(
+            table="stats_stream_pos",
+            keyvalues={},
+            retcol="stream_id",
+            desc="stats_stream_pos",
+        )
+
+    def update_stats_stream_pos(self, stream_id):
+        return self._simple_update_one(
+            table="stats_stream_pos",
+            keyvalues={},
+            updatevalues={"stream_id": stream_id},
+            desc="update_stats_stream_pos",
+        )
+
+    def update_room_state(self, room_id, fields):
+        """
+        Args:
+            room_id (str)
+            fields (dict[str:Any])
+        """
+        return self._simple_upsert(
+            table="room_state",
+            keyvalues={"room_id": room_id},
+            values=fields,
+            desc="update_room_state",
+        )
+
+    def get_deltas_for_room(self, room_id, start, size=100):
+        """
+        Get statistics deltas for a given room.
+
+        Args:
+            room_id (str)
+            start (int): Pagination start. Number of entries, not timestamp.
+            size (int): How many entries to return.
+
+        Returns:
+            Deferred[list[dict]], where the dict has the keys of
+            ABSOLUTE_STATS_FIELDS["room"] and "ts".
+        """
+        return self._simple_select_list_paginate(
+            "room_stats",
+            {"room_id": room_id},
+            "ts",
+            start,
+            size,
+            retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
+            order_direction="DESC",
+        )
+
+    def get_all_room_state(self):
+        return self._simple_select_list(
+            "room_state", None, retcols=("name", "topic", "canonical_alias")
+        )
+
+    @cached()
+    def get_earliest_token_for_room_stats(self, room_id):
+        """
+        Fetch the "earliest token". This is used by the room stats delta
+        processor to ignore deltas that have been processed between the
+        start of the background task and any particular room's stats
+        being calculated.
+
+        Returns:
+            Deferred[int]
+        """
+        return self._simple_select_one_onecol(
+            "room_stats_earliest_token",
+            {"room_id": room_id},
+            retcol="token",
+            allow_none=True,
+        )
+
+    def update_stats(self, stats_type, stats_id, ts, fields):
+        table, id_col = TYPE_TO_ROOM[stats_type]
+        return self._simple_upsert(
+            table=table,
+            keyvalues={id_col: stats_id, "ts": ts},
+            values=fields,
+            desc="update_stats",
+        )
+
+    def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
+        table, id_col = TYPE_TO_ROOM[stats_type]
+        return self._simple_upsert_txn(
+            txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+        )
+
+    def update_stats_delta(self, ts, stats_type, stats_id, field, value):
+        def _update_stats_delta(txn):
+            table, id_col = TYPE_TO_ROOM[stats_type]
+
+            sql = (
+                "SELECT * FROM %s"
+                " WHERE %s=? and ts=("
+                "  SELECT MAX(ts) FROM %s"
+                "  WHERE %s=?"
+                ")"
+            ) % (table, id_col, table, id_col)
+            txn.execute(sql, (stats_id, stats_id))
+            rows = self.cursor_to_dict(txn)
+            if len(rows) == 0:
+                # silently skip as we don't have anything to apply a delta to yet.
+                # this tries to minimise any race between the initial sync and
+                # subsequent deltas arriving.
+                return
+
+            current_ts = ts
+            latest_ts = rows[0]["ts"]
+            if current_ts < latest_ts:
+                # This one is in the past, but we're just encountering it now.
+                # Mark it as part of the current bucket.
+                current_ts = latest_ts
+            elif ts != latest_ts:
+                # we have to copy our absolute counters over to the new entry.
+                values = {
+                    key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
+                }
+                values[id_col] = stats_id
+                values["ts"] = ts
+                values["bucket_size"] = self.stats_bucket_size
+
+                self._simple_insert_txn(txn, table=table, values=values)
+
+            # actually update the new value
+            if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
+                self._simple_update_txn(
+                    txn,
+                    table=table,
+                    keyvalues={id_col: stats_id, "ts": current_ts},
+                    updatevalues={field: value},
+                )
+            else:
+                sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
+                    table,
+                    field,
+                    field,
+                    id_col,
+                )
+                txn.execute(sql, (value, stats_id, current_ts))
+
+        return self.runInteraction("update_stats_delta", _update_stats_delta)