summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/slave/storage/devices.py23
-rw-r--r--synapse/replication/tcp/protocol.py24
-rw-r--r--synapse/replication/tcp/streams.py2
3 files changed, 29 insertions, 20 deletions
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 8206a988f7..21b8c468fa 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import six
+
 from synapse.storage import DataStore
 from synapse.storage.end_to_end_keys import EndToEndKeyStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -21,6 +23,13 @@ from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 
+def __func__(inp):
+    if six.PY3:
+        return inp
+    else:
+        return inp.__func__
+
+
 class SlavedDeviceStore(BaseSlavedStore):
     def __init__(self, db_conn, hs):
         super(SlavedDeviceStore, self).__init__(db_conn, hs)
@@ -38,14 +47,14 @@ class SlavedDeviceStore(BaseSlavedStore):
             "DeviceListFederationStreamChangeCache", device_list_max,
         )
 
-    get_device_stream_token = DataStore.get_device_stream_token.__func__
-    get_user_whose_devices_changed = DataStore.get_user_whose_devices_changed.__func__
-    get_devices_by_remote = DataStore.get_devices_by_remote.__func__
-    _get_devices_by_remote_txn = DataStore._get_devices_by_remote_txn.__func__
-    _get_e2e_device_keys_txn = DataStore._get_e2e_device_keys_txn.__func__
-    mark_as_sent_devices_by_remote = DataStore.mark_as_sent_devices_by_remote.__func__
+    get_device_stream_token = __func__(DataStore.get_device_stream_token)
+    get_user_whose_devices_changed = __func__(DataStore.get_user_whose_devices_changed)
+    get_devices_by_remote = __func__(DataStore.get_devices_by_remote)
+    _get_devices_by_remote_txn = __func__(DataStore._get_devices_by_remote_txn)
+    _get_e2e_device_keys_txn = __func__(DataStore._get_e2e_device_keys_txn)
+    mark_as_sent_devices_by_remote = __func__(DataStore.mark_as_sent_devices_by_remote)
     _mark_as_sent_devices_by_remote_txn = (
-        DataStore._mark_as_sent_devices_by_remote_txn.__func__
+        __func__(DataStore._mark_as_sent_devices_by_remote_txn)
     )
     count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"]
 
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 74e892c104..5dc7b3fffc 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -590,9 +590,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
 pending_commands = LaterGauge(
     "synapse_replication_tcp_protocol_pending_commands",
     "",
-    ["name", "conn_id"],
+    ["name"],
     lambda: {
-        (p.name, p.conn_id): len(p.pending_commands) for p in connected_connections
+        (p.name,): len(p.pending_commands) for p in connected_connections
     },
 )
 
@@ -607,9 +607,9 @@ def transport_buffer_size(protocol):
 transport_send_buffer = LaterGauge(
     "synapse_replication_tcp_protocol_transport_send_buffer",
     "",
-    ["name", "conn_id"],
+    ["name"],
     lambda: {
-        (p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections
+        (p.name,): transport_buffer_size(p) for p in connected_connections
     },
 )
 
@@ -632,9 +632,9 @@ def transport_kernel_read_buffer_size(protocol, read=True):
 tcp_transport_kernel_send_buffer = LaterGauge(
     "synapse_replication_tcp_protocol_transport_kernel_send_buffer",
     "",
-    ["name", "conn_id"],
+    ["name"],
     lambda: {
-        (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
+        (p.name,): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
     },
 )
@@ -643,9 +643,9 @@ tcp_transport_kernel_send_buffer = LaterGauge(
 tcp_transport_kernel_read_buffer = LaterGauge(
     "synapse_replication_tcp_protocol_transport_kernel_read_buffer",
     "",
-    ["name", "conn_id"],
+    ["name"],
     lambda: {
-        (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
+        (p.name,): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
     },
 )
@@ -654,9 +654,9 @@ tcp_transport_kernel_read_buffer = LaterGauge(
 tcp_inbound_commands = LaterGauge(
     "synapse_replication_tcp_protocol_inbound_commands",
     "",
-    ["command", "name", "conn_id"],
+    ["command", "name"],
     lambda: {
-        (k[0], p.name, p.conn_id): count
+        (k[0], p.name,): count
         for p in connected_connections
         for k, count in iteritems(p.inbound_commands_counter)
     },
@@ -665,9 +665,9 @@ tcp_inbound_commands = LaterGauge(
 tcp_outbound_commands = LaterGauge(
     "synapse_replication_tcp_protocol_outbound_commands",
     "",
-    ["command", "name", "conn_id"],
+    ["command", "name"],
     lambda: {
-        (k[0], p.name, p.conn_id): count
+        (k[0], p.name,): count
         for p in connected_connections
         for k, count in iteritems(p.outbound_commands_counter)
     },
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 55fe701c5c..c1e626be3f 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -196,7 +196,7 @@ class Stream(object):
             )
 
             if len(rows) >= MAX_EVENTS_BEHIND:
-                raise Exception("stream %s has fallen behined" % (self.NAME))
+                raise Exception("stream %s has fallen behind" % (self.NAME))
         else:
             rows = yield self.update_function(
                 from_token, current_token,