From e654230a51affdeca69b365b0ec43ff95134bdf6 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 10 Apr 2018 17:41:58 +0100 Subject: Written but untested profile replication --- synapse/config/registration.py | 8 +++ synapse/handlers/profile.py | 70 +++++++++++++++++++++- synapse/storage/prepare_database.py | 2 +- synapse/storage/profile.py | 56 +++++++++++++++++ synapse/storage/schema/delta/48/profiles_batch.sql | 23 +++++++ 5 files changed, 157 insertions(+), 2 deletions(-) create mode 100644 synapse/storage/schema/delta/48/profiles_batch.sql (limited to 'synapse') diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 2854a48d0a..8f4d051fba 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -49,6 +49,10 @@ class RegistrationConfig(Config): self.auto_join_rooms = config.get("auto_join_rooms", []) + self.replicate_user_profiles_to = config.get("replicate_user_profiles_to", []) + if not isinstance(self.replicate_user_profiles_to, list): + self.replicate_user_profiles_to = [self.replicate_user_profiles_to,] + def default_config(self, **kwargs): registration_shared_secret = random_string_with_symbols(50) @@ -106,6 +110,10 @@ class RegistrationConfig(Config): - vector.im - riot.im + # If enabled, user IDs, display names and avatar URLs will be replicated + # to this server whenever they change. + # replicate_user_profiles_to: example.com + # Users who register on this homeserver will automatically be joined # to these rooms #auto_join_rooms: diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index cb710fe796..be9d1d8c41 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -15,12 +15,14 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler +from signedjson.sign import sign_json + logger = logging.getLogger(__name__) @@ -28,6 +30,8 @@ class ProfileHandler(BaseHandler): PROFILE_UPDATE_MS = 60 * 1000 PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 + PROFILE_REPLICATE_INTERVAL = 2 * 60 * 1000 + def __init__(self, hs): super(ProfileHandler, self).__init__(hs) @@ -38,8 +42,72 @@ class ProfileHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() + self.http_client = hs.get_simple_http_client() + self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) + reactor.callWhenRunning(self._assign_profile_replication_batches) + reactor.callWhenRunning(self._replicate_profiles) + self.clock.looping_call(self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL) + + @defer.inlineCallbacks + def _assign_profile_replication_batches(self): + """If no profile replication has been done yet, allocate replication batch + numbers to each profile to start the replication process. + """ + logger.info("Assigning profile batch numbers...") + total = 0 + while True: + assigned = yield self.store.assign_profile_batch() + total += assigned + if assigned == 0: + break + logger.info("Assigned %d profile batch numbers", total) + + @defer.inlineCallbacks + def _replicate_profiles(self): + """If any profile data has been updated and not pushed to the replication targets, + replicate it. + """ + host_batches = yield self.store.get_replication_hosts() + latest_batch = yield self.store.get_latest_profile_replication_batch_number() + for repl_host in self.hs.config.replicate_user_profiles_to: + if repl_host not in host_batches: + host_batches[repl_host] = -1 + try: + for i in xrange(host_batches[repl_host] + 1, latest_batch + 1): + yield self._replicate_host_profile_batch(repl_host, i) + except: + logger.exception( + "Exception while replicating to %s: aborting for now", repl_host, + ) + + @defer.inlineCallbacks + def _replicate_host_profile_batch(self, host, batchnum): + logger.info("Replicating profile batch %d to %s", batchnum, host) + batch_rows = yield self.store.get_profile_batch(batchnum) + batch = { + UserID(r["user_id"], self.hs.hostname).to_string(): { + "displayname": r["displayname"], + "avatar_url": r["avatar_url"], + } for r in batch_rows + } + + url = "https://%s/_matrix/federation/v1/replicate_profiles" % (host,) + signed_batch = { + "batchnum": batchnum, + "signed_batch": sign_json(batch, self.hs.hostname, self.hs.config.signing_key[0]), + "origin_server": self.hs.hostname, + } + try: + yield self.http_client.post_json_get_json(url, signed_batch) + self.store.update_replication_batch_for_host(host, batchnum) + logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host) + except: + # This will get retried when the looping call next comes around + logger.exception("Failed to replicate profile batch %d to %s", batchnum, host) + raise + @defer.inlineCallbacks def get_profile(self, user_id): target_user = UserID.from_string(user_id) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c845a0cec5..68675e15d2 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 47 +SCHEMA_VERSION = 48 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index ec02e73bc2..cfc4a0606e 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,6 +21,8 @@ from synapse.api.errors import StoreError from ._base import SQLBaseStore +BATCH_SIZE = 100 + class ProfileStore(SQLBaseStore): def create_profile(self, user_localpart): @@ -85,6 +88,59 @@ class ProfileStore(SQLBaseStore): desc="set_profile_avatar_url", ) + @defer.inlineCallbacks + def get_latest_profile_replication_batch_number(self): + def f(txn): + txn.execute("SELECT MAX(batch) as maxbatch FROM profiles") + rows = self.cursor_to_dict(txn) + return rows[0]['maxbatch'] + max_batch = yield self.runInteraction("get_latest_profile_replication_batch_number", f) + defer.returnValue(max_batch) + + def get_profile_batch(self, batchnum): + return self._simple_select_list( + table="profiles", + keyvalues={ + "batch": batchnum, + }, + retcols=("user_id", "displayname", "avatar_url"), + desc="get_profile_batch", + ) + + @defer.inlineCallbacks + def assign_profile_batch(self): + def f(txn): + sql = ( + "UPDATE profiles SET batch = " + "(SELECT IFNULL(MAX(batch), -1) + 1 FROM profiles) " + "WHERE user_id in (" + " SELECT user_id FROM profiles WHERE batch is NULL limit ?" + ")" + ) + txn.execute(sql, (BATCH_SIZE,)) + return txn.rowcount + assigned = yield self.runInteraction("assign_profile_batch", f) + defer.returnValue(assigned) + + @defer.inlineCallbacks + def get_replication_hosts(self): + def f(txn): + txn.execute("SELECT host, last_synced_batch FROM profile_replication_status") + rows = self.cursor_to_dict(txn) + return { r['host']: r['last_synced_batch'] for r in rows } + result = yield self.runInteraction("get_replication_hosts", f) + defer.returnValue(result) + + def update_replication_batch_for_host(self, host, last_synced_batch): + return self._simple_upsert( + table="profile_replication_status", + keyvalues={"host": host}, + values={ + "last_synced_batch": last_synced_batch, + }, + desc="update_replication_batch_for_host", + ) + def get_from_remote_profile_cache(self, user_id): return self._simple_select_one( table="remote_profile_cache", diff --git a/synapse/storage/schema/delta/48/profiles_batch.sql b/synapse/storage/schema/delta/48/profiles_batch.sql new file mode 100644 index 0000000000..7639ff22d5 --- /dev/null +++ b/synapse/storage/schema/delta/48/profiles_batch.sql @@ -0,0 +1,23 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE profiles ADD COLUMN batch BIGINT DEFAULT NULL; + +CREATE INDEX profiles_batch_idx ON profiles(batch); + +CREATE TABLE profile_replication_status ( + host TEXT NOT NULL, + last_synced_batch BIGINT NOT NULL +); -- cgit 1.5.1 From 4e12b10c7cfcdd102c5380524e331ec78526f067 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 11 Apr 2018 10:17:07 +0100 Subject: Trigger profile replication on profile change --- synapse/handlers/profile.py | 15 +++++++++++++-- synapse/storage/profile.py | 14 ++++++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index be9d1d8c41..5ba6c257c7 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -18,6 +18,7 @@ import logging from twisted.internet import defer, reactor from synapse.api.errors import SynapseError, AuthError, CodeMessageException +from synapse.util.logcontext import run_in_background from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler @@ -205,8 +206,10 @@ class ProfileHandler(BaseHandler): if new_displayname == '': new_displayname = None + new_batchnum = (yield self.store.get_latest_profile_replication_batch_number()) + 1 + yield self.store.set_profile_displayname( - target_user.localpart, new_displayname + target_user.localpart, new_displayname, new_batchnum ) if self.hs.config.user_directory_search_all_users: @@ -217,6 +220,9 @@ class ProfileHandler(BaseHandler): yield self._update_join_states(requester, target_user) + # start a profile replication push + run_in_background(self._replicate_profiles) + @defer.inlineCallbacks def get_avatar_url(self, target_user): if self.hs.is_mine(target_user): @@ -255,8 +261,10 @@ class ProfileHandler(BaseHandler): if not by_admin and target_user != requester.user: raise AuthError(400, "Cannot set another user's avatar_url") + new_batchnum = yield self.store.get_latest_profile_replication_batch_number() + 1 + yield self.store.set_profile_avatar_url( - target_user.localpart, new_avatar_url + target_user.localpart, new_avatar_url, new_batchnum, ) if self.hs.config.user_directory_search_all_users: @@ -267,6 +275,9 @@ class ProfileHandler(BaseHandler): yield self._update_join_states(requester, target_user) + # start a profile replication push + run_in_background(self._replicate_profiles) + @defer.inlineCallbacks def on_profile_query(self, args): user = UserID.from_string(args["user_id"]) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index cfc4a0606e..7b01f81d3d 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -64,11 +64,14 @@ class ProfileStore(SQLBaseStore): desc="get_profile_displayname", ) - def set_profile_displayname(self, user_localpart, new_displayname): + def set_profile_displayname(self, user_localpart, new_displayname, batchnum): return self._simple_update_one( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname}, + updatevalues={ + "displayname": new_displayname, + "batch": batchnum, + }, desc="set_profile_displayname", ) @@ -80,11 +83,14 @@ class ProfileStore(SQLBaseStore): desc="get_profile_avatar_url", ) - def set_profile_avatar_url(self, user_localpart, new_avatar_url): + def set_profile_avatar_url(self, user_localpart, new_avatar_url, batchnum): return self._simple_update_one( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"avatar_url": new_avatar_url}, + updatevalues={ + "avatar_url": new_avatar_url, + "batch": batchnum, + }, desc="set_profile_avatar_url", ) -- cgit 1.5.1 From 1147ce7e18d44c6c4833516deef923c5085b0574 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 12 Apr 2018 17:59:37 +0100 Subject: Include origin_server in the sig! Also be consistent with underscores --- synapse/handlers/profile.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 5ba6c257c7..454f828d41 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -89,19 +89,20 @@ class ProfileHandler(BaseHandler): batch_rows = yield self.store.get_profile_batch(batchnum) batch = { UserID(r["user_id"], self.hs.hostname).to_string(): { - "displayname": r["displayname"], + "display_name": r["displayname"], "avatar_url": r["avatar_url"], } for r in batch_rows } url = "https://%s/_matrix/federation/v1/replicate_profiles" % (host,) - signed_batch = { + body = { "batchnum": batchnum, - "signed_batch": sign_json(batch, self.hs.hostname, self.hs.config.signing_key[0]), + "batch": batch, "origin_server": self.hs.hostname, } + signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0]) try: - yield self.http_client.post_json_get_json(url, signed_batch) + yield self.http_client.post_json_get_json(url, signed_body) self.store.update_replication_batch_for_host(host, batchnum) logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host) except: -- cgit 1.5.1 From 7285afa4be4f06b213c883ac3d1723742a7ca546 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 17 Apr 2018 10:28:00 +0100 Subject: Handle current batch number being null --- synapse/handlers/profile.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 12ab92ca85..d56adf069b 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -212,7 +212,8 @@ class ProfileHandler(BaseHandler): if new_displayname == '': new_displayname = None - new_batchnum = (yield self.store.get_latest_profile_replication_batch_number()) + 1 + cur_batchnum = yield self.store.get_latest_profile_replication_batch_number() + new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1 yield self.store.set_profile_displayname( target_user.localpart, new_displayname, new_batchnum @@ -267,7 +268,8 @@ class ProfileHandler(BaseHandler): if not by_admin and target_user != requester.user: raise AuthError(400, "Cannot set another user's avatar_url") - new_batchnum = yield self.store.get_latest_profile_replication_batch_number() + 1 + cur_batchnum = yield self.store.get_latest_profile_replication_batch_number() + new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1 yield self.store.set_profile_avatar_url( target_user.localpart, new_avatar_url, new_batchnum, -- cgit 1.5.1 From 8743f42b495fdb8a7cf14d1c16021655b0d0222e Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 17 Apr 2018 10:34:04 +0100 Subject: pep8 --- synapse/config/registration.py | 2 +- synapse/handlers/profile.py | 4 ++-- synapse/storage/profile.py | 6 ++++-- 3 files changed, 7 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 14373c502d..5c21287998 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -51,7 +51,7 @@ class RegistrationConfig(Config): self.replicate_user_profiles_to = config.get("replicate_user_profiles_to", []) if not isinstance(self.replicate_user_profiles_to, list): - self.replicate_user_profiles_to = [self.replicate_user_profiles_to,] + self.replicate_user_profiles_to = [self.replicate_user_profiles_to, ] def default_config(self, **kwargs): registration_shared_secret = random_string_with_symbols(50) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index d56adf069b..bf91eedb7f 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -83,7 +83,7 @@ class ProfileHandler(BaseHandler): try: for i in xrange(host_batches[repl_host] + 1, latest_batch + 1): yield self._replicate_host_profile_batch(repl_host, i) - except: + except Exception: logger.exception( "Exception while replicating to %s: aborting for now", repl_host, ) @@ -110,7 +110,7 @@ class ProfileHandler(BaseHandler): yield self.http_client.post_json_get_json(url, signed_body) self.store.update_replication_batch_for_host(host, batchnum) logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host) - except: + except Exception: # This will get retried when the looping call next comes around logger.exception("Failed to replicate profile batch %d to %s", batchnum, host) raise diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index a3f144eb41..048f48dcc1 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -71,7 +71,9 @@ class ProfileWorkerStore(SQLBaseStore): txn.execute("SELECT MAX(batch) as maxbatch FROM profiles") rows = self.cursor_to_dict(txn) return rows[0]['maxbatch'] - max_batch = yield self.runInteraction("get_latest_profile_replication_batch_number", f) + max_batch = yield self.runInteraction( + "get_latest_profile_replication_batch_number", f, + ) defer.returnValue(max_batch) def get_profile_batch(self, batchnum): @@ -104,7 +106,7 @@ class ProfileWorkerStore(SQLBaseStore): def f(txn): txn.execute("SELECT host, last_synced_batch FROM profile_replication_status") rows = self.cursor_to_dict(txn) - return { r['host']: r['last_synced_batch'] for r in rows } + return {r['host']: r['last_synced_batch'] for r in rows} result = yield self.runInteraction("get_replication_hosts", f) defer.returnValue(result) -- cgit 1.5.1 From 22e416b7261d4dd583cd6933481c01aeec318f83 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 17 Apr 2018 12:17:16 +0100 Subject: Update profile cache only on master and same for the profile replication --- synapse/handlers/profile.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index bf91eedb7f..a06f596248 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -45,16 +45,14 @@ class ProfileHandler(BaseHandler): self.http_client = hs.get_simple_http_client() - self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) - if hs.config.worker_app is None: self.clock.looping_call( self._update_remote_profile_cache, self.PROFILE_UPDATE_MS, ) - reactor.callWhenRunning(self._assign_profile_replication_batches) - reactor.callWhenRunning(self._replicate_profiles) - self.clock.looping_call(self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL) + reactor.callWhenRunning(self._assign_profile_replication_batches) + reactor.callWhenRunning(self._replicate_profiles) + self.clock.looping_call(self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL) @defer.inlineCallbacks def _assign_profile_replication_batches(self): -- cgit 1.5.1 From dde01efbcb1897e5041f6c2543c25ee53bc20326 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 17 Apr 2018 12:26:45 +0100 Subject: Don't do profile repl if no repl targets --- synapse/handlers/profile.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index a06f596248..61496cd60c 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -50,9 +50,10 @@ class ProfileHandler(BaseHandler): self._update_remote_profile_cache, self.PROFILE_UPDATE_MS, ) - reactor.callWhenRunning(self._assign_profile_replication_batches) - reactor.callWhenRunning(self._replicate_profiles) - self.clock.looping_call(self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL) + if len(self.hs.config.replicate_user_profiles_to) > 0: + reactor.callWhenRunning(self._assign_profile_replication_batches) + reactor.callWhenRunning(self._replicate_profiles) + self.clock.looping_call(self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL) @defer.inlineCallbacks def _assign_profile_replication_batches(self): @@ -210,8 +211,11 @@ class ProfileHandler(BaseHandler): if new_displayname == '': new_displayname = None - cur_batchnum = yield self.store.get_latest_profile_replication_batch_number() - new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1 + if len(self.hs.config.replicate_user_profiles_to) > 0: + cur_batchnum = yield self.store.get_latest_profile_replication_batch_number() + new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1 + else: + new_batchnum = None yield self.store.set_profile_displayname( target_user.localpart, new_displayname, new_batchnum @@ -266,8 +270,11 @@ class ProfileHandler(BaseHandler): if not by_admin and target_user != requester.user: raise AuthError(400, "Cannot set another user's avatar_url") - cur_batchnum = yield self.store.get_latest_profile_replication_batch_number() - new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1 + if len(self.hs.config.replicate_user_profiles_to) > 0: + cur_batchnum = yield self.store.get_latest_profile_replication_batch_number() + new_batchnum = 0 if cur_batchnum is None else cur_batchnum + 1 + else: + new_batchnum = None yield self.store.set_profile_avatar_url( target_user.localpart, new_avatar_url, new_batchnum, -- cgit 1.5.1 From 3add16df4977bd96021f1e481c6d0450a4b1d82b Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 17 Apr 2018 13:23:16 +0100 Subject: pep8 again --- synapse/handlers/profile.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 61496cd60c..8332771c15 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -53,7 +53,9 @@ class ProfileHandler(BaseHandler): if len(self.hs.config.replicate_user_profiles_to) > 0: reactor.callWhenRunning(self._assign_profile_replication_batches) reactor.callWhenRunning(self._replicate_profiles) - self.clock.looping_call(self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL) + self.clock.looping_call( + self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL + ) @defer.inlineCallbacks def _assign_profile_replication_batches(self): -- cgit 1.5.1 From de341bec1b04b68449a80939cb1738e122b4076c Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 25 Apr 2018 11:51:57 +0100 Subject: Add 'ex[erimental API' comment --- synapse/config/registration.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse') diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 5c21287998..34326718ad 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -114,6 +114,8 @@ class RegistrationConfig(Config): # If enabled, user IDs, display names and avatar URLs will be replicated # to this server whenever they change. + # This is an experimental API currently implemented by sydent to support + # cross-homeserver user directories. # replicate_user_profiles_to: example.com # Users who register on this homeserver will automatically be joined -- cgit 1.5.1 From 7fafa838ae17429df150a27308a1f12becb60fb0 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 25 Apr 2018 11:59:22 +0100 Subject: Comment why the looping call loops --- synapse/handlers/profile.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 8332771c15..7699152cdd 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -53,6 +53,9 @@ class ProfileHandler(BaseHandler): if len(self.hs.config.replicate_user_profiles_to) > 0: reactor.callWhenRunning(self._assign_profile_replication_batches) reactor.callWhenRunning(self._replicate_profiles) + # Add a looping call to replicate_profiles: this handles retries + # if the replication is unsuccessful when the user updated their + # profile. self.clock.looping_call( self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL ) -- cgit 1.5.1 From 47ed4a4aa785ecd819bf023cd38b743c8eeac81e Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 25 Apr 2018 13:58:37 +0100 Subject: PR feedback Unnecessary inlineCallbacks, missing yield, SQL comments & trailing commas. --- synapse/handlers/profile.py | 2 +- synapse/storage/profile.py | 12 +++--------- synapse/storage/schema/delta/48/profiles_batch.sql | 13 +++++++++++++ tests/handlers/test_profile.py | 4 ++-- tests/storage/test_profile.py | 4 ++-- 5 files changed, 21 insertions(+), 14 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 7699152cdd..7202d3c81d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -112,7 +112,7 @@ class ProfileHandler(BaseHandler): signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0]) try: yield self.http_client.post_json_get_json(url, signed_body) - self.store.update_replication_batch_for_host(host, batchnum) + yield self.store.update_replication_batch_for_host(host, batchnum) logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host) except Exception: # This will get retried when the looping call next comes around diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 048f48dcc1..12e2d44406 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -65,16 +65,14 @@ class ProfileWorkerStore(SQLBaseStore): desc="get_profile_avatar_url", ) - @defer.inlineCallbacks def get_latest_profile_replication_batch_number(self): def f(txn): txn.execute("SELECT MAX(batch) as maxbatch FROM profiles") rows = self.cursor_to_dict(txn) return rows[0]['maxbatch'] - max_batch = yield self.runInteraction( + return self.runInteraction( "get_latest_profile_replication_batch_number", f, ) - defer.returnValue(max_batch) def get_profile_batch(self, batchnum): return self._simple_select_list( @@ -86,7 +84,6 @@ class ProfileWorkerStore(SQLBaseStore): desc="get_profile_batch", ) - @defer.inlineCallbacks def assign_profile_batch(self): def f(txn): sql = ( @@ -98,17 +95,14 @@ class ProfileWorkerStore(SQLBaseStore): ) txn.execute(sql, (BATCH_SIZE,)) return txn.rowcount - assigned = yield self.runInteraction("assign_profile_batch", f) - defer.returnValue(assigned) + return self.runInteraction("assign_profile_batch", f) - @defer.inlineCallbacks def get_replication_hosts(self): def f(txn): txn.execute("SELECT host, last_synced_batch FROM profile_replication_status") rows = self.cursor_to_dict(txn) return {r['host']: r['last_synced_batch'] for r in rows} - result = yield self.runInteraction("get_replication_hosts", f) - defer.returnValue(result) + return self.runInteraction("get_replication_hosts", f) def update_replication_batch_for_host(self, host, last_synced_batch): return self._simple_upsert( diff --git a/synapse/storage/schema/delta/48/profiles_batch.sql b/synapse/storage/schema/delta/48/profiles_batch.sql index 7639ff22d5..e744c02fe8 100644 --- a/synapse/storage/schema/delta/48/profiles_batch.sql +++ b/synapse/storage/schema/delta/48/profiles_batch.sql @@ -13,10 +13,23 @@ * limitations under the License. */ +/* + * Add a batch number to track changes to profiles and the + * order they're made in so we can replicate user profiles + * to other hosts as they change + */ ALTER TABLE profiles ADD COLUMN batch BIGINT DEFAULT NULL; +/* + * Index on the batch number so we can get profiles + * by their batch + */ CREATE INDEX profiles_batch_idx ON profiles(batch); +/* + * A table to track what batch of user profiles has been + * synced to what profile replication target. + */ CREATE TABLE profile_replication_status ( host TEXT NOT NULL, last_synced_batch BIGINT NOT NULL diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 205190f8d6..8646c4e434 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -75,7 +75,7 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_my_name(self): yield self.store.set_profile_displayname( - self.frank.localpart, "Frank", 1 + self.frank.localpart, "Frank", 1, ) displayname = yield self.handler.get_displayname(self.frank) @@ -135,7 +135,7 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_my_avatar(self): yield self.store.set_profile_avatar_url( - self.frank.localpart, "http://my.server/me.png", 1 + self.frank.localpart, "http://my.server/me.png", 1, ) avatar_url = yield self.handler.get_avatar_url(self.frank) diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py index 6b0cc17010..1bfabc15ad 100644 --- a/tests/storage/test_profile.py +++ b/tests/storage/test_profile.py @@ -40,7 +40,7 @@ class ProfileStoreTestCase(unittest.TestCase): ) yield self.store.set_profile_displayname( - self.u_frank.localpart, "Frank", 1 + self.u_frank.localpart, "Frank", 1, ) self.assertEquals( @@ -55,7 +55,7 @@ class ProfileStoreTestCase(unittest.TestCase): ) yield self.store.set_profile_avatar_url( - self.u_frank.localpart, "http://my.site/here", 1 + self.u_frank.localpart, "http://my.site/here", 1, ) self.assertEquals( -- cgit 1.5.1