diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/prepare_database.py | 2 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 59 | ||||
-rw-r--r-- | synapse/storage/registration.py | 36 | ||||
-rw-r--r-- | synapse/storage/schema/delta/50/add_creation_ts_users_index.sql | 19 |
4 files changed, 86 insertions, 30 deletions
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c08e9cd65a..cf2aae0468 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 49 +SCHEMA_VERSION = 50 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 709c69a926..c93c228f6e 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -332,6 +332,35 @@ class ReceiptsStore(ReceiptsWorkerStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + + stream_ordering = int(res["stream_ordering"]) if res else None + + # We don't want to clobber receipts for more recent events, so we + # have to compare orderings of existing receipts + if stream_ordering is not None: + sql = ( + "SELECT stream_ordering, event_id FROM events" + " INNER JOIN receipts_linearized as r USING (event_id, room_id)" + " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + ) + txn.execute(sql, (room_id, receipt_type, user_id)) + + for so, eid in txn: + if int(so) >= stream_ordering: + logger.debug( + "Ignoring new receipt for %s in favour of existing " + "one for later event %s", + event_id, eid, + ) + return False + txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) @@ -355,34 +384,6 @@ class ReceiptsStore(ReceiptsWorkerStore): (user_id, room_id, receipt_type) ) - res = self._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True - ) - - topological_ordering = int(res["topological_ordering"]) if res else None - stream_ordering = int(res["stream_ordering"]) if res else None - - # We don't want to clobber receipts for more recent events, so we - # have to compare orderings of existing receipts - sql = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " INNER JOIN receipts_linearized as r USING (event_id, room_id)" - " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" - ) - - txn.execute(sql, (room_id, receipt_type, user_id)) - - if topological_ordering: - for to, so, _ in txn: - if int(to) > topological_ordering: - return False - elif int(to) == topological_ordering and int(so) >= stream_ordering: - return False - self._simple_delete_txn( txn, table="receipts_linearized", @@ -406,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore): } ) - if receipt_type == "m.read" and topological_ordering: + if receipt_type == "m.read" and stream_ordering is not None: self._remove_old_push_actions_before_txn( txn, room_id=room_id, diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 40f7cc16ee..c241167fbe 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -102,6 +102,13 @@ class RegistrationStore(RegistrationWorkerStore, columns=["user_id", "device_id"], ) + self.register_background_index_update( + "users_creation_ts", + index_name="users_creation_ts", + table="users", + columns=["creation_ts"], + ) + # we no longer use refresh tokens, but it's possible that some people # might have a background update queued to build this index. Just # clear the background update. @@ -486,6 +493,35 @@ class RegistrationStore(RegistrationWorkerStore, ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) + def count_daily_user_type(self): + """ + Counts 1) native non guest users + 2) native guests users + 3) bridged users + who registered on the homeserver in the past 24 hours + """ + def _count_daily_user_type(txn): + yesterday = int(self._clock.time()) - (60 * 60 * 24) + + sql = """ + SELECT user_type, COALESCE(count(*), 0) AS count FROM ( + SELECT + CASE + WHEN is_guest=0 AND appservice_id IS NULL THEN 'native' + WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest' + WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged' + END AS user_type + FROM users + WHERE creation_ts > ? + ) AS t GROUP BY user_type + """ + results = {'native': 0, 'guest': 0, 'bridged': 0} + txn.execute(sql, (yesterday,)) + for row in txn: + results[row[0]] = row[1] + return results + return self.runInteraction("count_daily_user_type", _count_daily_user_type) + @defer.inlineCallbacks def count_nonbridged_users(self): def _count_users(txn): diff --git a/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql new file mode 100644 index 0000000000..c93ae47532 --- /dev/null +++ b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql @@ -0,0 +1,19 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +INSERT into background_updates (update_name, progress_json) + VALUES ('users_creation_ts', '{}'); |