summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorJeroen <vo.jeroen@gmail.com>2018-07-09 08:51:11 +0200
committerJeroen <vo.jeroen@gmail.com>2018-07-09 08:51:11 +0200
commitb5e157d895dcf182d7ffc58edf6442deeaebc3f0 (patch)
treea3f8a4549d0fe4947bef34632218b41c94202b19 /synapse/replication/tcp
parenttake idna implementation from twisted (diff)
parentAdd an isort configuration (#3463) (diff)
downloadsynapse-b5e157d895dcf182d7ffc58edf6442deeaebc3f0.tar.xz
Merge branch 'develop' into send_sni_for_federation_requests
# Conflicts:
#	synapse/http/endpoint.py
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py6
-rw-r--r--synapse/replication/tcp/commands.py16
-rw-r--r--synapse/replication/tcp/resource.py4
3 files changed, 15 insertions, 11 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 6d2513c4e2..bb852b00af 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -15,7 +15,7 @@
 """A replication client for use by synapse workers.
 """
 
-from twisted.internet import reactor, defer
+from twisted.internet import defer
 from twisted.internet.protocol import ReconnectingClientFactory
 
 from .commands import (
@@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
         self.server_name = hs.config.server_name
         self._clock = hs.get_clock()  # As self.clock is defined in super class
 
-        reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
 
     def startedConnecting(self, connector):
         logger.info("Connecting to replication: %r", connector.getDestination())
@@ -95,7 +95,7 @@ class ReplicationClientHandler(object):
         factory = ReplicationClientFactory(hs, client_name, self)
         host = hs.config.worker_replication_host
         port = hs.config.worker_replication_port
-        reactor.connectTCP(host, port, factory)
+        hs.get_reactor().connectTCP(host, port, factory)
 
     def on_rdata(self, stream_name, token, rows):
         """Called when we get new replication data. By default this just pokes
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 12aac3cc6b..f3908df642 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -19,13 +19,17 @@ allowed to be sent by which side.
 """
 
 import logging
-import simplejson
+import platform
 
+if platform.python_implementation() == "PyPy":
+    import json
+    _json_encoder = json.JSONEncoder()
+else:
+    import simplejson as json
+    _json_encoder = json.JSONEncoder(namedtuple_as_object=False)
 
 logger = logging.getLogger(__name__)
 
-_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
-
 
 class Command(object):
     """The base command class.
@@ -102,7 +106,7 @@ class RdataCommand(Command):
         return cls(
             stream_name,
             None if token == "batch" else int(token),
-            simplejson.loads(row_json)
+            json.loads(row_json)
         )
 
     def to_line(self):
@@ -300,7 +304,7 @@ class InvalidateCacheCommand(Command):
     def from_line(cls, line):
         cache_func, keys_json = line.split(" ", 1)
 
-        return cls(cache_func, simplejson.loads(keys_json))
+        return cls(cache_func, json.loads(keys_json))
 
     def to_line(self):
         return " ".join((
@@ -329,7 +333,7 @@ class UserIpCommand(Command):
     def from_line(cls, line):
         user_id, jsn = line.split(" ", 1)
 
-        access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
+        access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
 
         return cls(
             user_id, access_token, ip, user_agent, device_id, last_seen
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 63bd6d2652..95ad8c1b4c 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -15,7 +15,7 @@
 """The server side of the replication stream.
 """
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.protocol import Factory
 
 from .streams import STREAMS_MAP, FederationStream
@@ -109,7 +109,7 @@ class ReplicationStreamer(object):
         self.is_looping = False
         self.pending_updates = False
 
-        reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
 
     def on_shutdown(self):
         # close all connections on shutdown