summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/app/federation_sender.py4
-rw-r--r--synapse/app/synchrotron.py4
-rw-r--r--synapse/federation/send_queue.py19
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/replication/tcp/streams.py104
5 files changed, 88 insertions, 47 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index cbddc80ca9..145c01f3a3 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -51,7 +51,6 @@ from daemonize import Daemonize
 import sys
 import logging
 import gc
-import ujson as json
 
 logger = logging.getLogger("synapse.app.appservice")
 
@@ -290,8 +289,7 @@ class FederationSenderHandler(object):
             # Parse the rows in the stream
             for row in rows:
                 typ = row.type
-                content_js = row.data
-                content = json.loads(content_js)
+                content = row.data
 
                 if typ == send_queue.PRESENCE_TYPE:
                     destination = content["destination"]
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 67d9210f2a..d39e3161fe 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -62,7 +62,6 @@ import sys
 import logging
 import contextlib
 import gc
-import ujson as json
 
 logger = logging.getLogger("synapse.app.synchrotron")
 
@@ -254,9 +253,8 @@ class SynchrotronTyping(object):
         self._latest_room_serial = token
 
         for row in rows:
-            typing = json.loads(row.user_ids)
             self._room_serials[row.room_id] = token
-            self._room_typing[row.room_id] = typing
+            self._room_typing[row.room_id] = row.user_ids
 
 
 class SynchrotronApplicationService(object):
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 4bde66fbf8..78c852ed69 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -35,7 +35,6 @@ from synapse.util.metrics import Measure
 import synapse.metrics
 
 from blist import sorteddict
-import ujson
 
 
 metrics = synapse.metrics.get_metrics_for(__name__)
@@ -258,10 +257,10 @@ class FederationRemoteSendQueue(object):
         )
 
         for (key, (dest, user_id)) in dest_user_ids:
-            rows.append((key, PRESENCE_TYPE, ujson.dumps({
+            rows.append((key, PRESENCE_TYPE, {
                 "destination": dest,
                 "state": self.presence_map[user_id].as_dict(),
-            })))
+            }))
 
         # Fetch changes keyed edus
         keys = self.keyed_edu_changed.keys()
@@ -271,10 +270,10 @@ class FederationRemoteSendQueue(object):
 
         for (pos, (destination, edu_key)) in keyed_edus:
             rows.append(
-                (pos, KEYED_EDU_TYPE, ujson.dumps({
+                (pos, KEYED_EDU_TYPE, {
                     "key": edu_key,
                     "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
-                }))
+                })
             )
 
         # Fetch changed edus
@@ -284,7 +283,7 @@ class FederationRemoteSendQueue(object):
         edus = set((k, self.edus[k]) for k in keys[i:j])
 
         for (pos, edu) in edus:
-            rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
+            rows.append((pos, EDU_TYPE, edu.get_internal_dict()))
 
         # Fetch changed failures
         keys = self.failures.keys()
@@ -293,10 +292,10 @@ class FederationRemoteSendQueue(object):
         failures = set((k, self.failures[k]) for k in keys[i:j])
 
         for (pos, (destination, failure)) in failures:
-            rows.append((pos, FAILURE_TYPE, ujson.dumps({
+            rows.append((pos, FAILURE_TYPE, {
                 "destination": destination,
                 "failure": failure,
-            })))
+            }))
 
         # Fetch changed device messages
         keys = self.device_messages.keys()
@@ -305,9 +304,9 @@ class FederationRemoteSendQueue(object):
         device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
 
         for (pos, destination) in device_messages:
-            rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
+            rows.append((pos, DEVICE_MESSAGE_TYPE, {
                 "destination": destination,
-            })))
+            }))
 
         # Sort rows based on pos
         rows.sort()
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d6809862e0..3b7818af5c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -24,7 +24,6 @@ from synapse.types import UserID, get_domain_from_id
 import logging
 
 from collections import namedtuple
-import ujson as json
 
 logger = logging.getLogger(__name__)
 
@@ -288,8 +287,7 @@ class TypingHandler(object):
         for room_id, serial in self._room_serials.items():
             if last_id < serial and serial <= current_id:
                 typing = self._room_typing[room_id]
-                typing_bytes = json.dumps(list(typing), ensure_ascii=False)
-                rows.append((serial, room_id, typing_bytes))
+                rows.append((serial, room_id, list(typing)))
         rows.sort()
         return rows
 
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 4de4ebe84d..967b459e0e 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -36,34 +36,82 @@ logger = logging.getLogger(__name__)
 MAX_EVENTS_BEHIND = 10000
 
 
-EventStreamRow = namedtuple("EventStreamRow",
-                            ("event_id", "room_id", "type", "state_key", "redacts"))
-BackfillStreamRow = namedtuple("BackfillStreamRow",
-                               ("event_id", "room_id", "type", "state_key", "redacts"))
-PresenceStreamRow = namedtuple("PresenceStreamRow",
-                               ("user_id", "state", "last_active_ts",
-                                "last_federation_update_ts", "last_user_sync_ts",
-                                "status_msg", "currently_active"))
-TypingStreamRow = namedtuple("TypingStreamRow",
-                             ("room_id", "user_ids"))
-ReceiptsStreamRow = namedtuple("ReceiptsStreamRow",
-                               ("room_id", "receipt_type", "user_id", "event_id",
-                                "data"))
-PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))
-PushersStreamRow = namedtuple("PushersStreamRow",
-                              ("user_id", "app_id", "pushkey", "deleted",))
-CachesStreamRow = namedtuple("CachesStreamRow",
-                             ("cache_func", "keys", "invalidation_ts",))
-PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow",
-                                  ("room_id", "visibility", "appservice_id",
-                                   "network_id",))
-DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",))
-ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))
-FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",))
-TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow",
-                                     ("user_id", "room_id", "data"))
-AccountDataStreamRow = namedtuple("AccountDataStream",
-                                  ("user_id", "room_id", "data_type", "data"))
+EventStreamRow = namedtuple("EventStreamRow", (
+    "event_id",  # str
+    "room_id",  # str
+    "type",  # str
+    "state_key",  # str, optional
+    "redacts",  # str, optional
+))
+BackfillStreamRow = namedtuple("BackfillStreamRow", (
+    "event_id",  # str
+    "room_id",  # str
+    "type",  # str
+    "state_key",  # str, optional
+    "redacts",  # str, optional
+))
+PresenceStreamRow = namedtuple("PresenceStreamRow", (
+    "user_id",  # str
+    "state",  # str
+    "last_active_ts",  # int
+    "last_federation_update_ts",  # int
+    "last_user_sync_ts",  # int
+    "status_msg",   # str
+    "currently_active",  # bool
+))
+TypingStreamRow = namedtuple("TypingStreamRow", (
+    "room_id",  # str
+    "user_ids",  # list(str)
+))
+ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
+    "room_id",  # str
+    "receipt_type",  # str
+    "user_id",  # str
+    "event_id",  # str
+    "data",  # dict
+))
+PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
+    "user_id",  # str
+))
+PushersStreamRow = namedtuple("PushersStreamRow", (
+    "user_id",  # str
+    "app_id",  # str
+    "pushkey",  # str
+    "deleted",  # bool
+))
+CachesStreamRow = namedtuple("CachesStreamRow", (
+    "cache_func",  # str
+    "keys",  # list(str)
+    "invalidation_ts",  # int
+))
+PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
+    "room_id",  # str
+    "visibility",  # str
+    "appservice_id",  # str, optional
+    "network_id",  # str, optional
+))
+DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
+    "user_id",  # str
+    "destination",  # str
+))
+ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
+    "entity",  # str
+))
+FederationStreamRow = namedtuple("FederationStreamRow", (
+    "type",  # str
+    "data",  # dict
+))
+TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
+    "user_id",  # str
+    "room_id",  # str
+    "data",  # dict
+))
+AccountDataStreamRow = namedtuple("AccountDataStream", (
+    "user_id",  # str
+    "room_id",  # str
+    "data_type",  # str
+    "data",  # dict
+))
 
 
 class Stream(object):