diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 5a011eac90..cc0bcd126a 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -233,6 +233,7 @@ class GenericWorkerPresence(object):
self.user_to_num_current_syncs = {}
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
+ self.instance_id = hs.get_instance_id()
active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {state.user_id: state for state in active_presence}
@@ -251,7 +252,7 @@ class GenericWorkerPresence(object):
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
if self.hs.config.use_presence:
self.hs.get_tcp_replication().send_user_sync(
- user_id, is_syncing, last_sync_ms
+ self.instance_id, user_id, is_syncing, last_sync_ms
)
def mark_as_coming_online(self, user_id):
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 7e7ad0f798..e86d9805f1 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -189,10 +189,12 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
"""
self.send_command(FederationAckCommand(token))
- def send_user_sync(self, user_id, is_syncing, last_sync_ms):
+ def send_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms):
"""Poke the master that a user has started/stopped syncing.
"""
- self.send_command(UserSyncCommand(user_id, is_syncing, last_sync_ms))
+ self.send_command(
+ UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms)
+ )
def send_remove_pusher(self, app_id, push_key, user_id):
"""Poke the master to remove a pusher for a user
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 5a6b734094..fe3668838b 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -207,30 +207,32 @@ class UserSyncCommand(Command):
Format::
- USER_SYNC <user_id> <state> <last_sync_ms>
+ USER_SYNC <instance_id> <user_id> <state> <last_sync_ms>
Where <state> is either "start" or "stop"
"""
NAME = "USER_SYNC"
- def __init__(self, user_id, is_syncing, last_sync_ms):
+ def __init__(self, instance_id, user_id, is_syncing, last_sync_ms):
+ self.instance_id = instance_id
self.user_id = user_id
self.is_syncing = is_syncing
self.last_sync_ms = last_sync_ms
@classmethod
def from_line(cls, line):
- user_id, state, last_sync_ms = line.split(" ", 2)
+ instance_id, user_id, state, last_sync_ms = line.split(" ", 3)
if state not in ("start", "end"):
raise Exception("Invalid USER_SYNC state %r" % (state,))
- return cls(user_id, state == "start", int(last_sync_ms))
+ return cls(instance_id, user_id, state == "start", int(last_sync_ms))
def to_line(self):
return " ".join(
(
+ self.instance_id,
self.user_id,
"start" if self.is_syncing else "end",
str(self.last_sync_ms),
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index e266c72417..8b9d65362b 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -419,7 +419,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
async def on_USER_SYNC(self, cmd):
await self.streamer.on_user_sync(
- self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
+ cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
)
async def on_REPLICATE(self, cmd):
@@ -547,6 +547,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
):
BaseReplicationStreamProtocol.__init__(self, clock)
+ self.instance_id = hs.get_instance_id()
+
self.client_name = client_name
self.server_name = server_name
self.handler = handler
@@ -576,7 +578,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
currently_syncing = self.handler.get_currently_syncing_users()
now = self.clock.time_msec()
for user_id in currently_syncing:
- self.send_command(UserSyncCommand(user_id, True, now))
+ self.send_command(UserSyncCommand(self.instance_id, user_id, True, now))
# We've now finished connecting to so inform the client handler
self.handler.update_connection(self)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 757129b6d5..8d720694e9 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -246,12 +246,12 @@ class ReplicationStreamer(object):
self.federation_sender.federation_ack(token)
@measure_func("repl.on_user_sync")
- async def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
+ async def on_user_sync(self, instance_id, user_id, is_syncing, last_sync_ms):
"""A client has started/stopped syncing on a worker.
"""
user_sync_counter.inc()
await self.presence_handler.update_external_syncs_row(
- conn_id, user_id, is_syncing, last_sync_ms
+ instance_id, user_id, is_syncing, last_sync_ms
)
@measure_func("repl.on_remove_pusher")
@@ -316,14 +316,6 @@ class ReplicationStreamer(object):
except ValueError:
pass
- # We need to tell the presence handler that the connection has been
- # lost so that it can handle any ongoing syncs on that connection.
- run_as_background_process(
- "update_external_syncs_clear",
- self.presence_handler.update_external_syncs_clear,
- connection.conn_id,
- )
-
def _batch_updates(updates):
"""Takes a list of updates of form [(token, row)] and sets the token to
diff --git a/synapse/server.py b/synapse/server.py
index 1b980371de..392f9e8e38 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -100,6 +100,7 @@ from synapse.storage import DataStores, Storage
from synapse.streams.events import EventSources
from synapse.util import Clock
from synapse.util.distributor import Distributor
+from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
@@ -224,6 +225,8 @@ class HomeServer(object):
self._listening_services = []
self.start_time = None
+ self.instance_id = random_string(5)
+
self.clock = Clock(reactor)
self.distributor = Distributor()
self.ratelimiter = Ratelimiter()
@@ -236,6 +239,11 @@ class HomeServer(object):
for depname in kwargs:
setattr(self, depname, kwargs[depname])
+ def get_instance_id(self):
+ """A unique ID for this synapse process instance.
+ """
+ return self.instance_id
+
def setup(self):
logger.info("Setting up.")
self.start_time = int(self.get_clock().time())
|