diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 07fccdd8f9..a5454556cc 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -19,7 +19,10 @@ from six import string_types
from frozendict import frozendict
+from twisted.internet import defer
+
from synapse.api.constants import EventTypes
+from synapse.util.async_helpers import yieldable_gather_results
from . import EventBase
@@ -311,3 +314,44 @@ def serialize_event(e, time_now_ms, as_client_event=True,
d = only_fields(d, only_event_fields)
return d
+
+
+class EventClientSerializer(object):
+ """Serializes events that are to be sent to clients.
+
+ This is used for bundling extra information with any events to be sent to
+ clients.
+ """
+
+ def __init__(self, hs):
+ pass
+
+ def serialize_event(self, event, time_now, **kwargs):
+ """Serializes a single event.
+
+ Args:
+ event (EventBase)
+ time_now (int): The current time in milliseconds
+ **kwargs: Arguments to pass to `serialize_event`
+
+ Returns:
+ Deferred[dict]: The serialized event
+ """
+ event = serialize_event(event, time_now, **kwargs)
+ return defer.succeed(event)
+
+ def serialize_events(self, events, time_now, **kwargs):
+ """Serializes multiple events.
+
+ Args:
+ event (iter[EventBase])
+ time_now (int): The current time in milliseconds
+ **kwargs: Arguments to pass to `serialize_event`
+
+ Returns:
+ Deferred[list[dict]]: The list of serialized events
+ """
+ return yieldable_gather_results(
+ self.serialize_event, events,
+ time_now=time_now, **kwargs
+ )
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 1b4d8c74ae..6003ad9cca 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -21,7 +21,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
-from synapse.events.utils import serialize_event
from synapse.types import UserID
from synapse.util.logutils import log_function
from synapse.visibility import filter_events_for_client
@@ -50,6 +49,7 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier()
self.state = hs.get_state_handler()
self._server_notices_sender = hs.get_server_notices_sender()
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
@log_function
@@ -120,9 +120,9 @@ class EventStreamHandler(BaseHandler):
time_now = self.clock.time_msec()
- chunks = [
- serialize_event(e, time_now, as_client_event) for e in events
- ]
+ chunks = yield self._event_serializer.serialize_events(
+ events, time_now, as_client_event=as_client_event,
+ )
chunk = {
"chunk": chunks,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 7dfae78db0..aaee5db0b7 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -19,7 +19,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
@@ -43,6 +42,7 @@ class InitialSyncHandler(BaseHandler):
self.clock = hs.get_clock()
self.validator = EventValidator()
self.snapshot_cache = SnapshotCache()
+ self._event_serializer = hs.get_event_client_serializer()
def snapshot_all_rooms(self, user_id=None, pagin_config=None,
as_client_event=True, include_archived=False):
@@ -138,7 +138,9 @@ class InitialSyncHandler(BaseHandler):
d["inviter"] = event.sender
invite_event = yield self.store.get_event(event.event_id)
- d["invite"] = serialize_event(invite_event, time_now, as_client_event)
+ d["invite"] = yield self._event_serializer.serialize_event(
+ invite_event, time_now, as_client_event,
+ )
rooms_ret.append(d)
@@ -185,18 +187,21 @@ class InitialSyncHandler(BaseHandler):
time_now = self.clock.time_msec()
d["messages"] = {
- "chunk": [
- serialize_event(m, time_now, as_client_event)
- for m in messages
- ],
+ "chunk": (
+ yield self._event_serializer.serialize_events(
+ messages, time_now=time_now,
+ as_client_event=as_client_event,
+ )
+ ),
"start": start_token.to_string(),
"end": end_token.to_string(),
}
- d["state"] = [
- serialize_event(c, time_now, as_client_event)
- for c in current_state.values()
- ]
+ d["state"] = yield self._event_serializer.serialize_events(
+ current_state.values(),
+ time_now=time_now,
+ as_client_event=as_client_event
+ )
account_data_events = []
tags = tags_by_room.get(event.room_id)
@@ -337,11 +342,15 @@ class InitialSyncHandler(BaseHandler):
"membership": membership,
"room_id": room_id,
"messages": {
- "chunk": [serialize_event(m, time_now) for m in messages],
+ "chunk": (yield self._event_serializer.serialize_events(
+ messages, time_now,
+ )),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
- "state": [serialize_event(s, time_now) for s in room_state.values()],
+ "state": (yield self._event_serializer.serialize_events(
+ room_state.values(), time_now,
+ )),
"presence": [],
"receipts": [],
})
@@ -355,10 +364,9 @@ class InitialSyncHandler(BaseHandler):
# TODO: These concurrently
time_now = self.clock.time_msec()
- state = [
- serialize_event(x, time_now)
- for x in current_state.values()
- ]
+ state = yield self._event_serializer.serialize_events(
+ current_state.values(), time_now,
+ )
now_token = yield self.hs.get_event_sources().get_current_token()
@@ -425,7 +433,9 @@ class InitialSyncHandler(BaseHandler):
ret = {
"room_id": room_id,
"messages": {
- "chunk": [serialize_event(m, time_now) for m in messages],
+ "chunk": (yield self._event_serializer.serialize_events(
+ messages, time_now,
+ )),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e5afeadf68..7b2c33a922 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -32,7 +32,6 @@ from synapse.api.errors import (
)
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
-from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter
@@ -57,6 +56,7 @@ class MessageHandler(object):
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
self.store = hs.get_datastore()
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
@@ -164,9 +164,10 @@ class MessageHandler(object):
room_state = room_state[membership_event_id]
now = self.clock.time_msec()
- defer.returnValue(
- [serialize_event(c, now) for c in room_state.values()]
+ events = yield self._event_serializer.serialize_events(
+ room_state.values(), now,
)
+ defer.returnValue(events)
@defer.inlineCallbacks
def get_joined_members(self, requester, room_id):
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index e4fdae9266..8f811e24fe 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -20,7 +20,6 @@ from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
-from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
@@ -78,6 +77,7 @@ class PaginationHandler(object):
self._purges_in_progress_by_room = set()
# map from purge id to PurgeStatus
self._purges_by_id = {}
+ self._event_serializer = hs.get_event_client_serializer()
def start_purge_history(self, room_id, token,
delete_local_events=False):
@@ -278,18 +278,22 @@ class PaginationHandler(object):
time_now = self.clock.time_msec()
chunk = {
- "chunk": [
- serialize_event(e, time_now, as_client_event)
- for e in events
- ],
+ "chunk": (
+ yield self._event_serializer.serialize_events(
+ events, time_now,
+ as_client_event=as_client_event,
+ )
+ ),
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}
if state:
- chunk["state"] = [
- serialize_event(e, time_now, as_client_event)
- for e in state
- ]
+ chunk["state"] = (
+ yield self._event_serializer.serialize_events(
+ state, time_now,
+ as_client_event=as_client_event,
+ )
+ )
defer.returnValue(chunk)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 49c439313e..9bba74d6c9 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -23,7 +23,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
-from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter
from synapse.visibility import filter_events_for_client
@@ -36,6 +35,7 @@ class SearchHandler(BaseHandler):
def __init__(self, hs):
super(SearchHandler, self).__init__(hs)
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
def get_old_rooms_from_upgraded_room(self, room_id):
@@ -401,14 +401,16 @@ class SearchHandler(BaseHandler):
time_now = self.clock.time_msec()
for context in contexts.values():
- context["events_before"] = [
- serialize_event(e, time_now)
- for e in context["events_before"]
- ]
- context["events_after"] = [
- serialize_event(e, time_now)
- for e in context["events_after"]
- ]
+ context["events_before"] = (
+ yield self._event_serializer.serialize_events(
+ context["events_before"], time_now,
+ )
+ )
+ context["events_after"] = (
+ yield self._event_serializer.serialize_events(
+ context["events_after"], time_now,
+ )
+ )
state_results = {}
if include_state:
@@ -422,14 +424,13 @@ class SearchHandler(BaseHandler):
# We're now about to serialize the events. We should not make any
# blocking calls after this. Otherwise the 'age' will be wrong
- results = [
- {
+ results = []
+ for e in allowed_events:
+ results.append({
"rank": rank_map[e.event_id],
- "result": serialize_event(e, time_now),
+ "result": (yield self._event_serializer.serialize_event(e, time_now)),
"context": contexts.get(e.event_id, {}),
- }
- for e in allowed_events
- ]
+ })
rooms_cat_res = {
"results": results,
@@ -438,10 +439,13 @@ class SearchHandler(BaseHandler):
}
if state_results:
- rooms_cat_res["state"] = {
- room_id: [serialize_event(e, time_now) for e in state]
- for room_id, state in state_results.items()
- }
+ s = {}
+ for room_id, state in state_results.items():
+ s[room_id] = yield self._event_serializer.serialize_events(
+ state, time_now,
+ )
+
+ rooms_cat_res["state"] = s
if room_groups and "room_id" in group_keys:
rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index cd9b3bdbd1..c3b0a39ab7 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -19,7 +19,6 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
-from synapse.events.utils import serialize_event
from synapse.streams.config import PaginationConfig
from .base import ClientV1RestServlet, client_path_patterns
@@ -84,6 +83,7 @@ class EventRestServlet(ClientV1RestServlet):
super(EventRestServlet, self).__init__(hs)
self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler()
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
def on_GET(self, request, event_id):
@@ -92,7 +92,8 @@ class EventRestServlet(ClientV1RestServlet):
time_now = self.clock.time_msec()
if event:
- defer.returnValue((200, serialize_event(event, time_now)))
+ event = yield self._event_serializer.serialize_event(event, time_now)
+ defer.returnValue((200, event))
else:
defer.returnValue((404, "Event not found."))
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index fab04965cb..255a85c588 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.api.filtering import Filter
-from synapse.events.utils import format_event_for_client_v2, serialize_event
+from synapse.events.utils import format_event_for_client_v2
from synapse.http.servlet import (
assert_params_in_dict,
parse_integer,
@@ -537,6 +537,7 @@ class RoomEventServlet(ClientV1RestServlet):
super(RoomEventServlet, self).__init__(hs)
self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler()
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
@@ -545,7 +546,8 @@ class RoomEventServlet(ClientV1RestServlet):
time_now = self.clock.time_msec()
if event:
- defer.returnValue((200, serialize_event(event, time_now)))
+ event = yield self._event_serializer.serialize_event(event, time_now)
+ defer.returnValue((200, event))
else:
defer.returnValue((404, "Event not found."))
@@ -559,6 +561,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
super(RoomEventContextServlet, self).__init__(hs)
self.clock = hs.get_clock()
self.room_context_handler = hs.get_room_context_handler()
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
@@ -588,16 +591,18 @@ class RoomEventContextServlet(ClientV1RestServlet):
)
time_now = self.clock.time_msec()
- results["events_before"] = [
- serialize_event(event, time_now) for event in results["events_before"]
- ]
- results["event"] = serialize_event(results["event"], time_now)
- results["events_after"] = [
- serialize_event(event, time_now) for event in results["events_after"]
- ]
- results["state"] = [
- serialize_event(event, time_now) for event in results["state"]
- ]
+ results["events_before"] = yield self._event_serializer.serialize_events(
+ results["events_before"], time_now,
+ )
+ results["event"] = yield self._event_serializer.serialize_event(
+ results["event"], time_now,
+ )
+ results["events_after"] = yield self._event_serializer.serialize_events(
+ results["events_after"], time_now,
+ )
+ results["state"] = yield self._event_serializer.serialize_events(
+ results["state"], time_now,
+ )
defer.returnValue((200, results))
diff --git a/synapse/rest/client/v2_alpha/notifications.py b/synapse/rest/client/v2_alpha/notifications.py
index 2a6ea3df5f..0a1eb0ae45 100644
--- a/synapse/rest/client/v2_alpha/notifications.py
+++ b/synapse/rest/client/v2_alpha/notifications.py
@@ -17,10 +17,7 @@ import logging
from twisted.internet import defer
-from synapse.events.utils import (
- format_event_for_client_v2_without_room_id,
- serialize_event,
-)
+from synapse.events.utils import format_event_for_client_v2_without_room_id
from synapse.http.servlet import RestServlet, parse_integer, parse_string
from ._base import client_v2_patterns
@@ -36,6 +33,7 @@ class NotificationsServlet(RestServlet):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
def on_GET(self, request):
@@ -69,11 +67,11 @@ class NotificationsServlet(RestServlet):
"profile_tag": pa["profile_tag"],
"actions": pa["actions"],
"ts": pa["received_ts"],
- "event": serialize_event(
+ "event": (yield self._event_serializer.serialize_event(
notif_events[pa["event_id"]],
self.clock.time_msec(),
event_format=format_event_for_client_v2_without_room_id,
- ),
+ )),
}
if pa["room_id"] not in receipts_by_room:
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 39d157a44b..078d65969a 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -26,7 +26,6 @@ from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
from synapse.events.utils import (
format_event_for_client_v2_without_room_id,
format_event_raw,
- serialize_event,
)
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.sync import SyncConfig
@@ -86,6 +85,7 @@ class SyncRestServlet(RestServlet):
self.filtering = hs.get_filtering()
self.presence_handler = hs.get_presence_handler()
self._server_notices_sender = hs.get_server_notices_sender()
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
def on_GET(self, request):
@@ -168,14 +168,14 @@ class SyncRestServlet(RestServlet):
)
time_now = self.clock.time_msec()
- response_content = self.encode_response(
+ response_content = yield self.encode_response(
time_now, sync_result, requester.access_token_id, filter
)
defer.returnValue((200, response_content))
- @staticmethod
- def encode_response(time_now, sync_result, access_token_id, filter):
+ @defer.inlineCallbacks
+ def encode_response(self, time_now, sync_result, access_token_id, filter):
if filter.event_format == 'client':
event_formatter = format_event_for_client_v2_without_room_id
elif filter.event_format == 'federation':
@@ -183,18 +183,18 @@ class SyncRestServlet(RestServlet):
else:
raise Exception("Unknown event format %s" % (filter.event_format, ))
- joined = SyncRestServlet.encode_joined(
+ joined = yield self.encode_joined(
sync_result.joined, time_now, access_token_id,
filter.event_fields,
event_formatter,
)
- invited = SyncRestServlet.encode_invited(
+ invited = yield self.encode_invited(
sync_result.invited, time_now, access_token_id,
event_formatter,
)
- archived = SyncRestServlet.encode_archived(
+ archived = yield self.encode_archived(
sync_result.archived, time_now, access_token_id,
filter.event_fields,
event_formatter,
@@ -239,8 +239,8 @@ class SyncRestServlet(RestServlet):
]
}
- @staticmethod
- def encode_joined(rooms, time_now, token_id, event_fields, event_formatter):
+ @defer.inlineCallbacks
+ def encode_joined(self, rooms, time_now, token_id, event_fields, event_formatter):
"""
Encode the joined rooms in a sync result
@@ -261,15 +261,15 @@ class SyncRestServlet(RestServlet):
"""
joined = {}
for room in rooms:
- joined[room.room_id] = SyncRestServlet.encode_room(
+ joined[room.room_id] = yield self.encode_room(
room, time_now, token_id, joined=True, only_fields=event_fields,
event_formatter=event_formatter,
)
return joined
- @staticmethod
- def encode_invited(rooms, time_now, token_id, event_formatter):
+ @defer.inlineCallbacks
+ def encode_invited(self, rooms, time_now, token_id, event_formatter):
"""
Encode the invited rooms in a sync result
@@ -289,7 +289,7 @@ class SyncRestServlet(RestServlet):
"""
invited = {}
for room in rooms:
- invite = serialize_event(
+ invite = yield self._event_serializer.serialize_event(
room.invite, time_now, token_id=token_id,
event_format=event_formatter,
is_invite=True,
@@ -304,8 +304,8 @@ class SyncRestServlet(RestServlet):
return invited
- @staticmethod
- def encode_archived(rooms, time_now, token_id, event_fields, event_formatter):
+ @defer.inlineCallbacks
+ def encode_archived(self, rooms, time_now, token_id, event_fields, event_formatter):
"""
Encode the archived rooms in a sync result
@@ -326,7 +326,7 @@ class SyncRestServlet(RestServlet):
"""
joined = {}
for room in rooms:
- joined[room.room_id] = SyncRestServlet.encode_room(
+ joined[room.room_id] = yield self.encode_room(
room, time_now, token_id, joined=False,
only_fields=event_fields,
event_formatter=event_formatter,
@@ -334,9 +334,9 @@ class SyncRestServlet(RestServlet):
return joined
- @staticmethod
+ @defer.inlineCallbacks
def encode_room(
- room, time_now, token_id, joined,
+ self, room, time_now, token_id, joined,
only_fields, event_formatter,
):
"""
@@ -355,9 +355,10 @@ class SyncRestServlet(RestServlet):
Returns:
dict[str, object]: the room, encoded in our response format
"""
- def serialize(event):
- return serialize_event(
- event, time_now, token_id=token_id,
+ def serialize(events):
+ return self._event_serializer.serialize_events(
+ events, time_now=time_now,
+ token_id=token_id,
event_format=event_formatter,
only_event_fields=only_fields,
)
@@ -376,8 +377,8 @@ class SyncRestServlet(RestServlet):
event.event_id, room.room_id, event.room_id,
)
- serialized_state = [serialize(e) for e in state_events]
- serialized_timeline = [serialize(e) for e in timeline_events]
+ serialized_state = yield serialize(state_events)
+ serialized_timeline = yield serialize(timeline_events)
account_data = room.account_data
diff --git a/synapse/server.py b/synapse/server.py
index 8c30ac2fa5..80d40b9272 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -35,6 +35,7 @@ from synapse.crypto import context_factory
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory
from synapse.events.spamcheck import SpamChecker
+from synapse.events.utils import EventClientSerializer
from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import (
FederationHandlerRegistry,
@@ -185,6 +186,7 @@ class HomeServer(object):
'sendmail',
'registration_handler',
'account_validity_handler',
+ 'event_client_serializer',
]
REQUIRED_ON_MASTER_STARTUP = [
@@ -511,6 +513,9 @@ class HomeServer(object):
def build_account_validity_handler(self):
return AccountValidityHandler(self)
+ def build_event_client_serializer(self):
+ return EventClientSerializer(self)
+
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 2f16f23d91..9a17dfdab2 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -156,6 +156,25 @@ def concurrently_execute(func, args, limit):
], consumeErrors=True)).addErrback(unwrapFirstError)
+def yieldable_gather_results(func, iter, *args, **kwargs):
+ """Executes the function with each argument concurrently.
+
+ Args:
+ func (func): Function to execute that returns a Deferred
+ iter (iter): An iterable that yields items that get passed as the first
+ argument to the function
+ *args: Arguments to be passed to each call to func
+
+ Returns
+ Deferred: Resolved when all functions have been invoked, or errors if
+ one of the function calls fails.
+ """
+ return logcontext.make_deferred_yieldable(defer.gatherResults([
+ run_in_background(func, item, *args, **kwargs)
+ for item in iter
+ ], consumeErrors=True)).addErrback(unwrapFirstError)
+
+
class Linearizer(object):
"""Limits concurrent access to resources based on a key. Useful to ensure
only a few things happen at a time on a given resource.
|