diff options
author | Erik Johnston <erik@matrix.org> | 2016-08-18 14:59:55 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-08-18 14:59:55 +0100 |
commit | 07229bbdae6081f0c91a60e76de8fa848903b5bd (patch) | |
tree | fa5536a62b57a3d833e16108ac64c2f2d5beca5e | |
parent | Make AppserviceHandler stream events from database (diff) | |
download | synapse-07229bbdae6081f0c91a60e76de8fa848903b5bd.tar.xz |
Add appservice worker
-rw-r--r-- | synapse/app/appservice.py | 211 | ||||
-rw-r--r-- | synapse/config/appservice.py | 1 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 89 | ||||
-rw-r--r-- | synapse/replication/slave/storage/appservice.py | 10 | ||||
-rw-r--r-- | synapse/replication/slave/storage/registration.py | 3 | ||||
-rw-r--r-- | synapse/storage/appservice.py | 145 | ||||
-rw-r--r-- | synapse/storage/registration.py | 33 |
7 files changed, 369 insertions, 123 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py new file mode 100644 index 0000000000..afc3709409 --- /dev/null +++ b/synapse/app/appservice.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.config._base import ConfigError +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage.directory import DirectoryStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.storage.engines import create_engine +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.appservice") + + +class AppserviceSlaveStore( + DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore, + SlavedRegistrationStore, +): + pass + + +class AppserviceServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = AppserviceSlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse appservice now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.worker_replication_url + appservice_handler = self.get_application_service_handler() + + @defer.inlineCallbacks + def replicate(results): + stream = results.get("events") + if stream: + max_stream_id = stream["position"] + yield appservice_handler.notify_interested_services(max_stream_id) + + while True: + try: + logger.info("Hitting replication") + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + logger.info("Got replication response") + yield store.process_replication(result) + replicate(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(30) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse appservice", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.appservice" + + setup_logging(config.worker_log_config, config.worker_log_file) + + database_engine = create_engine(config.database_config) + + if config.notify_appservices: + sys.stderr.write( + "\nThe appservices must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``notify_appservices: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.notify_appservices = True + + ps = AppserviceServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ps.setup() + ps.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ps.replicate() + ps.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-appservice", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index eade803909..70d28892c6 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -28,6 +28,7 @@ class AppServiceConfig(Config): def read_config(self, config): self.app_service_config_files = config.get("app_service_config_files", []) + self.notify_appservices = config.get("notify_appservices", True) def default_config(cls, **kwargs): return """\ diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 84341b0d20..6556dd1ae8 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -44,6 +44,10 @@ class ApplicationServicesHandler(object): self.scheduler = hs.get_application_service_scheduler() self.started_scheduler = False self.clock = hs.get_clock() + self.notify_appservices = hs.config.notify_appservices + + self.current_max = 0 + self.is_processing = False @defer.inlineCallbacks def notify_interested_services(self, current_id): @@ -56,47 +60,56 @@ class ApplicationServicesHandler(object): current_id(int): The current maximum ID. """ services = yield self.store.get_app_services() - if not services: + if not services or not self.notify_appservices: return - with Measure(self.clock, "notify_interested_services"): - upper_bound = current_id - limit = 100 - while True: - upper_bound, events = yield self.store.get_new_events_for_appservice( - upper_bound, limit - ) - - logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound) - - if not events: - break - - for event in events: - # Gather interested services - services = yield self._get_services_for_event(event) - if len(services) == 0: - continue # no services need notifying - - # Do we know this user exists? If not, poke the user query API for - # all services which match that user regex. This needs to block as - # these user queries need to be made BEFORE pushing the event. - yield self._check_user_exists(event.sender) - if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) - - if not self.started_scheduler: - self.scheduler.start().addErrback(log_failure) - self.started_scheduler = True - - # Fork off pushes to these services - for service in services: - preserve_fn(self.scheduler.submit_event_for_as)(service, event) - - yield self.store.set_appservice_last_pos(upper_bound) + self.current_max = max(self.current_max, current_id) + if self.is_processing: + return - if len(events) < limit: - break + with Measure(self.clock, "notify_interested_services"): + self.is_processing = True + try: + upper_bound = self.current_max + limit = 100 + while True: + upper_bound, events = yield self.store.get_new_events_for_appservice( + upper_bound, limit + ) + + if not events: + break + + for event in events: + # Gather interested services + services = yield self._get_services_for_event(event) + if len(services) == 0: + continue # no services need notifying + + # Do we know this user exists? If not, poke the user + # query API for all services which match that user regex. + # This needs to block as these user queries need to be + # made BEFORE pushing the event. + yield self._check_user_exists(event.sender) + if event.type == EventTypes.Member: + yield self._check_user_exists(event.state_key) + + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services + for service in services: + preserve_fn(self.scheduler.submit_event_for_as)( + service, event + ) + + yield self.store.set_appservice_last_pos(upper_bound) + + if len(events) < limit: + break + finally: + self.is_processing = False @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py index 25792d9429..a374f2f1a2 100644 --- a/synapse/replication/slave/storage/appservice.py +++ b/synapse/replication/slave/storage/appservice.py @@ -28,3 +28,13 @@ class SlavedApplicationServiceStore(BaseSlavedStore): get_app_service_by_token = DataStore.get_app_service_by_token.__func__ get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__ + get_app_services = DataStore.get_app_services.__func__ + get_new_events_for_appservice = DataStore.get_new_events_for_appservice.__func__ + create_appservice_txn = DataStore.create_appservice_txn.__func__ + get_appservices_by_state = DataStore.get_appservices_by_state.__func__ + get_oldest_unsent_txn = DataStore.get_oldest_unsent_txn.__func__ + _get_last_txn = DataStore._get_last_txn.__func__ + complete_appservice_txn = DataStore.complete_appservice_txn.__func__ + get_appservice_state = DataStore.get_appservice_state.__func__ + set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__ + set_appservice_state = DataStore.set_appservice_state.__func__ diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py index 38b78b97fc..e27c7332d2 100644 --- a/synapse/replication/slave/storage/registration.py +++ b/synapse/replication/slave/storage/registration.py @@ -28,3 +28,6 @@ class SlavedRegistrationStore(BaseSlavedStore): ] _query_for_auth = DataStore._query_for_auth.__func__ + get_user_by_id = RegistrationStore.__dict__[ + "get_user_by_id" + ] diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index f0c88e05cd..b496b918b7 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -218,38 +218,37 @@ class ApplicationServiceTransactionStore(SQLBaseStore): Returns: AppServiceTransaction: A new transaction. """ - return self.runInteraction( - "create_appservice_txn", - self._create_appservice_txn, - service, events - ) + def _create_appservice_txn(txn): + # work out new txn id (highest txn id for this service += 1) + # The highest id may be the last one sent (in which case it is last_txn) + # or it may be the highest in the txns list (which are waiting to be/are + # being sent) + last_txn_id = self._get_last_txn(txn, service.id) - def _create_appservice_txn(self, txn, service, events): - # work out new txn id (highest txn id for this service += 1) - # The highest id may be the last one sent (in which case it is last_txn) - # or it may be the highest in the txns list (which are waiting to be/are - # being sent) - last_txn_id = self._get_last_txn(txn, service.id) + txn.execute( + "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", + (service.id,) + ) + highest_txn_id = txn.fetchone()[0] + if highest_txn_id is None: + highest_txn_id = 0 - txn.execute( - "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", - (service.id,) - ) - highest_txn_id = txn.fetchone()[0] - if highest_txn_id is None: - highest_txn_id = 0 + new_txn_id = max(highest_txn_id, last_txn_id) + 1 - new_txn_id = max(highest_txn_id, last_txn_id) + 1 + # Insert new txn into txn table + event_ids = json.dumps([e.event_id for e in events]) + txn.execute( + "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " + "VALUES(?,?,?)", + (service.id, new_txn_id, event_ids) + ) + return AppServiceTransaction( + service=service, id=new_txn_id, events=events + ) - # Insert new txn into txn table - event_ids = json.dumps([e.event_id for e in events]) - txn.execute( - "INSERT INTO application_services_txns(as_id, txn_id, event_ids) " - "VALUES(?,?,?)", - (service.id, new_txn_id, event_ids) - ) - return AppServiceTransaction( - service=service, id=new_txn_id, events=events + return self.runInteraction( + "create_appservice_txn", + _create_appservice_txn, ) def complete_appservice_txn(self, txn_id, service): @@ -263,39 +262,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves if this transaction was stored successfully. """ - return self.runInteraction( - "complete_appservice_txn", - self._complete_appservice_txn, - txn_id, service - ) - - def _complete_appservice_txn(self, txn, txn_id, service): txn_id = int(txn_id) - # Debugging query: Make sure the txn being completed is EXACTLY +1 from - # what was there before. If it isn't, we've got problems (e.g. the AS - # has probably missed some events), so whine loudly but still continue, - # since it shouldn't fail completion of the transaction. - last_txn_id = self._get_last_txn(txn, service.id) - if (last_txn_id + 1) != txn_id: - logger.error( - "appservice: Completing a transaction which has an ID > 1 from " - "the last ID sent to this AS. We've either dropped events or " - "sent it to the AS out of order. FIX ME. last_txn=%s " - "completing_txn=%s service_id=%s", last_txn_id, txn_id, - service.id + def _complete_appservice_txn(txn): + # Debugging query: Make sure the txn being completed is EXACTLY +1 from + # what was there before. If it isn't, we've got problems (e.g. the AS + # has probably missed some events), so whine loudly but still continue, + # since it shouldn't fail completion of the transaction. + last_txn_id = self._get_last_txn(txn, service.id) + if (last_txn_id + 1) != txn_id: + logger.error( + "appservice: Completing a transaction which has an ID > 1 from " + "the last ID sent to this AS. We've either dropped events or " + "sent it to the AS out of order. FIX ME. last_txn=%s " + "completing_txn=%s service_id=%s", last_txn_id, txn_id, + service.id + ) + + # Set current txn_id for AS to 'txn_id' + self._simple_upsert_txn( + txn, "application_services_state", dict(as_id=service.id), + dict(last_txn=txn_id) ) - # Set current txn_id for AS to 'txn_id' - self._simple_upsert_txn( - txn, "application_services_state", dict(as_id=service.id), - dict(last_txn=txn_id) - ) + # Delete txn + self._simple_delete_txn( + txn, "application_services_txns", + dict(txn_id=txn_id, as_id=service.id) + ) - # Delete txn - self._simple_delete_txn( - txn, "application_services_txns", - dict(txn_id=txn_id, as_id=service.id) + return self.runInteraction( + "complete_appservice_txn", + _complete_appservice_txn, ) @defer.inlineCallbacks @@ -309,10 +307,25 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ + def _get_oldest_unsent_txn(txn): + # Monotonically increasing txn ids, so just select the smallest + # one in the txns table (we delete them when they are sent) + txn.execute( + "SELECT * FROM application_services_txns WHERE as_id=?" + " ORDER BY txn_id ASC LIMIT 1", + (service.id,) + ) + rows = self.cursor_to_dict(txn) + if not rows: + return None + + entry = rows[0] + + return entry + entry = yield self.runInteraction( "get_oldest_unsent_appservice_txn", - self._get_oldest_unsent_txn, - service + _get_oldest_unsent_txn, ) if not entry: @@ -326,22 +339,6 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service=service, id=entry["txn_id"], events=events )) - def _get_oldest_unsent_txn(self, txn, service): - # Monotonically increasing txn ids, so just select the smallest - # one in the txns table (we delete them when they are sent) - txn.execute( - "SELECT * FROM application_services_txns WHERE as_id=?" - " ORDER BY txn_id ASC LIMIT 1", - (service.id,) - ) - rows = self.cursor_to_dict(txn) - if not rows: - return None - - entry = rows[0] - - return entry - def _get_last_txn(self, txn, service_id): txn.execute( "SELECT last_txn FROM application_services_state WHERE as_id=?", diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 19cb3b31c6..e404fa72de 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -93,7 +93,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): desc="add_refresh_token_to_user", ) - @defer.inlineCallbacks def register(self, user_id, token=None, password_hash=None, was_guest=False, make_guest=False, appservice_id=None, create_profile_with_localpart=None, admin=False): @@ -115,7 +114,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Raises: StoreError if the user_id could not be registered. """ - yield self.runInteraction( + return self.runInteraction( "register", self._register, user_id, @@ -127,8 +126,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): create_profile_with_localpart, admin ) - self.get_user_by_id.invalidate((user_id,)) - self.is_guest.invalidate((user_id,)) def _register( self, @@ -210,6 +207,11 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): (create_profile_with_localpart,) ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + txn.call_after(self.is_guest.invalidate, (user_id,)) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( @@ -236,19 +238,28 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): return self.runInteraction("get_users_by_id_case_insensitive", f) - @defer.inlineCallbacks def user_set_password_hash(self, user_id, password_hash): """ NB. This does *not* evict any cache because the one use for this removes most of the entries subsequently anyway so it would be pointless. Use flush_user separately. """ - yield self._simple_update_one('users', { - 'name': user_id - }, { - 'password_hash': password_hash - }) - self.get_user_by_id.invalidate((user_id,)) + def user_set_password_hash_txn(txn): + self._simple_update_one_txn( + txn, + 'users', { + 'name': user_id + }, + { + 'password_hash': password_hash + } + ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + return self.runInteraction( + "user_set_password_hash", user_set_password_hash_txn + ) @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_id=None, |