diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2020-07-30 19:00:29 +0100 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2020-07-30 19:00:29 +0100 |
commit | 69158e554f30ac8b6b646a62fa496a2c0005dea6 (patch) | |
tree | 42fdb177abede9c0128906d4e6661cde0ee9cd6c /synapse/replication/tcp/redis.py | |
parent | Changelog (diff) | |
parent | Update workers docs (#7990) (diff) | |
download | synapse-69158e554f30ac8b6b646a62fa496a2c0005dea6.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into babolivier/new_push_rules
Diffstat (limited to 'synapse/replication/tcp/redis.py')
-rw-r--r-- | synapse/replication/tcp/redis.py | 37 |
1 files changed, 17 insertions, 20 deletions
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index b5c533a607..f225e533de 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from inspect import isawaitable from typing import TYPE_CHECKING import txredisapi @@ -124,36 +125,32 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): # remote instances. tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc() - # Now lets try and call on_<CMD_NAME> function - run_as_background_process( - "replication-" + cmd.get_logcontext_id(), self.handle_command, cmd - ) + self.handle_command(cmd) - async def handle_command(self, cmd: Command): + def handle_command(self, cmd: Command) -> None: """Handle a command we have received over the replication stream. - By default delegates to on_<COMMAND>, which should return an awaitable. + Delegates to `self.handler.on_<COMMAND>` (which can optionally return an + Awaitable). Args: cmd: received command """ - handled = False - - # First call any command handlers on this instance. These are for redis - # specific handling. - cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None) - if cmd_func: - await cmd_func(cmd) - handled = True - # Then call out to the handler. cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None) - if cmd_func: - await cmd_func(self, cmd) - handled = True - - if not handled: + if not cmd_func: logger.warning("Unhandled command: %r", cmd) + return + + res = cmd_func(self, cmd) + + # the handler might be a coroutine: fire it off as a background process + # if so. + + if isawaitable(res): + run_as_background_process( + "replication-" + cmd.get_logcontext_id(), lambda: res + ) def connectionLost(self, reason): logger.info("Lost connection to redis") |