From 501994582899ad9d790029b3d7c48ba32f5720a9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 20 Aug 2019 11:20:10 +0100 Subject: Refactor the Appservice scheduler code Get rid of the labyrinthine `recoverer_fn` code, and clean up the startup code (it seemed to be previously inexplicably split between `ApplicationServiceScheduler.start` and `_Recoverer.start`). Add some docstrings too. --- synapse/appservice/scheduler.py | 110 ++++++++++++++++++++++++---------------- 1 file changed, 65 insertions(+), 45 deletions(-) (limited to 'synapse/appservice') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 42a350bff8..03a14402c5 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object): self.store = hs.get_datastore() self.as_api = hs.get_application_service_api() - def create_recoverer(service, callback): - return _Recoverer(self.clock, self.store, self.as_api, service, callback) - - self.txn_ctrl = _TransactionController( - self.clock, self.store, self.as_api, create_recoverer - ) + self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api) self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock) @defer.inlineCallbacks def start(self): logger.info("Starting appservice scheduler") + # check for any DOWN ASes and start recoverers for them. - recoverers = yield _Recoverer.start( - self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN ) - self.txn_ctrl.add_recoverers(recoverers) + + for service in services: + self.txn_ctrl.start_recoverer(service) def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) class _ServiceQueuer(object): - """Queues events for the same application service together, sending - transactions as soon as possible. Once a transaction is sent successfully, - this schedules any other events in the queue to run. + """Queue of events waiting to be sent to appservices. + + Groups events into transactions per-appservice, and sends them on to the + TransactionController. Makes sure that we only have one transaction in flight per + appservice at a given time. """ def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} + + # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock @@ -136,13 +138,29 @@ class _ServiceQueuer(object): class _TransactionController(object): - def __init__(self, clock, store, as_api, recoverer_fn): + """Transaction manager. + + Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer + if a transaction fails. + + (Note we have only have one of these in the homeserver.) + + Args: + clock (synapse.util.Clock): + store (synapse.storage.DataStore): + as_api (synapse.appservice.api.ApplicationServiceApi): + """ + + def __init__(self, clock, store, as_api): self.clock = clock self.store = store self.as_api = as_api - self.recoverer_fn = recoverer_fn - # keep track of how many recoverers there are - self.recoverers = [] + + # map from service id to recoverer instance + self.recoverers = {} + + # for UTs + self.RECOVERER_CLASS = _Recoverer @defer.inlineCallbacks def send(self, service, events): @@ -154,42 +172,45 @@ class _TransactionController(object): if sent: yield txn.complete(self.store) else: - run_in_background(self._start_recoverer, service) + run_in_background(self._on_txn_fail, service) except Exception: logger.exception("Error creating appservice transaction") - run_in_background(self._start_recoverer, service) + run_in_background(self._on_txn_fail, service) @defer.inlineCallbacks def on_recovered(self, recoverer): - self.recoverers.remove(recoverer) logger.info( "Successfully recovered application service AS ID %s", recoverer.service.id ) + self.recoverers.pop(recoverer.service.id) logger.info("Remaining active recoverers: %s", len(self.recoverers)) yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP ) - def add_recoverers(self, recoverers): - for r in recoverers: - self.recoverers.append(r) - if len(recoverers) > 0: - logger.info("New active recoverers: %s", len(self.recoverers)) - @defer.inlineCallbacks - def _start_recoverer(self, service): + def _on_txn_fail(self, service): try: yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN) - logger.info( - "Application service falling behind. Starting recoverer. AS ID %s", - service.id, - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() + self.start_recoverer(service) except Exception: logger.exception("Error starting AS recoverer") + def start_recoverer(self, service): + """Start a Recoverer for the given service + + Args: + service (synapse.appservice.ApplicationService): + """ + logger.info("Starting recoverer for AS ID %s", service.id) + assert service.id not in self.recoverers + recoverer = self.RECOVERER_CLASS( + self.clock, self.store, self.as_api, service, self.on_recovered + ) + self.recoverers[service.id] = recoverer + recoverer.recover() + logger.info("Now %i active recoverers", len(self.recoverers)) + @defer.inlineCallbacks def _is_service_up(self, service): state = yield self.store.get_appservice_state(service) @@ -197,18 +218,17 @@ class _TransactionController(object): class _Recoverer(object): - @staticmethod - @defer.inlineCallbacks - def start(clock, store, as_api, callback): - services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN) - recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services] - for r in recoverers: - logger.info( - "Starting recoverer for AS ID %s which was marked as " "DOWN", - r.service.id, - ) - r.recover() - return recoverers + """Manages retries and backoff for a DOWN appservice. + + We have one of these for each appservice which is currently considered DOWN. + + Args: + clock (synapse.util.Clock): + store (synapse.storage.DataStore): + as_api (synapse.appservice.api.ApplicationServiceApi): + service (synapse.appservice.ApplicationService): the service we are managing + callback (callable[_Recoverer]): called once the service recovers. + """ def __init__(self, clock, store, as_api, service, callback): self.clock = clock -- cgit 1.5.1 From baa3f4a80d55615f35e073eecaebd5edd1c86113 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 20 Aug 2019 17:39:38 +0100 Subject: Avoid deep recursion in appservice recovery (#5885) Hopefully, this will fix a stack overflow when recovering an appservice. The recursion here leads to a huge chain of deferred callbacks, which then overflows the stack when the chain completes. `inlineCallbacks` makes a better job of this if we use iteration instead. Clean up the code a bit too, while we're there. --- changelog.d/5885.bugfix | 1 + synapse/appservice/scheduler.py | 43 ++++++++++++++++++++++++----------------- 2 files changed, 26 insertions(+), 18 deletions(-) create mode 100644 changelog.d/5885.bugfix (limited to 'synapse/appservice') diff --git a/changelog.d/5885.bugfix b/changelog.d/5885.bugfix new file mode 100644 index 0000000000..411d925fd4 --- /dev/null +++ b/changelog.d/5885.bugfix @@ -0,0 +1 @@ +Fix stack overflow when recovering an appservice which had an outage. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 42a350bff8..0ae12cbac9 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -224,7 +224,9 @@ class _Recoverer(object): "as-recoverer-%s" % (self.service.id,), self.retry ) - self.clock.call_later((2 ** self.backoff_counter), _retry) + delay = 2 ** self.backoff_counter + logger.info("Scheduling retries on %s in %fs", self.service.id, delay) + self.clock.call_later(delay, _retry) def _backoff(self): # cap the backoff to be around 8.5min => (2^9) = 512 secs @@ -234,25 +236,30 @@ class _Recoverer(object): @defer.inlineCallbacks def retry(self): + logger.info("Starting retries on %s", self.service.id) try: - txn = yield self.store.get_oldest_unsent_txn(self.service) - if txn: + while True: + txn = yield self.store.get_oldest_unsent_txn(self.service) + if not txn: + # nothing left: we're done! + self.callback(self) + return + logger.info( "Retrying transaction %s for AS ID %s", txn.id, txn.service.id ) sent = yield txn.send(self.as_api) - if sent: - yield txn.complete(self.store) - # reset the backoff counter and retry immediately - self.backoff_counter = 1 - yield self.retry() - else: - self._backoff() - else: - self._set_service_recovered() - except Exception as e: - logger.exception(e) - self._backoff() - - def _set_service_recovered(self): - self.callback(self) + if not sent: + break + + yield txn.complete(self.store) + + # reset the backoff counter and then process the next transaction + self.backoff_counter = 1 + + except Exception: + logger.exception("Unexpected error running retries") + + # we didn't manage to send all of the transactions before we got an error of + # some flavour: reschedule the next retry. + self._backoff() -- cgit 1.5.1 From 2a447826665a5ac8e12736214f0ef2401e72f1f9 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 3 Sep 2019 11:42:45 +0100 Subject: Remove double return statements (#5962) Remove all the "double return" statements which were a result of us removing all the instances of ``` defer.returnValue(...) return ``` statements when we switched to python3 fully. --- changelog.d/5962.misc | 1 + synapse/api/auth.py | 1 - synapse/appservice/api.py | 3 --- synapse/handlers/appservice.py | 2 -- synapse/handlers/events.py | 1 - synapse/handlers/initial_sync.py | 2 -- synapse/handlers/room.py | 1 - synapse/handlers/sync.py | 1 - synapse/rest/client/v1/room.py | 1 - synapse/rest/client/v2_alpha/register.py | 2 -- synapse/rest/media/v1/preview_url_resource.py | 1 - synapse/state/__init__.py | 1 - synapse/storage/appservice.py | 1 - synapse/storage/directory.py | 2 -- synapse/storage/profile.py | 1 - 15 files changed, 1 insertion(+), 20 deletions(-) create mode 100644 changelog.d/5962.misc (limited to 'synapse/appservice') diff --git a/changelog.d/5962.misc b/changelog.d/5962.misc new file mode 100644 index 0000000000..d97d376c36 --- /dev/null +++ b/changelog.d/5962.misc @@ -0,0 +1 @@ +Remove unnecessary return statements in the codebase which were the result of a regex run. \ No newline at end of file diff --git a/synapse/api/auth.py b/synapse/api/auth.py index fd3cdf50b0..ddc195bc32 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -704,7 +704,6 @@ class Auth(object): and visibility.content["history_visibility"] == "world_readable" ): return Membership.JOIN, None - return raise AuthError( 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN ) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 007ca75a94..3e25bf5747 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -107,7 +107,6 @@ class ApplicationServiceApi(SimpleHttpClient): except CodeMessageException as e: if e.code == 404: return False - return logger.warning("query_user to %s received %s", uri, e.code) except Exception as ex: logger.warning("query_user to %s threw exception %s", uri, ex) @@ -127,7 +126,6 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("query_alias to %s received %s", uri, e.code) if e.code == 404: return False - return except Exception as ex: logger.warning("query_alias to %s threw exception %s", uri, ex) return False @@ -230,7 +228,6 @@ class ApplicationServiceApi(SimpleHttpClient): sent_transactions_counter.labels(service.id).inc() sent_events_counter.labels(service.id).inc(len(events)) return True - return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index d1a51df6f9..3e9b298154 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -294,12 +294,10 @@ class ApplicationServicesHandler(object): # we don't know if they are unknown or not since it isn't one of our # users. We can't poke ASes. return False - return user_info = yield self.store.get_user_by_id(user_id) if user_info: return False - return # user not found; could be the AS though, so check. services = self.store.get_app_services() diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 2f1f10a9af..5e748687e3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -167,7 +167,6 @@ class EventHandler(BaseHandler): if not event: return None - return users = yield self.store.get_users_in_room(event.room_id) is_peeking = user.to_string() not in users diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 595f75400b..f991efeee3 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -450,7 +450,6 @@ class InitialSyncHandler(BaseHandler): # else it will throw. member_event = yield self.auth.check_user_was_in_room(room_id, user_id) return member_event.membership, member_event.event_id - return except AuthError: visibility = yield self.state_handler.get_current_state( room_id, EventTypes.RoomHistoryVisibility, "" @@ -460,7 +459,6 @@ class InitialSyncHandler(BaseHandler): and visibility.content["history_visibility"] == "world_readable" ): return Membership.JOIN, None - return raise AuthError( 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 6e47fe7867..a509e11d69 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -852,7 +852,6 @@ class RoomContextHandler(object): ) if not event: return None - return filtered = yield (filter_evts([event])) if not filtered: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d582f8e494..19bca6717f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -578,7 +578,6 @@ class SyncHandler(object): if not last_events: return None - return last_event = last_events[-1] state_ids = yield self.store.get_state_ids_for_event( diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index f244e8f469..3582259026 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -703,7 +703,6 @@ class RoomMembershipRestServlet(TransactionRestServlet): txn_id, ) return 200, {} - return target = requester.user if membership_action in ["invite", "ban", "unban", "kick"]: diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 65f9fce2ff..107854c669 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -230,7 +230,6 @@ class RegisterRestServlet(RestServlet): if kind == b"guest": ret = yield self._do_guest_registration(body, address=client_addr) return ret - return elif kind != b"user": raise UnrecognizedRequestError( "Do not understand membership kind: %s" % (kind,) @@ -280,7 +279,6 @@ class RegisterRestServlet(RestServlet): desired_username, access_token, body ) return 200, result # we throw for non 200 responses - return # for regular registration, downcase the provided username before # attempting to register it. This should mean diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index bd40891a7f..7a56cd4b6c 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -183,7 +183,6 @@ class PreviewUrlResource(DirectServeResource): if isinstance(og, six.text_type): og = og.encode("utf8") return og - return media_info = yield self._download_url(url, user) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index a0d34f16ea..2b0f4c79ee 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -136,7 +136,6 @@ class StateHandler(object): if event_id: event = yield self.store.get_event(event_id, allow_none=True) return event - return state_map = yield self.store.get_events( list(state.values()), get_prev_content=False diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 36657753cd..435b2acd4d 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -165,7 +165,6 @@ class ApplicationServiceTransactionWorkerStore( ) if result: return result.get("state") - return return None def set_appservice_state(self, service, state): diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index e966a73f3d..eed7757ed5 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -47,7 +47,6 @@ class DirectoryWorkerStore(SQLBaseStore): if not room_id: return None - return servers = yield self._simple_select_onecol( "room_alias_servers", @@ -58,7 +57,6 @@ class DirectoryWorkerStore(SQLBaseStore): if not servers: return None - return return RoomAliasMapping(room_id, room_alias.to_string(), servers) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 8a5d8e9b18..912c1df6be 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -35,7 +35,6 @@ class ProfileWorkerStore(SQLBaseStore): if e.code == 404: # no match return ProfileInfo(None, None) - return else: raise -- cgit 1.5.1