summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/app/synchrotron.py47
-rw-r--r--synapse/handlers/presence.py4
-rw-r--r--synapse/replication/tcp/client.py2
-rw-r--r--synapse/replication/tcp/protocol.py48
4 files changed, 86 insertions, 15 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 1fac021ea9..d39e3161fe 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -119,12 +119,53 @@ class SynchrotronPresence(object):
             for state in active_presence
         }
 
+        # user_id -> last_sync_ms. Lists the users that have stopped syncing
+        # but we haven't notified the master of that yet
+        self.users_going_offline = {}
+
+        self._send_stop_syncing_loop = self.clock.looping_call(
+            self.send_stop_syncing, 10 * 1000
+        )
+
         self.process_id = random_string(16)
         logger.info("Presence process_id is %r", self.process_id)
 
     def send_user_sync(self, user_id, is_syncing, last_sync_ms):
         self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
 
+    def mark_as_coming_online(self, user_id):
+        """A user has started syncing. Send a UserSync to the master, unless they
+        had recently stopped syncing.
+
+        Args:
+            user_id (str)
+        """
+        going_offline = self.users_going_offline.pop(user_id, None)
+        if not going_offline:
+            # Safe to skip because we haven't yet told the master they were offline
+            self.send_user_sync(user_id, True, self.clock.time_msec())
+
+    def mark_as_going_offline(self, user_id):
+        """A user has stopped syncing. We wait before notifying the master as
+        its likely they'll come back soon. This allows us to avoid sending
+        a stopped syncing immediately followed by a started syncing notification
+        to the master
+
+        Args:
+            user_id (str)
+        """
+        self.users_going_offline[user_id] = self.clock.time_msec()
+
+    def send_stop_syncing(self):
+        """Check if there are any users who have stopped syncing a while ago
+        and haven't come back yet. If there are poke the master about them.
+        """
+        now = self.clock.time_msec()
+        for user_id, last_sync_ms in self.users_going_offline.items():
+            if now - last_sync_ms > 10 * 1000:
+                self.users_going_offline.pop(user_id, None)
+                self.send_user_sync(user_id, False, last_sync_ms)
+
     def set_state(self, user, state, ignore_status_msg=False):
         # TODO Hows this supposed to work?
         pass
@@ -141,8 +182,7 @@ class SynchrotronPresence(object):
 
             # If we went from no in flight sync to some, notify replication
             if self.user_to_num_current_syncs[user_id] == 1:
-                now = self.clock.time_msec()
-                self.send_user_sync(user_id, True, now)
+                self.mark_as_coming_online(user_id)
 
         def _end():
             # We check that the user_id is in user_to_num_current_syncs because
@@ -153,8 +193,7 @@ class SynchrotronPresence(object):
 
                 # If we went from one in flight sync to non, notify replication
                 if self.user_to_num_current_syncs[user_id] == 0:
-                    now = self.clock.time_msec()
-                    self.send_user_sync(user_id, False, now)
+                    self.mark_as_going_offline(user_id)
 
         @contextlib.contextmanager
         def _user_syncing():
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 53baf3e79a..9ed5af3cb4 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -596,14 +596,14 @@ class PresenceHandler(object):
             for user_id in user_ids
         }
 
-        missing = [user_id for user_id, state in states.items() if not state]
+        missing = [user_id for user_id, state in states.iteritems() if not state]
         if missing:
             # There are things not in our in memory cache. Lets pull them out of
             # the database.
             res = yield self.store.get_presence_for_users(missing)
             states.update(res)
 
-            missing = [user_id for user_id, state in states.items() if not state]
+            missing = [user_id for user_id, state in states.iteritems() if not state]
             if missing:
                 new = {
                     user_id: UserPresenceState.default(user_id)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 251d3afcf4..90fb6c1336 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -175,7 +175,7 @@ class ReplicationClientHandler(object):
     def send_invalidate_cache(self, cache_func, keys):
         """Poke the master to invalidate a cache.
         """
-        cmd = InvalidateCacheCommand(cache_func, keys)
+        cmd = InvalidateCacheCommand(cache_func.__name__, keys)
         self.send_command(cmd)
 
     def await_sync(self, data):
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 6864204616..19b1ce504f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -51,6 +51,7 @@ indicate which side is sending, these are *not* included on the wire::
 
 from twisted.internet import defer
 from twisted.protocols.basic import LineOnlyReceiver
+from twisted.python.failure import Failure
 
 from commands import (
     COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
@@ -60,6 +61,7 @@ from commands import (
 from streams import STREAMS_MAP
 
 from synapse.util.stringutils import random_string
+from synapse.metrics.metric import CounterMetric
 
 import logging
 import synapse.metrics
@@ -69,11 +71,8 @@ import fcntl
 
 metrics = synapse.metrics.get_metrics_for(__name__)
 
-inbound_commands_counter = metrics.register_counter(
-    "inbound_commands", labels=["command", "name", "conn_id"],
-)
-outbound_commands_counter = metrics.register_counter(
-    "outbound_commands", labels=["command", "name", "conn_id"],
+connection_close_counter = metrics.register_counter(
+    "close_reason", labels=["reason_type"],
 )
 
 
@@ -135,6 +134,13 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         # The LoopingCall for sending pings.
         self._send_ping_loop = None
 
+        self.inbound_commands_counter = CounterMetric(
+            "inbound_commands", labels=["command"],
+        )
+        self.outbound_commands_counter = CounterMetric(
+            "outbound_commands", labels=["command"],
+        )
+
     def connectionMade(self):
         logger.info("[%s] Connection established", self.id())
 
@@ -193,7 +199,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
 
         self.last_received_command = self.clock.time_msec()
 
-        inbound_commands_counter.inc(cmd_name, self.name, self.conn_id)
+        self.inbound_commands_counter.inc(cmd_name)
 
         cmd_cls = COMMAND_MAP[cmd_name]
         try:
@@ -214,6 +220,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
             logger.exception("[%s] Failed to handle line: %r", self.id(), line)
 
     def close(self):
+        logger.warn("[%s] Closing connection", self.id())
         self.time_we_closed = self.clock.time_msec()
         self.transport.loseConnection()
         self.on_connection_closed()
@@ -242,7 +249,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
             self._queue_command(cmd)
             return
 
-        outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id)
+        self.outbound_commands_counter.inc(cmd.NAME)
 
         string = "%s %s" % (cmd.NAME, cmd.to_line(),)
         if "\n" in string:
@@ -307,6 +314,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
 
     def connectionLost(self, reason):
         logger.info("[%s] Replication connection closed: %r", self.id(), reason)
+        if isinstance(reason, Failure):
+            connection_close_counter.inc(reason.type.__name__)
+        else:
+            connection_close_counter.inc(reason.__class__.__name__)
 
         try:
             # Remove us from list of connections to be monitored
@@ -495,7 +506,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
     def on_SERVER(self, cmd):
         if cmd.data != self.server_name:
             logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
-            self.transport.abortConnection()
+            self.send_error("Wrong remote")
 
     def on_RDATA(self, cmd):
         try:
@@ -604,3 +615,24 @@ metrics.register_callback(
     },
     labels=["name", "conn_id"],
 )
+
+
+metrics.register_callback(
+    "inbound_commands",
+    lambda: {
+        (k[0], p.name, p.conn_id): count
+        for p in connected_connections
+        for k, count in p.inbound_commands_counter.counts.iteritems()
+    },
+    labels=["command", "name", "conn_id"],
+)
+
+metrics.register_callback(
+    "outbound_commands",
+    lambda: {
+        (k[0], p.name, p.conn_id): count
+        for p in connected_connections
+        for k, count in p.outbound_commands_counter.counts.iteritems()
+    },
+    labels=["command", "name", "conn_id"],
+)