diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index f592ad352e..7b45c87a96 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -16,7 +16,7 @@
import synapse
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, PresenceState
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
@@ -41,7 +41,7 @@ from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
@@ -135,6 +135,8 @@ class SynchrotronSlavedStore(
RoomMemberStore.__dict__["who_forgot_in_room"]
)
+UPDATE_SYNCING_USERS_MS = 10 * 1000
+
class SynchrotronPresence(object):
def __init__(self, hs):
@@ -153,6 +155,13 @@ class SynchrotronPresence(object):
self.process_id = random_string(16)
logger.info("Presence process_id is %r", self.process_id)
+ self._sending_sync = False
+ self._need_to_send_sync = False
+ self.clock.looping_call(
+ self._send_syncing_users_regularly,
+ UPDATE_SYNCING_USERS_MS,
+ )
+
def set_state(self, user, state):
# TODO Hows this supposed to work?
pass
@@ -165,12 +174,10 @@ class SynchrotronPresence(object):
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
- # TODO: Send this less frequently.
- # TODO: Make sure this doesn't race. Currently we can lose updates
- # if two users come online in quick sucession and the second http
- # to the master completes before the first.
- # TODO: Don't block the sync request on this HTTP hit.
- yield self._send_syncing_users()
+ prev_states = yield self.current_state_for_users([user_id])
+ if prev_states[user_id].state == PresenceState.OFFLINE:
+ # TODO: Don't block the sync request on this HTTP hit.
+ yield self._send_syncing_users_now()
def _end():
if affect_presence:
@@ -185,8 +192,24 @@ class SynchrotronPresence(object):
defer.returnValue(_user_syncing())
- def _send_syncing_users(self):
- return self.http_client.post_json_get_json(self.syncing_users_url, {
+ def _send_syncing_users_regularly(self):
+ # Only send an update if we aren't in the middle of sending one.
+ if not self._sending_sync:
+ preserve_fn(self._send_syncing_users_now)()
+
+ @defer.inlineCallbacks
+ def _send_syncing_users_now(self):
+ if self._sending_sync:
+ # We don't want to race with sending another update.
+ # Instead we wait for that update to finish and send another
+ # update afterwards.
+ self._need_to_send_sync = True
+ return
+
+ # Flag that we are sending an update.
+ self._sending_sync = True
+
+ yield self.http_client.post_json_get_json(self.syncing_users_url, {
"process_id": self.process_id,
"syncing_users": [
user_id for user_id, count in self.user_to_num_current_syncs.items()
@@ -194,6 +217,16 @@ class SynchrotronPresence(object):
],
})
+ # Unset the flag as we are no longer sending an update.
+ self._sending_sync = False
+ if self._need_to_send_sync:
+ # If something happened while we were sending the update then
+ # we might need to send another update.
+ # TODO: Check if the update that was sent matches the current state
+ # as we only need to send an update if they are different.
+ self._need_to_send_sync = False
+ yield self._send_syncing_users_now()
+
def process_replication(self, result):
stream = result.get("presence", {"rows": []})
for row in stream["rows"]:
|