From 4dfbae18fe10c22a54421f211ad4a46a11777c16 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Fri, 1 Mar 2019 15:02:02 -0700 Subject: Use static locations for Riot icons See https://github.com/vector-im/riot-web/issues/9009 --- synapse/res/templates/notif.html | 6 +++--- synapse/res/templates/notif_mail.html | 2 +- synapse/res/templates/room.html | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/res/templates/notif.html b/synapse/res/templates/notif.html index 88b921ca9c..1a6c70b562 100644 --- a/synapse/res/templates/notif.html +++ b/synapse/res/templates/notif.html @@ -6,11 +6,11 @@ {% else %} {% if message.sender_hash % 3 == 0 %} - + {% elif message.sender_hash % 3 == 1 %} - + {% else %} - + {% endif %} {% endif %} {% endif %} diff --git a/synapse/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html index fcdb3109fe..019506e5fb 100644 --- a/synapse/res/templates/notif_mail.html +++ b/synapse/res/templates/notif_mail.html @@ -19,7 +19,7 @@ {% if app_name == "Riot" %} - [Riot] + [Riot] {% elif app_name == "Vector" %} [Vector] {% else %} diff --git a/synapse/res/templates/room.html b/synapse/res/templates/room.html index 723c222d25..b8525fef88 100644 --- a/synapse/res/templates/room.html +++ b/synapse/res/templates/room.html @@ -5,11 +5,11 @@ {% else %} {% if room.hash % 3 == 0 %} - + {% elif room.hash % 3 == 1 %} - + {% else %} - + {% endif %} {% endif %} -- cgit 1.5.1 From 5f0c449dd50fa84ff741e09f34cad5330c6e4745 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 4 Mar 2019 13:56:49 +0000 Subject: Prevent replication wedging --- synapse/replication/tcp/protocol.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 49ae5b3355..a6df04d851 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -451,7 +451,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): @defer.inlineCallbacks def subscribe_to_stream(self, stream_name, token): - """Subscribe the remote to a streams. + """Subscribe the remote to a stream. This invloves checking if they've missed anything and sending those updates down if they have. During that time new updates for the stream @@ -478,10 +478,30 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Now we can send any updates that came in while we were subscribing pending_rdata = self.pending_rdata.pop(stream_name, []) + batch_updates = [] for token, update in pending_rdata: - # Only send updates newer than the current token - if token > current_token: - self.send_command(RdataCommand(stream_name, token, update)) + # If the token is null, it is part of a batch update. Batches + # are multiple updates that share a single token. To denote + # this, the token is set to None for all tokens in the batch + # except for the last. If we find a None token, we keep looking + # through tokens until we find one that is not None and then + # process all previous updates in the batch as if they had the + # final token. + if not token or len(batch_updates) > 0: + batch_updates.append(update) + if token and not token > current_token: + # This batch is older than current_token, dismiss + batch_updates = [] + continue + if token: + # Send all updates that are part of this batch with the + # found token + for update in batch_updates: + self.send_command(RdataCommand(stream_name, token, update)) + else: + # Only send updates newer than the current token + if token > current_token: + self.send_command(RdataCommand(stream_name, token, update)) # They're now fully subscribed self.replication_streams.add(stream_name) -- cgit 1.5.1 From 9f7cdf3da16e4e6c29229dcc80d9cf060cd64584 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 4 Mar 2019 14:36:52 +0000 Subject: Clearer branching, fix missing list clear --- synapse/replication/tcp/protocol.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index a6df04d851..53615b7ee3 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -488,16 +488,23 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # process all previous updates in the batch as if they had the # final token. if not token or len(batch_updates) > 0: - batch_updates.append(update) - if token and not token > current_token: + if token is None: + # Store this update as part of the batch + batch_updates.append(update) + elif current_token <= current_token: # This batch is older than current_token, dismiss batch_updates = [] - continue - if token: + else: + # Append final update of this batch before sending + batch_updates.append(update) + # Send all updates that are part of this batch with the # found token for update in batch_updates: self.send_command(RdataCommand(stream_name, token, update)) + + # Clear saved batch updates + batch_updates = [] else: # Only send updates newer than the current token if token > current_token: -- cgit 1.5.1 From fe7bd23a85988c5251fe17e78589b69f92f21dd7 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 4 Mar 2019 15:08:15 +0000 Subject: Clean up logic and add comments --- synapse/replication/tcp/protocol.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) (limited to 'synapse') diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 53615b7ee3..dac4fbeef7 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -487,15 +487,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # through tokens until we find one that is not None and then # process all previous updates in the batch as if they had the # final token. - if not token or len(batch_updates) > 0: - if token is None: - # Store this update as part of the batch - batch_updates.append(update) - elif current_token <= current_token: - # This batch is older than current_token, dismiss + if token is None: + # Store this update as part of a batch + batch_updates.append(update) + continue + + if len(batch_updates) > 0: + # There is an ongoing batch and this is the end + if current_token <= current_token: + # This batch is older than current_token, dismiss it batch_updates = [] else: - # Append final update of this batch before sending + # This is the end of the batch. Append final update of + # this batch before sending batch_updates.append(update) # Send all updates that are part of this batch with the @@ -505,10 +509,13 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Clear saved batch updates batch_updates = [] - else: - # Only send updates newer than the current token - if token > current_token: - self.send_command(RdataCommand(stream_name, token, update)) + continue + + # This is an update that's not part of a batch. + # + # Only send updates newer than the current token + if token > current_token: + self.send_command(RdataCommand(stream_name, token, update)) # They're now fully subscribed self.replication_streams.add(stream_name) -- cgit 1.5.1 From b9f61630927752422fb80cf7ece083741aefd399 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 5 Mar 2019 13:58:30 +0000 Subject: Simplify token replication logic --- synapse/replication/tcp/protocol.py | 37 ++++++++++++++----------------------- 1 file changed, 14 insertions(+), 23 deletions(-) (limited to 'synapse') diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index dac4fbeef7..55630ba9a7 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -478,7 +478,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Now we can send any updates that came in while we were subscribing pending_rdata = self.pending_rdata.pop(stream_name, []) - batch_updates = [] + updates = [] for token, update in pending_rdata: # If the token is null, it is part of a batch update. Batches # are multiple updates that share a single token. To denote @@ -489,34 +489,25 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # final token. if token is None: # Store this update as part of a batch - batch_updates.append(update) + updates.append(update) continue - if len(batch_updates) > 0: - # There is an ongoing batch and this is the end - if current_token <= current_token: - # This batch is older than current_token, dismiss it - batch_updates = [] - else: - # This is the end of the batch. Append final update of - # this batch before sending - batch_updates.append(update) - - # Send all updates that are part of this batch with the - # found token - for update in batch_updates: - self.send_command(RdataCommand(stream_name, token, update)) - - # Clear saved batch updates - batch_updates = [] + if token <= current_token: + # This update or batch of updates is older than + # current_token, dismiss it + updates = [] continue - # This is an update that's not part of a batch. - # - # Only send updates newer than the current token - if token > current_token: + updates.append(update) + + # Send all updates that are part of this batch with the + # found token + for update in updates: self.send_command(RdataCommand(stream_name, token, update)) + # Clear stored updates + updates = [] + # They're now fully subscribed self.replication_streams.add(stream_name) except Exception as e: -- cgit 1.5.1 From 067ce795c06f3ac5ebc25e4d01624b076a972f76 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 5 Mar 2019 18:03:14 +0000 Subject: Move settings from registration to ratelimiting in config file --- synapse/config/ratelimiting.py | 18 ++++++++++++++++++ synapse/config/registration.py | 20 ++------------------ 2 files changed, 20 insertions(+), 18 deletions(-) (limited to 'synapse') diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 54b71e6841..093042fdb9 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -27,6 +27,13 @@ class RatelimitConfig(Config): self.federation_rc_reject_limit = config["federation_rc_reject_limit"] self.federation_rc_concurrent = config["federation_rc_concurrent"] + self.rc_registration_requests_per_second = config.get( + "rc_registration_requests_per_second", 0.17, + ) + self.rc_registration_request_burst_count = config.get( + "rc_registration_request_burst_count", 3, + ) + def default_config(self, **kwargs): return """\ ## Ratelimiting ## @@ -62,4 +69,15 @@ class RatelimitConfig(Config): # single server # federation_rc_concurrent: 3 + + # Number of registration requests a client can send per second. + # Defaults to 1/minute (0.17). + # + #rc_registration_requests_per_second: 0.17 + + # Number of registration requests a client can send before being + # throttled. + # Defaults to 3. + # + #rc_registration_request_burst_count: 3.0 """ diff --git a/synapse/config/registration.py b/synapse/config/registration.py index d32f6fff73..d34dc9e456 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -54,13 +54,6 @@ class RegistrationConfig(Config): config.get("disable_msisdn_registration", False) ) - self.rc_registration_requests_per_second = config.get( - "rc_registration_requests_per_second", 0.17, - ) - self.rc_registration_request_burst_count = config.get( - "rc_registration_request_burst_count", 3, - ) - def default_config(self, generate_secrets=False, **kwargs): if generate_secrets: registration_shared_secret = 'registration_shared_secret: "%s"' % ( @@ -71,6 +64,8 @@ class RegistrationConfig(Config): return """\ ## Registration ## + # Registration can be rate-limited using the parameters in the "Ratelimiting" + # section of this file. # Enable registration for new users. enable_registration: False @@ -147,17 +142,6 @@ class RegistrationConfig(Config): # users cannot be auto-joined since they do not exist. # autocreate_auto_join_rooms: true - - # Number of registration requests a client can send per second. - # Defaults to 1/minute (0.17). - # - #rc_registration_requests_per_second: 0.17 - - # Number of registration requests a client can send before being - # throttled. - # Defaults to 3. - # - #rc_registration_request_burst_count: 3.0 """ % locals() def add_arguments(self, parser): -- cgit 1.5.1 From d7dbad3526136cfc9fdbd568635be5016fb637db Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 5 Mar 2019 18:41:27 +0000 Subject: Split ratelimiters in two (one for events, one for registration) --- synapse/handlers/_base.py | 2 +- synapse/handlers/message.py | 2 +- synapse/handlers/register.py | 2 +- synapse/rest/client/v2_alpha/register.py | 2 +- synapse/server.py | 10 +++++++--- tests/handlers/test_profile.py | 2 +- tests/replication/slave/storage/_base.py | 2 +- tests/rest/client/v1/test_events.py | 2 +- tests/rest/client/v1/test_typing.py | 2 +- 9 files changed, 15 insertions(+), 11 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index d8d86d6ff3..a2212e2023 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -44,7 +44,7 @@ class BaseHandler(object): self.notifier = hs.get_notifier() self.state_handler = hs.get_state_handler() self.distributor = hs.get_distributor() - self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter = hs.get_events_ratelimiter() self.clock = hs.get_clock() self.hs = hs diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c762b58902..120aa0d017 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -224,7 +224,7 @@ class EventCreationHandler(object): self.profile_handler = hs.get_profile_handler() self.event_builder_factory = hs.get_event_builder_factory() self.server_name = hs.hostname - self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter = hs.get_events_ratelimiter() self.notifier = hs.get_notifier() self.config = hs.config diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 47d5e276f8..03130edc54 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -61,7 +61,7 @@ class RegistrationHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() self.captcha_client = CaptchaServerHttpClient(hs) self.identity_handler = self.hs.get_handlers().identity_handler - self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter = hs.get_registration_ratelimiter() self._next_generated_user_id = None diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index b7f354570c..6f34029431 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -196,7 +196,7 @@ class RegisterRestServlet(RestServlet): self.identity_handler = hs.get_handlers().identity_handler self.room_member_handler = hs.get_room_member_handler() self.macaroon_gen = hs.get_macaroon_generator() - self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter = hs.get_registration_ratelimiter() self.clock = hs.get_clock() @interactive_auth_handler diff --git a/synapse/server.py b/synapse/server.py index 4323e7ff12..f3ca3e259a 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -205,7 +205,8 @@ class HomeServer(object): self.clock = Clock(reactor) self.distributor = Distributor() - self.ratelimiter = Ratelimiter() + self.events_ratelimiter = Ratelimiter() + self.registration_ratelimiter = Ratelimiter() self.datastore = None @@ -248,8 +249,11 @@ class HomeServer(object): def get_distributor(self): return self.distributor - def get_ratelimiter(self): - return self.ratelimiter + def get_events_ratelimiter(self): + return self.events_ratelimiter + + def get_registration_ratelimiter(self): + return self.registration_ratelimiter def build_federation_client(self): return FederationClient(self) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index d60c124eec..905816a44b 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -58,7 +58,7 @@ class ProfileTestCase(unittest.TestCase): ratelimiter=NonCallableMock(spec_set=["can_do_action"]), ) - self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter = hs.get_events_ratelimiter() self.ratelimiter.can_do_action.return_value = (True, 0) self.store = hs.get_datastore() diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 524af4f8d1..b293e04355 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -34,7 +34,7 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): ratelimiter=NonCallableMock(spec_set=["can_do_action"]), ) - hs.get_ratelimiter().can_do_action.return_value = (True, 0) + hs.get_events_ratelimiter().can_do_action.return_value = (True, 0) return hs diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py index 36d8547275..cd328dc5f1 100644 --- a/tests/rest/client/v1/test_events.py +++ b/tests/rest/client/v1/test_events.py @@ -42,7 +42,7 @@ class EventStreamPermissionsTestCase(unittest.HomeserverTestCase): hs = self.setup_test_homeserver( config=config, ratelimiter=NonCallableMock(spec_set=["can_do_action"]) ) - self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter = hs.get_events_ratelimiter() self.ratelimiter.can_do_action.return_value = (True, 0) hs.get_handlers().federation_handler = Mock() diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index 30fb77bac8..2e2e314a49 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -47,7 +47,7 @@ class RoomTypingTestCase(unittest.HomeserverTestCase): self.event_source = hs.get_event_sources().sources["typing"] - self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter = hs.get_events_ratelimiter() self.ratelimiter.can_do_action.return_value = (True, 0) hs.get_handlers().federation_handler = Mock() -- cgit 1.5.1 From f4195f41188928b8da9bed38c60e221466274a48 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 6 Mar 2019 10:55:22 +0000 Subject: Revert "Split ratelimiters in two (one for events, one for registration)" This reverts commit d7dbad3526136cfc9fdbd568635be5016fb637db. --- synapse/handlers/_base.py | 2 +- synapse/handlers/message.py | 2 +- synapse/handlers/register.py | 2 +- synapse/rest/client/v2_alpha/register.py | 2 +- synapse/server.py | 10 +++------- tests/handlers/test_profile.py | 2 +- tests/replication/slave/storage/_base.py | 2 +- tests/rest/client/v1/test_events.py | 2 +- tests/rest/client/v1/test_typing.py | 2 +- 9 files changed, 11 insertions(+), 15 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index a2212e2023..d8d86d6ff3 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -44,7 +44,7 @@ class BaseHandler(object): self.notifier = hs.get_notifier() self.state_handler = hs.get_state_handler() self.distributor = hs.get_distributor() - self.ratelimiter = hs.get_events_ratelimiter() + self.ratelimiter = hs.get_ratelimiter() self.clock = hs.get_clock() self.hs = hs diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 120aa0d017..c762b58902 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -224,7 +224,7 @@ class EventCreationHandler(object): self.profile_handler = hs.get_profile_handler() self.event_builder_factory = hs.get_event_builder_factory() self.server_name = hs.hostname - self.ratelimiter = hs.get_events_ratelimiter() + self.ratelimiter = hs.get_ratelimiter() self.notifier = hs.get_notifier() self.config = hs.config diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 03130edc54..47d5e276f8 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -61,7 +61,7 @@ class RegistrationHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() self.captcha_client = CaptchaServerHttpClient(hs) self.identity_handler = self.hs.get_handlers().identity_handler - self.ratelimiter = hs.get_registration_ratelimiter() + self.ratelimiter = hs.get_ratelimiter() self._next_generated_user_id = None diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 6f34029431..b7f354570c 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -196,7 +196,7 @@ class RegisterRestServlet(RestServlet): self.identity_handler = hs.get_handlers().identity_handler self.room_member_handler = hs.get_room_member_handler() self.macaroon_gen = hs.get_macaroon_generator() - self.ratelimiter = hs.get_registration_ratelimiter() + self.ratelimiter = hs.get_ratelimiter() self.clock = hs.get_clock() @interactive_auth_handler diff --git a/synapse/server.py b/synapse/server.py index f3ca3e259a..4323e7ff12 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -205,8 +205,7 @@ class HomeServer(object): self.clock = Clock(reactor) self.distributor = Distributor() - self.events_ratelimiter = Ratelimiter() - self.registration_ratelimiter = Ratelimiter() + self.ratelimiter = Ratelimiter() self.datastore = None @@ -249,11 +248,8 @@ class HomeServer(object): def get_distributor(self): return self.distributor - def get_events_ratelimiter(self): - return self.events_ratelimiter - - def get_registration_ratelimiter(self): - return self.registration_ratelimiter + def get_ratelimiter(self): + return self.ratelimiter def build_federation_client(self): return FederationClient(self) diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 905816a44b..d60c124eec 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -58,7 +58,7 @@ class ProfileTestCase(unittest.TestCase): ratelimiter=NonCallableMock(spec_set=["can_do_action"]), ) - self.ratelimiter = hs.get_events_ratelimiter() + self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.can_do_action.return_value = (True, 0) self.store = hs.get_datastore() diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index b293e04355..524af4f8d1 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -34,7 +34,7 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): ratelimiter=NonCallableMock(spec_set=["can_do_action"]), ) - hs.get_events_ratelimiter().can_do_action.return_value = (True, 0) + hs.get_ratelimiter().can_do_action.return_value = (True, 0) return hs diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py index cd328dc5f1..36d8547275 100644 --- a/tests/rest/client/v1/test_events.py +++ b/tests/rest/client/v1/test_events.py @@ -42,7 +42,7 @@ class EventStreamPermissionsTestCase(unittest.HomeserverTestCase): hs = self.setup_test_homeserver( config=config, ratelimiter=NonCallableMock(spec_set=["can_do_action"]) ) - self.ratelimiter = hs.get_events_ratelimiter() + self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.can_do_action.return_value = (True, 0) hs.get_handlers().federation_handler = Mock() diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py index 2e2e314a49..30fb77bac8 100644 --- a/tests/rest/client/v1/test_typing.py +++ b/tests/rest/client/v1/test_typing.py @@ -47,7 +47,7 @@ class RoomTypingTestCase(unittest.HomeserverTestCase): self.event_source = hs.get_event_sources().sources["typing"] - self.ratelimiter = hs.get_events_ratelimiter() + self.ratelimiter = hs.get_ratelimiter() self.ratelimiter.can_do_action.return_value = (True, 0) hs.get_handlers().federation_handler = Mock() -- cgit 1.5.1 From 6f3cde8b2500aafad2438de7eddfc442ec5288c7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 6 Mar 2019 11:02:42 +0000 Subject: Make registration ratelimiter separate from the main events one --- synapse/handlers/register.py | 2 +- synapse/rest/client/v2_alpha/register.py | 2 +- synapse/server.py | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 47d5e276f8..03130edc54 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -61,7 +61,7 @@ class RegistrationHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() self.captcha_client = CaptchaServerHttpClient(hs) self.identity_handler = self.hs.get_handlers().identity_handler - self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter = hs.get_registration_ratelimiter() self._next_generated_user_id = None diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index b7f354570c..6f34029431 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -196,7 +196,7 @@ class RegisterRestServlet(RestServlet): self.identity_handler = hs.get_handlers().identity_handler self.room_member_handler = hs.get_room_member_handler() self.macaroon_gen = hs.get_macaroon_generator() - self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter = hs.get_registration_ratelimiter() self.clock = hs.get_clock() @interactive_auth_handler diff --git a/synapse/server.py b/synapse/server.py index 4323e7ff12..72835e8c86 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -206,6 +206,7 @@ class HomeServer(object): self.clock = Clock(reactor) self.distributor = Distributor() self.ratelimiter = Ratelimiter() + self.registration_ratelimiter = Ratelimiter() self.datastore = None @@ -251,6 +252,9 @@ class HomeServer(object): def get_ratelimiter(self): return self.ratelimiter + def get_registration_ratelimiter(self): + return self.registration_ratelimiter + def build_federation_client(self): return FederationClient(self) -- cgit 1.5.1 From 6d13bdec91e228a54a856ebe0e104062d96a4180 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Mar 2019 17:21:08 +0000 Subject: Add docstrings from matrix-org-hotfixes --- synapse/handlers/sync.py | 33 ++++++++++++++++++++++++++------- synapse/storage/stream.py | 19 +++++++++++++++++++ 2 files changed, 45 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bd97241ab4..42f514cd10 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1894,15 +1894,34 @@ def _calculate_state( class SyncResultBuilder(object): - "Used to help build up a new SyncResult for a user" + """Used to help build up a new SyncResult for a user + + Attributes: + sync_config (SyncConfig) + full_state (bool) + since_token (StreamToken) + now_token (StreamToken) + joined_room_ids (list[str]) + + # The following mirror the fields in a sync response + presence (list) + account_data (list) + joined (list[JoinedSyncResult]) + invited (list[InvitedSyncResult]) + archived (list[ArchivedSyncResult]) + device (list) + groups (GroupsSyncResult|None) + to_device (list) + """ def __init__(self, sync_config, full_state, since_token, now_token, joined_room_ids): """ Args: - sync_config(SyncConfig) - full_state(bool): The full_state flag as specified by user - since_token(StreamToken): The token supplied by user, or None. - now_token(StreamToken): The token to sync up to. + sync_config (SyncConfig) + full_state (bool): The full_state flag as specified by user + since_token (StreamToken): The token supplied by user, or None. + now_token (StreamToken): The token to sync up to. + joined_room_ids (list[str]): List of rooms the user is joined to """ self.sync_config = sync_config self.full_state = full_state @@ -1930,8 +1949,8 @@ class RoomSyncResultBuilder(object): Args: room_id(str) rtype(str): One of `"joined"` or `"archived"` - events(list): List of events to include in the room, (more events - may be added when generating result). + events(list[FrozenEvent]): List of events to include in the room + (more events may be added when generating result). newly_joined(bool): If the user has newly joined the room full_state(bool): Whether the full state should be sent in result since_token(StreamToken): Earliest point to return events from, or None diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d6cfdba519..580fafeb3a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -191,6 +191,25 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0, order='DESC'): + """Get new room events in stream ordering since `from_key`. + + Args: + room_id (str) + from_key (str): Token from which no events are returned before + to_key (str): Token from which no events are returned after. (This + is typically the current stream token) + limit (int): Maximum number of events to return + order (str): Either "DESC" or "ASC". Determines which events are + returned when the result is limited. If "DESC" then the most + recent `limit` events are returned, otherwise returns the + oldest `limit` events. + + Returns: + Deferred[dict[str,tuple[list[FrozenEvent], str]]] + A map from room id to a tuple containing: + - list of recent events in the room + - stream ordering key for the start of the chunk of events returned. + """ from_id = RoomStreamToken.parse_stream_token(from_key).stream room_ids = yield self._events_stream_cache.get_entities_changed( -- cgit 1.5.1 From 8b7790e68f552748b0fe20455c766a2376c2fefd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Mar 2019 17:29:15 +0000 Subject: Port #4422 debug logging from hotfixes --- synapse/handlers/sync.py | 53 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bd97241ab4..32101eb36a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -39,6 +39,9 @@ from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) +# Debug logger for https://github.com/matrix-org/synapse/issues/4422 +issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug") + # Counts the number of times we returned a non-empty sync. `type` is one of # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is @@ -962,6 +965,15 @@ class SyncHandler(object): yield self._generate_sync_entry_for_groups(sync_result_builder) + # debug for https://github.com/matrix-org/synapse/issues/4422 + for joined_room in sync_result_builder.joined: + room_id = joined_room.room_id + if room_id in newly_joined_rooms: + issue4422_logger.debug( + "Sync result for newly joined room %s: %r", + room_id, joined_room, + ) + defer.returnValue(SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, @@ -1425,6 +1437,17 @@ class SyncHandler(object): old_mem_ev = yield self.store.get_event( old_mem_ev_id, allow_none=True ) + + # debug for #4422 + if has_join: + prev_membership = None + if old_mem_ev: + prev_membership = old_mem_ev.membership + issue4422_logger.debug( + "Previous membership for room %s with join: %s (event %s)", + room_id, prev_membership, old_mem_ev_id, + ) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: newly_joined_rooms.append(room_id) @@ -1519,30 +1542,39 @@ class SyncHandler(object): for room_id in sync_result_builder.joined_room_ids: room_entry = room_to_events.get(room_id, None) + newly_joined = room_id in newly_joined_rooms if room_entry: events, start_key = room_entry prev_batch_token = now_token.copy_and_replace("room_key", start_key) - room_entries.append(RoomSyncResultBuilder( + entry = RoomSyncResultBuilder( room_id=room_id, rtype="joined", events=events, - newly_joined=room_id in newly_joined_rooms, + newly_joined=newly_joined, full_state=False, - since_token=None if room_id in newly_joined_rooms else since_token, + since_token=None if newly_joined else since_token, upto_token=prev_batch_token, - )) + ) else: - room_entries.append(RoomSyncResultBuilder( + entry = RoomSyncResultBuilder( room_id=room_id, rtype="joined", events=[], - newly_joined=room_id in newly_joined_rooms, + newly_joined=newly_joined, full_state=False, since_token=since_token, upto_token=since_token, - )) + ) + + if newly_joined: + # debugging for https://github.com/matrix-org/synapse/issues/4422 + issue4422_logger.debug( + "RoomSyncResultBuilder events for newly joined room %s: %r", + room_id, entry.events, + ) + room_entries.append(entry) defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms)) @@ -1663,6 +1695,13 @@ class SyncHandler(object): newly_joined_room=newly_joined, ) + if newly_joined: + # debug for https://github.com/matrix-org/synapse/issues/4422 + issue4422_logger.debug( + "Timeline events after filtering in newly-joined room %s: %r", + room_id, batch, + ) + # When we join the room (or the client requests full_state), we should # send down any existing tags. Usually the user won't have tags in a # newly joined room, unless either a) they've joined before or b) the -- cgit 1.5.1 From b879870b2dc3e5cd1e8a9907209b5af66e32ddd2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Mar 2019 17:35:11 +0000 Subject: Send message after room has been shutdown Currently the explanation message is sent to the abuse room before any users are forced joined, which means it tends to get lost in the backlog of joins. So instead we send the message *after* we've forced joined everyone. --- synapse/rest/client/v1/admin.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) (limited to 'synapse') diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 0201cf1186..2a29f0c2af 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -488,17 +488,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): ) new_room_id = info["room_id"] - yield self.event_creation_handler.create_and_send_nonmember_event( - room_creator_requester, - { - "type": "m.room.message", - "content": {"body": message, "msgtype": "m.text"}, - "room_id": new_room_id, - "sender": new_room_user_id, - }, - ratelimit=False, - ) - requester_user_id = requester.user.to_string() logger.info("Shutting down room %r", room_id) @@ -536,6 +525,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): kicked_users.append(user_id) + yield self.event_creation_handler.create_and_send_nonmember_event( + room_creator_requester, + { + "type": "m.room.message", + "content": {"body": message, "msgtype": "m.text"}, + "room_id": new_room_id, + "sender": new_room_user_id, + }, + ratelimit=False, + ) + aliases_for_room = yield self.store.get_aliases_for_room(room_id) yield self.store.update_aliases_for_room( -- cgit 1.5.1 From face0c5b3c8ed6d0f29f7eaa3a2f9fd19eb99540 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Mar 2019 17:39:32 +0000 Subject: Prefill client IPs cache on workers --- synapse/replication/slave/storage/client_ips.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse') diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index 60641f1a49..5b8521c770 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore): if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: return + self.client_ip_last_seen.prefill(key, now) + self.hs.get_tcp_replication().send_user_ip( user_id, access_token, ip, user_agent, device_id, now ) -- cgit 1.5.1 From f6135d06cf94fdef9942051f43872c7518511e74 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 7 Mar 2019 01:22:53 -0800 Subject: Rewrite userdir to be faster (#4537) --- changelog.d/4537.feature | 1 + synapse/handlers/user_directory.py | 222 ++++---------------- synapse/storage/schema/delta/53/user_share.sql | 47 +++++ synapse/storage/user_directory.py | 271 ++++++++----------------- tests/handlers/test_user_directory.py | 266 ++++++++++++++++++++---- tests/storage/test_user_directory.py | 2 - 6 files changed, 400 insertions(+), 409 deletions(-) create mode 100644 changelog.d/4537.feature create mode 100644 synapse/storage/schema/delta/53/user_share.sql (limited to 'synapse') diff --git a/changelog.d/4537.feature b/changelog.d/4537.feature new file mode 100644 index 0000000000..8f792b8890 --- /dev/null +++ b/changelog.d/4537.feature @@ -0,0 +1 @@ +The user directory has been rewritten to make it faster, with less chance of falling behind on a large server. diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 283c6c1b81..c21da8343a 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -15,7 +15,7 @@ import logging -from six import iteritems +from six import iteritems, iterkeys from twisted.internet import defer @@ -63,10 +63,6 @@ class UserDirectoryHandler(object): # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already self.initially_handled_users = set() - self.initially_handled_users_in_public = set() - - self.initially_handled_users_share = set() - self.initially_handled_users_share_private_room = set() # The current position in the current_state_delta stream self.pos = None @@ -140,7 +136,6 @@ class UserDirectoryHandler(object): # FIXME(#3714): We should probably do this in the same worker as all # the other changes. yield self.store.remove_from_user_dir(user_id) - yield self.store.remove_from_user_in_public_room(user_id) @defer.inlineCallbacks def _unsafe_process(self): @@ -215,15 +210,13 @@ class UserDirectoryHandler(object): logger.info("Processed all users") self.initially_handled_users = None - self.initially_handled_users_in_public = None - self.initially_handled_users_share = None - self.initially_handled_users_share_private_room = None yield self.store.update_user_directory_stream_pos(new_pos) @defer.inlineCallbacks def _handle_initial_room(self, room_id): - """Called when we initially fill out user_directory one room at a time + """ + Called when we initially fill out user_directory one room at a time """ is_in_room = yield self.store.is_host_joined(room_id, self.server_name) if not is_in_room: @@ -238,23 +231,15 @@ class UserDirectoryHandler(object): unhandled_users = user_ids - self.initially_handled_users yield self.store.add_profiles_to_user_dir( - room_id, {user_id: users_with_profile[user_id] for user_id in unhandled_users}, ) self.initially_handled_users |= unhandled_users - if is_public: - yield self.store.add_users_to_public_room( - room_id, user_ids=user_ids - self.initially_handled_users_in_public - ) - self.initially_handled_users_in_public |= user_ids - # We now go and figure out the new users who share rooms with user entries # We sleep aggressively here as otherwise it can starve resources. # We also batch up inserts/updates, but try to avoid too many at once. to_insert = set() - to_update = set() count = 0 for user_id in user_ids: if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: @@ -277,21 +262,7 @@ class UserDirectoryHandler(object): count += 1 user_set = (user_id, other_user_id) - - if user_set in self.initially_handled_users_share_private_room: - continue - - if user_set in self.initially_handled_users_share: - if is_public: - continue - to_update.add(user_set) - else: - to_insert.add(user_set) - - if is_public: - self.initially_handled_users_share.add(user_set) - else: - self.initially_handled_users_share_private_room.add(user_set) + to_insert.add(user_set) if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: yield self.store.add_users_who_share_room( @@ -299,22 +270,10 @@ class UserDirectoryHandler(object): ) to_insert.clear() - if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE: - yield self.store.update_users_who_share_room( - room_id, not is_public, to_update - ) - to_update.clear() - if to_insert: yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) to_insert.clear() - if to_update: - yield self.store.update_users_who_share_room( - room_id, not is_public, to_update - ) - to_update.clear() - @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process @@ -356,6 +315,7 @@ class UserDirectoryHandler(object): user_ids = yield self.store.get_users_in_dir_due_to_room( room_id ) + for user_id in user_ids: yield self._handle_remove_user(room_id, user_id) return @@ -436,14 +396,20 @@ class UserDirectoryHandler(object): # ignore the change return - if change: - users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in iteritems(users_with_profile): - yield self._handle_new_user(room_id, user_id, profile) - else: - users = yield self.store.get_users_in_public_due_to_room(room_id) - for user_id in users: - yield self._handle_remove_user(room_id, user_id) + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + # Remove every user from the sharing tables for that room. + for user_id in iterkeys(users_with_profile): + yield self.store.remove_user_who_share_room(user_id, room_id) + + # Then, re-add them to the tables. + # NOTE: this is not the most efficient method, as handle_new_user sets + # up local_user -> other_user and other_user_whos_local -> local_user, + # which when ran over an entire room, will result in the same values + # being added multiple times. The batching upserts shouldn't make this + # too bad, though. + for user_id, profile in iteritems(users_with_profile): + yield self._handle_new_user(room_id, user_id, profile) @defer.inlineCallbacks def _handle_local_user(self, user_id): @@ -457,7 +423,7 @@ class UserDirectoryHandler(object): row = yield self.store.get_user_in_directory(user_id) if not row: - yield self.store.add_profiles_to_user_dir(None, {user_id: profile}) + yield self.store.add_profiles_to_user_dir({user_id: profile}) @defer.inlineCallbacks def _handle_new_user(self, room_id, user_id, profile): @@ -471,55 +437,27 @@ class UserDirectoryHandler(object): row = yield self.store.get_user_in_directory(user_id) if not row: - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + yield self.store.add_profiles_to_user_dir({user_id: profile}) is_public = yield self.store.is_room_world_readable_or_publicly_joinable( room_id ) - - if is_public: - row = yield self.store.get_user_in_public_room(user_id) - if not row: - yield self.store.add_users_to_public_room(room_id, [user_id]) - else: - logger.debug("Not adding new user to public dir, %r", user_id) - - # Now we update users who share rooms with users. We do this by getting - # all the current users in the room and seeing which aren't already - # marked in the database as sharing with `user_id` - + # Now we update users who share rooms with users. users_with_profile = yield self.state.get_current_user_in_room(room_id) to_insert = set() - to_update = set() - - is_appservice = self.store.get_if_app_services_interested_in_user(user_id) # First, if they're our user then we need to update for every user - if self.is_mine_id(user_id) and not is_appservice: - # Returns a map of other_user_id -> shared_private. We only need - # to update mappings if for users that either don't share a room - # already (aren't in the map) or, if the room is private, those that - # only share a public room. - user_ids_shared = yield self.store.get_users_who_share_room_from_dir( - user_id - ) + if self.is_mine_id(user_id): - for other_user_id in users_with_profile: - if user_id == other_user_id: - continue + is_appservice = self.store.get_if_app_services_interested_in_user(user_id) + + # We don't care about appservice users. + if not is_appservice: + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue - shared_is_private = user_ids_shared.get(other_user_id) - if shared_is_private is True: - # We've already marked in the database they share a private room - continue - elif shared_is_private is False: - # They already share a public room, so only update if this is - # a private room - if not is_public: - to_update.add((user_id, other_user_id)) - elif shared_is_private is None: - # This is the first time they both share a room to_insert.add((user_id, other_user_id)) # Next we need to update for every local user in the room @@ -531,29 +469,11 @@ class UserDirectoryHandler(object): other_user_id ) if self.is_mine_id(other_user_id) and not is_appservice: - shared_is_private = yield self.store.get_if_users_share_a_room( - other_user_id, user_id - ) - if shared_is_private is True: - # We've already marked in the database they share a private room - continue - elif shared_is_private is False: - # They already share a public room, so only update if this is - # a private room - if not is_public: - to_update.add((other_user_id, user_id)) - elif shared_is_private is None: - # This is the first time they both share a room - to_insert.add((other_user_id, user_id)) + to_insert.add((other_user_id, user_id)) if to_insert: yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) - if to_update: - yield self.store.update_users_who_share_room( - room_id, not is_public, to_update - ) - @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): """Called when we might need to remove user to directory @@ -562,84 +482,16 @@ class UserDirectoryHandler(object): room_id (str): room_id that user left or stopped being public that user_id (str) """ - logger.debug("Maybe removing user %r", user_id) - - row = yield self.store.get_user_in_directory(user_id) - update_user_dir = row and row["room_id"] == room_id - - row = yield self.store.get_user_in_public_room(user_id) - update_user_in_public = row and row["room_id"] == room_id - - if update_user_in_public or update_user_dir: - # XXX: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - if not update_user_in_public and not update_user_dir: - break - - is_in_room = yield self.store.is_host_joined( - j_room_id, self.server_name - ) - - if not is_in_room: - continue - - if update_user_dir: - update_user_dir = False - yield self.store.update_user_in_user_dir(user_id, j_room_id) + logger.debug("Removing user %r", user_id) - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) + # Remove user from sharing tables + yield self.store.remove_user_who_share_room(user_id, room_id) - if update_user_in_public and is_public: - yield self.store.update_user_in_public_user_list(user_id, j_room_id) - update_user_in_public = False + # Are they still in a room with members? If not, remove them entirely. + users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id) - if update_user_dir: + if len(users_in_room_with) == 0: yield self.store.remove_from_user_dir(user_id) - elif update_user_in_public: - yield self.store.remove_from_user_in_public_room(user_id) - - # Now handle users_who_share_rooms. - - # Get a list of user tuples that were in the DB due to this room and - # users (this includes tuples where the other user matches `user_id`) - user_tuples = yield self.store.get_users_in_share_dir_with_room_id( - user_id, room_id - ) - - for user_id, other_user_id in user_tuples: - # For each user tuple get a list of rooms that they still share, - # trying to find a private room, and update the entry in the DB - rooms = yield self.store.get_rooms_in_common_for_users( - user_id, other_user_id - ) - - # If they dont share a room anymore, remove the mapping - if not rooms: - yield self.store.remove_user_who_share_room(user_id, other_user_id) - continue - - found_public_share = None - for j_room_id in rooms: - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) - - if is_public: - found_public_share = j_room_id - else: - found_public_share = None - yield self.store.update_users_who_share_room( - room_id, not is_public, [(user_id, other_user_id)] - ) - break - - if found_public_share: - yield self.store.update_users_who_share_room( - room_id, not is_public, [(user_id, other_user_id)] - ) @defer.inlineCallbacks def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): diff --git a/synapse/storage/schema/delta/53/user_share.sql b/synapse/storage/schema/delta/53/user_share.sql new file mode 100644 index 0000000000..14424ded0c --- /dev/null +++ b/synapse/storage/schema/delta/53/user_share.sql @@ -0,0 +1,47 @@ +/* Copyright 2017 Vector Creations Ltd, 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Old disused version of the tables below. +DROP TABLE IF EXISTS users_who_share_rooms; + +-- This is no longer used because it's duplicated by the users_who_share_public_rooms +DROP TABLE IF EXISTS users_in_public_rooms; + +-- Tables keeping track of what users share rooms. This is a map of local users +-- to local or remote users, per room. Remote users cannot be in the user_id +-- column, only the other_user_id column. There are two tables, one for public +-- rooms and those for private rooms. +CREATE TABLE IF NOT EXISTS users_who_share_public_rooms ( + user_id TEXT NOT NULL, + other_user_id TEXT NOT NULL, + room_id TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS users_who_share_private_rooms ( + user_id TEXT NOT NULL, + other_user_id TEXT NOT NULL, + room_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX users_who_share_public_rooms_u_idx ON users_who_share_public_rooms(user_id, other_user_id, room_id); +CREATE INDEX users_who_share_public_rooms_r_idx ON users_who_share_public_rooms(room_id); +CREATE INDEX users_who_share_public_rooms_o_idx ON users_who_share_public_rooms(other_user_id); + +CREATE UNIQUE INDEX users_who_share_private_rooms_u_idx ON users_who_share_private_rooms(user_id, other_user_id, room_id); +CREATE INDEX users_who_share_private_rooms_r_idx ON users_who_share_private_rooms(room_id); +CREATE INDEX users_who_share_private_rooms_o_idx ON users_who_share_private_rooms(other_user_id); + +-- Make sure that we populate the tables initially by resetting the stream ID +UPDATE user_directory_stream_pos SET stream_id = NULL; diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index fea866c043..2317d22ed6 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -63,31 +63,14 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) - @defer.inlineCallbacks - def add_users_to_public_room(self, room_id, user_ids): - """Add user to the list of users in public rooms - - Args: - room_id (str): A room_id that all users are in that is world_readable - or publically joinable - user_ids (list(str)): Users to add - """ - yield self._simple_insert_many( - table="users_in_public_rooms", - values=[{"user_id": user_id, "room_id": room_id} for user_id in user_ids], - desc="add_users_to_public_room", - ) - for user_id in user_ids: - self.get_user_in_public_room.invalidate((user_id,)) - - def add_profiles_to_user_dir(self, room_id, users_with_profile): + def add_profiles_to_user_dir(self, users_with_profile): """Add profiles to the user directory Args: - room_id (str): A room_id that all users are joined to users_with_profile (dict): Users to add to directory in the form of mapping of user_id -> ProfileInfo """ + if isinstance(self.database_engine, PostgresEngine): # We weight the loclpart most highly, then display name and finally # server name @@ -113,7 +96,7 @@ class UserDirectoryStore(SQLBaseStore): INSERT INTO user_directory_search(user_id, value) VALUES (?,?) """ - args = ( + args = tuple( ( user_id, "%s %s" % (user_id, p.display_name) if p.display_name else user_id, @@ -132,7 +115,7 @@ class UserDirectoryStore(SQLBaseStore): values=[ { "user_id": user_id, - "room_id": room_id, + "room_id": None, "display_name": profile.display_name, "avatar_url": profile.avatar_url, } @@ -250,16 +233,6 @@ class UserDirectoryStore(SQLBaseStore): "update_profile_in_user_dir", _update_profile_in_user_dir_txn ) - @defer.inlineCallbacks - def update_user_in_public_user_list(self, user_id, room_id): - yield self._simple_update_one( - table="users_in_public_rooms", - keyvalues={"user_id": user_id}, - updatevalues={"room_id": room_id}, - desc="update_user_in_public_user_list", - ) - self.get_user_in_public_room.invalidate((user_id,)) - def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): self._simple_delete_txn( @@ -269,62 +242,50 @@ class UserDirectoryStore(SQLBaseStore): txn, table="user_directory_search", keyvalues={"user_id": user_id} ) self._simple_delete_txn( - txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} + txn, + table="users_who_share_public_rooms", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_public_rooms", + keyvalues={"other_user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_private_rooms", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_private_rooms", + keyvalues={"other_user_id": user_id}, ) txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) - txn.call_after(self.get_user_in_public_room.invalidate, (user_id,)) return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn) - @defer.inlineCallbacks - def remove_from_user_in_public_room(self, user_id): - yield self._simple_delete( - table="users_in_public_rooms", - keyvalues={"user_id": user_id}, - desc="remove_from_user_in_public_room", - ) - self.get_user_in_public_room.invalidate((user_id,)) - - def get_users_in_public_due_to_room(self, room_id): - """Get all user_ids that are in the room directory because they're - in the given room_id - """ - return self._simple_select_onecol( - table="users_in_public_rooms", - keyvalues={"room_id": room_id}, - retcol="user_id", - desc="get_users_in_public_due_to_room", - ) - @defer.inlineCallbacks def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory because they're in the given room_id """ - user_ids_dir = yield self._simple_select_onecol( - table="user_directory", - keyvalues={"room_id": room_id}, - retcol="user_id", - desc="get_users_in_dir_due_to_room", - ) - - user_ids_pub = yield self._simple_select_onecol( - table="users_in_public_rooms", + user_ids_share_pub = yield self._simple_select_onecol( + table="users_who_share_public_rooms", keyvalues={"room_id": room_id}, - retcol="user_id", + retcol="other_user_id", desc="get_users_in_dir_due_to_room", ) - user_ids_share = yield self._simple_select_onecol( - table="users_who_share_rooms", + user_ids_share_priv = yield self._simple_select_onecol( + table="users_who_share_private_rooms", keyvalues={"room_id": room_id}, - retcol="user_id", + retcol="other_user_id", desc="get_users_in_dir_due_to_room", ) - user_ids = set(user_ids_dir) - user_ids.update(user_ids_pub) - user_ids.update(user_ids_share) + user_ids = set(user_ids_share_pub) + user_ids.update(user_ids_share_priv) defer.returnValue(user_ids) @@ -351,7 +312,7 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue([name for name, in rows]) def add_users_who_share_room(self, room_id, share_private, user_id_tuples): - """Insert entries into the users_who_share_rooms table. The first + """Insert entries into the users_who_share_*_rooms table. The first user should be a local user. Args: @@ -361,109 +322,71 @@ class UserDirectoryStore(SQLBaseStore): """ def _add_users_who_share_room_txn(txn): - self._simple_insert_many_txn( + + if share_private: + tbl = "users_who_share_private_rooms" + else: + tbl = "users_who_share_public_rooms" + + self._simple_upsert_many_txn( txn, - table="users_who_share_rooms", - values=[ - { - "user_id": user_id, - "other_user_id": other_user_id, - "room_id": room_id, - "share_private": share_private, - } + table=tbl, + key_names=["user_id", "other_user_id", "room_id"], + key_values=[ + (user_id, other_user_id, room_id) for user_id, other_user_id in user_id_tuples ], + value_names=(), + value_values=None, ) for user_id, other_user_id in user_id_tuples: txn.call_after( self.get_users_who_share_room_from_dir.invalidate, (user_id,) ) - txn.call_after( - self.get_if_users_share_a_room.invalidate, (user_id, other_user_id) - ) return self.runInteraction( "add_users_who_share_room", _add_users_who_share_room_txn ) - def update_users_who_share_room(self, room_id, share_private, user_id_sets): - """Updates entries in the users_who_share_rooms table. The first - user should be a local user. - - Args: - room_id (str) - share_private (bool): Is the room private - user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + def remove_user_who_share_room(self, user_id, room_id): """ - - def _update_users_who_share_room_txn(txn): - sql = """ - UPDATE users_who_share_rooms - SET room_id = ?, share_private = ? - WHERE user_id = ? AND other_user_id = ? - """ - txn.executemany( - sql, ((room_id, share_private, uid, oid) for uid, oid in user_id_sets) - ) - for user_id, other_user_id in user_id_sets: - txn.call_after( - self.get_users_who_share_room_from_dir.invalidate, (user_id,) - ) - txn.call_after( - self.get_if_users_share_a_room.invalidate, (user_id, other_user_id) - ) - - return self.runInteraction( - "update_users_who_share_room", _update_users_who_share_room_txn - ) - - def remove_user_who_share_room(self, user_id, other_user_id): - """Deletes entries in the users_who_share_rooms table. The first + Deletes entries in the users_who_share_*_rooms table. The first user should be a local user. Args: + user_id (str) room_id (str) - share_private (bool): Is the room private - user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. """ def _remove_user_who_share_room_txn(txn): self._simple_delete_txn( txn, - table="users_who_share_rooms", - keyvalues={"user_id": user_id, "other_user_id": other_user_id}, + table="users_who_share_private_rooms", + keyvalues={"user_id": user_id, "room_id": room_id}, ) - txn.call_after( - self.get_users_who_share_room_from_dir.invalidate, (user_id,) + self._simple_delete_txn( + txn, + table="users_who_share_private_rooms", + keyvalues={"other_user_id": user_id, "room_id": room_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_public_rooms", + keyvalues={"user_id": user_id, "room_id": room_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_public_rooms", + keyvalues={"other_user_id": user_id, "room_id": room_id}, ) txn.call_after( - self.get_if_users_share_a_room.invalidate, (user_id, other_user_id) + self.get_users_who_share_room_from_dir.invalidate, (user_id,) ) return self.runInteraction( "remove_user_who_share_room", _remove_user_who_share_room_txn ) - @cached(max_entries=500000) - def get_if_users_share_a_room(self, user_id, other_user_id): - """Gets if users share a room. - - Args: - user_id (str): Must be a local user_id - other_user_id (str) - - Returns: - bool|None: None if they don't share a room, otherwise whether they - share a private room or not. - """ - return self._simple_select_one_onecol( - table="users_who_share_rooms", - keyvalues={"user_id": user_id, "other_user_id": other_user_id}, - retcol="share_private", - allow_none=True, - desc="get_if_users_share_a_room", - ) - @cachedInlineCallbacks(max_entries=500000, iterable=True) def get_users_who_share_room_from_dir(self, user_id): """Returns the set of users who share a room with `user_id` @@ -472,32 +395,29 @@ class UserDirectoryStore(SQLBaseStore): user_id(str): Must be a local user Returns: - dict: user_id -> share_private mapping + list: user_id """ - rows = yield self._simple_select_list( - table="users_who_share_rooms", + rows = yield self._simple_select_onecol( + table="users_who_share_private_rooms", + keyvalues={"user_id": user_id}, + retcol="other_user_id", + desc="get_users_who_share_room_with_user", + ) + + pub_rows = yield self._simple_select_onecol( + table="users_who_share_public_rooms", keyvalues={"user_id": user_id}, - retcols=("other_user_id", "share_private"), + retcol="other_user_id", desc="get_users_who_share_room_with_user", ) - defer.returnValue({row["other_user_id"]: row["share_private"] for row in rows}) + users = set(pub_rows) + users.update(rows) - def get_users_in_share_dir_with_room_id(self, user_id, room_id): - """Get all user tuples that are in the users_who_share_rooms due to the - given room_id. + # Remove the user themselves from this list. + users.discard(user_id) - Returns: - [(user_id, other_user_id)]: where one of the two will match the given - user_id. - """ - sql = """ - SELECT user_id, other_user_id FROM users_who_share_rooms - WHERE room_id = ? AND (user_id = ? OR other_user_id = ?) - """ - return self._execute( - "get_users_in_share_dir_with_room_id", None, sql, room_id, user_id, user_id - ) + defer.returnValue(list(users)) @defer.inlineCallbacks def get_rooms_in_common_for_users(self, user_id, other_user_id): @@ -532,12 +452,10 @@ class UserDirectoryStore(SQLBaseStore): def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") - txn.execute("DELETE FROM users_in_public_rooms") - txn.execute("DELETE FROM users_who_share_rooms") + txn.execute("DELETE FROM users_who_share_public_rooms") + txn.execute("DELETE FROM users_who_share_private_rooms") txn.call_after(self.get_user_in_directory.invalidate_all) - txn.call_after(self.get_user_in_public_room.invalidate_all) txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all) - txn.call_after(self.get_if_users_share_a_room.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -548,21 +466,11 @@ class UserDirectoryStore(SQLBaseStore): return self._simple_select_one( table="user_directory", keyvalues={"user_id": user_id}, - retcols=("room_id", "display_name", "avatar_url"), + retcols=("display_name", "avatar_url"), allow_none=True, desc="get_user_in_directory", ) - @cached() - def get_user_in_public_room(self, user_id): - return self._simple_select_one( - table="users_in_public_rooms", - keyvalues={"user_id": user_id}, - retcols=("room_id",), - allow_none=True, - desc="get_user_in_public_room", - ) - def get_user_directory_stream_pos(self): return self._simple_select_one_onecol( table="user_directory_stream_pos", @@ -660,14 +568,15 @@ class UserDirectoryStore(SQLBaseStore): where_clause = "1=1" else: join_clause = """ - LEFT JOIN users_in_public_rooms AS p USING (user_id) LEFT JOIN ( - SELECT other_user_id AS user_id FROM users_who_share_rooms - WHERE user_id = ? AND share_private - ) AS s USING (user_id) + SELECT other_user_id AS user_id FROM users_who_share_public_rooms + UNION + SELECT other_user_id AS user_id FROM users_who_share_private_rooms + WHERE user_id = ? + ) AS p USING (user_id) """ join_args = (user_id,) - where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)" + where_clause = "p.user_id IS NOT NULL" if isinstance(self.database_engine, PostgresEngine): full_query, exact_query, prefix_query = _parse_query_postgres(search_term) @@ -686,7 +595,7 @@ class UserDirectoryStore(SQLBaseStore): %s AND vector @@ to_tsquery('english', ?) ORDER BY - (CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END) + (CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END) * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END) * (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END) * ( diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 11f2bae698..a16a2dc67b 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -14,78 +14,262 @@ # limitations under the License. from mock import Mock -from twisted.internet import defer - from synapse.api.constants import UserTypes -from synapse.handlers.user_directory import UserDirectoryHandler +from synapse.rest.client.v1 import admin, login, room from synapse.storage.roommember import ProfileInfo from tests import unittest -from tests.utils import setup_test_homeserver -class UserDirectoryHandlers(object): - def __init__(self, hs): - self.user_directory_handler = UserDirectoryHandler(hs) +class UserDirectoryTestCase(unittest.HomeserverTestCase): + """ + Tests the UserDirectoryHandler. + """ + servlets = [ + login.register_servlets, + admin.register_servlets, + room.register_servlets, + ] -class UserDirectoryTestCase(unittest.TestCase): - """ Tests the UserDirectoryHandler. """ + def make_homeserver(self, reactor, clock): - @defer.inlineCallbacks - def setUp(self): - hs = yield setup_test_homeserver(self.addCleanup) - self.store = hs.get_datastore() - hs.handlers = UserDirectoryHandlers(hs) + config = self.default_config() + config.update_user_directory = True + return self.setup_test_homeserver(config=config) - self.handler = hs.get_handlers().user_directory_handler + def prepare(self, reactor, clock, hs): + self.store = hs.get_datastore() + self.handler = hs.get_user_directory_handler() - @defer.inlineCallbacks def test_handle_local_profile_change_with_support_user(self): support_user_id = "@support:test" - yield self.store.register( - user_id=support_user_id, - token="123", - password_hash=None, - user_type=UserTypes.SUPPORT + self.get_success( + self.store.register( + user_id=support_user_id, + token="123", + password_hash=None, + user_type=UserTypes.SUPPORT, + ) ) - yield self.handler.handle_local_profile_change(support_user_id, None) - profile = yield self.store.get_user_in_directory(support_user_id) + self.get_success( + self.handler.handle_local_profile_change(support_user_id, None) + ) + profile = self.get_success(self.store.get_user_in_directory(support_user_id)) self.assertTrue(profile is None) display_name = 'display_name' - profile_info = ProfileInfo( - avatar_url='avatar_url', - display_name=display_name, - ) + profile_info = ProfileInfo(avatar_url='avatar_url', display_name=display_name) regular_user_id = '@regular:test' - yield self.handler.handle_local_profile_change(regular_user_id, profile_info) - profile = yield self.store.get_user_in_directory(regular_user_id) + self.get_success( + self.handler.handle_local_profile_change(regular_user_id, profile_info) + ) + profile = self.get_success(self.store.get_user_in_directory(regular_user_id)) self.assertTrue(profile['display_name'] == display_name) - @defer.inlineCallbacks def test_handle_user_deactivated_support_user(self): s_user_id = "@support:test" - self.store.register( - user_id=s_user_id, - token="123", - password_hash=None, - user_type=UserTypes.SUPPORT + self.get_success( + self.store.register( + user_id=s_user_id, + token="123", + password_hash=None, + user_type=UserTypes.SUPPORT, + ) ) self.store.remove_from_user_dir = Mock() self.store.remove_from_user_in_public_room = Mock() - yield self.handler.handle_user_deactivated(s_user_id) + self.get_success(self.handler.handle_user_deactivated(s_user_id)) self.store.remove_from_user_dir.not_called() self.store.remove_from_user_in_public_room.not_called() - @defer.inlineCallbacks def test_handle_user_deactivated_regular_user(self): r_user_id = "@regular:test" - self.store.register(user_id=r_user_id, token="123", password_hash=None) + self.get_success( + self.store.register(user_id=r_user_id, token="123", password_hash=None) + ) self.store.remove_from_user_dir = Mock() - self.store.remove_from_user_in_public_room = Mock() - yield self.handler.handle_user_deactivated(r_user_id) + self.get_success(self.handler.handle_user_deactivated(r_user_id)) self.store.remove_from_user_dir.called_once_with(r_user_id) - self.store.remove_from_user_in_public_room.assert_called_once_with(r_user_id) + + def test_private_room(self): + """ + A user can be searched for only by people that are either in a public + room, or that share a private chat. + """ + u1 = self.register_user("user1", "pass") + u1_token = self.login(u1, "pass") + u2 = self.register_user("user2", "pass") + u2_token = self.login(u2, "pass") + u3 = self.register_user("user3", "pass") + + # We do not add users to the directory until they join a room. + s = self.get_success(self.handler.search_users(u1, "user2", 10)) + self.assertEqual(len(s["results"]), 0) + + room = self.helper.create_room_as(u1, is_public=False, tok=u1_token) + self.helper.invite(room, src=u1, targ=u2, tok=u1_token) + self.helper.join(room, user=u2, tok=u2_token) + + # Check we have populated the database correctly. + shares_public = self.get_users_who_share_public_rooms() + shares_private = self.get_users_who_share_private_rooms() + + self.assertEqual(shares_public, []) + self.assertEqual( + self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) + ) + + # We get one search result when searching for user2 by user1. + s = self.get_success(self.handler.search_users(u1, "user2", 10)) + self.assertEqual(len(s["results"]), 1) + + # We get NO search results when searching for user2 by user3. + s = self.get_success(self.handler.search_users(u3, "user2", 10)) + self.assertEqual(len(s["results"]), 0) + + # We get NO search results when searching for user3 by user1. + s = self.get_success(self.handler.search_users(u1, "user3", 10)) + self.assertEqual(len(s["results"]), 0) + + # User 2 then leaves. + self.helper.leave(room, user=u2, tok=u2_token) + + # Check we have removed the values. + shares_public = self.get_users_who_share_public_rooms() + shares_private = self.get_users_who_share_private_rooms() + + self.assertEqual(shares_public, []) + self.assertEqual(self._compress_shared(shares_private), set()) + + # User1 now gets no search results for any of the other users. + s = self.get_success(self.handler.search_users(u1, "user2", 10)) + self.assertEqual(len(s["results"]), 0) + + s = self.get_success(self.handler.search_users(u1, "user3", 10)) + self.assertEqual(len(s["results"]), 0) + + def _compress_shared(self, shared): + """ + Compress a list of users who share rooms dicts to a list of tuples. + """ + r = set() + for i in shared: + r.add((i["user_id"], i["other_user_id"], i["room_id"])) + return r + + def get_users_who_share_public_rooms(self): + return self.get_success( + self.store._simple_select_list( + "users_who_share_public_rooms", + None, + ["user_id", "other_user_id", "room_id"], + ) + ) + + def get_users_who_share_private_rooms(self): + return self.get_success( + self.store._simple_select_list( + "users_who_share_private_rooms", + None, + ["user_id", "other_user_id", "room_id"], + ) + ) + + def test_initial(self): + """ + The user directory's initial handler correctly updates the search tables. + """ + u1 = self.register_user("user1", "pass") + u1_token = self.login(u1, "pass") + u2 = self.register_user("user2", "pass") + u2_token = self.login(u2, "pass") + u3 = self.register_user("user3", "pass") + u3_token = self.login(u3, "pass") + + room = self.helper.create_room_as(u1, is_public=True, tok=u1_token) + self.helper.invite(room, src=u1, targ=u2, tok=u1_token) + self.helper.join(room, user=u2, tok=u2_token) + + private_room = self.helper.create_room_as(u1, is_public=False, tok=u1_token) + self.helper.invite(private_room, src=u1, targ=u3, tok=u1_token) + self.helper.join(private_room, user=u3, tok=u3_token) + + self.get_success(self.store.update_user_directory_stream_pos(None)) + self.get_success(self.store.delete_all_from_user_dir()) + + shares_public = self.get_users_who_share_public_rooms() + shares_private = self.get_users_who_share_private_rooms() + + self.assertEqual(shares_private, []) + self.assertEqual(shares_public, []) + + # Reset the handled users caches + self.handler.initially_handled_users = set() + + # Do the initial population + d = self.handler._do_initial_spam() + + # This takes a while, so pump it a bunch of times to get through the + # sleep delays + for i in range(10): + self.pump(1) + + self.get_success(d) + + shares_public = self.get_users_who_share_public_rooms() + shares_private = self.get_users_who_share_private_rooms() + + # User 1 and User 2 share public rooms + self.assertEqual( + self._compress_shared(shares_public), set([(u1, u2, room), (u2, u1, room)]) + ) + + # User 1 and User 3 share private rooms + self.assertEqual( + self._compress_shared(shares_private), + set([(u1, u3, private_room), (u3, u1, private_room)]), + ) + + def test_search_all_users(self): + """ + Search all users = True means that a user does not have to share a + private room with the searching user or be in a public room to be search + visible. + """ + self.handler.search_all_users = True + self.hs.config.user_directory_search_all_users = True + + u1 = self.register_user("user1", "pass") + u1_token = self.login(u1, "pass") + u2 = self.register_user("user2", "pass") + u2_token = self.login(u2, "pass") + u3 = self.register_user("user3", "pass") + + # User 1 and User 2 join a room. User 3 never does. + room = self.helper.create_room_as(u1, is_public=True, tok=u1_token) + self.helper.invite(room, src=u1, targ=u2, tok=u1_token) + self.helper.join(room, user=u2, tok=u2_token) + + self.get_success(self.store.update_user_directory_stream_pos(None)) + self.get_success(self.store.delete_all_from_user_dir()) + + # Reset the handled users caches + self.handler.initially_handled_users = set() + + # Do the initial population + d = self.handler._do_initial_spam() + + # This takes a while, so pump it a bunch of times to get through the + # sleep delays + for i in range(10): + self.pump(1) + + self.get_success(d) + + # Despite not sharing a room, search_all_users means we get a search + # result. + s = self.get_success(self.handler.search_users(u1, u3, 10)) + self.assertEqual(len(s["results"]), 1) diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py index 0dde1ab2fe..a2a652a235 100644 --- a/tests/storage/test_user_directory.py +++ b/tests/storage/test_user_directory.py @@ -35,14 +35,12 @@ class UserDirectoryStoreTestCase(unittest.TestCase): # alice and bob are both in !room_id. bobby is not but shares # a homeserver with alice. yield self.store.add_profiles_to_user_dir( - "!room:id", { ALICE: ProfileInfo(None, "alice"), BOB: ProfileInfo(None, "bob"), BOBBY: ProfileInfo(None, "bobby"), }, ) - yield self.store.add_users_to_public_room("!room:id", [ALICE, BOB]) yield self.store.add_users_who_share_room( "!room:id", False, ((ALICE, BOB), (BOB, ALICE)) ) -- cgit 1.5.1 From c633fc02d72e325ab9689f3f27edb86ef93cec0c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Mar 2019 15:53:14 +0000 Subject: Add some debug logging for device list handling --- synapse/handlers/device.py | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c09a7c6280..03644a93cc 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -402,6 +402,12 @@ class DeviceHandler(DeviceWorkerHandler): user_id, device_ids, list(hosts) ) + for device_id in device_ids: + logger.debug( + "Notifying about update %r/%r, ID: %r", user_id, device_id, + position, + ) + room_ids = yield self.store.get_rooms_for_user(user_id) yield self.notifier.on_new_event( @@ -409,7 +415,7 @@ class DeviceHandler(DeviceWorkerHandler): ) if hosts: - logger.info("Sending device list update notif to: %r", hosts) + logger.info("Sending device list update notif for %r to: %r", user_id, hosts) for host in hosts: self.federation_sender.send_device_messages(host) @@ -479,15 +485,26 @@ class DeviceListEduUpdater(object): if get_domain_from_id(user_id) != origin: # TODO: Raise? - logger.warning("Got device list update edu for %r from %r", user_id, origin) + logger.warning( + "Got device list update edu for %r/%r from %r", + user_id, device_id, origin, + ) return room_ids = yield self.store.get_rooms_for_user(user_id) if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. + logger.warning( + "Got device list update edu for %r/%r, but don't share a room", + user_id, device_id, + ) return + logger.debug( + "Received device list update for %r/%r", user_id, device_id, + ) + self._pending_updates.setdefault(user_id, []).append( (device_id, stream_id, prev_ids, edu_content) ) @@ -505,10 +522,18 @@ class DeviceListEduUpdater(object): # This can happen since we batch updates return + for device_id, stream_id, prev_ids, content in pending_updates: + logger.debug( + "Handling update %r/%r, ID: %r, prev: %r ", + user_id, device_id, stream_id, prev_ids, + ) + # Given a list of updates we check if we need to resync. This # happens if we've missed updates. resync = yield self._need_to_do_resync(user_id, pending_updates) + logger.debug("Need to re-sync devices for %r? %r", user_id, resync) + if resync: # Fetch all devices for the user. origin = get_domain_from_id(user_id) @@ -561,6 +586,12 @@ class DeviceListEduUpdater(object): ) devices = [] + for device in devices: + logger.debug( + "Handling resync update %r/%r, ID: %r", + user_id, device["device_id"], stream_id, + ) + yield self.store.update_remote_device_list_cache( user_id, devices, stream_id, ) @@ -593,6 +624,11 @@ class DeviceListEduUpdater(object): user_id ) + logger.debug( + "Current extremity for %r: %r", + user_id, extremity, + ) + stream_id_in_updates = set() # stream_ids in updates list for _, stream_id, prev_ids, _ in updates: if not prev_ids: -- cgit 1.5.1 From d42b41544a3d8950f2a804703aa4ad311e9feddd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Mar 2019 16:04:24 +0000 Subject: When re-syncing device lists reset the state We keep track of what stream IDs we've seen so that we know what updates we've handled or missed. If we re-sync we don't know if the updates we've seen are included in the re-sync (there may be a race), so we should reset the seen updates. --- synapse/handlers/device.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c09a7c6280..00f12ba40d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -566,6 +566,10 @@ class DeviceListEduUpdater(object): ) device_ids = [device["device_id"] for device in devices] yield self.device_handler.notify_device_update(user_id, device_ids) + + # We clobber the seen updates since we've re-synced from a given + # point. + self._seen_updates[user_id] = set([stream_id]) else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) @@ -578,9 +582,9 @@ class DeviceListEduUpdater(object): user_id, [device_id for device_id, _, _, _ in pending_updates] ) - self._seen_updates.setdefault(user_id, set()).update( - stream_id for _, stream_id, _, _ in pending_updates - ) + self._seen_updates.setdefault(user_id, set()).update( + stream_id for _, stream_id, _, _ in pending_updates + ) @defer.inlineCallbacks def _need_to_do_resync(self, user_id, updates): -- cgit 1.5.1