summary refs log tree commit diff
path: root/synapse/storage/profile.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/profile.py')
-rw-r--r--synapse/storage/profile.py110
1 files changed, 98 insertions, 12 deletions
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py

index aeec2f57c4..38524f2545 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py
@@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,8 +19,11 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.storage.roommember import ProfileInfo +from . import background_updates from ._base import SQLBaseStore +BATCH_SIZE = 100 + class ProfileWorkerStore(SQLBaseStore): @defer.inlineCallbacks @@ -61,6 +65,55 @@ class ProfileWorkerStore(SQLBaseStore): desc="get_profile_avatar_url", ) + def get_latest_profile_replication_batch_number(self): + def f(txn): + txn.execute("SELECT MAX(batch) as maxbatch FROM profiles") + rows = self.cursor_to_dict(txn) + return rows[0]['maxbatch'] + return self.runInteraction( + "get_latest_profile_replication_batch_number", f, + ) + + def get_profile_batch(self, batchnum): + return self._simple_select_list( + table="profiles", + keyvalues={ + "batch": batchnum, + }, + retcols=("user_id", "displayname", "avatar_url", "active"), + desc="get_profile_batch", + ) + + def assign_profile_batch(self): + def f(txn): + sql = ( + "UPDATE profiles SET batch = " + "(SELECT COALESCE(MAX(batch), -1) + 1 FROM profiles) " + "WHERE user_id in (" + " SELECT user_id FROM profiles WHERE batch is NULL limit ?" + ")" + ) + txn.execute(sql, (BATCH_SIZE,)) + return txn.rowcount + return self.runInteraction("assign_profile_batch", f) + + def get_replication_hosts(self): + def f(txn): + txn.execute("SELECT host, last_synced_batch FROM profile_replication_status") + rows = self.cursor_to_dict(txn) + return {r['host']: r['last_synced_batch'] for r in rows} + return self.runInteraction("get_replication_hosts", f) + + def update_replication_batch_for_host(self, host, last_synced_batch): + return self._simple_upsert( + table="profile_replication_status", + keyvalues={"host": host}, + values={ + "last_synced_batch": last_synced_batch, + }, + desc="update_replication_batch_for_host", + ) + def get_from_remote_profile_cache(self, user_id): return self._simple_select_one( table="remote_profile_cache", @@ -70,29 +123,62 @@ class ProfileWorkerStore(SQLBaseStore): desc="get_from_remote_profile_cache", ) - def create_profile(self, user_localpart): - return self._simple_insert( - table="profiles", values={"user_id": user_localpart}, desc="create_profile" - ) - - def set_profile_displayname(self, user_localpart, new_displayname): - return self._simple_update_one( + def set_profile_displayname(self, user_localpart, new_displayname, batchnum): + return self._simple_upsert( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname}, + values={ + "displayname": new_displayname, + "batch": batchnum, + }, desc="set_profile_displayname", + lock=False # we can do this because user_id has a unique index ) - def set_profile_avatar_url(self, user_localpart, new_avatar_url): - return self._simple_update_one( + def set_profile_avatar_url(self, user_localpart, new_avatar_url, batchnum): + return self._simple_upsert( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"avatar_url": new_avatar_url}, + values={ + "avatar_url": new_avatar_url, + "batch": batchnum, + }, desc="set_profile_avatar_url", + lock=False # we can do this because user_id has a unique index + ) + + def set_profile_active(self, user_localpart, active, hide, batchnum): + values = { + "active": int(active), + "batch": batchnum, + } + if not active and not hide: + # we are deactivating for real (not in hide mode) + # so clear the profile. + values["avatar_url"] = None + values["displayname"] = None + return self._simple_upsert( + table="profiles", + keyvalues={"user_id": user_localpart}, + values=values, + desc="set_profile_active", + lock=False # we can do this because user_id has a unique index ) -class ProfileStore(ProfileWorkerStore): +class ProfileStore(ProfileWorkerStore, background_updates.BackgroundUpdateStore): + def __init__(self, db_conn, hs): + + super(ProfileStore, self).__init__(db_conn, hs) + + self.register_background_index_update( + "profile_replication_status_host_index", + index_name="profile_replication_status_idx", + table="profile_replication_status", + columns=["host"], + unique=True, + ) + def add_remote_profile_cache(self, user_id, displayname, avatar_url): """Ensure we are caching the remote user's profiles.