diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2020-07-27 18:54:43 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-27 18:54:43 +0100 |
commit | f57b99af22de874b11f44ef32c1f1425ec1344b9 (patch) | |
tree | d11bbffa9546fd70152a4a75dfad0a0c221f8bbe /synapse/replication/tcp/redis.py | |
parent | update changelog (diff) | |
download | synapse-f57b99af22de874b11f44ef32c1f1425ec1344b9.tar.xz |
Handle replication commands synchronously where possible (#7876)
Most of the stuff we do for replication commands can be done synchronously. There's no point spinning up background processes if we're not going to need them.
Diffstat (limited to '')
-rw-r--r-- | synapse/replication/tcp/redis.py | 37 |
1 files changed, 17 insertions, 20 deletions
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index b5c533a607..f225e533de 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from inspect import isawaitable from typing import TYPE_CHECKING import txredisapi @@ -124,36 +125,32 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): # remote instances. tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc() - # Now lets try and call on_<CMD_NAME> function - run_as_background_process( - "replication-" + cmd.get_logcontext_id(), self.handle_command, cmd - ) + self.handle_command(cmd) - async def handle_command(self, cmd: Command): + def handle_command(self, cmd: Command) -> None: """Handle a command we have received over the replication stream. - By default delegates to on_<COMMAND>, which should return an awaitable. + Delegates to `self.handler.on_<COMMAND>` (which can optionally return an + Awaitable). Args: cmd: received command """ - handled = False - - # First call any command handlers on this instance. These are for redis - # specific handling. - cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None) - if cmd_func: - await cmd_func(cmd) - handled = True - # Then call out to the handler. cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None) - if cmd_func: - await cmd_func(self, cmd) - handled = True - - if not handled: + if not cmd_func: logger.warning("Unhandled command: %r", cmd) + return + + res = cmd_func(self, cmd) + + # the handler might be a coroutine: fire it off as a background process + # if so. + + if isawaitable(res): + run_as_background_process( + "replication-" + cmd.get_logcontext_id(), lambda: res + ) def connectionLost(self, reason): logger.info("Lost connection to redis") |