summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py23
-rw-r--r--synapse/storage/event_federation.py9
-rw-r--r--synapse/storage/events.py66
-rw-r--r--synapse/storage/registration.py13
-rw-r--r--synapse/storage/roommember.py14
-rw-r--r--synapse/storage/schema/delta/23/drop_state_index.sql16
-rw-r--r--synapse/storage/schema/delta/24/stats_reporting.sql22
-rw-r--r--synapse/storage/stream.py42
8 files changed, 189 insertions, 16 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 77cb1dbd81..340e59afcb 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -54,7 +54,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 23
+SCHEMA_VERSION = 24
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -126,6 +126,27 @@ class DataStore(RoomMemberStore, RoomStore,
             lock=False,
         )
 
+    @defer.inlineCallbacks
+    def count_daily_users(self):
+        """
+        Counts the number of users who used this homeserver in the last 24 hours.
+        """
+        def _count_users(txn):
+            txn.execute(
+                "SELECT COUNT(DISTINCT user_id) AS users"
+                " FROM user_ips"
+                " WHERE last_seen > ?",
+                # This is close enough to a day for our purposes.
+                (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),)
+            )
+            rows = self.cursor_to_dict(txn)
+            if rows:
+                return rows[0]["users"]
+            return 0
+
+        ret = yield self.runInteraction("count_users", _count_users)
+        defer.returnValue(ret)
+
     def get_user_ip_and_agents(self, user):
         return self._simple_select_list(
             table="user_ips",
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 989ad340b0..c1cabbaa60 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -303,6 +303,15 @@ class EventFederationStore(SQLBaseStore):
             ],
         )
 
+        self._update_extremeties(txn, events)
+
+    def _update_extremeties(self, txn, events):
+        """Updates the event_*_extremities tables based on the new/updated
+        events being persisted.
+
+        This is called for new events *and* for events that were outliers, but
+        are are now being persisted as non-outliers.
+        """
         events_by_room = {}
         for ev in events:
             events_by_room.setdefault(ev.room_id, []).append(ev)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index fba837f461..46df6b4d6d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from _base import SQLBaseStore, _RollbackButIsFineException
 
 from twisted.internet import defer, reactor
@@ -28,6 +27,7 @@ from canonicaljson import encode_canonical_json
 from contextlib import contextmanager
 
 import logging
+import math
 import ujson as json
 
 logger = logging.getLogger(__name__)
@@ -281,6 +281,8 @@ class EventsStore(SQLBaseStore):
                     (False, event.event_id,)
                 )
 
+                self._update_extremeties(txn, [event])
+
         events_and_contexts = filter(
             lambda ec: ec[0] not in to_remove,
             events_and_contexts
@@ -903,3 +905,65 @@ class EventsStore(SQLBaseStore):
         txn.execute(sql, (event.event_id,))
         result = txn.fetchone()
         return result[0] if result else None
+
+    @defer.inlineCallbacks
+    def count_daily_messages(self):
+        """
+        Returns an estimate of the number of messages sent in the last day.
+
+        If it has been significantly less or more than one day since the last
+        call to this function, it will return None.
+        """
+        def _count_messages(txn):
+            now = self.hs.get_clock().time()
+
+            txn.execute(
+                "SELECT reported_stream_token, reported_time FROM stats_reporting"
+            )
+            last_reported = self.cursor_to_dict(txn)
+
+            txn.execute(
+                "SELECT stream_ordering"
+                " FROM events"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT 1"
+            )
+            now_reporting = self.cursor_to_dict(txn)
+            if not now_reporting:
+                return None
+            now_reporting = now_reporting[0]["stream_ordering"]
+
+            txn.execute("DELETE FROM stats_reporting")
+            txn.execute(
+                "INSERT INTO stats_reporting"
+                " (reported_stream_token, reported_time)"
+                " VALUES (?, ?)",
+                (now_reporting, now,)
+            )
+
+            if not last_reported:
+                return None
+
+            # Close enough to correct for our purposes.
+            yesterday = (now - 24 * 60 * 60)
+            if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60:
+                return None
+
+            txn.execute(
+                "SELECT COUNT(*) as messages"
+                " FROM events NATURAL JOIN event_json"
+                " WHERE json like '%m.room.message%'"
+                " AND stream_ordering > ?"
+                " AND stream_ordering <= ?",
+                (
+                    last_reported[0]["reported_stream_token"],
+                    now_reporting,
+                )
+            )
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return None
+            return rows[0]["messages"]
+
+        ret = yield self.runInteraction("count_messages", _count_messages)
+        defer.returnValue(ret)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c9ceb132ae..b454dd5b3a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -289,3 +289,16 @@ class RegistrationStore(SQLBaseStore):
         if ret:
             defer.returnValue(ret['user_id'])
         defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def count_all_users(self):
+        """Counts all users registered on the homeserver."""
+        def _count_users(txn):
+            txn.execute("SELECT COUNT(*) AS users FROM users")
+            rows = self.cursor_to_dict(txn)
+            if rows:
+                return rows[0]["users"]
+            return 0
+
+        ret = yield self.runInteraction("count_users", _count_users)
+        defer.returnValue(ret)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 2a59ee7d6d..41c939efb1 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
 
 RoomsForUser = namedtuple(
     "RoomsForUser",
-    ("room_id", "sender", "membership", "event_id")
+    ("room_id", "sender", "membership", "event_id", "stream_ordering")
 )
 
 
@@ -141,11 +141,13 @@ class RoomMemberStore(SQLBaseStore):
         args.extend(membership_list)
 
         sql = (
-            "SELECT m.event_id, m.room_id, m.sender, m.membership"
-            " FROM room_memberships as m"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id "
-            " AND m.room_id = c.room_id "
+            "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
+            " FROM current_state_events as c"
+            " INNER JOIN room_memberships as m"
+            " ON m.event_id = c.event_id"
+            " INNER JOIN events as e"
+            " ON e.event_id = c.event_id"
+            " AND m.room_id = c.room_id"
             " AND m.user_id = c.state_key"
             " WHERE %s"
         ) % (where_clause,)
diff --git a/synapse/storage/schema/delta/23/drop_state_index.sql b/synapse/storage/schema/delta/23/drop_state_index.sql
new file mode 100644
index 0000000000..07d0ea5cb2
--- /dev/null
+++ b/synapse/storage/schema/delta/23/drop_state_index.sql
@@ -0,0 +1,16 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+DROP INDEX IF EXISTS state_groups_state_tuple;
diff --git a/synapse/storage/schema/delta/24/stats_reporting.sql b/synapse/storage/schema/delta/24/stats_reporting.sql
new file mode 100644
index 0000000000..e9165d2917
--- /dev/null
+++ b/synapse/storage/schema/delta/24/stats_reporting.sql
@@ -0,0 +1,22 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Should only ever contain one row
+CREATE TABLE IF NOT EXISTS stats_reporting(
+  -- The stream ordering token which was most recently reported as stats
+  reported_stream_token INTEGER,
+  -- The time (seconds since epoch) stats were most recently reported
+  reported_time BIGINT
+);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d7fe423f5a..3cab06fdef 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -159,9 +159,7 @@ class StreamStore(SQLBaseStore):
 
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,
-                               limit=0, with_feedback=False):
-        # TODO (erikj): Handle compressed feedback
-
+                               limit=0):
         current_room_membership_sql = (
             "SELECT m.room_id FROM room_memberships as m "
             " INNER JOIN current_state_events as c"
@@ -227,10 +225,7 @@ class StreamStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1,
-                             with_feedback=False):
-        # TODO (erikj): Handle compressed feedback
-
+                             direction='b', limit=-1):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -302,7 +297,6 @@ class StreamStore(SQLBaseStore):
 
     @cachedInlineCallbacks(num_args=4)
     def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
-        # TODO (erikj): Handle compressed feedback
 
         end_token = RoomStreamToken.parse_stream_token(end_token)
 
@@ -379,6 +373,38 @@ class StreamStore(SQLBaseStore):
             )
             defer.returnValue("t%d-%d" % (topo, token))
 
+    def get_stream_token_for_event(self, event_id):
+        """The stream token for an event
+        Args:
+            event_id(str): The id of the event to look up a stream token for.
+        Raises:
+            StoreError if the event wasn't in the database.
+        Returns:
+            A deferred "s%d" stream token.
+        """
+        return self._simple_select_one_onecol(
+            table="events",
+            keyvalues={"event_id": event_id},
+            retcol="stream_ordering",
+        ).addCallback(lambda row: "s%d" % (row,))
+
+    def get_topological_token_for_event(self, event_id):
+        """The stream token for an event
+        Args:
+            event_id(str): The id of the event to look up a stream token for.
+        Raises:
+            StoreError if the event wasn't in the database.
+        Returns:
+            A deferred "t%d-%d" topological token.
+        """
+        return self._simple_select_one(
+            table="events",
+            keyvalues={"event_id": event_id},
+            retcols=("stream_ordering", "topological_ordering"),
+        ).addCallback(lambda row: "t%d-%d" % (
+            row["topological_ordering"], row["stream_ordering"],)
+        )
+
     def _get_max_topological_txn(self, txn):
         txn.execute(
             "SELECT MAX(topological_ordering) FROM events"