diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 19b1ce504f..5770b7125a 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -85,6 +85,8 @@ logger = logging.getLogger(__name__)
PING_TIME = 5000
+PING_TIMEOUT_MULTIPLIER = 5
+PING_TIMEOUT_MS = PING_TIME * PING_TIMEOUT_MULTIPLIER
class ConnectionStates(object):
@@ -166,7 +168,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
now = self.clock.time_msec()
if self.time_we_closed:
- if now - self.time_we_closed > PING_TIME * 3:
+ if now - self.time_we_closed > PING_TIMEOUT_MS:
logger.info(
"[%s] Failed to close connection gracefully, aborting", self.id()
)
@@ -175,7 +177,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
if now - self.last_sent_command >= PING_TIME:
self.send_command(PingCommand(now))
- if self.received_ping and now - self.last_received_command > PING_TIME * 3:
+ if self.received_ping and now - self.last_received_command > PING_TIMEOUT_MS:
logger.info(
"[%s] Connection hasn't received command in %r ms. Closing.",
self.id(), now - self.last_received_command
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index fb23f6f462..acd69944c4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,7 +14,7 @@
# limitations under the License.
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches import intern_string
from synapse.storage.engines import PostgresEngine
@@ -69,17 +69,33 @@ class StateStore(SQLBaseStore):
where_clause="type='m.room.member'",
)
- @cachedInlineCallbacks(max_entries=100000, iterable=True)
+ @cached(max_entries=100000, iterable=True)
def get_current_state_ids(self, room_id):
- rows = yield self._simple_select_list(
- table="current_state_events",
- keyvalues={"room_id": room_id},
- retcols=["event_id", "type", "state_key"],
- desc="_calculate_state_delta",
+ """Get the current state event ids for a room based on the
+ current_state_events table.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ deferred: dict of (type, state_key) -> event_id
+ """
+ def _get_current_state_ids_txn(txn):
+ txn.execute(
+ """SELECT type, state_key, event_id FROM current_state_events
+ WHERE room_id = ?
+ """,
+ (room_id,)
+ )
+
+ return {
+ (r[0], r[1]): r[2] for r in txn
+ }
+
+ return self.runInteraction(
+ "get_current_state_ids",
+ _get_current_state_ids_txn,
)
- defer.returnValue({
- (r["type"], r["state_key"]): r["event_id"] for r in rows
- })
@defer.inlineCallbacks
def get_state_groups_ids(self, room_id, event_ids):
|