diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 1d7a607529..589ee94c66 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -16,7 +16,6 @@
from synapse.http.server import JsonResource
from synapse.replication.http import membership, send_event
-
REPLICATION_PREFIX = "/_synapse/replication"
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index e66c4e881f..6bfc8a5b89 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -18,10 +18,10 @@ import re
from twisted.internet import defer
-from synapse.api.errors import SynapseError, MatrixCodeMessageException
+from synapse.api.errors import MatrixCodeMessageException, SynapseError
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.types import Requester, UserID
-from synapse.util.distributor import user_left_room, user_joined_room
+from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index f080f96cc1..2eede54792 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -13,20 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import re
+
from twisted.internet import defer
from synapse.api.errors import (
- SynapseError, MatrixCodeMessageException, CodeMessageException,
+ CodeMessageException,
+ MatrixCodeMessageException,
+ SynapseError,
)
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.types import Requester, UserID
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure
-from synapse.types import Requester, UserID
-
-import logging
-import re
logger = logging.getLogger(__name__)
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 61f5590c53..3f7be74e02 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -13,13 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
from ._slaved_id_tracker import SlavedIdTracker
-import logging
-
logger = logging.getLogger(__name__)
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 8cae3076f4..b53a4c6bd1 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -15,7 +15,8 @@
# limitations under the License.
from synapse.storage.appservice import (
- ApplicationServiceWorkerStore, ApplicationServiceTransactionWorkerStore,
+ ApplicationServiceTransactionWorkerStore,
+ ApplicationServiceWorkerStore,
)
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 352c9a2aa8..60641f1a49 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -13,11 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import Cache
+from ._base import BaseSlavedStore
+
class SlavedClientIpStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py
index 6f3fb64770..87eaa53004 100644
--- a/synapse/replication/slave/storage/deviceinbox.py
+++ b/synapse/replication/slave/storage/deviceinbox.py
@@ -13,11 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
class SlavedDeviceInboxStore(BaseSlavedStore):
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 7687867aee..8206a988f7 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -13,12 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
from synapse.storage.end_to_end_keys import EndToEndKeyStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
class SlavedDeviceStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py
index 6deecd3963..1d1d48709a 100644
--- a/synapse/replication/slave/storage/directory.py
+++ b/synapse/replication/slave/storage/directory.py
@@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
from synapse.storage.directory import DirectoryWorkerStore
+from ._base import BaseSlavedStore
+
class DirectoryStore(DirectoryWorkerStore, BaseSlavedStore):
pass
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 97d3196633..bdb5eee4af 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -20,10 +20,11 @@ from synapse.storage.event_federation import EventFederationWorkerStore
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.roommember import RoomMemberWorkerStore
+from synapse.storage.signatures import SignatureWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamWorkerStore
-from synapse.storage.signatures import SignatureWorkerStore
from synapse.storage.user_erasure_store import UserErasureWorkerStore
+
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
index 819ed62881..456a14cd5c 100644
--- a/synapse/replication/slave/storage/filtering.py
+++ b/synapse/replication/slave/storage/filtering.py
@@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
from synapse.storage.filtering import FilteringStore
+from ._base import BaseSlavedStore
+
class SlavedFilteringStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 0bc4bce5b0..5777f07c8d 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -13,11 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
class SlavedGroupServerStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
index dd2ae49e48..05ed168463 100644
--- a/synapse/replication/slave/storage/keys.py
+++ b/synapse/replication/slave/storage/keys.py
@@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
from synapse.storage import DataStore
from synapse.storage.keys import KeyStore
+from ._base import BaseSlavedStore
+
class SlavedKeyStore(BaseSlavedStore):
_get_server_verify_key = KeyStore.__dict__[
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index cfb9280181..80b744082a 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
-
-from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.storage import DataStore
from synapse.storage.presence import PresenceStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
class SlavedPresenceStore(BaseSlavedStore):
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index bb2c40b6e3..f0200c1e98 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -14,10 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from .events import SlavedEventStore
-from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage.push_rule import PushRulesWorkerStore
+from ._slaved_id_tracker import SlavedIdTracker
+from .events import SlavedEventStore
+
class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore):
def __init__(self, db_conn, hs):
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index a7cd5a7291..3b2213c0d4 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -14,11 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.storage.pusher import PusherWorkerStore
+
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage.pusher import PusherWorkerStore
-
class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index 1647072f65..7ab12b850f 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -14,11 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.storage.receipts import ReceiptsWorkerStore
+
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
-from synapse.storage.receipts import ReceiptsWorkerStore
-
# So, um, we want to borrow a load of functions intended for reading from
# a DataStore, but we don't want to take functions that either write to the
# DataStore or are cached and don't have cache invalidation logic.
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
index 7323bf0f1e..408d91df1c 100644
--- a/synapse/replication/slave/storage/registration.py
+++ b/synapse/replication/slave/storage/registration.py
@@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
from synapse.storage.registration import RegistrationWorkerStore
+from ._base import BaseSlavedStore
+
class SlavedRegistrationStore(RegistrationWorkerStore, BaseSlavedStore):
pass
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 5ae1670157..0cb474928c 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -13,8 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
from synapse.storage.room import RoomWorkerStore
+
+from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index fbb58f35da..9c9a5eadd9 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseSlavedStore
from synapse.storage import DataStore
from synapse.storage.transactions import TransactionStore
+from ._base import BaseSlavedStore
+
class TransactionStore(BaseSlavedStore):
get_destination_retry_timings = TransactionStore.__dict__[
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index bb852b00af..e592ab57bf 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -15,17 +15,20 @@
"""A replication client for use by synapse workers.
"""
+import logging
+
from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory
from .commands import (
- FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
+ FederationAckCommand,
+ InvalidateCacheCommand,
+ RemovePusherCommand,
UserIpCommand,
+ UserSyncCommand,
)
from .protocol import ClientReplicationStreamProtocol
-import logging
-
logger = logging.getLogger(__name__)
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 171a698e14..dec5ac0913 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -49,29 +49,37 @@ indicate which side is sending, these are *not* included on the wire::
* connection closed by server *
"""
+import fcntl
+import logging
+import struct
+from collections import defaultdict
+
+from six import iteritems, iterkeys
+
+from prometheus_client import Counter
+
from twisted.internet import defer
from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure
-from .commands import (
- COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
- ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand,
- NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand,
-)
-from .streams import STREAMS_MAP
-
from synapse.metrics import LaterGauge
from synapse.util.stringutils import random_string
-from prometheus_client import Counter
-
-from collections import defaultdict
-
-from six import iterkeys, iteritems
-
-import logging
-import struct
-import fcntl
+from .commands import (
+ COMMAND_MAP,
+ VALID_CLIENT_COMMANDS,
+ VALID_SERVER_COMMANDS,
+ ErrorCommand,
+ NameCommand,
+ PingCommand,
+ PositionCommand,
+ RdataCommand,
+ ReplicateCommand,
+ ServerCommand,
+ SyncCommand,
+ UserSyncCommand,
+)
+from .streams import STREAMS_MAP
connection_close_counter = Counter(
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"])
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 95ad8c1b4c..611fb66e1d 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -15,19 +15,20 @@
"""The server side of the replication stream.
"""
+import logging
+
+from six import itervalues
+
+from prometheus_client import Counter
+
from twisted.internet import defer
from twisted.internet.protocol import Factory
-from .streams import STREAMS_MAP, FederationStream
-from .protocol import ServerReplicationStreamProtocol
-
-from synapse.util.metrics import Measure, measure_func
from synapse.metrics import LaterGauge
+from synapse.util.metrics import Measure, measure_func
-import logging
-
-from prometheus_client import Counter
-from six import itervalues
+from .protocol import ServerReplicationStreamProtocol
+from .streams import STREAMS_MAP, FederationStream
stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
"", ["stream_name"])
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 4c60bf79f9..55fe701c5c 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -24,11 +24,10 @@ Each stream is defined by the following information:
update_function: The function that returns a list of updates between two tokens
"""
-from twisted.internet import defer
-from collections import namedtuple
-
import logging
+from collections import namedtuple
+from twisted.internet import defer
logger = logging.getLogger(__name__)
|