summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/__init__.py1
-rw-r--r--synapse/replication/http/membership.py4
-rw-r--r--synapse/replication/http/send_event.py18
-rw-r--r--synapse/replication/slave/storage/_base.py4
-rw-r--r--synapse/replication/slave/storage/appservice.py3
-rw-r--r--synapse/replication/slave/storage/client_ips.py3
-rw-r--r--synapse/replication/slave/storage/deviceinbox.py7
-rw-r--r--synapse/replication/slave/storage/devices.py5
-rw-r--r--synapse/replication/slave/storage/directory.py3
-rw-r--r--synapse/replication/slave/storage/events.py5
-rw-r--r--synapse/replication/slave/storage/filtering.py3
-rw-r--r--synapse/replication/slave/storage/groups.py5
-rw-r--r--synapse/replication/slave/storage/keys.py3
-rw-r--r--synapse/replication/slave/storage/presence.py8
-rw-r--r--synapse/replication/slave/storage/push_rule.py5
-rw-r--r--synapse/replication/slave/storage/pushers.py4
-rw-r--r--synapse/replication/slave/storage/receipts.py6
-rw-r--r--synapse/replication/slave/storage/registration.py3
-rw-r--r--synapse/replication/slave/storage/room.py3
-rw-r--r--synapse/replication/slave/storage/transactions.py3
-rw-r--r--synapse/replication/tcp/client.py15
-rw-r--r--synapse/replication/tcp/commands.py16
-rw-r--r--synapse/replication/tcp/protocol.py97
-rw-r--r--synapse/replication/tcp/resource.py21
-rw-r--r--synapse/replication/tcp/streams.py5
25 files changed, 149 insertions, 101 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 1d7a607529..589ee94c66 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -16,7 +16,6 @@
 from synapse.http.server import JsonResource
 from synapse.replication.http import membership, send_event
 
-
 REPLICATION_PREFIX = "/_synapse/replication"
 
 
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index e66c4e881f..6bfc8a5b89 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -18,10 +18,10 @@ import re
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, MatrixCodeMessageException
+from synapse.api.errors import MatrixCodeMessageException, SynapseError
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.types import Requester, UserID
-from synapse.util.distributor import user_left_room, user_joined_room
+from synapse.util.distributor import user_joined_room, user_left_room
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index a9baa2c1c3..2eede54792 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -13,31 +13,33 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+import re
+
 from twisted.internet import defer
 
 from synapse.api.errors import (
-    SynapseError, MatrixCodeMessageException, CodeMessageException,
+    CodeMessageException,
+    MatrixCodeMessageException,
+    SynapseError,
 )
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
-from synapse.util.async import sleep
+from synapse.types import Requester, UserID
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.metrics import Measure
-from synapse.types import Requester, UserID
-
-import logging
-import re
 
 logger = logging.getLogger(__name__)
 
 
 @defer.inlineCallbacks
-def send_event_to_master(client, host, port, requester, event, context,
+def send_event_to_master(clock, client, host, port, requester, event, context,
                          ratelimit, extra_users):
     """Send event to be handled on the master
 
     Args:
+        clock (synapse.util.Clock)
         client (SimpleHttpClient)
         host (str): host of master
         port (int): port on master listening for HTTP replication
@@ -77,7 +79,7 @@ def send_event_to_master(client, host, port, requester, event, context,
 
             # If we timed out we probably don't need to worry about backing
             # off too much, but lets just wait a little anyway.
-            yield sleep(1)
+            yield clock.sleep(1)
     except MatrixCodeMessageException as e:
         # We convert to SynapseError as we know that it was a SynapseError
         # on the master process that we should send to the client. (And
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 61f5590c53..3f7be74e02 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -13,13 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.engines import PostgresEngine
 
 from ._slaved_id_tracker import SlavedIdTracker
 
-import logging
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 8cae3076f4..b53a4c6bd1 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -15,7 +15,8 @@
 # limitations under the License.
 
 from synapse.storage.appservice import (
-    ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
+    ApplicationServiceTransactionWorkerStore,
+    ApplicationServiceWorkerStore,
 )
 
 
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 352c9a2aa8..60641f1a49 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -13,11 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.caches.descriptors import Cache
 
+from ._base import BaseSlavedStore
+
 
 class SlavedClientIpStore(BaseSlavedStore):
     def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 6f3fb64770..87eaa53004 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -13,11 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
 from synapse.storage import DataStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
 
 
 class SlavedDeviceInboxStore(BaseSlavedStore):
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 7687867aee..8206a988f7 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
 from synapse.storage import DataStore
 from synapse.storage.end_to_end_keys import EndToEndKeyStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
 
 class SlavedDeviceStore(BaseSlavedStore):
     def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
index 6deecd3963..1d1d48709a 100644
--- a/synapse/replication/slave/storage/directory.py
+++ b/synapse/replication/slave/storage/directory.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage.directory import DirectoryWorkerStore
 
+from ._base import BaseSlavedStore
+
 
 class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
     pass
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b1f64ef0d8..bdb5eee4af 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -20,9 +20,11 @@ from synapse.storage.event_federation import EventFederationWorkerStore
 from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.storage.roommember import RoomMemberWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
 from synapse.storage.state import StateGroupWorkerStore
 from synapse.storage.stream import StreamWorkerStore
-from synapse.storage.signatures import SignatureWorkerStore
+from synapse.storage.user_erasure_store import UserErasureWorkerStore
+
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
@@ -45,6 +47,7 @@ class SlavedEventStore(EventFederationWorkerStore,
                        EventsWorkerStore,
                        StateGroupWorkerStore,
                        SignatureWorkerStore,
+                       UserErasureWorkerStore,
                        BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
index 819ed62881..456a14cd5c 100644
--- a/synapse/replication/slave/storage/filtering.py
+++ b/synapse/replication/slave/storage/filtering.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage.filtering import FilteringStore
 
+from ._base import BaseSlavedStore
+
 
 class SlavedFilteringStore(BaseSlavedStore):
     def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 0bc4bce5b0..5777f07c8d 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -13,11 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
 from synapse.storage import DataStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
 
 class SlavedGroupServerStore(BaseSlavedStore):
     def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
index dd2ae49e48..05ed168463 100644
--- a/synapse/replication/slave/storage/keys.py
+++ b/synapse/replication/slave/storage/keys.py
@@ -13,10 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage import DataStore
 from synapse.storage.keys import KeyStore
 
+from ._base import BaseSlavedStore
+
 
 class SlavedKeyStore(BaseSlavedStore):
     _get_server_verify_key = KeyStore.__dict__[
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index cfb9280181..80b744082a 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -13,12 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
-
-from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.storage import DataStore
 from synapse.storage.presence import PresenceStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
 
 
 class SlavedPresenceStore(BaseSlavedStore):
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index bb2c40b6e3..f0200c1e98 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -14,10 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .events import SlavedEventStore
-from ._slaved_id_tracker import SlavedIdTracker
 from synapse.storage.push_rule import PushRulesWorkerStore
 
+from ._slaved_id_tracker import SlavedIdTracker
+from .events import SlavedEventStore
+
 
 class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore):
     def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index a7cd5a7291..3b2213c0d4 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -14,11 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.storage.pusher import PusherWorkerStore
+
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
-from synapse.storage.pusher import PusherWorkerStore
-
 
 class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
 
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index 1647072f65..ed12342f40 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -14,11 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.storage.receipts import ReceiptsWorkerStore
+
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
-from synapse.storage.receipts import ReceiptsWorkerStore
-
 # So, um, we want to borrow a load of functions intended for reading from
 # a DataStore, but we don't want to take functions that either write to the
 # DataStore or are cached and don't have cache invalidation logic.
@@ -49,7 +49,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
 
     def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
         self.get_receipts_for_user.invalidate((user_id, receipt_type))
-        self.get_linearized_receipts_for_room.invalidate_many((room_id,))
+        self._get_linearized_receipts_for_room.invalidate_many((room_id,))
         self.get_last_receipt_event_id_for_user.invalidate(
             (user_id, room_id, receipt_type)
         )
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
index 7323bf0f1e..408d91df1c 100644
--- a/synapse/replication/slave/storage/registration.py
+++ b/synapse/replication/slave/storage/registration.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage.registration import RegistrationWorkerStore
 
+from ._base import BaseSlavedStore
+
 
 class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore):
     pass
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 5ae1670157..0cb474928c 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -13,8 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage.room import RoomWorkerStore
+
+from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index fbb58f35da..9c9a5eadd9 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,10 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseSlavedStore
 from synapse.storage import DataStore
 from synapse.storage.transactions import TransactionStore
 
+from ._base import BaseSlavedStore
+
 
 class TransactionStore(BaseSlavedStore):
     get_destination_retry_timings = TransactionStore.__dict__[
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 6d2513c4e2..e592ab57bf 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -15,17 +15,20 @@
 """A replication client for use by synapse workers.
 """
 
-from twisted.internet import reactor, defer
+import logging
+
+from twisted.internet import defer
 from twisted.internet.protocol import ReconnectingClientFactory
 
 from .commands import (
-    FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
+    FederationAckCommand,
+    InvalidateCacheCommand,
+    RemovePusherCommand,
     UserIpCommand,
+    UserSyncCommand,
 )
 from .protocol import ClientReplicationStreamProtocol
 
-import logging
-
 logger = logging.getLogger(__name__)
 
 
@@ -44,7 +47,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
         self.server_name = hs.config.server_name
         self._clock = hs.get_clock()  # As self.clock is defined in super class
 
-        reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
 
     def startedConnecting(self, connector):
         logger.info("Connecting to replication: %r", connector.getDestination())
@@ -95,7 +98,7 @@ class ReplicationClientHandler(object):
         factory = ReplicationClientFactory(hs, client_name, self)
         host = hs.config.worker_replication_host
         port = hs.config.worker_replication_port
-        reactor.connectTCP(host, port, factory)
+        hs.get_reactor().connectTCP(host, port, factory)
 
     def on_rdata(self, stream_name, token, rows):
         """Called when we get new replication data. By default this just pokes
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 12aac3cc6b..f3908df642 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -19,13 +19,17 @@ allowed to be sent by which side.
 """
 
 import logging
-import simplejson
+import platform
 
+if platform.python_implementation() == "PyPy":
+    import json
+    _json_encoder = json.JSONEncoder()
+else:
+    import simplejson as json
+    _json_encoder = json.JSONEncoder(namedtuple_as_object=False)
 
 logger = logging.getLogger(__name__)
 
-_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
-
 
 class Command(object):
     """The base command class.
@@ -102,7 +106,7 @@ class RdataCommand(Command):
         return cls(
             stream_name,
             None if token == "batch" else int(token),
-            simplejson.loads(row_json)
+            json.loads(row_json)
         )
 
     def to_line(self):
@@ -300,7 +304,7 @@ class InvalidateCacheCommand(Command):
     def from_line(cls, line):
         cache_func, keys_json = line.split(" ", 1)
 
-        return cls(cache_func, simplejson.loads(keys_json))
+        return cls(cache_func, json.loads(keys_json))
 
     def to_line(self):
         return " ".join((
@@ -329,7 +333,7 @@ class UserIpCommand(Command):
     def from_line(cls, line):
         user_id, jsn = line.split(" ", 1)
 
-        access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
+        access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
 
         return cls(
             user_id, access_token, ip, user_agent, device_id, last_seen
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index a6280aae70..dec5ac0913 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -49,29 +49,37 @@ indicate which side is sending, these are *not* included on the wire::
     * connection closed by server *
 """
 
+import fcntl
+import logging
+import struct
+from collections import defaultdict
+
+from six import iteritems, iterkeys
+
+from prometheus_client import Counter
+
 from twisted.internet import defer
 from twisted.protocols.basic import LineOnlyReceiver
 from twisted.python.failure import Failure
 
-from .commands import (
-    COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
-    ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand,
-    NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand,
-)
-from .streams import STREAMS_MAP
-
 from synapse.metrics import LaterGauge
 from synapse.util.stringutils import random_string
 
-from prometheus_client import Counter
-
-from collections import defaultdict
-
-from six import iterkeys, iteritems
-
-import logging
-import struct
-import fcntl
+from .commands import (
+    COMMAND_MAP,
+    VALID_CLIENT_COMMANDS,
+    VALID_SERVER_COMMANDS,
+    ErrorCommand,
+    NameCommand,
+    PingCommand,
+    PositionCommand,
+    RdataCommand,
+    ReplicateCommand,
+    ServerCommand,
+    SyncCommand,
+    UserSyncCommand,
+)
+from .streams import STREAMS_MAP
 
 connection_close_counter = Counter(
     "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"])
@@ -564,11 +572,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
 # The following simply registers metrics for the replication connections
 
 pending_commands = LaterGauge(
-    "pending_commands", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_pending_commands",
+    "",
+    ["name", "conn_id"],
     lambda: {
-        (p.name, p.conn_id): len(p.pending_commands)
-        for p in connected_connections
-    })
+        (p.name, p.conn_id): len(p.pending_commands) for p in connected_connections
+    },
+)
 
 
 def transport_buffer_size(protocol):
@@ -579,11 +589,13 @@ def transport_buffer_size(protocol):
 
 
 transport_send_buffer = LaterGauge(
-    "synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_transport_send_buffer",
+    "",
+    ["name", "conn_id"],
     lambda: {
-        (p.name, p.conn_id): transport_buffer_size(p)
-        for p in connected_connections
-    })
+        (p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections
+    },
+)
 
 
 def transport_kernel_read_buffer_size(protocol, read=True):
@@ -602,37 +614,50 @@ def transport_kernel_read_buffer_size(protocol, read=True):
 
 
 tcp_transport_kernel_send_buffer = LaterGauge(
-    "synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_transport_kernel_send_buffer",
+    "",
+    ["name", "conn_id"],
     lambda: {
         (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
-    })
+    },
+)
 
 
 tcp_transport_kernel_read_buffer = LaterGauge(
-    "synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_transport_kernel_read_buffer",
+    "",
+    ["name", "conn_id"],
     lambda: {
         (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
-    })
+    },
+)
 
 
 tcp_inbound_commands = LaterGauge(
-    "synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
+    "synapse_replication_tcp_protocol_inbound_commands",
+    "",
+    ["command", "name", "conn_id"],
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
-        for k, count in iteritems(p.inbound_commands_counter.counts)
-    })
+        for k, count in iteritems(p.inbound_commands_counter)
+    },
+)
 
 tcp_outbound_commands = LaterGauge(
-    "synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
+    "synapse_replication_tcp_protocol_outbound_commands",
+    "",
+    ["command", "name", "conn_id"],
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
-        for k, count in iteritems(p.outbound_commands_counter.counts)
-    })
+        for k, count in iteritems(p.outbound_commands_counter)
+    },
+)
 
 # number of updates received for each RDATA stream
-inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "",
-                              ["stream_name"])
+inbound_rdata_count = Counter(
+    "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
+)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 63bd6d2652..611fb66e1d 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -15,19 +15,20 @@
 """The server side of the replication stream.
 """
 
-from twisted.internet import defer, reactor
-from twisted.internet.protocol import Factory
+import logging
 
-from .streams import STREAMS_MAP, FederationStream
-from .protocol import ServerReplicationStreamProtocol
+from six import itervalues
 
-from synapse.util.metrics import Measure, measure_func
-from synapse.metrics import LaterGauge
+from prometheus_client import Counter
 
-import logging
+from twisted.internet import defer
+from twisted.internet.protocol import Factory
 
-from prometheus_client import Counter
-from six import itervalues
+from synapse.metrics import LaterGauge
+from synapse.util.metrics import Measure, measure_func
+
+from .protocol import ServerReplicationStreamProtocol
+from .streams import STREAMS_MAP, FederationStream
 
 stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
                                  "", ["stream_name"])
@@ -109,7 +110,7 @@ class ReplicationStreamer(object):
         self.is_looping = False
         self.pending_updates = False
 
-        reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
 
     def on_shutdown(self):
         # close all connections on shutdown
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 4c60bf79f9..55fe701c5c 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -24,11 +24,10 @@ Each stream is defined by the following information:
     update_function:    The function that returns a list of updates between two tokens
 """
 
-from twisted.internet import defer
-from collections import namedtuple
-
 import logging
+from collections import namedtuple
 
+from twisted.internet import defer
 
 logger = logging.getLogger(__name__)