diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 10f5c98ff8..e616b5e1c8 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -267,27 +267,38 @@ class UserSyncCommand(Command):
NAME = "USER_SYNC"
def __init__(
- self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
+ self,
+ instance_id: str,
+ user_id: str,
+ device_id: Optional[str],
+ is_syncing: bool,
+ last_sync_ms: int,
):
self.instance_id = instance_id
self.user_id = user_id
+ self.device_id = device_id
self.is_syncing = is_syncing
self.last_sync_ms = last_sync_ms
@classmethod
def from_line(cls: Type["UserSyncCommand"], line: str) -> "UserSyncCommand":
- instance_id, user_id, state, last_sync_ms = line.split(" ", 3)
+ device_id: Optional[str]
+ instance_id, user_id, device_id, state, last_sync_ms = line.split(" ", 4)
+
+ if device_id == "None":
+ device_id = None
if state not in ("start", "end"):
raise Exception("Invalid USER_SYNC state %r" % (state,))
- return cls(instance_id, user_id, state == "start", int(last_sync_ms))
+ return cls(instance_id, user_id, device_id, state == "start", int(last_sync_ms))
def to_line(self) -> str:
return " ".join(
(
self.instance_id,
self.user_id,
+ str(self.device_id),
"start" if self.is_syncing else "end",
str(self.last_sync_ms),
)
@@ -452,6 +463,17 @@ class LockReleasedCommand(Command):
return json_encoder.encode([self.instance_name, self.lock_name, self.lock_key])
+class NewActiveTaskCommand(_SimpleCommand):
+ """Sent to inform instance handling background tasks that a new active task is available to run.
+
+ Format::
+
+ NEW_ACTIVE_TASK "<task_id>"
+ """
+
+ NAME = "NEW_ACTIVE_TASK"
+
+
_COMMANDS: Tuple[Type[Command], ...] = (
ServerCommand,
RdataCommand,
@@ -466,6 +488,7 @@ _COMMANDS: Tuple[Type[Command], ...] = (
RemoteServerUpCommand,
ClearUserSyncsCommand,
LockReleasedCommand,
+ NewActiveTaskCommand,
)
# Map of command name to command type.
|