summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
authorRichard van der Hoff <github@rvanderhoff.org.uk>2018-01-16 13:13:53 +0000
committerGitHub <noreply@github.com>2018-01-16 13:13:53 +0000
commit4a31a61ef9cbff3f61d9b86cce0446948658fb80 (patch)
treeb62d9034c546983b9e229257b469474dd1861637 /synapse/replication/tcp/protocol.py
parentMerge pull request #2767 from matrix-org/erikj/media_storage_refactor (diff)
parentMetrics for number of RDATA commands received (diff)
downloadsynapse-4a31a61ef9cbff3f61d9b86cce0446948658fb80.tar.xz
Merge pull request #2786 from matrix-org/rav/rdata_metrics
Metrics for number of RDATA commands received
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py19
1 files changed, 14 insertions, 5 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index d59503b905..0a9a290af4 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -517,25 +517,28 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
             self.send_error("Wrong remote")
 
     def on_RDATA(self, cmd):
+        stream_name = cmd.stream_name
+        inbound_rdata_count.inc(stream_name)
+
         try:
-            row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row)
+            row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
         except Exception:
             logger.exception(
                 "[%s] Failed to parse RDATA: %r %r",
-                self.id(), cmd.stream_name, cmd.row
+                self.id(), stream_name, cmd.row
             )
             raise
 
         if cmd.token is None:
             # I.e. this is part of a batch of updates for this stream. Batch
             # until we get an update for the stream with a non None token
-            self.pending_batches.setdefault(cmd.stream_name, []).append(row)
+            self.pending_batches.setdefault(stream_name, []).append(row)
         else:
             # Check if this is the last of a batch of updates
-            rows = self.pending_batches.pop(cmd.stream_name, [])
+            rows = self.pending_batches.pop(stream_name, [])
             rows.append(row)
 
-            self.handler.on_rdata(cmd.stream_name, cmd.token, rows)
+            self.handler.on_rdata(stream_name, cmd.token, rows)
 
     def on_POSITION(self, cmd):
         self.handler.on_position(cmd.stream_name, cmd.token)
@@ -644,3 +647,9 @@ metrics.register_callback(
     },
     labels=["command", "name", "conn_id"],
 )
+
+# number of updates received for each RDATA stream
+inbound_rdata_count = metrics.register_counter(
+    "inbound_rdata_count",
+    labels=["stream_name"],
+)