diff options
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/commands.py | 16 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 19 |
2 files changed, 23 insertions, 12 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 9633404f73..0005ad5879 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -19,7 +19,7 @@ allowed to be sent by which side. """ import logging -import simplejson as json +import simplejson logger = logging.getLogger(__name__) @@ -100,14 +100,14 @@ class RdataCommand(Command): return cls( stream_name, None if token == "batch" else int(token), - json.loads(row_json) + simplejson.loads(row_json) ) def to_line(self): return " ".join(( self.stream_name, str(self.token) if self.token is not None else "batch", - json.dumps(self.row), + simplejson.dumps(self.row, namedtuple_as_object=False), )) @@ -298,10 +298,12 @@ class InvalidateCacheCommand(Command): def from_line(cls, line): cache_func, keys_json = line.split(" ", 1) - return cls(cache_func, json.loads(keys_json)) + return cls(cache_func, simplejson.loads(keys_json)) def to_line(self): - return " ".join((self.cache_func, json.dumps(self.keys))) + return " ".join(( + self.cache_func, simplejson.dumps(self.keys, namedtuple_as_object=False) + )) class UserIpCommand(Command): @@ -325,14 +327,14 @@ class UserIpCommand(Command): def from_line(cls, line): user_id, jsn = line.split(" ", 1) - access_token, ip, user_agent, device_id, last_seen = json.loads(jsn) + access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn) return cls( user_id, access_token, ip, user_agent, device_id, last_seen ) def to_line(self): - return self.user_id + " " + json.dumps(( + return self.user_id + " " + simplejson.dumps(( self.access_token, self.ip, self.user_agent, self.device_id, self.last_seen, )) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d59503b905..0a9a290af4 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -517,25 +517,28 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.send_error("Wrong remote") def on_RDATA(self, cmd): + stream_name = cmd.stream_name + inbound_rdata_count.inc(stream_name) + try: - row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row) + row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row) except Exception: logger.exception( "[%s] Failed to parse RDATA: %r %r", - self.id(), cmd.stream_name, cmd.row + self.id(), stream_name, cmd.row ) raise if cmd.token is None: # I.e. this is part of a batch of updates for this stream. Batch # until we get an update for the stream with a non None token - self.pending_batches.setdefault(cmd.stream_name, []).append(row) + self.pending_batches.setdefault(stream_name, []).append(row) else: # Check if this is the last of a batch of updates - rows = self.pending_batches.pop(cmd.stream_name, []) + rows = self.pending_batches.pop(stream_name, []) rows.append(row) - self.handler.on_rdata(cmd.stream_name, cmd.token, rows) + self.handler.on_rdata(stream_name, cmd.token, rows) def on_POSITION(self, cmd): self.handler.on_position(cmd.stream_name, cmd.token) @@ -644,3 +647,9 @@ metrics.register_callback( }, labels=["command", "name", "conn_id"], ) + +# number of updates received for each RDATA stream +inbound_rdata_count = metrics.register_counter( + "inbound_rdata_count", + labels=["stream_name"], +) |