diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e7a07967d8..465b2a19bc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -670,14 +670,14 @@ class EventCreationHandler(object):
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
if event.is_state():
- prev_state = await self.deduplicate_state_event(event, context)
- if prev_state is not None:
+ prev_event = await self.deduplicate_state_event(event, context)
+ if prev_event is not None:
logger.info(
"Not bothering to persist state event %s duplicated by %s",
event.event_id,
- prev_state.event_id,
+ prev_event.event_id,
)
- return prev_state
+ return await self.store.get_stream_id_for_event(prev_event.event_id)
return await self.handle_new_client_event(
requester=requester, event=event, context=context, ratelimit=ratelimit
@@ -685,27 +685,32 @@ class EventCreationHandler(object):
async def deduplicate_state_event(
self, event: EventBase, context: EventContext
- ) -> None:
+ ) -> Optional[EventBase]:
"""
Checks whether event is in the latest resolved state in context.
- If so, returns the version of the event in context.
- Otherwise, returns None.
+ Args:
+ event: The event to check for duplication.
+ context: The event context.
+
+ Returns:
+ The previous version of the event is returned, if it is found in the
+ event context. Otherwise, None is returned.
"""
prev_state_ids = await context.get_prev_state_ids()
prev_event_id = prev_state_ids.get((event.type, event.state_key))
if not prev_event_id:
- return
+ return None
prev_event = await self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
- return
+ return None
if prev_event and event.user_id == prev_event.user_id:
prev_content = encode_canonical_json(prev_event.content)
next_content = encode_canonical_json(event.content)
if prev_content == next_content:
return prev_event
- return
+ return None
async def create_and_send_nonmember_event(
self,
diff --git a/synapse/http/server.py b/synapse/http/server.py
index ffe6cfa09e..37fdf14405 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -22,12 +22,13 @@ import types
import urllib
from http import HTTPStatus
from io import BytesIO
-from typing import Any, Callable, Dict, Tuple, Union
+from typing import Any, Callable, Dict, Iterator, List, Tuple, Union
import jinja2
-from canonicaljson import encode_canonical_json, encode_pretty_printed_json
+from canonicaljson import iterencode_canonical_json, iterencode_pretty_printed_json
+from zope.interface import implementer
-from twisted.internet import defer
+from twisted.internet import defer, interfaces
from twisted.python import failure
from twisted.web import resource
from twisted.web.server import NOT_DONE_YET, Request
@@ -499,6 +500,78 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
pass
+@implementer(interfaces.IPullProducer)
+class _ByteProducer:
+ """
+ Iteratively write bytes to the request.
+ """
+
+ # The minimum number of bytes for each chunk. Note that the last chunk will
+ # usually be smaller than this.
+ min_chunk_size = 1024
+
+ def __init__(
+ self, request: Request, iterator: Iterator[bytes],
+ ):
+ self._request = request
+ self._iterator = iterator
+
+ def start(self) -> None:
+ self._request.registerProducer(self, False)
+
+ def _send_data(self, data: List[bytes]) -> None:
+ """
+ Send a list of byte chunks as a response to the request.
+ """
+ if not data:
+ return
+ self._request.write(b"".join(data))
+
+ def resumeProducing(self) -> None:
+ # We've stopped producing in the meantime (note that this might be
+ # re-entrant after calling write).
+ if not self._request:
+ return
+
+ # Get the next chunk and write it to the request.
+ #
+ # The output of the JSON encoder is coalesced until min_chunk_size is
+ # reached. (This is because JSON encoders produce a very small output
+ # per iteration.)
+ #
+ # Note that buffer stores a list of bytes (instead of appending to
+ # bytes) to hopefully avoid many allocations.
+ buffer = []
+ buffered_bytes = 0
+ while buffered_bytes < self.min_chunk_size:
+ try:
+ data = next(self._iterator)
+ buffer.append(data)
+ buffered_bytes += len(data)
+ except StopIteration:
+ # The entire JSON object has been serialized, write any
+ # remaining data, finalize the producer and the request, and
+ # clean-up any references.
+ self._send_data(buffer)
+ self._request.unregisterProducer()
+ self._request.finish()
+ self.stopProducing()
+ return
+
+ self._send_data(buffer)
+
+ def stopProducing(self) -> None:
+ self._request = None
+
+
+def _encode_json_bytes(json_object: Any) -> Iterator[bytes]:
+ """
+ Encode an object into JSON. Returns an iterator of bytes.
+ """
+ for chunk in json_encoder.iterencode(json_object):
+ yield chunk.encode("utf-8")
+
+
def respond_with_json(
request: Request,
code: int,
@@ -533,15 +606,23 @@ def respond_with_json(
return None
if pretty_print:
- json_bytes = encode_pretty_printed_json(json_object) + b"\n"
+ encoder = iterencode_pretty_printed_json
else:
if canonical_json or synapse.events.USE_FROZEN_DICTS:
- # canonicaljson already encodes to bytes
- json_bytes = encode_canonical_json(json_object)
+ encoder = iterencode_canonical_json
else:
- json_bytes = json_encoder.encode(json_object).encode("utf-8")
+ encoder = _encode_json_bytes
+
+ request.setResponseCode(code)
+ request.setHeader(b"Content-Type", b"application/json")
+ request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")
- return respond_with_json_bytes(request, code, json_bytes, send_cors=send_cors)
+ if send_cors:
+ set_cors_headers(request)
+
+ producer = _ByteProducer(request, encoder(json_object))
+ producer.start()
+ return NOT_DONE_YET
def respond_with_json_bytes(
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 3250d41dde..dd77a44b8d 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -43,7 +43,7 @@ REQUIREMENTS = [
"jsonschema>=2.5.1",
"frozendict>=1",
"unpaddedbase64>=1.1.0",
- "canonicaljson>=1.2.0",
+ "canonicaljson>=1.3.0",
# we use the type definitions added in signedjson 1.1.
"signedjson>=1.1.0",
"pynacl>=1.2.1",
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index 9b3f85b306..e266204f95 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -15,12 +15,12 @@
import logging
from typing import Dict, Set
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import json
from signedjson.sign import sign_json
from synapse.api.errors import Codes, SynapseError
from synapse.crypto.keyring import ServerKeyFetcher
-from synapse.http.server import DirectServeJsonResource, respond_with_json_bytes
+from synapse.http.server import DirectServeJsonResource, respond_with_json
from synapse.http.servlet import parse_integer, parse_json_object_from_request
logger = logging.getLogger(__name__)
@@ -223,4 +223,4 @@ class RemoteKey(DirectServeJsonResource):
results = {"server_keys": signed_keys}
- respond_with_json_bytes(request, 200, encode_canonical_json(results))
+ respond_with_json(request, 200, results, canonical_json=True)
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 8ccfb8fc46..4377bddb8c 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -582,6 +582,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
return "t%d-%d" % (topo, token)
+ async def get_stream_id_for_event(self, event_id: str) -> int:
+ """The stream ID for an event
+ Args:
+ event_id: The id of the event to look up a stream token for.
+ Raises:
+ StoreError if the event wasn't in the database.
+ Returns:
+ A stream ID.
+ """
+ return await self.db_pool.simple_select_one_onecol(
+ table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering"
+ )
+
async def get_stream_token_for_event(self, event_id: str) -> str:
"""The stream token for an event
Args:
@@ -591,10 +604,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
Returns:
A "s%d" stream token.
"""
- row = await self.db_pool.simple_select_one_onecol(
- table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering"
- )
- return "s%d" % (row,)
+ stream_id = await self.get_stream_id_for_event(event_id)
+ return "s%d" % (stream_id,)
async def get_topological_token_for_event(self, event_id: str) -> str:
"""The stream token for an event
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index 2e2b40a426..61d96a6c28 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -24,9 +24,7 @@ from synapse.api.errors import Codes, SynapseError
_string_with_symbols = string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
# https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-register-email-requesttoken
-# Note: The : character is allowed here for older clients, but will be removed in a
-# future release. Context: https://github.com/matrix-org/synapse/issues/6766
-client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-\:]+$")
+client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-]+$")
# random_string and random_string_with_symbols are used for a range of things,
# some cryptographically important, some less so. We use SystemRandom to make sure
|