diff options
author | Richard van der Hoff <github@rvanderhoff.org.uk> | 2018-01-16 13:13:53 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-16 13:13:53 +0000 |
commit | 4a31a61ef9cbff3f61d9b86cce0446948658fb80 (patch) | |
tree | b62d9034c546983b9e229257b469474dd1861637 /synapse/replication/tcp/protocol.py | |
parent | Merge pull request #2767 from matrix-org/erikj/media_storage_refactor (diff) | |
parent | Metrics for number of RDATA commands received (diff) | |
download | synapse-4a31a61ef9cbff3f61d9b86cce0446948658fb80.tar.xz |
Merge pull request #2786 from matrix-org/rav/rdata_metrics
Metrics for number of RDATA commands received
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r-- | synapse/replication/tcp/protocol.py | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d59503b905..0a9a290af4 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -517,25 +517,28 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.send_error("Wrong remote") def on_RDATA(self, cmd): + stream_name = cmd.stream_name + inbound_rdata_count.inc(stream_name) + try: - row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row) + row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row) except Exception: logger.exception( "[%s] Failed to parse RDATA: %r %r", - self.id(), cmd.stream_name, cmd.row + self.id(), stream_name, cmd.row ) raise if cmd.token is None: # I.e. this is part of a batch of updates for this stream. Batch # until we get an update for the stream with a non None token - self.pending_batches.setdefault(cmd.stream_name, []).append(row) + self.pending_batches.setdefault(stream_name, []).append(row) else: # Check if this is the last of a batch of updates - rows = self.pending_batches.pop(cmd.stream_name, []) + rows = self.pending_batches.pop(stream_name, []) rows.append(row) - self.handler.on_rdata(cmd.stream_name, cmd.token, rows) + self.handler.on_rdata(stream_name, cmd.token, rows) def on_POSITION(self, cmd): self.handler.on_position(cmd.stream_name, cmd.token) @@ -644,3 +647,9 @@ metrics.register_callback( }, labels=["command", "name", "conn_id"], ) + +# number of updates received for each RDATA stream +inbound_rdata_count = metrics.register_counter( + "inbound_rdata_count", + labels=["stream_name"], +) |