summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/filtering.py1
-rw-r--r--synapse/appservice/api.py13
-rw-r--r--synapse/federation/federation_server.py10
-rw-r--r--synapse/federation/transaction_queue.py12
-rw-r--r--synapse/handlers/auth.py8
-rw-r--r--synapse/handlers/e2e_keys.py5
-rw-r--r--synapse/handlers/federation.py6
-rw-r--r--synapse/handlers/room_list.py2
-rw-r--r--synapse/handlers/search.py14
-rw-r--r--synapse/handlers/sync.py22
-rw-r--r--synapse/http/client.py82
-rw-r--r--synapse/http/matrixfederationclient.py204
-rw-r--r--synapse/http/site.py4
-rw-r--r--synapse/python_dependencies.py5
-rw-r--r--synapse/replication/tcp/protocol.py24
-rw-r--r--synapse/rest/client/v2_alpha/account.py16
-rw-r--r--synapse/rest/client/v2_alpha/register.py12
-rw-r--r--synapse/rest/client/v2_alpha/sync.py51
-rw-r--r--synapse/storage/monthly_active_users.py6
19 files changed, 251 insertions, 246 deletions
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 186831e118..a31a9a17e0 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -251,6 +251,7 @@ class FilterCollection(object):
             "include_leave", False
         )
         self.event_fields = filter_json.get("event_fields", [])
+        self.event_format = filter_json.get("event_format", "client")
 
     def __repr__(self):
         return "<FilterCollection %s>" % (json.dumps(self._filter_json),)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 6980e5890e..9ccc5a80fc 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -13,7 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import urllib
+
+from six.moves import urllib
 
 from prometheus_client import Counter
 
@@ -98,7 +99,7 @@ class ApplicationServiceApi(SimpleHttpClient):
     def query_user(self, service, user_id):
         if service.url is None:
             defer.returnValue(False)
-        uri = service.url + ("/users/%s" % urllib.quote(user_id))
+        uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
         response = None
         try:
             response = yield self.get_json(uri, {
@@ -119,7 +120,7 @@ class ApplicationServiceApi(SimpleHttpClient):
     def query_alias(self, service, alias):
         if service.url is None:
             defer.returnValue(False)
-        uri = service.url + ("/rooms/%s" % urllib.quote(alias))
+        uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
         response = None
         try:
             response = yield self.get_json(uri, {
@@ -153,7 +154,7 @@ class ApplicationServiceApi(SimpleHttpClient):
             service.url,
             APP_SERVICE_PREFIX,
             kind,
-            urllib.quote(protocol)
+            urllib.parse.quote(protocol)
         )
         try:
             response = yield self.get_json(uri, fields)
@@ -188,7 +189,7 @@ class ApplicationServiceApi(SimpleHttpClient):
             uri = "%s%s/thirdparty/protocol/%s" % (
                 service.url,
                 APP_SERVICE_PREFIX,
-                urllib.quote(protocol)
+                urllib.parse.quote(protocol)
             )
             try:
                 info = yield self.get_json(uri, {})
@@ -228,7 +229,7 @@ class ApplicationServiceApi(SimpleHttpClient):
         txn_id = str(txn_id)
 
         uri = service.url + ("/transactions/%s" %
-                             urllib.quote(txn_id))
+                             urllib.parse.quote(txn_id))
         try:
             yield self.put_json(
                 uri=uri,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 547c6aec80..dbee404ea7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -838,9 +838,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
             )
 
         return self._send_edu(
-                edu_type=edu_type,
-                origin=origin,
-                content=content,
+            edu_type=edu_type,
+            origin=origin,
+            content=content,
         )
 
     def on_query(self, query_type, args):
@@ -851,6 +851,6 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
             return handler(args)
 
         return self._get_query_client(
-                query_type=query_type,
-                args=args,
+            query_type=query_type,
+            args=args,
         )
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 94d7423d01..8cbf8c4f7f 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -463,7 +463,19 @@ class TransactionQueue(object):
                 # pending_transactions flag.
 
                 pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+
+                # We can only include at most 50 PDUs per transactions
+                pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
+                if leftover_pdus:
+                    self.pending_pdus_by_dest[destination] = leftover_pdus
+
                 pending_edus = self.pending_edus_by_dest.pop(destination, [])
+
+                # We can only include at most 100 EDUs per transactions
+                pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
+                if leftover_edus:
+                    self.pending_edus_by_dest[destination] = leftover_edus
+
                 pending_presence = self.pending_presence_by_dest.pop(destination, {})
 
                 pending_edus.extend(
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 4a81bd2ba9..2a5eab124f 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -895,22 +895,24 @@ class AuthHandler(BaseHandler):
 
         Args:
             password (unicode): Password to hash.
-            stored_hash (unicode): Expected hash value.
+            stored_hash (bytes): Expected hash value.
 
         Returns:
             Deferred(bool): Whether self.hash(password) == stored_hash.
         """
-
         def _do_validate_hash():
             # Normalise the Unicode in the password
             pw = unicodedata.normalize("NFKC", password)
 
             return bcrypt.checkpw(
                 pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
-                stored_hash.encode('utf8')
+                stored_hash
             )
 
         if stored_hash:
+            if not isinstance(stored_hash, bytes):
+                stored_hash = stored_hash.encode('ascii')
+
             return make_deferred_yieldable(
                 threads.deferToThreadPool(
                     self.hs.get_reactor(),
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5816bf8b4f..578e9250fb 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -330,7 +330,8 @@ class E2eKeysHandler(object):
                         (algorithm, key_id, ex_json, key)
                     )
             else:
-                new_keys.append((algorithm, key_id, encode_canonical_json(key)))
+                new_keys.append((
+                    algorithm, key_id, encode_canonical_json(key).decode('ascii')))
 
         yield self.store.add_e2e_one_time_keys(
             user_id, device_id, time_now, new_keys
@@ -358,7 +359,7 @@ def _exception_to_failure(e):
     # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
     # give a string for e.message, which json then fails to serialize.
     return {
-        "status": 503, "message": str(e.message),
+        "status": 503, "message": str(e),
     }
 
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3fa7a98445..0c68e8a472 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -594,7 +594,7 @@ class FederationHandler(BaseHandler):
 
         required_auth = set(
             a_id
-            for event in events + state_events.values() + auth_events.values()
+            for event in events + list(state_events.values()) + list(auth_events.values())
             for a_id, _ in event.auth_events
         )
         auth_events.update({
@@ -802,7 +802,7 @@ class FederationHandler(BaseHandler):
                     )
                     continue
                 except NotRetryingDestination as e:
-                    logger.info(e.message)
+                    logger.info(str(e))
                     continue
                 except FederationDeniedError as e:
                     logger.info(e)
@@ -1358,7 +1358,7 @@ class FederationHandler(BaseHandler):
         )
 
         if state_groups:
-            _, state = state_groups.items().pop()
+            _, state = list(state_groups.items()).pop()
             results = state
 
             if event.is_state():
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 37e41afd61..38e1737ec9 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -162,7 +162,7 @@ class RoomListHandler(BaseHandler):
         # Filter out rooms that we don't want to return
         rooms_to_scan = [
             r for r in sorted_rooms
-            if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0
+            if r not in newly_unpublished and rooms_to_num_joined[r] > 0
         ]
 
         total_room_count = len(rooms_to_scan)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index c464adbd0b..0c1d52fd11 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -54,7 +54,7 @@ class SearchHandler(BaseHandler):
         batch_token = None
         if batch:
             try:
-                b = decode_base64(batch)
+                b = decode_base64(batch).decode('ascii')
                 batch_group, batch_group_key, batch_token = b.split("\n")
 
                 assert batch_group is not None
@@ -258,18 +258,18 @@ class SearchHandler(BaseHandler):
                 # it returns more from the same group (if applicable) rather
                 # than reverting to searching all results again.
                 if batch_group and batch_group_key:
-                    global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    global_next_batch = encode_base64(("%s\n%s\n%s" % (
                         batch_group, batch_group_key, pagination_token
-                    ))
+                    )).encode('ascii'))
                 else:
-                    global_next_batch = encode_base64("%s\n%s\n%s" % (
+                    global_next_batch = encode_base64(("%s\n%s\n%s" % (
                         "all", "", pagination_token
-                    ))
+                    )).encode('ascii'))
 
                 for room_id, group in room_groups.items():
-                    group["next_batch"] = encode_base64("%s\n%s\n%s" % (
+                    group["next_batch"] = encode_base64(("%s\n%s\n%s" % (
                         "room_id", room_id, pagination_token
-                    ))
+                    )).encode('ascii'))
 
             allowed_events.extend(room_events)
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ef20c2296c..9f133ded3f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -545,7 +545,7 @@ class SyncHandler(object):
 
         member_ids = {
             state_key: event_id
-            for (t, state_key), event_id in state_ids.iteritems()
+            for (t, state_key), event_id in iteritems(state_ids)
             if t == EventTypes.Member
         }
         name_id = state_ids.get((EventTypes.Name, ''))
@@ -774,7 +774,7 @@ class SyncHandler(object):
                     logger.debug("filtering state from %r...", state_ids)
                     state_ids = {
                         t: event_id
-                        for t, event_id in state_ids.iteritems()
+                        for t, event_id in iteritems(state_ids)
                         if cache.get(t[1]) != event_id
                     }
                     logger.debug("...to %r", state_ids)
@@ -1729,17 +1729,17 @@ def _calculate_state(
     event_id_to_key = {
         e: key
         for key, e in itertools.chain(
-            timeline_contains.items(),
-            previous.items(),
-            timeline_start.items(),
-            current.items(),
+            iteritems(timeline_contains),
+            iteritems(previous),
+            iteritems(timeline_start),
+            iteritems(current),
         )
     }
 
-    c_ids = set(e for e in current.values())
-    ts_ids = set(e for e in timeline_start.values())
-    p_ids = set(e for e in previous.values())
-    tc_ids = set(e for e in timeline_contains.values())
+    c_ids = set(e for e in itervalues(current))
+    ts_ids = set(e for e in itervalues(timeline_start))
+    p_ids = set(e for e in itervalues(previous))
+    tc_ids = set(e for e in itervalues(timeline_contains))
 
     # If we are lazyloading room members, we explicitly add the membership events
     # for the senders in the timeline into the state block returned by /sync,
@@ -1753,7 +1753,7 @@ def _calculate_state(
 
     if lazy_load_members:
         p_ids.difference_update(
-            e for t, e in timeline_start.iteritems()
+            e for t, e in iteritems(timeline_start)
             if t[0] == EventTypes.Member
         )
 
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ab4fbf59b2..4ba54fed05 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -13,24 +13,25 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import logging
-import urllib
 
-from six import StringIO
+from six import text_type
+from six.moves import urllib
 
+import treq
 from canonicaljson import encode_canonical_json, json
 from prometheus_client import Counter
 
 from OpenSSL import SSL
 from OpenSSL.SSL import VERIFY_NONE
-from twisted.internet import defer, protocol, reactor, ssl, task
+from twisted.internet import defer, protocol, reactor, ssl
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.web._newclient import ResponseDone
 from twisted.web.client import (
     Agent,
     BrowserLikeRedirectAgent,
     ContentDecoderAgent,
-    FileBodyProducer as TwistedFileBodyProducer,
     GzipDecoder,
     HTTPConnectionPool,
     PartialDownloadError,
@@ -83,18 +84,20 @@ class SimpleHttpClient(object):
         if hs.config.user_agent_suffix:
             self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix,)
 
+        self.user_agent = self.user_agent.encode('ascii')
+
     @defer.inlineCallbacks
-    def request(self, method, uri, *args, **kwargs):
+    def request(self, method, uri, data=b'', headers=None):
         # A small wrapper around self.agent.request() so we can easily attach
         # counters to it
         outgoing_requests_counter.labels(method).inc()
 
         # log request but strip `access_token` (AS requests for example include this)
-        logger.info("Sending request %s %s", method, redact_uri(uri))
+        logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii')))
 
         try:
-            request_deferred = self.agent.request(
-                method, uri, *args, **kwargs
+            request_deferred = treq.request(
+                method, uri, agent=self.agent, data=data, headers=headers
             )
             add_timeout_to_deferred(
                 request_deferred, 60, self.hs.get_reactor(),
@@ -105,14 +108,14 @@ class SimpleHttpClient(object):
             incoming_responses_counter.labels(method, response.code).inc()
             logger.info(
                 "Received response to  %s %s: %s",
-                method, redact_uri(uri), response.code
+                method, redact_uri(uri.encode('ascii')), response.code
             )
             defer.returnValue(response)
         except Exception as e:
             incoming_responses_counter.labels(method, "ERR").inc()
             logger.info(
                 "Error sending request to  %s %s: %s %s",
-                method, redact_uri(uri), type(e).__name__, e.message
+                method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0]
             )
             raise
 
@@ -137,7 +140,8 @@ class SimpleHttpClient(object):
         # TODO: Do we ever want to log message contents?
         logger.debug("post_urlencoded_get_json args: %s", args)
 
-        query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
+        query_bytes = urllib.parse.urlencode(
+            encode_urlencode_args(args), True).encode("utf8")
 
         actual_headers = {
             b"Content-Type": [b"application/x-www-form-urlencoded"],
@@ -148,15 +152,14 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "POST",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
-            bodyProducer=FileBodyProducer(StringIO(query_bytes))
+            data=query_bytes
         )
 
-        body = yield make_deferred_yieldable(readBody(response))
-
         if 200 <= response.code < 300:
-            defer.returnValue(json.loads(body))
+            body = yield make_deferred_yieldable(treq.json_content(response))
+            defer.returnValue(body)
         else:
             raise HttpResponseException(response.code, response.phrase, body)
 
@@ -191,9 +194,9 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "POST",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
-            bodyProducer=FileBodyProducer(StringIO(json_str))
+            data=json_str
         )
 
         body = yield make_deferred_yieldable(readBody(response))
@@ -248,7 +251,7 @@ class SimpleHttpClient(object):
             ValueError: if the response was not JSON
         """
         if len(args):
-            query_bytes = urllib.urlencode(args, True)
+            query_bytes = urllib.parse.urlencode(args, True)
             uri = "%s?%s" % (uri, query_bytes)
 
         json_str = encode_canonical_json(json_body)
@@ -262,9 +265,9 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "PUT",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
-            bodyProducer=FileBodyProducer(StringIO(json_str))
+            data=json_str
         )
 
         body = yield make_deferred_yieldable(readBody(response))
@@ -293,7 +296,7 @@ class SimpleHttpClient(object):
             HttpResponseException on a non-2xx HTTP response.
         """
         if len(args):
-            query_bytes = urllib.urlencode(args, True)
+            query_bytes = urllib.parse.urlencode(args, True)
             uri = "%s?%s" % (uri, query_bytes)
 
         actual_headers = {
@@ -304,7 +307,7 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "GET",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
         )
 
@@ -339,7 +342,7 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "GET",
-            url.encode("ascii"),
+            url,
             headers=Headers(actual_headers),
         )
 
@@ -434,12 +437,12 @@ class CaptchaServerHttpClient(SimpleHttpClient):
 
     @defer.inlineCallbacks
     def post_urlencoded_get_raw(self, url, args={}):
-        query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
+        query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True)
 
         response = yield self.request(
             "POST",
-            url.encode("ascii"),
-            bodyProducer=FileBodyProducer(StringIO(query_bytes)),
+            url,
+            data=query_bytes,
             headers=Headers({
                 b"Content-Type": [b"application/x-www-form-urlencoded"],
                 b"User-Agent": [self.user_agent],
@@ -510,7 +513,7 @@ def encode_urlencode_args(args):
 
 
 def encode_urlencode_arg(arg):
-    if isinstance(arg, unicode):
+    if isinstance(arg, text_type):
         return arg.encode('utf-8')
     elif isinstance(arg, list):
         return [encode_urlencode_arg(i) for i in arg]
@@ -542,26 +545,3 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory):
 
     def creatorForNetloc(self, hostname, port):
         return self
-
-
-class FileBodyProducer(TwistedFileBodyProducer):
-    """Workaround for https://twistedmatrix.com/trac/ticket/8473
-
-    We override the pauseProducing and resumeProducing methods in twisted's
-    FileBodyProducer so that they do not raise exceptions if the task has
-    already completed.
-    """
-
-    def pauseProducing(self):
-        try:
-            super(FileBodyProducer, self).pauseProducing()
-        except task.TaskDone:
-            # task has already completed
-            pass
-
-    def resumeProducing(self):
-        try:
-            super(FileBodyProducer, self).resumeProducing()
-        except task.NotPaused:
-            # task was not paused (probably because it had already completed)
-            pass
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b34bb8e31a..6a1fc8ca55 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,19 +17,19 @@ import cgi
 import logging
 import random
 import sys
-import urllib
 
-from six import string_types
-from six.moves.urllib import parse as urlparse
+from six import PY3, string_types
+from six.moves import urllib
 
-from canonicaljson import encode_canonical_json, json
+import treq
+from canonicaljson import encode_canonical_json
 from prometheus_client import Counter
 from signedjson.sign import sign_json
 
 from twisted.internet import defer, protocol, reactor
 from twisted.internet.error import DNSLookupError
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool, readBody
+from twisted.web.client import Agent, HTTPConnectionPool
 from twisted.web.http_headers import Headers
 
 import synapse.metrics
@@ -58,13 +58,18 @@ incoming_responses_counter = Counter("synapse_http_matrixfederationclient_respon
 MAX_LONG_RETRIES = 10
 MAX_SHORT_RETRIES = 3
 
+if PY3:
+    MAXINT = sys.maxsize
+else:
+    MAXINT = sys.maxint
+
 
 class MatrixFederationEndpointFactory(object):
     def __init__(self, hs):
         self.tls_client_options_factory = hs.tls_client_options_factory
 
     def endpointForURI(self, uri):
-        destination = uri.netloc
+        destination = uri.netloc.decode('ascii')
 
         return matrix_federation_endpoint(
             reactor, destination, timeout=10,
@@ -93,26 +98,32 @@ class MatrixFederationHttpClient(object):
         )
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
-        self.version_string = hs.version_string
+        self.version_string = hs.version_string.encode('ascii')
         self._next_id = 1
 
     def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
-        return urlparse.urlunparse(
-            ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
+        return urllib.parse.urlunparse(
+            (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
         )
 
     @defer.inlineCallbacks
     def _request(self, destination, method, path,
-                 body_callback, headers_dict={}, param_bytes=b"",
-                 query_bytes=b"", retry_on_dns_fail=True,
+                 json=None, json_callback=None,
+                 param_bytes=b"",
+                 query=None, retry_on_dns_fail=True,
                  timeout=None, long_retries=False,
                  ignore_backoff=False,
                  backoff_on_404=False):
-        """ Creates and sends a request to the given server
+        """
+        Creates and sends a request to the given server.
+
         Args:
             destination (str): The remote server to send the HTTP request to.
             method (str): HTTP method
             path (str): The HTTP path
+            json (dict or None): JSON to send in the body.
+            json_callback (func or None): A callback to generate the JSON.
+            query (dict or None): Query arguments.
             ignore_backoff (bool): true to ignore the historical backoff data
                 and try the request anyway.
             backoff_on_404 (bool): Back off if we get a 404
@@ -146,22 +157,29 @@ class MatrixFederationHttpClient(object):
             ignore_backoff=ignore_backoff,
         )
 
-        destination = destination.encode("ascii")
+        headers_dict = {}
         path_bytes = path.encode("ascii")
-        with limiter:
-            headers_dict[b"User-Agent"] = [self.version_string]
-            headers_dict[b"Host"] = [destination]
+        if query:
+            query_bytes = encode_query_args(query)
+        else:
+            query_bytes = b""
 
-            url_bytes = self._create_url(
-                destination, path_bytes, param_bytes, query_bytes
-            )
+        headers_dict = {
+            "User-Agent": [self.version_string],
+            "Host": [destination],
+        }
+
+        with limiter:
+            url = self._create_url(
+                destination.encode("ascii"), path_bytes, param_bytes, query_bytes
+            ).decode('ascii')
 
             txn_id = "%s-O-%s" % (method, self._next_id)
-            self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+            self._next_id = (self._next_id + 1) % (MAXINT - 1)
 
             outbound_logger.info(
                 "{%s} [%s] Sending request: %s %s",
-                txn_id, destination, method, url_bytes
+                txn_id, destination, method, url
             )
 
             # XXX: Would be much nicer to retry only at the transaction-layer
@@ -171,23 +189,33 @@ class MatrixFederationHttpClient(object):
             else:
                 retries_left = MAX_SHORT_RETRIES
 
-            http_url_bytes = urlparse.urlunparse(
-                ("", "", path_bytes, param_bytes, query_bytes, "")
-            )
+            http_url = urllib.parse.urlunparse(
+                (b"", b"", path_bytes, param_bytes, query_bytes, b"")
+            ).decode('ascii')
 
             log_result = None
             try:
                 while True:
-                    producer = None
-                    if body_callback:
-                        producer = body_callback(method, http_url_bytes, headers_dict)
-
                     try:
-                        request_deferred = self.agent.request(
+                        if json_callback:
+                            json = json_callback()
+
+                        if json:
+                            data = encode_canonical_json(json)
+                            headers_dict["Content-Type"] = ["application/json"]
+                            self.sign_request(
+                                destination, method, http_url, headers_dict, json
+                            )
+                        else:
+                            data = None
+                            self.sign_request(destination, method, http_url, headers_dict)
+
+                        request_deferred = treq.request(
                             method,
-                            url_bytes,
-                            Headers(headers_dict),
-                            producer
+                            url,
+                            headers=Headers(headers_dict),
+                            data=data,
+                            agent=self.agent,
                         )
                         add_timeout_to_deferred(
                             request_deferred,
@@ -218,7 +246,7 @@ class MatrixFederationHttpClient(object):
                             txn_id,
                             destination,
                             method,
-                            url_bytes,
+                            url,
                             _flatten_response_never_received(e),
                         )
 
@@ -252,7 +280,7 @@ class MatrixFederationHttpClient(object):
                 # :'(
                 # Update transactions table?
                 with logcontext.PreserveLoggingContext():
-                    body = yield readBody(response)
+                    body = yield treq.content(response)
                 raise HttpResponseException(
                     response.code, response.phrase, body
                 )
@@ -297,11 +325,11 @@ class MatrixFederationHttpClient(object):
         auth_headers = []
 
         for key, sig in request["signatures"][self.server_name].items():
-            auth_headers.append(bytes(
+            auth_headers.append((
                 "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
                     self.server_name, key, sig,
-                )
-            ))
+                )).encode('ascii')
+            )
 
         headers_dict[b"Authorization"] = auth_headers
 
@@ -347,24 +375,14 @@ class MatrixFederationHttpClient(object):
         """
 
         if not json_data_callback:
-            def json_data_callback():
-                return data
-
-        def body_callback(method, url_bytes, headers_dict):
-            json_data = json_data_callback()
-            self.sign_request(
-                destination, method, url_bytes, headers_dict, json_data
-            )
-            producer = _JsonProducer(json_data)
-            return producer
+            json_data_callback = lambda: data
 
         response = yield self._request(
             destination,
             "PUT",
             path,
-            body_callback=body_callback,
-            headers_dict={"Content-Type": ["application/json"]},
-            query_bytes=encode_query_args(args),
+            json_callback=json_data_callback,
+            query=args,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -376,8 +394,8 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
-        defer.returnValue(json.loads(body))
+            body = yield treq.json_content(response)
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def post_json(self, destination, path, data={}, long_retries=False,
@@ -410,20 +428,12 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(
-                destination, method, url_bytes, headers_dict, data
-            )
-            return _JsonProducer(data)
-
         response = yield self._request(
             destination,
             "POST",
             path,
-            query_bytes=encode_query_args(args),
-            body_callback=body_callback,
-            headers_dict={"Content-Type": ["application/json"]},
+            query=args,
+            json=data,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -434,9 +444,9 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
+            body = yield treq.json_content(response)
 
-        defer.returnValue(json.loads(body))
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
@@ -471,16 +481,11 @@ class MatrixFederationHttpClient(object):
 
         logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
 
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(destination, method, url_bytes, headers_dict)
-            return None
-
         response = yield self._request(
             destination,
             "GET",
             path,
-            query_bytes=encode_query_args(args),
-            body_callback=body_callback,
+            query=args,
             retry_on_dns_fail=retry_on_dns_fail,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -491,9 +496,9 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
+            body = yield treq.json_content(response)
 
-        defer.returnValue(json.loads(body))
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def delete_json(self, destination, path, long_retries=False,
@@ -523,13 +528,11 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-
         response = yield self._request(
             destination,
             "DELETE",
             path,
-            query_bytes=encode_query_args(args),
-            headers_dict={"Content-Type": ["application/json"]},
+            query=args,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -540,9 +543,9 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
+            body = yield treq.json_content(response)
 
-        defer.returnValue(json.loads(body))
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def get_file(self, destination, path, output_stream, args={},
@@ -569,26 +572,11 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
-
-        encoded_args = {}
-        for k, vs in args.items():
-            if isinstance(vs, string_types):
-                vs = [vs]
-            encoded_args[k] = [v.encode("UTF-8") for v in vs]
-
-        query_bytes = urllib.urlencode(encoded_args, True)
-        logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
-
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(destination, method, url_bytes, headers_dict)
-            return None
-
         response = yield self._request(
             destination,
             "GET",
             path,
-            query_bytes=query_bytes,
-            body_callback=body_callback,
+            query=args,
             retry_on_dns_fail=retry_on_dns_fail,
             ignore_backoff=ignore_backoff,
         )
@@ -639,30 +627,6 @@ def _readBodyToFile(response, stream, max_size):
     return d
 
 
-class _JsonProducer(object):
-    """ Used by the twisted http client to create the HTTP body from json
-    """
-    def __init__(self, jsn):
-        self.reset(jsn)
-
-    def reset(self, jsn):
-        self.body = encode_canonical_json(jsn)
-        self.length = len(self.body)
-
-    def startProducing(self, consumer):
-        consumer.write(self.body)
-        return defer.succeed(None)
-
-    def pauseProducing(self):
-        pass
-
-    def stopProducing(self):
-        pass
-
-    def resumeProducing(self):
-        pass
-
-
 def _flatten_response_never_received(e):
     if hasattr(e, "reasons"):
         reasons = ", ".join(
@@ -693,7 +657,7 @@ def check_content_type_is_json(headers):
             "No Content-Type header"
         )
 
-    c_type = c_type[0]  # only the first header
+    c_type = c_type[0].decode('ascii')  # only the first header
     val, options = cgi.parse_header(c_type)
     if val != "application/json":
         raise RuntimeError(
@@ -711,6 +675,6 @@ def encode_query_args(args):
             vs = [vs]
         encoded_args[k] = [v.encode("UTF-8") for v in vs]
 
-    query_bytes = urllib.urlencode(encoded_args, True)
+    query_bytes = urllib.parse.urlencode(encoded_args, True)
 
-    return query_bytes
+    return query_bytes.encode('utf8')
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 88ed3714f9..f0828c6542 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -204,14 +204,14 @@ class SynapseRequest(Request):
         self.start_time = time.time()
         self.request_metrics = RequestMetrics()
         self.request_metrics.start(
-            self.start_time, name=servlet_name, method=self.method,
+            self.start_time, name=servlet_name, method=self.method.decode('ascii'),
         )
 
         self.site.access_logger.info(
             "%s - %s - Received request: %s %s",
             self.getClientIP(),
             self.site.site_tag,
-            self.method,
+            self.method.decode('ascii'),
             self.get_redacted_uri()
         )
 
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 942d7c721f..0d8de600cf 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -40,9 +40,10 @@ REQUIREMENTS = {
     "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
     "service_identity>=1.0.0": ["service_identity>=1.0.0"],
     "Twisted>=17.1.0": ["twisted>=17.1.0"],
+    "treq>=15.1": ["treq>=15.1"],
 
-    # We use crypto.get_elliptic_curve which is only supported in >=0.15
-    "pyopenssl>=0.15": ["OpenSSL>=0.15"],
+    # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
+    "pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
 
     "pyyaml": ["yaml"],
     "pyasn1": ["pyasn1"],
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 74e892c104..5dc7b3fffc 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -590,9 +590,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
 pending_commands = LaterGauge(
     "synapse_replication_tcp_protocol_pending_commands",
     "",
-    ["name", "conn_id"],
+    ["name"],
     lambda: {
-        (p.name, p.conn_id): len(p.pending_commands) for p in connected_connections
+        (p.name,): len(p.pending_commands) for p in connected_connections
     },
 )
 
@@ -607,9 +607,9 @@ def transport_buffer_size(protocol):
 transport_send_buffer = LaterGauge(
     "synapse_replication_tcp_protocol_transport_send_buffer",
     "",
-    ["name", "conn_id"],
+    ["name"],
     lambda: {
-        (p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections
+        (p.name,): transport_buffer_size(p) for p in connected_connections
     },
 )
 
@@ -632,9 +632,9 @@ def transport_kernel_read_buffer_size(protocol, read=True):
 tcp_transport_kernel_send_buffer = LaterGauge(
     "synapse_replication_tcp_protocol_transport_kernel_send_buffer",
     "",
-    ["name", "conn_id"],
+    ["name"],
     lambda: {
-        (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
+        (p.name,): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
     },
 )
@@ -643,9 +643,9 @@ tcp_transport_kernel_send_buffer = LaterGauge(
 tcp_transport_kernel_read_buffer = LaterGauge(
     "synapse_replication_tcp_protocol_transport_kernel_read_buffer",
     "",
-    ["name", "conn_id"],
+    ["name"],
     lambda: {
-        (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
+        (p.name,): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
     },
 )
@@ -654,9 +654,9 @@ tcp_transport_kernel_read_buffer = LaterGauge(
 tcp_inbound_commands = LaterGauge(
     "synapse_replication_tcp_protocol_inbound_commands",
     "",
-    ["command", "name", "conn_id"],
+    ["command", "name"],
     lambda: {
-        (k[0], p.name, p.conn_id): count
+        (k[0], p.name,): count
         for p in connected_connections
         for k, count in iteritems(p.inbound_commands_counter)
     },
@@ -665,9 +665,9 @@ tcp_inbound_commands = LaterGauge(
 tcp_outbound_commands = LaterGauge(
     "synapse_replication_tcp_protocol_outbound_commands",
     "",
-    ["command", "name", "conn_id"],
+    ["command", "name"],
     lambda: {
-        (k[0], p.name, p.conn_id): count
+        (k[0], p.name,): count
         for p in connected_connections
         for k, count in iteritems(p.outbound_commands_counter)
     },
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 372648cafd..37b32dd37b 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -53,7 +53,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "email", body['email']):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Your email domain is not authorized on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
@@ -89,7 +91,9 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Account phone numbers are not authorized on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.datastore.get_user_id_by_threepid(
@@ -241,7 +245,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "email", body['email']):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Your email domain is not authorized on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.datastore.get_user_id_by_threepid(
@@ -276,7 +282,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Account phone numbers are not authorized on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.datastore.get_user_id_by_threepid(
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 2fb4d43ccb..192f52e462 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -75,7 +75,9 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "email", body['email']):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Your email domain is not authorized to register on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
@@ -115,7 +117,9 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
 
         if not check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
-                403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+                403,
+                "Phone numbers are not authorized to register on this server",
+                Codes.THREEPID_DENIED,
             )
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
@@ -373,7 +377,9 @@ class RegisterRestServlet(RestServlet):
 
                     if not check_3pid_allowed(self.hs, medium, address):
                         raise SynapseError(
-                            403, "Third party identifier is not allowed",
+                            403,
+                            "Third party identifiers (email/phone numbers)" +
+                            " are not authorized on this server",
                             Codes.THREEPID_DENIED,
                         )
 
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 1275baa1ba..263d8eb73e 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -25,6 +25,7 @@ from synapse.api.errors import SynapseError
 from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
 from synapse.events.utils import (
     format_event_for_client_v2_without_room_id,
+    format_event_raw,
     serialize_event,
 )
 from synapse.handlers.presence import format_user_presence_state
@@ -175,17 +176,28 @@ class SyncRestServlet(RestServlet):
 
     @staticmethod
     def encode_response(time_now, sync_result, access_token_id, filter):
+        if filter.event_format == 'client':
+            event_formatter = format_event_for_client_v2_without_room_id
+        elif filter.event_format == 'federation':
+            event_formatter = format_event_raw
+        else:
+            raise Exception("Unknown event format %s" % (filter.event_format, ))
+
         joined = SyncRestServlet.encode_joined(
-            sync_result.joined, time_now, access_token_id, filter.event_fields
+            sync_result.joined, time_now, access_token_id,
+            filter.event_fields,
+            event_formatter,
         )
 
         invited = SyncRestServlet.encode_invited(
             sync_result.invited, time_now, access_token_id,
+            event_formatter,
         )
 
         archived = SyncRestServlet.encode_archived(
             sync_result.archived, time_now, access_token_id,
             filter.event_fields,
+            event_formatter,
         )
 
         return {
@@ -228,7 +240,7 @@ class SyncRestServlet(RestServlet):
         }
 
     @staticmethod
-    def encode_joined(rooms, time_now, token_id, event_fields):
+    def encode_joined(rooms, time_now, token_id, event_fields, event_formatter):
         """
         Encode the joined rooms in a sync result
 
@@ -240,7 +252,9 @@ class SyncRestServlet(RestServlet):
             token_id(int): ID of the user's auth token - used for namespacing
                 of transaction IDs
             event_fields(list<str>): List of event fields to include. If empty,
-            all fields will be returned.
+                all fields will be returned.
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
         Returns:
             dict[str, dict[str, object]]: the joined rooms list, in our
                 response format
@@ -248,13 +262,14 @@ class SyncRestServlet(RestServlet):
         joined = {}
         for room in rooms:
             joined[room.room_id] = SyncRestServlet.encode_room(
-                room, time_now, token_id, only_fields=event_fields
+                room, time_now, token_id, joined=True, only_fields=event_fields,
+                event_formatter=event_formatter,
             )
 
         return joined
 
     @staticmethod
-    def encode_invited(rooms, time_now, token_id):
+    def encode_invited(rooms, time_now, token_id, event_formatter):
         """
         Encode the invited rooms in a sync result
 
@@ -264,7 +279,9 @@ class SyncRestServlet(RestServlet):
             time_now(int): current time - used as a baseline for age
                 calculations
             token_id(int): ID of the user's auth token - used for namespacing
-            of transaction IDs
+                of transaction IDs
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
 
         Returns:
             dict[str, dict[str, object]]: the invited rooms list, in our
@@ -274,7 +291,7 @@ class SyncRestServlet(RestServlet):
         for room in rooms:
             invite = serialize_event(
                 room.invite, time_now, token_id=token_id,
-                event_format=format_event_for_client_v2_without_room_id,
+                event_format=event_formatter,
                 is_invite=True,
             )
             unsigned = dict(invite.get("unsigned", {}))
@@ -288,7 +305,7 @@ class SyncRestServlet(RestServlet):
         return invited
 
     @staticmethod
-    def encode_archived(rooms, time_now, token_id, event_fields):
+    def encode_archived(rooms, time_now, token_id, event_fields, event_formatter):
         """
         Encode the archived rooms in a sync result
 
@@ -300,7 +317,9 @@ class SyncRestServlet(RestServlet):
             token_id(int): ID of the user's auth token - used for namespacing
                 of transaction IDs
             event_fields(list<str>): List of event fields to include. If empty,
-            all fields will be returned.
+                all fields will be returned.
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
         Returns:
             dict[str, dict[str, object]]: The invited rooms list, in our
                 response format
@@ -308,13 +327,18 @@ class SyncRestServlet(RestServlet):
         joined = {}
         for room in rooms:
             joined[room.room_id] = SyncRestServlet.encode_room(
-                room, time_now, token_id, joined=False, only_fields=event_fields
+                room, time_now, token_id, joined=False,
+                only_fields=event_fields,
+                event_formatter=event_formatter,
             )
 
         return joined
 
     @staticmethod
-    def encode_room(room, time_now, token_id, joined=True, only_fields=None):
+    def encode_room(
+            room, time_now, token_id, joined,
+            only_fields, event_formatter,
+    ):
         """
         Args:
             room (JoinedSyncResult|ArchivedSyncResult): sync result for a
@@ -326,14 +350,15 @@ class SyncRestServlet(RestServlet):
             joined (bool): True if the user is joined to this room - will mean
                 we handle ephemeral events
             only_fields(list<str>): Optional. The list of event fields to include.
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
         Returns:
             dict[str, object]: the room, encoded in our response format
         """
         def serialize(event):
-            # TODO(mjark): Respect formatting requirements in the filter.
             return serialize_event(
                 event, time_now, token_id=token_id,
-                event_format=format_event_for_client_v2_without_room_id,
+                event_format=event_formatter,
                 only_event_fields=only_fields,
             )
 
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index c7899d7fd2..b890c152db 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -199,10 +199,14 @@ class MonthlyActiveUsersStore(SQLBaseStore):
         Args:
             user_id(str): the user_id to query
         """
+
         if self.hs.config.limit_usage_by_mau:
+            # Trial users and guests should not be included as part of MAU group
+            is_guest = yield self.is_guest(user_id)
+            if is_guest:
+                return
             is_trial = yield self.is_trial_user(user_id)
             if is_trial:
-                # we don't track trial users in the MAU table.
                 return
 
             last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)