diff options
-rw-r--r-- | synapse/replication/tcp/protocol.py | 6 | ||||
-rw-r--r-- | synapse/storage/state.py | 36 |
2 files changed, 30 insertions, 12 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 19b1ce504f..5770b7125a 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -85,6 +85,8 @@ logger = logging.getLogger(__name__) PING_TIME = 5000 +PING_TIMEOUT_MULTIPLIER = 5 +PING_TIMEOUT_MS = PING_TIME * PING_TIMEOUT_MULTIPLIER class ConnectionStates(object): @@ -166,7 +168,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): now = self.clock.time_msec() if self.time_we_closed: - if now - self.time_we_closed > PING_TIME * 3: + if now - self.time_we_closed > PING_TIMEOUT_MS: logger.info( "[%s] Failed to close connection gracefully, aborting", self.id() ) @@ -175,7 +177,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): if now - self.last_sent_command >= PING_TIME: self.send_command(PingCommand(now)) - if self.received_ping and now - self.last_received_command > PING_TIME * 3: + if self.received_ping and now - self.last_received_command > PING_TIMEOUT_MS: logger.info( "[%s] Connection hasn't received command in %r ms. Closing.", self.id(), now - self.last_received_command diff --git a/synapse/storage/state.py b/synapse/storage/state.py index fb23f6f462..acd69944c4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches import intern_string from synapse.storage.engines import PostgresEngine @@ -69,17 +69,33 @@ class StateStore(SQLBaseStore): where_clause="type='m.room.member'", ) - @cachedInlineCallbacks(max_entries=100000, iterable=True) + @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): - rows = yield self._simple_select_list( - table="current_state_events", - keyvalues={"room_id": room_id}, - retcols=["event_id", "type", "state_key"], - desc="_calculate_state_delta", + """Get the current state event ids for a room based on the + current_state_events table. + + Args: + room_id (str) + + Returns: + deferred: dict of (type, state_key) -> event_id + """ + def _get_current_state_ids_txn(txn): + txn.execute( + """SELECT type, state_key, event_id FROM current_state_events + WHERE room_id = ? + """, + (room_id,) + ) + + return { + (r[0], r[1]): r[2] for r in txn + } + + return self.runInteraction( + "get_current_state_ids", + _get_current_state_ids_txn, ) - defer.returnValue({ - (r["type"], r["state_key"]): r["event_id"] for r in rows - }) @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): |