diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 9633404f73..0005ad5879 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -19,7 +19,7 @@ allowed to be sent by which side.
"""
import logging
-import simplejson as json
+import simplejson
logger = logging.getLogger(__name__)
@@ -100,14 +100,14 @@ class RdataCommand(Command):
return cls(
stream_name,
None if token == "batch" else int(token),
- json.loads(row_json)
+ simplejson.loads(row_json)
)
def to_line(self):
return " ".join((
self.stream_name,
str(self.token) if self.token is not None else "batch",
- json.dumps(self.row),
+ simplejson.dumps(self.row, namedtuple_as_object=False),
))
@@ -298,10 +298,12 @@ class InvalidateCacheCommand(Command):
def from_line(cls, line):
cache_func, keys_json = line.split(" ", 1)
- return cls(cache_func, json.loads(keys_json))
+ return cls(cache_func, simplejson.loads(keys_json))
def to_line(self):
- return " ".join((self.cache_func, json.dumps(self.keys)))
+ return " ".join((
+ self.cache_func, simplejson.dumps(self.keys, namedtuple_as_object=False)
+ ))
class UserIpCommand(Command):
@@ -325,14 +327,14 @@ class UserIpCommand(Command):
def from_line(cls, line):
user_id, jsn = line.split(" ", 1)
- access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
+ access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
return cls(
user_id, access_token, ip, user_agent, device_id, last_seen
)
def to_line(self):
- return self.user_id + " " + json.dumps((
+ return self.user_id + " " + simplejson.dumps((
self.access_token, self.ip, self.user_agent, self.device_id,
self.last_seen,
))
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index d59503b905..0a9a290af4 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -517,25 +517,28 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.send_error("Wrong remote")
def on_RDATA(self, cmd):
+ stream_name = cmd.stream_name
+ inbound_rdata_count.inc(stream_name)
+
try:
- row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row)
+ row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
except Exception:
logger.exception(
"[%s] Failed to parse RDATA: %r %r",
- self.id(), cmd.stream_name, cmd.row
+ self.id(), stream_name, cmd.row
)
raise
if cmd.token is None:
# I.e. this is part of a batch of updates for this stream. Batch
# until we get an update for the stream with a non None token
- self.pending_batches.setdefault(cmd.stream_name, []).append(row)
+ self.pending_batches.setdefault(stream_name, []).append(row)
else:
# Check if this is the last of a batch of updates
- rows = self.pending_batches.pop(cmd.stream_name, [])
+ rows = self.pending_batches.pop(stream_name, [])
rows.append(row)
- self.handler.on_rdata(cmd.stream_name, cmd.token, rows)
+ self.handler.on_rdata(stream_name, cmd.token, rows)
def on_POSITION(self, cmd):
self.handler.on_position(cmd.stream_name, cmd.token)
@@ -644,3 +647,9 @@ metrics.register_callback(
},
labels=["command", "name", "conn_id"],
)
+
+# number of updates received for each RDATA stream
+inbound_rdata_count = metrics.register_counter(
+ "inbound_rdata_count",
+ labels=["stream_name"],
+)
|