summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/replication/tcp/client.py4
-rw-r--r--synapse/replication/tcp/commands.py3
-rw-r--r--synapse/replication/tcp/handler.py4
-rw-r--r--synapse/replication/tcp/protocol.py1
-rw-r--r--synapse/replication/tcp/resource.py5
-rw-r--r--synapse/replication/tcp/streams/_base.py2
-rw-r--r--synapse/replication/tcp/streams/events.py6
7 files changed, 13 insertions, 12 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py

index 3dddbb70b4..0bd5478cd3 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -18,8 +18,8 @@ # [This file includes modifications made by New Vector Limited] # # -"""A replication client for use by synapse workers. -""" +"""A replication client for use by synapse workers.""" + import logging from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, Tuple diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index b7a7e77597..6ab5356660 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py
@@ -23,6 +23,7 @@ The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are allowed to be sent by which side. """ + import abc import logging from typing import List, Optional, Tuple, Type, TypeVar @@ -494,7 +495,7 @@ class LockReleasedCommand(Command): class NewActiveTaskCommand(_SimpleCommand): - """Sent to inform instance handling background tasks that a new active task is available to run. + """Sent to inform instance handling background tasks that a new task is ready to run. Format:: diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 72a42cb6cc..1fafbb48c3 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -727,7 +727,7 @@ class ReplicationCommandHandler: ) -> None: """Called when get a new NEW_ACTIVE_TASK command.""" if self._task_scheduler: - self._task_scheduler.launch_task_by_id(cmd.data) + self._task_scheduler.on_new_task(cmd.data) def new_connection(self, connection: IReplicationConnection) -> None: """Called when we have a new connection.""" @@ -857,7 +857,7 @@ UpdateRow = TypeVar("UpdateRow") def _batch_updates( - updates: Iterable[Tuple[UpdateToken, UpdateRow]] + updates: Iterable[Tuple[UpdateToken, UpdateRow]], ) -> Iterator[Tuple[UpdateToken, List[UpdateRow]]]: """Collect stream updates with the same token together diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 4471cc8f0c..fb9c539122 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py
@@ -23,6 +23,7 @@ protocols. An explanation of this protocol is available in docs/tcp_replication.md """ + import fcntl import logging import struct diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index c0329378ac..d647a2b332 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -18,8 +18,7 @@ # [This file includes modifications made by New Vector Limited] # # -"""The server side of the replication stream. -""" +"""The server side of the replication stream.""" import logging import random @@ -307,7 +306,7 @@ class ReplicationStreamer: def _batch_updates( - updates: List[Tuple[Token, StreamRow]] + updates: List[Tuple[Token, StreamRow]], ) -> List[Tuple[Optional[Token], StreamRow]]: """Takes a list of updates of form [(token, row)] and sets the token to None for all rows where the next row has the same token. This is used to diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index d021904de7..ebf5964d29 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -247,7 +247,7 @@ class _StreamFromIdGen(Stream): def current_token_without_instance( - current_token: Callable[[], int] + current_token: Callable[[], int], ) -> Callable[[str], int]: """Takes a current token callback function for a single writer stream that doesn't take an instance name parameter and wraps it in a function that diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ea0803dfc2..05b55fb033 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py
@@ -200,9 +200,9 @@ class EventsStream(_StreamFromIdGen): # we rely on get_all_new_forward_event_rows strictly honouring the limit, so # that we know it is safe to just take upper_limit = event_rows[-1][0]. - assert ( - len(event_rows) <= target_row_count - ), "get_all_new_forward_event_rows did not honour row limit" + assert len(event_rows) <= target_row_count, ( + "get_all_new_forward_event_rows did not honour row limit" + ) # if we hit the limit on event_updates, there's no point in going beyond the # last stream_id in the batch for the other sources.