summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/replication/tcp/protocol.py6
-rw-r--r--synapse/storage/state.py36
2 files changed, 30 insertions, 12 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 19b1ce504f..5770b7125a 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -85,6 +85,8 @@ logger = logging.getLogger(__name__)
 
 
 PING_TIME = 5000
+PING_TIMEOUT_MULTIPLIER = 5
+PING_TIMEOUT_MS = PING_TIME * PING_TIMEOUT_MULTIPLIER
 
 
 class ConnectionStates(object):
@@ -166,7 +168,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         now = self.clock.time_msec()
 
         if self.time_we_closed:
-            if now - self.time_we_closed > PING_TIME * 3:
+            if now - self.time_we_closed > PING_TIMEOUT_MS:
                 logger.info(
                     "[%s] Failed to close connection gracefully, aborting", self.id()
                 )
@@ -175,7 +177,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
             if now - self.last_sent_command >= PING_TIME:
                 self.send_command(PingCommand(now))
 
-            if self.received_ping and now - self.last_received_command > PING_TIME * 3:
+            if self.received_ping and now - self.last_received_command > PING_TIMEOUT_MS:
                 logger.info(
                     "[%s] Connection hasn't received command in %r ms. Closing.",
                     self.id(), now - self.last_received_command
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index fb23f6f462..acd69944c4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches import intern_string
 from synapse.storage.engines import PostgresEngine
 
@@ -69,17 +69,33 @@ class StateStore(SQLBaseStore):
             where_clause="type='m.room.member'",
         )
 
-    @cachedInlineCallbacks(max_entries=100000, iterable=True)
+    @cached(max_entries=100000, iterable=True)
     def get_current_state_ids(self, room_id):
-        rows = yield self._simple_select_list(
-            table="current_state_events",
-            keyvalues={"room_id": room_id},
-            retcols=["event_id", "type", "state_key"],
-            desc="_calculate_state_delta",
+        """Get the current state event ids for a room based on the
+        current_state_events table.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            deferred: dict of (type, state_key) -> event_id
+        """
+        def _get_current_state_ids_txn(txn):
+            txn.execute(
+                """SELECT type, state_key, event_id FROM current_state_events
+                WHERE room_id = ?
+                """,
+                (room_id,)
+            )
+
+            return {
+                (r[0], r[1]): r[2] for r in txn
+            }
+
+        return self.runInteraction(
+            "get_current_state_ids",
+            _get_current_state_ids_txn,
         )
-        defer.returnValue({
-            (r["type"], r["state_key"]): r["event_id"] for r in rows
-        })
 
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):