summary refs log tree commit diff
path: root/synapse/replication/tcp/commands.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-23 18:40:03 +0000
committerErik Johnston <erik@matrix.org>2020-03-23 18:52:24 +0000
commit225b993cf6d3a24e3f7e0d0be28b663e4157c53d (patch)
tree95a8cd374497bb19c53db16a5bc67b56f68840e9 /synapse/replication/tcp/commands.py
parentHandle connection closing under us (diff)
downloadsynapse-225b993cf6d3a24e3f7e0d0be28b663e4157c53d.tar.xz
Remove `conn_id` usage for UserSyncCommand.
Each tcp replication connection is assigned a "conn_id", which is used
to give an ID to a remotely connected worker. In a redis world, there
will no longer be a one to one mapping between connection and instance,
so instead we need to replace such usages with an ID generated by the
remote instances and included in the replicaiton commands.

This really only effects UserSyncCommand.
Diffstat (limited to '')
-rw-r--r--synapse/replication/tcp/commands.py10
1 files changed, 6 insertions, 4 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py

index 5a6b734094..fe3668838b 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py
@@ -207,30 +207,32 @@ class UserSyncCommand(Command): Format:: - USER_SYNC <user_id> <state> <last_sync_ms> + USER_SYNC <instance_id> <user_id> <state> <last_sync_ms> Where <state> is either "start" or "stop" """ NAME = "USER_SYNC" - def __init__(self, user_id, is_syncing, last_sync_ms): + def __init__(self, instance_id, user_id, is_syncing, last_sync_ms): + self.instance_id = instance_id self.user_id = user_id self.is_syncing = is_syncing self.last_sync_ms = last_sync_ms @classmethod def from_line(cls, line): - user_id, state, last_sync_ms = line.split(" ", 2) + instance_id, user_id, state, last_sync_ms = line.split(" ", 3) if state not in ("start", "end"): raise Exception("Invalid USER_SYNC state %r" % (state,)) - return cls(user_id, state == "start", int(last_sync_ms)) + return cls(instance_id, user_id, state == "start", int(last_sync_ms)) def to_line(self): return " ".join( ( + self.instance_id, self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms),