summary refs log tree commit diff
diff options
context:
space:
mode:
-rwxr-xr-xsynapse/app/homeserver.py4
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/receipts.py59
-rw-r--r--synapse/storage/registration.py36
-rw-r--r--synapse/storage/schema/delta/50/add_creation_ts_users_index.sql19
5 files changed, 90 insertions, 30 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 51fc3645d5..714f98a3e0 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -439,6 +439,10 @@ def run(hs):
         total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
         stats["total_nonbridged_users"] = total_nonbridged_users
 
+        daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
+        for name, count in daily_user_type_results.iteritems():
+            stats["daily_user_type_" + name] = count
+
         room_count = yield hs.get_datastore().get_room_count()
         stats["total_room_count"] = room_count
 
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c08e9cd65a..cf2aae0468 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 49
+SCHEMA_VERSION = 50
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 709c69a926..c93c228f6e 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -332,6 +332,35 @@ class ReceiptsStore(ReceiptsWorkerStore):
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
                                       user_id, event_id, data, stream_id):
+        res = self._simple_select_one_txn(
+            txn,
+            table="events",
+            retcols=["topological_ordering", "stream_ordering"],
+            keyvalues={"event_id": event_id},
+            allow_none=True
+        )
+
+        stream_ordering = int(res["stream_ordering"]) if res else None
+
+        # We don't want to clobber receipts for more recent events, so we
+        # have to compare orderings of existing receipts
+        if stream_ordering is not None:
+            sql = (
+                "SELECT stream_ordering, event_id FROM events"
+                " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
+                " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
+            )
+            txn.execute(sql, (room_id, receipt_type, user_id))
+
+            for so, eid in txn:
+                if int(so) >= stream_ordering:
+                    logger.debug(
+                        "Ignoring new receipt for %s in favour of existing "
+                        "one for later event %s",
+                        event_id, eid,
+                    )
+                    return False
+
         txn.call_after(
             self.get_receipts_for_room.invalidate, (room_id, receipt_type)
         )
@@ -355,34 +384,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
             (user_id, room_id, receipt_type)
         )
 
-        res = self._simple_select_one_txn(
-            txn,
-            table="events",
-            retcols=["topological_ordering", "stream_ordering"],
-            keyvalues={"event_id": event_id},
-            allow_none=True
-        )
-
-        topological_ordering = int(res["topological_ordering"]) if res else None
-        stream_ordering = int(res["stream_ordering"]) if res else None
-
-        # We don't want to clobber receipts for more recent events, so we
-        # have to compare orderings of existing receipts
-        sql = (
-            "SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
-            " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
-        )
-
-        txn.execute(sql, (room_id, receipt_type, user_id))
-
-        if topological_ordering:
-            for to, so, _ in txn:
-                if int(to) > topological_ordering:
-                    return False
-                elif int(to) == topological_ordering and int(so) >= stream_ordering:
-                    return False
-
         self._simple_delete_txn(
             txn,
             table="receipts_linearized",
@@ -406,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             }
         )
 
-        if receipt_type == "m.read" and topological_ordering:
+        if receipt_type == "m.read" and stream_ordering is not None:
             self._remove_old_push_actions_before_txn(
                 txn,
                 room_id=room_id,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 40f7cc16ee..c241167fbe 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -102,6 +102,13 @@ class RegistrationStore(RegistrationWorkerStore,
             columns=["user_id", "device_id"],
         )
 
+        self.register_background_index_update(
+            "users_creation_ts",
+            index_name="users_creation_ts",
+            table="users",
+            columns=["creation_ts"],
+        )
+
         # we no longer use refresh tokens, but it's possible that some people
         # might have a background update queued to build this index. Just
         # clear the background update.
@@ -486,6 +493,35 @@ class RegistrationStore(RegistrationWorkerStore,
         ret = yield self.runInteraction("count_users", _count_users)
         defer.returnValue(ret)
 
+    def count_daily_user_type(self):
+        """
+        Counts 1) native non guest users
+               2) native guests users
+               3) bridged users
+        who registered on the homeserver in the past 24 hours
+        """
+        def _count_daily_user_type(txn):
+            yesterday = int(self._clock.time()) - (60 * 60 * 24)
+
+            sql = """
+                SELECT user_type, COALESCE(count(*), 0) AS count FROM (
+                    SELECT
+                    CASE
+                        WHEN is_guest=0 AND appservice_id IS NULL THEN 'native'
+                        WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest'
+                        WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged'
+                    END AS user_type
+                    FROM users
+                    WHERE creation_ts > ?
+                ) AS t GROUP BY user_type
+            """
+            results = {'native': 0, 'guest': 0, 'bridged': 0}
+            txn.execute(sql, (yesterday,))
+            for row in txn:
+                results[row[0]] = row[1]
+            return results
+        return self.runInteraction("count_daily_user_type", _count_daily_user_type)
+
     @defer.inlineCallbacks
     def count_nonbridged_users(self):
         def _count_users(txn):
diff --git a/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql
new file mode 100644
index 0000000000..c93ae47532
--- /dev/null
+++ b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql
@@ -0,0 +1,19 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('users_creation_ts', '{}');