diff options
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r-- | synapse/app/federation_sender.py | 157 |
1 files changed, 61 insertions, 96 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 03327dc47a..18469013fa 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -13,44 +13,39 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys -import synapse +from twisted.internet import defer, reactor +from twisted.web.resource import NoResource -from synapse.server import HomeServer +import synapse +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.crypto import context_factory -from synapse.http.site import SynapseSite from synapse.federation import send_queue -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.transactions import TransactionStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse import events - -from twisted.internet import reactor, defer -from twisted.web.resource import Resource - -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.federation_sender") @@ -83,19 +78,6 @@ class FederationSenderSlaveStore( class FederationSenderServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self) @@ -109,21 +91,21 @@ class FederationSenderServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) - - root_resource = create_resource_tree(resources, Resource()) - - for address in bind_addresses: - reactor.listenTCP( - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - interface=address + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) + + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, ) + ) logger.info("Synapse federation_sender now listening on port %d", port) @@ -132,18 +114,22 @@ class FederationSenderServer(HomeServer): if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": - bind_addresses = listener["bind_addresses"] - - for address in bind_addresses: - reactor.listenTCP( - listener["port"], - manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ), - interface=address + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -213,36 +199,12 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-federation-sender", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-federation-sender", config) class FederationSenderHandler(object): @@ -277,7 +239,7 @@ class FederationSenderHandler(object): # presence, typing, etc. if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - preserve_fn(self.update_token)(token) + run_in_background(self.update_token, token) # We also need to poke the federation sender when new events happen elif stream_name == "events": @@ -285,19 +247,22 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def update_token(self, token): - self.federation_position = token - - # We linearize here to ensure we don't have races updating the token - with (yield self._fed_position_linearizer.queue(None)): - if self._last_ack < self.federation_position: - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) + try: + self.federation_position = token + + # We linearize here to ensure we don't have races updating the token + with (yield self._fed_position_linearizer.queue(None)): + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) - # We ACK this token over replication so that the master can drop - # its in memory queues - self.replication_client.send_federation_ack(self.federation_position) - self._last_ack = self.federation_position + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position + except Exception: + logger.exception("Error updating federation stream position") if __name__ == '__main__': |