diff options
author | Erik Johnston <erik@matrix.org> | 2019-03-04 11:54:58 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-03-04 11:54:58 +0000 |
commit | fbc047f2a5f12ee934e5ccbe7274100aa72166b5 (patch) | |
tree | 2eabc4f13032883ff61fc635d0be43292a5ad131 /synapse/handlers | |
parent | Update newsfile to have a full stop (diff) | |
parent | Update test_typing to use HomeserverTestCase. (#4771) (diff) | |
download | synapse-fbc047f2a5f12ee934e5ccbe7274100aa72166b5.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/stop_fed_not_in_room
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation.py | 34 | ||||
-rw-r--r-- | synapse/handlers/message.py | 7 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 8 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 68 | ||||
-rw-r--r-- | synapse/handlers/register.py | 4 | ||||
-rw-r--r-- | synapse/handlers/room_list.py | 68 |
6 files changed, 137 insertions, 52 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index de839ca527..0425380e55 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -770,10 +770,26 @@ class FederationHandler(BaseHandler): set(auth_events.keys()) | set(state_events.keys()) ) + # We now have a chunk of events plus associated state and auth chain to + # persist. We do the persistence in two steps: + # 1. Auth events and state get persisted as outliers, plus the + # backward extremities get persisted (as non-outliers). + # 2. The rest of the events in the chunk get persisted one by one, as + # each one depends on the previous event for its state. + # + # The important thing is that events in the chunk get persisted as + # non-outliers, including when those events are also in the state or + # auth chain. Caution must therefore be taken to ensure that they are + # not accidentally marked as outliers. + + # Step 1a: persist auth events that *don't* appear in the chunk ev_infos = [] for a in auth_events.values(): - if a.event_id in seen_events: + # We only want to persist auth events as outliers that we haven't + # seen and aren't about to persist as part of the backfilled chunk. + if a.event_id in seen_events or a.event_id in event_map: continue + a.internal_metadata.outlier = True ev_infos.append({ "event": a, @@ -785,14 +801,21 @@ class FederationHandler(BaseHandler): } }) + # Step 1b: persist the events in the chunk we fetched state for (i.e. + # the backwards extremities) as non-outliers. for e_id in events_to_state: + # For paranoia we ensure that these events are marked as + # non-outliers + ev = event_map[e_id] + assert(not ev.internal_metadata.is_outlier()) + ev_infos.append({ - "event": event_map[e_id], + "event": ev, "state": events_to_state[e_id], "auth_events": { (auth_events[a_id].type, auth_events[a_id].state_key): auth_events[a_id] - for a_id in event_map[e_id].auth_event_ids() + for a_id in ev.auth_event_ids() if a_id in auth_events } }) @@ -802,12 +825,17 @@ class FederationHandler(BaseHandler): backfilled=True, ) + # Step 2: Persist the rest of the events in the chunk one by one events.sort(key=lambda e: e.depth) for event in events: if event in events_to_state: continue + # For paranoia we ensure that these events are marked as + # non-outliers + assert(not event.internal_metadata.is_outlier()) + # We store these one at a time since each event depends on the # previous to work out the state. # TODO: We can probably do something more clever here. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3981fe69ce..c762b58902 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -436,10 +436,11 @@ class EventCreationHandler(object): if event.is_state(): prev_state = yield self.deduplicate_state_event(event, context) - logger.info( - "Not bothering to persist duplicate state event %s", event.event_id, - ) if prev_state is not None: + logger.info( + "Not bothering to persist state event %s duplicated by %s", + event.event_id, prev_state.event_id, + ) defer.returnValue(prev_state) yield self.handle_new_client_event( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 9d257ecf31..e4fdae9266 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -136,7 +136,11 @@ class PaginationHandler(object): logger.info("[purge] complete") self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE except Exception: - logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) + f = Failure() + logger.error( + "[purge] failed", + exc_info=(f.type, f.value, f.getTracebackObject()), + ) self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED finally: self._purges_in_progress_by_room.discard(room_id) @@ -254,7 +258,7 @@ class PaginationHandler(object): }) state = None - if event_filter and event_filter.lazy_load_members(): + if event_filter and event_filter.lazy_load_members() and len(events) > 0: # TODO: remove redundant members # FIXME: we also care about invite targets etc. diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 4c2690ba26..696469732c 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -16,8 +16,8 @@ import logging from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id -from synapse.util import logcontext from ._base import BaseHandler @@ -59,7 +59,9 @@ class ReceiptsHandler(BaseHandler): if is_new: # fire off a process in the background to send the receipt to # remote servers - self._push_remotes([receipt]) + run_as_background_process( + 'push_receipts_to_remotes', self._push_remotes, receipt + ) @defer.inlineCallbacks def _received_remote_receipt(self, origin, content): @@ -125,44 +127,42 @@ class ReceiptsHandler(BaseHandler): defer.returnValue(True) - @logcontext.preserve_fn # caller should not yield on this @defer.inlineCallbacks - def _push_remotes(self, receipts): - """Given a list of receipts, works out which remote servers should be + def _push_remotes(self, receipt): + """Given a receipt, works out which remote servers should be poked and pokes them. """ try: - # TODO: Some of this stuff should be coallesced. - for receipt in receipts: - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] - - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": event_ids, - "data": data, - } + # TODO: optimise this to move some of the work to the workers. + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + users = yield self.state.get_current_user_in_room(room_id) + remotedomains = set(get_domain_from_id(u) for u in users) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, } - }, + } }, - key=(room_id, receipt_type, user_id), - ) + }, + key=(room_id, receipt_type, user_id), + ) except Exception: logger.exception("Error pushing receipts to remote servers") diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 24a4cb5a83..c0e06929bd 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -460,7 +460,7 @@ class RegistrationHandler(BaseHandler): lines = response.split('\n') json = { "valid": lines[0] == 'true', - "error_url": "http://www.google.com/recaptcha/api/challenge?" + + "error_url": "http://www.recaptcha.net/recaptcha/api/challenge?" + "error=%s" % lines[1] } defer.returnValue(json) @@ -471,7 +471,7 @@ class RegistrationHandler(BaseHandler): Used only by c/s api v1 """ data = yield self.captcha_client.post_urlencoded_get_raw( - "http://www.google.com:80/recaptcha/api/verify", + "http://www.recaptcha.net:80/recaptcha/api/verify", args={ 'privatekey': private_key, 'remoteip': ip_addr, diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 13e212d669..afa508d729 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -50,16 +50,17 @@ class RoomListHandler(BaseHandler): def get_local_public_room_list(self, limit=None, since_token=None, search_filter=None, - network_tuple=EMPTY_THIRD_PARTY_ID,): + network_tuple=EMPTY_THIRD_PARTY_ID, + from_federation=False): """Generate a local public room list. There are multiple different lists: the main one plus one per third party network. A client can ask for a specific list or to return all. Args: - limit (int) - since_token (str) - search_filter (dict) + limit (int|None) + since_token (str|None) + search_filter (dict|None) network_tuple (ThirdPartyInstanceID): Which public list to use. This can be (None, None) to indicate the main list, or a particular appservice and network id to use an appservice specific one. @@ -87,14 +88,30 @@ class RoomListHandler(BaseHandler): return self.response_cache.wrap( key, self._get_public_room_list, - limit, since_token, network_tuple=network_tuple, + limit, since_token, + network_tuple=network_tuple, from_federation=from_federation, ) @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, search_filter=None, network_tuple=EMPTY_THIRD_PARTY_ID, + from_federation=False, timeout=None,): + """Generate a public room list. + Args: + limit (int|None): Maximum amount of rooms to return. + since_token (str|None) + search_filter (dict|None): Dictionary to filter rooms by. + network_tuple (ThirdPartyInstanceID): Which public list to use. + This can be (None, None) to indicate the main list, or a particular + appservice and network id to use an appservice specific one. + Setting to None returns all public rooms across all lists. + from_federation (bool): Whether this request originated from a + federating server or a client. Used for room filtering. + timeout (int|None): Amount of seconds to wait for a response before + timing out. + """ if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: @@ -217,7 +234,8 @@ class RoomListHandler(BaseHandler): yield concurrently_execute( lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], - chunk, limit, search_filter + chunk, limit, search_filter, + from_federation=from_federation, ), batch, 5, ) @@ -288,23 +306,51 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit, - search_filter): + search_filter, from_federation=False): """Generate the entry for a room in the public room list and append it to the `chunk` if it matches the search filter + + Args: + room_id (str): The ID of the room. + num_joined_users (int): The number of joined users in the room. + chunk (list) + limit (int|None): Maximum amount of rooms to display. Function will + return if length of chunk is greater than limit + 1. + search_filter (dict|None) + from_federation (bool): Whether this request originated from a + federating server or a client. Used for room filtering. """ if limit and len(chunk) > limit + 1: # We've already got enough, so lets just drop it. return result = yield self.generate_room_entry(room_id, num_joined_users) + if not result: + return + + if from_federation and not result.get("m.federate", True): + # This is a room that other servers cannot join. Do not show them + # this room. + return - if result and _matches_room_entry(result, search_filter): + if _matches_room_entry(result, search_filter): chunk.append(result) @cachedInlineCallbacks(num_args=1, cache_context=True) def generate_room_entry(self, room_id, num_joined_users, cache_context, with_alias=True, allow_private=False): """Returns the entry for a room + + Args: + room_id (str): The room's ID. + num_joined_users (int): Number of users in the room. + cache_context: Information for cached responses. + with_alias (bool): Whether to return the room's aliases in the result. + allow_private (bool): Whether invite-only rooms should be shown. + + Returns: + Deferred[dict|None]: Returns a room entry as a dictionary, or None if this + room was determined not to be shown publicly. """ result = { "room_id": room_id, @@ -318,6 +364,7 @@ class RoomListHandler(BaseHandler): event_map = yield self.store.get_events([ event_id for key, event_id in iteritems(current_state_ids) if key[0] in ( + EventTypes.Create, EventTypes.JoinRules, EventTypes.Name, EventTypes.Topic, @@ -334,12 +381,17 @@ class RoomListHandler(BaseHandler): } # Double check that this is actually a public room. + join_rules_event = current_state.get((EventTypes.JoinRules, "")) if join_rules_event: join_rule = join_rules_event.content.get("join_rule", None) if not allow_private and join_rule and join_rule != JoinRules.PUBLIC: defer.returnValue(None) + # Return whether this room is open to federation users or not + create_event = current_state.get((EventTypes.Create, "")) + result["m.federate"] = create_event.content.get("m.federate", True) + if with_alias: aliases = yield self.store.get_aliases_for_room( room_id, on_invalidate=cache_context.invalidate |