summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/__init__.py9
-rw-r--r--synapse/replication/http/_base.py8
-rw-r--r--synapse/replication/http/account_data.py14
-rw-r--r--synapse/replication/http/devices.py8
-rw-r--r--synapse/replication/http/federation.py16
-rw-r--r--synapse/replication/http/login.py8
-rw-r--r--synapse/replication/http/membership.py6
-rw-r--r--synapse/replication/http/presence.py2
-rw-r--r--synapse/replication/http/push.py2
-rw-r--r--synapse/replication/http/register.py10
-rw-r--r--synapse/replication/http/send_event.py8
-rw-r--r--synapse/replication/http/streams.py8
-rw-r--r--synapse/replication/slave/storage/_base.py7
-rw-r--r--synapse/replication/slave/storage/client_ips.py7
-rw-r--r--synapse/replication/slave/storage/devices.py7
-rw-r--r--synapse/replication/slave/storage/events.py6
-rw-r--r--synapse/replication/slave/storage/filtering.py7
-rw-r--r--synapse/replication/slave/storage/groups.py7
-rw-r--r--synapse/replication/tcp/external_cache.py9
-rw-r--r--synapse/replication/tcp/handler.py6
-rw-r--r--synapse/replication/tcp/resource.py8
-rw-r--r--synapse/replication/tcp/streams/_base.py20
22 files changed, 130 insertions, 53 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index ba8114ac9e..1457d9d59b 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import TYPE_CHECKING
+
 from synapse.http.server import JsonResource
 from synapse.replication.http import (
     account_data,
@@ -26,16 +28,19 @@ from synapse.replication.http import (
     streams,
 )
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 REPLICATION_PREFIX = "/_synapse/replication"
 
 
 class ReplicationRestResource(JsonResource):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         # We enable extracting jaeger contexts here as these are internal APIs.
         super().__init__(hs, canonical_json=False, extract_context=True)
         self.register_servlets(hs)
 
-    def register_servlets(self, hs):
+    def register_servlets(self, hs: "HomeServer"):
         send_event.register_servlets(hs, self)
         federation.register_servlets(hs, self)
         presence.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index e047ec74d8..585332b244 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -17,7 +17,7 @@ import logging
 import re
 import urllib
 from inspect import signature
-from typing import TYPE_CHECKING, Dict, List, Tuple
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple
 
 from prometheus_client import Counter, Gauge
 
@@ -156,7 +156,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         pass
 
     @classmethod
-    def make_client(cls, hs):
+    def make_client(cls, hs: "HomeServer"):
         """Create a client that makes requests.
 
         Returns a callable that accepts the same parameters as
@@ -208,7 +208,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                     url_args.append(txn_id)
 
                 if cls.METHOD == "POST":
-                    request_func = client.post_json_get_json
+                    request_func: Callable[
+                        ..., Awaitable[Any]
+                    ] = client.post_json_get_json
                 elif cls.METHOD == "PUT":
                     request_func = client.put_json
                 elif cls.METHOD == "GET":
diff --git a/synapse/replication/http/account_data.py b/synapse/replication/http/account_data.py
index 70e951af63..5f0f225aa9 100644
--- a/synapse/replication/http/account_data.py
+++ b/synapse/replication/http/account_data.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING
 
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -37,7 +41,7 @@ class ReplicationUserAccountDataRestServlet(ReplicationEndpoint):
     PATH_ARGS = ("user_id", "account_data_type")
     CACHE = False
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.handler = hs.get_account_data_handler()
@@ -78,7 +82,7 @@ class ReplicationRoomAccountDataRestServlet(ReplicationEndpoint):
     PATH_ARGS = ("user_id", "room_id", "account_data_type")
     CACHE = False
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.handler = hs.get_account_data_handler()
@@ -119,7 +123,7 @@ class ReplicationAddTagRestServlet(ReplicationEndpoint):
     PATH_ARGS = ("user_id", "room_id", "tag")
     CACHE = False
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.handler = hs.get_account_data_handler()
@@ -162,7 +166,7 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint):
     )
     CACHE = False
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.handler = hs.get_account_data_handler()
@@ -183,7 +187,7 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint):
         return 200, {"max_stream_id": max_stream_id}
 
 
-def register_servlets(hs, http_server):
+def register_servlets(hs: "HomeServer", http_server):
     ReplicationUserAccountDataRestServlet(hs).register(http_server)
     ReplicationRoomAccountDataRestServlet(hs).register(http_server)
     ReplicationAddTagRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 5a5818ef61..42dffb39cb 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -13,9 +13,13 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING
 
 from synapse.replication.http._base import ReplicationEndpoint
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -51,7 +55,7 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
     PATH_ARGS = ("user_id",)
     CACHE = False
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.device_list_updater = hs.get_device_handler().device_list_updater
@@ -68,5 +72,5 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
         return 200, user_devices
 
 
-def register_servlets(hs, http_server):
+def register_servlets(hs: "HomeServer", http_server):
     ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index a0b3145f4e..5ed535c90d 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import make_event_from_dict
@@ -21,6 +22,9 @@ from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
 from synapse.util.metrics import Measure
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -56,7 +60,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
     NAME = "fed_send_events"
     PATH_ARGS = ()
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.store = hs.get_datastore()
@@ -151,7 +155,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
     NAME = "fed_send_edu"
     PATH_ARGS = ("edu_type",)
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.store = hs.get_datastore()
@@ -194,7 +198,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
     # This is a query, so let's not bother caching
     CACHE = False
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.store = hs.get_datastore()
@@ -238,7 +242,7 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
     NAME = "fed_cleanup_room"
     PATH_ARGS = ("room_id",)
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.store = hs.get_datastore()
@@ -273,7 +277,7 @@ class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint):
     NAME = "store_room_on_outlier_membership"
     PATH_ARGS = ("room_id",)
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.store = hs.get_datastore()
@@ -289,7 +293,7 @@ class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint):
         return 200, {}
 
 
-def register_servlets(hs, http_server):
+def register_servlets(hs: "HomeServer", http_server):
     ReplicationFederationSendEventsRestServlet(hs).register(http_server)
     ReplicationFederationSendEduRestServlet(hs).register(http_server)
     ReplicationGetQueryRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index 550bd5c95f..0db419ea57 100644
--- a/synapse/replication/http/login.py
+++ b/synapse/replication/http/login.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING
 
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -30,7 +34,7 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
     NAME = "device_check_registered"
     PATH_ARGS = ("user_id",)
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.registration_handler = hs.get_registration_handler()
 
@@ -82,5 +86,5 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
         return 200, res
 
 
-def register_servlets(hs, http_server):
+def register_servlets(hs: "HomeServer", http_server):
     RegisterDeviceReplicationServlet(hs).register(http_server)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 34206c5060..7371c240b2 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -45,7 +45,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
     NAME = "remote_join"
     PATH_ARGS = ("room_id", "user_id")
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.federation_handler = hs.get_federation_handler()
@@ -320,7 +320,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
     PATH_ARGS = ("room_id", "user_id", "change")
     CACHE = False  # No point caching as should return instantly.
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.registeration_handler = hs.get_registration_handler()
@@ -360,7 +360,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
         return 200, {}
 
 
-def register_servlets(hs, http_server):
+def register_servlets(hs: "HomeServer", http_server):
     ReplicationRemoteJoinRestServlet(hs).register(http_server)
     ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
     ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py
index bb00247953..63143085d5 100644
--- a/synapse/replication/http/presence.py
+++ b/synapse/replication/http/presence.py
@@ -117,6 +117,6 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
         )
 
 
-def register_servlets(hs, http_server):
+def register_servlets(hs: "HomeServer", http_server):
     ReplicationBumpPresenceActiveTime(hs).register(http_server)
     ReplicationPresenceSetState(hs).register(http_server)
diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py
index 139427cb1f..6c8db3061e 100644
--- a/synapse/replication/http/push.py
+++ b/synapse/replication/http/push.py
@@ -67,5 +67,5 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
         return 200, {}
 
 
-def register_servlets(hs, http_server):
+def register_servlets(hs: "HomeServer", http_server):
     ReplicationRemovePusherRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index d6dd7242eb..7adfbb666f 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING
 
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -26,7 +30,7 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
     NAME = "register_user"
     PATH_ARGS = ("user_id",)
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.store = hs.get_datastore()
         self.registration_handler = hs.get_registration_handler()
@@ -100,7 +104,7 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
     NAME = "post_register"
     PATH_ARGS = ("user_id",)
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.store = hs.get_datastore()
         self.registration_handler = hs.get_registration_handler()
@@ -130,6 +134,6 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
         return 200, {}
 
 
-def register_servlets(hs, http_server):
+def register_servlets(hs: "HomeServer", http_server):
     ReplicationRegisterServlet(hs).register(http_server)
     ReplicationPostRegisterActionsServlet(hs).register(http_server)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index fae5ffa451..9f6851d059 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import make_event_from_dict
@@ -22,6 +23,9 @@ from synapse.replication.http._base import ReplicationEndpoint
 from synapse.types import Requester, UserID
 from synapse.util.metrics import Measure
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -57,7 +61,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
     NAME = "send_event"
     PATH_ARGS = ("event_id",)
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.event_creation_handler = hs.get_event_creation_handler()
@@ -135,5 +139,5 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
         )
 
 
-def register_servlets(hs, http_server):
+def register_servlets(hs: "HomeServer", http_server):
     ReplicationSendEventRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index 9afa147d00..3223bc2432 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -13,11 +13,15 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING
 
 from synapse.api.errors import SynapseError
 from synapse.http.servlet import parse_integer
 from synapse.replication.http._base import ReplicationEndpoint
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -46,7 +50,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
     PATH_ARGS = ("stream_name",)
     METHOD = "GET"
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self._instance_name = hs.get_instance_name()
@@ -74,5 +78,5 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         )
 
 
-def register_servlets(hs, http_server):
+def register_servlets(hs: "HomeServer", http_server):
     ReplicationGetStreamUpdates(hs).register(http_server)
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index e460dd85cd..7ecb446e7c 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -13,18 +13,21 @@
 # limitations under the License.
 
 import logging
-from typing import Optional
+from typing import TYPE_CHECKING, Optional
 
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class BaseSlavedStore(CacheInvalidationWorkerStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+    def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen: Optional[
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 436d39c320..61cd7e5228 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -12,15 +12,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import TYPE_CHECKING
+
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY
 from synapse.util.caches.lrucache import LruCache
 
 from ._base import BaseSlavedStore
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class SlavedClientIpStore(BaseSlavedStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+    def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
         self.client_ip_last_seen: LruCache[tuple, int] = LruCache(
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 26bdead565..0a58296089 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import TYPE_CHECKING
+
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
@@ -20,9 +22,12 @@ from synapse.storage.databases.main.devices import DeviceWorkerStore
 from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+    def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
         self.hs = hs
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index d4d3f8c448..63ed50caa5 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import TYPE_CHECKING
 
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
@@ -30,6 +31,9 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from ._base import BaseSlavedStore
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -54,7 +58,7 @@ class SlavedEventStore(
     RelationsWorkerStore,
     BaseSlavedStore,
 ):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+    def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
         events_max = self._stream_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
index 37875bc973..90284c202d 100644
--- a/synapse/replication/slave/storage/filtering.py
+++ b/synapse/replication/slave/storage/filtering.py
@@ -12,14 +12,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import TYPE_CHECKING
+
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.filtering import FilteringStore
 
 from ._base import BaseSlavedStore
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class SlavedFilteringStore(BaseSlavedStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+    def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
     # Filters are immutable so this cache doesn't need to be expired
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index e9bdc38470..497e16c69e 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import TYPE_CHECKING
+
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import GroupServerStream
@@ -19,9 +21,12 @@ from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.group_server import GroupServerWorkerStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+    def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
         self.hs = hs
diff --git a/synapse/replication/tcp/external_cache.py b/synapse/replication/tcp/external_cache.py
index b402f82810..aaf91e5e02 100644
--- a/synapse/replication/tcp/external_cache.py
+++ b/synapse/replication/tcp/external_cache.py
@@ -21,6 +21,8 @@ from synapse.logging.context import make_deferred_yieldable
 from synapse.util import json_decoder, json_encoder
 
 if TYPE_CHECKING:
+    from txredisapi import RedisProtocol
+
     from synapse.server import HomeServer
 
 set_counter = Counter(
@@ -59,7 +61,12 @@ class ExternalCache:
     """
 
     def __init__(self, hs: "HomeServer"):
-        self._redis_connection = hs.get_outbound_redis_connection()
+        if hs.config.redis.redis_enabled:
+            self._redis_connection: Optional[
+                "RedisProtocol"
+            ] = hs.get_outbound_redis_connection()
+        else:
+            self._redis_connection = None
 
     def _get_redis_key(self, cache_name: str, key: str) -> str:
         return "cache_v1:%s:%s" % (cache_name, key)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 6aa9318027..06fd06fdf3 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -294,7 +294,7 @@ class ReplicationCommandHandler:
             # This shouldn't be possible
             raise Exception("Unrecognised command %s in stream queue", cmd.NAME)
 
-    def start_replication(self, hs):
+    def start_replication(self, hs: "HomeServer"):
         """Helper method to start a replication connection to the remote server
         using TCP.
         """
@@ -321,6 +321,8 @@ class ReplicationCommandHandler:
                 hs.config.redis.redis_host,  # type: ignore[arg-type]
                 hs.config.redis.redis_port,
                 self._factory,
+                timeout=30,
+                bindAddress=None,
             )
         else:
             client_name = hs.get_instance_name()
@@ -331,6 +333,8 @@ class ReplicationCommandHandler:
                 host,  # type: ignore[arg-type]
                 port,
                 self._factory,
+                timeout=30,
+                bindAddress=None,
             )
 
     def get_streams(self) -> Dict[str, Stream]:
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 80f9b23bfd..55326877fd 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -16,6 +16,7 @@
 
 import logging
 import random
+from typing import TYPE_CHECKING
 
 from prometheus_client import Counter
 
@@ -27,6 +28,9 @@ from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
 from synapse.replication.tcp.streams import EventsStream
 from synapse.util.metrics import Measure
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 stream_updates_counter = Counter(
     "synapse_replication_tcp_resource_stream_updates", "", ["stream_name"]
 )
@@ -37,7 +41,7 @@ logger = logging.getLogger(__name__)
 class ReplicationStreamProtocolFactory(Factory):
     """Factory for new replication connections."""
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.command_handler = hs.get_tcp_replication()
         self.clock = hs.get_clock()
         self.server_name = hs.config.server.server_name
@@ -65,7 +69,7 @@ class ReplicationStreamer:
     data is available it will propagate to all connected clients.
     """
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 9b905aba9d..c8b188ae4e 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -241,7 +241,7 @@ class BackfillStream(Stream):
     NAME = "backfill"
     ROW_TYPE = BackfillStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
@@ -363,7 +363,7 @@ class ReceiptsStream(Stream):
     NAME = "receipts"
     ROW_TYPE = ReceiptsStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
@@ -380,7 +380,7 @@ class PushRulesStream(Stream):
     NAME = "push_rules"
     ROW_TYPE = PushRulesStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
 
         super().__init__(
@@ -405,7 +405,7 @@ class PushersStream(Stream):
     NAME = "pushers"
     ROW_TYPE = PushersStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         store = hs.get_datastore()
 
         super().__init__(
@@ -438,7 +438,7 @@ class CachesStream(Stream):
     NAME = "caches"
     ROW_TYPE = CachesStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
@@ -459,7 +459,7 @@ class DeviceListsStream(Stream):
     NAME = "device_lists"
     ROW_TYPE = DeviceListsStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
@@ -476,7 +476,7 @@ class ToDeviceStream(Stream):
     NAME = "to_device"
     ROW_TYPE = ToDeviceStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
@@ -495,7 +495,7 @@ class TagAccountDataStream(Stream):
     NAME = "tag_account_data"
     ROW_TYPE = TagAccountDataStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
@@ -582,7 +582,7 @@ class GroupServerStream(Stream):
     NAME = "groups"
     ROW_TYPE = GroupsStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
@@ -599,7 +599,7 @@ class UserSignatureStream(Stream):
     NAME = "user_signature"
     ROW_TYPE = UserSignatureStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),