summary refs log tree commit diff
path: root/synapse/app/federation_reader.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/federation_reader.py')
-rw-r--r--synapse/app/federation_reader.py125
1 files changed, 43 insertions, 82 deletions
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index eb392e1c9d..7af00b8bcf 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -13,43 +13,37 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
-import synapse
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
 
+import synapse
+from synapse import events
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.api.urls import FEDERATION_PREFIX
-from synapse.federation.transport.server import TransportLayerServer
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
-from twisted.internet import reactor
-from twisted.web.resource import Resource
-
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
 
 logger = logging.getLogger("synapse.app.federation_reader")
 
@@ -66,19 +60,6 @@ class FederationReaderSlavedStore(
 
 
 class FederationReaderServer(HomeServer):
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def setup(self):
         logger.info("Setting up.")
         self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
@@ -92,25 +73,25 @@ class FederationReaderServer(HomeServer):
         for res in listener_config["resources"]:
             for name in res["names"]:
                 if name == "metrics":
-                    resources[METRICS_PREFIX] = MetricsResource(self)
+                    resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "federation":
                     resources.update({
                         FEDERATION_PREFIX: TransportLayerServer(self),
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
-
-        for address in bind_addresses:
-            reactor.listenTCP(
-                port,
-                SynapseSite(
-                    "synapse.access.http.%s" % (site_tag,),
-                    site_tag,
-                    listener_config,
-                    root_resource,
-                ),
-                interface=address
+        root_resource = create_resource_tree(resources, NoResource())
+
+        _base.listen_tcp(
+            bind_addresses,
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+                self.version_string,
             )
+        )
 
         logger.info("Synapse federation reader now listening on port %d", port)
 
@@ -119,18 +100,22 @@ class FederationReaderServer(HomeServer):
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
-                bind_addresses = listener["bind_addresses"]
-
-                for address in bind_addresses:
-                    reactor.listenTCP(
-                        listener["port"],
-                        manhole(
-                            username="matrix",
-                            password="rabbithole",
-                            globals={"hs": self},
-                        ),
-                        interface=address
+                _base.listen_tcp(
+                    listener["bind_addresses"],
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
                     )
+                )
+            elif listener["type"] == "metrics":
+                if not self.get_config().enable_metrics:
+                    logger.warn(("Metrics listener configured, but "
+                                 "enable_metrics is not True!"))
+                else:
+                    _base.listen_metrics(listener["bind_addresses"],
+                                         listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -169,39 +154,15 @@ def start(config_options):
     )
 
     ss.setup()
-    ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-federation-reader",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-federation-reader", config)
 
 
 if __name__ == '__main__':