diff --git a/.circleci/config.yml b/.circleci/config.yml
new file mode 100644
index 0000000000..e03f01b837
--- /dev/null
+++ b/.circleci/config.yml
@@ -0,0 +1,48 @@
+version: 2
+jobs:
+ sytestpy2:
+ machine: true
+ steps:
+ - checkout
+ - run: docker pull matrixdotorg/sytest-synapsepy2
+ - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
+ - store_artifacts:
+ path: ~/project/logs
+ destination: logs
+ sytestpy2postgres:
+ machine: true
+ steps:
+ - checkout
+ - run: docker pull matrixdotorg/sytest-synapsepy2
+ - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
+ - store_artifacts:
+ path: ~/project/logs
+ destination: logs
+ sytestpy3:
+ machine: true
+ steps:
+ - checkout
+ - run: docker pull matrixdotorg/sytest-synapsepy3
+ - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
+ - store_artifacts:
+ path: ~/project/logs
+ destination: logs
+ sytestpy3postgres:
+ machine: true
+ steps:
+ - checkout
+ - run: docker pull matrixdotorg/sytest-synapsepy3
+ - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
+ - store_artifacts:
+ path: ~/project/logs
+ destination: logs
+
+workflows:
+ version: 2
+ build:
+ jobs:
+ - sytestpy2
+ - sytestpy2postgres
+# Currently broken while the Python 3 port is incomplete
+# - sytestpy3
+# - sytestpy3postgres
diff --git a/.dockerignore b/.dockerignore
index f36f86fbb7..6cdb8532d3 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -3,3 +3,6 @@ Dockerfile
.gitignore
demo/etc
tox.ini
+synctl
+.git/*
+.tox/*
diff --git a/MANIFEST.in b/MANIFEST.in
index 1ff98d95df..e0826ba544 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -36,3 +36,4 @@ recursive-include changelog.d *
prune .github
prune demo/etc
prune docker
+prune .circleci
diff --git a/changelog.d/3568.feature b/changelog.d/3568.feature
new file mode 100644
index 0000000000..247f02ba4e
--- /dev/null
+++ b/changelog.d/3568.feature
@@ -0,0 +1 @@
+speed up /members API and add `at` and `membership` params as per MSC1227
diff --git a/changelog.d/3653.feature b/changelog.d/3653.feature
new file mode 100644
index 0000000000..6c5422994f
--- /dev/null
+++ b/changelog.d/3653.feature
@@ -0,0 +1 @@
+Support more federation endpoints on workers
diff --git a/changelog.d/3660.misc b/changelog.d/3660.misc
new file mode 100644
index 0000000000..acd814c273
--- /dev/null
+++ b/changelog.d/3660.misc
@@ -0,0 +1 @@
+Sytests can now be run inside a Docker container.
diff --git a/changelog.d/3661.bugfix b/changelog.d/3661.bugfix
new file mode 100644
index 0000000000..f2b4703d80
--- /dev/null
+++ b/changelog.d/3661.bugfix
@@ -0,0 +1 @@
+Fix bug on deleting 3pid when using identity servers that don't support unbind API
diff --git a/changelog.d/3669.misc b/changelog.d/3669.misc
new file mode 100644
index 0000000000..fc579ddc60
--- /dev/null
+++ b/changelog.d/3669.misc
@@ -0,0 +1 @@
+Update docker base image from alpine 3.7 to 3.8.
diff --git a/changelog.d/3670.feature b/changelog.d/3670.feature
new file mode 100644
index 0000000000..ba00f2d2ec
--- /dev/null
+++ b/changelog.d/3670.feature
@@ -0,0 +1 @@
+Where server is disabled, block ability for locked out users to read new messages
diff --git a/changelog.d/3681.bugfix b/changelog.d/3681.bugfix
new file mode 100644
index 0000000000..d18a69cd0c
--- /dev/null
+++ b/changelog.d/3681.bugfix
@@ -0,0 +1 @@
+Fixes test_reap_monthly_active_users so it passes under postgres
diff --git a/changelog.d/3684.misc b/changelog.d/3684.misc
new file mode 100644
index 0000000000..4c013263c4
--- /dev/null
+++ b/changelog.d/3684.misc
@@ -0,0 +1 @@
+Implemented a new testing base class to reduce test boilerplate.
diff --git a/changelog.d/3687.feature b/changelog.d/3687.feature
new file mode 100644
index 0000000000..64b89f6411
--- /dev/null
+++ b/changelog.d/3687.feature
@@ -0,0 +1 @@
+set admin uri via config, to be used in error messages where the user should contact the administrator
diff --git a/changelog.d/3689.bugfix b/changelog.d/3689.bugfix
new file mode 100644
index 0000000000..934d039836
--- /dev/null
+++ b/changelog.d/3689.bugfix
@@ -0,0 +1 @@
+Fix mau blocking calulation bug on login
diff --git a/changelog.d/3690.misc b/changelog.d/3690.misc
new file mode 100644
index 0000000000..710add0243
--- /dev/null
+++ b/changelog.d/3690.misc
@@ -0,0 +1 @@
+Rename MAU prometheus metrics
diff --git a/changelog.d/3692.bugfix b/changelog.d/3692.bugfix
new file mode 100644
index 0000000000..f44e13dca1
--- /dev/null
+++ b/changelog.d/3692.bugfix
@@ -0,0 +1 @@
+Fix missing yield in synapse.storage.monthly_active_users.initialise_reserved_users
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 26fb3a6bff..777976217d 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,4 +1,4 @@
-FROM docker.io/python:2-alpine3.7
+FROM docker.io/python:2-alpine3.8
RUN apk add --no-cache --virtual .nacl_deps \
build-base \
diff --git a/docs/workers.rst b/docs/workers.rst
index c5b37c3ded..ac9efb621f 100644
--- a/docs/workers.rst
+++ b/docs/workers.rst
@@ -173,10 +173,23 @@ endpoints matching the following regular expressions::
^/_matrix/federation/v1/backfill/
^/_matrix/federation/v1/get_missing_events/
^/_matrix/federation/v1/publicRooms
+ ^/_matrix/federation/v1/query/
+ ^/_matrix/federation/v1/make_join/
+ ^/_matrix/federation/v1/make_leave/
+ ^/_matrix/federation/v1/send_join/
+ ^/_matrix/federation/v1/send_leave/
+ ^/_matrix/federation/v1/invite/
+ ^/_matrix/federation/v1/query_auth/
+ ^/_matrix/federation/v1/event_auth/
+ ^/_matrix/federation/v1/exchange_third_party_invite/
+ ^/_matrix/federation/v1/send/
The above endpoints should all be routed to the federation_reader worker by the
reverse-proxy configuration.
+The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single
+instance.
+
``synapse.app.federation_sender``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 9c62ec4374..3b2a2ab77a 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -775,17 +775,31 @@ class Auth(object):
)
@defer.inlineCallbacks
- def check_auth_blocking(self):
+ def check_auth_blocking(self, user_id=None):
"""Checks if the user should be rejected for some external reason,
such as monthly active user limiting or global disable flag
+
+ Args:
+ user_id(str|None): If present, checks for presence against existing
+ MAU cohort
"""
if self.hs.config.hs_disabled:
raise AuthError(
- 403, self.hs.config.hs_disabled_message, errcode=Codes.HS_DISABLED
+ 403, self.hs.config.hs_disabled_message,
+ errcode=Codes.RESOURCE_LIMIT_EXCEED,
+ admin_uri=self.hs.config.admin_uri,
)
if self.hs.config.limit_usage_by_mau is True:
+ # If the user is already part of the MAU cohort
+ if user_id:
+ timestamp = yield self.store.user_last_seen_monthly_active(user_id)
+ if timestamp:
+ return
+ # Else if there is no room in the MAU bucket, bail
current_mau = yield self.store.get_monthly_active_count()
if current_mau >= self.hs.config.max_mau_value:
raise AuthError(
- 403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED
+ 403, "Monthly Active User Limits AU Limit Exceeded",
+ admin_uri=self.hs.config.admin_uri,
+ errcode=Codes.RESOURCE_LIMIT_EXCEED
)
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index dc3bed5fcb..08f0cb5554 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -56,8 +56,7 @@ class Codes(object):
SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
- MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED"
- HS_DISABLED = "M_HS_DISABLED"
+ RESOURCE_LIMIT_EXCEED = "M_RESOURCE_LIMIT_EXCEED"
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
@@ -225,11 +224,16 @@ class NotFoundError(SynapseError):
class AuthError(SynapseError):
"""An error raised when there was a problem authorising an event."""
+ def __init__(self, code, msg, errcode=Codes.FORBIDDEN, admin_uri=None):
+ self.admin_uri = admin_uri
+ super(AuthError, self).__init__(code, msg, errcode=errcode)
- def __init__(self, *args, **kwargs):
- if "errcode" not in kwargs:
- kwargs["errcode"] = Codes.FORBIDDEN
- super(AuthError, self).__init__(*args, **kwargs)
+ def error_dict(self):
+ return cs_error(
+ self.msg,
+ self.errcode,
+ admin_uri=self.admin_uri,
+ )
class EventSizeError(SynapseError):
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 6b77aec832..ab79a45646 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
@@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
- TransactionStore,
+ SlavedTransactionStore,
SlavedClientIpStore,
BaseSlavedStore,
):
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index a385793dd4..03d39968a8 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinRoomAliasServlet,
@@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore(
DirectoryStore,
- TransactionStore,
+ SlavedTransactionStore,
SlavedProfileStore,
SlavedAccountDataStore,
SlavedPusherStore,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 57d96c13a2..52522e9d33 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -32,11 +32,16 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -49,11 +54,16 @@ logger = logging.getLogger("synapse.app.federation_reader")
class FederationReaderSlavedStore(
+ SlavedProfileStore,
+ SlavedApplicationServiceStore,
+ SlavedPusherStore,
+ SlavedPushRuleStore,
+ SlavedReceiptsStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
- TransactionStore,
+ SlavedTransactionStore,
BaseSlavedStore,
):
pass
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 7bbf0ad082..7a4310ca18 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -36,7 +36,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
class FederationSenderSlaveStore(
- SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+ SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 37a9b126a5..a98bb506e5 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -303,8 +303,8 @@ class SynapseHomeServer(HomeServer):
# Gauges to expose monthly active user control metrics
-current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU")
-max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit")
+current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
def setup(config_options):
@@ -532,7 +532,7 @@ def run(hs):
if hs.config.limit_usage_by_mau:
count = yield hs.get_datastore().get_monthly_active_count()
current_mau_gauge.set(float(count))
- max_mau_value_gauge.set(float(hs.config.max_mau_value))
+ max_mau_gauge.set(float(hs.config.max_mau_value))
hs.get_datastore().initialise_reserved_users(
hs.config.mau_limits_reserved_threepids
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 1423056732..fd1f6cbf7e 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
@@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,
- TransactionStore,
+ SlavedTransactionStore,
BaseSlavedStore,
MediaRepositoryStore,
):
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 3b078d72ca..2190f3210a 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -82,6 +82,10 @@ class ServerConfig(Config):
self.hs_disabled = config.get("hs_disabled", False)
self.hs_disabled_message = config.get("hs_disabled_message", "")
+ # Admin uri to direct users at should their instance become blocked
+ # due to resource constraints
+ self.admin_uri = config.get("admin_uri", None)
+
# FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None
federation_domain_whitelist = config.get(
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a23136784a..3e0cd294a1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -39,6 +39,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
+from synapse.replication.http.federation import (
+ ReplicationFederationSendEduRestServlet,
+ ReplicationGetQueryRestServlet,
+)
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -760,6 +764,8 @@ class FederationHandlerRegistry(object):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))
+ logger.info("Registering federation EDU handler for %r", edu_type)
+
self.edu_handlers[edu_type] = handler
def register_query_handler(self, query_type, handler):
@@ -778,6 +784,8 @@ class FederationHandlerRegistry(object):
"Already have a Query handler for %s" % (query_type,)
)
+ logger.info("Registering federation query handler for %r", query_type)
+
self.query_handlers[query_type] = handler
@defer.inlineCallbacks
@@ -800,3 +808,49 @@ class FederationHandlerRegistry(object):
raise NotFoundError("No handler for Query type '%s'" % (query_type,))
return handler(args)
+
+
+class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
+ """A FederationHandlerRegistry for worker processes.
+
+ When receiving EDU or queries it will check if an appropriate handler has
+ been registered on the worker, if there isn't one then it calls off to the
+ master process.
+ """
+
+ def __init__(self, hs):
+ self.config = hs.config
+ self.http_client = hs.get_simple_http_client()
+ self.clock = hs.get_clock()
+
+ self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+ self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+ super(ReplicationFederationHandlerRegistry, self).__init__()
+
+ def on_edu(self, edu_type, origin, content):
+ """Overrides FederationHandlerRegistry
+ """
+ handler = self.edu_handlers.get(edu_type)
+ if handler:
+ return super(ReplicationFederationHandlerRegistry, self).on_edu(
+ edu_type, origin, content,
+ )
+
+ return self._send_edu(
+ edu_type=edu_type,
+ origin=origin,
+ content=content,
+ )
+
+ def on_query(self, query_type, args):
+ """Overrides FederationHandlerRegistry
+ """
+ handler = self.query_handlers.get(query_type)
+ if handler:
+ return handler(args)
+
+ return self._get_query_client(
+ query_type=query_type,
+ args=args,
+ )
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7ea8ce9f94..4a81bd2ba9 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -520,7 +520,7 @@ class AuthHandler(BaseHandler):
"""
logger.info("Logging in user %s on device %s", user_id, device_id)
access_token = yield self.issue_access_token(user_id, device_id)
- yield self.auth.check_auth_blocking()
+ yield self.auth.check_auth_blocking(user_id)
# the device *should* have been registered before we got here; however,
# it's possible we raced against a DELETE operation. The thing we
@@ -734,7 +734,6 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def validate_short_term_login_token_and_get_user_id(self, login_token):
- yield self.auth.check_auth_blocking()
auth_api = self.hs.get_auth()
user_id = None
try:
@@ -743,6 +742,7 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", True, user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+ yield self.auth.check_auth_blocking(user_id)
defer.returnValue(user_id)
@defer.inlineCallbacks
@@ -828,12 +828,26 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def delete_threepid(self, user_id, medium, address):
+ """Attempts to unbind the 3pid on the identity servers and deletes it
+ from the local database.
+
+ Args:
+ user_id (str)
+ medium (str)
+ address (str)
+
+ Returns:
+ Deferred[bool]: Returns True if successfully unbound the 3pid on
+ the identity server, False if identity server doesn't support the
+ unbind API.
+ """
+
# 'Canonicalise' email addresses as per above
if medium == 'email':
address = address.lower()
identity_handler = self.hs.get_handlers().identity_handler
- yield identity_handler.unbind_threepid(
+ result = yield identity_handler.try_unbind_threepid(
user_id,
{
'medium': medium,
@@ -841,10 +855,10 @@ class AuthHandler(BaseHandler):
},
)
- ret = yield self.store.user_delete_threepid(
+ yield self.store.user_delete_threepid(
user_id, medium, address,
)
- defer.returnValue(ret)
+ defer.returnValue(result)
def _save_session(self, session):
# TODO: Persistent storage
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b3c5a9ee64..b078df4a76 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -51,7 +51,8 @@ class DeactivateAccountHandler(BaseHandler):
erase_data (bool): whether to GDPR-erase the user's data
Returns:
- Deferred
+ Deferred[bool]: True if identity server supports removing
+ threepids, otherwise False.
"""
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.
@@ -60,16 +61,22 @@ class DeactivateAccountHandler(BaseHandler):
# leave the user still active so they can try again.
# Ideally we would prevent password resets and then do this in the
# background thread.
+
+ # This will be set to false if the identity server doesn't support
+ # unbinding
+ identity_server_supports_unbinding = True
+
threepids = yield self.store.user_get_threepids(user_id)
for threepid in threepids:
try:
- yield self._identity_handler.unbind_threepid(
+ result = yield self._identity_handler.try_unbind_threepid(
user_id,
{
'medium': threepid['medium'],
'address': threepid['address'],
},
)
+ identity_server_supports_unbinding &= result
except Exception:
# Do we want this to be a fatal error or should we carry on?
logger.exception("Failed to remove threepid from ID server")
@@ -103,6 +110,8 @@ class DeactivateAccountHandler(BaseHandler):
# parts users from rooms (if it isn't already running)
self._start_user_parting()
+ defer.returnValue(identity_server_supports_unbinding)
+
def _start_user_parting(self):
"""
Start the process that goes through the table of users
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2380d17f4e..f38b393e4a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -49,6 +49,11 @@ from synapse.crypto.event_signing import (
compute_event_signature,
)
from synapse.events.validator import EventValidator
+from synapse.replication.http.federation import (
+ ReplicationCleanRoomRestServlet,
+ ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
@@ -91,6 +96,18 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
+ self.config = hs.config
+ self.http_client = hs.get_simple_http_client()
+
+ self._send_events_to_master = (
+ ReplicationFederationSendEventsRestServlet.make_client(hs)
+ )
+ self._notify_user_membership_change = (
+ ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+ )
+ self._clean_room_for_join_client = (
+ ReplicationCleanRoomRestServlet.make_client(hs)
+ )
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -1158,7 +1175,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
- yield self._persist_events([(event, context)])
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1189,7 +1206,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
- yield self._persist_events([(event, context)])
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1432,7 +1449,7 @@ class FederationHandler(BaseHandler):
event, context
)
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[(event, context)],
backfilled=backfilled,
)
@@ -1470,7 +1487,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
@@ -1558,7 +1575,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@@ -1569,7 +1586,7 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[(event, new_event_context)],
)
@@ -2297,7 +2314,7 @@ class FederationHandler(BaseHandler):
for revocation.
"""
try:
- response = yield self.hs.get_simple_http_client().get_json(
+ response = yield self.http_client.get_json(
url,
{"public_key": public_key}
)
@@ -2310,7 +2327,7 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Third party certificate was invalid")
@defer.inlineCallbacks
- def _persist_events(self, event_and_contexts, backfilled=False):
+ def persist_events_and_notify(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.
@@ -2322,14 +2339,21 @@ class FederationHandler(BaseHandler):
Returns:
Deferred
"""
- max_stream_id = yield self.store.persist_events(
- event_and_contexts,
- backfilled=backfilled,
- )
+ if self.config.worker_app:
+ yield self._send_events_to_master(
+ store=self.store,
+ event_and_contexts=event_and_contexts,
+ backfilled=backfilled
+ )
+ else:
+ max_stream_id = yield self.store.persist_events(
+ event_and_contexts,
+ backfilled=backfilled,
+ )
- if not backfilled: # Never notify for backfilled events
- for event, _ in event_and_contexts:
- self._notify_persisted_event(event, max_stream_id)
+ if not backfilled: # Never notify for backfilled events
+ for event, _ in event_and_contexts:
+ self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@@ -2368,9 +2392,25 @@ class FederationHandler(BaseHandler):
)
def _clean_room_for_join(self, room_id):
- return self.store.clean_room_for_join(room_id)
+ """Called to clean up any data in DB for a given room, ready for the
+ server to join the room.
+
+ Args:
+ room_id (str)
+ """
+ if self.config.worker_app:
+ return self._clean_room_for_join_client(room_id)
+ else:
+ return self.store.clean_room_for_join(room_id)
def user_joined_room(self, user, room_id):
"""Called when a new user has joined the room
"""
- return user_joined_room(self.distributor, user, room_id)
+ if self.config.worker_app:
+ return self._notify_user_membership_change(
+ room_id=room_id,
+ user_id=user.to_string(),
+ change="joined",
+ )
+ else:
+ return user_joined_room(self.distributor, user, room_id)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 1d36d967c3..5feb3f22a6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -137,15 +137,19 @@ class IdentityHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
- def unbind_threepid(self, mxid, threepid):
- """
- Removes a binding from an identity server
+ def try_unbind_threepid(self, mxid, threepid):
+ """Removes a binding from an identity server
+
Args:
mxid (str): Matrix user ID of binding to be removed
threepid (dict): Dict with medium & address of binding to be removed
+ Raises:
+ SynapseError: If we failed to contact the identity server
+
Returns:
- Deferred[bool]: True on success, otherwise False
+ Deferred[bool]: True on success, otherwise False if the identity
+ server doesn't support unbinding
"""
logger.debug("unbinding threepid %r from %s", threepid, mxid)
if not self.trusted_id_servers:
@@ -175,11 +179,21 @@ class IdentityHandler(BaseHandler):
content=content,
destination_is=id_server,
)
- yield self.http_client.post_json_get_json(
- url,
- content,
- headers,
- )
+ try:
+ yield self.http_client.post_json_get_json(
+ url,
+ content,
+ headers,
+ )
+ except HttpResponseException as e:
+ if e.code in (400, 404, 501,):
+ # The remote server probably doesn't support unbinding (yet)
+ logger.warn("Received %d response while unbinding threepid", e.code)
+ defer.returnValue(False)
+ else:
+ logger.error("Failed to unbind threepid on identity server: %s", e)
+ raise SynapseError(502, "Failed to contact identity server")
+
defer.returnValue(True)
@defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 01a362360e..893c9bcdc4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -25,7 +25,13 @@ from twisted.internet import defer
from twisted.internet.defer import succeed
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ Codes,
+ ConsentNotGivenError,
+ NotFoundError,
+ SynapseError,
+)
from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
@@ -36,6 +42,7 @@ from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
+from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -82,28 +89,85 @@ class MessageHandler(object):
defer.returnValue(data)
@defer.inlineCallbacks
- def get_state_events(self, user_id, room_id, is_guest=False):
+ def get_state_events(
+ self, user_id, room_id, types=None, filtered_types=None,
+ at_token=None, is_guest=False,
+ ):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
- left the room return the state events from when they left.
+ left the room return the state events from when they left. If an explicit
+ 'at' parameter is passed, return the state events as of that event, if
+ visible.
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
+ at_token(StreamToken|None): the stream token of the at which we are requesting
+ the stats. If the user is not allowed to view the state as of that
+ stream token, we raise a 403 SynapseError. If None, returns the current
+ state based on the current_state_events table.
+ is_guest(bool): whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
+ Raises:
+ NotFoundError (404) if the at token does not yield an event
+
+ AuthError (403) if the user doesn't have permission to view
+ members of this room.
"""
- membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
- room_id, user_id
- )
+ if at_token:
+ # FIXME this claims to get the state at a stream position, but
+ # get_recent_events_for_room operates by topo ordering. This therefore
+ # does not reliably give you the state at the given stream position.
+ # (https://github.com/matrix-org/synapse/issues/3305)
+ last_events, _ = yield self.store.get_recent_events_for_room(
+ room_id, end_token=at_token.room_key, limit=1,
+ )
- if membership == Membership.JOIN:
- room_state = yield self.state.get_current_state(room_id)
- elif membership == Membership.LEAVE:
- room_state = yield self.store.get_state_for_events(
- [membership_event_id], None
+ if not last_events:
+ raise NotFoundError("Can't find event for token %s" % (at_token, ))
+
+ visible_events = yield filter_events_for_client(
+ self.store, user_id, last_events,
+ )
+
+ event = last_events[0]
+ if visible_events:
+ room_state = yield self.store.get_state_for_events(
+ [event.event_id], types, filtered_types=filtered_types,
+ )
+ room_state = room_state[event.event_id]
+ else:
+ raise AuthError(
+ 403,
+ "User %s not allowed to view events in room %s at token %s" % (
+ user_id, room_id, at_token,
+ )
+ )
+ else:
+ membership, membership_event_id = (
+ yield self.auth.check_in_room_or_world_readable(
+ room_id, user_id,
+ )
)
- room_state = room_state[membership_event_id]
+
+ if membership == Membership.JOIN:
+ state_ids = yield self.store.get_filtered_current_state_ids(
+ room_id, types, filtered_types=filtered_types,
+ )
+ room_state = yield self.store.get_events(state_ids.values())
+ elif membership == Membership.LEAVE:
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], types, filtered_types=filtered_types,
+ )
+ room_state = room_state[membership_event_id]
now = self.clock.time_msec()
defer.returnValue(
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 3526b20d5a..f03ee1476b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -144,7 +144,8 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
- yield self._check_mau_limits()
+
+ yield self.auth.check_auth_blocking()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@@ -289,7 +290,7 @@ class RegistrationHandler(BaseHandler):
400,
"User ID can only contain characters a-z, 0-9, or '=_-./'",
)
- yield self._check_mau_limits()
+ yield self.auth.check_auth_blocking()
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
@@ -439,7 +440,7 @@ class RegistrationHandler(BaseHandler):
"""
if localpart is None:
raise SynapseError(400, "Request must include user id")
- yield self._check_mau_limits()
+ yield self.auth.check_auth_blocking()
need_register = True
try:
@@ -533,14 +534,3 @@ class RegistrationHandler(BaseHandler):
remote_room_hosts=remote_room_hosts,
action="join",
)
-
- @defer.inlineCallbacks
- def _check_mau_limits(self):
- """
- Do not accept registrations if monthly active user limits exceeded
- and limiting is enabled
- """
- try:
- yield self.auth.check_auth_blocking()
- except AuthError as e:
- raise RegistrationError(e.code, str(e), e.errcode)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6393a9674b..3b21a04a5d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -191,6 +191,7 @@ class SyncHandler(object):
self.clock = hs.get_clock()
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
+ self.auth = hs.get_auth()
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
@@ -198,19 +199,27 @@ class SyncHandler(object):
max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
+ @defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Returns:
- A Deferred SyncResult.
+ Deferred[SyncResult]
"""
- return self.response_cache.wrap(
+ # If the user is not part of the mau group, then check that limits have
+ # not been exceeded (if not part of the group by this point, almost certain
+ # auth_blocking will occur)
+ user_id = sync_config.user.to_string()
+ yield self.auth.check_auth_blocking(user_id)
+
+ res = yield self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config, since_token, timeout, full_state,
)
+ defer.returnValue(res)
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 589ee94c66..19f214281e 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,7 @@
# limitations under the License.
from synapse.http.server import JsonResource
-from synapse.replication.http import membership, send_event
+from synapse.replication.http import federation, membership, send_event
REPLICATION_PREFIX = "/_synapse/replication"
@@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs):
send_event.register_servlets(hs, self)
membership.register_servlets(hs, self)
+ federation.register_servlets(hs, self)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
new file mode 100644
index 0000000000..2ddd18f73b
--- /dev/null
+++ b/synapse/replication/http/federation.py
@@ -0,0 +1,259 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
+ """Handles events newly received from federation, including persisting and
+ notifying.
+
+ The API looks like:
+
+ POST /_synapse/replication/fed_send_events/:txn_id
+
+ {
+ "events": [{
+ "event": { .. serialized event .. },
+ "internal_metadata": { .. serialized internal_metadata .. },
+ "rejected_reason": .., // The event.rejected_reason field
+ "context": { .. serialized event context .. },
+ }],
+ "backfilled": false
+ """
+
+ NAME = "fed_send_events"
+ PATH_ARGS = ()
+
+ def __init__(self, hs):
+ super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
+
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+ self.federation_handler = hs.get_handlers().federation_handler
+
+ @staticmethod
+ @defer.inlineCallbacks
+ def _serialize_payload(store, event_and_contexts, backfilled):
+ """
+ Args:
+ store
+ event_and_contexts (list[tuple[FrozenEvent, EventContext]])
+ backfilled (bool): Whether or not the events are the result of
+ backfilling
+ """
+ event_payloads = []
+ for event, context in event_and_contexts:
+ serialized_context = yield context.serialize(event, store)
+
+ event_payloads.append({
+ "event": event.get_pdu_json(),
+ "internal_metadata": event.internal_metadata.get_dict(),
+ "rejected_reason": event.rejected_reason,
+ "context": serialized_context,
+ })
+
+ payload = {
+ "events": event_payloads,
+ "backfilled": backfilled,
+ }
+
+ defer.returnValue(payload)
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request):
+ with Measure(self.clock, "repl_fed_send_events_parse"):
+ content = parse_json_object_from_request(request)
+
+ backfilled = content["backfilled"]
+
+ event_payloads = content["events"]
+
+ event_and_contexts = []
+ for event_payload in event_payloads:
+ event_dict = event_payload["event"]
+ internal_metadata = event_payload["internal_metadata"]
+ rejected_reason = event_payload["rejected_reason"]
+ event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+ context = yield EventContext.deserialize(
+ self.store, event_payload["context"],
+ )
+
+ event_and_contexts.append((event, context))
+
+ logger.info(
+ "Got %d events from federation",
+ len(event_and_contexts),
+ )
+
+ yield self.federation_handler.persist_events_and_notify(
+ event_and_contexts, backfilled,
+ )
+
+ defer.returnValue((200, {}))
+
+
+class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
+ """Handles EDUs newly received from federation, including persisting and
+ notifying.
+
+ Request format:
+
+ POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id
+
+ {
+ "origin": ...,
+ "content: { ... }
+ }
+ """
+
+ NAME = "fed_send_edu"
+ PATH_ARGS = ("edu_type",)
+
+ def __init__(self, hs):
+ super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
+
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+ self.registry = hs.get_federation_registry()
+
+ @staticmethod
+ def _serialize_payload(edu_type, origin, content):
+ return {
+ "origin": origin,
+ "content": content,
+ }
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, edu_type):
+ with Measure(self.clock, "repl_fed_send_edu_parse"):
+ content = parse_json_object_from_request(request)
+
+ origin = content["origin"]
+ edu_content = content["content"]
+
+ logger.info(
+ "Got %r edu from $s",
+ edu_type, origin,
+ )
+
+ result = yield self.registry.on_edu(edu_type, origin, edu_content)
+
+ defer.returnValue((200, result))
+
+
+class ReplicationGetQueryRestServlet(ReplicationEndpoint):
+ """Handle responding to queries from federation.
+
+ Request format:
+
+ POST /_synapse/replication/fed_query/:query_type
+
+ {
+ "args": { ... }
+ }
+ """
+
+ NAME = "fed_query"
+ PATH_ARGS = ("query_type",)
+
+ # This is a query, so let's not bother caching
+ CACHE = False
+
+ def __init__(self, hs):
+ super(ReplicationGetQueryRestServlet, self).__init__(hs)
+
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+ self.registry = hs.get_federation_registry()
+
+ @staticmethod
+ def _serialize_payload(query_type, args):
+ """
+ Args:
+ query_type (str)
+ args (dict): The arguments received for the given query type
+ """
+ return {
+ "args": args,
+ }
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, query_type):
+ with Measure(self.clock, "repl_fed_query_parse"):
+ content = parse_json_object_from_request(request)
+
+ args = content["args"]
+
+ logger.info(
+ "Got %r query",
+ query_type,
+ )
+
+ result = yield self.registry.on_query(query_type, args)
+
+ defer.returnValue((200, result))
+
+
+class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
+ """Called to clean up any data in DB for a given room, ready for the
+ server to join the room.
+
+ Request format:
+
+ POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
+
+ {}
+ """
+
+ NAME = "fed_cleanup_room"
+ PATH_ARGS = ("room_id",)
+
+ def __init__(self, hs):
+ super(ReplicationCleanRoomRestServlet, self).__init__(hs)
+
+ self.store = hs.get_datastore()
+
+ @staticmethod
+ def _serialize_payload(room_id, args):
+ """
+ Args:
+ room_id (str)
+ """
+ return {}
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, room_id):
+ yield self.store.clean_room_for_join(room_id)
+
+ defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+ ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+ ReplicationFederationSendEduRestServlet(hs).register(http_server)
+ ReplicationGetQueryRestServlet(hs).register(http_server)
+ ReplicationCleanRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py
index 9c9a5eadd9..3527beb3c9 100644
--- a/synapse/replication/slave/storage/transactions.py
+++ b/synapse/replication/slave/storage/transactions.py
@@ -13,19 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage import DataStore
from synapse.storage.transactions import TransactionStore
from ._base import BaseSlavedStore
-class TransactionStore(BaseSlavedStore):
- get_destination_retry_timings = TransactionStore.__dict__[
- "get_destination_retry_timings"
- ]
- _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
- set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
- _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
-
- prep_send_transaction = DataStore.prep_send_transaction.__func__
- delivered_txn = DataStore.delivered_txn.__func__
+class SlavedTransactionStore(TransactionStore, BaseSlavedStore):
+ pass
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 80d625eecc..ad536ab570 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -391,10 +391,17 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
- yield self._deactivate_account_handler.deactivate_account(
+ result = yield self._deactivate_account_handler.deactivate_account(
target_user_id, erase,
)
- defer.returnValue((200, {}))
+ if result:
+ id_server_unbind_result = "success"
+ else:
+ id_server_unbind_result = "no-support"
+
+ defer.returnValue((200, {
+ "id_server_unbind_result": id_server_unbind_result,
+ }))
class ShutdownRoomRestServlet(ClientV1RestServlet):
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index fa5989e74e..fcc1091760 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -34,7 +34,7 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.streams.config import PaginationConfig
-from synapse.types import RoomAlias, RoomID, ThirdPartyInstanceID, UserID
+from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID
from .base import ClientV1RestServlet, client_path_patterns
@@ -384,15 +384,39 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request)
- events = yield self.message_handler.get_state_events(
+ handler = self.message_handler
+
+ # request the state as of a given event, as identified by a stream token,
+ # for consistency with /messages etc.
+ # useful for getting the membership in retrospect as of a given /sync
+ # response.
+ at_token_string = parse_string(request, "at")
+ if at_token_string is None:
+ at_token = None
+ else:
+ at_token = StreamToken.from_string(at_token_string)
+
+ # let you filter down on particular memberships.
+ # XXX: this may not be the best shape for this API - we could pass in a filter
+ # instead, except filters aren't currently aware of memberships.
+ # See https://github.com/matrix-org/matrix-doc/issues/1337 for more details.
+ membership = parse_string(request, "membership")
+ not_membership = parse_string(request, "not_membership")
+
+ events = yield handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
+ at_token=at_token,
+ types=[(EventTypes.Member, None)],
)
chunk = []
for event in events:
- if event["type"] != EventTypes.Member:
+ if (
+ (membership and event['content'].get("membership") != membership) or
+ (not_membership and event['content'].get("membership") == not_membership)
+ ):
continue
chunk.append(event)
@@ -401,6 +425,8 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
}))
+# deprecated in favour of /members?membership=join?
+# except it does custom AS logic and has a simpler return format
class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$")
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index eeae466d82..372648cafd 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -209,10 +209,17 @@ class DeactivateAccountRestServlet(RestServlet):
yield self.auth_handler.validate_user_via_ui_auth(
requester, body, self.hs.get_ip_from_request(request),
)
- yield self._deactivate_account_handler.deactivate_account(
+ result = yield self._deactivate_account_handler.deactivate_account(
requester.user.to_string(), erase,
)
- defer.returnValue((200, {}))
+ if result:
+ id_server_unbind_result = "success"
+ else:
+ id_server_unbind_result = "no-support"
+
+ defer.returnValue((200, {
+ "id_server_unbind_result": id_server_unbind_result,
+ }))
class EmailThreepidRequestTokenRestServlet(RestServlet):
@@ -364,7 +371,7 @@ class ThreepidDeleteRestServlet(RestServlet):
user_id = requester.user.to_string()
try:
- yield self.auth_handler.delete_threepid(
+ ret = yield self.auth_handler.delete_threepid(
user_id, body['medium'], body['address']
)
except Exception:
@@ -374,7 +381,14 @@ class ThreepidDeleteRestServlet(RestServlet):
logger.exception("Failed to remove threepid")
raise SynapseError(500, "Failed to remove threepid")
- defer.returnValue((200, {}))
+ if ret:
+ id_server_unbind_result = "success"
+ else:
+ id_server_unbind_result = "no-support"
+
+ defer.returnValue((200, {
+ "id_server_unbind_result": id_server_unbind_result,
+ }))
class WhoamiRestServlet(RestServlet):
diff --git a/synapse/server.py b/synapse/server.py
index 140be9ebe8..26228d8c72 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import (
FederationHandlerRegistry,
FederationServer,
+ ReplicationFederationHandlerRegistry,
)
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.transaction_queue import TransactionQueue
@@ -423,7 +424,10 @@ class HomeServer(object):
return RoomMemberMasterHandler(self)
def build_federation_registry(self):
- return FederationHandlerRegistry()
+ if self.config.worker_app:
+ return ReplicationFederationHandlerRegistry(self)
+ else:
+ return FederationHandlerRegistry()
def build_server_notices_manager(self):
if self.config.worker_app:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d4aa192a0a..025a7fb6d9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1436,88 +1436,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
)
@defer.inlineCallbacks
- def have_events_in_timeline(self, event_ids):
- """Given a list of event ids, check if we have already processed and
- stored them as non outliers.
- """
- rows = yield self._simple_select_many_batch(
- table="events",
- retcols=("event_id",),
- column="event_id",
- iterable=list(event_ids),
- keyvalues={"outlier": False},
- desc="have_events_in_timeline",
- )
-
- defer.returnValue(set(r["event_id"] for r in rows))
-
- @defer.inlineCallbacks
- def have_seen_events(self, event_ids):
- """Given a list of event ids, check if we have already processed them.
-
- Args:
- event_ids (iterable[str]):
-
- Returns:
- Deferred[set[str]]: The events we have already seen.
- """
- results = set()
-
- def have_seen_events_txn(txn, chunk):
- sql = (
- "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
- % (",".join("?" * len(chunk)), )
- )
- txn.execute(sql, chunk)
- for (event_id, ) in txn:
- results.add(event_id)
-
- # break the input up into chunks of 100
- input_iterator = iter(event_ids)
- for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
- []):
- yield self.runInteraction(
- "have_seen_events",
- have_seen_events_txn,
- chunk,
- )
- defer.returnValue(results)
-
- def get_seen_events_with_rejections(self, event_ids):
- """Given a list of event ids, check if we rejected them.
-
- Args:
- event_ids (list[str])
-
- Returns:
- Deferred[dict[str, str|None):
- Has an entry for each event id we already have seen. Maps to
- the rejected reason string if we rejected the event, else maps
- to None.
- """
- if not event_ids:
- return defer.succeed({})
-
- def f(txn):
- sql = (
- "SELECT e.event_id, reason FROM events as e "
- "LEFT JOIN rejections as r ON e.event_id = r.event_id "
- "WHERE e.event_id = ?"
- )
-
- res = {}
- for event_id in event_ids:
- txn.execute(sql, (event_id,))
- row = txn.fetchone()
- if row:
- _, rejected = row
- res[event_id] = rejected
-
- return res
-
- return self.runInteraction("get_rejection_reasons", f)
-
- @defer.inlineCallbacks
def count_daily_messages(self):
"""
Returns an estimate of the number of messages sent in the last day.
@@ -1993,7 +1911,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
max_depth = max(row[0] for row in rows)
if max_depth <= token.topological:
- # We need to ensure we don't delete all the events from the datanase
+ # We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
raise SynapseError(
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 9b4cfeb899..59822178ff 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import itertools
import logging
from collections import namedtuple
@@ -442,3 +443,85 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
defer.returnValue(cache_entry)
+
+ @defer.inlineCallbacks
+ def have_events_in_timeline(self, event_ids):
+ """Given a list of event ids, check if we have already processed and
+ stored them as non outliers.
+ """
+ rows = yield self._simple_select_many_batch(
+ table="events",
+ retcols=("event_id",),
+ column="event_id",
+ iterable=list(event_ids),
+ keyvalues={"outlier": False},
+ desc="have_events_in_timeline",
+ )
+
+ defer.returnValue(set(r["event_id"] for r in rows))
+
+ @defer.inlineCallbacks
+ def have_seen_events(self, event_ids):
+ """Given a list of event ids, check if we have already processed them.
+
+ Args:
+ event_ids (iterable[str]):
+
+ Returns:
+ Deferred[set[str]]: The events we have already seen.
+ """
+ results = set()
+
+ def have_seen_events_txn(txn, chunk):
+ sql = (
+ "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+ % (",".join("?" * len(chunk)), )
+ )
+ txn.execute(sql, chunk)
+ for (event_id, ) in txn:
+ results.add(event_id)
+
+ # break the input up into chunks of 100
+ input_iterator = iter(event_ids)
+ for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+ []):
+ yield self.runInteraction(
+ "have_seen_events",
+ have_seen_events_txn,
+ chunk,
+ )
+ defer.returnValue(results)
+
+ def get_seen_events_with_rejections(self, event_ids):
+ """Given a list of event ids, check if we rejected them.
+
+ Args:
+ event_ids (list[str])
+
+ Returns:
+ Deferred[dict[str, str|None):
+ Has an entry for each event id we already have seen. Maps to
+ the rejected reason string if we rejected the event, else maps
+ to None.
+ """
+ if not event_ids:
+ return defer.succeed({})
+
+ def f(txn):
+ sql = (
+ "SELECT e.event_id, reason FROM events as e "
+ "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+ "WHERE e.event_id = ?"
+ )
+
+ res = {}
+ for event_id in event_ids:
+ txn.execute(sql, (event_id,))
+ row = txn.fetchone()
+ if row:
+ _, rejected = row
+ res[event_id] = rejected
+
+ return res
+
+ return self.runInteraction("get_rejection_reasons", f)
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index d47dcef3a0..7e417f811e 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -46,7 +46,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
tp["medium"], tp["address"]
)
if user_id:
- self.upsert_monthly_active_user(user_id)
+ yield self.upsert_monthly_active_user(user_id)
reserved_user_list.append(user_id)
else:
logger.warning(
@@ -64,23 +64,27 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Deferred[]
"""
def _reap_users(txn):
+ # Purge stale users
thirty_days_ago = (
int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
)
- # Purge stale users
-
- # questionmarks is a hack to overcome sqlite not supporting
- # tuples in 'WHERE IN %s'
- questionmarks = '?' * len(self.reserved_users)
query_args = [thirty_days_ago]
- query_args.extend(self.reserved_users)
-
- sql = """
- DELETE FROM monthly_active_users
- WHERE timestamp < ?
- AND user_id NOT IN ({})
- """.format(','.join(questionmarks))
+ base_sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"
+
+ # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
+ # when len(reserved_users) == 0. Works fine on sqlite.
+ if len(self.reserved_users) > 0:
+ # questionmarks is a hack to overcome sqlite not supporting
+ # tuples in 'WHERE IN %s'
+ questionmarks = '?' * len(self.reserved_users)
+
+ query_args.extend(self.reserved_users)
+ sql = base_sql + """ AND user_id NOT IN ({})""".format(
+ ','.join(questionmarks)
+ )
+ else:
+ sql = base_sql
txn.execute(sql, query_args)
@@ -93,16 +97,24 @@ class MonthlyActiveUsersStore(SQLBaseStore):
# negative LIMIT values. So there is no way to write it that both can
# support
query_args = [self.hs.config.max_mau_value]
- query_args.extend(self.reserved_users)
- sql = """
+
+ base_sql = """
DELETE FROM monthly_active_users
WHERE user_id NOT IN (
SELECT user_id FROM monthly_active_users
ORDER BY timestamp DESC
LIMIT ?
)
- AND user_id NOT IN ({})
- """.format(','.join(questionmarks))
+ """
+ # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
+ # when len(reserved_users) == 0. Works fine on sqlite.
+ if len(self.reserved_users) > 0:
+ query_args.extend(self.reserved_users)
+ sql = base_sql + """ AND user_id NOT IN ({})""".format(
+ ','.join(questionmarks)
+ )
+ else:
+ sql = base_sql
txn.execute(sql, query_args)
yield self.runInteraction("reap_monthly_active_users", _reap_users)
@@ -113,7 +125,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
# is racy.
# Have resolved to invalidate the whole cache for now and do
# something about it if and when the perf becomes significant
- self._user_last_seen_monthly_active.invalidate_all()
+ self.user_last_seen_monthly_active.invalidate_all()
self.get_monthly_active_count.invalidate_all()
@cached(num_args=0)
@@ -152,11 +164,11 @@ class MonthlyActiveUsersStore(SQLBaseStore):
lock=False,
)
if is_insert:
- self._user_last_seen_monthly_active.invalidate((user_id,))
+ self.user_last_seen_monthly_active.invalidate((user_id,))
self.get_monthly_active_count.invalidate(())
@cached(num_args=1)
- def _user_last_seen_monthly_active(self, user_id):
+ def user_last_seen_monthly_active(self, user_id):
"""
Checks if a given user is part of the monthly active user group
Arguments:
@@ -173,7 +185,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
},
retcol="timestamp",
allow_none=True,
- desc="_user_last_seen_monthly_active",
+ desc="user_last_seen_monthly_active",
))
@defer.inlineCallbacks
@@ -185,7 +197,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
user_id(str): the user_id to query
"""
if self.hs.config.limit_usage_by_mau:
- last_seen_timestamp = yield self._user_last_seen_monthly_active(user_id)
+ last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)
now = self.hs.get_clock().time_msec()
# We want to reduce to the total number of db writes, and are happy
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 3147fb6827..3378fc77d1 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple(
class RoomWorkerStore(SQLBaseStore):
+ def get_room(self, room_id):
+ """Retrieve a room.
+
+ Args:
+ room_id (str): The ID of the room to retrieve.
+ Returns:
+ A namedtuple containing the room information, or an empty list.
+ """
+ return self._simple_select_one(
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ retcols=("room_id", "is_public", "creator"),
+ desc="get_room",
+ allow_none=True,
+ )
+
def get_public_room_ids(self):
return self._simple_select_onecol(
table="rooms",
@@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
- def get_room(self, room_id):
- """Retrieve a room.
-
- Args:
- room_id (str): The ID of the room to retrieve.
- Returns:
- A namedtuple containing the room information, or an empty list.
- """
- return self._simple_select_one(
- table="rooms",
- keyvalues={"room_id": room_id},
- retcols=("room_id", "is_public", "creator"),
- desc="get_room",
- allow_none=True,
- )
-
@defer.inlineCallbacks
def set_room_is_public(self, room_id, is_public):
def set_room_is_public_txn(txn, next_id):
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 17b14d464b..754dfa6973 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -116,6 +116,69 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
_get_current_state_ids_txn,
)
+ # FIXME: how should this be cached?
+ def get_filtered_current_state_ids(self, room_id, types, filtered_types=None):
+ """Get the current state event of a given type for a room based on the
+ current_state_events table. This may not be as up-to-date as the result
+ of doing a fresh state resolution as per state_handler.get_current_state
+ Args:
+ room_id (str)
+ types (list[(Str, (Str|None))]): List of (type, state_key) tuples
+ which are used to filter the state fetched. `state_key` may be
+ None, which matches any `state_key`
+ filtered_types (list[Str]|None): List of types to apply the above filter to.
+ Returns:
+ deferred: dict of (type, state_key) -> event
+ """
+
+ include_other_types = False if filtered_types is None else True
+
+ def _get_filtered_current_state_ids_txn(txn):
+ results = {}
+ sql = """SELECT type, state_key, event_id FROM current_state_events
+ WHERE room_id = ? %s"""
+ # Turns out that postgres doesn't like doing a list of OR's and
+ # is about 1000x slower, so we just issue a query for each specific
+ # type seperately.
+ if types:
+ clause_to_args = [
+ (
+ "AND type = ? AND state_key = ?",
+ (etype, state_key)
+ ) if state_key is not None else (
+ "AND type = ?",
+ (etype,)
+ )
+ for etype, state_key in types
+ ]
+
+ if include_other_types:
+ unique_types = set(filtered_types)
+ clause_to_args.append(
+ (
+ "AND type <> ? " * len(unique_types),
+ list(unique_types)
+ )
+ )
+ else:
+ # If types is None we fetch all the state, and so just use an
+ # empty where clause with no extra args.
+ clause_to_args = [("", [])]
+ for where_clause, where_args in clause_to_args:
+ args = [room_id]
+ args.extend(where_args)
+ txn.execute(sql % (where_clause,), args)
+ for row in txn:
+ typ, state_key, event_id = row
+ key = (intern_string(typ), intern_string(state_key))
+ results[key] = event_id
+ return results
+
+ return self.runInteraction(
+ "get_filtered_current_state_ids",
+ _get_filtered_current_state_ids_txn,
+ )
+
@cached(max_entries=10000, iterable=True)
def get_state_group_delta(self, state_group):
"""Given a state group try to return a previous group and a delta between
@@ -389,8 +452,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
- deferred: A list of dicts corresponding to the event_ids given.
- The dicts are mappings from (type, state_key) -> state_events
+ deferred: A dict of (event_id) -> (type, state_key) -> [state_events]
"""
event_to_groups = yield self._get_state_group_for_events(
event_ids,
diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py
index a65689ba89..32a2b5fc3d 100644
--- a/tests/api/test_auth.py
+++ b/tests/api/test_auth.py
@@ -455,8 +455,11 @@ class AuthTestCase(unittest.TestCase):
return_value=defer.succeed(lots_of_users)
)
- with self.assertRaises(AuthError):
+ with self.assertRaises(AuthError) as e:
yield self.auth.check_auth_blocking()
+ self.assertEquals(e.exception.admin_uri, self.hs.config.admin_uri)
+ self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED)
+ self.assertEquals(e.exception.code, 403)
# Ensure does not throw an error
self.store.get_monthly_active_count = Mock(
@@ -470,5 +473,6 @@ class AuthTestCase(unittest.TestCase):
self.hs.config.hs_disabled_message = "Reason for being disabled"
with self.assertRaises(AuthError) as e:
yield self.auth.check_auth_blocking()
- self.assertEquals(e.exception.errcode, Codes.HS_DISABLED)
+ self.assertEquals(e.exception.admin_uri, self.hs.config.admin_uri)
+ self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED)
self.assertEquals(e.exception.code, 403)
diff --git a/tests/handlers/test_auth.py b/tests/handlers/test_auth.py
index 56c0f87fb7..3046bd6093 100644
--- a/tests/handlers/test_auth.py
+++ b/tests/handlers/test_auth.py
@@ -124,7 +124,7 @@ class AuthTestCase(unittest.TestCase):
)
@defer.inlineCallbacks
- def test_mau_limits_exceeded(self):
+ def test_mau_limits_exceeded_large(self):
self.hs.config.limit_usage_by_mau = True
self.hs.get_datastore().get_monthly_active_count = Mock(
return_value=defer.succeed(self.large_number_of_users)
@@ -142,6 +142,42 @@ class AuthTestCase(unittest.TestCase):
)
@defer.inlineCallbacks
+ def test_mau_limits_parity(self):
+ self.hs.config.limit_usage_by_mau = True
+
+ # If not in monthly active cohort
+ self.hs.get_datastore().get_monthly_active_count = Mock(
+ return_value=defer.succeed(self.hs.config.max_mau_value)
+ )
+ with self.assertRaises(AuthError):
+ yield self.auth_handler.get_access_token_for_user_id('user_a')
+
+ self.hs.get_datastore().get_monthly_active_count = Mock(
+ return_value=defer.succeed(self.hs.config.max_mau_value)
+ )
+ with self.assertRaises(AuthError):
+ yield self.auth_handler.validate_short_term_login_token_and_get_user_id(
+ self._get_macaroon().serialize()
+ )
+ # If in monthly active cohort
+ self.hs.get_datastore().user_last_seen_monthly_active = Mock(
+ return_value=defer.succeed(self.hs.get_clock().time_msec())
+ )
+ self.hs.get_datastore().get_monthly_active_count = Mock(
+ return_value=defer.succeed(self.hs.config.max_mau_value)
+ )
+ yield self.auth_handler.get_access_token_for_user_id('user_a')
+ self.hs.get_datastore().user_last_seen_monthly_active = Mock(
+ return_value=defer.succeed(self.hs.get_clock().time_msec())
+ )
+ self.hs.get_datastore().get_monthly_active_count = Mock(
+ return_value=defer.succeed(self.hs.config.max_mau_value)
+ )
+ yield self.auth_handler.validate_short_term_login_token_and_get_user_id(
+ self._get_macaroon().serialize()
+ )
+
+ @defer.inlineCallbacks
def test_mau_limits_not_exceeded(self):
self.hs.config.limit_usage_by_mau = True
diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py
index d48d40c8dd..7154816a34 100644
--- a/tests/handlers/test_register.py
+++ b/tests/handlers/test_register.py
@@ -17,7 +17,7 @@ from mock import Mock
from twisted.internet import defer
-from synapse.api.errors import RegistrationError
+from synapse.api.errors import AuthError
from synapse.handlers.register import RegistrationHandler
from synapse.types import UserID, create_requester
@@ -98,7 +98,7 @@ class RegistrationTestCase(unittest.TestCase):
def test_get_or_create_user_mau_not_blocked(self):
self.hs.config.limit_usage_by_mau = True
self.store.count_monthly_users = Mock(
- return_value=defer.succeed(self.small_number_of_users)
+ return_value=defer.succeed(self.hs.config.max_mau_value - 1)
)
# Ensure does not throw exception
yield self.handler.get_or_create_user("@user:server", 'c', "User")
@@ -109,7 +109,13 @@ class RegistrationTestCase(unittest.TestCase):
self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.lots_of_users)
)
- with self.assertRaises(RegistrationError):
+ with self.assertRaises(AuthError):
+ yield self.handler.get_or_create_user("requester", 'b', "display_name")
+
+ self.store.get_monthly_active_count = Mock(
+ return_value=defer.succeed(self.hs.config.max_mau_value)
+ )
+ with self.assertRaises(AuthError):
yield self.handler.get_or_create_user("requester", 'b', "display_name")
@defer.inlineCallbacks
@@ -118,7 +124,13 @@ class RegistrationTestCase(unittest.TestCase):
self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.lots_of_users)
)
- with self.assertRaises(RegistrationError):
+ with self.assertRaises(AuthError):
+ yield self.handler.register(localpart="local_part")
+
+ self.store.get_monthly_active_count = Mock(
+ return_value=defer.succeed(self.hs.config.max_mau_value)
+ )
+ with self.assertRaises(AuthError):
yield self.handler.register(localpart="local_part")
@defer.inlineCallbacks
@@ -127,5 +139,11 @@ class RegistrationTestCase(unittest.TestCase):
self.store.get_monthly_active_count = Mock(
return_value=defer.succeed(self.lots_of_users)
)
- with self.assertRaises(RegistrationError):
+ with self.assertRaises(AuthError):
+ yield self.handler.register_saml2(localpart="local_part")
+
+ self.store.get_monthly_active_count = Mock(
+ return_value=defer.succeed(self.hs.config.max_mau_value)
+ )
+ with self.assertRaises(AuthError):
yield self.handler.register_saml2(localpart="local_part")
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
new file mode 100644
index 0000000000..33d861bd64
--- /dev/null
+++ b/tests/handlers/test_sync.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from synapse.api.errors import AuthError, Codes
+from synapse.api.filtering import DEFAULT_FILTER_COLLECTION
+from synapse.handlers.sync import SyncConfig, SyncHandler
+from synapse.types import UserID
+
+import tests.unittest
+import tests.utils
+from tests.utils import setup_test_homeserver
+
+
+class SyncTestCase(tests.unittest.TestCase):
+ """ Tests Sync Handler. """
+
+ @defer.inlineCallbacks
+ def setUp(self):
+ self.hs = yield setup_test_homeserver(self.addCleanup)
+ self.sync_handler = SyncHandler(self.hs)
+ self.store = self.hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def test_wait_for_sync_for_user_auth_blocking(self):
+
+ user_id1 = "@user1:server"
+ user_id2 = "@user2:server"
+ sync_config = self._generate_sync_config(user_id1)
+
+ self.hs.config.limit_usage_by_mau = True
+ self.hs.config.max_mau_value = 1
+
+ # Check that the happy case does not throw errors
+ yield self.store.upsert_monthly_active_user(user_id1)
+ yield self.sync_handler.wait_for_sync_for_user(sync_config)
+
+ # Test that global lock works
+ self.hs.config.hs_disabled = True
+ with self.assertRaises(AuthError) as e:
+ yield self.sync_handler.wait_for_sync_for_user(sync_config)
+ self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED)
+
+ self.hs.config.hs_disabled = False
+
+ sync_config = self._generate_sync_config(user_id2)
+
+ with self.assertRaises(AuthError) as e:
+ yield self.sync_handler.wait_for_sync_for_user(sync_config)
+ self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED)
+
+ def _generate_sync_config(self, user_id):
+ return SyncConfig(
+ user=UserID(user_id.split(":")[0][1:], user_id.split(":")[1]),
+ filter_collection=DEFAULT_FILTER_COLLECTION,
+ is_guest=False,
+ request_key="request_key",
+ device_id="device_id",
+ )
diff --git a/tests/rest/client/v1/test_typing.py b/tests/rest/client/v1/test_typing.py
index 677265edf6..0ad814c5e5 100644
--- a/tests/rest/client/v1/test_typing.py
+++ b/tests/rest/client/v1/test_typing.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,41 +18,32 @@
from mock import Mock, NonCallableMock
-# twisted imports
from twisted.internet import defer
-import synapse.rest.client.v1.room
+from synapse.rest.client.v1 import room
from synapse.types import UserID
-from ....utils import MockClock, MockHttpResource, setup_test_homeserver
-from .utils import RestTestCase
+from tests import unittest
PATH_PREFIX = "/_matrix/client/api/v1"
-class RoomTypingTestCase(RestTestCase):
+class RoomTypingTestCase(unittest.HomeserverTestCase):
""" Tests /rooms/$room_id/typing/$user_id REST API. """
user_id = "@sid:red"
user = UserID.from_string(user_id)
+ servlets = [room.register_servlets]
- @defer.inlineCallbacks
- def setUp(self):
- self.clock = MockClock()
+ def make_homeserver(self, reactor, clock):
- self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)
- self.auth_user_id = self.user_id
-
- hs = yield setup_test_homeserver(
- self.addCleanup,
+ hs = self.setup_test_homeserver(
"red",
- clock=self.clock,
http_client=None,
federation_client=Mock(),
ratelimiter=NonCallableMock(spec_set=["send_message"]),
)
- self.hs = hs
self.event_source = hs.get_event_sources().sources["typing"]
@@ -100,25 +92,24 @@ class RoomTypingTestCase(RestTestCase):
fetch_room_distributions_into
)
- synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource)
+ return hs
- self.room_id = yield self.create_room_as(self.user_id)
+ def prepare(self, reactor, clock, hs):
+ self.room_id = self.helper.create_room_as(self.user_id)
# Need another user to make notifications actually work
- yield self.join(self.room_id, user="@jim:red")
+ self.helper.join(self.room_id, user="@jim:red")
- @defer.inlineCallbacks
def test_set_typing(self):
- (code, _) = yield self.mock_resource.trigger(
+ request, channel = self.make_request(
"PUT",
"/rooms/%s/typing/%s" % (self.room_id, self.user_id),
- '{"typing": true, "timeout": 30000}',
+ b'{"typing": true, "timeout": 30000}',
)
- self.assertEquals(200, code)
+ self.render(request)
+ self.assertEquals(200, channel.code)
self.assertEquals(self.event_source.get_current_key(), 1)
- events = yield self.event_source.get_new_events(
- from_key=0, room_ids=[self.room_id]
- )
+ events = self.event_source.get_new_events(from_key=0, room_ids=[self.room_id])
self.assertEquals(
events[0],
[
@@ -130,35 +121,36 @@ class RoomTypingTestCase(RestTestCase):
],
)
- @defer.inlineCallbacks
def test_set_not_typing(self):
- (code, _) = yield self.mock_resource.trigger(
+ request, channel = self.make_request(
"PUT",
"/rooms/%s/typing/%s" % (self.room_id, self.user_id),
- '{"typing": false}',
+ b'{"typing": false}',
)
- self.assertEquals(200, code)
+ self.render(request)
+ self.assertEquals(200, channel.code)
- @defer.inlineCallbacks
def test_typing_timeout(self):
- (code, _) = yield self.mock_resource.trigger(
+ request, channel = self.make_request(
"PUT",
"/rooms/%s/typing/%s" % (self.room_id, self.user_id),
- '{"typing": true, "timeout": 30000}',
+ b'{"typing": true, "timeout": 30000}',
)
- self.assertEquals(200, code)
+ self.render(request)
+ self.assertEquals(200, channel.code)
self.assertEquals(self.event_source.get_current_key(), 1)
- self.clock.advance_time(36)
+ self.reactor.advance(36)
self.assertEquals(self.event_source.get_current_key(), 2)
- (code, _) = yield self.mock_resource.trigger(
+ request, channel = self.make_request(
"PUT",
"/rooms/%s/typing/%s" % (self.room_id, self.user_id),
- '{"typing": true, "timeout": 30000}',
+ b'{"typing": true, "timeout": 30000}',
)
- self.assertEquals(200, code)
+ self.render(request)
+ self.assertEquals(200, channel.code)
self.assertEquals(self.event_source.get_current_key(), 3)
diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py
index fa60d949ba..c2e88bdbaf 100644
--- a/tests/storage/test_client_ips.py
+++ b/tests/storage/test_client_ips.py
@@ -63,7 +63,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
yield self.store.insert_client_ip(
user_id, "access_token", "ip", "user_agent", "device_id"
)
- active = yield self.store._user_last_seen_monthly_active(user_id)
+ active = yield self.store.user_last_seen_monthly_active(user_id)
self.assertFalse(active)
@defer.inlineCallbacks
@@ -79,7 +79,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
yield self.store.insert_client_ip(
user_id, "access_token", "ip", "user_agent", "device_id"
)
- active = yield self.store._user_last_seen_monthly_active(user_id)
+ active = yield self.store.user_last_seen_monthly_active(user_id)
self.assertFalse(active)
@defer.inlineCallbacks
@@ -87,13 +87,13 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
self.hs.config.limit_usage_by_mau = True
self.hs.config.max_mau_value = 50
user_id = "@user:server"
- active = yield self.store._user_last_seen_monthly_active(user_id)
+ active = yield self.store.user_last_seen_monthly_active(user_id)
self.assertFalse(active)
yield self.store.insert_client_ip(
user_id, "access_token", "ip", "user_agent", "device_id"
)
- active = yield self.store._user_last_seen_monthly_active(user_id)
+ active = yield self.store.user_last_seen_monthly_active(user_id)
self.assertTrue(active)
@defer.inlineCallbacks
@@ -102,7 +102,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
self.hs.config.max_mau_value = 50
user_id = "@user:server"
- active = yield self.store._user_last_seen_monthly_active(user_id)
+ active = yield self.store.user_last_seen_monthly_active(user_id)
self.assertFalse(active)
yield self.store.insert_client_ip(
@@ -111,5 +111,5 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
yield self.store.insert_client_ip(
user_id, "access_token", "ip", "user_agent", "device_id"
)
- active = yield self.store._user_last_seen_monthly_active(user_id)
+ active = yield self.store.user_last_seen_monthly_active(user_id)
self.assertTrue(active)
diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py
index 0a2c859f26..511acbde9b 100644
--- a/tests/storage/test_monthly_active_users.py
+++ b/tests/storage/test_monthly_active_users.py
@@ -33,7 +33,7 @@ class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
@defer.inlineCallbacks
def test_initialise_reserved_users(self):
-
+ self.hs.config.max_mau_value = 5
user1 = "@user1:server"
user1_email = "user1@matrix.org"
user2 = "@user2:server"
@@ -60,9 +60,9 @@ class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
# Test user is marked as active
- timestamp = yield self.store._user_last_seen_monthly_active(user1)
+ timestamp = yield self.store.user_last_seen_monthly_active(user1)
self.assertTrue(timestamp)
- timestamp = yield self.store._user_last_seen_monthly_active(user2)
+ timestamp = yield self.store.user_last_seen_monthly_active(user2)
self.assertTrue(timestamp)
# Test that users are never removed from the db.
@@ -86,17 +86,18 @@ class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
self.assertEqual(1, count)
@defer.inlineCallbacks
- def test__user_last_seen_monthly_active(self):
+ def test_user_last_seen_monthly_active(self):
user_id1 = "@user1:server"
user_id2 = "@user2:server"
user_id3 = "@user3:server"
- result = yield self.store._user_last_seen_monthly_active(user_id1)
+
+ result = yield self.store.user_last_seen_monthly_active(user_id1)
self.assertFalse(result == 0)
yield self.store.upsert_monthly_active_user(user_id1)
yield self.store.upsert_monthly_active_user(user_id2)
- result = yield self.store._user_last_seen_monthly_active(user_id1)
+ result = yield self.store.user_last_seen_monthly_active(user_id1)
self.assertTrue(result > 0)
- result = yield self.store._user_last_seen_monthly_active(user_id3)
+ result = yield self.store.user_last_seen_monthly_active(user_id3)
self.assertFalse(result == 0)
@defer.inlineCallbacks
diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py
index 6168c46248..ebfd969b36 100644
--- a/tests/storage/test_state.py
+++ b/tests/storage/test_state.py
@@ -148,7 +148,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
{(e3.type, e3.state_key): e3, (e5.type, e5.state_key): e5}, state
)
- # check we can use filter_types to grab a specific room member
+ # check we can use filtered_types to grab a specific room member
# without filtering out the other event types
state = yield self.store.get_state_for_event(
e5.event_id,
diff --git a/tests/unittest.py b/tests/unittest.py
index f448a6dfbd..e6afe3b96d 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,12 +16,19 @@
import logging
+from mock import Mock
+
import twisted
import twisted.logger
from twisted.trial import unittest
+from synapse.http.server import JsonResource
+from synapse.server import HomeServer
+from synapse.types import UserID, create_requester
from synapse.util.logcontext import LoggingContextFilter
+from tests.server import get_clock, make_request, render, setup_test_homeserver
+
# Set up putting Synapse's logs into Trial's.
rootLogger = logging.getLogger()
@@ -129,3 +137,139 @@ def DEBUG(target):
Can apply to either a TestCase or an individual test method."""
target.loglevel = logging.DEBUG
return target
+
+
+class HomeserverTestCase(TestCase):
+ """
+ A base TestCase that reduces boilerplate for HomeServer-using test cases.
+
+ Attributes:
+ servlets (list[function]): List of servlet registration function.
+ user_id (str): The user ID to assume if auth is hijacked.
+ hijack_auth (bool): Whether to hijack auth to return the user specified
+ in user_id.
+ """
+ servlets = []
+ hijack_auth = True
+
+ def setUp(self):
+ """
+ Set up the TestCase by calling the homeserver constructor, optionally
+ hijacking the authentication system to return a fixed user, and then
+ calling the prepare function.
+ """
+ self.reactor, self.clock = get_clock()
+ self._hs_args = {"clock": self.clock, "reactor": self.reactor}
+ self.hs = self.make_homeserver(self.reactor, self.clock)
+
+ if self.hs is None:
+ raise Exception("No homeserver returned from make_homeserver.")
+
+ if not isinstance(self.hs, HomeServer):
+ raise Exception("A homeserver wasn't returned, but %r" % (self.hs,))
+
+ # Register the resources
+ self.resource = JsonResource(self.hs)
+
+ for servlet in self.servlets:
+ servlet(self.hs, self.resource)
+
+ if hasattr(self, "user_id"):
+ from tests.rest.client.v1.utils import RestHelper
+
+ self.helper = RestHelper(self.hs, self.resource, self.user_id)
+
+ if self.hijack_auth:
+
+ def get_user_by_access_token(token=None, allow_guest=False):
+ return {
+ "user": UserID.from_string(self.helper.auth_user_id),
+ "token_id": 1,
+ "is_guest": False,
+ }
+
+ def get_user_by_req(request, allow_guest=False, rights="access"):
+ return create_requester(
+ UserID.from_string(self.helper.auth_user_id), 1, False, None
+ )
+
+ self.hs.get_auth().get_user_by_req = get_user_by_req
+ self.hs.get_auth().get_user_by_access_token = get_user_by_access_token
+ self.hs.get_auth().get_access_token_from_request = Mock(
+ return_value="1234"
+ )
+
+ if hasattr(self, "prepare"):
+ self.prepare(self.reactor, self.clock, self.hs)
+
+ def make_homeserver(self, reactor, clock):
+ """
+ Make and return a homeserver.
+
+ Args:
+ reactor: A Twisted Reactor, or something that pretends to be one.
+ clock (synapse.util.Clock): The Clock, associated with the reactor.
+
+ Returns:
+ A homeserver (synapse.server.HomeServer) suitable for testing.
+
+ Function to be overridden in subclasses.
+ """
+ raise NotImplementedError()
+
+ def prepare(self, reactor, clock, homeserver):
+ """
+ Prepare for the test. This involves things like mocking out parts of
+ the homeserver, or building test data common across the whole test
+ suite.
+
+ Args:
+ reactor: A Twisted Reactor, or something that pretends to be one.
+ clock (synapse.util.Clock): The Clock, associated with the reactor.
+ homeserver (synapse.server.HomeServer): The HomeServer to test
+ against.
+
+ Function to optionally be overridden in subclasses.
+ """
+
+ def make_request(self, method, path, content=b""):
+ """
+ Create a SynapseRequest at the path using the method and containing the
+ given content.
+
+ Args:
+ method (bytes/unicode): The HTTP request method ("verb").
+ path (bytes/unicode): The HTTP path, suitably URL encoded (e.g.
+ escaped UTF-8 & spaces and such).
+ content (bytes): The body of the request.
+
+ Returns:
+ A synapse.http.site.SynapseRequest.
+ """
+ return make_request(method, path, content)
+
+ def render(self, request):
+ """
+ Render a request against the resources registered by the test class's
+ servlets.
+
+ Args:
+ request (synapse.http.site.SynapseRequest): The request to render.
+ """
+ render(request, self.resource, self.reactor)
+
+ def setup_test_homeserver(self, *args, **kwargs):
+ """
+ Set up the test homeserver, meant to be called by the overridable
+ make_homeserver. It automatically passes through the test class's
+ clock & reactor.
+
+ Args:
+ See tests.utils.setup_test_homeserver.
+
+ Returns:
+ synapse.server.HomeServer
+ """
+ kwargs = dict(kwargs)
+ kwargs.update(self._hs_args)
+ return setup_test_homeserver(self.addCleanup, *args, **kwargs)
diff --git a/tests/utils.py b/tests/utils.py
index 90378326f8..52326d4f67 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -139,6 +139,7 @@ def setup_test_homeserver(
config.hs_disabled_message = ""
config.max_mau_value = 50
config.mau_limits_reserved_threepids = []
+ config.admin_uri = None
# we need a sane default_room_version, otherwise attempts to create rooms will
# fail.
|