diff options
author | Neil Johnson <neil@fragile.org.uk> | 2018-03-26 14:51:11 +0100 |
---|---|---|
committer | Neil Johnson <neil@fragile.org.uk> | 2018-03-26 14:51:11 +0100 |
commit | aa3587fdd111d01b3bd0a610dab6e102d79f230c (patch) | |
tree | 1fad5548eab845afcb1fe5f0c5e26694b322d6d3 /synapse/replication/tcp/protocol.py | |
parent | Merge branch 'hotfixes-v0.26.1' of github.com:matrix-org/synapse (diff) | |
parent | version bump (diff) | |
download | synapse-aa3587fdd111d01b3bd0a610dab6e102d79f230c.tar.xz |
Merge branch 'release-v0.27.0' of https://github.com/matrix-org/synapse v0.27.0
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r-- | synapse/replication/tcp/protocol.py | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d59503b905..0a9a290af4 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -517,25 +517,28 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.send_error("Wrong remote") def on_RDATA(self, cmd): + stream_name = cmd.stream_name + inbound_rdata_count.inc(stream_name) + try: - row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row) + row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row) except Exception: logger.exception( "[%s] Failed to parse RDATA: %r %r", - self.id(), cmd.stream_name, cmd.row + self.id(), stream_name, cmd.row ) raise if cmd.token is None: # I.e. this is part of a batch of updates for this stream. Batch # until we get an update for the stream with a non None token - self.pending_batches.setdefault(cmd.stream_name, []).append(row) + self.pending_batches.setdefault(stream_name, []).append(row) else: # Check if this is the last of a batch of updates - rows = self.pending_batches.pop(cmd.stream_name, []) + rows = self.pending_batches.pop(stream_name, []) rows.append(row) - self.handler.on_rdata(cmd.stream_name, cmd.token, rows) + self.handler.on_rdata(stream_name, cmd.token, rows) def on_POSITION(self, cmd): self.handler.on_position(cmd.stream_name, cmd.token) @@ -644,3 +647,9 @@ metrics.register_callback( }, labels=["command", "name", "conn_id"], ) + +# number of updates received for each RDATA stream +inbound_rdata_count = metrics.register_counter( + "inbound_rdata_count", + labels=["stream_name"], +) |