From 145d14656b19d64a6deca8facca02508ecc751fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Feb 2018 13:52:03 +0000 Subject: Handle exceptions in get_hosts_for_room when sending events over federation --- synapse/federation/transaction_queue.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index a141ec9953..12e8df9cc6 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -184,17 +184,22 @@ class TransactionQueue(object): if not is_mine and send_on_behalf_of is None: continue - # Get the state from before the event. - # We need to make sure that this is the state from before - # the event and not from after it. - # Otherwise if the last member on a server in a room is - # banned then it won't receive the event because it won't - # be in the room after the ban. - destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=[ - prev_id for prev_id, _ in event.prev_events - ], - ) + try: + # Get the state from before the event. + # We need to make sure that this is the state from before + # the event and not from after it. + # Otherwise if the last member on a server in a room is + # banned then it won't receive the event because it won't + # be in the room after the ban. + destinations = yield self.state.get_current_hosts_in_room( + event.room_id, latest_event_ids=[ + prev_id for prev_id, _ in event.prev_events + ], + ) + except Exception: + logger.exception("Failed to calculate hosts in room") + continue + destinations = set(destinations) if send_on_behalf_of is not None: -- cgit 1.4.1 From 11974f37871f41a5827f243a3fc2057703f04d72 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Apr 2018 11:45:22 +0100 Subject: Send federation events concurrently --- synapse/federation/transaction_queue.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 12e8df9cc6..43daf673c0 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -169,7 +169,7 @@ class TransactionQueue(object): while True: last_token = yield self.store.get_federation_out_pos("events") next_token, events = yield self.store.get_all_new_events_stream( - last_token, self._last_poked_id, limit=20, + last_token, self._last_poked_id, limit=100, ) logger.debug("Handling %s -> %s", last_token, next_token) @@ -177,12 +177,13 @@ class TransactionQueue(object): if not events and next_token >= self._last_poked_id: break - for event in events: + @defer.inlineCallbacks + def handle_event(event): # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.event_id) if not is_mine and send_on_behalf_of is None: - continue + return try: # Get the state from before the event. @@ -198,7 +199,7 @@ class TransactionQueue(object): ) except Exception: logger.exception("Failed to calculate hosts in room") - continue + return destinations = set(destinations) @@ -212,6 +213,19 @@ class TransactionQueue(object): self._send_pdu(event, destinations) + def handle_room_events(events): + for event in events: + return handle_event(event) + + events_by_room = {} + for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + yield logcontext.make_deferred_yieldable(defer.gatherResults( + [handle_room_events(evs) for evs in events_by_room.itervalues()], + consumeErrors=True + )) + events_processed_counter.inc_by(len(events)) yield self.store.update_federation_out_pos( -- cgit 1.4.1 From 6e025a97b4639e5869ecf7a9762aa559bfdf85f6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Apr 2018 16:02:48 +0100 Subject: Handle all events in a room correctly --- synapse/federation/transaction_queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 43daf673c0..d8e5c08a3c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -213,9 +213,10 @@ class TransactionQueue(object): self._send_pdu(event, destinations) + @defer.inlineCallbacks def handle_room_events(events): for event in events: - return handle_event(event) + yield handle_event(event) events_by_room = {} for event in events: -- cgit 1.4.1 From d49cbf712fcb152c7d3fb5b3b9bace277a3935d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Apr 2018 12:03:33 +0100 Subject: Log event ID on exception --- synapse/federation/transaction_queue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index d8e5c08a3c..8499d2429e 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -198,7 +198,10 @@ class TransactionQueue(object): ], ) except Exception: - logger.exception("Failed to calculate hosts in room") + logger.exception( + "Failed to calculate hosts in room for event: %s", + event.event_id, + ) return destinations = set(destinations) -- cgit 1.4.1 From 1246d23710741be866692d47b15c432f48483a52 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Apr 2018 12:04:32 +0100 Subject: Preserve log contexts correctly --- synapse/federation/transaction_queue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 8499d2429e..4dce984c1a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -226,7 +226,10 @@ class TransactionQueue(object): events_by_room.setdefault(event.room_id, []).append(event) yield logcontext.make_deferred_yieldable(defer.gatherResults( - [handle_room_events(evs) for evs in events_by_room.itervalues()], + [ + logcontext.preserve_fn(handle_room_events)(evs) + for evs in events_by_room.itervalues() + ], consumeErrors=True )) -- cgit 1.4.1 From a060dfa1324b5ec12e0af96ec090a3b997582cb2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Apr 2018 14:25:11 +0100 Subject: Use run_in_background instead --- synapse/federation/transaction_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 4dce984c1a..5b0b798e57 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -227,7 +227,7 @@ class TransactionQueue(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(handle_room_events)(evs) + logcontext.run_in_background(handle_room_events, evs) for evs in events_by_room.itervalues() ], consumeErrors=True -- cgit 1.4.1