summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/app/pusher.py29
-rw-r--r--synapse/app/synchrotron.py29
-rw-r--r--synapse/config/workers.py49
-rw-r--r--synapse/server.py3
-rw-r--r--synapse/storage/_base.py2
5 files changed, 33 insertions, 79 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 5d4db4f892..9ac26d52c6 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -111,7 +111,7 @@ class PusherServer(HomeServer):
 
     def remove_pusher(self, app_id, push_key, user_id):
         http_client = self.get_simple_http_client()
-        replication_url = self.worker_config.replication_url
+        replication_url = self.config.worker_replication_url
         url = replication_url + "/remove_pushers"
         return http_client.post_json_get_json(url, {
             "remove": [{
@@ -165,7 +165,7 @@ class PusherServer(HomeServer):
     def replicate(self):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
-        replication_url = self.worker_config.replication_url
+        replication_url = self.config.worker_replication_url
         pusher_pool = self.get_pusherpool()
         clock = self.get_clock()
 
@@ -240,11 +240,8 @@ class PusherServer(HomeServer):
                 logger.exception("Error replicating from %r", replication_url)
                 yield sleep(30)
 
-    def get_event_cache_size(self):
-        return self.worker_config.event_cache_size
 
-
-def setup(worker_name, config_options):
+def start(config_options):
     try:
         config = HomeServerConfig.load_config(
             "Synapse pusher", config_options
@@ -253,9 +250,9 @@ def setup(worker_name, config_options):
         sys.stderr.write("\n" + e.message + "\n")
         sys.exit(1)
 
-    worker_config = config.workers[worker_name]
+    assert config.worker_app == "synapse.app.pusher"
 
-    setup_logging(worker_config.log_config, worker_config.log_file)
+    setup_logging(config.worker_log_config, config.worker_log_file)
 
     if config.start_pushers:
         sys.stderr.write(
@@ -275,20 +272,19 @@ def setup(worker_name, config_options):
         config.server_name,
         db_config=config.database_config,
         config=config,
-        worker_config=worker_config,
         version_string=get_version_string("Synapse", synapse),
         database_engine=database_engine,
     )
 
     ps.setup()
-    ps.start_listening(worker_config.listeners)
+    ps.start_listening(config.worker_listeners)
 
     def run():
         with LoggingContext("run"):
             logger.info("Running")
-            change_resource_limit(worker_config.soft_file_limit)
-            if worker_config.gc_thresholds:
-                ps.set_threshold(worker_config.gc_thresholds)
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                ps.set_threshold(config.gc_thresholds)
             reactor.run()
 
     def start():
@@ -298,10 +294,10 @@ def setup(worker_name, config_options):
 
     reactor.callWhenRunning(start)
 
-    if worker_config.daemonize:
+    if config.worker_daemonize:
         daemon = Daemonize(
             app="synapse-pusher",
-            pid=worker_config.pid_file,
+            pid=config.worker_pid_file,
             action=run,
             auto_close_fds=False,
             verbose=True,
@@ -314,5 +310,4 @@ def setup(worker_name, config_options):
 
 if __name__ == '__main__':
     with LoggingContext("main"):
-        worker_name = sys.argv[1]
-        ps = setup(worker_name, sys.argv[2:])
+        ps = start(sys.argv[1:])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index d10bb2b3f0..160db8637e 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -97,7 +97,7 @@ class SynchrotronPresence(object):
         self.http_client = hs.get_simple_http_client()
         self.store = hs.get_datastore()
         self.user_to_num_current_syncs = {}
-        self.syncing_users_url = hs.worker_config.replication_url + "/syncing_users"
+        self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
         self.clock = hs.get_clock()
 
         active_presence = self.store.take_presence_startup_info()
@@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer):
     def replicate(self):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
-        replication_url = self.worker_config.replication_url
+        replication_url = self.config.worker_replication_url
         clock = self.get_clock()
         notifier = self.get_notifier()
         presence_handler = self.get_presence_handler()
@@ -403,11 +403,8 @@ class SynchrotronServer(HomeServer):
     def build_typing_handler(self):
         return SynchrotronTyping(self)
 
-    def get_event_cache_size(self):
-        return self.worker_config.event_cache_size
 
-
-def start(worker_name, config_options):
+def start(config_options):
     try:
         config = HomeServerConfig.load_config(
             "Synapse synchrotron", config_options
@@ -416,9 +413,9 @@ def start(worker_name, config_options):
         sys.stderr.write("\n" + e.message + "\n")
         sys.exit(1)
 
-    worker_config = config.workers[worker_name]
+    assert config.worker_app == "synapse.app.synchrotron"
 
-    setup_logging(worker_config.log_config, worker_config.log_file)
+    setup_logging(config.worker_log_config, config.worker_log_file)
 
     database_engine = create_engine(config.database_config)
 
@@ -426,21 +423,20 @@ def start(worker_name, config_options):
         config.server_name,
         db_config=config.database_config,
         config=config,
-        worker_config=worker_config,
         version_string=get_version_string("Synapse", synapse),
         database_engine=database_engine,
         application_service_handler=SynchrotronApplicationService(),
     )
 
     ss.setup()
-    ss.start_listening(worker_config.listeners)
+    ss.start_listening(config.worker_listeners)
 
     def run():
         with LoggingContext("run"):
             logger.info("Running")
-            change_resource_limit(worker_config.soft_file_limit)
-            if worker_config.gc_thresholds:
-                ss.set_threshold(worker_config.gc_thresholds)
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                ss.set_threshold(config.gc_thresholds)
             reactor.run()
 
     def start():
@@ -449,10 +445,10 @@ def start(worker_name, config_options):
 
     reactor.callWhenRunning(start)
 
-    if worker_config.daemonize:
+    if config.worker_daemonize:
         daemon = Daemonize(
             app="synapse-synchrotron",
-            pid=worker_config.pid_file,
+            pid=config.worker_pid_file,
             action=run,
             auto_close_fds=False,
             verbose=True,
@@ -465,5 +461,4 @@ def start(worker_name, config_options):
 
 if __name__ == '__main__':
     with LoggingContext("main"):
-        worker_name = sys.argv[1]
-        start(worker_name, sys.argv[2:])
+        start(sys.argv[1:])
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 503358e03e..904789d155 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -13,52 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import collections
-
 from ._base import Config
-from .server import read_gc_thresholds
-
-
-Worker = collections.namedtuple("Worker", [
-    "app",
-    "listeners",
-    "pid_file",
-    "daemonize",
-    "log_file",
-    "log_config",
-    "event_cache_size",
-    "soft_file_limit",
-    "gc_thresholds",
-    "replication_url",
-])
-
-
-def read_worker_config(config):
-    return Worker(
-        app=config["app"],
-        listeners=config.get("listeners", []),
-        pid_file=config.get("pid_file"),
-        daemonize=config["daemonize"],
-        log_file=config.get("log_file"),
-        log_config=config.get("log_config"),
-        event_cache_size=Config.parse_size(config.get("event_cache_size", "10K")),
-        soft_file_limit=config.get("soft_file_limit"),
-        gc_thresholds=read_gc_thresholds(config.get("gc_thresholds")),
-        replication_url=config.get("replication_url"),
-    )
 
 
 class WorkerConfig(Config):
     """The workers are processes run separately to the main synapse process.
-    Each worker has a name that identifies it within the config file.
     They have their own pid_file and listener configuration. They use the
-    replication_url to talk to the main synapse process. They have their
-    own cache size tuning, gc threshold tuning and open file limits."""
+    replication_url to talk to the main synapse process."""
 
     def read_config(self, config):
-        workers = config.get("workers", {})
-
-        self.workers = {
-            worker_name: read_worker_config(worker_config)
-            for worker_name, worker_config in workers.items()
-        }
+        self.worker_app = config.get("worker_app")
+        self.worker_listeners = config.get("worker_listeners")
+        self.worker_daemonize = config.get("worker_daemonize")
+        self.worker_pid_file = config.get("worker_pid_file")
+        self.worker_log_file = config.get("worker_log_file")
+        self.worker_log_config = config.get("worker_log_config")
+        self.worker_replication_url = config.get("worker_replication_url")
diff --git a/synapse/server.py b/synapse/server.py
index b3c31ece73..dd4b81c658 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -236,9 +236,6 @@ class HomeServer(object):
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
-    def get_event_cache_size(self):
-        return self.config.event_cache_size
-
 
 def _make_dependency_method(depname):
     def _get(hs):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2932880cc5..32c6677d47 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -166,7 +166,7 @@ class SQLBaseStore(object):
         self._get_event_counters = PerformanceCounters()
 
         self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
-                                      max_entries=hs.get_event_cache_size())
+                                      max_entries=hs.config.event_cache_size)
 
         self._state_group_cache = DictionaryCache(
             "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR