From 145d14656b19d64a6deca8facca02508ecc751fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Feb 2018 13:52:03 +0000 Subject: Handle exceptions in get_hosts_for_room when sending events over federation --- synapse/federation/transaction_queue.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index a141ec9953..12e8df9cc6 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -184,17 +184,22 @@ class TransactionQueue(object): if not is_mine and send_on_behalf_of is None: continue - # Get the state from before the event. - # We need to make sure that this is the state from before - # the event and not from after it. - # Otherwise if the last member on a server in a room is - # banned then it won't receive the event because it won't - # be in the room after the ban. - destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=[ - prev_id for prev_id, _ in event.prev_events - ], - ) + try: + # Get the state from before the event. + # We need to make sure that this is the state from before + # the event and not from after it. + # Otherwise if the last member on a server in a room is + # banned then it won't receive the event because it won't + # be in the room after the ban. + destinations = yield self.state.get_current_hosts_in_room( + event.room_id, latest_event_ids=[ + prev_id for prev_id, _ in event.prev_events + ], + ) + except Exception: + logger.exception("Failed to calculate hosts in room") + continue + destinations = set(destinations) if send_on_behalf_of is not None: -- cgit 1.5.1 From 11974f37871f41a5827f243a3fc2057703f04d72 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Apr 2018 11:45:22 +0100 Subject: Send federation events concurrently --- synapse/federation/transaction_queue.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 12e8df9cc6..43daf673c0 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -169,7 +169,7 @@ class TransactionQueue(object): while True: last_token = yield self.store.get_federation_out_pos("events") next_token, events = yield self.store.get_all_new_events_stream( - last_token, self._last_poked_id, limit=20, + last_token, self._last_poked_id, limit=100, ) logger.debug("Handling %s -> %s", last_token, next_token) @@ -177,12 +177,13 @@ class TransactionQueue(object): if not events and next_token >= self._last_poked_id: break - for event in events: + @defer.inlineCallbacks + def handle_event(event): # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.event_id) if not is_mine and send_on_behalf_of is None: - continue + return try: # Get the state from before the event. @@ -198,7 +199,7 @@ class TransactionQueue(object): ) except Exception: logger.exception("Failed to calculate hosts in room") - continue + return destinations = set(destinations) @@ -212,6 +213,19 @@ class TransactionQueue(object): self._send_pdu(event, destinations) + def handle_room_events(events): + for event in events: + return handle_event(event) + + events_by_room = {} + for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + yield logcontext.make_deferred_yieldable(defer.gatherResults( + [handle_room_events(evs) for evs in events_by_room.itervalues()], + consumeErrors=True + )) + events_processed_counter.inc_by(len(events)) yield self.store.update_federation_out_pos( -- cgit 1.5.1 From 6e025a97b4639e5869ecf7a9762aa559bfdf85f6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Apr 2018 16:02:48 +0100 Subject: Handle all events in a room correctly --- synapse/federation/transaction_queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 43daf673c0..d8e5c08a3c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -213,9 +213,10 @@ class TransactionQueue(object): self._send_pdu(event, destinations) + @defer.inlineCallbacks def handle_room_events(events): for event in events: - return handle_event(event) + yield handle_event(event) events_by_room = {} for event in events: -- cgit 1.5.1 From d49cbf712fcb152c7d3fb5b3b9bace277a3935d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Apr 2018 12:03:33 +0100 Subject: Log event ID on exception --- synapse/federation/transaction_queue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index d8e5c08a3c..8499d2429e 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -198,7 +198,10 @@ class TransactionQueue(object): ], ) except Exception: - logger.exception("Failed to calculate hosts in room") + logger.exception( + "Failed to calculate hosts in room for event: %s", + event.event_id, + ) return destinations = set(destinations) -- cgit 1.5.1 From 1246d23710741be866692d47b15c432f48483a52 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Apr 2018 12:04:32 +0100 Subject: Preserve log contexts correctly --- synapse/federation/transaction_queue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 8499d2429e..4dce984c1a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -226,7 +226,10 @@ class TransactionQueue(object): events_by_room.setdefault(event.room_id, []).append(event) yield logcontext.make_deferred_yieldable(defer.gatherResults( - [handle_room_events(evs) for evs in events_by_room.itervalues()], + [ + logcontext.preserve_fn(handle_room_events)(evs) + for evs in events_by_room.itervalues() + ], consumeErrors=True )) -- cgit 1.5.1 From a060dfa1324b5ec12e0af96ec090a3b997582cb2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Apr 2018 14:25:11 +0100 Subject: Use run_in_background instead --- synapse/federation/transaction_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 4dce984c1a..5b0b798e57 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -227,7 +227,7 @@ class TransactionQueue(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(handle_room_events)(evs) + logcontext.run_in_background(handle_room_events, evs) for evs in events_by_room.itervalues() ], consumeErrors=True -- cgit 1.5.1 From 92e34615c5c9ed5df2b0072a2686d3e42239e420 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Apr 2018 11:07:51 +0100 Subject: Track where event stream processing have gotten up to --- synapse/federation/transaction_queue.py | 4 ++++ synapse/handlers/appservice.py | 4 ++++ synapse/metrics/__init__.py | 13 +++++++++++++ synapse/storage/events.py | 3 +++ 4 files changed, 24 insertions(+) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5b0b798e57..d68f0d8950 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -239,6 +239,10 @@ class TransactionQueue(object): "events", next_token ) + synapse.metrics.event_processing_positions.set( + next_token, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3dd3fa2a27..2a7e31e711 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -111,6 +111,10 @@ class ApplicationServicesHandler(object): events_processed_counter.inc_by(len(events)) yield self.store.set_appservice_last_pos(upper_bound) + + synapse.metrics.event_processing_positions.set( + upper_bound, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 2666f982a6..a6c0d7e1bf 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -151,6 +151,19 @@ reactor_metrics = get_metrics_for("python.twisted.reactor") tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +synapse_metrics = get_metrics_for("synapse") + +# Used to track where various components have processed in the event stream, +# e.g. federation sending, appservice sending, etc. +event_processing_positions = synapse_metrics.register_gauge( + "event_processing_positions", labels=["name"], +) + +# Used to track the current max events stream position +event_persisted_position = synapse_metrics.register_gauge( + "event_persisted_position", +) + def runUntilCurrentTimer(func): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ece5e6c41f..da44b52fd6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + synapse.metrics.event_persisted_position.set( + chunk[-1][0].internal_metadata.stream_ordering, + ) for event, context in chunk: if context.app_service: origin_type = "local" -- cgit 1.5.1 From 4dae4a97ed0e0b2cc9b5493172670ec7353ded2e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Apr 2018 11:52:19 +0100 Subject: Track last processed event received_ts --- synapse/federation/transaction_queue.py | 11 +++++++++++ synapse/handlers/appservice.py | 10 ++++++++++ synapse/metrics/__init__.py | 13 +++++++++++++ synapse/storage/events_worker.py | 18 ++++++++++++++++++ 4 files changed, 52 insertions(+) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index d68f0d8950..1c466cbded 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -243,6 +243,17 @@ class TransactionQueue(object): next_token, "federation_sender", ) + if events: + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "federation_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2a7e31e711..fcc1246bee 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -115,6 +115,16 @@ class ApplicationServicesHandler(object): synapse.metrics.event_processing_positions.set( upper_bound, "appservice_sender", ) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a6c0d7e1bf..e3b831db67 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -164,6 +164,19 @@ event_persisted_position = synapse_metrics.register_gauge( "event_persisted_position", ) +# Used to track the received_ts of the last event processed by various +# components +event_processing_last_ts = synapse_metrics.register_gauge( + "event_processing_last_ts", labels=["name"], +) + +# Used to track the lag processing events. This is the time difference +# between the last processed event's received_ts and the time it was +# finished being processed. +event_processing_lag = synapse_metrics.register_gauge( + "event_processing_lag", labels=["name"], +) + def runUntilCurrentTimer(func): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 2e23dd78ba..769eb51489 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -51,6 +51,24 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def get_received_ts(self, event_id): + """Get received_ts (when it was persisted) for the event + + Args: + event_id (str) + + Returns: + Deferred[int|None]: Timstamp in milliseconds, or None for events + that were persisted before received_ts was implemented. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="received_ts", + desc="get_received_ts", + ) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, -- cgit 1.5.1 From f67e906e189cb528cab18cb5190be352dcc8914b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Apr 2018 11:18:19 +0100 Subject: Set all metrics at the same time --- synapse/federation/transaction_queue.py | 12 ++++++------ synapse/handlers/appservice.py | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1c466cbded..963d938edd 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -233,16 +233,10 @@ class TransactionQueue(object): consumeErrors=True )) - events_processed_counter.inc_by(len(events)) - yield self.store.update_federation_out_pos( "events", next_token ) - synapse.metrics.event_processing_positions.set( - next_token, "federation_sender", - ) - if events: now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) @@ -254,6 +248,12 @@ class TransactionQueue(object): ts, "federation_sender", ) + events_processed_counter.inc_by(len(events)) + + synapse.metrics.event_processing_positions.set( + next_token, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index fcc1246bee..ce0814bc25 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -108,16 +108,16 @@ class ApplicationServicesHandler(object): service, event ) - events_processed_counter.inc_by(len(events)) - yield self.store.set_appservice_last_pos(upper_bound) + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + synapse.metrics.event_processing_positions.set( upper_bound, "appservice_sender", ) - now = self.clock.time_msec() - ts = yield self.store.get_received_ts(events[-1].event_id) + events_processed_counter.inc_by(len(events)) synapse.metrics.event_processing_lag.set( now - ts, "appservice_sender", -- cgit 1.5.1