diff options
-rw-r--r-- | synapse/app/federation_sender.py | 4 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 4 | ||||
-rw-r--r-- | synapse/federation/send_queue.py | 19 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 4 | ||||
-rw-r--r-- | synapse/replication/tcp/streams.py | 104 |
5 files changed, 88 insertions, 47 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index cbddc80ca9..145c01f3a3 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -51,7 +51,6 @@ from daemonize import Daemonize import sys import logging import gc -import ujson as json logger = logging.getLogger("synapse.app.appservice") @@ -290,8 +289,7 @@ class FederationSenderHandler(object): # Parse the rows in the stream for row in rows: typ = row.type - content_js = row.data - content = json.loads(content_js) + content = row.data if typ == send_queue.PRESENCE_TYPE: destination = content["destination"] diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 67d9210f2a..d39e3161fe 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -62,7 +62,6 @@ import sys import logging import contextlib import gc -import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -254,9 +253,8 @@ class SynchrotronTyping(object): self._latest_room_serial = token for row in rows: - typing = json.loads(row.user_ids) self._room_serials[row.room_id] = token - self._room_typing[row.room_id] = typing + self._room_typing[row.room_id] = row.user_ids class SynchrotronApplicationService(object): diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 4bde66fbf8..78c852ed69 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -35,7 +35,6 @@ from synapse.util.metrics import Measure import synapse.metrics from blist import sorteddict -import ujson metrics = synapse.metrics.get_metrics_for(__name__) @@ -258,10 +257,10 @@ class FederationRemoteSendQueue(object): ) for (key, (dest, user_id)) in dest_user_ids: - rows.append((key, PRESENCE_TYPE, ujson.dumps({ + rows.append((key, PRESENCE_TYPE, { "destination": dest, "state": self.presence_map[user_id].as_dict(), - }))) + })) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() @@ -271,10 +270,10 @@ class FederationRemoteSendQueue(object): for (pos, (destination, edu_key)) in keyed_edus: rows.append( - (pos, KEYED_EDU_TYPE, ujson.dumps({ + (pos, KEYED_EDU_TYPE, { "key": edu_key, "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), - })) + }) ) # Fetch changed edus @@ -284,7 +283,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) + rows.append((pos, EDU_TYPE, edu.get_internal_dict())) # Fetch changed failures keys = self.failures.keys() @@ -293,10 +292,10 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: - rows.append((pos, FAILURE_TYPE, ujson.dumps({ + rows.append((pos, FAILURE_TYPE, { "destination": destination, "failure": failure, - }))) + })) # Fetch changed device messages keys = self.device_messages.keys() @@ -305,9 +304,9 @@ class FederationRemoteSendQueue(object): device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: - rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ + rows.append((pos, DEVICE_MESSAGE_TYPE, { "destination": destination, - }))) + })) # Sort rows based on pos rows.sort() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index d6809862e0..3b7818af5c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -24,7 +24,6 @@ from synapse.types import UserID, get_domain_from_id import logging from collections import namedtuple -import ujson as json logger = logging.getLogger(__name__) @@ -288,8 +287,7 @@ class TypingHandler(object): for room_id, serial in self._room_serials.items(): if last_id < serial and serial <= current_id: typing = self._room_typing[room_id] - typing_bytes = json.dumps(list(typing), ensure_ascii=False) - rows.append((serial, room_id, typing_bytes)) + rows.append((serial, room_id, list(typing))) rows.sort() return rows diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 4de4ebe84d..967b459e0e 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -36,34 +36,82 @@ logger = logging.getLogger(__name__) MAX_EVENTS_BEHIND = 10000 -EventStreamRow = namedtuple("EventStreamRow", - ("event_id", "room_id", "type", "state_key", "redacts")) -BackfillStreamRow = namedtuple("BackfillStreamRow", - ("event_id", "room_id", "type", "state_key", "redacts")) -PresenceStreamRow = namedtuple("PresenceStreamRow", - ("user_id", "state", "last_active_ts", - "last_federation_update_ts", "last_user_sync_ts", - "status_msg", "currently_active")) -TypingStreamRow = namedtuple("TypingStreamRow", - ("room_id", "user_ids")) -ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", - ("room_id", "receipt_type", "user_id", "event_id", - "data")) -PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) -PushersStreamRow = namedtuple("PushersStreamRow", - ("user_id", "app_id", "pushkey", "deleted",)) -CachesStreamRow = namedtuple("CachesStreamRow", - ("cache_func", "keys", "invalidation_ts",)) -PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", - ("room_id", "visibility", "appservice_id", - "network_id",)) -DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",)) -ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) -FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",)) -TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", - ("user_id", "room_id", "data")) -AccountDataStreamRow = namedtuple("AccountDataStream", - ("user_id", "room_id", "data_type", "data")) +EventStreamRow = namedtuple("EventStreamRow", ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional +)) +BackfillStreamRow = namedtuple("BackfillStreamRow", ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional +)) +PresenceStreamRow = namedtuple("PresenceStreamRow", ( + "user_id", # str + "state", # str + "last_active_ts", # int + "last_federation_update_ts", # int + "last_user_sync_ts", # int + "status_msg", # str + "currently_active", # bool +)) +TypingStreamRow = namedtuple("TypingStreamRow", ( + "room_id", # str + "user_ids", # list(str) +)) +ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", ( + "room_id", # str + "receipt_type", # str + "user_id", # str + "event_id", # str + "data", # dict +)) +PushRulesStreamRow = namedtuple("PushRulesStreamRow", ( + "user_id", # str +)) +PushersStreamRow = namedtuple("PushersStreamRow", ( + "user_id", # str + "app_id", # str + "pushkey", # str + "deleted", # bool +)) +CachesStreamRow = namedtuple("CachesStreamRow", ( + "cache_func", # str + "keys", # list(str) + "invalidation_ts", # int +)) +PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", ( + "room_id", # str + "visibility", # str + "appservice_id", # str, optional + "network_id", # str, optional +)) +DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ( + "user_id", # str + "destination", # str +)) +ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( + "entity", # str +)) +FederationStreamRow = namedtuple("FederationStreamRow", ( + "type", # str + "data", # dict +)) +TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( + "user_id", # str + "room_id", # str + "data", # dict +)) +AccountDataStreamRow = namedtuple("AccountDataStream", ( + "user_id", # str + "room_id", # str + "data_type", # str + "data", # dict +)) class Stream(object): |