summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/_base.py8
-rw-r--r--synapse/app/generic_worker.py41
-rw-r--r--synapse/app/homeserver.py56
3 files changed, 60 insertions, 45 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index dedff81af3..373a80a4a7 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -20,6 +20,7 @@ import signal
 import socket
 import sys
 import traceback
+from typing import Iterable
 
 from daemonize import Daemonize
 from typing_extensions import NoReturn
@@ -29,6 +30,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory
 
 import synapse
 from synapse.app import check_bind_error
+from synapse.config.server import ListenerConfig
 from synapse.crypto import context_factory
 from synapse.logging.context import PreserveLoggingContext
 from synapse.util.async_helpers import Linearizer
@@ -234,7 +236,7 @@ def refresh_certificate(hs):
         logger.info("Context factories updated.")
 
 
-def start(hs, listeners=None):
+def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
     """
     Start a Synapse server or worker.
 
@@ -245,8 +247,8 @@ def start(hs, listeners=None):
     notify systemd.
 
     Args:
-        hs (synapse.server.HomeServer)
-        listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
+        hs: homeserver instance
+        listeners: Listener configuration ('listeners' in homeserver.yaml)
     """
     try:
         # Set up the SIGHUP machinery.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index f3ec2a34ec..27a3fc9ed6 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -37,6 +37,7 @@ from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.config.server import ListenerConfig
 from synapse.federation import send_queue
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.handlers.presence import (
@@ -514,13 +515,18 @@ class GenericWorkerSlavedStore(
 class GenericWorkerServer(HomeServer):
     DATASTORE_CLASS = GenericWorkerSlavedStore
 
-    def _listen_http(self, listener_config):
-        port = listener_config["port"]
-        bind_addresses = listener_config["bind_addresses"]
-        site_tag = listener_config.get("tag", port)
+    def _listen_http(self, listener_config: ListenerConfig):
+        port = listener_config.port
+        bind_addresses = listener_config.bind_addresses
+
+        assert listener_config.http_options is not None
+
+        site_tag = listener_config.http_options.tag
+        if site_tag is None:
+            site_tag = port
         resources = {}
-        for res in listener_config["resources"]:
-            for name in res["names"]:
+        for res in listener_config.http_options.resources:
+            for name in res.names:
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "client":
@@ -590,7 +596,7 @@ class GenericWorkerServer(HomeServer):
                             " repository is disabled. Ignoring."
                         )
 
-                if name == "openid" and "federation" not in res["names"]:
+                if name == "openid" and "federation" not in res.names:
                     # Only load the openid resource separately if federation resource
                     # is not specified since federation resource includes openid
                     # resource.
@@ -625,19 +631,19 @@ class GenericWorkerServer(HomeServer):
 
         logger.info("Synapse worker now listening on port %d", port)
 
-    def start_listening(self, listeners):
+    def start_listening(self, listeners: Iterable[ListenerConfig]):
         for listener in listeners:
-            if listener["type"] == "http":
+            if listener.type == "http":
                 self._listen_http(listener)
-            elif listener["type"] == "manhole":
+            elif listener.type == "manhole":
                 _base.listen_tcp(
-                    listener["bind_addresses"],
-                    listener["port"],
+                    listener.bind_addresses,
+                    listener.port,
                     manhole(
                         username="matrix", password="rabbithole", globals={"hs": self}
                     ),
                 )
-            elif listener["type"] == "metrics":
+            elif listener.type == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warning(
                         (
@@ -646,9 +652,9 @@ class GenericWorkerServer(HomeServer):
                         )
                     )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
+                    _base.listen_metrics(listener.bind_addresses, listener.port)
             else:
-                logger.warning("Unrecognized listener type: %s", listener["type"])
+                logger.warning("Unsupported listener type: %s", listener.type)
 
         self.get_tcp_replication().start_replication(self)
 
@@ -738,6 +744,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
         except Exception:
             logger.exception("Error processing replication")
 
+    async def on_position(self, stream_name: str, instance_name: str, token: int):
+        await super().on_position(stream_name, instance_name, token)
+        # Also call on_rdata to ensure that stream positions are properly reset.
+        await self.on_rdata(stream_name, instance_name, token, [])
+
     def stop_pusher(self, user_id, app_id, pushkey):
         if not self.notify_pushers:
             return
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 41994dc14b..09291d86ad 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -23,8 +23,7 @@ import math
 import os
 import resource
 import sys
-
-from six import iteritems
+from typing import Iterable
 
 from prometheus_client import Gauge
 
@@ -50,6 +49,7 @@ from synapse.app import _base
 from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.server import ListenerConfig
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import (
@@ -90,24 +90,24 @@ def gz_wrap(r):
 class SynapseHomeServer(HomeServer):
     DATASTORE_CLASS = DataStore
 
-    def _listener_http(self, config, listener_config):
-        port = listener_config["port"]
-        bind_addresses = listener_config["bind_addresses"]
-        tls = listener_config.get("tls", False)
-        site_tag = listener_config.get("tag", port)
+    def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig):
+        port = listener_config.port
+        bind_addresses = listener_config.bind_addresses
+        tls = listener_config.tls
+        site_tag = listener_config.http_options.tag
+        if site_tag is None:
+            site_tag = port
 
         resources = {}
-        for res in listener_config["resources"]:
-            for name in res["names"]:
-                if name == "openid" and "federation" in res["names"]:
+        for res in listener_config.http_options.resources:
+            for name in res.names:
+                if name == "openid" and "federation" in res.names:
                     # Skip loading openid resource if federation is defined
                     # since federation resource will include openid
                     continue
-                resources.update(
-                    self._configure_named_resource(name, res.get("compress", False))
-                )
+                resources.update(self._configure_named_resource(name, res.compress))
 
-        additional_resources = listener_config.get("additional_resources", {})
+        additional_resources = listener_config.http_options.additional_resources
         logger.debug("Configuring additional resources: %r", additional_resources)
         module_api = ModuleApi(self, self.get_auth_handler())
         for path, resmodule in additional_resources.items():
@@ -279,7 +279,7 @@ class SynapseHomeServer(HomeServer):
 
         return resources
 
-    def start_listening(self, listeners):
+    def start_listening(self, listeners: Iterable[ListenerConfig]):
         config = self.get_config()
 
         if config.redis_enabled:
@@ -289,25 +289,25 @@ class SynapseHomeServer(HomeServer):
             self.get_tcp_replication().start_replication(self)
 
         for listener in listeners:
-            if listener["type"] == "http":
+            if listener.type == "http":
                 self._listening_services.extend(self._listener_http(config, listener))
-            elif listener["type"] == "manhole":
+            elif listener.type == "manhole":
                 listen_tcp(
-                    listener["bind_addresses"],
-                    listener["port"],
+                    listener.bind_addresses,
+                    listener.port,
                     manhole(
                         username="matrix", password="rabbithole", globals={"hs": self}
                     ),
                 )
-            elif listener["type"] == "replication":
+            elif listener.type == "replication":
                 services = listen_tcp(
-                    listener["bind_addresses"],
-                    listener["port"],
+                    listener.bind_addresses,
+                    listener.port,
                     ReplicationStreamProtocolFactory(self),
                 )
                 for s in services:
                     reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
-            elif listener["type"] == "metrics":
+            elif listener.type == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warning(
                         (
@@ -316,9 +316,11 @@ class SynapseHomeServer(HomeServer):
                         )
                     )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
+                    _base.listen_metrics(listener.bind_addresses, listener.port)
             else:
-                logger.warning("Unrecognized listener type: %s", listener["type"])
+                # this shouldn't happen, as the listener type should have been checked
+                # during parsing
+                logger.warning("Unrecognized listener type: %s", listener.type)
 
 
 # Gauges to expose monthly active user control metrics
@@ -526,7 +528,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
     stats["total_nonbridged_users"] = total_nonbridged_users
 
     daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
-    for name, count in iteritems(daily_user_type_results):
+    for name, count in daily_user_type_results.items():
         stats["daily_user_type_" + name] = count
 
     room_count = yield hs.get_datastore().get_room_count()
@@ -538,7 +540,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
     stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
     r30_results = yield hs.get_datastore().count_r30_users()
-    for name, count in iteritems(r30_results):
+    for name, count in r30_results.items():
         stats["r30_users_" + name] = count
 
     daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()