diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py
index 523a1358d4..1b8718b11d 100644
--- a/synapse/replication/tcp/__init__.py
+++ b/synapse/replication/tcp/__init__.py
@@ -25,7 +25,7 @@ Structure of the module:
* command.py - the definitions of all the valid commands
* protocol.py - the TCP protocol classes
* resource.py - handles streaming stream updates to replications
- * streams/ - the definitons of all the valid streams
+ * streams/ - the definitions of all the valid streams
The general interaction of the classes are:
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index df29732f51..4985e40b1f 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -33,8 +33,8 @@ from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure
if TYPE_CHECKING:
- from synapse.server import HomeServer
from synapse.replication.tcp.handler import ReplicationCommandHandler
+ from synapse.server import HomeServer
logger = logging.getLogger(__name__)
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index ea5937a20c..ccc7f1f0d1 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -18,18 +18,11 @@ The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""
import abc
+import json
import logging
-import platform
from typing import Tuple, Type
-if platform.python_implementation() == "PyPy":
- import json
-
- _json_encoder = json.JSONEncoder()
-else:
- import simplejson as json # type: ignore[no-redef] # noqa: F821
-
- _json_encoder = json.JSONEncoder(namedtuple_as_object=False) # type: ignore[call-arg] # noqa: F821
+_json_encoder = json.JSONEncoder()
logger = logging.getLogger(__name__)
@@ -54,7 +47,7 @@ class Command(metaclass=abc.ABCMeta):
@abc.abstractmethod
def to_line(self) -> str:
- """Serialises the comamnd for the wire. Does not include the command
+ """Serialises the command for the wire. Does not include the command
prefix.
"""
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e6a2e2598b..55b3b79008 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar
@@ -149,10 +148,11 @@ class ReplicationCommandHandler:
using TCP.
"""
if hs.config.redis.redis_enabled:
+ import txredisapi
+
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
)
- import txredisapi
logger.info(
"Connecting to redis (host=%r port=%r)",
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 4198eece71..ca47f5cc88 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -317,7 +317,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def _queue_command(self, cmd):
"""Queue the command until the connection is ready to write to again.
"""
- logger.debug("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd)
+ logger.debug("[%s] Queueing as conn %r, cmd: %r", self.id(), self.state, cmd)
self.pending_commands.append(cmd)
if len(self.pending_commands) > self.max_line_buffer:
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index e776b63183..0a7e7f67be 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -177,7 +177,7 @@ class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory):
Args:
hs
outbound_redis_connection: A connection to redis that will be used to
- send outbound commands (this is seperate to the redis connection
+ send outbound commands (this is separate to the redis connection
used to subscribe).
"""
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index f196eff072..9076bbe9f1 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -198,26 +198,6 @@ def current_token_without_instance(
return lambda instance_name: current_token()
-def db_query_to_update_function(
- query_function: Callable[[Token, Token, int], Awaitable[List[tuple]]]
-) -> UpdateFunction:
- """Wraps a db query function which returns a list of rows to make it
- suitable for use as an `update_function` for the Stream class
- """
-
- async def update_function(instance_name, from_token, upto_token, limit):
- rows = await query_function(from_token, upto_token, limit)
- updates = [(row[0], row[1:]) for row in rows]
- limited = False
- if len(updates) >= limit:
- upto_token = updates[-1][0]
- limited = True
-
- return updates, upto_token, limited
-
- return update_function
-
-
def make_http_update_function(hs, stream_name: str) -> UpdateFunction:
"""Makes a suitable function for use as an `update_function` that queries
the master process for updates.
@@ -393,7 +373,7 @@ class PushersStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_pushers_stream_token),
- db_query_to_update_function(store.get_all_updated_pushers_rows),
+ store.get_all_updated_pushers_rows,
)
@@ -421,26 +401,12 @@ class CachesStream(Stream):
ROW_TYPE = CachesStreamRow
def __init__(self, hs):
- self.store = hs.get_datastore()
+ store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
- self.store.get_cache_stream_token,
- self._update_function,
- )
-
- async def _update_function(
- self, instance_name: str, from_token: int, upto_token: int, limit: int
- ):
- rows = await self.store.get_all_updated_caches(
- instance_name, from_token, upto_token, limit
+ store.get_cache_stream_token,
+ store.get_all_updated_caches,
)
- updates = [(row[0], row[1:]) for row in rows]
- limited = False
- if len(updates) >= limit:
- upto_token = updates[-1][0]
- limited = True
-
- return updates, upto_token, limited
class PublicRoomsStream(Stream):
@@ -465,7 +431,7 @@ class PublicRoomsStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_public_room_stream_id),
- db_query_to_update_function(store.get_all_new_public_rooms),
+ store.get_all_new_public_rooms,
)
@@ -486,7 +452,7 @@ class DeviceListsStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_device_stream_token),
- db_query_to_update_function(store.get_all_device_list_changes_for_remotes),
+ store.get_all_device_list_changes_for_remotes,
)
@@ -504,7 +470,7 @@ class ToDeviceStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_to_device_stream_token),
- db_query_to_update_function(store.get_all_new_device_messages),
+ store.get_all_new_device_messages,
)
@@ -524,7 +490,7 @@ class TagAccountDataStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_max_account_data_stream_id),
- db_query_to_update_function(store.get_all_updated_tags),
+ store.get_all_updated_tags,
)
@@ -612,7 +578,7 @@ class GroupServerStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_group_stream_token),
- db_query_to_update_function(store.get_all_groups_changes),
+ store.get_all_groups_changes,
)
@@ -630,7 +596,5 @@ class UserSignatureStream(Stream):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_device_stream_token),
- db_query_to_update_function(
- store.get_all_user_signature_changes_for_remotes
- ),
+ store.get_all_user_signature_changes_for_remotes,
)
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index f370390331..1c2a4cce7f 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import heapq
from collections import Iterable
from typing import List, Tuple, Type
@@ -22,7 +21,6 @@ import attr
from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
-
"""Handling of the 'events' replication stream
This stream contains rows of various types. Each row therefore contains a 'type'
@@ -64,7 +62,7 @@ class BaseEventsStreamRow(object):
Specifies how to identify, serialize and deserialize the different types.
"""
- # Unique string that ids the type. Must be overriden in sub classes.
+ # Unique string that ids the type. Must be overridden in sub classes.
TypeId = None # type: str
@classmethod
|