diff options
Diffstat (limited to 'synapse')
59 files changed, 1928 insertions, 1489 deletions
diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index 3bb5b3da37..ad68079eeb 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -23,12 +23,13 @@ class Ratelimiter(object): def __init__(self): self.message_counts = collections.OrderedDict() - def send_message(self, user_id, time_now_s, msg_rate_hz, burst_count, update=True): - """Can the user send a message? + def can_do_action(self, key, time_now_s, rate_hz, burst_count, update=True): + """Can the entity (e.g. user or IP address) perform the action? Args: - user_id: The user sending a message. + key: The key we should use when rate limiting. Can be a user ID + (when sending events), an IP address, etc. time_now_s: The time now. - msg_rate_hz: The long term number of messages a user can send in a + rate_hz: The long term number of messages a user can send in a second. burst_count: How many messages the user can send before being limited. @@ -41,10 +42,10 @@ class Ratelimiter(object): """ self.prune_message_counts(time_now_s) message_count, time_start, _ignored = self.message_counts.get( - user_id, (0., time_now_s, None), + key, (0., time_now_s, None), ) time_delta = time_now_s - time_start - sent_count = message_count - time_delta * msg_rate_hz + sent_count = message_count - time_delta * rate_hz if sent_count < 0: allowed = True time_start = time_now_s @@ -56,13 +57,13 @@ class Ratelimiter(object): message_count += 1 if update: - self.message_counts[user_id] = ( - message_count, time_start, msg_rate_hz + self.message_counts[key] = ( + message_count, time_start, rate_hz ) - if msg_rate_hz > 0: + if rate_hz > 0: time_allowed = ( - time_start + (message_count - burst_count + 1) / msg_rate_hz + time_start + (message_count - burst_count + 1) / rate_hz ) if time_allowed < time_now_s: time_allowed = time_now_s @@ -72,12 +73,12 @@ class Ratelimiter(object): return allowed, time_allowed def prune_message_counts(self, time_now_s): - for user_id in list(self.message_counts.keys()): - message_count, time_start, msg_rate_hz = ( - self.message_counts[user_id] + for key in list(self.message_counts.keys()): + message_count, time_start, rate_hz = ( + self.message_counts[key] ) time_delta = time_now_s - time_start - if message_count - time_delta * msg_rate_hz > 0: + if message_count - time_delta * rate_hz > 0: break else: - del self.message_counts[user_id] + del self.message_counts[key] diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 043b48f8f3..beaea64a61 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -33,9 +33,13 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore @@ -48,6 +52,8 @@ from synapse.rest.client.v1.room import ( RoomMemberListRestServlet, RoomStateRestServlet, ) +from synapse.rest.client.v2_alpha.account import ThreepidRestServlet +from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -60,6 +66,10 @@ logger = logging.getLogger("synapse.app.client_reader") class ClientReaderSlavedStore( + SlavedDeviceInboxStore, + SlavedDeviceStore, + SlavedReceiptsStore, + SlavedPushRuleStore, SlavedAccountDataStore, SlavedEventStore, SlavedKeyStore, @@ -96,6 +106,9 @@ class ClientReaderServer(HomeServer): RoomEventContextServlet(self).register(resource) RegisterRestServlet(self).register(resource) LoginRestServlet(self).register(resource) + ThreepidRestServlet(self).register(resource) + KeyQueryServlet(self).register(resource) + KeyChangesServlet(self).register(resource) resources.update({ "/_matrix/client/r0": resource, diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index b116c17669..7da79dc827 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -21,7 +21,7 @@ from twisted.web.resource import NoResource import synapse from synapse import events -from synapse.api.urls import FEDERATION_PREFIX +from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig @@ -44,6 +44,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.key.v2 import KeyApiV2Resource from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree @@ -99,6 +100,9 @@ class FederationReaderServer(HomeServer): ), }) + if name in ["keys", "federation"]: + resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self) + root_resource = create_resource_tree(resources, NoResource()) _base.listen_tcp( diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 5aec43b702..c4d3087fa4 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -180,9 +180,7 @@ class Config(object): Returns: str: the yaml config file """ - default_config = "# vim:ft=yaml\n" - - default_config += "\n\n".join( + default_config = "\n\n".join( dedent(conf) for conf in self.invoke_all( "default_config", @@ -297,19 +295,26 @@ class Config(object): "Must specify a server_name to a generate config for." " Pass -H server.name." ) + + config_str = obj.generate_config( + config_dir_path=config_dir_path, + data_dir_path=os.getcwd(), + server_name=server_name, + report_stats=(config_args.report_stats == "yes"), + generate_secrets=True, + ) + if not cls.path_exists(config_dir_path): os.makedirs(config_dir_path) with open(config_path, "w") as config_file: - config_str = obj.generate_config( - config_dir_path=config_dir_path, - data_dir_path=os.getcwd(), - server_name=server_name, - report_stats=(config_args.report_stats == "yes"), - generate_secrets=True, + config_file.write( + "# vim:ft=yaml\n\n" ) - config = yaml.load(config_str) - obj.invoke_all("generate_files", config) config_file.write(config_str) + + config = yaml.load(config_str) + obj.invoke_all("generate_files", config) + print( ( "A config file has been generated in %r for server name" diff --git a/synapse/config/database.py b/synapse/config/database.py index c8890147a6..63e9cb63f8 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -49,7 +49,8 @@ class DatabaseConfig(Config): def default_config(self, data_dir_path, **kwargs): database_path = os.path.join(data_dir_path, "homeserver.db") return """\ - # Database configuration + ## Database ## + database: # The database engine name name: "sqlite3" diff --git a/synapse/config/logger.py b/synapse/config/logger.py index f6940b65fd..464c28c2d9 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -81,7 +81,9 @@ class LoggingConfig(Config): def default_config(self, config_dir_path, server_name, **kwargs): log_config = os.path.join(config_dir_path, server_name + ".log.config") - return """ + return """\ + ## Logging ## + # A yaml python logging config file # log_config: "%(log_config)s" diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 54b71e6841..093042fdb9 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -27,6 +27,13 @@ class RatelimitConfig(Config): self.federation_rc_reject_limit = config["federation_rc_reject_limit"] self.federation_rc_concurrent = config["federation_rc_concurrent"] + self.rc_registration_requests_per_second = config.get( + "rc_registration_requests_per_second", 0.17, + ) + self.rc_registration_request_burst_count = config.get( + "rc_registration_request_burst_count", 3, + ) + def default_config(self, **kwargs): return """\ ## Ratelimiting ## @@ -62,4 +69,15 @@ class RatelimitConfig(Config): # single server # federation_rc_concurrent: 3 + + # Number of registration requests a client can send per second. + # Defaults to 1/minute (0.17). + # + #rc_registration_requests_per_second: 0.17 + + # Number of registration requests a client can send before being + # throttled. + # Defaults to 3. + # + #rc_registration_request_burst_count: 3.0 """ diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 2881482f96..a123f25a68 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -64,6 +64,8 @@ class RegistrationConfig(Config): return """\ ## Registration ## + # Registration can be rate-limited using the parameters in the "Ratelimiting" + # section of this file. # Enable registration for new users. enable_registration: False @@ -90,8 +92,8 @@ class RegistrationConfig(Config): # - medium: msisdn # pattern: '\\+44' - # If set, allows registration by anyone who also has the shared - # secret, even if registration is otherwise disabled. + # If set, allows registration of standard or admin accounts by anyone who + # has the shared secret, even if registration is otherwise disabled. # %(registration_shared_secret)s diff --git a/synapse/config/server.py b/synapse/config/server.py index 4200f10da3..35a322fee0 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -260,9 +260,11 @@ class ServerConfig(Config): # This is used by remote servers to connect to this server, # e.g. matrix.org, localhost:8080, etc. # This is also the last part of your UserID. + # server_name: "%(server_name)s" # When running as a daemon, the file to store the pid in + # pid_file: %(pid_file)s # CPU affinity mask. Setting this restricts the CPUs on which the @@ -304,9 +306,11 @@ class ServerConfig(Config): # Set the soft limit on the number of file descriptors synapse can use # Zero is used to indicate synapse should set the soft limit to the # hard limit. + # soft_file_limit: 0 # Set to false to disable presence tracking on this homeserver. + # use_presence: true # The GC threshold parameters to pass to `gc.set_threshold`, if defined diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 7474fd515f..0207cd989a 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -686,9 +686,9 @@ def _handle_key_deferred(verify_request): try: with PreserveLoggingContext(): _, key_id, verify_key = yield verify_request.deferred - except (IOError, RequestSendFailed) as e: + except KeyLookupError as e: logger.warn( - "Got IOError when downloading keys for %s: %s %s", + "Failed to download keys for %s: %s %s", server_name, type(e).__name__, str(e), ) raise SynapseError( diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 20c1ab4203..fafa135182 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -77,6 +77,20 @@ class _EventInternalMetadata(object): """ return getattr(self, "recheck_redaction", False) + def is_soft_failed(self): + """Whether the event has been soft failed. + + Soft failed events should be handled as usual, except: + 1. They should not go down sync or event streams, or generally + sent to clients. + 2. They should not be added to the forward extremities (and + therefore not to current state). + + Returns: + bool + """ + return getattr(self, "soft_failed", False) + def _event_dict_property(key): # We want to be able to use hasattr with the event dict properties. @@ -127,7 +141,6 @@ class EventBase(object): origin = _event_dict_property("origin") origin_server_ts = _event_dict_property("origin_server_ts") prev_events = _event_dict_property("prev_events") - prev_state = _event_dict_property("prev_state") redacts = _event_dict_property("redacts") room_id = _event_dict_property("room_id") sender = _event_dict_property("sender") diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 569eb277a9..81f3b4b1ff 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -886,6 +886,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): def on_edu(self, edu_type, origin, content): """Overrides FederationHandlerRegistry """ + if not self.config.use_presence and edu_type == "m.presence": + return + handler = self.edu_handlers.get(edu_type) if handler: return super(ReplicationFederationHandlerRegistry, self).on_edu( diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 6f5995735a..b7d0b25781 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -159,8 +159,12 @@ class FederationRemoteSendQueue(object): # stream. pass - def send_edu(self, destination, edu_type, content, key=None): + def build_and_send_edu(self, destination, edu_type, content, key=None): """As per TransactionQueue""" + if destination == self.server_name: + logger.info("Not sending EDU to ourselves") + return + pos = self._next_pos() edu = Edu( @@ -465,15 +469,11 @@ def process_rows_for_federation(transaction_queue, rows): for destination, edu_map in iteritems(buff.keyed_edus): for key, edu in edu_map.items(): - transaction_queue.send_edu( - edu.destination, edu.edu_type, edu.content, key=key, - ) + transaction_queue.send_edu(edu, key) for destination, edu_list in iteritems(buff.edus): for edu in edu_list: - transaction_queue.send_edu( - edu.destination, edu.edu_type, edu.content, key=None, - ) + transaction_queue.send_edu(edu, None) for destination in buff.device_destinations: transaction_queue.send_device_messages(destination) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 30941f5ad6..e5e42c647d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -361,7 +361,19 @@ class TransactionQueue(object): self._attempt_new_transaction(destination) - def send_edu(self, destination, edu_type, content, key=None): + def build_and_send_edu(self, destination, edu_type, content, key=None): + """Construct an Edu object, and queue it for sending + + Args: + destination (str): name of server to send to + edu_type (str): type of EDU to send + content (dict): content of EDU + key (Any|None): clobbering key for this edu + """ + if destination == self.server_name: + logger.info("Not sending EDU to ourselves") + return + edu = Edu( origin=self.server_name, destination=destination, @@ -369,18 +381,23 @@ class TransactionQueue(object): content=content, ) - if destination == self.server_name: - logger.info("Not sending EDU to ourselves") - return + self.send_edu(edu, key) + + def send_edu(self, edu, key): + """Queue an EDU for sending + Args: + edu (Edu): edu to send + key (Any|None): clobbering key for this edu + """ if key: self.pending_edus_keyed_by_dest.setdefault( - destination, {} + edu.destination, {} )[(edu.edu_type, key)] = edu else: - self.pending_edus_by_dest.setdefault(destination, []).append(edu) + self.pending_edus_by_dest.setdefault(edu.destination, []).append(edu) - self._attempt_new_transaction(destination) + self._attempt_new_transaction(edu.destination) def send_device_messages(self, destination): if destination == self.server_name: diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 8e2be218e2..4e8919d657 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -167,7 +167,7 @@ class TransportLayerClient(object): # generated by the json_data_callback. json_data = transaction.get_dict() - path = _create_v1_path("/send/%s/", transaction.transaction_id) + path = _create_v1_path("/send/%s", transaction.transaction_id) response = yield self.client.put_json( transaction.destination, diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 5ba94be2ec..efb6bdca48 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -312,7 +312,7 @@ class BaseFederationServlet(object): class FederationSendServlet(BaseFederationServlet): - PATH = "/send/(?P<transaction_id>[^/]*)/" + PATH = "/send/(?P<transaction_id>[^/]*)/?" def __init__(self, handler, server_name, **kwargs): super(FederationSendServlet, self).__init__( @@ -378,7 +378,7 @@ class FederationSendServlet(BaseFederationServlet): class FederationEventServlet(BaseFederationServlet): - PATH = "/event/(?P<event_id>[^/]*)/" + PATH = "/event/(?P<event_id>[^/]*)/?" # This is when someone asks for a data item for a given server data_id pair. def on_GET(self, origin, content, query, event_id): @@ -386,30 +386,30 @@ class FederationEventServlet(BaseFederationServlet): class FederationStateServlet(BaseFederationServlet): - PATH = "/state/(?P<context>[^/]*)/" + PATH = "/state/(?P<context>[^/]*)/?" # This is when someone asks for all data for a given context. def on_GET(self, origin, content, query, context): return self.handler.on_context_state_request( origin, context, - parse_string_from_args(query, "event_id", None), + parse_string_from_args(query, "event_id", None, required=True), ) class FederationStateIdsServlet(BaseFederationServlet): - PATH = "/state_ids/(?P<room_id>[^/]*)/" + PATH = "/state_ids/(?P<room_id>[^/]*)/?" def on_GET(self, origin, content, query, room_id): return self.handler.on_state_ids_request( origin, room_id, - parse_string_from_args(query, "event_id", None), + parse_string_from_args(query, "event_id", None, required=True), ) class FederationBackfillServlet(BaseFederationServlet): - PATH = "/backfill/(?P<context>[^/]*)/" + PATH = "/backfill/(?P<context>[^/]*)/?" def on_GET(self, origin, content, query, context): versions = [x.decode('ascii') for x in query[b"v"]] @@ -759,7 +759,7 @@ class FederationVersionServlet(BaseFederationServlet): class FederationGroupsProfileServlet(BaseFederationServlet): """Get/set the basic profile of a group on behalf of a user """ - PATH = "/groups/(?P<group_id>[^/]*)/profile$" + PATH = "/groups/(?P<group_id>[^/]*)/profile" @defer.inlineCallbacks def on_GET(self, origin, content, query, group_id): @@ -787,7 +787,7 @@ class FederationGroupsProfileServlet(BaseFederationServlet): class FederationGroupsSummaryServlet(BaseFederationServlet): - PATH = "/groups/(?P<group_id>[^/]*)/summary$" + PATH = "/groups/(?P<group_id>[^/]*)/summary" @defer.inlineCallbacks def on_GET(self, origin, content, query, group_id): @@ -805,7 +805,7 @@ class FederationGroupsSummaryServlet(BaseFederationServlet): class FederationGroupsRoomsServlet(BaseFederationServlet): """Get the rooms in a group on behalf of a user """ - PATH = "/groups/(?P<group_id>[^/]*)/rooms$" + PATH = "/groups/(?P<group_id>[^/]*)/rooms" @defer.inlineCallbacks def on_GET(self, origin, content, query, group_id): @@ -823,7 +823,7 @@ class FederationGroupsRoomsServlet(BaseFederationServlet): class FederationGroupsAddRoomsServlet(BaseFederationServlet): """Add/remove room from group """ - PATH = "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)$" + PATH = "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, room_id): @@ -855,7 +855,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet): """ PATH = ( "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)" - "/config/(?P<config_key>[^/]*)$" + "/config/(?P<config_key>[^/]*)" ) @defer.inlineCallbacks @@ -874,7 +874,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet): class FederationGroupsUsersServlet(BaseFederationServlet): """Get the users in a group on behalf of a user """ - PATH = "/groups/(?P<group_id>[^/]*)/users$" + PATH = "/groups/(?P<group_id>[^/]*)/users" @defer.inlineCallbacks def on_GET(self, origin, content, query, group_id): @@ -892,7 +892,7 @@ class FederationGroupsUsersServlet(BaseFederationServlet): class FederationGroupsInvitedUsersServlet(BaseFederationServlet): """Get the users that have been invited to a group """ - PATH = "/groups/(?P<group_id>[^/]*)/invited_users$" + PATH = "/groups/(?P<group_id>[^/]*)/invited_users" @defer.inlineCallbacks def on_GET(self, origin, content, query, group_id): @@ -910,7 +910,7 @@ class FederationGroupsInvitedUsersServlet(BaseFederationServlet): class FederationGroupsInviteServlet(BaseFederationServlet): """Ask a group server to invite someone to the group """ - PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$" + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -928,7 +928,7 @@ class FederationGroupsInviteServlet(BaseFederationServlet): class FederationGroupsAcceptInviteServlet(BaseFederationServlet): """Accept an invitation from the group server """ - PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite$" + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -945,7 +945,7 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet): class FederationGroupsJoinServlet(BaseFederationServlet): """Attempt to join a group """ - PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$" + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -962,7 +962,7 @@ class FederationGroupsJoinServlet(BaseFederationServlet): class FederationGroupsRemoveUserServlet(BaseFederationServlet): """Leave or kick a user from the group """ - PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$" + PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -980,7 +980,7 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet): class FederationGroupsLocalInviteServlet(BaseFederationServlet): """A group server has invited a local user """ - PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite$" + PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -997,7 +997,7 @@ class FederationGroupsLocalInviteServlet(BaseFederationServlet): class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): """A group server has removed a local user """ - PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove$" + PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -1014,7 +1014,7 @@ class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): """A group or user's server renews their attestation """ - PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)$" + PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)" @defer.inlineCallbacks def on_POST(self, origin, content, query, group_id, user_id): @@ -1037,7 +1037,7 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): PATH = ( "/groups/(?P<group_id>[^/]*)/summary" "(/categories/(?P<category_id>[^/]+))?" - "/rooms/(?P<room_id>[^/]*)$" + "/rooms/(?P<room_id>[^/]*)" ) @defer.inlineCallbacks @@ -1080,7 +1080,7 @@ class FederationGroupsCategoriesServlet(BaseFederationServlet): """Get all categories for a group """ PATH = ( - "/groups/(?P<group_id>[^/]*)/categories/$" + "/groups/(?P<group_id>[^/]*)/categories/?" ) @defer.inlineCallbacks @@ -1100,7 +1100,7 @@ class FederationGroupsCategoryServlet(BaseFederationServlet): """Add/remove/get a category in a group """ PATH = ( - "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)$" + "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)" ) @defer.inlineCallbacks @@ -1150,7 +1150,7 @@ class FederationGroupsRolesServlet(BaseFederationServlet): """Get roles in a group """ PATH = ( - "/groups/(?P<group_id>[^/]*)/roles/$" + "/groups/(?P<group_id>[^/]*)/roles/?" ) @defer.inlineCallbacks @@ -1170,7 +1170,7 @@ class FederationGroupsRoleServlet(BaseFederationServlet): """Add/remove/get a role in a group """ PATH = ( - "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)$" + "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)" ) @defer.inlineCallbacks @@ -1226,7 +1226,7 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): PATH = ( "/groups/(?P<group_id>[^/]*)/summary" "(/roles/(?P<role_id>[^/]+))?" - "/users/(?P<user_id>[^/]*)$" + "/users/(?P<user_id>[^/]*)" ) @defer.inlineCallbacks @@ -1269,7 +1269,7 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): """Get roles in a group """ PATH = ( - "/get_groups_publicised$" + "/get_groups_publicised" ) @defer.inlineCallbacks @@ -1284,7 +1284,7 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): """Sets whether a group is joinable without an invite or knock """ - PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$" + PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy" @defer.inlineCallbacks def on_PUT(self, origin, content, query, group_id): diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 594754cfd8..d8d86d6ff3 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -93,9 +93,9 @@ class BaseHandler(object): messages_per_second = self.hs.config.rc_messages_per_second burst_count = self.hs.config.rc_message_burst_count - allowed, time_allowed = self.ratelimiter.send_message( + allowed, time_allowed = self.ratelimiter.can_do_action( user_id, time_now, - msg_rate_hz=messages_per_second, + rate_hz=messages_per_second, burst_count=burst_count, update=update, ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c708c35d4d..b398848079 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -37,13 +37,185 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) -class DeviceHandler(BaseHandler): +class DeviceWorkerHandler(BaseHandler): def __init__(self, hs): - super(DeviceHandler, self).__init__(hs) + super(DeviceWorkerHandler, self).__init__(hs) self.hs = hs self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() + + @defer.inlineCallbacks + def get_devices_by_user(self, user_id): + """ + Retrieve the given user's devices + + Args: + user_id (str): + Returns: + defer.Deferred: list[dict[str, X]]: info on each device + """ + + device_map = yield self.store.get_devices_by_user(user_id) + + ips = yield self.store.get_last_client_ip_by_device( + user_id, device_id=None + ) + + devices = list(device_map.values()) + for device in devices: + _update_device_from_client_ips(device, ips) + + defer.returnValue(devices) + + @defer.inlineCallbacks + def get_device(self, user_id, device_id): + """ Retrieve the given device + + Args: + user_id (str): + device_id (str): + + Returns: + defer.Deferred: dict[str, X]: info on the device + Raises: + errors.NotFoundError: if the device was not found + """ + try: + device = yield self.store.get_device(user_id, device_id) + except errors.StoreError: + raise errors.NotFoundError + ips = yield self.store.get_last_client_ip_by_device( + user_id, device_id, + ) + _update_device_from_client_ips(device, ips) + defer.returnValue(device) + + @measure_func("device.get_user_ids_changed") + @defer.inlineCallbacks + def get_user_ids_changed(self, user_id, from_token): + """Get list of users that have had the devices updated, or have newly + joined a room, that `user_id` may be interested in. + + Args: + user_id (str) + from_token (StreamToken) + """ + now_room_key = yield self.store.get_room_events_max_id() + + room_ids = yield self.store.get_rooms_for_user(user_id) + + # First we check if any devices have changed + changed = yield self.store.get_user_whose_devices_changed( + from_token.device_list_key + ) + + # Then work out if any users have since joined + rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + + member_events = yield self.store.get_membership_changes_for_user( + user_id, from_token.room_key, now_room_key, + ) + rooms_changed.update(event.room_id for event in member_events) + + stream_ordering = RoomStreamToken.parse_stream_token( + from_token.room_key + ).stream + + possibly_changed = set(changed) + possibly_left = set() + for room_id in rooms_changed: + current_state_ids = yield self.store.get_current_state_ids(room_id) + + # The user may have left the room + # TODO: Check if they actually did or if we were just invited. + if room_id not in room_ids: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_left.add(state_key) + continue + + # Fetch the current state at the time. + try: + event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_ordering=stream_ordering + ) + except errors.StoreError: + # we have purged the stream_ordering index since the stream + # ordering: treat it the same as a new room + event_ids = [] + + # special-case for an empty prev state: include all members + # in the changed list + if not event_ids: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + continue + + current_member_id = current_state_ids.get((EventTypes.Member, user_id)) + if not current_member_id: + continue + + # mapping from event_id -> state_dict + prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + + # Check if we've joined the room? If so we just blindly add all the users to + # the "possibly changed" users. + for state_dict in itervalues(prev_state_ids): + member_event = state_dict.get((EventTypes.Member, user_id), None) + if not member_event or member_event != current_member_id: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + break + + # If there has been any change in membership, include them in the + # possibly changed list. We'll check if they are joined below, + # and we're not toooo worried about spuriously adding users. + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + + # check if this member has changed since any of the extremities + # at the stream_ordering, and add them to the list if so. + for state_dict in itervalues(prev_state_ids): + prev_event_id = state_dict.get(key, None) + if not prev_event_id or prev_event_id != event_id: + if state_key != user_id: + possibly_changed.add(state_key) + break + + if possibly_changed or possibly_left: + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + possibly_joined = possibly_changed & users_who_share_room + possibly_left = (possibly_changed | possibly_left) - users_who_share_room + else: + possibly_joined = [] + possibly_left = [] + + defer.returnValue({ + "changed": list(possibly_joined), + "left": list(possibly_left), + }) + + +class DeviceHandler(DeviceWorkerHandler): + def __init__(self, hs): + super(DeviceHandler, self).__init__(hs) + self.federation_sender = hs.get_federation_sender() self._edu_updater = DeviceListEduUpdater(hs, self) @@ -104,52 +276,6 @@ class DeviceHandler(BaseHandler): raise errors.StoreError(500, "Couldn't generate a device ID.") @defer.inlineCallbacks - def get_devices_by_user(self, user_id): - """ - Retrieve the given user's devices - - Args: - user_id (str): - Returns: - defer.Deferred: list[dict[str, X]]: info on each device - """ - - device_map = yield self.store.get_devices_by_user(user_id) - - ips = yield self.store.get_last_client_ip_by_device( - user_id, device_id=None - ) - - devices = list(device_map.values()) - for device in devices: - _update_device_from_client_ips(device, ips) - - defer.returnValue(devices) - - @defer.inlineCallbacks - def get_device(self, user_id, device_id): - """ Retrieve the given device - - Args: - user_id (str): - device_id (str): - - Returns: - defer.Deferred: dict[str, X]: info on the device - Raises: - errors.NotFoundError: if the device was not found - """ - try: - device = yield self.store.get_device(user_id, device_id) - except errors.StoreError: - raise errors.NotFoundError - ips = yield self.store.get_last_client_ip_by_device( - user_id, device_id, - ) - _update_device_from_client_ips(device, ips) - defer.returnValue(device) - - @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -276,6 +402,12 @@ class DeviceHandler(BaseHandler): user_id, device_ids, list(hosts) ) + for device_id in device_ids: + logger.debug( + "Notifying about update %r/%r, ID: %r", user_id, device_id, + position, + ) + room_ids = yield self.store.get_rooms_for_user(user_id) yield self.notifier.on_new_event( @@ -283,130 +415,10 @@ class DeviceHandler(BaseHandler): ) if hosts: - logger.info("Sending device list update notif to: %r", hosts) + logger.info("Sending device list update notif for %r to: %r", user_id, hosts) for host in hosts: self.federation_sender.send_device_messages(host) - @measure_func("device.get_user_ids_changed") - @defer.inlineCallbacks - def get_user_ids_changed(self, user_id, from_token): - """Get list of users that have had the devices updated, or have newly - joined a room, that `user_id` may be interested in. - - Args: - user_id (str) - from_token (StreamToken) - """ - now_token = yield self.hs.get_event_sources().get_current_token() - - room_ids = yield self.store.get_rooms_for_user(user_id) - - # First we check if any devices have changed - changed = yield self.store.get_user_whose_devices_changed( - from_token.device_list_key - ) - - # Then work out if any users have since joined - rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) - - member_events = yield self.store.get_membership_changes_for_user( - user_id, from_token.room_key, now_token.room_key - ) - rooms_changed.update(event.room_id for event in member_events) - - stream_ordering = RoomStreamToken.parse_stream_token( - from_token.room_key - ).stream - - possibly_changed = set(changed) - possibly_left = set() - for room_id in rooms_changed: - current_state_ids = yield self.store.get_current_state_ids(room_id) - - # The user may have left the room - # TODO: Check if they actually did or if we were just invited. - if room_id not in room_ids: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_left.add(state_key) - continue - - # Fetch the current state at the time. - try: - event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, stream_ordering=stream_ordering - ) - except errors.StoreError: - # we have purged the stream_ordering index since the stream - # ordering: treat it the same as a new room - event_ids = [] - - # special-case for an empty prev state: include all members - # in the changed list - if not event_ids: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_changed.add(state_key) - continue - - current_member_id = current_state_ids.get((EventTypes.Member, user_id)) - if not current_member_id: - continue - - # mapping from event_id -> state_dict - prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) - - # Check if we've joined the room? If so we just blindly add all the users to - # the "possibly changed" users. - for state_dict in itervalues(prev_state_ids): - member_event = state_dict.get((EventTypes.Member, user_id), None) - if not member_event or member_event != current_member_id: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_changed.add(state_key) - break - - # If there has been any change in membership, include them in the - # possibly changed list. We'll check if they are joined below, - # and we're not toooo worried about spuriously adding users. - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - - # check if this member has changed since any of the extremities - # at the stream_ordering, and add them to the list if so. - for state_dict in itervalues(prev_state_ids): - prev_event_id = state_dict.get(key, None) - if not prev_event_id or prev_event_id != event_id: - if state_key != user_id: - possibly_changed.add(state_key) - break - - if possibly_changed or possibly_left: - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) - - # Take the intersection of the users whose devices may have changed - # and those that actually still share a room with the user - possibly_joined = possibly_changed & users_who_share_room - possibly_left = (possibly_changed | possibly_left) - users_who_share_room - else: - possibly_joined = [] - possibly_left = [] - - defer.returnValue({ - "changed": list(possibly_joined), - "left": list(possibly_left), - }) - @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) @@ -473,15 +485,26 @@ class DeviceListEduUpdater(object): if get_domain_from_id(user_id) != origin: # TODO: Raise? - logger.warning("Got device list update edu for %r from %r", user_id, origin) + logger.warning( + "Got device list update edu for %r/%r from %r", + user_id, device_id, origin, + ) return room_ids = yield self.store.get_rooms_for_user(user_id) if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. + logger.warning( + "Got device list update edu for %r/%r, but don't share a room", + user_id, device_id, + ) return + logger.debug( + "Received device list update for %r/%r", user_id, device_id, + ) + self._pending_updates.setdefault(user_id, []).append( (device_id, stream_id, prev_ids, edu_content) ) @@ -499,10 +522,18 @@ class DeviceListEduUpdater(object): # This can happen since we batch updates return + for device_id, stream_id, prev_ids, content in pending_updates: + logger.debug( + "Handling update %r/%r, ID: %r, prev: %r ", + user_id, device_id, stream_id, prev_ids, + ) + # Given a list of updates we check if we need to resync. This # happens if we've missed updates. resync = yield self._need_to_do_resync(user_id, pending_updates) + logger.debug("Need to re-sync devices for %r? %r", user_id, resync) + if resync: # Fetch all devices for the user. origin = get_domain_from_id(user_id) @@ -555,11 +586,21 @@ class DeviceListEduUpdater(object): ) devices = [] + for device in devices: + logger.debug( + "Handling resync update %r/%r, ID: %r", + user_id, device["device_id"], stream_id, + ) + yield self.store.update_remote_device_list_cache( user_id, devices, stream_id, ) device_ids = [device["device_id"] for device in devices] yield self.device_handler.notify_device_update(user_id, device_ids) + + # We clobber the seen updates since we've re-synced from a given + # point. + self._seen_updates[user_id] = set([stream_id]) else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) @@ -572,9 +613,9 @@ class DeviceListEduUpdater(object): user_id, [device_id for device_id, _, _, _ in pending_updates] ) - self._seen_updates.setdefault(user_id, set()).update( - stream_id for _, stream_id, _, _ in pending_updates - ) + self._seen_updates.setdefault(user_id, set()).update( + stream_id for _, stream_id, _, _ in pending_updates + ) @defer.inlineCallbacks def _need_to_do_resync(self, user_id, updates): @@ -587,6 +628,11 @@ class DeviceListEduUpdater(object): user_id ) + logger.debug( + "Current extremity for %r: %r", + user_id, extremity, + ) + stream_id_in_updates = set() # stream_ids in updates list for _, stream_id, prev_ids, _ in updates: if not prev_ids: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f80486102a..9eaf2d3e18 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -45,6 +45,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.crypto.event_signing import compute_event_signature +from synapse.event_auth import auth_types_for_event from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, @@ -858,6 +859,52 @@ class FederationHandler(BaseHandler): logger.debug("Not backfilling as no extremeties found.") return + # We only want to paginate if we can actually see the events we'll get, + # as otherwise we'll just spend a lot of resources to get redacted + # events. + # + # We do this by filtering all the backwards extremities and seeing if + # any remain. Given we don't have the extremity events themselves, we + # need to actually check the events that reference them. + # + # *Note*: the spec wants us to keep backfilling until we reach the start + # of the room in case we are allowed to see some of the history. However + # in practice that causes more issues than its worth, as a) its + # relatively rare for there to be any visible history and b) even when + # there is its often sufficiently long ago that clients would stop + # attempting to paginate before backfill reached the visible history. + # + # TODO: If we do do a backfill then we should filter the backwards + # extremities to only include those that point to visible portions of + # history. + # + # TODO: Correctly handle the case where we are allowed to see the + # forward event but not the backward extremity, e.g. in the case of + # initial join of the server where we are allowed to see the join + # event but not anything before it. This would require looking at the + # state *before* the event, ignoring the special casing certain event + # types have. + + forward_events = yield self.store.get_successor_events( + list(extremities), + ) + + extremities_events = yield self.store.get_events( + forward_events, + check_redacted=False, + get_prev_content=False, + ) + + # We set `check_history_visibility_only` as we might otherwise get false + # positives from users having been erased. + filtered_extremities = yield filter_events_for_server( + self.store, self.server_name, list(extremities_events.values()), + redact=False, check_history_visibility_only=True, + ) + + if not filtered_extremities: + defer.returnValue(False) + # Check if we reached a point where we should start backfilling. sorted_extremeties_tuple = sorted( extremities.items(), @@ -1582,6 +1629,7 @@ class FederationHandler(BaseHandler): origin, event, state=state, auth_events=auth_events, + backfilled=backfilled, ) # reraise does not allow inlineCallbacks to preserve the stacktrace, so we @@ -1626,6 +1674,7 @@ class FederationHandler(BaseHandler): event, state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), + backfilled=backfilled, ) defer.returnValue(res) @@ -1748,7 +1797,7 @@ class FederationHandler(BaseHandler): ) @defer.inlineCallbacks - def _prep_event(self, origin, event, state=None, auth_events=None): + def _prep_event(self, origin, event, state, auth_events, backfilled): """ Args: @@ -1756,6 +1805,7 @@ class FederationHandler(BaseHandler): event: state: auth_events: + backfilled (bool) Returns: Deferred, which resolves to synapse.events.snapshot.EventContext @@ -1797,12 +1847,100 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR + if not context.rejected: + yield self._check_for_soft_fail(event, state, backfilled) + if event.type == EventTypes.GuestAccess and not context.rejected: yield self.maybe_kick_guest_users(event) defer.returnValue(context) @defer.inlineCallbacks + def _check_for_soft_fail(self, event, state, backfilled): + """Checks if we should soft fail the event, if so marks the event as + such. + + Args: + event (FrozenEvent) + state (dict|None): The state at the event if we don't have all the + event's prev events + backfilled (bool): Whether the event is from backfill + + Returns: + Deferred + """ + # For new (non-backfilled and non-outlier) events we check if the event + # passes auth based on the current state. If it doesn't then we + # "soft-fail" the event. + do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier() + if do_soft_fail_check: + extrem_ids = yield self.store.get_latest_event_ids_in_room( + event.room_id, + ) + + extrem_ids = set(extrem_ids) + prev_event_ids = set(event.prev_event_ids()) + + if extrem_ids == prev_event_ids: + # If they're the same then the current state is the same as the + # state at the event, so no point rechecking auth for soft fail. + do_soft_fail_check = False + + if do_soft_fail_check: + room_version = yield self.store.get_room_version(event.room_id) + + # Calculate the "current state". + if state is not None: + # If we're explicitly given the state then we won't have all the + # prev events, and so we have a gap in the graph. In this case + # we want to be a little careful as we might have been down for + # a while and have an incorrect view of the current state, + # however we still want to do checks as gaps are easy to + # maliciously manufacture. + # + # So we use a "current state" that is actually a state + # resolution across the current forward extremities and the + # given state at the event. This should correctly handle cases + # like bans, especially with state res v2. + + state_sets = yield self.store.get_state_groups( + event.room_id, extrem_ids, + ) + state_sets = list(state_sets.values()) + state_sets.append(state) + current_state_ids = yield self.state_handler.resolve_events( + room_version, state_sets, event, + ) + current_state_ids = { + k: e.event_id for k, e in iteritems(current_state_ids) + } + else: + current_state_ids = yield self.state_handler.get_current_state_ids( + event.room_id, latest_event_ids=extrem_ids, + ) + + # Now check if event pass auth against said current state + auth_types = auth_types_for_event(event) + current_state_ids = [ + e for k, e in iteritems(current_state_ids) + if k in auth_types + ] + + current_auth_events = yield self.store.get_events(current_state_ids) + current_auth_events = { + (e.type, e.state_key): e for e in current_auth_events.values() + } + + try: + self.auth.check(room_version, event, auth_events=current_auth_events) + except AuthError as e: + logger.warn( + "Failed current state auth resolution for %r because %s", + event, e, + ) + event.internal_metadata.soft_failed = True + + @defer.inlineCallbacks def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects, missing): in_room = yield self.auth.check_host_in_room( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3981fe69ce..c762b58902 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -436,10 +436,11 @@ class EventCreationHandler(object): if event.is_state(): prev_state = yield self.deduplicate_state_event(event, context) - logger.info( - "Not bothering to persist duplicate state event %s", event.event_id, - ) if prev_state is not None: + logger.info( + "Not bothering to persist state event %s duplicated by %s", + event.event_id, prev_state.event_id, + ) defer.returnValue(prev_state) yield self.handle_new_client_event( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ba3856674d..37e87fc054 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -816,7 +816,7 @@ class PresenceHandler(object): if self.is_mine(observed_user): yield self.invite_presence(observed_user, observer_user) else: - yield self.federation.send_edu( + yield self.federation.build_and_send_edu( destination=observed_user.domain, edu_type="m.presence_invite", content={ @@ -836,7 +836,7 @@ class PresenceHandler(object): if self.is_mine(observer_user): yield self.accept_presence(observed_user, observer_user) else: - self.federation.send_edu( + self.federation.build_and_send_edu( destination=observer_user.domain, edu_type="m.presence_accept", content={ @@ -848,7 +848,7 @@ class PresenceHandler(object): state_dict = yield self.get_state(observed_user, as_event=False) state_dict = format_user_presence_state(state_dict, self.clock.time_msec()) - self.federation.send_edu( + self.federation.build_and_send_edu( destination=observer_user.domain, edu_type="m.presence", content={ diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 696469732c..1728089667 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -16,7 +16,6 @@ import logging from twisted.internet import defer -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id from ._base import BaseHandler @@ -39,31 +38,6 @@ class ReceiptsHandler(BaseHandler): self.state = hs.get_state_handler() @defer.inlineCallbacks - def received_client_receipt(self, room_id, receipt_type, user_id, - event_id): - """Called when a client tells us a local user has read up to the given - event_id in the room. - """ - receipt = { - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - "event_ids": [event_id], - "data": { - "ts": int(self.clock.time_msec()), - } - } - - is_new = yield self._handle_new_receipts([receipt]) - - if is_new: - # fire off a process in the background to send the receipt to - # remote servers - run_as_background_process( - 'push_receipts_to_remotes', self._push_remotes, receipt - ) - - @defer.inlineCallbacks def _received_remote_receipt(self, origin, content): """Called when we receive an EDU of type m.receipt from a remote HS. """ @@ -128,43 +102,54 @@ class ReceiptsHandler(BaseHandler): defer.returnValue(True) @defer.inlineCallbacks - def _push_remotes(self, receipt): - """Given a receipt, works out which remote servers should be - poked and pokes them. + def received_client_receipt(self, room_id, receipt_type, user_id, + event_id): + """Called when a client tells us a local user has read up to the given + event_id in the room. """ - try: - # TODO: optimise this to move some of the work to the workers. - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] + receipt = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": [event_id], + "data": { + "ts": int(self.clock.time_msec()), + } + } - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": event_ids, - "data": data, - } + is_new = yield self._handle_new_receipts([receipt]) + if not is_new: + return + + # Work out which remote servers should be poked and poke them. + + # TODO: optimise this to move some of the work to the workers. + data = receipt["data"] + + # XXX why does this not use state.get_current_hosts_in_room() ? + users = yield self.state.get_current_user_in_room(room_id) + remotedomains = set(get_domain_from_id(u) for u in users) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.build_and_send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": [event_id], + "data": data, } - }, + } }, - key=(room_id, receipt_type, user_id), - ) - except Exception: - logger.exception("Error pushing receipts to remote servers") + }, + key=(room_id, receipt_type, user_id), + ) @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index c0e06929bd..03130edc54 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -24,6 +24,7 @@ from synapse.api.errors import ( AuthError, Codes, InvalidCaptchaError, + LimitExceededError, RegistrationError, SynapseError, ) @@ -60,6 +61,7 @@ class RegistrationHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() self.captcha_client = CaptchaServerHttpClient(hs) self.identity_handler = self.hs.get_handlers().identity_handler + self.ratelimiter = hs.get_registration_ratelimiter() self._next_generated_user_id = None @@ -149,6 +151,7 @@ class RegistrationHandler(BaseHandler): threepid=None, user_type=None, default_display_name=None, + address=None, ): """Registers a new client on the server. @@ -167,6 +170,7 @@ class RegistrationHandler(BaseHandler): api.constants.UserTypes, or None for a normal user. default_display_name (unicode|None): if set, the new user's displayname will be set to this. Defaults to 'localpart'. + address (str|None): the IP address used to perform the regitration. Returns: A tuple of (user_id, access_token). Raises: @@ -206,7 +210,7 @@ class RegistrationHandler(BaseHandler): token = None if generate_token: token = self.macaroon_gen.generate_access_token(user_id) - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, token=token, password_hash=password_hash, @@ -215,6 +219,7 @@ class RegistrationHandler(BaseHandler): create_profile_with_displayname=default_display_name, admin=admin, user_type=user_type, + address=address, ) if self.hs.config.user_directory_search_all_users: @@ -238,12 +243,13 @@ class RegistrationHandler(BaseHandler): if default_display_name is None: default_display_name = localpart try: - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, token=token, password_hash=password_hash, make_guest=make_guest, create_profile_with_displayname=default_display_name, + address=address, ) except SynapseError: # if user id is taken, just generate another @@ -337,7 +343,7 @@ class RegistrationHandler(BaseHandler): user_id, allowed_appservice=service ) - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, password_hash="", appservice_id=service_id, @@ -513,7 +519,7 @@ class RegistrationHandler(BaseHandler): token = self.macaroon_gen.generate_access_token(user_id) if need_register: - yield self._register_with_store( + yield self.register_with_store( user_id=user_id, token=token, password_hash=password_hash, @@ -590,10 +596,10 @@ class RegistrationHandler(BaseHandler): ratelimit=False, ) - def _register_with_store(self, user_id, token=None, password_hash=None, - was_guest=False, make_guest=False, appservice_id=None, - create_profile_with_displayname=None, admin=False, - user_type=None): + def register_with_store(self, user_id, token=None, password_hash=None, + was_guest=False, make_guest=False, appservice_id=None, + create_profile_with_displayname=None, admin=False, + user_type=None, address=None): """Register user in the datastore. Args: @@ -612,10 +618,26 @@ class RegistrationHandler(BaseHandler): admin (boolean): is an admin user? user_type (str|None): type of user. One of the values from api.constants.UserTypes, or None for a normal user. + address (str|None): the IP address used to perform the regitration. Returns: Deferred """ + # Don't rate limit for app services + if appservice_id is None and address is not None: + time_now = self.clock.time() + + allowed, time_allowed = self.ratelimiter.can_do_action( + address, time_now_s=time_now, + rate_hz=self.hs.config.rc_registration_requests_per_second, + burst_count=self.hs.config.rc_registration_request_burst_count, + ) + + if not allowed: + raise LimitExceededError( + retry_after_ms=int(1000 * (time_allowed - time_now)), + ) + if self.hs.config.worker_app: return self._register_client( user_id=user_id, @@ -627,6 +649,7 @@ class RegistrationHandler(BaseHandler): create_profile_with_displayname=create_profile_with_displayname, admin=admin, user_type=user_type, + address=address, ) else: return self.store.register( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 190ea2c7b1..aead9e4608 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -232,6 +232,10 @@ class RoomMemberHandler(object): self.copy_room_tags_and_direct_to_room( predecessor["room_id"], room_id, user_id, ) + # Move over old push rules + self.store.move_push_rules_from_room_to_room_for_user( + predecessor["room_id"], room_id, user_id, + ) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bd97241ab4..57bb996245 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -39,6 +39,9 @@ from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) +# Debug logger for https://github.com/matrix-org/synapse/issues/4422 +issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug") + # Counts the number of times we returned a non-empty sync. `type` is one of # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is @@ -962,6 +965,15 @@ class SyncHandler(object): yield self._generate_sync_entry_for_groups(sync_result_builder) + # debug for https://github.com/matrix-org/synapse/issues/4422 + for joined_room in sync_result_builder.joined: + room_id = joined_room.room_id + if room_id in newly_joined_rooms: + issue4422_logger.debug( + "Sync result for newly joined room %s: %r", + room_id, joined_room, + ) + defer.returnValue(SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, @@ -1425,6 +1437,17 @@ class SyncHandler(object): old_mem_ev = yield self.store.get_event( old_mem_ev_id, allow_none=True ) + + # debug for #4422 + if has_join: + prev_membership = None + if old_mem_ev: + prev_membership = old_mem_ev.membership + issue4422_logger.debug( + "Previous membership for room %s with join: %s (event %s)", + room_id, prev_membership, old_mem_ev_id, + ) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: newly_joined_rooms.append(room_id) @@ -1519,30 +1542,39 @@ class SyncHandler(object): for room_id in sync_result_builder.joined_room_ids: room_entry = room_to_events.get(room_id, None) + newly_joined = room_id in newly_joined_rooms if room_entry: events, start_key = room_entry prev_batch_token = now_token.copy_and_replace("room_key", start_key) - room_entries.append(RoomSyncResultBuilder( + entry = RoomSyncResultBuilder( room_id=room_id, rtype="joined", events=events, - newly_joined=room_id in newly_joined_rooms, + newly_joined=newly_joined, full_state=False, - since_token=None if room_id in newly_joined_rooms else since_token, + since_token=None if newly_joined else since_token, upto_token=prev_batch_token, - )) + ) else: - room_entries.append(RoomSyncResultBuilder( + entry = RoomSyncResultBuilder( room_id=room_id, rtype="joined", events=[], - newly_joined=room_id in newly_joined_rooms, + newly_joined=newly_joined, full_state=False, since_token=since_token, upto_token=since_token, - )) + ) + + if newly_joined: + # debugging for https://github.com/matrix-org/synapse/issues/4422 + issue4422_logger.debug( + "RoomSyncResultBuilder events for newly joined room %s: %r", + room_id, entry.events, + ) + room_entries.append(entry) defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms)) @@ -1663,6 +1695,13 @@ class SyncHandler(object): newly_joined_room=newly_joined, ) + if newly_joined: + # debug for https://github.com/matrix-org/synapse/issues/4422 + issue4422_logger.debug( + "Timeline events after filtering in newly-joined room %s: %r", + room_id, batch, + ) + # When we join the room (or the client requests full_state), we should # send down any existing tags. Usually the user won't have tags in a # newly joined room, unless either a) they've joined before or b) the @@ -1894,15 +1933,34 @@ def _calculate_state( class SyncResultBuilder(object): - "Used to help build up a new SyncResult for a user" + """Used to help build up a new SyncResult for a user + + Attributes: + sync_config (SyncConfig) + full_state (bool) + since_token (StreamToken) + now_token (StreamToken) + joined_room_ids (list[str]) + + # The following mirror the fields in a sync response + presence (list) + account_data (list) + joined (list[JoinedSyncResult]) + invited (list[InvitedSyncResult]) + archived (list[ArchivedSyncResult]) + device (list) + groups (GroupsSyncResult|None) + to_device (list) + """ def __init__(self, sync_config, full_state, since_token, now_token, joined_room_ids): """ Args: - sync_config(SyncConfig) - full_state(bool): The full_state flag as specified by user - since_token(StreamToken): The token supplied by user, or None. - now_token(StreamToken): The token to sync up to. + sync_config (SyncConfig) + full_state (bool): The full_state flag as specified by user + since_token (StreamToken): The token supplied by user, or None. + now_token (StreamToken): The token to sync up to. + joined_room_ids (list[str]): List of rooms the user is joined to """ self.sync_config = sync_config self.full_state = full_state @@ -1930,8 +1988,8 @@ class RoomSyncResultBuilder(object): Args: room_id(str) rtype(str): One of `"joined"` or `"archived"` - events(list): List of events to include in the room, (more events - may be added when generating result). + events(list[FrozenEvent]): List of events to include in the room + (more events may be added when generating result). newly_joined(bool): If the user has newly joined the room full_state(bool): Whether the full state should be sent in result since_token(StreamToken): Earliest point to return events from, or None diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a61bbf9392..39df960c31 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -231,7 +231,7 @@ class TypingHandler(object): for domain in set(get_domain_from_id(u) for u in users): if domain != self.server_name: logger.debug("sending typing update to %s", domain) - self.federation.send_edu( + self.federation.build_and_send_edu( destination=domain, edu_type="m.typing", content={ diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 283c6c1b81..c21da8343a 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -15,7 +15,7 @@ import logging -from six import iteritems +from six import iteritems, iterkeys from twisted.internet import defer @@ -63,10 +63,6 @@ class UserDirectoryHandler(object): # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already self.initially_handled_users = set() - self.initially_handled_users_in_public = set() - - self.initially_handled_users_share = set() - self.initially_handled_users_share_private_room = set() # The current position in the current_state_delta stream self.pos = None @@ -140,7 +136,6 @@ class UserDirectoryHandler(object): # FIXME(#3714): We should probably do this in the same worker as all # the other changes. yield self.store.remove_from_user_dir(user_id) - yield self.store.remove_from_user_in_public_room(user_id) @defer.inlineCallbacks def _unsafe_process(self): @@ -215,15 +210,13 @@ class UserDirectoryHandler(object): logger.info("Processed all users") self.initially_handled_users = None - self.initially_handled_users_in_public = None - self.initially_handled_users_share = None - self.initially_handled_users_share_private_room = None yield self.store.update_user_directory_stream_pos(new_pos) @defer.inlineCallbacks def _handle_initial_room(self, room_id): - """Called when we initially fill out user_directory one room at a time + """ + Called when we initially fill out user_directory one room at a time """ is_in_room = yield self.store.is_host_joined(room_id, self.server_name) if not is_in_room: @@ -238,23 +231,15 @@ class UserDirectoryHandler(object): unhandled_users = user_ids - self.initially_handled_users yield self.store.add_profiles_to_user_dir( - room_id, {user_id: users_with_profile[user_id] for user_id in unhandled_users}, ) self.initially_handled_users |= unhandled_users - if is_public: - yield self.store.add_users_to_public_room( - room_id, user_ids=user_ids - self.initially_handled_users_in_public - ) - self.initially_handled_users_in_public |= user_ids - # We now go and figure out the new users who share rooms with user entries # We sleep aggressively here as otherwise it can starve resources. # We also batch up inserts/updates, but try to avoid too many at once. to_insert = set() - to_update = set() count = 0 for user_id in user_ids: if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: @@ -277,21 +262,7 @@ class UserDirectoryHandler(object): count += 1 user_set = (user_id, other_user_id) - - if user_set in self.initially_handled_users_share_private_room: - continue - - if user_set in self.initially_handled_users_share: - if is_public: - continue - to_update.add(user_set) - else: - to_insert.add(user_set) - - if is_public: - self.initially_handled_users_share.add(user_set) - else: - self.initially_handled_users_share_private_room.add(user_set) + to_insert.add(user_set) if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE: yield self.store.add_users_who_share_room( @@ -299,22 +270,10 @@ class UserDirectoryHandler(object): ) to_insert.clear() - if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE: - yield self.store.update_users_who_share_room( - room_id, not is_public, to_update - ) - to_update.clear() - if to_insert: yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) to_insert.clear() - if to_update: - yield self.store.update_users_who_share_room( - room_id, not is_public, to_update - ) - to_update.clear() - @defer.inlineCallbacks def _handle_deltas(self, deltas): """Called with the state deltas to process @@ -356,6 +315,7 @@ class UserDirectoryHandler(object): user_ids = yield self.store.get_users_in_dir_due_to_room( room_id ) + for user_id in user_ids: yield self._handle_remove_user(room_id, user_id) return @@ -436,14 +396,20 @@ class UserDirectoryHandler(object): # ignore the change return - if change: - users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in iteritems(users_with_profile): - yield self._handle_new_user(room_id, user_id, profile) - else: - users = yield self.store.get_users_in_public_due_to_room(room_id) - for user_id in users: - yield self._handle_remove_user(room_id, user_id) + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + # Remove every user from the sharing tables for that room. + for user_id in iterkeys(users_with_profile): + yield self.store.remove_user_who_share_room(user_id, room_id) + + # Then, re-add them to the tables. + # NOTE: this is not the most efficient method, as handle_new_user sets + # up local_user -> other_user and other_user_whos_local -> local_user, + # which when ran over an entire room, will result in the same values + # being added multiple times. The batching upserts shouldn't make this + # too bad, though. + for user_id, profile in iteritems(users_with_profile): + yield self._handle_new_user(room_id, user_id, profile) @defer.inlineCallbacks def _handle_local_user(self, user_id): @@ -457,7 +423,7 @@ class UserDirectoryHandler(object): row = yield self.store.get_user_in_directory(user_id) if not row: - yield self.store.add_profiles_to_user_dir(None, {user_id: profile}) + yield self.store.add_profiles_to_user_dir({user_id: profile}) @defer.inlineCallbacks def _handle_new_user(self, room_id, user_id, profile): @@ -471,55 +437,27 @@ class UserDirectoryHandler(object): row = yield self.store.get_user_in_directory(user_id) if not row: - yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile}) + yield self.store.add_profiles_to_user_dir({user_id: profile}) is_public = yield self.store.is_room_world_readable_or_publicly_joinable( room_id ) - - if is_public: - row = yield self.store.get_user_in_public_room(user_id) - if not row: - yield self.store.add_users_to_public_room(room_id, [user_id]) - else: - logger.debug("Not adding new user to public dir, %r", user_id) - - # Now we update users who share rooms with users. We do this by getting - # all the current users in the room and seeing which aren't already - # marked in the database as sharing with `user_id` - + # Now we update users who share rooms with users. users_with_profile = yield self.state.get_current_user_in_room(room_id) to_insert = set() - to_update = set() - - is_appservice = self.store.get_if_app_services_interested_in_user(user_id) # First, if they're our user then we need to update for every user - if self.is_mine_id(user_id) and not is_appservice: - # Returns a map of other_user_id -> shared_private. We only need - # to update mappings if for users that either don't share a room - # already (aren't in the map) or, if the room is private, those that - # only share a public room. - user_ids_shared = yield self.store.get_users_who_share_room_from_dir( - user_id - ) + if self.is_mine_id(user_id): - for other_user_id in users_with_profile: - if user_id == other_user_id: - continue + is_appservice = self.store.get_if_app_services_interested_in_user(user_id) + + # We don't care about appservice users. + if not is_appservice: + for other_user_id in users_with_profile: + if user_id == other_user_id: + continue - shared_is_private = user_ids_shared.get(other_user_id) - if shared_is_private is True: - # We've already marked in the database they share a private room - continue - elif shared_is_private is False: - # They already share a public room, so only update if this is - # a private room - if not is_public: - to_update.add((user_id, other_user_id)) - elif shared_is_private is None: - # This is the first time they both share a room to_insert.add((user_id, other_user_id)) # Next we need to update for every local user in the room @@ -531,29 +469,11 @@ class UserDirectoryHandler(object): other_user_id ) if self.is_mine_id(other_user_id) and not is_appservice: - shared_is_private = yield self.store.get_if_users_share_a_room( - other_user_id, user_id - ) - if shared_is_private is True: - # We've already marked in the database they share a private room - continue - elif shared_is_private is False: - # They already share a public room, so only update if this is - # a private room - if not is_public: - to_update.add((other_user_id, user_id)) - elif shared_is_private is None: - # This is the first time they both share a room - to_insert.add((other_user_id, user_id)) + to_insert.add((other_user_id, user_id)) if to_insert: yield self.store.add_users_who_share_room(room_id, not is_public, to_insert) - if to_update: - yield self.store.update_users_who_share_room( - room_id, not is_public, to_update - ) - @defer.inlineCallbacks def _handle_remove_user(self, room_id, user_id): """Called when we might need to remove user to directory @@ -562,84 +482,16 @@ class UserDirectoryHandler(object): room_id (str): room_id that user left or stopped being public that user_id (str) """ - logger.debug("Maybe removing user %r", user_id) - - row = yield self.store.get_user_in_directory(user_id) - update_user_dir = row and row["room_id"] == room_id - - row = yield self.store.get_user_in_public_room(user_id) - update_user_in_public = row and row["room_id"] == room_id - - if update_user_in_public or update_user_dir: - # XXX: Make this faster? - rooms = yield self.store.get_rooms_for_user(user_id) - for j_room_id in rooms: - if not update_user_in_public and not update_user_dir: - break - - is_in_room = yield self.store.is_host_joined( - j_room_id, self.server_name - ) - - if not is_in_room: - continue - - if update_user_dir: - update_user_dir = False - yield self.store.update_user_in_user_dir(user_id, j_room_id) + logger.debug("Removing user %r", user_id) - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) + # Remove user from sharing tables + yield self.store.remove_user_who_share_room(user_id, room_id) - if update_user_in_public and is_public: - yield self.store.update_user_in_public_user_list(user_id, j_room_id) - update_user_in_public = False + # Are they still in a room with members? If not, remove them entirely. + users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id) - if update_user_dir: + if len(users_in_room_with) == 0: yield self.store.remove_from_user_dir(user_id) - elif update_user_in_public: - yield self.store.remove_from_user_in_public_room(user_id) - - # Now handle users_who_share_rooms. - - # Get a list of user tuples that were in the DB due to this room and - # users (this includes tuples where the other user matches `user_id`) - user_tuples = yield self.store.get_users_in_share_dir_with_room_id( - user_id, room_id - ) - - for user_id, other_user_id in user_tuples: - # For each user tuple get a list of rooms that they still share, - # trying to find a private room, and update the entry in the DB - rooms = yield self.store.get_rooms_in_common_for_users( - user_id, other_user_id - ) - - # If they dont share a room anymore, remove the mapping - if not rooms: - yield self.store.remove_user_who_share_room(user_id, other_user_id) - continue - - found_public_share = None - for j_room_id in rooms: - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( - j_room_id - ) - - if is_public: - found_public_share = j_room_id - else: - found_public_share = None - yield self.store.update_users_who_share_room( - room_id, not is_public, [(user_id, other_user_id)] - ) - break - - if found_public_share: - yield self.store.update_users_who_share_room( - room_id, not is_public, [(user_id, other_user_id)] - ) @defer.inlineCallbacks def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 384d8a37a2..1334c630cc 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -68,9 +68,13 @@ class MatrixFederationAgent(object): TLS policy to use for fetching .well-known files. None to use a default (browser-like) implementation. - srv_resolver (SrvResolver|None): + _srv_resolver (SrvResolver|None): SRVResolver impl to use for looking up SRV records. None to use a default implementation. + + _well_known_cache (TTLCache|None): + TTLCache impl for storing cached well-known lookups. None to use a default + implementation. """ def __init__( diff --git a/synapse/notifier.py b/synapse/notifier.py index de02b1017e..ff589660da 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -178,8 +178,6 @@ class Notifier(object): self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS ) - self.replication_deferred = ObservableDeferred(defer.Deferred()) - # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at # most when scraping it. @@ -205,7 +203,9 @@ class Notifier(object): def add_replication_callback(self, cb): """Add a callback that will be called when some new data is available. - Callback is not given any arguments. + Callback is not given any arguments. It should *not* return a Deferred - if + it needs to do any asynchronous work, a background thread should be started and + wrapped with run_as_background_process. """ self.replication_callbacks.append(cb) @@ -517,60 +517,5 @@ class Notifier(object): def notify_replication(self): """Notify the any replication listeners that there's a new event""" - with PreserveLoggingContext(): - deferred = self.replication_deferred - self.replication_deferred = ObservableDeferred(defer.Deferred()) - deferred.callback(None) - - # the callbacks may well outlast the current request, so we run - # them in the sentinel logcontext. - # - # (ideally it would be up to the callbacks to know if they were - # starting off background processes and drop the logcontext - # accordingly, but that requires more changes) - for cb in self.replication_callbacks: - cb() - - @defer.inlineCallbacks - def wait_for_replication(self, callback, timeout): - """Wait for an event to happen. - - Args: - callback: Gets called whenever an event happens. If this returns a - truthy value then ``wait_for_replication`` returns, otherwise - it waits for another event. - timeout: How many milliseconds to wait for callback return a truthy - value. - - Returns: - A deferred that resolves with the value returned by the callback. - """ - listener = _NotificationListener(None) - - end_time = self.clock.time_msec() + timeout - - while True: - listener.deferred = self.replication_deferred.observe() - result = yield callback() - if result: - break - - now = self.clock.time_msec() - if end_time <= now: - break - - listener.deferred = timeout_deferred( - listener.deferred, - timeout=(end_time - now) / 1000., - reactor=self.hs.get_reactor(), - ) - - try: - with PreserveLoggingContext(): - yield listener.deferred - except defer.TimeoutError: - break - except defer.CancelledError: - break - - defer.returnValue(result) + for cb in self.replication_callbacks: + cb() diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py index 1d27c9221f..912a5ac341 100644 --- a/synapse/replication/http/register.py +++ b/synapse/replication/http/register.py @@ -33,11 +33,12 @@ class ReplicationRegisterServlet(ReplicationEndpoint): def __init__(self, hs): super(ReplicationRegisterServlet, self).__init__(hs) self.store = hs.get_datastore() + self.registration_handler = hs.get_registration_handler() @staticmethod def _serialize_payload( user_id, token, password_hash, was_guest, make_guest, appservice_id, - create_profile_with_displayname, admin, user_type, + create_profile_with_displayname, admin, user_type, address, ): """ Args: @@ -56,6 +57,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): admin (boolean): is an admin user? user_type (str|None): type of user. One of the values from api.constants.UserTypes, or None for a normal user. + address (str|None): the IP address used to perform the regitration. """ return { "token": token, @@ -66,13 +68,14 @@ class ReplicationRegisterServlet(ReplicationEndpoint): "create_profile_with_displayname": create_profile_with_displayname, "admin": admin, "user_type": user_type, + "address": address, } @defer.inlineCallbacks def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) - yield self.store.register( + yield self.registration_handler.register_with_store( user_id=user_id, token=content["token"], password_hash=content["password_hash"], @@ -82,6 +85,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint): create_profile_with_displayname=content["create_profile_with_displayname"], admin=content["admin"], user_type=content["user_type"], + address=content["address"] ) defer.returnValue((200, {})) diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py index 60641f1a49..5b8521c770 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py @@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore): if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: return + self.client_ip_last_seen.prefill(key, now) + self.hs.get_tcp_replication().send_user_ip( user_id, access_token, ip, user_agent, device_id, now ) diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 4f19fd35aa..4d59778863 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -13,15 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.storage.deviceinbox import DeviceInboxWorkerStore from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import BaseSlavedStore, __func__ -from ._slaved_id_tracker import SlavedIdTracker - -class SlavedDeviceInboxStore(BaseSlavedStore): +class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedDeviceInboxStore, self).__init__(db_conn, hs) self._device_inbox_id_gen = SlavedIdTracker( @@ -43,12 +42,6 @@ class SlavedDeviceInboxStore(BaseSlavedStore): expiry_ms=30 * 60 * 1000, ) - get_to_device_stream_token = __func__(DataStore.get_to_device_stream_token) - get_new_messages_for_device = __func__(DataStore.get_new_messages_for_device) - get_new_device_msgs_for_remote = __func__(DataStore.get_new_device_msgs_for_remote) - delete_messages_for_device = __func__(DataStore.delete_messages_for_device) - delete_device_msgs_for_remote = __func__(DataStore.delete_device_msgs_for_remote) - def stream_positions(self): result = super(SlavedDeviceInboxStore, self).stream_positions() result["to_device"] = self._device_inbox_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index ec2fd561cc..16c9a162c5 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -13,15 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore -from synapse.storage.end_to_end_keys import EndToEndKeyStore +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.storage.devices import DeviceWorkerStore +from synapse.storage.end_to_end_keys import EndToEndKeyWorkerStore from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import BaseSlavedStore, __func__ -from ._slaved_id_tracker import SlavedIdTracker - -class SlavedDeviceStore(BaseSlavedStore): +class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedDeviceStore, self).__init__(db_conn, hs) @@ -38,17 +37,6 @@ class SlavedDeviceStore(BaseSlavedStore): "DeviceListFederationStreamChangeCache", device_list_max, ) - get_device_stream_token = __func__(DataStore.get_device_stream_token) - get_user_whose_devices_changed = __func__(DataStore.get_user_whose_devices_changed) - get_devices_by_remote = __func__(DataStore.get_devices_by_remote) - _get_devices_by_remote_txn = __func__(DataStore._get_devices_by_remote_txn) - _get_e2e_device_keys_txn = __func__(DataStore._get_e2e_device_keys_txn) - mark_as_sent_devices_by_remote = __func__(DataStore.mark_as_sent_devices_by_remote) - _mark_as_sent_devices_by_remote_txn = ( - __func__(DataStore._mark_as_sent_devices_by_remote_txn) - ) - count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"] - def stream_positions(self): result = super(SlavedDeviceStore, self).stream_positions() result["device_lists"] = self._device_list_id_gen.get_current_token() @@ -58,14 +46,23 @@ class SlavedDeviceStore(BaseSlavedStore): if stream_name == "device_lists": self._device_list_id_gen.advance(token) for row in rows: - self._device_list_stream_cache.entity_has_changed( - row.user_id, token + self._invalidate_caches_for_devices( + token, row.user_id, row.destination, ) - - if row.destination: - self._device_list_federation_stream_cache.entity_has_changed( - row.destination, token - ) return super(SlavedDeviceStore, self).process_replication_rows( stream_name, token, rows ) + + def _invalidate_caches_for_devices(self, token, user_id, destination): + self._device_list_stream_cache.entity_has_changed( + user_id, token + ) + + if destination: + self._device_list_federation_stream_cache.entity_has_changed( + destination, token + ) + + self._get_cached_devices_for_user.invalidate((user_id,)) + self._get_cached_user_device.invalidate_many((user_id,)) + self.get_device_list_last_stream_id_for_remote.invalidate((user_id,)) diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index 92447b00d4..9e530defe0 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -54,8 +54,11 @@ class SlavedPresenceStore(BaseSlavedStore): def stream_positions(self): result = super(SlavedPresenceStore, self).stream_positions() - position = self._presence_id_gen.get_current_token() - result["presence"] = position + + if self.hs.config.use_presence: + position = self._presence_id_gen.get_current_token() + result["presence"] = position + return result def process_replication_rows(self, stream_name, token, rows): diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index f0200c1e98..45fc913c52 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -20,7 +20,7 @@ from ._slaved_id_tracker import SlavedIdTracker from .events import SlavedEventStore -class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore): +class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): def __init__(self, db_conn, hs): self._push_rules_stream_id_gen = SlavedIdTracker( db_conn, "push_rules_stream", "stream_id", diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 586dddb40b..e558f90e1a 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -39,7 +39,7 @@ class ReplicationClientFactory(ReconnectingClientFactory): Accepts a handler that will be called when new data is available or data is required. """ - maxDelay = 5 # Try at least once every N seconds + maxDelay = 30 # Try at least once every N seconds def __init__(self, hs, client_name, handler): self.client_name = client_name @@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory): def buildProtocol(self, addr): logger.info("Connected to replication: %r", addr) - self.resetDelay() return ClientReplicationStreamProtocol( self.client_name, self.server_name, self._clock, self.handler ) @@ -90,15 +89,18 @@ class ReplicationClientHandler(object): # Used for tests. self.awaiting_syncs = {} + # The factory used to create connections. + self.factory = None + def start_replication(self, hs): """Helper method to start a replication connection to the remote server using TCP. """ client_name = hs.config.worker_name - factory = ReplicationClientFactory(hs, client_name, self) + self.factory = ReplicationClientFactory(hs, client_name, self) host = hs.config.worker_replication_host port = hs.config.worker_replication_port - hs.get_reactor().connectTCP(host, port, factory) + hs.get_reactor().connectTCP(host, port, self.factory) def on_rdata(self, stream_name, token, rows): """Called when we get new replication data. By default this just pokes @@ -140,6 +142,7 @@ class ReplicationClientHandler(object): args["account_data"] = user_account_data elif room_account_data: args["account_data"] = room_account_data + return args def get_currently_syncing_users(self): @@ -204,3 +207,14 @@ class ReplicationClientHandler(object): for cmd in self.pending_commands: connection.send_command(cmd) self.pending_commands = [] + + def finished_connecting(self): + """Called when we have successfully subscribed and caught up to all + streams we're interested in. + """ + logger.info("Finished connecting to server") + + # We don't reset the delay any earlier as otherwise if there is a + # problem during start up we'll end up tight looping connecting to the + # server. + self.factory.resetDelay() diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 327556f6a1..2098c32a77 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -127,8 +127,11 @@ class RdataCommand(Command): class PositionCommand(Command): - """Sent by the client to tell the client the stream postition without + """Sent by the server to tell the client the stream postition without needing to send an RDATA. + + Sent to the client after all missing updates for a stream have been sent + to the client and they're now up to date. """ NAME = "POSITION" diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 429471c345..55630ba9a7 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -451,7 +451,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): @defer.inlineCallbacks def subscribe_to_stream(self, stream_name, token): - """Subscribe the remote to a streams. + """Subscribe the remote to a stream. This invloves checking if they've missed anything and sending those updates down if they have. During that time new updates for the stream @@ -478,11 +478,36 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Now we can send any updates that came in while we were subscribing pending_rdata = self.pending_rdata.pop(stream_name, []) + updates = [] for token, update in pending_rdata: - # Only send updates newer than the current token - if token > current_token: + # If the token is null, it is part of a batch update. Batches + # are multiple updates that share a single token. To denote + # this, the token is set to None for all tokens in the batch + # except for the last. If we find a None token, we keep looking + # through tokens until we find one that is not None and then + # process all previous updates in the batch as if they had the + # final token. + if token is None: + # Store this update as part of a batch + updates.append(update) + continue + + if token <= current_token: + # This update or batch of updates is older than + # current_token, dismiss it + updates = [] + continue + + updates.append(update) + + # Send all updates that are part of this batch with the + # found token + for update in updates: self.send_command(RdataCommand(stream_name, token, update)) + # Clear stored updates + updates = [] + # They're now fully subscribed self.replication_streams.add(stream_name) except Exception as e: @@ -526,6 +551,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.server_name = server_name self.handler = handler + # Set of stream names that have been subscribe to, but haven't yet + # caught up with. This is used to track when the client has been fully + # connected to the remote. + self.streams_connecting = set() + # Map of stream to batched updates. See RdataCommand for info on how # batching works. self.pending_batches = {} @@ -548,6 +578,10 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # We've now finished connecting to so inform the client handler self.handler.update_connection(self) + # This will happen if we don't actually subscribe to any streams + if not self.streams_connecting: + self.handler.finished_connecting() + def on_SERVER(self, cmd): if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) @@ -577,6 +611,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): return self.handler.on_rdata(stream_name, cmd.token, rows) def on_POSITION(self, cmd): + # When we get a `POSITION` command it means we've finished getting + # missing updates for the given stream, and are now up to date. + self.streams_connecting.discard(cmd.stream_name) + if not self.streams_connecting: + self.handler.finished_connecting() + return self.handler.on_position(cmd.stream_name, cmd.token) def on_SYNC(self, cmd): @@ -593,6 +633,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.id(), stream_name, token ) + self.streams_connecting.add(stream_name) + self.send_command(ReplicateCommand(stream_name, token)) def on_connection_closed(self): diff --git a/synapse/res/templates/notif.html b/synapse/res/templates/notif.html index 88b921ca9c..1a6c70b562 100644 --- a/synapse/res/templates/notif.html +++ b/synapse/res/templates/notif.html @@ -6,11 +6,11 @@ <img alt="" class="sender_avatar" src="{{ message.sender_avatar_url|mxc_to_http(32,32) }}" /> {% else %} {% if message.sender_hash % 3 == 0 %} - <img class="sender_avatar" src="https://vector.im/beta/img/76cfa6.png" /> + <img class="sender_avatar" src="https://riot.im/img/external/avatar-1.png" /> {% elif message.sender_hash % 3 == 1 %} - <img class="sender_avatar" src="https://vector.im/beta/img/50e2c2.png" /> + <img class="sender_avatar" src="https://riot.im/img/external/avatar-2.png" /> {% else %} - <img class="sender_avatar" src="https://vector.im/beta/img/f4c371.png" /> + <img class="sender_avatar" src="https://riot.im/img/external/avatar-3.png" /> {% endif %} {% endif %} {% endif %} diff --git a/synapse/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html index fcdb3109fe..019506e5fb 100644 --- a/synapse/res/templates/notif_mail.html +++ b/synapse/res/templates/notif_mail.html @@ -19,7 +19,7 @@ </td> <td class="logo"> {% if app_name == "Riot" %} - <img src="http://matrix.org/img/riot-logo-email.png" width="83" height="83" alt="[Riot]"/> + <img src="http://riot.im/img/external/riot-logo-email.png" width="83" height="83" alt="[Riot]"/> {% elif app_name == "Vector" %} <img src="http://matrix.org/img/vector-logo-email.png" width="64" height="83" alt="[Vector]"/> {% else %} diff --git a/synapse/res/templates/room.html b/synapse/res/templates/room.html index 723c222d25..b8525fef88 100644 --- a/synapse/res/templates/room.html +++ b/synapse/res/templates/room.html @@ -5,11 +5,11 @@ <img alt="" src="{{ room.avatar_url|mxc_to_http(48,48) }}" /> {% else %} {% if room.hash % 3 == 0 %} - <img alt="" src="https://vector.im/beta/img/76cfa6.png" /> + <img alt="" src="https://riot.im/img/external/avatar-1.png" /> {% elif room.hash % 3 == 1 %} - <img alt="" src="https://vector.im/beta/img/50e2c2.png" /> + <img alt="" src="https://riot.im/img/external/avatar-2.png" /> {% else %} - <img alt="" src="https://vector.im/beta/img/f4c371.png" /> + <img alt="" src="https://riot.im/img/external/avatar-3.png" /> {% endif %} {% endif %} </td> diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 82433a2aa9..2a29f0c2af 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -17,12 +17,14 @@ import hashlib import hmac import logging +import platform from six import text_type from six.moves import http_client from twisted.internet import defer +import synapse from synapse.api.constants import Membership, UserTypes from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError from synapse.http.servlet import ( @@ -32,6 +34,7 @@ from synapse.http.servlet import ( parse_string, ) from synapse.types import UserID, create_requester +from synapse.util.versionstring import get_version_string from .base import ClientV1RestServlet, client_path_patterns @@ -66,6 +69,25 @@ class UsersRestServlet(ClientV1RestServlet): defer.returnValue((200, ret)) +class VersionServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/admin/server_version") + + @defer.inlineCallbacks + def on_GET(self, request): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + + if not is_admin: + raise AuthError(403, "You are not a server admin") + + ret = { + 'server_version': get_version_string(synapse), + 'python_version': platform.python_version(), + } + + defer.returnValue((200, ret)) + + class UserRegisterServlet(ClientV1RestServlet): """ Attributes: @@ -466,17 +488,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): ) new_room_id = info["room_id"] - yield self.event_creation_handler.create_and_send_nonmember_event( - room_creator_requester, - { - "type": "m.room.message", - "content": {"body": message, "msgtype": "m.text"}, - "room_id": new_room_id, - "sender": new_room_user_id, - }, - ratelimit=False, - ) - requester_user_id = requester.user.to_string() logger.info("Shutting down room %r", room_id) @@ -514,6 +525,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): kicked_users.append(user_id) + yield self.event_creation_handler.create_and_send_nonmember_event( + room_creator_requester, + { + "type": "m.room.message", + "content": {"body": message, "msgtype": "m.text"}, + "room_id": new_room_id, + "sender": new_room_user_id, + }, + ratelimit=False, + ) + aliases_for_room = yield self.store.get_aliases_for_room(room_id) yield self.store.update_aliases_for_room( @@ -763,3 +785,4 @@ def register_servlets(hs, http_server): QuarantineMediaInRoom(hs).register(http_server) ListMediaInRoom(hs).register(http_server) UserRegisterServlet(hs).register(http_server) + VersionServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 94cbba4303..6f34029431 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -25,7 +25,12 @@ from twisted.internet import defer import synapse import synapse.types from synapse.api.constants import LoginType -from synapse.api.errors import Codes, SynapseError, UnrecognizedRequestError +from synapse.api.errors import ( + Codes, + LimitExceededError, + SynapseError, + UnrecognizedRequestError, +) from synapse.config.server import is_threepid_reserved from synapse.http.servlet import ( RestServlet, @@ -191,18 +196,36 @@ class RegisterRestServlet(RestServlet): self.identity_handler = hs.get_handlers().identity_handler self.room_member_handler = hs.get_room_member_handler() self.macaroon_gen = hs.get_macaroon_generator() + self.ratelimiter = hs.get_registration_ratelimiter() + self.clock = hs.get_clock() @interactive_auth_handler @defer.inlineCallbacks def on_POST(self, request): body = parse_json_object_from_request(request) + client_addr = request.getClientIP() + + time_now = self.clock.time() + + allowed, time_allowed = self.ratelimiter.can_do_action( + client_addr, time_now_s=time_now, + rate_hz=self.hs.config.rc_registration_requests_per_second, + burst_count=self.hs.config.rc_registration_request_burst_count, + update=False, + ) + + if not allowed: + raise LimitExceededError( + retry_after_ms=int(1000 * (time_allowed - time_now)), + ) + kind = b"user" if b"kind" in request.args: kind = request.args[b"kind"][0] if kind == b"guest": - ret = yield self._do_guest_registration(body) + ret = yield self._do_guest_registration(body, address=client_addr) defer.returnValue(ret) return elif kind != b"user": @@ -411,6 +434,7 @@ class RegisterRestServlet(RestServlet): guest_access_token=guest_access_token, generate_token=False, threepid=threepid, + address=client_addr, ) # Necessary due to auth checks prior to the threepid being # written to the db @@ -522,12 +546,13 @@ class RegisterRestServlet(RestServlet): defer.returnValue(result) @defer.inlineCallbacks - def _do_guest_registration(self, params): + def _do_guest_registration(self, params, address=None): if not self.hs.config.allow_guest_access: raise SynapseError(403, "Guest access is disabled") user_id, _ = yield self.registration_handler.register( generate_token=False, - make_guest=True + make_guest=True, + address=address, ) # we don't allow guests to specify their own device_id, because diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index d16a30acd8..953d89bd82 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2019 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -99,10 +100,29 @@ def add_file_headers(request, media_type, file_size, upload_name): request.setHeader(b"Content-Type", media_type.encode("UTF-8")) if upload_name: - if is_ascii(upload_name): - disposition = "inline; filename=%s" % (_quote(upload_name),) + # RFC6266 section 4.1 [1] defines both `filename` and `filename*`. + # + # `filename` is defined to be a `value`, which is defined by RFC2616 + # section 3.6 [2] to be a `token` or a `quoted-string`, where a `token` + # is (essentially) a single US-ASCII word, and a `quoted-string` is a + # US-ASCII string surrounded by double-quotes, using backslash as an + # escape charater. Note that %-encoding is *not* permitted. + # + # `filename*` is defined to be an `ext-value`, which is defined in + # RFC5987 section 3.2.1 [3] to be `charset "'" [ language ] "'" value-chars`, + # where `value-chars` is essentially a %-encoded string in the given charset. + # + # [1]: https://tools.ietf.org/html/rfc6266#section-4.1 + # [2]: https://tools.ietf.org/html/rfc2616#section-3.6 + # [3]: https://tools.ietf.org/html/rfc5987#section-3.2.1 + + # We avoid the quoted-string version of `filename`, because (a) synapse didn't + # correctly interpret those as of 0.99.2 and (b) they are a bit of a pain and we + # may as well just do the filename* version. + if _can_encode_filename_as_token(upload_name): + disposition = 'inline; filename=%s' % (upload_name, ) else: - disposition = "inline; filename*=utf-8''%s" % (_quote(upload_name),) + disposition = "inline; filename*=utf-8''%s" % (_quote(upload_name), ) request.setHeader(b"Content-Disposition", disposition.encode('ascii')) @@ -115,6 +135,35 @@ def add_file_headers(request, media_type, file_size, upload_name): request.setHeader(b"Content-Length", b"%d" % (file_size,)) +# separators as defined in RFC2616. SP and HT are handled separately. +# see _can_encode_filename_as_token. +_FILENAME_SEPARATOR_CHARS = set(( + "(", ")", "<", ">", "@", ",", ";", ":", "\\", '"', + "/", "[", "]", "?", "=", "{", "}", +)) + + +def _can_encode_filename_as_token(x): + for c in x: + # from RFC2616: + # + # token = 1*<any CHAR except CTLs or separators> + # + # separators = "(" | ")" | "<" | ">" | "@" + # | "," | ";" | ":" | "\" | <"> + # | "/" | "[" | "]" | "?" | "=" + # | "{" | "}" | SP | HT + # + # CHAR = <any US-ASCII character (octets 0 - 127)> + # + # CTL = <any US-ASCII control character + # (octets 0 - 31) and DEL (127)> + # + if ord(c) >= 127 or ord(c) <= 32 or c in _FILENAME_SEPARATOR_CHARS: + return False + return True + + @defer.inlineCallbacks def respond_with_responder(request, responder, media_type, file_size, upload_name=None): """Responds to the request with given responder. If responder is None then @@ -213,8 +262,7 @@ def get_filename_from_headers(headers): Content-Disposition HTTP header. Args: - headers (twisted.web.http_headers.Headers): The HTTP - request headers. + headers (dict[bytes, list[bytes]]): The HTTP request headers. Returns: A Unicode string of the filename, or None. @@ -225,23 +273,12 @@ def get_filename_from_headers(headers): if not content_disposition[0]: return - # dict of unicode: bytes, corresponding to the key value sections of the - # Content-Disposition header. - params = {} - parts = content_disposition[0].split(b";") - for i in parts: - # Split into key-value pairs, if able - # We don't care about things like `inline`, so throw it out - if b"=" not in i: - continue - - key, value = i.strip().split(b"=") - params[key.decode('ascii')] = value + _, params = _parse_header(content_disposition[0]) upload_name = None # First check if there is a valid UTF-8 filename - upload_name_utf8 = params.get("filename*", None) + upload_name_utf8 = params.get(b"filename*", None) if upload_name_utf8: if upload_name_utf8.lower().startswith(b"utf-8''"): upload_name_utf8 = upload_name_utf8[7:] @@ -267,12 +304,68 @@ def get_filename_from_headers(headers): # If there isn't check for an ascii name. if not upload_name: - upload_name_ascii = params.get("filename", None) + upload_name_ascii = params.get(b"filename", None) if upload_name_ascii and is_ascii(upload_name_ascii): - # Make sure there's no %-quoted bytes. If there is, reject it as - # non-valid ASCII. - if b"%" not in upload_name_ascii: - upload_name = upload_name_ascii.decode('ascii') + upload_name = upload_name_ascii.decode('ascii') # This may be None here, indicating we did not find a matching name. return upload_name + + +def _parse_header(line): + """Parse a Content-type like header. + + Cargo-culted from `cgi`, but works on bytes rather than strings. + + Args: + line (bytes): header to be parsed + + Returns: + Tuple[bytes, dict[bytes, bytes]]: + the main content-type, followed by the parameter dictionary + """ + parts = _parseparam(b';' + line) + key = next(parts) + pdict = {} + for p in parts: + i = p.find(b'=') + if i >= 0: + name = p[:i].strip().lower() + value = p[i + 1:].strip() + + # strip double-quotes + if len(value) >= 2 and value[0:1] == value[-1:] == b'"': + value = value[1:-1] + value = value.replace(b'\\\\', b'\\').replace(b'\\"', b'"') + pdict[name] = value + + return key, pdict + + +def _parseparam(s): + """Generator which splits the input on ;, respecting double-quoted sequences + + Cargo-culted from `cgi`, but works on bytes rather than strings. + + Args: + s (bytes): header to be parsed + + Returns: + Iterable[bytes]: the split input + """ + while s[:1] == b';': + s = s[1:] + + # look for the next ; + end = s.find(b';') + + # if there is an odd number of " marks between here and the next ;, skip to the + # next ; instead + while end > 0 and (s.count(b'"', 0, end) - s.count(b'\\"', 0, end)) % 2: + end = s.find(b';', end + 1) + + if end < 0: + end = len(s) + f = s[:end] + yield f.strip() + s = s[end:] diff --git a/synapse/server.py b/synapse/server.py index 4d364fccce..72835e8c86 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -51,7 +51,7 @@ from synapse.handlers.acme import AcmeHandler from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler, MacaroonGenerator from synapse.handlers.deactivate_account import DeactivateAccountHandler -from synapse.handlers.device import DeviceHandler +from synapse.handlers.device import DeviceHandler, DeviceWorkerHandler from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler @@ -206,6 +206,7 @@ class HomeServer(object): self.clock = Clock(reactor) self.distributor = Distributor() self.ratelimiter = Ratelimiter() + self.registration_ratelimiter = Ratelimiter() self.datastore = None @@ -251,6 +252,9 @@ class HomeServer(object): def get_ratelimiter(self): return self.ratelimiter + def get_registration_ratelimiter(self): + return self.registration_ratelimiter + def build_federation_client(self): return FederationClient(self) @@ -307,7 +311,10 @@ class HomeServer(object): return MacaroonGenerator(self) def build_device_handler(self): - return DeviceHandler(self) + if self.config.worker_app: + return DeviceWorkerHandler(self) + else: + return DeviceHandler(self) def build_device_message_handler(self): return DeviceMessageHandler(self) diff --git a/synapse/server.pyi b/synapse/server.pyi index 06cd083a74..fb8df56cd5 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -7,9 +7,9 @@ import synapse.handlers.auth import synapse.handlers.deactivate_account import synapse.handlers.device import synapse.handlers.e2e_keys +import synapse.handlers.message import synapse.handlers.room import synapse.handlers.room_member -import synapse.handlers.message import synapse.handlers.set_password import synapse.rest.media.v1.media_repository import synapse.server_notices.server_notices_manager diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index e06b0bc56d..e6a42a53bb 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -19,14 +19,174 @@ from canonicaljson import json from twisted.internet import defer +from synapse.storage._base import SQLBaseStore +from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache -from .background_updates import BackgroundUpdateStore - logger = logging.getLogger(__name__) -class DeviceInboxStore(BackgroundUpdateStore): +class DeviceInboxWorkerStore(SQLBaseStore): + def get_to_device_stream_token(self): + return self._device_inbox_id_gen.get_current_token() + + def get_new_messages_for_device( + self, user_id, device_id, last_stream_id, current_stream_id, limit=100 + ): + """ + Args: + user_id(str): The recipient user_id. + device_id(str): The recipient device_id. + current_stream_id(int): The current position of the to device + message stream. + Returns: + Deferred ([dict], int): List of messages for the device and where + in the stream the messages got to. + """ + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + + def get_new_messages_for_device_txn(txn): + sql = ( + "SELECT stream_id, message_json FROM device_inbox" + " WHERE user_id = ? AND device_id = ?" + " AND ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute(sql, ( + user_id, device_id, last_stream_id, current_stream_id, limit + )) + messages = [] + for row in txn: + stream_pos = row[0] + messages.append(json.loads(row[1])) + if len(messages) < limit: + stream_pos = current_stream_id + return (messages, stream_pos) + + return self.runInteraction( + "get_new_messages_for_device", get_new_messages_for_device_txn, + ) + + @defer.inlineCallbacks + def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): + """ + Args: + user_id(str): The recipient user_id. + device_id(str): The recipient device_id. + up_to_stream_id(int): Where to delete messages up to. + Returns: + A deferred that resolves to the number of messages deleted. + """ + # If we have cached the last stream id we've deleted up to, we can + # check if there is likely to be anything that needs deleting + last_deleted_stream_id = self._last_device_delete_cache.get( + (user_id, device_id), None + ) + if last_deleted_stream_id: + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_deleted_stream_id + ) + if not has_changed: + defer.returnValue(0) + + def delete_messages_for_device_txn(txn): + sql = ( + "DELETE FROM device_inbox" + " WHERE user_id = ? AND device_id = ?" + " AND stream_id <= ?" + ) + txn.execute(sql, (user_id, device_id, up_to_stream_id)) + return txn.rowcount + + count = yield self.runInteraction( + "delete_messages_for_device", delete_messages_for_device_txn + ) + + # Update the cache, ensuring that we only ever increase the value + last_deleted_stream_id = self._last_device_delete_cache.get( + (user_id, device_id), 0 + ) + self._last_device_delete_cache[(user_id, device_id)] = max( + last_deleted_stream_id, up_to_stream_id + ) + + defer.returnValue(count) + + def get_new_device_msgs_for_remote( + self, destination, last_stream_id, current_stream_id, limit=100 + ): + """ + Args: + destination(str): The name of the remote server. + last_stream_id(int|long): The last position of the device message stream + that the server sent up to. + current_stream_id(int|long): The current position of the device + message stream. + Returns: + Deferred ([dict], int|long): List of messages for the device and where + in the stream the messages got to. + """ + + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( + destination, last_stream_id + ) + if not has_changed or last_stream_id == current_stream_id: + return defer.succeed(([], current_stream_id)) + + def get_new_messages_for_remote_destination_txn(txn): + sql = ( + "SELECT stream_id, messages_json FROM device_federation_outbox" + " WHERE destination = ?" + " AND ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC" + " LIMIT ?" + ) + txn.execute(sql, ( + destination, last_stream_id, current_stream_id, limit + )) + messages = [] + for row in txn: + stream_pos = row[0] + messages.append(json.loads(row[1])) + if len(messages) < limit: + stream_pos = current_stream_id + return (messages, stream_pos) + + return self.runInteraction( + "get_new_device_msgs_for_remote", + get_new_messages_for_remote_destination_txn, + ) + + def delete_device_msgs_for_remote(self, destination, up_to_stream_id): + """Used to delete messages when the remote destination acknowledges + their receipt. + + Args: + destination(str): The destination server_name + up_to_stream_id(int): Where to delete messages up to. + Returns: + A deferred that resolves when the messages have been deleted. + """ + def delete_messages_for_remote_destination_txn(txn): + sql = ( + "DELETE FROM device_federation_outbox" + " WHERE destination = ?" + " AND stream_id <= ?" + ) + txn.execute(sql, (destination, up_to_stream_id)) + + return self.runInteraction( + "delete_device_msgs_for_remote", + delete_messages_for_remote_destination_txn + ) + + +class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" def __init__(self, db_conn, hs): @@ -220,93 +380,6 @@ class DeviceInboxStore(BackgroundUpdateStore): txn.executemany(sql, rows) - def get_new_messages_for_device( - self, user_id, device_id, last_stream_id, current_stream_id, limit=100 - ): - """ - Args: - user_id(str): The recipient user_id. - device_id(str): The recipient device_id. - current_stream_id(int): The current position of the to device - message stream. - Returns: - Deferred ([dict], int): List of messages for the device and where - in the stream the messages got to. - """ - has_changed = self._device_inbox_stream_cache.has_entity_changed( - user_id, last_stream_id - ) - if not has_changed: - return defer.succeed(([], current_stream_id)) - - def get_new_messages_for_device_txn(txn): - sql = ( - "SELECT stream_id, message_json FROM device_inbox" - " WHERE user_id = ? AND device_id = ?" - " AND ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC" - " LIMIT ?" - ) - txn.execute(sql, ( - user_id, device_id, last_stream_id, current_stream_id, limit - )) - messages = [] - for row in txn: - stream_pos = row[0] - messages.append(json.loads(row[1])) - if len(messages) < limit: - stream_pos = current_stream_id - return (messages, stream_pos) - - return self.runInteraction( - "get_new_messages_for_device", get_new_messages_for_device_txn, - ) - - @defer.inlineCallbacks - def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): - """ - Args: - user_id(str): The recipient user_id. - device_id(str): The recipient device_id. - up_to_stream_id(int): Where to delete messages up to. - Returns: - A deferred that resolves to the number of messages deleted. - """ - # If we have cached the last stream id we've deleted up to, we can - # check if there is likely to be anything that needs deleting - last_deleted_stream_id = self._last_device_delete_cache.get( - (user_id, device_id), None - ) - if last_deleted_stream_id: - has_changed = self._device_inbox_stream_cache.has_entity_changed( - user_id, last_deleted_stream_id - ) - if not has_changed: - defer.returnValue(0) - - def delete_messages_for_device_txn(txn): - sql = ( - "DELETE FROM device_inbox" - " WHERE user_id = ? AND device_id = ?" - " AND stream_id <= ?" - ) - txn.execute(sql, (user_id, device_id, up_to_stream_id)) - return txn.rowcount - - count = yield self.runInteraction( - "delete_messages_for_device", delete_messages_for_device_txn - ) - - # Update the cache, ensuring that we only ever increase the value - last_deleted_stream_id = self._last_device_delete_cache.get( - (user_id, device_id), 0 - ) - self._last_device_delete_cache[(user_id, device_id)] = max( - last_deleted_stream_id, up_to_stream_id - ) - - defer.returnValue(count) - def get_all_new_device_messages(self, last_pos, current_pos, limit): """ Args: @@ -351,77 +424,6 @@ class DeviceInboxStore(BackgroundUpdateStore): "get_all_new_device_messages", get_all_new_device_messages_txn ) - def get_to_device_stream_token(self): - return self._device_inbox_id_gen.get_current_token() - - def get_new_device_msgs_for_remote( - self, destination, last_stream_id, current_stream_id, limit=100 - ): - """ - Args: - destination(str): The name of the remote server. - last_stream_id(int|long): The last position of the device message stream - that the server sent up to. - current_stream_id(int|long): The current position of the device - message stream. - Returns: - Deferred ([dict], int|long): List of messages for the device and where - in the stream the messages got to. - """ - - has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( - destination, last_stream_id - ) - if not has_changed or last_stream_id == current_stream_id: - return defer.succeed(([], current_stream_id)) - - def get_new_messages_for_remote_destination_txn(txn): - sql = ( - "SELECT stream_id, messages_json FROM device_federation_outbox" - " WHERE destination = ?" - " AND ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC" - " LIMIT ?" - ) - txn.execute(sql, ( - destination, last_stream_id, current_stream_id, limit - )) - messages = [] - for row in txn: - stream_pos = row[0] - messages.append(json.loads(row[1])) - if len(messages) < limit: - stream_pos = current_stream_id - return (messages, stream_pos) - - return self.runInteraction( - "get_new_device_msgs_for_remote", - get_new_messages_for_remote_destination_txn, - ) - - def delete_device_msgs_for_remote(self, destination, up_to_stream_id): - """Used to delete messages when the remote destination acknowledges - their receipt. - - Args: - destination(str): The destination server_name - up_to_stream_id(int): Where to delete messages up to. - Returns: - A deferred that resolves when the messages have been deleted. - """ - def delete_messages_for_remote_destination_txn(txn): - sql = ( - "DELETE FROM device_federation_outbox" - " WHERE destination = ?" - " AND stream_id <= ?" - ) - txn.execute(sql, (destination, up_to_stream_id)) - - return self.runInteraction( - "delete_device_msgs_for_remote", - delete_messages_for_remote_destination_txn - ) - @defer.inlineCallbacks def _background_drop_index_device_inbox(self, progress, batch_size): def reindex_txn(conn): diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index ecdab34e7d..e716dc1437 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -22,11 +22,10 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage._base import Cache, SQLBaseStore, db_to_json from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList -from ._base import Cache, db_to_json - logger = logging.getLogger(__name__) DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( @@ -34,7 +33,343 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( ) -class DeviceStore(BackgroundUpdateStore): +class DeviceWorkerStore(SQLBaseStore): + def get_device(self, user_id, device_id): + """Retrieve a device. + + Args: + user_id (str): The ID of the user which owns the device + device_id (str): The ID of the device to retrieve + Returns: + defer.Deferred for a dict containing the device information + Raises: + StoreError: if the device is not found + """ + return self._simple_select_one( + table="devices", + keyvalues={"user_id": user_id, "device_id": device_id}, + retcols=("user_id", "device_id", "display_name"), + desc="get_device", + ) + + @defer.inlineCallbacks + def get_devices_by_user(self, user_id): + """Retrieve all of a user's registered devices. + + Args: + user_id (str): + Returns: + defer.Deferred: resolves to a dict from device_id to a dict + containing "device_id", "user_id" and "display_name" for each + device. + """ + devices = yield self._simple_select_list( + table="devices", + keyvalues={"user_id": user_id}, + retcols=("user_id", "device_id", "display_name"), + desc="get_devices_by_user" + ) + + defer.returnValue({d["device_id"]: d for d in devices}) + + def get_devices_by_remote(self, destination, from_stream_id): + """Get stream of updates to send to remote servers + + Returns: + (int, list[dict]): current stream id and list of updates + """ + now_stream_id = self._device_list_id_gen.get_current_token() + + has_changed = self._device_list_federation_stream_cache.has_entity_changed( + destination, int(from_stream_id) + ) + if not has_changed: + return (now_stream_id, []) + + return self.runInteraction( + "get_devices_by_remote", self._get_devices_by_remote_txn, + destination, from_stream_id, now_stream_id, + ) + + def _get_devices_by_remote_txn(self, txn, destination, from_stream_id, + now_stream_id): + sql = """ + SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes + WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? + GROUP BY user_id, device_id + LIMIT 20 + """ + txn.execute( + sql, (destination, from_stream_id, now_stream_id, False) + ) + + # maps (user_id, device_id) -> stream_id + query_map = {(r[0], r[1]): r[2] for r in txn} + if not query_map: + return (now_stream_id, []) + + if len(query_map) >= 20: + now_stream_id = max(stream_id for stream_id in itervalues(query_map)) + + devices = self._get_e2e_device_keys_txn( + txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True + ) + + prev_sent_id_sql = """ + SELECT coalesce(max(stream_id), 0) as stream_id + FROM device_lists_outbound_last_success + WHERE destination = ? AND user_id = ? AND stream_id <= ? + """ + + results = [] + for user_id, user_devices in iteritems(devices): + # The prev_id for the first row is always the last row before + # `from_stream_id` + txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id)) + rows = txn.fetchall() + prev_id = rows[0][0] + for device_id, device in iteritems(user_devices): + stream_id = query_map[(user_id, device_id)] + result = { + "user_id": user_id, + "device_id": device_id, + "prev_id": [prev_id] if prev_id else [], + "stream_id": stream_id, + } + + prev_id = stream_id + + if device is not None: + key_json = device.get("key_json", None) + if key_json: + result["keys"] = db_to_json(key_json) + device_display_name = device.get("device_display_name", None) + if device_display_name: + result["device_display_name"] = device_display_name + else: + result["deleted"] = True + + results.append(result) + + return (now_stream_id, results) + + def mark_as_sent_devices_by_remote(self, destination, stream_id): + """Mark that updates have successfully been sent to the destination. + """ + return self.runInteraction( + "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn, + destination, stream_id, + ) + + def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): + # We update the device_lists_outbound_last_success with the successfully + # poked users. We do the join to see which users need to be inserted and + # which updated. + sql = """ + SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) + FROM device_lists_outbound_pokes as o + LEFT JOIN device_lists_outbound_last_success as s + USING (destination, user_id) + WHERE destination = ? AND o.stream_id <= ? + GROUP BY user_id + """ + txn.execute(sql, (destination, stream_id,)) + rows = txn.fetchall() + + sql = """ + UPDATE device_lists_outbound_last_success + SET stream_id = ? + WHERE destination = ? AND user_id = ? + """ + txn.executemany( + sql, ((row[1], destination, row[0],) for row in rows if row[2]) + ) + + sql = """ + INSERT INTO device_lists_outbound_last_success + (destination, user_id, stream_id) VALUES (?, ?, ?) + """ + txn.executemany( + sql, ((destination, row[0], row[1],) for row in rows if not row[2]) + ) + + # Delete all sent outbound pokes + sql = """ + DELETE FROM device_lists_outbound_pokes + WHERE destination = ? AND stream_id <= ? + """ + txn.execute(sql, (destination, stream_id,)) + + def get_device_stream_token(self): + return self._device_list_id_gen.get_current_token() + + @defer.inlineCallbacks + def get_user_devices_from_cache(self, query_list): + """Get the devices (and keys if any) for remote users from the cache. + + Args: + query_list(list): List of (user_id, device_ids), if device_ids is + falsey then return all device ids for that user. + + Returns: + (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is + a set of user_ids and results_map is a mapping of + user_id -> device_id -> device_info + """ + user_ids = set(user_id for user_id, _ in query_list) + user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids)) + user_ids_in_cache = set( + user_id for user_id, stream_id in user_map.items() if stream_id + ) + user_ids_not_in_cache = user_ids - user_ids_in_cache + + results = {} + for user_id, device_id in query_list: + if user_id not in user_ids_in_cache: + continue + + if device_id: + device = yield self._get_cached_user_device(user_id, device_id) + results.setdefault(user_id, {})[device_id] = device + else: + results[user_id] = yield self._get_cached_devices_for_user(user_id) + + defer.returnValue((user_ids_not_in_cache, results)) + + @cachedInlineCallbacks(num_args=2, tree=True) + def _get_cached_user_device(self, user_id, device_id): + content = yield self._simple_select_one_onecol( + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + retcol="content", + desc="_get_cached_user_device", + ) + defer.returnValue(db_to_json(content)) + + @cachedInlineCallbacks() + def _get_cached_devices_for_user(self, user_id): + devices = yield self._simple_select_list( + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + }, + retcols=("device_id", "content"), + desc="_get_cached_devices_for_user", + ) + defer.returnValue({ + device["device_id"]: db_to_json(device["content"]) + for device in devices + }) + + def get_devices_with_keys_by_user(self, user_id): + """Get all devices (with any device keys) for a user + + Returns: + (stream_id, devices) + """ + return self.runInteraction( + "get_devices_with_keys_by_user", + self._get_devices_with_keys_by_user_txn, user_id, + ) + + def _get_devices_with_keys_by_user_txn(self, txn, user_id): + now_stream_id = self._device_list_id_gen.get_current_token() + + devices = self._get_e2e_device_keys_txn( + txn, [(user_id, None)], include_all_devices=True + ) + + if devices: + user_devices = devices[user_id] + results = [] + for device_id, device in iteritems(user_devices): + result = { + "device_id": device_id, + } + + key_json = device.get("key_json", None) + if key_json: + result["keys"] = db_to_json(key_json) + device_display_name = device.get("device_display_name", None) + if device_display_name: + result["device_display_name"] = device_display_name + + results.append(result) + + return now_stream_id, results + + return now_stream_id, [] + + @defer.inlineCallbacks + def get_user_whose_devices_changed(self, from_key): + """Get set of users whose devices have changed since `from_key`. + """ + from_key = int(from_key) + changed = self._device_list_stream_cache.get_all_entities_changed(from_key) + if changed is not None: + defer.returnValue(set(changed)) + + sql = """ + SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ? + """ + rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) + defer.returnValue(set(row[0] for row in rows)) + + def get_all_device_list_changes_for_remotes(self, from_key, to_key): + """Return a list of `(stream_id, user_id, destination)` which is the + combined list of changes to devices, and which destinations need to be + poked. `destination` may be None if no destinations need to be poked. + """ + # We do a group by here as there can be a large number of duplicate + # entries, since we throw away device IDs. + sql = """ + SELECT MAX(stream_id) AS stream_id, user_id, destination + FROM device_lists_stream + LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) + WHERE ? < stream_id AND stream_id <= ? + GROUP BY user_id, destination + """ + return self._execute( + "get_all_device_list_changes_for_remotes", None, + sql, from_key, to_key + ) + + @cached(max_entries=10000) + def get_device_list_last_stream_id_for_remote(self, user_id): + """Get the last stream_id we got for a user. May be None if we haven't + got any information for them. + """ + return self._simple_select_one_onecol( + table="device_lists_remote_extremeties", + keyvalues={"user_id": user_id}, + retcol="stream_id", + desc="get_device_list_last_stream_id_for_remote", + allow_none=True, + ) + + @cachedList(cached_method_name="get_device_list_last_stream_id_for_remote", + list_name="user_ids", inlineCallbacks=True) + def get_device_list_last_stream_id_for_remotes(self, user_ids): + rows = yield self._simple_select_many_batch( + table="device_lists_remote_extremeties", + column="user_id", + iterable=user_ids, + retcols=("user_id", "stream_id",), + desc="get_device_list_last_stream_id_for_remotes", + ) + + results = {user_id: None for user_id in user_ids} + results.update({ + row["user_id"]: row["stream_id"] for row in rows + }) + + defer.returnValue(results) + + +class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): def __init__(self, db_conn, hs): super(DeviceStore, self).__init__(db_conn, hs) @@ -121,24 +456,6 @@ class DeviceStore(BackgroundUpdateStore): initial_device_display_name, e) raise StoreError(500, "Problem storing device.") - def get_device(self, user_id, device_id): - """Retrieve a device. - - Args: - user_id (str): The ID of the user which owns the device - device_id (str): The ID of the device to retrieve - Returns: - defer.Deferred for a dict containing the device information - Raises: - StoreError: if the device is not found - """ - return self._simple_select_one( - table="devices", - keyvalues={"user_id": user_id, "device_id": device_id}, - retcols=("user_id", "device_id", "display_name"), - desc="get_device", - ) - @defer.inlineCallbacks def delete_device(self, user_id, device_id): """Delete a device. @@ -203,57 +520,6 @@ class DeviceStore(BackgroundUpdateStore): ) @defer.inlineCallbacks - def get_devices_by_user(self, user_id): - """Retrieve all of a user's registered devices. - - Args: - user_id (str): - Returns: - defer.Deferred: resolves to a dict from device_id to a dict - containing "device_id", "user_id" and "display_name" for each - device. - """ - devices = yield self._simple_select_list( - table="devices", - keyvalues={"user_id": user_id}, - retcols=("user_id", "device_id", "display_name"), - desc="get_devices_by_user" - ) - - defer.returnValue({d["device_id"]: d for d in devices}) - - @cached(max_entries=10000) - def get_device_list_last_stream_id_for_remote(self, user_id): - """Get the last stream_id we got for a user. May be None if we haven't - got any information for them. - """ - return self._simple_select_one_onecol( - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - retcol="stream_id", - desc="get_device_list_remote_extremity", - allow_none=True, - ) - - @cachedList(cached_method_name="get_device_list_last_stream_id_for_remote", - list_name="user_ids", inlineCallbacks=True) - def get_device_list_last_stream_id_for_remotes(self, user_ids): - rows = yield self._simple_select_many_batch( - table="device_lists_remote_extremeties", - column="user_id", - iterable=user_ids, - retcols=("user_id", "stream_id",), - desc="get_user_devices_from_cache", - ) - - results = {user_id: None for user_id in user_ids} - results.update({ - row["user_id"]: row["stream_id"] for row in rows - }) - - defer.returnValue(results) - - @defer.inlineCallbacks def mark_remote_user_device_list_as_unsubscribed(self, user_id): """Mark that we no longer track device lists for remote user. """ @@ -405,268 +671,6 @@ class DeviceStore(BackgroundUpdateStore): lock=False, ) - def get_devices_by_remote(self, destination, from_stream_id): - """Get stream of updates to send to remote servers - - Returns: - (int, list[dict]): current stream id and list of updates - """ - now_stream_id = self._device_list_id_gen.get_current_token() - - has_changed = self._device_list_federation_stream_cache.has_entity_changed( - destination, int(from_stream_id) - ) - if not has_changed: - return (now_stream_id, []) - - return self.runInteraction( - "get_devices_by_remote", self._get_devices_by_remote_txn, - destination, from_stream_id, now_stream_id, - ) - - def _get_devices_by_remote_txn(self, txn, destination, from_stream_id, - now_stream_id): - sql = """ - SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes - WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? - GROUP BY user_id, device_id - LIMIT 20 - """ - txn.execute( - sql, (destination, from_stream_id, now_stream_id, False) - ) - - # maps (user_id, device_id) -> stream_id - query_map = {(r[0], r[1]): r[2] for r in txn} - if not query_map: - return (now_stream_id, []) - - if len(query_map) >= 20: - now_stream_id = max(stream_id for stream_id in itervalues(query_map)) - - devices = self._get_e2e_device_keys_txn( - txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True - ) - - prev_sent_id_sql = """ - SELECT coalesce(max(stream_id), 0) as stream_id - FROM device_lists_outbound_last_success - WHERE destination = ? AND user_id = ? AND stream_id <= ? - """ - - results = [] - for user_id, user_devices in iteritems(devices): - # The prev_id for the first row is always the last row before - # `from_stream_id` - txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id)) - rows = txn.fetchall() - prev_id = rows[0][0] - for device_id, device in iteritems(user_devices): - stream_id = query_map[(user_id, device_id)] - result = { - "user_id": user_id, - "device_id": device_id, - "prev_id": [prev_id] if prev_id else [], - "stream_id": stream_id, - } - - prev_id = stream_id - - if device is not None: - key_json = device.get("key_json", None) - if key_json: - result["keys"] = db_to_json(key_json) - device_display_name = device.get("device_display_name", None) - if device_display_name: - result["device_display_name"] = device_display_name - else: - result["deleted"] = True - - results.append(result) - - return (now_stream_id, results) - - @defer.inlineCallbacks - def get_user_devices_from_cache(self, query_list): - """Get the devices (and keys if any) for remote users from the cache. - - Args: - query_list(list): List of (user_id, device_ids), if device_ids is - falsey then return all device ids for that user. - - Returns: - (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is - a set of user_ids and results_map is a mapping of - user_id -> device_id -> device_info - """ - user_ids = set(user_id for user_id, _ in query_list) - user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids)) - user_ids_in_cache = set( - user_id for user_id, stream_id in user_map.items() if stream_id - ) - user_ids_not_in_cache = user_ids - user_ids_in_cache - - results = {} - for user_id, device_id in query_list: - if user_id not in user_ids_in_cache: - continue - - if device_id: - device = yield self._get_cached_user_device(user_id, device_id) - results.setdefault(user_id, {})[device_id] = device - else: - results[user_id] = yield self._get_cached_devices_for_user(user_id) - - defer.returnValue((user_ids_not_in_cache, results)) - - @cachedInlineCallbacks(num_args=2, tree=True) - def _get_cached_user_device(self, user_id, device_id): - content = yield self._simple_select_one_onecol( - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - retcol="content", - desc="_get_cached_user_device", - ) - defer.returnValue(db_to_json(content)) - - @cachedInlineCallbacks() - def _get_cached_devices_for_user(self, user_id): - devices = yield self._simple_select_list( - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - }, - retcols=("device_id", "content"), - desc="_get_cached_devices_for_user", - ) - defer.returnValue({ - device["device_id"]: db_to_json(device["content"]) - for device in devices - }) - - def get_devices_with_keys_by_user(self, user_id): - """Get all devices (with any device keys) for a user - - Returns: - (stream_id, devices) - """ - return self.runInteraction( - "get_devices_with_keys_by_user", - self._get_devices_with_keys_by_user_txn, user_id, - ) - - def _get_devices_with_keys_by_user_txn(self, txn, user_id): - now_stream_id = self._device_list_id_gen.get_current_token() - - devices = self._get_e2e_device_keys_txn( - txn, [(user_id, None)], include_all_devices=True - ) - - if devices: - user_devices = devices[user_id] - results = [] - for device_id, device in iteritems(user_devices): - result = { - "device_id": device_id, - } - - key_json = device.get("key_json", None) - if key_json: - result["keys"] = db_to_json(key_json) - device_display_name = device.get("device_display_name", None) - if device_display_name: - result["device_display_name"] = device_display_name - - results.append(result) - - return now_stream_id, results - - return now_stream_id, [] - - def mark_as_sent_devices_by_remote(self, destination, stream_id): - """Mark that updates have successfully been sent to the destination. - """ - return self.runInteraction( - "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn, - destination, stream_id, - ) - - def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): - # We update the device_lists_outbound_last_success with the successfully - # poked users. We do the join to see which users need to be inserted and - # which updated. - sql = """ - SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) - FROM device_lists_outbound_pokes as o - LEFT JOIN device_lists_outbound_last_success as s - USING (destination, user_id) - WHERE destination = ? AND o.stream_id <= ? - GROUP BY user_id - """ - txn.execute(sql, (destination, stream_id,)) - rows = txn.fetchall() - - sql = """ - UPDATE device_lists_outbound_last_success - SET stream_id = ? - WHERE destination = ? AND user_id = ? - """ - txn.executemany( - sql, ((row[1], destination, row[0],) for row in rows if row[2]) - ) - - sql = """ - INSERT INTO device_lists_outbound_last_success - (destination, user_id, stream_id) VALUES (?, ?, ?) - """ - txn.executemany( - sql, ((destination, row[0], row[1],) for row in rows if not row[2]) - ) - - # Delete all sent outbound pokes - sql = """ - DELETE FROM device_lists_outbound_pokes - WHERE destination = ? AND stream_id <= ? - """ - txn.execute(sql, (destination, stream_id,)) - - @defer.inlineCallbacks - def get_user_whose_devices_changed(self, from_key): - """Get set of users whose devices have changed since `from_key`. - """ - from_key = int(from_key) - changed = self._device_list_stream_cache.get_all_entities_changed(from_key) - if changed is not None: - defer.returnValue(set(changed)) - - sql = """ - SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ? - """ - rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) - defer.returnValue(set(row[0] for row in rows)) - - def get_all_device_list_changes_for_remotes(self, from_key, to_key): - """Return a list of `(stream_id, user_id, destination)` which is the - combined list of changes to devices, and which destinations need to be - poked. `destination` may be None if no destinations need to be poked. - """ - # We do a group by here as there can be a large number of duplicate - # entries, since we throw away device IDs. - sql = """ - SELECT MAX(stream_id) AS stream_id, user_id, destination - FROM device_lists_stream - LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) - WHERE ? < stream_id AND stream_id <= ? - GROUP BY user_id, destination - """ - return self._execute( - "get_all_device_list_changes_for_remotes", None, - sql, from_key, to_key - ) - @defer.inlineCallbacks def add_device_change_to_streams(self, user_id, device_ids, hosts): """Persist that a user's devices have been updated, and which hosts @@ -732,9 +736,6 @@ class DeviceStore(BackgroundUpdateStore): ] ) - def get_device_stream_token(self): - return self._device_list_id_gen.get_current_token() - def _prune_old_outbound_device_pokes(self): """Delete old entries out of the device_lists_outbound_pokes to ensure that we don't fill up due to dead servers. We keep one entry per diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 2a0f6cfca9..e381e472a2 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -23,49 +23,7 @@ from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore, db_to_json -class EndToEndKeyStore(SQLBaseStore): - def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): - """Stores device keys for a device. Returns whether there was a change - or the keys were already in the database. - """ - def _set_e2e_device_keys_txn(txn): - old_key_json = self._simple_select_one_onecol_txn( - txn, - table="e2e_device_keys_json", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - retcol="key_json", - allow_none=True, - ) - - # In py3 we need old_key_json to match new_key_json type. The DB - # returns unicode while encode_canonical_json returns bytes. - new_key_json = encode_canonical_json(device_keys).decode("utf-8") - - if old_key_json == new_key_json: - return False - - self._simple_upsert_txn( - txn, - table="e2e_device_keys_json", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - values={ - "ts_added_ms": time_now, - "key_json": new_key_json, - } - ) - - return True - - return self.runInteraction( - "set_e2e_device_keys", _set_e2e_device_keys_txn - ) - +class EndToEndKeyWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, @@ -238,6 +196,50 @@ class EndToEndKeyStore(SQLBaseStore): "count_e2e_one_time_keys", _count_e2e_one_time_keys ) + +class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): + def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): + """Stores device keys for a device. Returns whether there was a change + or the keys were already in the database. + """ + def _set_e2e_device_keys_txn(txn): + old_key_json = self._simple_select_one_onecol_txn( + txn, + table="e2e_device_keys_json", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + retcol="key_json", + allow_none=True, + ) + + # In py3 we need old_key_json to match new_key_json type. The DB + # returns unicode while encode_canonical_json returns bytes. + new_key_json = encode_canonical_json(device_keys).decode("utf-8") + + if old_key_json == new_key_json: + return False + + self._simple_upsert_txn( + txn, + table="e2e_device_keys_json", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + values={ + "ts_added_ms": time_now, + "key_json": new_key_json, + } + ) + + return True + + return self.runInteraction( + "set_e2e_device_keys", _set_e2e_device_keys_txn + ) + def claim_e2e_one_time_keys(self, query_list): """Take a list of one time keys out of the database""" def _claim_e2e_one_time_keys(txn): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 38809ed0fc..a8d90456e3 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -442,6 +442,28 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, event_results.reverse() return event_results + @defer.inlineCallbacks + def get_successor_events(self, event_ids): + """Fetch all events that have the given events as a prev event + + Args: + event_ids (iterable[str]) + + Returns: + Deferred[list[str]] + """ + rows = yield self._simple_select_many_batch( + table="event_edges", + column="prev_event_id", + iterable=event_ids, + retcols=("event_id",), + desc="get_successor_events" + ) + + defer.returnValue([ + row["event_id"] for row in rows + ]) + class EventFederationStore(EventFederationWorkerStore): """ Responsible for storing and serving up the various graphs associated diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 06db9e56e6..428300ea0a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -537,6 +537,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore new_events = [ event for event, ctx in event_contexts if not event.internal_metadata.is_outlier() and not ctx.rejected + and not event.internal_metadata.is_soft_failed() ] # start with the existing forward extremities @@ -1406,21 +1407,6 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore values=state_values, ) - self._simple_insert_many_txn( - txn, - table="event_edges", - values=[ - { - "event_id": event.event_id, - "prev_event_id": prev_id, - "room_id": event.room_id, - "is_state": True, - } - for event, _ in state_events_and_contexts - for prev_id, _ in event.prev_state - ], - ) - # Prefill the event cache self._add_to_cache(txn, events_and_contexts) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 6a5028961d..4b8438c3e9 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -186,6 +186,63 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore, defer.returnValue(results) @defer.inlineCallbacks + def move_push_rule_from_room_to_room( + self, new_room_id, user_id, rule, + ): + """Move a single push rule from one room to another for a specific user. + + Args: + new_room_id (str): ID of the new room. + user_id (str): ID of user the push rule belongs to. + rule (Dict): A push rule. + """ + # Create new rule id + rule_id_scope = '/'.join(rule["rule_id"].split('/')[:-1]) + new_rule_id = rule_id_scope + "/" + new_room_id + + # Change room id in each condition + for condition in rule.get("conditions", []): + if condition.get("key") == "room_id": + condition["pattern"] = new_room_id + + # Add the rule for the new room + yield self.add_push_rule( + user_id=user_id, + rule_id=new_rule_id, + priority_class=rule["priority_class"], + conditions=rule["conditions"], + actions=rule["actions"], + ) + + # Delete push rule for the old room + yield self.delete_push_rule(user_id, rule["rule_id"]) + + @defer.inlineCallbacks + def move_push_rules_from_room_to_room_for_user( + self, old_room_id, new_room_id, user_id, + ): + """Move all of the push rules from one room to another for a specific + user. + + Args: + old_room_id (str): ID of the old room. + new_room_id (str): ID of the new room. + user_id (str): ID of user to copy push rules for. + """ + # Retrieve push rules for this user + user_push_rules = yield self.get_push_rules_for_user(user_id) + + # Get rules relating to the old room, move them to the new room, then + # delete them from the old room + for rule in user_push_rules: + conditions = rule.get("conditions", []) + if any((c.get("key") == "room_id" and + c.get("pattern") == old_room_id) for c in conditions): + self.move_push_rule_from_room_to_room( + new_room_id, user_id, rule, + ) + + @defer.inlineCallbacks def bulk_get_push_rules_for_room(self, event, context): state_group = context.state_group if not state_group: diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 0ac665e967..0fd1ccc40a 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -346,15 +346,23 @@ class ReceiptsStore(ReceiptsWorkerStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): + """Inserts a read-receipt into the database if it's newer than the current RR + + Returns: int|None + None if the RR is older than the current RR + otherwise, the rx timestamp of the event that the RR corresponds to + (or 0 if the event is unknown) + """ res = self._simple_select_one_txn( txn, table="events", - retcols=["topological_ordering", "stream_ordering"], + retcols=["stream_ordering", "received_ts"], keyvalues={"event_id": event_id}, allow_none=True ) stream_ordering = int(res["stream_ordering"]) if res else None + rx_ts = res["received_ts"] if res else 0 # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts @@ -373,7 +381,7 @@ class ReceiptsStore(ReceiptsWorkerStore): "one for later event %s", event_id, eid, ) - return False + return None txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) @@ -429,7 +437,7 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_ordering=stream_ordering, ) - return True + return rx_ts @defer.inlineCallbacks def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): @@ -466,7 +474,7 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_id_manager = self._receipts_id_gen.get_next() with stream_id_manager as stream_id: - have_persisted = yield self.runInteraction( + event_ts = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, @@ -474,8 +482,14 @@ class ReceiptsStore(ReceiptsWorkerStore): stream_id=stream_id, ) - if not have_persisted: - defer.returnValue(None) + if event_ts is None: + defer.returnValue(None) + + now = self._clock.time_msec() + logger.debug( + "RR for event %s in %s (%i ms old)", + linearized_event_id, room_id, now - event_ts, + ) yield self.insert_graph_receipt( room_id, receipt_type, user_id, event_ids, data diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 9b9572890b..9b6c28892c 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -295,6 +295,39 @@ class RegistrationWorkerStore(SQLBaseStore): return ret['user_id'] return None + @defer.inlineCallbacks + def user_add_threepid(self, user_id, medium, address, validated_at, added_at): + yield self._simple_upsert("user_threepids", { + "medium": medium, + "address": address, + }, { + "user_id": user_id, + "validated_at": validated_at, + "added_at": added_at, + }) + + @defer.inlineCallbacks + def user_get_threepids(self, user_id): + ret = yield self._simple_select_list( + "user_threepids", { + "user_id": user_id + }, + ['medium', 'address', 'validated_at', 'added_at'], + 'user_get_threepids' + ) + defer.returnValue(ret) + + def user_delete_threepid(self, user_id, medium, address): + return self._simple_delete( + "user_threepids", + keyvalues={ + "user_id": user_id, + "medium": medium, + "address": address, + }, + desc="user_delete_threepids", + ) + class RegistrationStore(RegistrationWorkerStore, background_updates.BackgroundUpdateStore): @@ -633,39 +666,6 @@ class RegistrationStore(RegistrationWorkerStore, defer.returnValue(res if res else False) @defer.inlineCallbacks - def user_add_threepid(self, user_id, medium, address, validated_at, added_at): - yield self._simple_upsert("user_threepids", { - "medium": medium, - "address": address, - }, { - "user_id": user_id, - "validated_at": validated_at, - "added_at": added_at, - }) - - @defer.inlineCallbacks - def user_get_threepids(self, user_id): - ret = yield self._simple_select_list( - "user_threepids", { - "user_id": user_id - }, - ['medium', 'address', 'validated_at', 'added_at'], - 'user_get_threepids' - ) - defer.returnValue(ret) - - def user_delete_threepid(self, user_id, medium, address): - return self._simple_delete( - "user_threepids", - keyvalues={ - "user_id": user_id, - "medium": medium, - "address": address, - }, - desc="user_delete_threepids", - ) - - @defer.inlineCallbacks def save_or_get_3pid_guest_access_token( self, medium, address, access_token, inviter_user_id ): diff --git a/synapse/storage/schema/delta/53/user_share.sql b/synapse/storage/schema/delta/53/user_share.sql new file mode 100644 index 0000000000..14424ded0c --- /dev/null +++ b/synapse/storage/schema/delta/53/user_share.sql @@ -0,0 +1,47 @@ +/* Copyright 2017 Vector Creations Ltd, 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Old disused version of the tables below. +DROP TABLE IF EXISTS users_who_share_rooms; + +-- This is no longer used because it's duplicated by the users_who_share_public_rooms +DROP TABLE IF EXISTS users_in_public_rooms; + +-- Tables keeping track of what users share rooms. This is a map of local users +-- to local or remote users, per room. Remote users cannot be in the user_id +-- column, only the other_user_id column. There are two tables, one for public +-- rooms and those for private rooms. +CREATE TABLE IF NOT EXISTS users_who_share_public_rooms ( + user_id TEXT NOT NULL, + other_user_id TEXT NOT NULL, + room_id TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS users_who_share_private_rooms ( + user_id TEXT NOT NULL, + other_user_id TEXT NOT NULL, + room_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX users_who_share_public_rooms_u_idx ON users_who_share_public_rooms(user_id, other_user_id, room_id); +CREATE INDEX users_who_share_public_rooms_r_idx ON users_who_share_public_rooms(room_id); +CREATE INDEX users_who_share_public_rooms_o_idx ON users_who_share_public_rooms(other_user_id); + +CREATE UNIQUE INDEX users_who_share_private_rooms_u_idx ON users_who_share_private_rooms(user_id, other_user_id, room_id); +CREATE INDEX users_who_share_private_rooms_r_idx ON users_who_share_private_rooms(room_id); +CREATE INDEX users_who_share_private_rooms_o_idx ON users_who_share_private_rooms(other_user_id); + +-- Make sure that we populate the tables initially by resetting the stream ID +UPDATE user_directory_stream_pos SET stream_id = NULL; diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql index 52eec88357..bccd1c6f74 100644 --- a/synapse/storage/schema/full_schemas/11/event_edges.sql +++ b/synapse/storage/schema/full_schemas/11/event_edges.sql @@ -37,6 +37,8 @@ CREATE TABLE IF NOT EXISTS event_edges( event_id TEXT NOT NULL, prev_event_id TEXT NOT NULL, room_id TEXT NOT NULL, + -- We no longer insert prev_state into this table, so all new rows will have + -- is_state as false. is_state BOOL NOT NULL, UNIQUE (event_id, prev_event_id, room_id, is_state) ); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d6cfdba519..580fafeb3a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -191,6 +191,25 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): @defer.inlineCallbacks def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0, order='DESC'): + """Get new room events in stream ordering since `from_key`. + + Args: + room_id (str) + from_key (str): Token from which no events are returned before + to_key (str): Token from which no events are returned after. (This + is typically the current stream token) + limit (int): Maximum number of events to return + order (str): Either "DESC" or "ASC". Determines which events are + returned when the result is limited. If "DESC" then the most + recent `limit` events are returned, otherwise returns the + oldest `limit` events. + + Returns: + Deferred[dict[str,tuple[list[FrozenEvent], str]]] + A map from room id to a tuple containing: + - list of recent events in the room + - stream ordering key for the start of the chunk of events returned. + """ from_id = RoomStreamToken.parse_stream_token(from_key).stream room_ids = yield self._events_stream_cache.get_entities_changed( diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index fea866c043..2317d22ed6 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -63,31 +63,14 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) - @defer.inlineCallbacks - def add_users_to_public_room(self, room_id, user_ids): - """Add user to the list of users in public rooms - - Args: - room_id (str): A room_id that all users are in that is world_readable - or publically joinable - user_ids (list(str)): Users to add - """ - yield self._simple_insert_many( - table="users_in_public_rooms", - values=[{"user_id": user_id, "room_id": room_id} for user_id in user_ids], - desc="add_users_to_public_room", - ) - for user_id in user_ids: - self.get_user_in_public_room.invalidate((user_id,)) - - def add_profiles_to_user_dir(self, room_id, users_with_profile): + def add_profiles_to_user_dir(self, users_with_profile): """Add profiles to the user directory Args: - room_id (str): A room_id that all users are joined to users_with_profile (dict): Users to add to directory in the form of mapping of user_id -> ProfileInfo """ + if isinstance(self.database_engine, PostgresEngine): # We weight the loclpart most highly, then display name and finally # server name @@ -113,7 +96,7 @@ class UserDirectoryStore(SQLBaseStore): INSERT INTO user_directory_search(user_id, value) VALUES (?,?) """ - args = ( + args = tuple( ( user_id, "%s %s" % (user_id, p.display_name) if p.display_name else user_id, @@ -132,7 +115,7 @@ class UserDirectoryStore(SQLBaseStore): values=[ { "user_id": user_id, - "room_id": room_id, + "room_id": None, "display_name": profile.display_name, "avatar_url": profile.avatar_url, } @@ -250,16 +233,6 @@ class UserDirectoryStore(SQLBaseStore): "update_profile_in_user_dir", _update_profile_in_user_dir_txn ) - @defer.inlineCallbacks - def update_user_in_public_user_list(self, user_id, room_id): - yield self._simple_update_one( - table="users_in_public_rooms", - keyvalues={"user_id": user_id}, - updatevalues={"room_id": room_id}, - desc="update_user_in_public_user_list", - ) - self.get_user_in_public_room.invalidate((user_id,)) - def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): self._simple_delete_txn( @@ -269,62 +242,50 @@ class UserDirectoryStore(SQLBaseStore): txn, table="user_directory_search", keyvalues={"user_id": user_id} ) self._simple_delete_txn( - txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} + txn, + table="users_who_share_public_rooms", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_public_rooms", + keyvalues={"other_user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_private_rooms", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_private_rooms", + keyvalues={"other_user_id": user_id}, ) txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) - txn.call_after(self.get_user_in_public_room.invalidate, (user_id,)) return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn) @defer.inlineCallbacks - def remove_from_user_in_public_room(self, user_id): - yield self._simple_delete( - table="users_in_public_rooms", - keyvalues={"user_id": user_id}, - desc="remove_from_user_in_public_room", - ) - self.get_user_in_public_room.invalidate((user_id,)) - - def get_users_in_public_due_to_room(self, room_id): - """Get all user_ids that are in the room directory because they're - in the given room_id - """ - return self._simple_select_onecol( - table="users_in_public_rooms", - keyvalues={"room_id": room_id}, - retcol="user_id", - desc="get_users_in_public_due_to_room", - ) - - @defer.inlineCallbacks def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory because they're in the given room_id """ - user_ids_dir = yield self._simple_select_onecol( - table="user_directory", - keyvalues={"room_id": room_id}, - retcol="user_id", - desc="get_users_in_dir_due_to_room", - ) - - user_ids_pub = yield self._simple_select_onecol( - table="users_in_public_rooms", + user_ids_share_pub = yield self._simple_select_onecol( + table="users_who_share_public_rooms", keyvalues={"room_id": room_id}, - retcol="user_id", + retcol="other_user_id", desc="get_users_in_dir_due_to_room", ) - user_ids_share = yield self._simple_select_onecol( - table="users_who_share_rooms", + user_ids_share_priv = yield self._simple_select_onecol( + table="users_who_share_private_rooms", keyvalues={"room_id": room_id}, - retcol="user_id", + retcol="other_user_id", desc="get_users_in_dir_due_to_room", ) - user_ids = set(user_ids_dir) - user_ids.update(user_ids_pub) - user_ids.update(user_ids_share) + user_ids = set(user_ids_share_pub) + user_ids.update(user_ids_share_priv) defer.returnValue(user_ids) @@ -351,7 +312,7 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue([name for name, in rows]) def add_users_who_share_room(self, room_id, share_private, user_id_tuples): - """Insert entries into the users_who_share_rooms table. The first + """Insert entries into the users_who_share_*_rooms table. The first user should be a local user. Args: @@ -361,109 +322,71 @@ class UserDirectoryStore(SQLBaseStore): """ def _add_users_who_share_room_txn(txn): - self._simple_insert_many_txn( + + if share_private: + tbl = "users_who_share_private_rooms" + else: + tbl = "users_who_share_public_rooms" + + self._simple_upsert_many_txn( txn, - table="users_who_share_rooms", - values=[ - { - "user_id": user_id, - "other_user_id": other_user_id, - "room_id": room_id, - "share_private": share_private, - } + table=tbl, + key_names=["user_id", "other_user_id", "room_id"], + key_values=[ + (user_id, other_user_id, room_id) for user_id, other_user_id in user_id_tuples ], + value_names=(), + value_values=None, ) for user_id, other_user_id in user_id_tuples: txn.call_after( self.get_users_who_share_room_from_dir.invalidate, (user_id,) ) - txn.call_after( - self.get_if_users_share_a_room.invalidate, (user_id, other_user_id) - ) return self.runInteraction( "add_users_who_share_room", _add_users_who_share_room_txn ) - def update_users_who_share_room(self, room_id, share_private, user_id_sets): - """Updates entries in the users_who_share_rooms table. The first - user should be a local user. - - Args: - room_id (str) - share_private (bool): Is the room private - user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. + def remove_user_who_share_room(self, user_id, room_id): """ - - def _update_users_who_share_room_txn(txn): - sql = """ - UPDATE users_who_share_rooms - SET room_id = ?, share_private = ? - WHERE user_id = ? AND other_user_id = ? - """ - txn.executemany( - sql, ((room_id, share_private, uid, oid) for uid, oid in user_id_sets) - ) - for user_id, other_user_id in user_id_sets: - txn.call_after( - self.get_users_who_share_room_from_dir.invalidate, (user_id,) - ) - txn.call_after( - self.get_if_users_share_a_room.invalidate, (user_id, other_user_id) - ) - - return self.runInteraction( - "update_users_who_share_room", _update_users_who_share_room_txn - ) - - def remove_user_who_share_room(self, user_id, other_user_id): - """Deletes entries in the users_who_share_rooms table. The first + Deletes entries in the users_who_share_*_rooms table. The first user should be a local user. Args: + user_id (str) room_id (str) - share_private (bool): Is the room private - user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. """ def _remove_user_who_share_room_txn(txn): self._simple_delete_txn( txn, - table="users_who_share_rooms", - keyvalues={"user_id": user_id, "other_user_id": other_user_id}, + table="users_who_share_private_rooms", + keyvalues={"user_id": user_id, "room_id": room_id}, ) - txn.call_after( - self.get_users_who_share_room_from_dir.invalidate, (user_id,) + self._simple_delete_txn( + txn, + table="users_who_share_private_rooms", + keyvalues={"other_user_id": user_id, "room_id": room_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_public_rooms", + keyvalues={"user_id": user_id, "room_id": room_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_public_rooms", + keyvalues={"other_user_id": user_id, "room_id": room_id}, ) txn.call_after( - self.get_if_users_share_a_room.invalidate, (user_id, other_user_id) + self.get_users_who_share_room_from_dir.invalidate, (user_id,) ) return self.runInteraction( "remove_user_who_share_room", _remove_user_who_share_room_txn ) - @cached(max_entries=500000) - def get_if_users_share_a_room(self, user_id, other_user_id): - """Gets if users share a room. - - Args: - user_id (str): Must be a local user_id - other_user_id (str) - - Returns: - bool|None: None if they don't share a room, otherwise whether they - share a private room or not. - """ - return self._simple_select_one_onecol( - table="users_who_share_rooms", - keyvalues={"user_id": user_id, "other_user_id": other_user_id}, - retcol="share_private", - allow_none=True, - desc="get_if_users_share_a_room", - ) - @cachedInlineCallbacks(max_entries=500000, iterable=True) def get_users_who_share_room_from_dir(self, user_id): """Returns the set of users who share a room with `user_id` @@ -472,32 +395,29 @@ class UserDirectoryStore(SQLBaseStore): user_id(str): Must be a local user Returns: - dict: user_id -> share_private mapping + list: user_id """ - rows = yield self._simple_select_list( - table="users_who_share_rooms", + rows = yield self._simple_select_onecol( + table="users_who_share_private_rooms", + keyvalues={"user_id": user_id}, + retcol="other_user_id", + desc="get_users_who_share_room_with_user", + ) + + pub_rows = yield self._simple_select_onecol( + table="users_who_share_public_rooms", keyvalues={"user_id": user_id}, - retcols=("other_user_id", "share_private"), + retcol="other_user_id", desc="get_users_who_share_room_with_user", ) - defer.returnValue({row["other_user_id"]: row["share_private"] for row in rows}) + users = set(pub_rows) + users.update(rows) - def get_users_in_share_dir_with_room_id(self, user_id, room_id): - """Get all user tuples that are in the users_who_share_rooms due to the - given room_id. + # Remove the user themselves from this list. + users.discard(user_id) - Returns: - [(user_id, other_user_id)]: where one of the two will match the given - user_id. - """ - sql = """ - SELECT user_id, other_user_id FROM users_who_share_rooms - WHERE room_id = ? AND (user_id = ? OR other_user_id = ?) - """ - return self._execute( - "get_users_in_share_dir_with_room_id", None, sql, room_id, user_id, user_id - ) + defer.returnValue(list(users)) @defer.inlineCallbacks def get_rooms_in_common_for_users(self, user_id, other_user_id): @@ -532,12 +452,10 @@ class UserDirectoryStore(SQLBaseStore): def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") - txn.execute("DELETE FROM users_in_public_rooms") - txn.execute("DELETE FROM users_who_share_rooms") + txn.execute("DELETE FROM users_who_share_public_rooms") + txn.execute("DELETE FROM users_who_share_private_rooms") txn.call_after(self.get_user_in_directory.invalidate_all) - txn.call_after(self.get_user_in_public_room.invalidate_all) txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all) - txn.call_after(self.get_if_users_share_a_room.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -548,21 +466,11 @@ class UserDirectoryStore(SQLBaseStore): return self._simple_select_one( table="user_directory", keyvalues={"user_id": user_id}, - retcols=("room_id", "display_name", "avatar_url"), + retcols=("display_name", "avatar_url"), allow_none=True, desc="get_user_in_directory", ) - @cached() - def get_user_in_public_room(self, user_id): - return self._simple_select_one( - table="users_in_public_rooms", - keyvalues={"user_id": user_id}, - retcols=("room_id",), - allow_none=True, - desc="get_user_in_public_room", - ) - def get_user_directory_stream_pos(self): return self._simple_select_one_onecol( table="user_directory_stream_pos", @@ -660,14 +568,15 @@ class UserDirectoryStore(SQLBaseStore): where_clause = "1=1" else: join_clause = """ - LEFT JOIN users_in_public_rooms AS p USING (user_id) LEFT JOIN ( - SELECT other_user_id AS user_id FROM users_who_share_rooms - WHERE user_id = ? AND share_private - ) AS s USING (user_id) + SELECT other_user_id AS user_id FROM users_who_share_public_rooms + UNION + SELECT other_user_id AS user_id FROM users_who_share_private_rooms + WHERE user_id = ? + ) AS p USING (user_id) """ join_args = (user_id,) - where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)" + where_clause = "p.user_id IS NOT NULL" if isinstance(self.database_engine, PostgresEngine): full_query, exact_query, prefix_query = _parse_query_postgres(search_term) @@ -686,7 +595,7 @@ class UserDirectoryStore(SQLBaseStore): %s AND vector @@ to_tsquery('english', ?) ORDER BY - (CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END) + (CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END) * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END) * (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END) * ( diff --git a/synapse/visibility.py b/synapse/visibility.py index 0281a7c919..16c40cd74c 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -67,6 +67,10 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, Returns: Deferred[list[synapse.events.EventBase]] """ + # Filter out events that have been soft failed so that we don't relay them + # to clients. + events = list(e for e in events if not e.internal_metadata.is_soft_failed()) + types = ( (EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id), @@ -216,28 +220,36 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, @defer.inlineCallbacks -def filter_events_for_server(store, server_name, events): - # Whatever else we do, we need to check for senders which have requested - # erasure of their data. - erased_senders = yield store.are_users_erased( - (e.sender for e in events), - ) +def filter_events_for_server(store, server_name, events, redact=True, + check_history_visibility_only=False): + """Filter a list of events based on whether given server is allowed to + see them. - def redact_disallowed(event, state): - # if the sender has been gdpr17ed, always return a redacted - # copy of the event. - if erased_senders[event.sender]: + Args: + store (DataStore) + server_name (str) + events (iterable[FrozenEvent]) + redact (bool): Whether to return a redacted version of the event, or + to filter them out entirely. + check_history_visibility_only (bool): Whether to only check the + history visibility, rather than things like if the sender has been + erased. This is used e.g. during pagination to decide whether to + backfill or not. + + Returns + Deferred[list[FrozenEvent]] + """ + + def is_sender_erased(event, erased_senders): + if erased_senders and erased_senders[event.sender]: logger.info( "Sender of %s has been erased, redacting", event.event_id, ) - return prune_event(event) - - # state will be None if we decided we didn't need to filter by - # room membership. - if not state: - return event + return True + return False + def check_event_is_visible(event, state): history = state.get((EventTypes.RoomHistoryVisibility, ''), None) if history: visibility = history.content.get("history_visibility", "shared") @@ -259,17 +271,17 @@ def filter_events_for_server(store, server_name, events): memtype = ev.membership if memtype == Membership.JOIN: - return event + return True elif memtype == Membership.INVITE: if visibility == "invited": - return event + return True else: # server has no users in the room: redact - return prune_event(event) + return False - return event + return True - # Next lets check to see if all the events have a history visibility + # Lets check to see if all the events have a history visibility # of "shared" or "world_readable". If thats the case then we don't # need to check membership (as we know the server is in the room). event_to_state_ids = yield store.get_state_ids_for_events( @@ -296,16 +308,31 @@ def filter_events_for_server(store, server_name, events): for e in itervalues(event_map) ) + if not check_history_visibility_only: + erased_senders = yield store.are_users_erased( + (e.sender for e in events), + ) + else: + # We don't want to check whether users are erased, which is equivalent + # to no users having been erased. + erased_senders = {} + if all_open: # all the history_visibility state affecting these events is open, so # we don't need to filter by membership state. We *do* need to check # for user erasure, though. if erased_senders: - events = [ - redact_disallowed(e, None) - for e in events - ] + to_return = [] + for e in events: + if not is_sender_erased(e, erased_senders): + to_return.append(e) + elif redact: + to_return.append(prune_event(e)) + + defer.returnValue(to_return) + # If there are no erased users then we can just return the given list + # of events without having to copy it. defer.returnValue(events) # Ok, so we're dealing with events that have non-trivial visibility @@ -361,7 +388,13 @@ def filter_events_for_server(store, server_name, events): for e_id, key_to_eid in iteritems(event_to_state_ids) } - defer.returnValue([ - redact_disallowed(e, event_to_state[e.event_id]) - for e in events - ]) + to_return = [] + for e in events: + erased = is_sender_erased(e, erased_senders) + visible = check_event_is_visible(e, event_to_state[e.event_id]) + if visible and not erased: + to_return.append(e) + elif redact: + to_return.append(prune_event(e)) + + defer.returnValue(to_return) |