diff options
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/http/send_event.py | 20 | ||||
-rw-r--r-- | synapse/replication/tcp/commands.py | 18 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 4 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 4 |
4 files changed, 22 insertions, 24 deletions
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index bbe2f967b7..a9baa2c1c3 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.util.async import sleep from synapse.util.caches.response_cache import ResponseCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.metrics import Measure from synapse.types import Requester, UserID @@ -115,20 +114,15 @@ class ReplicationSendEventRestServlet(RestServlet): self.clock = hs.get_clock() # The responses are tiny, so we may as well cache them for a while - self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000) + self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000) def on_PUT(self, request, event_id): - result = self.response_cache.get(event_id) - if not result: - result = self.response_cache.set( - event_id, - self._handle_request(request) - ) - else: - logger.warn("Returning cached response") - return make_deferred_yieldable(result) - - @preserve_fn + return self.response_cache.wrap( + event_id, + self._handle_request, + request + ) + @defer.inlineCallbacks def _handle_request(self, request): with Measure(self.clock, "repl_send_event_parse"): diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 171227cce2..12aac3cc6b 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -19,11 +19,13 @@ allowed to be sent by which side. """ import logging -import ujson as json +import simplejson logger = logging.getLogger(__name__) +_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False) + class Command(object): """The base command class. @@ -100,14 +102,14 @@ class RdataCommand(Command): return cls( stream_name, None if token == "batch" else int(token), - json.loads(row_json) + simplejson.loads(row_json) ) def to_line(self): return " ".join(( self.stream_name, str(self.token) if self.token is not None else "batch", - json.dumps(self.row), + _json_encoder.encode(self.row), )) @@ -298,10 +300,12 @@ class InvalidateCacheCommand(Command): def from_line(cls, line): cache_func, keys_json = line.split(" ", 1) - return cls(cache_func, json.loads(keys_json)) + return cls(cache_func, simplejson.loads(keys_json)) def to_line(self): - return " ".join((self.cache_func, json.dumps(self.keys))) + return " ".join(( + self.cache_func, _json_encoder.encode(self.keys), + )) class UserIpCommand(Command): @@ -325,14 +329,14 @@ class UserIpCommand(Command): def from_line(cls, line): user_id, jsn = line.split(" ", 1) - access_token, ip, user_agent, device_id, last_seen = json.loads(jsn) + access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn) return cls( user_id, access_token, ip, user_agent, device_id, last_seen ) def to_line(self): - return self.user_id + " " + json.dumps(( + return self.user_id + " " + _json_encoder.encode(( self.access_token, self.ip, self.user_agent, self.device_id, self.last_seen, )) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 0a9a290af4..d7d38464b2 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -53,12 +53,12 @@ from twisted.internet import defer from twisted.protocols.basic import LineOnlyReceiver from twisted.python.failure import Failure -from commands import ( +from .commands import ( COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS, ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand, NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand, ) -from streams import STREAMS_MAP +from .streams import STREAMS_MAP from synapse.util.stringutils import random_string from synapse.metrics.metric import CounterMetric diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 786c3fe864..a41af4fd6c 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -18,8 +18,8 @@ from twisted.internet import defer, reactor from twisted.internet.protocol import Factory -from streams import STREAMS_MAP, FederationStream -from protocol import ServerReplicationStreamProtocol +from .streams import STREAMS_MAP, FederationStream +from .protocol import ServerReplicationStreamProtocol from synapse.util.metrics import Measure, measure_func |