summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.rst25
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py1
-rw-r--r--synapse/events/__init__.py9
-rw-r--r--synapse/federation/federation_client.py4
-rw-r--r--synapse/federation/federation_server.py21
-rw-r--r--synapse/federation/transaction_queue.py27
-rw-r--r--synapse/federation/transport/client.py5
-rw-r--r--synapse/handlers/federation.py5
-rw-r--r--synapse/state.py4
-rw-r--r--synapse/storage/events.py2
-rw-r--r--synapse/storage/roommember.py4
-rw-r--r--synapse/storage/state.py2
-rw-r--r--synapse/util/async.py13
14 files changed, 109 insertions, 15 deletions
diff --git a/CHANGES.rst b/CHANGES.rst

index 108f827cf2..c1a8dd7613 100644 --- a/CHANGES.rst +++ b/CHANGES.rst
@@ -1,3 +1,28 @@ +Changes in synapse v0.18.6-rc3 (2017-01-05) +=========================================== + +Bug fixes: + +* Fix bug where we failed to send ban events to the banned server (PR #1758) +* Fix bug where we sent event that didn't originate on this server to + other servers (PR #1764) +* Fix bug where processing an event from a remote server took a long time + because we were making long HTTP requests (PR #1765, PR #1744) + +Changes: + +* Improve logging for debugging deadlocks (PR #1766, PR #1767) + +Changes in synapse v0.18.6-rc2 (2016-12-30) +=========================================== + +Bug fixes: + +* Fix memory leak in twisted by initialising logging correctly (PR #1731) +* Fix bug where fetching missing events took an unacceptable amount of time in + large rooms (PR #1734) + + Changes in synapse v0.18.6-rc1 (2016-12-29) =========================================== diff --git a/synapse/__init__.py b/synapse/__init__.py
index 84592f53ea..a1da92ef92 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py
@@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.18.6-rc1" +__version__ = "0.18.6-rc3" diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index a99986714d..f93e45a744 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py
@@ -290,6 +290,7 @@ class Auth(object): with Measure(self.clock, "check_host_in_room"): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + logger.info("calling resolve_state_groups from check_host_in_room") entry = yield self.state.resolve_state_groups( room_id, latest_event_ids ) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index bcb8f33a58..8c71aeb5e4 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py
@@ -36,6 +36,15 @@ class _EventInternalMetadata(object): def is_invite_from_remote(self): return getattr(self, "invite_from_remote", False) + def get_send_on_behalf_of(self): + """Whether this server should send the event on behalf of another server. + This is used by the federation "send_join" API to forward the initial join + event for a server in the room. + + returns a str with the name of the server this event is sent on behalf of. + """ + return getattr(self, "get_send_on_behalf_of", None) + def _event_dict_property(key): def getter(self): diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 6851f2376d..b4bcec77ed 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -707,7 +707,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def get_missing_events(self, destination, room_id, earliest_events_ids, - latest_events, limit, min_depth): + latest_events, limit, min_depth, timeout): """Tries to fetch events we are missing. This is called when we receive an event without having received all of its ancestors. @@ -721,6 +721,7 @@ class FederationClient(FederationBase): have all previous events for. limit (int): Maximum number of events to return. min_depth (int): Minimum depth of events tor return. + timeout (int): Max time to wait in ms """ try: content = yield self.transport_layer.get_missing_events( @@ -730,6 +731,7 @@ class FederationClient(FederationBase): latest_events=[e.event_id for e in latest_events], limit=limit, min_depth=min_depth, + timeout=timeout, ) events = [ diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f4c60e67e3..800f04189f 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -425,6 +425,7 @@ class FederationServer(FederationBase): " limit: %d, min_depth: %d", earliest_events, latest_events, limit, min_depth ) + missing_events = yield self.handler.on_get_missing_events( origin, room_id, earliest_events, latest_events, limit, min_depth ) @@ -567,6 +568,25 @@ class FederationServer(FederationBase): len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] ) + # XXX: we set timeout to 10s to help workaround + # https://github.com/matrix-org/synapse/issues/1733. + # The reason is to avoid holding the linearizer lock + # whilst processing inbound /send transactions, causing + # FDs to stack up and block other inbound transactions + # which empirically can currently take up to 30 minutes. + # + # N.B. this explicitly disables retry attempts. + # + # N.B. this also increases our chances of falling back to + # fetching fresh state for the room if the missing event + # can't be found, which slightly reduces our security. + # it may also increase our DAG extremity count for the room, + # causing additional state resolution? See #1760. + # However, fetching state doesn't hold the linearizer lock + # apparently. + # + # see https://github.com/matrix-org/synapse/pull/1744 + missing_events = yield self.get_missing_events( origin, pdu.room_id, @@ -574,6 +594,7 @@ class FederationServer(FederationBase): latest_events=[pdu], limit=10, min_depth=min_depth, + timeout=10000, ) # We want to sort these by depth so we process them and diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 51b656d74a..7db7b806dc 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py
@@ -19,7 +19,6 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction, Edu -from synapse.api.constants import EventTypes, Membership from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor from synapse.util.logcontext import preserve_context_over_fn @@ -62,6 +61,7 @@ class TransactionQueue(object): self.transport_layer = hs.get_federation_transport_client() self.clock = hs.get_clock() + self.is_mine_id = hs.is_mine_id # Is a mapping from destinations -> deferreds. Used to keep track # of which destinations have transactions in flight and when they are @@ -153,17 +153,32 @@ class TransactionQueue(object): break for event in events: + # Only send events for this server. + send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() + is_mine = self.is_mine_id(event.event_id) + if not is_mine and send_on_behalf_of is None: + continue + + # Get the state from before the event. + # We need to make sure that this is the state from before + # the event and not from after it. + # Otherwise if the last member on a server in a room is + # banned then it won't receive the event because it won't + # be in the room after the ban. users_in_room = yield self.state.get_current_user_in_room( - event.room_id, latest_event_ids=[event.event_id], + event.room_id, latest_event_ids=[ + prev_id for prev_id, _ in event.prev_events + ], ) destinations = set( get_domain_from_id(user_id) for user_id in users_in_room ) - - if event.type == EventTypes.Member: - if event.content["membership"] == Membership.JOIN: - destinations.add(get_domain_from_id(event.state_key)) + if send_on_behalf_of is not None: + # If we are sending the event on behalf of another server + # then it already has the event and there is no reason to + # send the event to it. + destinations.discard(send_on_behalf_of) logger.debug("Sending %s to %r", event, destinations) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 491cdc29e1..915af34409 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -386,7 +386,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function def get_missing_events(self, destination, room_id, earliest_events, - latest_events, limit, min_depth): + latest_events, limit, min_depth, timeout): path = PREFIX + "/get_missing_events/%s" % (room_id,) content = yield self.client.post_json( @@ -397,7 +397,8 @@ class TransportLayerClient(object): "min_depth": int(min_depth), "earliest_events": earliest_events, "latest_events": latest_events, - } + }, + timeout=timeout, ) defer.returnValue(content) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1d07e4d02b..1021bcc405 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -591,6 +591,7 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) + logger.info("calling resolve_state_groups in _maybe_backfill") states = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) for e in event_ids @@ -790,6 +791,10 @@ class FederationHandler(BaseHandler): ) event.internal_metadata.outlier = False + # Send this event on behalf of the origin server since they may not + # have an up to data view of the state of the room at this event so + # will not know which servers to send the event to. + event.internal_metadata.send_on_behalf_of = origin context, event_stream_id, max_stream_id = yield self._handle_new_event( origin, event diff --git a/synapse/state.py b/synapse/state.py
index b4eca0e5d5..ba0d2a39ac 100644 --- a/synapse/state.py +++ b/synapse/state.py
@@ -123,6 +123,7 @@ class StateHandler(object): if not latest_event_ids: latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + logger.info("calling resolve_state_groups from get_current_state") ret = yield self.resolve_state_groups(room_id, latest_event_ids) state = ret.state @@ -147,6 +148,7 @@ class StateHandler(object): if not latest_event_ids: latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + logger.info("calling resolve_state_groups from get_current_state_ids") ret = yield self.resolve_state_groups(room_id, latest_event_ids) state = ret.state @@ -158,6 +160,7 @@ class StateHandler(object): @defer.inlineCallbacks def get_current_user_in_room(self, room_id, latest_event_ids=None): + logger.info("calling resolve_state_groups from get_current_user_in_room") if not latest_event_ids: latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) entry = yield self.resolve_state_groups(room_id, latest_event_ids) @@ -223,6 +226,7 @@ class StateHandler(object): context.prev_state_events = [] defer.returnValue(context) + logger.info("calling resolve_state_groups from compute_event_context") if event.is_state(): entry = yield self.resolve_state_groups( event.room_id, [e for e, _ in event.prev_events], diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1a0ec5db9b..13c7cb3cbc 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py
@@ -1091,8 +1091,10 @@ class EventsStore(SQLBaseStore): self._do_fetch ) + logger.info("Loading %d events", len(events)) with PreserveLoggingContext(): rows = yield events_d + logger.info("Loaded %d events (%d rows)", len(events), len(rows)) if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 946d5a81cc..5d18037c7c 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py
@@ -393,8 +393,8 @@ class RoomMemberStore(SQLBaseStore): @cachedInlineCallbacks(num_args=2, cache_context=True) def _get_joined_users_from_context(self, room_id, state_group, current_state_ids, cache_context, event=None): - # We don't use `state_group`, its there so that we can cache based - # on it. However, its important that its never None, since two current_state's + # We don't use `state_group`, it's there so that we can cache based + # on it. However, it's important that it's never None, since two current_states # with a state_group of None are likely to be different. # See bulk_get_push_rules_for_room for how we work around this. assert state_group is not None diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 23e7ad9922..7f466c40ac 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py
@@ -384,7 +384,7 @@ class StateStore(SQLBaseStore): # We did this before by getting the list of group ids, and # then passing that list to sqlite to get latest event for # each (type, state_key). However, that was terribly slow - # without the right indicies (which we can't add until + # without the right indices (which we can't add until # after we finish deduping state, which requires this func) args = [next_group] if types: diff --git a/synapse/util/async.py b/synapse/util/async.py
index 4280455cbe..83875edc85 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py
@@ -166,7 +166,11 @@ class Linearizer(object): # do some work. """ - def __init__(self): + def __init__(self, name=None): + if name is None: + self.name = id(self) + else: + self.name = name self.key_to_defer = {} @defer.inlineCallbacks @@ -185,15 +189,20 @@ class Linearizer(object): self.key_to_defer[key] = new_defer if current_defer: - logger.info("Waiting to acquire linearizer lock for key %r", key) + logger.info( + "Waiting to acquire linearizer lock %r for key %r", self.name, key + ) with PreserveLoggingContext(): yield current_defer + logger.info("Acquired linearizer lock %r for key %r", self.name, key) + @contextmanager def _ctx_manager(): try: yield finally: + logger.info("Releasing linearizer lock %r for key %r", self.name, key) new_defer.callback(None) current_d = self.key_to_defer.get(key) if current_d is new_defer: