diff --git a/synapse/__init__.py b/synapse/__init__.py
index 252c49ca82..65a2b894cc 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.33.3rc2"
+__version__ = "0.33.4"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 6502a6be7b..34382e4e3c 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -26,6 +26,7 @@ import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, ResourceLimitError
+from synapse.config.server import is_threepid_reserved
from synapse.types import UserID
from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches.lrucache import LruCache
@@ -775,34 +776,56 @@ class Auth(object):
)
@defer.inlineCallbacks
- def check_auth_blocking(self, user_id=None):
+ def check_auth_blocking(self, user_id=None, threepid=None):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag
Args:
user_id(str|None): If present, checks for presence against existing
MAU cohort
+
+ threepid(dict|None): If present, checks for presence against configured
+ reserved threepid. Used in cases where the user is trying register
+ with a MAU blocked server, normally they would be rejected but their
+ threepid is on the reserved list. user_id and
+ threepid should never be set at the same time.
"""
+
+ # Never fail an auth check for the server notices users
+ # This can be a problem where event creation is prohibited due to blocking
+ if user_id == self.hs.config.server_notices_mxid:
+ return
+
if self.hs.config.hs_disabled:
raise ResourceLimitError(
403, self.hs.config.hs_disabled_message,
- errcode=Codes.RESOURCE_LIMIT_EXCEED,
- admin_uri=self.hs.config.admin_uri,
+ errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
+ admin_contact=self.hs.config.admin_contact,
limit_type=self.hs.config.hs_disabled_limit_type
)
if self.hs.config.limit_usage_by_mau is True:
- # If the user is already part of the MAU cohort
+ assert not (user_id and threepid)
+
+ # If the user is already part of the MAU cohort or a trial user
if user_id:
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
if timestamp:
return
+
+ is_trial = yield self.store.is_trial_user(user_id)
+ if is_trial:
+ return
+ elif threepid:
+ # If the user does not exist yet, but is signing up with a
+ # reserved threepid then pass auth check
+ if is_threepid_reserved(self.hs.config, threepid):
+ return
# Else if there is no room in the MAU bucket, bail
current_mau = yield self.store.get_monthly_active_count()
if current_mau >= self.hs.config.max_mau_value:
raise ResourceLimitError(
403, "Monthly Active User Limit Exceeded",
-
- admin_uri=self.hs.config.admin_uri,
- errcode=Codes.RESOURCE_LIMIT_EXCEED,
+ admin_contact=self.hs.config.admin_contact,
+ errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
limit_type="monthly_active_user"
)
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 912bf024bf..c2630c4c64 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -78,6 +78,7 @@ class EventTypes(object):
Name = "m.room.name"
ServerACL = "m.room.server_acl"
+ Pinned = "m.room.pinned_events"
class RejectedReason(object):
@@ -108,3 +109,6 @@ DEFAULT_ROOM_VERSION = RoomVersions.V1
# vdh-test-version is a placeholder to get room versioning support working and tested
# until we have a working v2.
KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST}
+
+ServerNoticeMsgType = "m.server_notice"
+ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index e26001ab12..2e7f98404d 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -56,7 +56,7 @@ class Codes(object):
SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
- RESOURCE_LIMIT_EXCEED = "M_RESOURCE_LIMIT_EXCEED"
+ RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED"
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
@@ -238,11 +238,11 @@ class ResourceLimitError(SynapseError):
"""
def __init__(
self, code, msg,
- errcode=Codes.RESOURCE_LIMIT_EXCEED,
- admin_uri=None,
+ errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
+ admin_contact=None,
limit_type=None,
):
- self.admin_uri = admin_uri
+ self.admin_contact = admin_contact
self.limit_type = limit_type
super(ResourceLimitError, self).__init__(code, msg, errcode=errcode)
@@ -250,7 +250,7 @@ class ResourceLimitError(SynapseError):
return cs_error(
self.msg,
self.errcode,
- admin_uri=self.admin_uri,
+ admin_contact=self.admin_contact,
limit_type=self.limit_type
)
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 186831e118..a31a9a17e0 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -251,6 +251,7 @@ class FilterCollection(object):
"include_leave", False
)
self.event_fields = filter_json.get("event_fields", [])
+ self.event_format = filter_json.get("event_format", "client")
def __repr__(self):
return "<FilterCollection %s>" % (json.dumps(self._filter_json),)
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 3348a8ec6d..86b5067400 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -51,10 +51,7 @@ class AppserviceSlaveStore(
class AppserviceServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = AppserviceSlaveStore
def _listen_http(self, listener_config):
port = listener_config["port"]
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index ab79a45646..ce2b113dbb 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -74,10 +74,7 @@ class ClientReaderSlavedStore(
class ClientReaderServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = ClientReaderSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index a34c89fa99..f98e456ea0 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -90,10 +90,7 @@ class EventCreatorSlavedStore(
class EventCreatorServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = EventCreatorSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 7d8105778d..60f5973505 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -72,10 +72,7 @@ class FederationReaderSlavedStore(
class FederationReaderServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = FederationReaderSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index d59007099b..60dd09aac3 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -78,10 +78,7 @@ class FederationSenderSlaveStore(
class FederationSenderServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = FederationSenderSlaveStore
def _listen_http(self, listener_config):
port = listener_config["port"]
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 8d484c1cd4..8c0b9c67b0 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -148,10 +148,7 @@ class FrontendProxySlavedStore(
class FrontendProxyServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = FrontendProxySlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 005921dcf7..3eb5b663de 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -62,7 +62,7 @@ from synapse.rest.key.v1.server_key_resource import LocalKey
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
-from synapse.storage import are_all_users_on_domain
+from synapse.storage import DataStore, are_all_users_on_domain
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.util.caches import CACHE_SIZE_FACTOR
@@ -111,6 +111,8 @@ def build_resource_for_web_client(hs):
class SynapseHomeServer(HomeServer):
+ DATASTORE_CLASS = DataStore
+
def _listener_http(self, config, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
@@ -356,13 +358,13 @@ def setup(config_options):
logger.info("Preparing database: %s...", config.database_config['name'])
try:
- db_conn = hs.get_db_conn(run_new_connection=False)
- prepare_database(db_conn, database_engine, config=config)
- database_engine.on_new_connection(db_conn)
+ with hs.get_db_conn(run_new_connection=False) as db_conn:
+ prepare_database(db_conn, database_engine, config=config)
+ database_engine.on_new_connection(db_conn)
- hs.run_startup_checks(db_conn, database_engine)
+ hs.run_startup_checks(db_conn, database_engine)
- db_conn.commit()
+ db_conn.commit()
except UpgradeDatabaseException:
sys.stderr.write(
"\nFailed to upgrade database.\n"
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index fd1f6cbf7e..e3dbb3b4e6 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -60,10 +60,7 @@ class MediaRepositorySlavedStore(
class MediaRepositoryServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = MediaRepositorySlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index a4fc7e91fa..244c604de9 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -78,10 +78,7 @@ class PusherSlaveStore(
class PusherServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = PusherSlaveStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = PusherSlaveStore
def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 27e1998660..6662340797 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -249,10 +249,7 @@ class SynchrotronApplicationService(object):
class SynchrotronServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = SynchrotronSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 1388a42b59..96ffcaf073 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -94,10 +94,7 @@ class UserDirectorySlaveStore(
class UserDirectoryServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = UserDirectorySlaveStore
def _listen_http(self, listener_config):
port = listener_config["port"]
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 6980e5890e..9ccc5a80fc 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import urllib
+
+from six.moves import urllib
from prometheus_client import Counter
@@ -98,7 +99,7 @@ class ApplicationServiceApi(SimpleHttpClient):
def query_user(self, service, user_id):
if service.url is None:
defer.returnValue(False)
- uri = service.url + ("/users/%s" % urllib.quote(user_id))
+ uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
response = None
try:
response = yield self.get_json(uri, {
@@ -119,7 +120,7 @@ class ApplicationServiceApi(SimpleHttpClient):
def query_alias(self, service, alias):
if service.url is None:
defer.returnValue(False)
- uri = service.url + ("/rooms/%s" % urllib.quote(alias))
+ uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
response = None
try:
response = yield self.get_json(uri, {
@@ -153,7 +154,7 @@ class ApplicationServiceApi(SimpleHttpClient):
service.url,
APP_SERVICE_PREFIX,
kind,
- urllib.quote(protocol)
+ urllib.parse.quote(protocol)
)
try:
response = yield self.get_json(uri, fields)
@@ -188,7 +189,7 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = "%s%s/thirdparty/protocol/%s" % (
service.url,
APP_SERVICE_PREFIX,
- urllib.quote(protocol)
+ urllib.parse.quote(protocol)
)
try:
info = yield self.get_json(uri, {})
@@ -228,7 +229,7 @@ class ApplicationServiceApi(SimpleHttpClient):
txn_id = str(txn_id)
uri = service.url + ("/transactions/%s" %
- urllib.quote(txn_id))
+ urllib.parse.quote(txn_id))
try:
yield self.put_json(
uri=uri,
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 68a612e594..c1c7c0105e 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -77,10 +77,15 @@ class ServerConfig(Config):
self.max_mau_value = config.get(
"max_mau_value", 0,
)
+
self.mau_limits_reserved_threepids = config.get(
"mau_limit_reserved_threepids", []
)
+ self.mau_trial_days = config.get(
+ "mau_trial_days", 0,
+ )
+
# Options to disable HS
self.hs_disabled = config.get("hs_disabled", False)
self.hs_disabled_message = config.get("hs_disabled_message", "")
@@ -88,7 +93,7 @@ class ServerConfig(Config):
# Admin uri to direct users at should their instance become blocked
# due to resource constraints
- self.admin_uri = config.get("admin_uri", None)
+ self.admin_contact = config.get("admin_contact", None)
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None
@@ -352,7 +357,7 @@ class ServerConfig(Config):
# Homeserver blocking
#
# How to reach the server admin, used in ResourceLimitError
- # admin_uri: 'mailto:admin@server.com'
+ # admin_contact: 'mailto:admin@server.com'
#
# Global block config
#
@@ -365,6 +370,7 @@ class ServerConfig(Config):
# Enables monthly active user checking
# limit_usage_by_mau: False
# max_mau_value: 50
+ # mau_trial_days: 2
#
# Sometimes the server admin will want to ensure certain accounts are
# never blocked by mau checking. These accounts are specified here.
@@ -398,6 +404,23 @@ class ServerConfig(Config):
" service on the given port.")
+def is_threepid_reserved(config, threepid):
+ """Check the threepid against the reserved threepid config
+ Args:
+ config(ServerConfig) - to access server config attributes
+ threepid(dict) - The threepid to test for
+
+ Returns:
+ boolean Is the threepid undertest reserved_user
+ """
+
+ for tp in config.mau_limits_reserved_threepids:
+ if (threepid['medium'] == tp['medium']
+ and threepid['address'] == tp['address']):
+ return True
+ return False
+
+
def read_gc_thresholds(thresholds):
"""Reads the three integer thresholds for garbage collection. Ensures that
the thresholds are integers if thresholds are supplied.
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index c11798093d..5be8e66fb8 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -13,17 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from collections import namedtuple
import six
from twisted.internet import defer
+from twisted.internet.defer import DeferredList
-from synapse.api.constants import MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.http.servlet import assert_params_in_dict
+from synapse.types import get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
logger = logging.getLogger(__name__)
@@ -133,34 +136,25 @@ class FederationBase(object):
* throws a SynapseError if the signature check failed.
The deferreds run their callbacks in the sentinel logcontext.
"""
-
- redacted_pdus = [
- prune_event(pdu)
- for pdu in pdus
- ]
-
- deferreds = self.keyring.verify_json_objects_for_server([
- (p.origin, p.get_pdu_json())
- for p in redacted_pdus
- ])
+ deferreds = _check_sigs_on_pdus(self.keyring, pdus)
ctx = logcontext.LoggingContext.current_context()
- def callback(_, pdu, redacted):
+ def callback(_, pdu):
with logcontext.PreserveLoggingContext(ctx):
if not check_event_content_hash(pdu):
logger.warn(
"Event content has been tampered, redacting %s: %s",
pdu.event_id, pdu.get_pdu_json()
)
- return redacted
+ return prune_event(pdu)
if self.spam_checker.check_event_for_spam(pdu):
logger.warn(
"Event contains spam, redacting %s: %s",
pdu.event_id, pdu.get_pdu_json()
)
- return redacted
+ return prune_event(pdu)
return pdu
@@ -173,16 +167,116 @@ class FederationBase(object):
)
return failure
- for deferred, pdu, redacted in zip(deferreds, pdus, redacted_pdus):
+ for deferred, pdu in zip(deferreds, pdus):
deferred.addCallbacks(
callback, errback,
- callbackArgs=[pdu, redacted],
+ callbackArgs=[pdu],
errbackArgs=[pdu],
)
return deferreds
+class PduToCheckSig(namedtuple("PduToCheckSig", [
+ "pdu", "redacted_pdu_json", "event_id_domain", "sender_domain", "deferreds",
+])):
+ pass
+
+
+def _check_sigs_on_pdus(keyring, pdus):
+ """Check that the given events are correctly signed
+
+ Args:
+ keyring (synapse.crypto.Keyring): keyring object to do the checks
+ pdus (Collection[EventBase]): the events to be checked
+
+ Returns:
+ List[Deferred]: a Deferred for each event in pdus, which will either succeed if
+ the signatures are valid, or fail (with a SynapseError) if not.
+ """
+
+ # (currently this is written assuming the v1 room structure; we'll probably want a
+ # separate function for checking v2 rooms)
+
+ # we want to check that the event is signed by:
+ #
+ # (a) the server which created the event_id
+ #
+ # (b) the sender's server.
+ #
+ # - except in the case of invites created from a 3pid invite, which are exempt
+ # from this check, because the sender has to match that of the original 3pid
+ # invite, but the event may come from a different HS, for reasons that I don't
+ # entirely grok (why do the senders have to match? and if they do, why doesn't the
+ # joining server ask the inviting server to do the switcheroo with
+ # exchange_third_party_invite?).
+ #
+ # That's pretty awful, since redacting such an invite will render it invalid
+ # (because it will then look like a regular invite without a valid signature),
+ # and signatures are *supposed* to be valid whether or not an event has been
+ # redacted. But this isn't the worst of the ways that 3pid invites are broken.
+ #
+ # let's start by getting the domain for each pdu, and flattening the event back
+ # to JSON.
+ pdus_to_check = [
+ PduToCheckSig(
+ pdu=p,
+ redacted_pdu_json=prune_event(p).get_pdu_json(),
+ event_id_domain=get_domain_from_id(p.event_id),
+ sender_domain=get_domain_from_id(p.sender),
+ deferreds=[],
+ )
+ for p in pdus
+ ]
+
+ # first make sure that the event is signed by the event_id's domain
+ deferreds = keyring.verify_json_objects_for_server([
+ (p.event_id_domain, p.redacted_pdu_json)
+ for p in pdus_to_check
+ ])
+
+ for p, d in zip(pdus_to_check, deferreds):
+ p.deferreds.append(d)
+
+ # now let's look for events where the sender's domain is different to the
+ # event id's domain (normally only the case for joins/leaves), and add additional
+ # checks.
+ pdus_to_check_sender = [
+ p for p in pdus_to_check
+ if p.sender_domain != p.event_id_domain and not _is_invite_via_3pid(p.pdu)
+ ]
+
+ more_deferreds = keyring.verify_json_objects_for_server([
+ (p.sender_domain, p.redacted_pdu_json)
+ for p in pdus_to_check_sender
+ ])
+
+ for p, d in zip(pdus_to_check_sender, more_deferreds):
+ p.deferreds.append(d)
+
+ # replace lists of deferreds with single Deferreds
+ return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check]
+
+
+def _flatten_deferred_list(deferreds):
+ """Given a list of one or more deferreds, either return the single deferred, or
+ combine into a DeferredList.
+ """
+ if len(deferreds) > 1:
+ return DeferredList(deferreds, fireOnOneErrback=True, consumeErrors=True)
+ else:
+ assert len(deferreds) == 1
+ return deferreds[0]
+
+
+def _is_invite_via_3pid(event):
+ return (
+ event.type == EventTypes.Member
+ and event.membership == Membership.INVITE
+ and "third_party_invite" in event.content
+ )
+
+
def event_from_pdu_json(pdu_json, outlier=False):
"""Construct a FrozenEvent from an event json received over federation
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 3e0cd294a1..dbee404ea7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -99,7 +99,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
- def on_incoming_transaction(self, transaction_data):
+ def on_incoming_transaction(self, origin, transaction_data):
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = self._clock.time_msec()
@@ -108,34 +108,33 @@ class FederationServer(FederationBase):
if not transaction.transaction_id:
raise Exception("Transaction missing transaction_id")
- if not transaction.origin:
- raise Exception("Transaction missing origin")
logger.debug("[%s] Got transaction", transaction.transaction_id)
# use a linearizer to ensure that we don't process the same transaction
# multiple times in parallel.
with (yield self._transaction_linearizer.queue(
- (transaction.origin, transaction.transaction_id),
+ (origin, transaction.transaction_id),
)):
result = yield self._handle_incoming_transaction(
- transaction, request_time,
+ origin, transaction, request_time,
)
defer.returnValue(result)
@defer.inlineCallbacks
- def _handle_incoming_transaction(self, transaction, request_time):
+ def _handle_incoming_transaction(self, origin, transaction, request_time):
""" Process an incoming transaction and return the HTTP response
Args:
+ origin (unicode): the server making the request
transaction (Transaction): incoming transaction
request_time (int): timestamp that the HTTP request arrived at
Returns:
Deferred[(int, object)]: http response code and body
"""
- response = yield self.transaction_actions.have_responded(transaction)
+ response = yield self.transaction_actions.have_responded(origin, transaction)
if response:
logger.debug(
@@ -149,7 +148,7 @@ class FederationServer(FederationBase):
received_pdus_counter.inc(len(transaction.pdus))
- origin_host, _ = parse_server_name(transaction.origin)
+ origin_host, _ = parse_server_name(origin)
pdus_by_room = {}
@@ -190,7 +189,7 @@ class FederationServer(FederationBase):
event_id = pdu.event_id
try:
yield self._handle_received_pdu(
- transaction.origin, pdu
+ origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
@@ -212,7 +211,7 @@ class FederationServer(FederationBase):
if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus):
yield self.received_edu(
- transaction.origin,
+ origin,
edu.edu_type,
edu.content
)
@@ -224,6 +223,7 @@ class FederationServer(FederationBase):
logger.debug("Returning: %s", str(response))
yield self.transaction_actions.set_response(
+ origin,
transaction,
200, response
)
@@ -838,9 +838,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
)
return self._send_edu(
- edu_type=edu_type,
- origin=origin,
- content=content,
+ edu_type=edu_type,
+ origin=origin,
+ content=content,
)
def on_query(self, query_type, args):
@@ -851,6 +851,6 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
return handler(args)
return self._get_query_client(
- query_type=query_type,
- args=args,
+ query_type=query_type,
+ args=args,
)
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 9146215c21..74ffd13b4f 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -36,7 +36,7 @@ class TransactionActions(object):
self.store = datastore
@log_function
- def have_responded(self, transaction):
+ def have_responded(self, origin, transaction):
""" Have we already responded to a transaction with the same id and
origin?
@@ -50,11 +50,11 @@ class TransactionActions(object):
"transaction_id")
return self.store.get_received_txn_response(
- transaction.transaction_id, transaction.origin
+ transaction.transaction_id, origin
)
@log_function
- def set_response(self, transaction, code, response):
+ def set_response(self, origin, transaction, code, response):
""" Persist how we responded to a transaction.
Returns:
@@ -66,7 +66,7 @@ class TransactionActions(object):
return self.store.set_received_txn_response(
transaction.transaction_id,
- transaction.origin,
+ origin,
code,
response,
)
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 0bb468385d..6f5995735a 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -32,7 +32,7 @@ Events are replicated via a separate events stream.
import logging
from collections import namedtuple
-from six import iteritems, itervalues
+from six import iteritems
from sortedcontainers import SortedDict
@@ -117,7 +117,7 @@ class FederationRemoteSendQueue(object):
user_ids = set(
user_id
- for uids in itervalues(self.presence_changed)
+ for uids in self.presence_changed.values()
for user_id in uids
)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 94d7423d01..8cbf8c4f7f 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -463,7 +463,19 @@ class TransactionQueue(object):
# pending_transactions flag.
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+
+ # We can only include at most 50 PDUs per transactions
+ pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
+ if leftover_pdus:
+ self.pending_pdus_by_dest[destination] = leftover_pdus
+
pending_edus = self.pending_edus_by_dest.pop(destination, [])
+
+ # We can only include at most 100 EDUs per transactions
+ pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
+ if leftover_edus:
+ self.pending_edus_by_dest[destination] = leftover_edus
+
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_edus.extend(
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 7a993fd1cf..3972922ff9 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -353,7 +353,7 @@ class FederationSendServlet(BaseFederationServlet):
try:
code, response = yield self.handler.on_incoming_transaction(
- transaction_data
+ origin, transaction_data,
)
except Exception:
logger.exception("on_incoming_transaction failed")
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 4a81bd2ba9..2a5eab124f 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -895,22 +895,24 @@ class AuthHandler(BaseHandler):
Args:
password (unicode): Password to hash.
- stored_hash (unicode): Expected hash value.
+ stored_hash (bytes): Expected hash value.
Returns:
Deferred(bool): Whether self.hash(password) == stored_hash.
"""
-
def _do_validate_hash():
# Normalise the Unicode in the password
pw = unicodedata.normalize("NFKC", password)
return bcrypt.checkpw(
pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
- stored_hash.encode('utf8')
+ stored_hash
)
if stored_hash:
+ if not isinstance(stored_hash, bytes):
+ stored_hash = stored_hash.encode('ascii')
+
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(),
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5816bf8b4f..578e9250fb 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -330,7 +330,8 @@ class E2eKeysHandler(object):
(algorithm, key_id, ex_json, key)
)
else:
- new_keys.append((algorithm, key_id, encode_canonical_json(key)))
+ new_keys.append((
+ algorithm, key_id, encode_canonical_json(key).decode('ascii')))
yield self.store.add_e2e_one_time_keys(
user_id, device_id, time_now, new_keys
@@ -358,7 +359,7 @@ def _exception_to_failure(e):
# Note that some Exceptions (notably twisted's ResponseFailed etc) don't
# give a string for e.message, which json then fails to serialize.
return {
- "status": 503, "message": str(e.message),
+ "status": 503, "message": str(e),
}
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0ebf0fd188..0c68e8a472 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -594,7 +594,7 @@ class FederationHandler(BaseHandler):
required_auth = set(
a_id
- for event in events + state_events.values() + auth_events.values()
+ for event in events + list(state_events.values()) + list(auth_events.values())
for a_id, _ in event.auth_events
)
auth_events.update({
@@ -802,7 +802,7 @@ class FederationHandler(BaseHandler):
)
continue
except NotRetryingDestination as e:
- logger.info(e.message)
+ logger.info(str(e))
continue
except FederationDeniedError as e:
logger.info(e)
@@ -1358,7 +1358,7 @@ class FederationHandler(BaseHandler):
)
if state_groups:
- _, state = state_groups.items().pop()
+ _, state = list(state_groups.items()).pop()
results = state
if event.is_state():
@@ -1831,7 +1831,7 @@ class FederationHandler(BaseHandler):
room_version = yield self.store.get_room_version(event.room_id)
- new_state = self.state_handler.resolve_events(
+ new_state = yield self.state_handler.resolve_events(
room_version,
[list(local_view.values()), list(remote_view.values())],
event
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index f03ee1476b..1e53f2c635 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -125,6 +125,7 @@ class RegistrationHandler(BaseHandler):
guest_access_token=None,
make_guest=False,
admin=False,
+ threepid=None,
):
"""Registers a new client on the server.
@@ -145,7 +146,7 @@ class RegistrationHandler(BaseHandler):
RegistrationError if there was a problem registering.
"""
- yield self.auth.check_auth_blocking()
+ yield self.auth.check_auth_blocking(threepid=threepid)
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 030e890071..c05aa7ba65 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -165,7 +165,7 @@ class RoomListHandler(BaseHandler):
# Filter out rooms that we don't want to return
rooms_to_scan = [
r for r in sorted_rooms
- if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0
+ if r not in newly_unpublished and rooms_to_num_joined[r] > 0
]
total_room_count = len(rooms_to_scan)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index c464adbd0b..0c1d52fd11 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -54,7 +54,7 @@ class SearchHandler(BaseHandler):
batch_token = None
if batch:
try:
- b = decode_base64(batch)
+ b = decode_base64(batch).decode('ascii')
batch_group, batch_group_key, batch_token = b.split("\n")
assert batch_group is not None
@@ -258,18 +258,18 @@ class SearchHandler(BaseHandler):
# it returns more from the same group (if applicable) rather
# than reverting to searching all results again.
if batch_group and batch_group_key:
- global_next_batch = encode_base64("%s\n%s\n%s" % (
+ global_next_batch = encode_base64(("%s\n%s\n%s" % (
batch_group, batch_group_key, pagination_token
- ))
+ )).encode('ascii'))
else:
- global_next_batch = encode_base64("%s\n%s\n%s" % (
+ global_next_batch = encode_base64(("%s\n%s\n%s" % (
"all", "", pagination_token
- ))
+ )).encode('ascii'))
for room_id, group in room_groups.items():
- group["next_batch"] = encode_base64("%s\n%s\n%s" % (
+ group["next_batch"] = encode_base64(("%s\n%s\n%s" % (
"room_id", room_id, pagination_token
- ))
+ )).encode('ascii'))
allowed_events.extend(room_events)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b5566e14e0..90e99bdf9d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -549,7 +549,7 @@ class SyncHandler(object):
member_ids = {
state_key: event_id
- for (t, state_key), event_id in state_ids.iteritems()
+ for (t, state_key), event_id in iteritems(state_ids)
if t == EventTypes.Member
}
name_id = state_ids.get((EventTypes.Name, ''))
@@ -749,9 +749,16 @@ class SyncHandler(object):
state_ids = {}
if lazy_load_members:
if types:
+ # We're returning an incremental sync, with no "gap" since
+ # the previous sync, so normally there would be no state to return
+ # But we're lazy-loading, so the client might need some more
+ # member events to understand the events in this timeline.
+ # So we fish out all the member events corresponding to the
+ # timeline here, and then dedupe any redundant ones below.
+
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
- filtered_types=filtered_types,
+ filtered_types=None, # we only want members!
)
if lazy_load_members and not include_redundant_members:
@@ -771,7 +778,7 @@ class SyncHandler(object):
logger.debug("filtering state from %r...", state_ids)
state_ids = {
t: event_id
- for t, event_id in state_ids.iteritems()
+ for t, event_id in iteritems(state_ids)
if cache.get(t[1]) != event_id
}
logger.debug("...to %r", state_ids)
@@ -1572,6 +1579,19 @@ class SyncHandler(object):
newly_joined_room=newly_joined,
)
+ # When we join the room (or the client requests full_state), we should
+ # send down any existing tags. Usually the user won't have tags in a
+ # newly joined room, unless either a) they've joined before or b) the
+ # tag was added by synapse e.g. for server notice rooms.
+ if full_state:
+ user_id = sync_result_builder.sync_config.user.to_string()
+ tags = yield self.store.get_tags_for_room(user_id, room_id)
+
+ # If there aren't any tags, don't send the empty tags list down
+ # sync
+ if not tags:
+ tags = None
+
account_data_events = []
if tags is not None:
account_data_events.append({
@@ -1726,17 +1746,17 @@ def _calculate_state(
event_id_to_key = {
e: key
for key, e in itertools.chain(
- timeline_contains.items(),
- previous.items(),
- timeline_start.items(),
- current.items(),
+ iteritems(timeline_contains),
+ iteritems(previous),
+ iteritems(timeline_start),
+ iteritems(current),
)
}
- c_ids = set(e for e in current.values())
- ts_ids = set(e for e in timeline_start.values())
- p_ids = set(e for e in previous.values())
- tc_ids = set(e for e in timeline_contains.values())
+ c_ids = set(e for e in itervalues(current))
+ ts_ids = set(e for e in itervalues(timeline_start))
+ p_ids = set(e for e in itervalues(previous))
+ tc_ids = set(e for e in itervalues(timeline_contains))
# If we are lazyloading room members, we explicitly add the membership events
# for the senders in the timeline into the state block returned by /sync,
@@ -1750,7 +1770,7 @@ def _calculate_state(
if lazy_load_members:
p_ids.difference_update(
- e for t, e in timeline_start.iteritems()
+ e for t, e in iteritems(timeline_start)
if t[0] == EventTypes.Member
)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ab4fbf59b2..4ba54fed05 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -13,24 +13,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import logging
-import urllib
-from six import StringIO
+from six import text_type
+from six.moves import urllib
+import treq
from canonicaljson import encode_canonical_json, json
from prometheus_client import Counter
from OpenSSL import SSL
from OpenSSL.SSL import VERIFY_NONE
-from twisted.internet import defer, protocol, reactor, ssl, task
+from twisted.internet import defer, protocol, reactor, ssl
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.web._newclient import ResponseDone
from twisted.web.client import (
Agent,
BrowserLikeRedirectAgent,
ContentDecoderAgent,
- FileBodyProducer as TwistedFileBodyProducer,
GzipDecoder,
HTTPConnectionPool,
PartialDownloadError,
@@ -83,18 +84,20 @@ class SimpleHttpClient(object):
if hs.config.user_agent_suffix:
self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix,)
+ self.user_agent = self.user_agent.encode('ascii')
+
@defer.inlineCallbacks
- def request(self, method, uri, *args, **kwargs):
+ def request(self, method, uri, data=b'', headers=None):
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
outgoing_requests_counter.labels(method).inc()
# log request but strip `access_token` (AS requests for example include this)
- logger.info("Sending request %s %s", method, redact_uri(uri))
+ logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii')))
try:
- request_deferred = self.agent.request(
- method, uri, *args, **kwargs
+ request_deferred = treq.request(
+ method, uri, agent=self.agent, data=data, headers=headers
)
add_timeout_to_deferred(
request_deferred, 60, self.hs.get_reactor(),
@@ -105,14 +108,14 @@ class SimpleHttpClient(object):
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
- method, redact_uri(uri), response.code
+ method, redact_uri(uri.encode('ascii')), response.code
)
defer.returnValue(response)
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
- method, redact_uri(uri), type(e).__name__, e.message
+ method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0]
)
raise
@@ -137,7 +140,8 @@ class SimpleHttpClient(object):
# TODO: Do we ever want to log message contents?
logger.debug("post_urlencoded_get_json args: %s", args)
- query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
+ query_bytes = urllib.parse.urlencode(
+ encode_urlencode_args(args), True).encode("utf8")
actual_headers = {
b"Content-Type": [b"application/x-www-form-urlencoded"],
@@ -148,15 +152,14 @@ class SimpleHttpClient(object):
response = yield self.request(
"POST",
- uri.encode("ascii"),
+ uri,
headers=Headers(actual_headers),
- bodyProducer=FileBodyProducer(StringIO(query_bytes))
+ data=query_bytes
)
- body = yield make_deferred_yieldable(readBody(response))
-
if 200 <= response.code < 300:
- defer.returnValue(json.loads(body))
+ body = yield make_deferred_yieldable(treq.json_content(response))
+ defer.returnValue(body)
else:
raise HttpResponseException(response.code, response.phrase, body)
@@ -191,9 +194,9 @@ class SimpleHttpClient(object):
response = yield self.request(
"POST",
- uri.encode("ascii"),
+ uri,
headers=Headers(actual_headers),
- bodyProducer=FileBodyProducer(StringIO(json_str))
+ data=json_str
)
body = yield make_deferred_yieldable(readBody(response))
@@ -248,7 +251,7 @@ class SimpleHttpClient(object):
ValueError: if the response was not JSON
"""
if len(args):
- query_bytes = urllib.urlencode(args, True)
+ query_bytes = urllib.parse.urlencode(args, True)
uri = "%s?%s" % (uri, query_bytes)
json_str = encode_canonical_json(json_body)
@@ -262,9 +265,9 @@ class SimpleHttpClient(object):
response = yield self.request(
"PUT",
- uri.encode("ascii"),
+ uri,
headers=Headers(actual_headers),
- bodyProducer=FileBodyProducer(StringIO(json_str))
+ data=json_str
)
body = yield make_deferred_yieldable(readBody(response))
@@ -293,7 +296,7 @@ class SimpleHttpClient(object):
HttpResponseException on a non-2xx HTTP response.
"""
if len(args):
- query_bytes = urllib.urlencode(args, True)
+ query_bytes = urllib.parse.urlencode(args, True)
uri = "%s?%s" % (uri, query_bytes)
actual_headers = {
@@ -304,7 +307,7 @@ class SimpleHttpClient(object):
response = yield self.request(
"GET",
- uri.encode("ascii"),
+ uri,
headers=Headers(actual_headers),
)
@@ -339,7 +342,7 @@ class SimpleHttpClient(object):
response = yield self.request(
"GET",
- url.encode("ascii"),
+ url,
headers=Headers(actual_headers),
)
@@ -434,12 +437,12 @@ class CaptchaServerHttpClient(SimpleHttpClient):
@defer.inlineCallbacks
def post_urlencoded_get_raw(self, url, args={}):
- query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
+ query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True)
response = yield self.request(
"POST",
- url.encode("ascii"),
- bodyProducer=FileBodyProducer(StringIO(query_bytes)),
+ url,
+ data=query_bytes,
headers=Headers({
b"Content-Type": [b"application/x-www-form-urlencoded"],
b"User-Agent": [self.user_agent],
@@ -510,7 +513,7 @@ def encode_urlencode_args(args):
def encode_urlencode_arg(arg):
- if isinstance(arg, unicode):
+ if isinstance(arg, text_type):
return arg.encode('utf-8')
elif isinstance(arg, list):
return [encode_urlencode_arg(i) for i in arg]
@@ -542,26 +545,3 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory):
def creatorForNetloc(self, hostname, port):
return self
-
-
-class FileBodyProducer(TwistedFileBodyProducer):
- """Workaround for https://twistedmatrix.com/trac/ticket/8473
-
- We override the pauseProducing and resumeProducing methods in twisted's
- FileBodyProducer so that they do not raise exceptions if the task has
- already completed.
- """
-
- def pauseProducing(self):
- try:
- super(FileBodyProducer, self).pauseProducing()
- except task.TaskDone:
- # task has already completed
- pass
-
- def resumeProducing(self):
- try:
- super(FileBodyProducer, self).resumeProducing()
- except task.NotPaused:
- # task was not paused (probably because it had already completed)
- pass
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 44b61e70a4..6a1fc8ca55 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,19 +17,19 @@ import cgi
import logging
import random
import sys
-import urllib
-from six import string_types
-from six.moves.urllib import parse as urlparse
+from six import PY3, string_types
+from six.moves import urllib
-from canonicaljson import encode_canonical_json, json
+import treq
+from canonicaljson import encode_canonical_json
from prometheus_client import Counter
from signedjson.sign import sign_json
from twisted.internet import defer, protocol, reactor
from twisted.internet.error import DNSLookupError
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool, readBody
+from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
import synapse.metrics
@@ -58,13 +58,18 @@ incoming_responses_counter = Counter("synapse_http_matrixfederationclient_respon
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
+if PY3:
+ MAXINT = sys.maxsize
+else:
+ MAXINT = sys.maxint
+
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
- destination = uri.netloc
+ destination = uri.netloc.decode('ascii')
return matrix_federation_endpoint(
reactor, destination, timeout=10,
@@ -93,26 +98,32 @@ class MatrixFederationHttpClient(object):
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
- self.version_string = hs.version_string
+ self.version_string = hs.version_string.encode('ascii')
self._next_id = 1
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
- return urlparse.urlunparse(
- ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
+ return urllib.parse.urlunparse(
+ (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
)
@defer.inlineCallbacks
def _request(self, destination, method, path,
- body_callback, headers_dict={}, param_bytes=b"",
- query_bytes=b"", retry_on_dns_fail=True,
+ json=None, json_callback=None,
+ param_bytes=b"",
+ query=None, retry_on_dns_fail=True,
timeout=None, long_retries=False,
ignore_backoff=False,
backoff_on_404=False):
- """ Creates and sends a request to the given server
+ """
+ Creates and sends a request to the given server.
+
Args:
destination (str): The remote server to send the HTTP request to.
method (str): HTTP method
path (str): The HTTP path
+ json (dict or None): JSON to send in the body.
+ json_callback (func or None): A callback to generate the JSON.
+ query (dict or None): Query arguments.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
backoff_on_404 (bool): Back off if we get a 404
@@ -133,7 +144,7 @@ class MatrixFederationHttpClient(object):
failures, connection failures, SSL failures.)
"""
if (
- self.hs.config.federation_domain_whitelist and
+ self.hs.config.federation_domain_whitelist is not None and
destination not in self.hs.config.federation_domain_whitelist
):
raise FederationDeniedError(destination)
@@ -146,22 +157,29 @@ class MatrixFederationHttpClient(object):
ignore_backoff=ignore_backoff,
)
- destination = destination.encode("ascii")
+ headers_dict = {}
path_bytes = path.encode("ascii")
- with limiter:
- headers_dict[b"User-Agent"] = [self.version_string]
- headers_dict[b"Host"] = [destination]
+ if query:
+ query_bytes = encode_query_args(query)
+ else:
+ query_bytes = b""
- url_bytes = self._create_url(
- destination, path_bytes, param_bytes, query_bytes
- )
+ headers_dict = {
+ "User-Agent": [self.version_string],
+ "Host": [destination],
+ }
+
+ with limiter:
+ url = self._create_url(
+ destination.encode("ascii"), path_bytes, param_bytes, query_bytes
+ ).decode('ascii')
txn_id = "%s-O-%s" % (method, self._next_id)
- self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+ self._next_id = (self._next_id + 1) % (MAXINT - 1)
outbound_logger.info(
"{%s} [%s] Sending request: %s %s",
- txn_id, destination, method, url_bytes
+ txn_id, destination, method, url
)
# XXX: Would be much nicer to retry only at the transaction-layer
@@ -171,23 +189,33 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
- http_url_bytes = urlparse.urlunparse(
- ("", "", path_bytes, param_bytes, query_bytes, "")
- )
+ http_url = urllib.parse.urlunparse(
+ (b"", b"", path_bytes, param_bytes, query_bytes, b"")
+ ).decode('ascii')
log_result = None
try:
while True:
- producer = None
- if body_callback:
- producer = body_callback(method, http_url_bytes, headers_dict)
-
try:
- request_deferred = self.agent.request(
+ if json_callback:
+ json = json_callback()
+
+ if json:
+ data = encode_canonical_json(json)
+ headers_dict["Content-Type"] = ["application/json"]
+ self.sign_request(
+ destination, method, http_url, headers_dict, json
+ )
+ else:
+ data = None
+ self.sign_request(destination, method, http_url, headers_dict)
+
+ request_deferred = treq.request(
method,
- url_bytes,
- Headers(headers_dict),
- producer
+ url,
+ headers=Headers(headers_dict),
+ data=data,
+ agent=self.agent,
)
add_timeout_to_deferred(
request_deferred,
@@ -218,7 +246,7 @@ class MatrixFederationHttpClient(object):
txn_id,
destination,
method,
- url_bytes,
+ url,
_flatten_response_never_received(e),
)
@@ -252,7 +280,7 @@ class MatrixFederationHttpClient(object):
# :'(
# Update transactions table?
with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
+ body = yield treq.content(response)
raise HttpResponseException(
response.code, response.phrase, body
)
@@ -297,11 +325,11 @@ class MatrixFederationHttpClient(object):
auth_headers = []
for key, sig in request["signatures"][self.server_name].items():
- auth_headers.append(bytes(
+ auth_headers.append((
"X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
self.server_name, key, sig,
- )
- ))
+ )).encode('ascii')
+ )
headers_dict[b"Authorization"] = auth_headers
@@ -347,24 +375,14 @@ class MatrixFederationHttpClient(object):
"""
if not json_data_callback:
- def json_data_callback():
- return data
-
- def body_callback(method, url_bytes, headers_dict):
- json_data = json_data_callback()
- self.sign_request(
- destination, method, url_bytes, headers_dict, json_data
- )
- producer = _JsonProducer(json_data)
- return producer
+ json_data_callback = lambda: data
response = yield self._request(
destination,
"PUT",
path,
- body_callback=body_callback,
- headers_dict={"Content-Type": ["application/json"]},
- query_bytes=encode_query_args(args),
+ json_callback=json_data_callback,
+ query=args,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -376,8 +394,8 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
- defer.returnValue(json.loads(body))
+ body = yield treq.json_content(response)
+ defer.returnValue(body)
@defer.inlineCallbacks
def post_json(self, destination, path, data={}, long_retries=False,
@@ -410,20 +428,12 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
-
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(
- destination, method, url_bytes, headers_dict, data
- )
- return _JsonProducer(data)
-
response = yield self._request(
destination,
"POST",
path,
- query_bytes=encode_query_args(args),
- body_callback=body_callback,
- headers_dict={"Content-Type": ["application/json"]},
+ query=args,
+ json=data,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -434,9 +444,9 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
+ body = yield treq.json_content(response)
- defer.returnValue(json.loads(body))
+ defer.returnValue(body)
@defer.inlineCallbacks
def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
@@ -471,16 +481,11 @@ class MatrixFederationHttpClient(object):
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(destination, method, url_bytes, headers_dict)
- return None
-
response = yield self._request(
destination,
"GET",
path,
- query_bytes=encode_query_args(args),
- body_callback=body_callback,
+ query=args,
retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -491,9 +496,9 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
+ body = yield treq.json_content(response)
- defer.returnValue(json.loads(body))
+ defer.returnValue(body)
@defer.inlineCallbacks
def delete_json(self, destination, path, long_retries=False,
@@ -523,13 +528,11 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
-
response = yield self._request(
destination,
"DELETE",
path,
- query_bytes=encode_query_args(args),
- headers_dict={"Content-Type": ["application/json"]},
+ query=args,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -540,9 +543,9 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
+ body = yield treq.json_content(response)
- defer.returnValue(json.loads(body))
+ defer.returnValue(body)
@defer.inlineCallbacks
def get_file(self, destination, path, output_stream, args={},
@@ -569,26 +572,11 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
-
- encoded_args = {}
- for k, vs in args.items():
- if isinstance(vs, string_types):
- vs = [vs]
- encoded_args[k] = [v.encode("UTF-8") for v in vs]
-
- query_bytes = urllib.urlencode(encoded_args, True)
- logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
-
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(destination, method, url_bytes, headers_dict)
- return None
-
response = yield self._request(
destination,
"GET",
path,
- query_bytes=query_bytes,
- body_callback=body_callback,
+ query=args,
retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
)
@@ -639,30 +627,6 @@ def _readBodyToFile(response, stream, max_size):
return d
-class _JsonProducer(object):
- """ Used by the twisted http client to create the HTTP body from json
- """
- def __init__(self, jsn):
- self.reset(jsn)
-
- def reset(self, jsn):
- self.body = encode_canonical_json(jsn)
- self.length = len(self.body)
-
- def startProducing(self, consumer):
- consumer.write(self.body)
- return defer.succeed(None)
-
- def pauseProducing(self):
- pass
-
- def stopProducing(self):
- pass
-
- def resumeProducing(self):
- pass
-
-
def _flatten_response_never_received(e):
if hasattr(e, "reasons"):
reasons = ", ".join(
@@ -693,7 +657,7 @@ def check_content_type_is_json(headers):
"No Content-Type header"
)
- c_type = c_type[0] # only the first header
+ c_type = c_type[0].decode('ascii') # only the first header
val, options = cgi.parse_header(c_type)
if val != "application/json":
raise RuntimeError(
@@ -711,6 +675,6 @@ def encode_query_args(args):
vs = [vs]
encoded_args[k] = [v.encode("UTF-8") for v in vs]
- query_bytes = urllib.urlencode(encoded_args, True)
+ query_bytes = urllib.parse.urlencode(encoded_args, True)
- return query_bytes
+ return query_bytes.encode('utf8')
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 88ed3714f9..f0828c6542 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -204,14 +204,14 @@ class SynapseRequest(Request):
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics.start(
- self.start_time, name=servlet_name, method=self.method,
+ self.start_time, name=servlet_name, method=self.method.decode('ascii'),
)
self.site.access_logger.info(
"%s - %s - Received request: %s %s",
self.getClientIP(),
self.site.site_tag,
- self.method,
+ self.method.decode('ascii'),
self.get_redacted_uri()
)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 987eec3ef2..0d8de600cf 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -39,10 +39,11 @@ REQUIREMENTS = {
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
- "Twisted>=16.0.0": ["twisted>=16.0.0"],
+ "Twisted>=17.1.0": ["twisted>=17.1.0"],
+ "treq>=15.1": ["treq>=15.1"],
- # We use crypto.get_elliptic_curve which is only supported in >=0.15
- "pyopenssl>=0.15": ["OpenSSL>=0.15"],
+ # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
+ "pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
"pyyaml": ["yaml"],
"pyasn1": ["pyasn1"],
@@ -78,6 +79,9 @@ CONDITIONAL_REQUIREMENTS = {
"affinity": {
"affinity": ["affinity"],
},
+ "postgres": {
+ "psycopg2>=2.6": ["psycopg2"]
+ }
}
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 74e892c104..5dc7b3fffc 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -590,9 +590,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
pending_commands = LaterGauge(
"synapse_replication_tcp_protocol_pending_commands",
"",
- ["name", "conn_id"],
+ ["name"],
lambda: {
- (p.name, p.conn_id): len(p.pending_commands) for p in connected_connections
+ (p.name,): len(p.pending_commands) for p in connected_connections
},
)
@@ -607,9 +607,9 @@ def transport_buffer_size(protocol):
transport_send_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_send_buffer",
"",
- ["name", "conn_id"],
+ ["name"],
lambda: {
- (p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections
+ (p.name,): transport_buffer_size(p) for p in connected_connections
},
)
@@ -632,9 +632,9 @@ def transport_kernel_read_buffer_size(protocol, read=True):
tcp_transport_kernel_send_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_kernel_send_buffer",
"",
- ["name", "conn_id"],
+ ["name"],
lambda: {
- (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
+ (p.name,): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
)
@@ -643,9 +643,9 @@ tcp_transport_kernel_send_buffer = LaterGauge(
tcp_transport_kernel_read_buffer = LaterGauge(
"synapse_replication_tcp_protocol_transport_kernel_read_buffer",
"",
- ["name", "conn_id"],
+ ["name"],
lambda: {
- (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
+ (p.name,): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},
)
@@ -654,9 +654,9 @@ tcp_transport_kernel_read_buffer = LaterGauge(
tcp_inbound_commands = LaterGauge(
"synapse_replication_tcp_protocol_inbound_commands",
"",
- ["command", "name", "conn_id"],
+ ["command", "name"],
lambda: {
- (k[0], p.name, p.conn_id): count
+ (k[0], p.name,): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter)
},
@@ -665,9 +665,9 @@ tcp_inbound_commands = LaterGauge(
tcp_outbound_commands = LaterGauge(
"synapse_replication_tcp_protocol_outbound_commands",
"",
- ["command", "name", "conn_id"],
+ ["command", "name"],
lambda: {
- (k[0], p.name, p.conn_id): count
+ (k[0], p.name,): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter)
},
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index fcc1091760..976d98387d 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -531,7 +531,7 @@ class RoomEventServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
event = yield self.event_handler.get_event(requester.user, room_id, event_id)
time_now = self.clock.time_msec()
diff --git a/synapse/rest/client/v1_only/register.py b/synapse/rest/client/v1_only/register.py
index 5e99cffbcb..dadb376b02 100644
--- a/synapse/rest/client/v1_only/register.py
+++ b/synapse/rest/client/v1_only/register.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError
+from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
from synapse.rest.client.v1.base import ClientV1RestServlet
from synapse.types import create_requester
@@ -281,12 +282,20 @@ class RegisterRestServlet(ClientV1RestServlet):
register_json["user"].encode("utf-8")
if "user" in register_json else None
)
+ threepid = None
+ if session.get(LoginType.EMAIL_IDENTITY):
+ threepid = session["threepidCreds"]
handler = self.handlers.registration_handler
(user_id, token) = yield handler.register(
localpart=desired_user_id,
- password=password
+ password=password,
+ threepid=threepid,
)
+ # Necessary due to auth checks prior to the threepid being
+ # written to the db
+ if is_threepid_reserved(self.hs.config, threepid):
+ yield self.store.upsert_monthly_active_user(user_id)
if session[LoginType.EMAIL_IDENTITY]:
logger.debug("Binding emails %s to %s" % (
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 372648cafd..37b32dd37b 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -53,7 +53,9 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
if not check_3pid_allowed(self.hs, "email", body['email']):
raise SynapseError(
- 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ 403,
+ "Your email domain is not authorized on this server",
+ Codes.THREEPID_DENIED,
)
existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
@@ -89,7 +91,9 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
if not check_3pid_allowed(self.hs, "msisdn", msisdn):
raise SynapseError(
- 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ 403,
+ "Account phone numbers are not authorized on this server",
+ Codes.THREEPID_DENIED,
)
existingUid = yield self.datastore.get_user_id_by_threepid(
@@ -241,7 +245,9 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
if not check_3pid_allowed(self.hs, "email", body['email']):
raise SynapseError(
- 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ 403,
+ "Your email domain is not authorized on this server",
+ Codes.THREEPID_DENIED,
)
existingUid = yield self.datastore.get_user_id_by_threepid(
@@ -276,7 +282,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
if not check_3pid_allowed(self.hs, "msisdn", msisdn):
raise SynapseError(
- 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ 403,
+ "Account phone numbers are not authorized on this server",
+ Codes.THREEPID_DENIED,
)
existingUid = yield self.datastore.get_user_id_by_threepid(
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 2f64155d13..192f52e462 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -26,6 +26,7 @@ import synapse
import synapse.types
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError, UnrecognizedRequestError
+from synapse.config.server import is_threepid_reserved
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
@@ -74,7 +75,9 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
if not check_3pid_allowed(self.hs, "email", body['email']):
raise SynapseError(
- 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ 403,
+ "Your email domain is not authorized to register on this server",
+ Codes.THREEPID_DENIED,
)
existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
@@ -114,7 +117,9 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
if not check_3pid_allowed(self.hs, "msisdn", msisdn):
raise SynapseError(
- 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ 403,
+ "Phone numbers are not authorized to register on this server",
+ Codes.THREEPID_DENIED,
)
existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
@@ -372,7 +377,9 @@ class RegisterRestServlet(RestServlet):
if not check_3pid_allowed(self.hs, medium, address):
raise SynapseError(
- 403, "Third party identifier is not allowed",
+ 403,
+ "Third party identifiers (email/phone numbers)" +
+ " are not authorized on this server",
Codes.THREEPID_DENIED,
)
@@ -395,12 +402,21 @@ class RegisterRestServlet(RestServlet):
if desired_username is not None:
desired_username = desired_username.lower()
+ threepid = None
+ if auth_result:
+ threepid = auth_result.get(LoginType.EMAIL_IDENTITY)
+
(registered_user_id, _) = yield self.registration_handler.register(
localpart=desired_username,
password=new_password,
guest_access_token=guest_access_token,
generate_token=False,
+ threepid=threepid,
)
+ # Necessary due to auth checks prior to the threepid being
+ # written to the db
+ if is_threepid_reserved(self.hs.config, threepid):
+ yield self.store.upsert_monthly_active_user(registered_user_id)
# remember that we've now registered that user account, and with
# what user ID (since the user may not have specified)
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 1275baa1ba..263d8eb73e 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -25,6 +25,7 @@ from synapse.api.errors import SynapseError
from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
from synapse.events.utils import (
format_event_for_client_v2_without_room_id,
+ format_event_raw,
serialize_event,
)
from synapse.handlers.presence import format_user_presence_state
@@ -175,17 +176,28 @@ class SyncRestServlet(RestServlet):
@staticmethod
def encode_response(time_now, sync_result, access_token_id, filter):
+ if filter.event_format == 'client':
+ event_formatter = format_event_for_client_v2_without_room_id
+ elif filter.event_format == 'federation':
+ event_formatter = format_event_raw
+ else:
+ raise Exception("Unknown event format %s" % (filter.event_format, ))
+
joined = SyncRestServlet.encode_joined(
- sync_result.joined, time_now, access_token_id, filter.event_fields
+ sync_result.joined, time_now, access_token_id,
+ filter.event_fields,
+ event_formatter,
)
invited = SyncRestServlet.encode_invited(
sync_result.invited, time_now, access_token_id,
+ event_formatter,
)
archived = SyncRestServlet.encode_archived(
sync_result.archived, time_now, access_token_id,
filter.event_fields,
+ event_formatter,
)
return {
@@ -228,7 +240,7 @@ class SyncRestServlet(RestServlet):
}
@staticmethod
- def encode_joined(rooms, time_now, token_id, event_fields):
+ def encode_joined(rooms, time_now, token_id, event_fields, event_formatter):
"""
Encode the joined rooms in a sync result
@@ -240,7 +252,9 @@ class SyncRestServlet(RestServlet):
token_id(int): ID of the user's auth token - used for namespacing
of transaction IDs
event_fields(list<str>): List of event fields to include. If empty,
- all fields will be returned.
+ all fields will be returned.
+ event_formatter (func[dict]): function to convert from federation format
+ to client format
Returns:
dict[str, dict[str, object]]: the joined rooms list, in our
response format
@@ -248,13 +262,14 @@ class SyncRestServlet(RestServlet):
joined = {}
for room in rooms:
joined[room.room_id] = SyncRestServlet.encode_room(
- room, time_now, token_id, only_fields=event_fields
+ room, time_now, token_id, joined=True, only_fields=event_fields,
+ event_formatter=event_formatter,
)
return joined
@staticmethod
- def encode_invited(rooms, time_now, token_id):
+ def encode_invited(rooms, time_now, token_id, event_formatter):
"""
Encode the invited rooms in a sync result
@@ -264,7 +279,9 @@ class SyncRestServlet(RestServlet):
time_now(int): current time - used as a baseline for age
calculations
token_id(int): ID of the user's auth token - used for namespacing
- of transaction IDs
+ of transaction IDs
+ event_formatter (func[dict]): function to convert from federation format
+ to client format
Returns:
dict[str, dict[str, object]]: the invited rooms list, in our
@@ -274,7 +291,7 @@ class SyncRestServlet(RestServlet):
for room in rooms:
invite = serialize_event(
room.invite, time_now, token_id=token_id,
- event_format=format_event_for_client_v2_without_room_id,
+ event_format=event_formatter,
is_invite=True,
)
unsigned = dict(invite.get("unsigned", {}))
@@ -288,7 +305,7 @@ class SyncRestServlet(RestServlet):
return invited
@staticmethod
- def encode_archived(rooms, time_now, token_id, event_fields):
+ def encode_archived(rooms, time_now, token_id, event_fields, event_formatter):
"""
Encode the archived rooms in a sync result
@@ -300,7 +317,9 @@ class SyncRestServlet(RestServlet):
token_id(int): ID of the user's auth token - used for namespacing
of transaction IDs
event_fields(list<str>): List of event fields to include. If empty,
- all fields will be returned.
+ all fields will be returned.
+ event_formatter (func[dict]): function to convert from federation format
+ to client format
Returns:
dict[str, dict[str, object]]: The invited rooms list, in our
response format
@@ -308,13 +327,18 @@ class SyncRestServlet(RestServlet):
joined = {}
for room in rooms:
joined[room.room_id] = SyncRestServlet.encode_room(
- room, time_now, token_id, joined=False, only_fields=event_fields
+ room, time_now, token_id, joined=False,
+ only_fields=event_fields,
+ event_formatter=event_formatter,
)
return joined
@staticmethod
- def encode_room(room, time_now, token_id, joined=True, only_fields=None):
+ def encode_room(
+ room, time_now, token_id, joined,
+ only_fields, event_formatter,
+ ):
"""
Args:
room (JoinedSyncResult|ArchivedSyncResult): sync result for a
@@ -326,14 +350,15 @@ class SyncRestServlet(RestServlet):
joined (bool): True if the user is joined to this room - will mean
we handle ephemeral events
only_fields(list<str>): Optional. The list of event fields to include.
+ event_formatter (func[dict]): function to convert from federation format
+ to client format
Returns:
dict[str, object]: the room, encoded in our response format
"""
def serialize(event):
- # TODO(mjark): Respect formatting requirements in the filter.
return serialize_event(
event, time_now, token_id=token_id,
- event_format=format_event_for_client_v2_without_room_id,
+ event_format=event_formatter,
only_event_fields=only_fields,
)
diff --git a/synapse/server.py b/synapse/server.py
index a6fbc6ec0c..938a05f9dc 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -19,6 +19,7 @@
# partial one for unit test mocking.
# Imports required for the default HomeServer() implementation
+import abc
import logging
from twisted.enterprise import adbapi
@@ -81,7 +82,6 @@ from synapse.server_notices.server_notices_manager import ServerNoticesManager
from synapse.server_notices.server_notices_sender import ServerNoticesSender
from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender
from synapse.state import StateHandler, StateResolutionHandler
-from synapse.storage import DataStore
from synapse.streams.events import EventSources
from synapse.util import Clock
from synapse.util.distributor import Distributor
@@ -111,6 +111,8 @@ class HomeServer(object):
config (synapse.config.homeserver.HomeserverConfig):
"""
+ __metaclass__ = abc.ABCMeta
+
DEPENDENCIES = [
'http_client',
'db_pool',
@@ -172,6 +174,11 @@ class HomeServer(object):
'room_context_handler',
]
+ # This is overridden in derived application classes
+ # (such as synapse.app.homeserver.SynapseHomeServer) and gives the class to be
+ # instantiated during setup() for future return by get_datastore()
+ DATASTORE_CLASS = abc.abstractproperty()
+
def __init__(self, hostname, reactor=None, **kwargs):
"""
Args:
@@ -188,13 +195,16 @@ class HomeServer(object):
self.distributor = Distributor()
self.ratelimiter = Ratelimiter()
+ self.datastore = None
+
# Other kwargs are explicit dependencies
for depname in kwargs:
setattr(self, depname, kwargs[depname])
def setup(self):
logger.info("Setting up.")
- self.datastore = DataStore(self.get_db_conn(), self)
+ with self.get_db_conn() as conn:
+ self.datastore = self.DATASTORE_CLASS(conn, self)
logger.info("Finished setting up.")
def get_reactor(self):
diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py
new file mode 100644
index 0000000000..af15cba0ee
--- /dev/null
+++ b/synapse/server_notices/resource_limits_server_notices.py
@@ -0,0 +1,203 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from six import iteritems
+
+from twisted.internet import defer
+
+from synapse.api.constants import (
+ EventTypes,
+ ServerNoticeLimitReached,
+ ServerNoticeMsgType,
+)
+from synapse.api.errors import AuthError, ResourceLimitError, SynapseError
+from synapse.server_notices.server_notices_manager import SERVER_NOTICE_ROOM_TAG
+
+logger = logging.getLogger(__name__)
+
+
+class ResourceLimitsServerNotices(object):
+ """ Keeps track of whether the server has reached it's resource limit and
+ ensures that the client is kept up to date.
+ """
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer):
+ """
+ self._server_notices_manager = hs.get_server_notices_manager()
+ self._store = hs.get_datastore()
+ self._auth = hs.get_auth()
+ self._config = hs.config
+ self._resouce_limited = False
+ self._message_handler = hs.get_message_handler()
+ self._state = hs.get_state_handler()
+
+ self._notifier = hs.get_notifier()
+
+ @defer.inlineCallbacks
+ def maybe_send_server_notice_to_user(self, user_id):
+ """Check if we need to send a notice to this user, this will be true in
+ two cases.
+ 1. The server has reached its limit does not reflect this
+ 2. The room state indicates that the server has reached its limit when
+ actually the server is fine
+
+ Args:
+ user_id (str): user to check
+
+ Returns:
+ Deferred
+ """
+ if self._config.hs_disabled is True:
+ return
+
+ if self._config.limit_usage_by_mau is False:
+ return
+
+ if not self._server_notices_manager.is_enabled():
+ # Don't try and send server notices unles they've been enabled
+ return
+
+ timestamp = yield self._store.user_last_seen_monthly_active(user_id)
+ if timestamp is None:
+ # This user will be blocked from receiving the notice anyway.
+ # In practice, not sure we can ever get here
+ return
+
+ # Determine current state of room
+
+ room_id = yield self._server_notices_manager.get_notice_room_for_user(user_id)
+
+ if not room_id:
+ logger.warn("Failed to get server notices room")
+ return
+
+ yield self._check_and_set_tags(user_id, room_id)
+ currently_blocked, ref_events = yield self._is_room_currently_blocked(room_id)
+
+ try:
+ # Normally should always pass in user_id if you have it, but in
+ # this case are checking what would happen to other users if they
+ # were to arrive.
+ try:
+ yield self._auth.check_auth_blocking()
+ is_auth_blocking = False
+ except ResourceLimitError as e:
+ is_auth_blocking = True
+ event_content = e.msg
+ event_limit_type = e.limit_type
+
+ if currently_blocked and not is_auth_blocking:
+ # Room is notifying of a block, when it ought not to be.
+ # Remove block notification
+ content = {
+ "pinned": ref_events
+ }
+ yield self._server_notices_manager.send_notice(
+ user_id, content, EventTypes.Pinned, '',
+ )
+
+ elif not currently_blocked and is_auth_blocking:
+ # Room is not notifying of a block, when it ought to be.
+ # Add block notification
+ content = {
+ 'body': event_content,
+ 'msgtype': ServerNoticeMsgType,
+ 'server_notice_type': ServerNoticeLimitReached,
+ 'admin_contact': self._config.admin_contact,
+ 'limit_type': event_limit_type
+ }
+ event = yield self._server_notices_manager.send_notice(
+ user_id, content, EventTypes.Message,
+ )
+
+ content = {
+ "pinned": [
+ event.event_id,
+ ]
+ }
+ yield self._server_notices_manager.send_notice(
+ user_id, content, EventTypes.Pinned, '',
+ )
+
+ except SynapseError as e:
+ logger.error("Error sending resource limits server notice: %s", e)
+
+ @defer.inlineCallbacks
+ def _check_and_set_tags(self, user_id, room_id):
+ """
+ Since server notices rooms were originally not with tags,
+ important to check that tags have been set correctly
+ Args:
+ user_id(str): the user in question
+ room_id(str): the server notices room for that user
+ """
+ tags = yield self._store.get_tags_for_room(user_id, room_id)
+ need_to_set_tag = True
+ if tags:
+ if SERVER_NOTICE_ROOM_TAG in tags:
+ # tag already present, nothing to do here
+ need_to_set_tag = False
+ if need_to_set_tag:
+ max_id = yield self._store.add_tag_to_room(
+ user_id, room_id, SERVER_NOTICE_ROOM_TAG, {}
+ )
+ self._notifier.on_new_event(
+ "account_data_key", max_id, users=[user_id]
+ )
+
+ @defer.inlineCallbacks
+ def _is_room_currently_blocked(self, room_id):
+ """
+ Determines if the room is currently blocked
+
+ Args:
+ room_id(str): The room id of the server notices room
+
+ Returns:
+
+ bool: Is the room currently blocked
+ list: The list of pinned events that are unrelated to limit blocking
+ This list can be used as a convenience in the case where the block
+ is to be lifted and the remaining pinned event references need to be
+ preserved
+ """
+ currently_blocked = False
+ pinned_state_event = None
+ try:
+ pinned_state_event = yield self._state.get_current_state(
+ room_id, event_type=EventTypes.Pinned
+ )
+ except AuthError:
+ # The user has yet to join the server notices room
+ pass
+
+ referenced_events = []
+ if pinned_state_event is not None:
+ referenced_events = list(pinned_state_event.content.get('pinned', []))
+
+ events = yield self._store.get_events(referenced_events)
+ for event_id, event in iteritems(events):
+ if event.type != EventTypes.Message:
+ continue
+ if event.content.get("msgtype") == ServerNoticeMsgType:
+ currently_blocked = True
+ # remove event in case we need to disable blocking later on.
+ if event_id in referenced_events:
+ referenced_events.remove(event.event_id)
+
+ defer.returnValue((currently_blocked, referenced_events))
diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py
index a26deace53..c5cc6d728e 100644
--- a/synapse/server_notices/server_notices_manager.py
+++ b/synapse/server_notices/server_notices_manager.py
@@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks
logger = logging.getLogger(__name__)
+SERVER_NOTICE_ROOM_TAG = "m.server_notice"
+
class ServerNoticesManager(object):
def __init__(self, hs):
@@ -37,6 +39,8 @@ class ServerNoticesManager(object):
self._event_creation_handler = hs.get_event_creation_handler()
self._is_mine_id = hs.is_mine_id
+ self._notifier = hs.get_notifier()
+
def is_enabled(self):
"""Checks if server notices are enabled on this server.
@@ -46,7 +50,10 @@ class ServerNoticesManager(object):
return self._config.server_notices_mxid is not None
@defer.inlineCallbacks
- def send_notice(self, user_id, event_content):
+ def send_notice(
+ self, user_id, event_content,
+ type=EventTypes.Message, state_key=None
+ ):
"""Send a notice to the given user
Creates the server notices room, if none exists.
@@ -54,9 +61,11 @@ class ServerNoticesManager(object):
Args:
user_id (str): mxid of user to send event to.
event_content (dict): content of event to send
+ type(EventTypes): type of event
+ is_state_event(bool): Is the event a state event
Returns:
- Deferred[None]
+ Deferred[FrozenEvent]
"""
room_id = yield self.get_notice_room_for_user(user_id)
@@ -65,15 +74,20 @@ class ServerNoticesManager(object):
logger.info("Sending server notice to %s", user_id)
- yield self._event_creation_handler.create_and_send_nonmember_event(
- requester, {
- "type": EventTypes.Message,
- "room_id": room_id,
- "sender": system_mxid,
- "content": event_content,
- },
- ratelimit=False,
+ event_dict = {
+ "type": type,
+ "room_id": room_id,
+ "sender": system_mxid,
+ "content": event_content,
+ }
+
+ if state_key is not None:
+ event_dict['state_key'] = state_key
+
+ res = yield self._event_creation_handler.create_and_send_nonmember_event(
+ requester, event_dict, ratelimit=False,
)
+ defer.returnValue(res)
@cachedInlineCallbacks()
def get_notice_room_for_user(self, user_id):
@@ -142,5 +156,12 @@ class ServerNoticesManager(object):
)
room_id = info['room_id']
+ max_id = yield self._store.add_tag_to_room(
+ user_id, room_id, SERVER_NOTICE_ROOM_TAG, {},
+ )
+ self._notifier.on_new_event(
+ "account_data_key", max_id, users=[user_id]
+ )
+
logger.info("Created server notices room %s for %s", room_id, user_id)
defer.returnValue(room_id)
diff --git a/synapse/server_notices/server_notices_sender.py b/synapse/server_notices/server_notices_sender.py
index 5d23965f34..6121b2f267 100644
--- a/synapse/server_notices/server_notices_sender.py
+++ b/synapse/server_notices/server_notices_sender.py
@@ -12,7 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
+
from synapse.server_notices.consent_server_notices import ConsentServerNotices
+from synapse.server_notices.resource_limits_server_notices import (
+ ResourceLimitsServerNotices,
+)
class ServerNoticesSender(object):
@@ -25,34 +30,34 @@ class ServerNoticesSender(object):
Args:
hs (synapse.server.HomeServer):
"""
- # todo: it would be nice to make this more dynamic
- self._consent_server_notices = ConsentServerNotices(hs)
+ self._server_notices = (
+ ConsentServerNotices(hs),
+ ResourceLimitsServerNotices(hs)
+ )
+ @defer.inlineCallbacks
def on_user_syncing(self, user_id):
"""Called when the user performs a sync operation.
Args:
user_id (str): mxid of user who synced
-
- Returns:
- Deferred
"""
- return self._consent_server_notices.maybe_send_server_notice_to_user(
- user_id,
- )
+ for sn in self._server_notices:
+ yield sn.maybe_send_server_notice_to_user(
+ user_id,
+ )
+ @defer.inlineCallbacks
def on_user_ip(self, user_id):
"""Called on the master when a worker process saw a client request.
Args:
user_id (str): mxid
-
- Returns:
- Deferred
"""
# The synchrotrons use a stubbed version of ServerNoticesSender, so
# we check for notices to send to the user in on_user_ip as well as
# in on_user_syncing
- return self._consent_server_notices.maybe_send_server_notice_to_user(
- user_id,
- )
+ for sn in self._server_notices:
+ yield sn.maybe_send_server_notice_to_user(
+ user_id,
+ )
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index b34970e4d1..d7ae22a661 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -385,6 +385,7 @@ class StateHandler(object):
ev_ids, get_prev_content=False, check_redacted=False,
)
+ @defer.inlineCallbacks
def resolve_events(self, room_version, state_sets, event):
logger.info(
"Resolving state for %s with %d groups", event.room_id, len(state_sets)
@@ -401,15 +402,17 @@ class StateHandler(object):
}
with Measure(self.clock, "state._resolve_events"):
- new_state = resolve_events_with_state_map(
- room_version, state_set_ids, state_map,
+ new_state = yield resolve_events_with_factory(
+ room_version, state_set_ids,
+ event_map=state_map,
+ state_map_factory=self._state_map_factory
)
new_state = {
key: state_map[ev_id] for key, ev_id in iteritems(new_state)
}
- return new_state
+ defer.returnValue(new_state)
class StateResolutionHandler(object):
@@ -589,31 +592,6 @@ def _make_state_cache_entry(
)
-def resolve_events_with_state_map(room_version, state_sets, state_map):
- """
- Args:
- room_version(str): Version of the room
- state_sets(list): List of dicts of (type, state_key) -> event_id,
- which are the different state groups to resolve.
- state_map(dict): a dict from event_id to event, for all events in
- state_sets.
-
- Returns
- dict[(str, str), str]:
- a map from (type, state_key) to event_id.
- """
- if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,):
- return v1.resolve_events_with_state_map(
- state_sets, state_map,
- )
- else:
- # This should only happen if we added a version but forgot to add it to
- # the list above.
- raise Exception(
- "No state resolution algorithm defined for version %r" % (room_version,)
- )
-
-
def resolve_events_with_factory(room_version, state_sets, event_map, state_map_factory):
"""
Args:
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index 3a1f7054a1..c95477d318 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -30,34 +30,6 @@ logger = logging.getLogger(__name__)
POWER_KEY = (EventTypes.PowerLevels, "")
-def resolve_events_with_state_map(state_sets, state_map):
- """
- Args:
- state_sets(list): List of dicts of (type, state_key) -> event_id,
- which are the different state groups to resolve.
- state_map(dict): a dict from event_id to event, for all events in
- state_sets.
-
- Returns
- dict[(str, str), str]:
- a map from (type, state_key) to event_id.
- """
- if len(state_sets) == 1:
- return state_sets[0]
-
- unconflicted_state, conflicted_state = _seperate(
- state_sets,
- )
-
- auth_events = _create_auth_events_from_maps(
- unconflicted_state, conflicted_state, state_map
- )
-
- return _resolve_with_state(
- unconflicted_state, conflicted_state, auth_events, state_map
- )
-
-
@defer.inlineCallbacks
def resolve_events_with_factory(state_sets, event_map, state_map_factory):
"""
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 08dffd774f..be61147b9b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -17,9 +17,10 @@ import sys
import threading
import time
-from six import iteritems, iterkeys, itervalues
+from six import PY2, iteritems, iterkeys, itervalues
from six.moves import intern, range
+from canonicaljson import json
from prometheus_client import Histogram
from twisted.internet import defer
@@ -1216,3 +1217,32 @@ class _RollbackButIsFineException(Exception):
something went wrong.
"""
pass
+
+
+def db_to_json(db_content):
+ """
+ Take some data from a database row and return a JSON-decoded object.
+
+ Args:
+ db_content (memoryview|buffer|bytes|bytearray|unicode)
+ """
+ # psycopg2 on Python 3 returns memoryview objects, which we need to
+ # cast to bytes to decode
+ if isinstance(db_content, memoryview):
+ db_content = db_content.tobytes()
+
+ # psycopg2 on Python 2 returns buffer objects, which we need to cast to
+ # bytes to decode
+ if PY2 and isinstance(db_content, buffer):
+ db_content = bytes(db_content)
+
+ # Decode it to a Unicode string before feeding it to json.loads, so we
+ # consistenty get a Unicode-containing object out.
+ if isinstance(db_content, (bytes, bytearray)):
+ db_content = db_content.decode('utf8')
+
+ try:
+ return json.loads(db_content)
+ except Exception:
+ logging.warning("Tried to decode '%r' as JSON and failed", db_content)
+ raise
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 73646da025..e06b0bc56d 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -169,7 +169,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
local_by_user_then_device = {}
for user_id, messages_by_device in messages_by_user_then_device.items():
messages_json_for_user = {}
- devices = messages_by_device.keys()
+ devices = list(messages_by_device.keys())
if len(devices) == 1 and devices[0] == "*":
# Handle wildcard device_ids.
sql = (
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index c0943ecf91..d10ff9e4b9 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -24,7 +24,7 @@ from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
-from ._base import Cache, SQLBaseStore
+from ._base import Cache, SQLBaseStore, db_to_json
logger = logging.getLogger(__name__)
@@ -411,7 +411,7 @@ class DeviceStore(SQLBaseStore):
if device is not None:
key_json = device.get("key_json", None)
if key_json:
- result["keys"] = json.loads(key_json)
+ result["keys"] = db_to_json(key_json)
device_display_name = device.get("device_display_name", None)
if device_display_name:
result["device_display_name"] = device_display_name
@@ -466,7 +466,7 @@ class DeviceStore(SQLBaseStore):
retcol="content",
desc="_get_cached_user_device",
)
- defer.returnValue(json.loads(content))
+ defer.returnValue(db_to_json(content))
@cachedInlineCallbacks()
def _get_cached_devices_for_user(self, user_id):
@@ -479,7 +479,7 @@ class DeviceStore(SQLBaseStore):
desc="_get_cached_devices_for_user",
)
defer.returnValue({
- device["device_id"]: json.loads(device["content"])
+ device["device_id"]: db_to_json(device["content"])
for device in devices
})
@@ -511,7 +511,7 @@ class DeviceStore(SQLBaseStore):
key_json = device.get("key_json", None)
if key_json:
- result["keys"] = json.loads(key_json)
+ result["keys"] = db_to_json(key_json)
device_display_name = device.get("device_display_name", None)
if device_display_name:
result["device_display_name"] = device_display_name
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 523b4360c3..1f1721e820 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -14,13 +14,13 @@
# limitations under the License.
from six import iteritems
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.util.caches.descriptors import cached
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, db_to_json
class EndToEndKeyStore(SQLBaseStore):
@@ -90,7 +90,7 @@ class EndToEndKeyStore(SQLBaseStore):
for user_id, device_keys in iteritems(results):
for device_id, device_info in iteritems(device_keys):
- device_info["keys"] = json.loads(device_info.pop("key_json"))
+ device_info["keys"] = db_to_json(device_info.pop("key_json"))
defer.returnValue(results)
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 8a0386c1a4..42225f8a2a 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -41,13 +41,18 @@ class PostgresEngine(object):
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
+
+ # Set the bytea output to escape, vs the default of hex
+ cursor = db_conn.cursor()
+ cursor.execute("SET bytea_output TO escape")
+
# Asynchronous commit, don't wait for the server to call fsync before
# ending the transaction.
# https://www.postgresql.org/docs/current/static/wal-async-commit.html
if not self.synchronous_commit:
- cursor = db_conn.cursor()
cursor.execute("SET synchronous_commit TO OFF")
- cursor.close()
+
+ cursor.close()
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f39c8c8461..8bf87f38f7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ import logging
from collections import OrderedDict, deque, namedtuple
from functools import wraps
-from six import iteritems
+from six import iteritems, text_type
from six.moves import range
from canonicaljson import json
@@ -1220,7 +1220,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
"sender": event.sender,
"contains_url": (
"url" in event.content
- and isinstance(event.content["url"], basestring)
+ and isinstance(event.content["url"], text_type)
),
}
for event, _ in events_and_contexts
@@ -1529,7 +1529,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
contains_url = "url" in content
if contains_url:
- contains_url &= isinstance(content["url"], basestring)
+ contains_url &= isinstance(content["url"], text_type)
except (KeyError, AttributeError):
# If the event is missing a necessary field then
# skip over it.
@@ -1910,9 +1910,9 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
(room_id,)
)
rows = txn.fetchall()
- max_depth = max(row[0] for row in rows)
+ max_depth = max(row[1] for row in rows)
- if max_depth <= token.topological:
+ if max_depth < token.topological:
# We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 59822178ff..a8326f5296 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import itertools
import logging
from collections import namedtuple
@@ -265,7 +266,7 @@ class EventsWorkerStore(SQLBaseStore):
"""
with Measure(self._clock, "_fetch_event_list"):
try:
- event_id_lists = zip(*event_list)[0]
+ event_id_lists = list(zip(*event_list))[0]
event_ids = [
item for sublist in event_id_lists for item in sublist
]
@@ -299,14 +300,14 @@ class EventsWorkerStore(SQLBaseStore):
logger.exception("do_fetch")
# We only want to resolve deferreds from the main thread
- def fire(evs):
+ def fire(evs, exc):
for _, d in evs:
if not d.called:
with PreserveLoggingContext():
- d.errback(e)
+ d.errback(exc)
with PreserveLoggingContext():
- self.hs.get_reactor().callFromThread(fire, event_list)
+ self.hs.get_reactor().callFromThread(fire, event_list, e)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 2d5896c5b4..6ddcc909bf 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.api.errors import Codes, SynapseError
from synapse.util.caches.descriptors import cachedInlineCallbacks
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, db_to_json
class FilteringStore(SQLBaseStore):
@@ -44,7 +44,7 @@ class FilteringStore(SQLBaseStore):
desc="get_user_filter",
)
- defer.returnValue(json.loads(bytes(def_json).decode("utf-8")))
+ defer.returnValue(db_to_json(def_json))
def add_user_filter(self, user_localpart, user_filter):
def_json = encode_canonical_json(user_filter)
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 06f9a75a97..b890c152db 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -36,7 +36,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
@defer.inlineCallbacks
def initialise_reserved_users(self, threepids):
- # TODO Why can't I do this in init?
store = self.hs.get_datastore()
reserved_user_list = []
@@ -147,6 +146,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
return count
return self.runInteraction("count_users", _count_users)
+ @defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
"""
Updates or inserts monthly active user member
@@ -155,7 +155,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Deferred[bool]: True if a new entry was created, False if an
existing one was updated.
"""
- is_insert = self._simple_upsert(
+ is_insert = yield self._simple_upsert(
desc="upsert_monthly_active_user",
table="monthly_active_users",
keyvalues={
@@ -199,7 +199,16 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Args:
user_id(str): the user_id to query
"""
+
if self.hs.config.limit_usage_by_mau:
+ # Trial users and guests should not be included as part of MAU group
+ is_guest = yield self.is_guest(user_id)
+ if is_guest:
+ return
+ is_trial = yield self.is_trial_user(user_id)
+ if is_trial:
+ return
+
last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)
now = self.hs.get_clock().time_msec()
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 8443bd4c1b..c7987bfcdd 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -15,7 +15,8 @@
# limitations under the License.
import logging
-import types
+
+import six
from canonicaljson import encode_canonical_json, json
@@ -27,6 +28,11 @@ from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
+if six.PY2:
+ db_binary_type = buffer
+else:
+ db_binary_type = memoryview
+
class PusherWorkerStore(SQLBaseStore):
def _decode_pushers_rows(self, rows):
@@ -34,18 +40,18 @@ class PusherWorkerStore(SQLBaseStore):
dataJson = r['data']
r['data'] = None
try:
- if isinstance(dataJson, types.BufferType):
+ if isinstance(dataJson, db_binary_type):
dataJson = str(dataJson).decode("UTF8")
r['data'] = json.loads(dataJson)
except Exception as e:
logger.warn(
"Invalid JSON in data for pusher %d: %s, %s",
- r['id'], dataJson, e.message,
+ r['id'], dataJson, e.args[0],
)
pass
- if isinstance(r['pushkey'], types.BufferType):
+ if isinstance(r['pushkey'], db_binary_type):
r['pushkey'] = str(r['pushkey']).decode("UTF8")
return rows
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 07333f777d..26b429e307 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -26,6 +26,11 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
class RegistrationWorkerStore(SQLBaseStore):
+ def __init__(self, db_conn, hs):
+ super(RegistrationWorkerStore, self).__init__(db_conn, hs)
+
+ self.config = hs.config
+
@cached()
def get_user_by_id(self, user_id):
return self._simple_select_one(
@@ -36,12 +41,33 @@ class RegistrationWorkerStore(SQLBaseStore):
retcols=[
"name", "password_hash", "is_guest",
"consent_version", "consent_server_notice_sent",
- "appservice_id",
+ "appservice_id", "creation_ts",
],
allow_none=True,
desc="get_user_by_id",
)
+ @defer.inlineCallbacks
+ def is_trial_user(self, user_id):
+ """Checks if user is in the "trial" period, i.e. within the first
+ N days of registration defined by `mau_trial_days` config
+
+ Args:
+ user_id (str)
+
+ Returns:
+ Deferred[bool]
+ """
+
+ info = yield self.get_user_by_id(user_id)
+ if not info:
+ defer.returnValue(False)
+
+ now = self.clock.time_msec()
+ trial_duration_ms = self.config.mau_trial_days * 24 * 60 * 60 * 1000
+ is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms
+ defer.returnValue(is_trial)
+
@cached()
def get_user_by_access_token(self, token):
"""Get a user from the given access token.
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 428e7fa36e..0c42bd3322 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -18,14 +18,14 @@ from collections import namedtuple
import six
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.descriptors import cached
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, db_to_json
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
@@ -95,7 +95,8 @@ class TransactionStore(SQLBaseStore):
)
if result and result["response_code"]:
- return result["response_code"], json.loads(str(result["response_json"]))
+ return result["response_code"], db_to_json(result["response_json"])
+
else:
return None
|