Diffstat (limited to 'synapse/handlers')
-rw-r--r--   synapse/handlers/__init__.py        2
-rw-r--r--   synapse/handlers/auth.py           53
-rw-r--r--   synapse/handlers/devicemessage.py   4
-rw-r--r--   synapse/handlers/directory.py      19
-rw-r--r--   synapse/handlers/e2e_keys.py       10
-rw-r--r--   synapse/handlers/federation.py     60
-rw-r--r--   synapse/handlers/initial_sync.py    7
-rw-r--r--   synapse/handlers/message.py        73
-rw-r--r--   synapse/handlers/presence.py       11
-rw-r--r--   synapse/handlers/receipts.py       10
-rw-r--r--   synapse/handlers/register.py        7
-rw-r--r--   synapse/handlers/room.py           10
-rw-r--r--   synapse/handlers/room_list.py      64
-rw-r--r--   synapse/handlers/sync.py           44
-rw-r--r--   synapse/handlers/typing.py          4
15 files changed, 205 insertions, 173 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 63d05f2531..5ad408f549 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -24,7 +24,6 @@ from .profile import ProfileHandler
 from .directory import DirectoryHandler
 from .admin import AdminHandler
 from .identity import IdentityHandler
-from .receipts import ReceiptsHandler
 from .search import SearchHandler
@@ -56,7 +55,6 @@ class Handlers(object):
         self.profile_handler = ProfileHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
-        self.receipts_handler = ReceiptsHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
         self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3851b35889..3b146f09d6 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -61,6 +61,8 @@ class AuthHandler(BaseHandler):
             for module, config in hs.config.password_providers
         ]
 
+        logger.info("Extra password_providers: %r", self.password_providers)
+
         self.hs = hs  # FIXME better possibility to access registrationHandler later?
         self.device_handler = hs.get_device_handler()
@@ -160,7 +162,15 @@ class AuthHandler(BaseHandler):
         for f in flows:
             if len(set(f) - set(creds.keys())) == 0:
-                logger.info("Auth completed with creds: %r", creds)
+                # it's very useful to know what args are stored, but this can
+                # include the password in the case of registering, so only log
+                # the keys (confusingly, clientdict may contain a password
+                # param, creds is just what the user authed as for UI auth
+                # and is not sensitive).
+                logger.info(
+                    "Auth completed with creds: %r. Client dict has keys: %r",
+                    creds, clientdict.keys()
+                )
                 defer.returnValue((True, creds, clientdict, session['id']))
 
         ret = self._auth_dict_for_flows(flows, session)
@@ -378,12 +388,10 @@ class AuthHandler(BaseHandler):
         return self._check_password(user_id, password)
 
     @defer.inlineCallbacks
-    def get_login_tuple_for_user_id(self, user_id, device_id=None,
-                                    initial_display_name=None):
+    def get_access_token_for_user_id(self, user_id, device_id=None,
+                                     initial_display_name=None):
         """
-        Gets login tuple for the user with the given user ID.
-
-        Creates a new access/refresh token for the user.
+        Creates a new access token for the user with the given user ID.
 
         The user is assumed to have been authenticated by some other
         machanism (e.g. CAS), and the user_id converted to the canonical case.
@@ -398,16 +406,13 @@ class AuthHandler(BaseHandler):
             initial_display_name (str): display name to associate with the
                 device if it needs re-registering
         Returns:
-            A tuple of:
             The access token for the user's session.
-            The refresh token for the user's session.
         Raises:
             StoreError if there was a problem storing the token.
             LoginError if there was an authentication problem.
         """
         logger.info("Logging in user %s on device %s", user_id, device_id)
         access_token = yield self.issue_access_token(user_id, device_id)
-        refresh_token = yield self.issue_refresh_token(user_id, device_id)
 
         # the device *should* have been registered before we got here; however,
         # it's possible we raced against a DELETE operation. The thing we
@@ -418,7 +423,7 @@ class AuthHandler(BaseHandler):
             user_id, device_id, initial_display_name
         )
 
-        defer.returnValue((access_token, refresh_token))
+        defer.returnValue(access_token)
 
     @defer.inlineCallbacks
     def check_user_exists(self, user_id):
@@ -529,35 +534,19 @@ class AuthHandler(BaseHandler):
                                                   device_id)
         defer.returnValue(access_token)
 
-    @defer.inlineCallbacks
-    def issue_refresh_token(self, user_id, device_id=None):
-        refresh_token = self.generate_refresh_token(user_id)
-        yield self.store.add_refresh_token_to_user(user_id, refresh_token,
-                                                   device_id)
-        defer.returnValue(refresh_token)
-
-    def generate_access_token(self, user_id, extra_caveats=None,
-                              duration_in_ms=(60 * 60 * 1000)):
+    def generate_access_token(self, user_id, extra_caveats=None):
         extra_caveats = extra_caveats or []
         macaroon = self._generate_base_macaroon(user_id)
         macaroon.add_first_party_caveat("type = access")
-        now = self.hs.get_clock().time_msec()
-        expiry = now + duration_in_ms
-        macaroon.add_first_party_caveat("time < %d" % (expiry,))
+        # Include a nonce, to make sure that each login gets a different
+        # access token.
+        macaroon.add_first_party_caveat("nonce = %s" % (
+            stringutils.random_string_with_symbols(16),
+        ))
        for caveat in extra_caveats:
             macaroon.add_first_party_caveat(caveat)
         return macaroon.serialize()
 
-    def generate_refresh_token(self, user_id):
-        m = self._generate_base_macaroon(user_id)
-        m.add_first_party_caveat("type = refresh")
-        # Important to add a nonce, because otherwise every refresh token for a
-        # user will be the same.
-        m.add_first_party_caveat("nonce = %s" % (
-            stringutils.random_string_with_symbols(16),
-        ))
-        return m.serialize()
-
     def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
         macaroon = self._generate_base_macaroon(user_id)
         macaroon.add_first_party_caveat("type = login")
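Note: the auth.py change above is the heart of this commit. Refresh tokens go away and access tokens stop expiring; each login is instead distinguished by a random nonce caveat in the macaroon. A minimal sketch of that token shape using the pymacaroons library (server_name and secret_key are illustrative placeholders, and the exact caveat set here is only an approximation of what Synapse emits):

import random
import string

import pymacaroons


def generate_access_token(user_id, server_name, secret_key):
    # Base macaroon identifying the user, marked as an access token.
    macaroon = pymacaroons.Macaroon(
        location=server_name,
        identifier="key",
        key=secret_key,
    )
    macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
    macaroon.add_first_party_caveat("type = access")
    # No "time < ..." expiry caveat any more; a random nonce makes each
    # login produce a distinct token.
    nonce = "".join(random.choice(string.ascii_letters) for _ in range(16))
    macaroon.add_first_party_caveat("nonce = %s" % (nonce,))
    return macaroon.serialize()

Verifying such a token then means checking the caveats with a pymacaroons Verifier rather than consulting an expiry time.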
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index c5368e5df2..f7fad15c62 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -34,9 +34,9 @@ class DeviceMessageHandler(object):
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
         self.is_mine_id = hs.is_mine_id
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_federation_sender()
 
-        self.federation.register_edu_handler(
+        hs.get_replication_layer().register_edu_handler(
             "m.direct_to_device", self.on_direct_to_device_edu
         )
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index c00274afc3..1b5317edf5 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -339,3 +339,22 @@ class DirectoryHandler(BaseHandler):
         yield self.auth.check_can_change_room_list(room_id, requester.user)
 
         yield self.store.set_room_is_public(room_id, visibility == "public")
+
+    @defer.inlineCallbacks
+    def edit_published_appservice_room_list(self, appservice_id, network_id,
+                                            room_id, visibility):
+        """Add or remove a room from the appservice/network specific public
+        room list.
+
+        Args:
+            appservice_id (str): ID of the appservice that owns the list
+            network_id (str): The ID of the network the list is associated with
+            room_id (str)
+            visibility (str): either "public" or "private"
+        """
+        if visibility not in ["public", "private"]:
+            raise SynapseError(400, "Invalid visibility setting")
+
+        yield self.store.set_room_is_public_appservice(
+            room_id, appservice_id, network_id, visibility == "public"
+        )
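Note: devicemessage.py shows the pattern repeated in receipts.py, presence.py, and typing.py below — outbound federation traffic now goes through the new federation sender, while inbound EDUs are still registered with the replication layer. Schematically the one federation object splits into two roles, roughly like this (a sketch, not Synapse code; "m.example" is a placeholder EDU type):

class ExampleEduHandler(object):
    def __init__(self, hs):
        # Outbound half: queues EDUs for delivery to remote servers.
        self.federation = hs.get_federation_sender()
        # Inbound half: the replication layer dispatches received EDUs
        # to whichever handler registered for their type.
        hs.get_replication_layer().register_edu_handler(
            "m.example", self.on_example_edu
        )

    def on_example_edu(self, origin, content):
        # Called with the origin server name and the EDU content dict.
        pass

    def send_example(self, destination, content):
        self.federation.send_edu(
            destination=destination,
            edu_type="m.example",
            content=content,
        )

Separating the two halves is what paves the way for running the sender apart from the receiver.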
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index fd11935b40..b63a660c06 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -111,6 +111,11 @@ class E2eKeysHandler(object):
                 failures[destination] = {
                     "status": 503, "message": "Not ready for retry",
                 }
+            except Exception as e:
+                # include ConnectionRefused and other errors
+                failures[destination] = {
+                    "status": 503, "message": e.message
+                }
 
         yield preserve_context_over_deferred(defer.gatherResults([
             preserve_fn(do_remote_query)(destination)
@@ -222,6 +227,11 @@ class E2eKeysHandler(object):
                 failures[destination] = {
                     "status": 503, "message": "Not ready for retry",
                 }
+            except Exception as e:
+                # include ConnectionRefused and other errors
+                failures[destination] = {
+                    "status": 503, "message": e.message
+                }
 
         yield preserve_context_over_deferred(defer.gatherResults([
             preserve_fn(claim_client_keys)(destination)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2d801bad47..1d07e4d02b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -80,22 +80,6 @@ class FederationHandler(BaseHandler):
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
 
-    def handle_new_event(self, event, destinations):
-        """ Takes in an event from the client to server side, that has already
-        been authed and handled by the state module, and sends it to any
-        remote home servers that may be interested.
-
-        Args:
-            event: The event to send
-            destinations: A list of destinations to send it to
-
-        Returns:
-            Deferred: Resolved when it has successfully been queued for
-            processing.
-        """
-
-        return self.replication_layer.send_pdu(event, destinations)
-
     @log_function
     @defer.inlineCallbacks
     def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
@@ -268,9 +252,12 @@ class FederationHandler(BaseHandler):
             except:
                 return False
 
+        # Parses mapping `event_id -> (type, state_key) -> state event_id`
+        # to get all state ids that we're interested in.
         event_map = yield self.store.get_events([
-            e_id for key_to_eid in event_to_state_ids.values()
-            for key, e_id in key_to_eid
+            e_id
+            for key_to_eid in event_to_state_ids.values()
+            for key, e_id in key_to_eid.items()
             if key[0] != EventTypes.Member or check_match(key[1])
         ])
 
@@ -830,25 +817,6 @@ class FederationHandler(BaseHandler):
             user = UserID.from_string(event.state_key)
             yield user_joined_room(self.distributor, user, event.room_id)
 
-        new_pdu = event
-
-        users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
-        destinations = set(
-            get_domain_from_id(user_id) for user_id in users_in_room
-            if not self.hs.is_mine_id(user_id)
-        )
-
-        destinations.discard(origin)
-
-        logger.debug(
-            "on_send_join_request: Sending event: %s, signatures: %s",
-            event.event_id,
-            event.signatures,
-        )
-
-        self.replication_layer.send_pdu(new_pdu, destinations)
-
         state_ids = context.prev_state_ids.values()
         auth_chain = yield self.store.get_auth_chain(set(
             [event.event_id] + state_ids
@@ -1055,24 +1023,6 @@ class FederationHandler(BaseHandler):
             event, event_stream_id, max_stream_id, extra_users=extra_users
         )
 
-        new_pdu = event
-
-        users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
-        destinations = set(
-            get_domain_from_id(user_id) for user_id in users_in_room
-            if not self.hs.is_mine_id(user_id)
-        )
-        destinations.discard(origin)
-
-        logger.debug(
-            "on_send_leave_request: Sending event: %s, signatures: %s",
-            event.event_id,
-            event.signatures,
-        )
-
-        self.replication_layer.send_pdu(new_pdu, destinations)
-
         defer.returnValue(None)
 
     @defer.inlineCallbacks
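Note: the new except Exception arms in e2e_keys.py make key queries fail soft per destination — one unreachable server contributes an entry to failures instead of killing the entire request. The surrounding structure is roughly this (a simplified sketch; query_remote stands in for the real federation call, and e.message in the diff is Python 2 only, so the sketch uses str(e)):

from twisted.internet import defer


@defer.inlineCallbacks
def query_all(destinations, query_remote):
    results = {}
    failures = {}

    @defer.inlineCallbacks
    def do_remote_query(destination):
        try:
            results[destination] = yield query_remote(destination)
        except Exception as e:
            # Record the failure and let the other queries proceed.
            failures[destination] = {"status": 503, "message": str(e)}

    # Run all destinations concurrently; every query catches its own
    # errors, so gatherResults itself never sees a failure.
    yield defer.gatherResults(
        [do_remote_query(d) for d in destinations],
        consumeErrors=True,
    )
    defer.returnValue((results, failures))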
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fbfa5a0281..e0ade4c164 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -372,11 +372,12 @@ class InitialSyncHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def get_receipts():
-            receipts_handler = self.hs.get_handlers().receipts_handler
-            receipts = yield receipts_handler.get_receipts_for_room(
+            receipts = yield self.store.get_linearized_receipts_for_room(
                 room_id,
-                now_token.receipt_key
+                to_key=now_token.receipt_key,
             )
+            if not receipts:
+                receipts = []
             defer.returnValue(receipts)
 
         presence, receipts, (messages, token) = yield defer.gatherResults(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81df45177a..7a57a69bd3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,9 +22,9 @@ from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.push.action_generator import ActionGenerator
 from synapse.types import (
-    UserID, RoomAlias, RoomStreamToken, get_domain_from_id
+    UserID, RoomAlias, RoomStreamToken,
 )
-from synapse.util.async import run_on_reactor, ReadWriteLock
+from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
 from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
@@ -50,6 +50,10 @@ class MessageHandler(BaseHandler):
 
         self.pagination_lock = ReadWriteLock()
 
+        # We arbitrarily limit concurrent event creation for a room to 5.
+        # This is to stop us from diverging history *too* much.
+        self.limiter = Limiter(max_count=5)
+
     @defer.inlineCallbacks
     def purge_history(self, room_id, event_id):
         event = yield self.store.get_event(event_id)
@@ -191,36 +195,38 @@ class MessageHandler(BaseHandler):
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        self.validator.validate_new(builder)
-
-        if builder.type == EventTypes.Member:
-            membership = builder.content.get("membership", None)
-            target = UserID.from_string(builder.state_key)
-
-            if membership in {Membership.JOIN, Membership.INVITE}:
-                # If event doesn't include a display name, add one.
-                profile = self.hs.get_handlers().profile_handler
-                content = builder.content
+        with (yield self.limiter.queue(builder.room_id)):
+            self.validator.validate_new(builder)
+
+            if builder.type == EventTypes.Member:
+                membership = builder.content.get("membership", None)
+                target = UserID.from_string(builder.state_key)
+
+                if membership in {Membership.JOIN, Membership.INVITE}:
+                    # If event doesn't include a display name, add one.
+                    profile = self.hs.get_handlers().profile_handler
+                    content = builder.content
+
+                    try:
+                        content["displayname"] = yield profile.get_displayname(target)
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                    except Exception as e:
+                        logger.info(
+                            "Failed to get profile information for %r: %s",
+                            target, e
+                        )
 
-                try:
-                    content["displayname"] = yield profile.get_displayname(target)
-                    content["avatar_url"] = yield profile.get_avatar_url(target)
-                except Exception as e:
-                    logger.info(
-                        "Failed to get profile information for %r: %s",
-                        target, e
-                    )
+            if token_id is not None:
+                builder.internal_metadata.token_id = token_id
 
-        if token_id is not None:
-            builder.internal_metadata.token_id = token_id
+            if txn_id is not None:
+                builder.internal_metadata.txn_id = txn_id
 
-        if txn_id is not None:
-            builder.internal_metadata.txn_id = txn_id
+            event, context = yield self._create_new_client_event(
+                builder=builder,
+                prev_event_ids=prev_event_ids,
+            )
 
-        event, context = yield self._create_new_client_event(
-            builder=builder,
-            prev_event_ids=prev_event_ids,
-        )
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
@@ -599,13 +605,6 @@ class MessageHandler(BaseHandler):
             event_stream_id, max_stream_id
         )
 
-        users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
-        destinations = [
-            get_domain_from_id(user_id) for user_id in users_in_room
-            if not self.hs.is_mine_id(user_id)
-        ]
-
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
@@ -618,7 +617,3 @@ class MessageHandler(BaseHandler):
             # If invite, remove room_state from unsigned before sending.
             event.unsigned.pop("invite_room_state", None)
-
-        preserve_fn(federation_handler.handle_new_event)(
-            event, destinations=destinations,
-        )
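Note: the Limiter in message.py bounds how many events can be created concurrently for any one room, so racing requests cannot fork the room's history too widely. A rough approximation of such a per-key limiter, built on Twisted's DeferredSemaphore (the real synapse.util.async.Limiter differs in detail):

from contextlib import contextmanager

from twisted.internet import defer


class PerKeyLimiter(object):
    """Allow at most `max_count` concurrent holders per key."""

    def __init__(self, max_count):
        self.max_count = max_count
        self._semaphores = {}

    @defer.inlineCallbacks
    def queue(self, key):
        # One semaphore per key (e.g. per room_id).
        sem = self._semaphores.setdefault(
            key, defer.DeferredSemaphore(self.max_count)
        )
        yield sem.acquire()
        defer.returnValue(self._releaser(sem))

    @contextmanager
    def _releaser(self, sem):
        try:
            yield
        finally:
            sem.release()

Usage mirrors the diff: `with (yield limiter.queue(room_id)):` waits for a slot, and leaving the with block frees it.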
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b047ae2250..1b89dc6274 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -91,28 +91,29 @@ class PresenceHandler(object):
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
-        self.federation = hs.get_replication_layer()
+        self.replication = hs.get_replication_layer()
+        self.federation = hs.get_federation_sender()
 
         self.state = hs.get_state_handler()
 
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence", self.incoming_presence
         )
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence_invite",
             lambda origin, content: self.invite_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
                 observer_user=UserID.from_string(content["observer_user"]),
             )
         )
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence_accept",
             lambda origin, content: self.accept_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
                 observer_user=UserID.from_string(content["observer_user"]),
             )
         )
-        self.federation.register_edu_handler(
+        self.replication.register_edu_handler(
             "m.presence_deny",
             lambda origin, content: self.deny_presence(
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index e536a909d0..50aa513935 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -33,8 +33,8 @@ class ReceiptsHandler(BaseHandler):
         self.server_name = hs.config.server_name
         self.store = hs.get_datastore()
         self.hs = hs
-        self.federation = hs.get_replication_layer()
-        self.federation.register_edu_handler(
+        self.federation = hs.get_federation_sender()
+        hs.get_replication_layer().register_edu_handler(
             "m.receipt", self._received_remote_receipt
         )
         self.clock = self.hs.get_clock()
@@ -100,7 +100,7 @@ class ReceiptsHandler(BaseHandler):
 
             if not res:
                 # res will be None if this read receipt is 'old'
-                defer.returnValue(False)
+                continue
 
             stream_id, max_persisted_id = res
 
@@ -109,6 +109,10 @@ class ReceiptsHandler(BaseHandler):
             if max_batch_id is None or max_persisted_id > max_batch_id:
                 max_batch_id = max_persisted_id
 
+        if min_batch_id is None:
+            # no new receipts
+            defer.returnValue(False)
+
         affected_room_ids = list(set([r["room_id"] for r in receipts]))
 
         with PreserveLoggingContext():
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7e119f13b1..286f0cef0a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -81,7 +81,7 @@ class RegistrationHandler(BaseHandler):
                 "User ID already taken.",
                 errcode=Codes.USER_IN_USE,
             )
-        user_data = yield self.auth.get_user_from_macaroon(guest_access_token)
+        user_data = yield self.auth.get_user_by_access_token(guest_access_token)
         if not user_data["is_guest"] or user_data["user"].localpart != localpart:
             raise AuthError(
                 403,
@@ -369,7 +369,7 @@ class RegistrationHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def get_or_create_user(self, requester, localpart, displayname, duration_in_ms,
+    def get_or_create_user(self, requester, localpart, displayname,
                            password_hash=None):
         """Creates a new user if the user does not exist, else revokes
         all previous access tokens and generates a new one.
@@ -399,8 +399,7 @@ class RegistrationHandler(BaseHandler):
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
 
-        token = self.auth_handler().generate_access_token(
-            user_id, None, duration_in_ms)
+        token = self.auth_handler().generate_access_token(user_id)
 
         if need_register:
             yield self.store.register(
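Note: the receipts.py hunks fix a batching bug — defer.returnValue(False) inside the loop meant the first 'old' receipt aborted the whole batch, discarding any newer receipts behind it. The corrected flow skips stale entries and only reports "nothing new" after the whole batch has been examined, roughly (a simplified sketch; insert_receipt's real signature takes the unpacked receipt fields):

@defer.inlineCallbacks
def _handle_new_receipts(self, receipts):
    min_batch_id = None
    max_batch_id = None

    for receipt in receipts:
        res = yield self.store.insert_receipt(receipt)
        if not res:
            # res is None if this read receipt is 'old'; skip it but
            # keep processing the rest of the batch.
            continue

        stream_id, max_persisted_id = res
        if min_batch_id is None or stream_id < min_batch_id:
            min_batch_id = stream_id
        if max_batch_id is None or max_persisted_id > max_batch_id:
            max_batch_id = max_persisted_id

    if min_batch_id is None:
        # Every receipt in the batch was old: nothing to notify about.
        defer.returnValue(False)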
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 59e4d1cd15..5f18007e90 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -44,16 +44,19 @@ class RoomCreationHandler(BaseHandler):
             "join_rules": JoinRules.INVITE,
             "history_visibility": "shared",
             "original_invitees_have_ops": False,
+            "guest_can_join": True,
         },
         RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
             "join_rules": JoinRules.INVITE,
             "history_visibility": "shared",
             "original_invitees_have_ops": True,
+            "guest_can_join": True,
         },
         RoomCreationPreset.PUBLIC_CHAT: {
             "join_rules": JoinRules.PUBLIC,
             "history_visibility": "shared",
             "original_invitees_have_ops": False,
+            "guest_can_join": False,
         },
     }
@@ -336,6 +339,13 @@ class RoomCreationHandler(BaseHandler):
                 content={"history_visibility": config["history_visibility"]}
             )
 
+        if config["guest_can_join"]:
+            if (EventTypes.GuestAccess, '') not in initial_state:
+                yield send(
+                    etype=EventTypes.GuestAccess,
+                    content={"guest_access": "can_join"}
+                )
+
         for (etype, state_key), content in initial_state.items():
             yield send(
                 etype=etype,
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index b04aea0110..667223df0c 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -22,6 +22,7 @@ from synapse.api.constants import (
 )
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.types import ThirdPartyInstanceID
 
 from collections import namedtuple
 from unpaddedbase64 import encode_base64, decode_base64
@@ -34,6 +35,10 @@ logger = logging.getLogger(__name__)
 
 REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
 
+
+# This is used to indicate we should only return rooms published to the main list.
+EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
+
 
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
@@ -41,22 +46,43 @@ class RoomListHandler(BaseHandler):
         self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
 
     def get_local_public_room_list(self, limit=None, since_token=None,
-                                   search_filter=None):
-        if search_filter:
-            # We explicitly don't bother caching searches.
-            return self._get_public_room_list(limit, since_token, search_filter)
+                                   search_filter=None,
+                                   network_tuple=EMTPY_THIRD_PARTY_ID,):
+        """Generate a local public room list.
+
+        There are multiple different lists: the main one plus one per third
+        party network. A client can ask for a specific list or to return all.
+
+        Args:
+            limit (int)
+            since_token (str)
+            search_filter (dict)
+            network_tuple (ThirdPartyInstanceID): Which public list to use.
+                This can be (None, None) to indicate the main list, or a
+                particular appservice and network id to use an appservice
+                specific one. Setting to None returns all public rooms across
+                all lists.
+        """
+        if search_filter or (network_tuple and network_tuple.appservice_id is not None):
+            # We explicitly don't bother caching searches or requests for
+            # appservice specific lists.
+            return self._get_public_room_list(
+                limit, since_token, search_filter, network_tuple=network_tuple,
+            )
 
         result = self.response_cache.get((limit, since_token))
         if not result:
             result = self.response_cache.set(
                 (limit, since_token),
-                self._get_public_room_list(limit, since_token)
+                self._get_public_room_list(
+                    limit, since_token, network_tuple=network_tuple
+                )
             )
         return result
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
-                              search_filter=None):
+                              search_filter=None,
+                              network_tuple=EMTPY_THIRD_PARTY_ID,):
         if since_token and since_token != "END":
             since_token = RoomListNextBatch.from_token(since_token)
         else:
@@ -73,14 +99,15 @@ class RoomListHandler(BaseHandler):
             current_public_id = yield self.store.get_current_public_room_stream_id()
             public_room_stream_id = since_token.public_room_stream_id
             newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
-                public_room_stream_id, current_public_id
+                public_room_stream_id, current_public_id,
+                network_tuple=network_tuple,
             )
         else:
             stream_token = yield self.store.get_room_max_stream_ordering()
             public_room_stream_id = yield self.store.get_current_public_room_stream_id()
 
         room_ids = yield self.store.get_public_room_ids_at_stream_id(
-            public_room_stream_id
+            public_room_stream_id, network_tuple=network_tuple,
         )
 
         # We want to return rooms in a particular order: the number of joined
@@ -311,7 +338,8 @@ class RoomListHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
-                                    search_filter=None):
+                                    search_filter=None, include_all_networks=False,
+                                    third_party_instance_id=None,):
         if search_filter:
             # We currently don't support searching across federation, so we have
             # to do it manually without pagination
@@ -320,6 +348,8 @@ class RoomListHandler(BaseHandler):
 
         res = yield self._get_remote_list_cached(
             server_name, limit=limit, since_token=since_token,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
         )
 
         if search_filter:
@@ -332,22 +362,30 @@ class RoomListHandler(BaseHandler):
         defer.returnValue(res)
 
     def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
-                                search_filter=None):
+                                search_filter=None, include_all_networks=False,
+                                third_party_instance_id=None,):
         repl_layer = self.hs.get_replication_layer()
         if search_filter:
             # We can't cache when asking for search
             return repl_layer.get_public_rooms(
                 server_name, limit=limit, since_token=since_token,
-                search_filter=search_filter,
+                search_filter=search_filter, include_all_networks=include_all_networks,
+                third_party_instance_id=third_party_instance_id,
             )
 
-        result = self.remote_response_cache.get((server_name, limit, since_token))
+        key = (
+            server_name, limit, since_token, include_all_networks,
+            third_party_instance_id,
+        )
+        result = self.remote_response_cache.get(key)
         if not result:
             result = self.remote_response_cache.set(
-                (server_name, limit, since_token),
+                key,
                 repl_layer.get_public_rooms(
                     server_name, limit=limit, since_token=since_token,
                     search_filter=search_filter,
+                    include_all_networks=include_all_networks,
+                    third_party_instance_id=third_party_instance_id,
                 )
             )
         return result
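Note: room_list.py threads network_tuple through both the storage queries and the cache keys. ThirdPartyInstanceID behaves like a two-field named tuple, so the sentinel semantics and cache keying can be pictured as below (a sketch; the real class lives in synapse.types, and the diff's constant genuinely spells EMTPY for "EMPTY"):

from collections import namedtuple

ThirdPartyInstanceID = namedtuple(
    "ThirdPartyInstanceID", ("appservice_id", "network_id")
)

# Sentinel meaning "only the main public room list".
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)


def describe(network_tuple):
    # None asks for all lists; (None, None) for the main list only;
    # anything else names one appservice-specific list.
    if network_tuple is None:
        return "all lists"
    if network_tuple.appservice_id is None:
        return "main list"
    return "list for appservice %s" % (network_tuple.appservice_id,)


# Named tuples hash by value, so they embed safely in cache keys:
key = ("example.org", 10, "since_token", EMPTY_THIRD_PARTY_ID)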
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 1f910ff814..c880f61685 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -277,6 +277,7 @@ class SyncHandler(object):
         """
         with Measure(self.clock, "load_filtered_recents"):
             timeline_limit = sync_config.filter_collection.timeline_limit()
+            block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline()
 
             if recents is None or newly_joined_room or timeline_limit < len(recents):
                 limited = True
@@ -293,7 +294,7 @@ class SyncHandler(object):
             else:
                 recents = []
 
-            if not limited:
+            if not limited or block_all_timeline:
                 defer.returnValue(TimelineBatch(
                     events=recents,
                     prev_batch=now_token,
@@ -509,6 +510,7 @@ class SyncHandler(object):
         Returns:
             Deferred(SyncResult)
         """
+        logger.info("Calculating sync response for %r", sync_config.user)
 
         # NB: The now_token gets changed by some of the generate_sync_* methods,
         # this is due to some of the underlying streams not supporting the ability
@@ -531,9 +533,14 @@ class SyncHandler(object):
         )
         newly_joined_rooms, newly_joined_users = res
 
-        yield self._generate_sync_entry_for_presence(
-            sync_result_builder, newly_joined_rooms, newly_joined_users
+        block_all_presence_data = (
+            since_token is None and
+            sync_config.filter_collection.blocks_all_presence()
         )
+        if not block_all_presence_data:
+            yield self._generate_sync_entry_for_presence(
+                sync_result_builder, newly_joined_rooms, newly_joined_users
+            )
 
         yield self._generate_sync_entry_for_to_device(sync_result_builder)
 
@@ -569,16 +576,20 @@ class SyncHandler(object):
             # We only delete messages when a new message comes in, but that's
             # fine so long as we delete them at some point.
 
-            logger.debug("Deleting messages up to %d", since_stream_id)
-            yield self.store.delete_messages_for_device(
+            deleted = yield self.store.delete_messages_for_device(
                 user_id, device_id, since_stream_id
             )
+            logger.info("Deleted %d to-device messages up to %d",
+                        deleted, since_stream_id)
 
-            logger.debug("Getting messages up to %d", now_token.to_device_key)
             messages, stream_id = yield self.store.get_new_messages_for_device(
                 user_id, device_id, since_stream_id, now_token.to_device_key
             )
-            logger.debug("Got messages up to %d: %r", stream_id, messages)
+
+            logger.info(
+                "Returning %d to-device messages between %d and %d (current token: %d)",
+                len(messages), since_stream_id, stream_id, now_token.to_device_key
+            )
 
             sync_result_builder.now_token = now_token.copy_and_replace(
                 "to_device_key", stream_id
             )
@@ -709,13 +720,20 @@ class SyncHandler(object):
             `(newly_joined_rooms, newly_joined_users)`
         """
         user_id = sync_result_builder.sync_config.user.to_string()
-
-        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_result_builder.sync_config,
-            now_token=sync_result_builder.now_token,
-            since_token=sync_result_builder.since_token,
+        block_all_room_ephemeral = (
+            sync_result_builder.since_token is None and
+            sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
         )
-        sync_result_builder.now_token = now_token
+
+        if block_all_room_ephemeral:
+            ephemeral_by_room = {}
+        else:
+            now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+                sync_result_builder.sync_config,
+                now_token=sync_result_builder.now_token,
+                since_token=sync_result_builder.since_token,
+            )
+            sync_result_builder.now_token = now_token
 
         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id,
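Note: the sync.py changes short-circuit work the client's filter makes pointless — if the filter can never match a timeline, presence, or room-ephemeral event, there is no reason to load them. For reference, a /sync filter that blocks all three might look like the following dict (an assumed example based on the Matrix filter API, where "not_types": ["*"] excludes every event type):

# Filter a client could upload to suppress timeline, ephemeral and
# presence data entirely; the blocks_all_* checks then return True.
BLOCK_EVERYTHING_FILTER = {
    "room": {
        "timeline": {"not_types": ["*"]},
        "ephemeral": {"not_types": ["*"]},
    },
    "presence": {"not_types": ["*"]},
}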
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 27ee715ff0..0eea7f8f9c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -55,9 +55,9 @@ class TypingHandler(object):
         self.clock = hs.get_clock()
         self.wheel_timer = WheelTimer(bucket_size=5000)
 
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_federation_sender()
 
-        self.federation.register_edu_handler("m.typing", self._recv_edu)
+        hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
 
         hs.get_distributor().observe("user_left_room", self.user_left_room)