diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 53213cdccf..8f8fd82eb0 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,7 +17,6 @@ from .register import RegistrationHandler
from .room import (
RoomCreationHandler, RoomContextHandler,
)
-from .room_member import RoomMemberHandler
from .message import MessageHandler
from .federation import FederationHandler
from .directory import DirectoryHandler
@@ -49,7 +48,6 @@ class Handlers(object):
self.registration_handler = RegistrationHandler(hs)
self.message_handler = MessageHandler(hs)
self.room_creation_handler = RoomCreationHandler(hs)
- self.room_member_handler = RoomMemberHandler(hs)
self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index faa5609c0c..e089e66fde 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -158,7 +158,7 @@ class BaseHandler(object):
# homeserver.
requester = synapse.types.create_requester(
target_user, is_guest=True)
- handler = self.hs.get_handlers().room_member_handler
+ handler = self.hs.get_room_member_handler()
yield handler.update_membership(
requester,
target_user,
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index feca3e4c10..3dd3fa2a27 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -15,6 +15,7 @@
from twisted.internet import defer
+import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
@@ -23,6 +24,10 @@ import logging
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+events_processed_counter = metrics.register_counter("events_processed")
+
def log_failure(failure):
logger.error(
@@ -103,6 +108,8 @@ class ApplicationServicesHandler(object):
service, event
)
+ events_processed_counter.inc_by(len(events))
+
yield self.store.set_appservice_last_pos(upper_bound)
finally:
self.is_processing = False
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 573c9db8a1..a5365c4fe4 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, threads
from ._base import BaseHandler
from synapse.api.constants import LoginType
@@ -25,6 +25,7 @@ from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util.async import run_on_reactor
from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.logcontext import make_deferred_yieldable
from twisted.web.client import PartialDownloadError
@@ -714,7 +715,7 @@ class AuthHandler(BaseHandler):
if not lookupres:
defer.returnValue(None)
(user_id, password_hash) = lookupres
- result = self.validate_hash(password, password_hash)
+ result = yield self.validate_hash(password, password_hash)
if not result:
logger.warn("Failed password login for user %s", user_id)
defer.returnValue(None)
@@ -842,10 +843,13 @@ class AuthHandler(BaseHandler):
password (str): Password to hash.
Returns:
- Hashed password (str).
+ Deferred(str): Hashed password.
"""
- return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
- bcrypt.gensalt(self.bcrypt_rounds))
+ def _do_hash():
+ return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
+ bcrypt.gensalt(self.bcrypt_rounds))
+
+ return make_deferred_yieldable(threads.deferToThread(_do_hash))
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@@ -855,13 +859,19 @@ class AuthHandler(BaseHandler):
stored_hash (str): Expected hash value.
Returns:
- Whether self.hash(password) == stored_hash (bool).
+ Deferred(bool): Whether self.hash(password) == stored_hash.
"""
+
+ def _do_validate_hash():
+ return bcrypt.checkpw(
+ password.encode('utf8') + self.hs.config.password_pepper,
+ stored_hash.encode('utf8')
+ )
+
if stored_hash:
- return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
- stored_hash.encode('utf8')) == stored_hash
+ return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
else:
- return False
+ return defer.succeed(False)
class MacaroonGeneartor(object):
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 2152efc692..40f3d24678 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -14,6 +14,7 @@
# limitations under the License.
from synapse.api import errors
from synapse.api.constants import EventTypes
+from synapse.api.errors import FederationDeniedError
from synapse.util import stringutils
from synapse.util.async import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
@@ -36,14 +37,15 @@ class DeviceHandler(BaseHandler):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
self.federation_sender = hs.get_federation_sender()
- self.federation = hs.get_replication_layer()
self._edu_updater = DeviceListEduUpdater(hs, self)
- self.federation.register_edu_handler(
+ federation_registry = hs.get_federation_registry()
+
+ federation_registry.register_edu_handler(
"m.device_list_update", self._edu_updater.incoming_device_list_update,
)
- self.federation.register_query_handler(
+ federation_registry.register_query_handler(
"user_devices", self.on_federation_query_user_devices,
)
@@ -429,7 +431,7 @@ class DeviceListEduUpdater(object):
def __init__(self, hs, device_handler):
self.store = hs.get_datastore()
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
self.device_handler = device_handler
@@ -513,6 +515,9 @@ class DeviceListEduUpdater(object):
# This makes it more likely that the device lists will
# eventually become consistent.
return
+ except FederationDeniedError as e:
+ logger.info(e)
+ return
except Exception:
# TODO: Remember that we are now out of sync and try again
# later
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index f7fad15c62..f147a20b73 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -17,7 +17,8 @@ import logging
from twisted.internet import defer
-from synapse.types import get_domain_from_id
+from synapse.api.errors import SynapseError
+from synapse.types import get_domain_from_id, UserID
from synapse.util.stringutils import random_string
@@ -33,10 +34,10 @@ class DeviceMessageHandler(object):
"""
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
- self.is_mine_id = hs.is_mine_id
+ self.is_mine = hs.is_mine
self.federation = hs.get_federation_sender()
- hs.get_replication_layer().register_edu_handler(
+ hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
@@ -52,6 +53,12 @@ class DeviceMessageHandler(object):
message_type = content["type"]
message_id = content["message_id"]
for user_id, by_device in content["messages"].items():
+ # we use UserID.from_string to catch invalid user ids
+ if not self.is_mine(UserID.from_string(user_id)):
+ logger.warning("Request for keys for non-local user %s",
+ user_id)
+ raise SynapseError(400, "Not a user here")
+
messages_by_device = {
device_id: {
"content": message_content,
@@ -77,7 +84,8 @@ class DeviceMessageHandler(object):
local_messages = {}
remote_messages = {}
for user_id, by_device in messages.items():
- if self.is_mine_id(user_id):
+ # we use UserID.from_string to catch invalid user ids
+ if self.is_mine(UserID.from_string(user_id)):
messages_by_device = {
device_id: {
"content": message_content,
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index a0464ae5c0..c5b6e75e03 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -34,9 +34,10 @@ class DirectoryHandler(BaseHandler):
self.state = hs.get_state_handler()
self.appservice_handler = hs.get_application_service_handler()
+ self.event_creation_handler = hs.get_event_creation_handler()
- self.federation = hs.get_replication_layer()
- self.federation.register_query_handler(
+ self.federation = hs.get_federation_client()
+ hs.get_federation_registry().register_query_handler(
"directory", self.on_directory_query
)
@@ -249,8 +250,7 @@ class DirectoryHandler(BaseHandler):
def send_room_alias_update_event(self, requester, user_id, room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
- msg_handler = self.hs.get_handlers().message_handler
- yield msg_handler.create_and_send_nonmember_event(
+ yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.Aliases,
@@ -272,8 +272,7 @@ class DirectoryHandler(BaseHandler):
if not alias_event or alias_event.content.get("alias", "") != alias_str:
return
- msg_handler = self.hs.get_handlers().message_handler
- yield msg_handler.create_and_send_nonmember_event(
+ yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.CanonicalAlias,
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index ce2c87e400..80b359b2e7 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -19,8 +19,10 @@ import logging
from canonicaljson import encode_canonical_json
from twisted.internet import defer
-from synapse.api.errors import SynapseError, CodeMessageException
-from synapse.types import get_domain_from_id
+from synapse.api.errors import (
+ SynapseError, CodeMessageException, FederationDeniedError,
+)
+from synapse.types import get_domain_from_id, UserID
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination
@@ -30,15 +32,15 @@ logger = logging.getLogger(__name__)
class E2eKeysHandler(object):
def __init__(self, hs):
self.store = hs.get_datastore()
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_federation_client()
self.device_handler = hs.get_device_handler()
- self.is_mine_id = hs.is_mine_id
+ self.is_mine = hs.is_mine
self.clock = hs.get_clock()
# doesn't really work as part of the generic query API, because the
# query request requires an object POST, but we abuse the
# "query handler" interface.
- self.federation.register_query_handler(
+ hs.get_federation_registry().register_query_handler(
"client_keys", self.on_federation_query_client_keys
)
@@ -70,7 +72,8 @@ class E2eKeysHandler(object):
remote_queries = {}
for user_id, device_ids in device_keys_query.items():
- if self.is_mine_id(user_id):
+ # we use UserID.from_string to catch invalid user ids
+ if self.is_mine(UserID.from_string(user_id)):
local_query[user_id] = device_ids
else:
remote_queries[user_id] = device_ids
@@ -139,6 +142,10 @@ class E2eKeysHandler(object):
failures[destination] = {
"status": 503, "message": "Not ready for retry",
}
+ except FederationDeniedError as e:
+ failures[destination] = {
+ "status": 403, "message": "Federation Denied",
+ }
except Exception as e:
# include ConnectionRefused and other errors
failures[destination] = {
@@ -170,7 +177,8 @@ class E2eKeysHandler(object):
result_dict = {}
for user_id, device_ids in query.items():
- if not self.is_mine_id(user_id):
+ # we use UserID.from_string to catch invalid user ids
+ if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s",
user_id)
raise SynapseError(400, "Not a user here")
@@ -213,7 +221,8 @@ class E2eKeysHandler(object):
remote_queries = {}
for user_id, device_keys in query.get("one_time_keys", {}).items():
- if self.is_mine_id(user_id):
+ # we use UserID.from_string to catch invalid user ids
+ if self.is_mine(UserID.from_string(user_id)):
for device_id, algorithm in device_keys.items():
local_query.append((user_id, device_id, algorithm))
else:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ac70730885..080aca3d71 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,6 +23,7 @@ from ._base import BaseHandler
from synapse.api.errors import (
AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+ FederationDeniedError,
)
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
@@ -66,7 +68,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
- self.replication_layer = hs.get_replication_layer()
+ self.replication_layer = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
@@ -74,8 +76,7 @@ class FederationHandler(BaseHandler):
self.is_mine_id = hs.is_mine_id
self.pusher_pool = hs.get_pusherpool()
self.spam_checker = hs.get_spam_checker()
-
- self.replication_layer.set_handler(self)
+ self.event_creation_handler = hs.get_event_creation_handler()
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -782,6 +783,9 @@ class FederationHandler(BaseHandler):
except NotRetryingDestination as e:
logger.info(e.message)
continue
+ except FederationDeniedError as e:
+ logger.info(e)
+ continue
except Exception as e:
logger.exception(
"Failed to backfill from %s because %s",
@@ -804,13 +808,12 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
+ resolve = logcontext.preserve_fn(
+ self.state_handler.resolve_state_groups_for_events
+ )
states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
- [
- logcontext.preserve_fn(self.state_handler.resolve_state_groups)(
- room_id, [e]
- )
- for e in event_ids
- ], consumeErrors=True,
+ [resolve(room_id, [e]) for e in event_ids],
+ consumeErrors=True,
))
states = dict(zip(event_ids, [s.state for s in states]))
@@ -1004,8 +1007,7 @@ class FederationHandler(BaseHandler):
})
try:
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
except AuthError as e:
@@ -1245,8 +1247,7 @@ class FederationHandler(BaseHandler):
"state_key": user_id,
})
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
@@ -1444,16 +1445,24 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
- if not event.internal_metadata.is_outlier() and not backfilled:
- yield self.action_generator.handle_push_actions_for_event(
- event, context
- )
+ try:
+ if not event.internal_metadata.is_outlier() and not backfilled:
+ yield self.action_generator.handle_push_actions_for_event(
+ event, context
+ )
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- backfilled=backfilled,
- )
+ event_stream_id, max_stream_id = yield self.store.persist_event(
+ event,
+ context=context,
+ backfilled=backfilled,
+ )
+ except: # noqa: E722, as we reraise the exception this is fine.
+ # Ensure that we actually remove the entries in the push actions
+ # staging area
+ logcontext.preserve_fn(
+ self.store.remove_push_actions_from_staging
+ )(event.event_id)
+ raise
if not backfilled:
# this intentionally does not yield: we don't care about the result
@@ -1828,8 +1837,8 @@ class FederationHandler(BaseHandler):
current_state = set(e.event_id for e in auth_events.values())
different_auth = event_auth_events - current_state
- self._update_context_for_auth_events(
- context, auth_events, event_key,
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
)
if different_auth and not event.internal_metadata.is_outlier():
@@ -1910,8 +1919,8 @@ class FederationHandler(BaseHandler):
# 4. Look at rejects and their proofs.
# TODO.
- self._update_context_for_auth_events(
- context, auth_events, event_key,
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
)
try:
@@ -1920,11 +1929,15 @@ class FederationHandler(BaseHandler):
logger.warn("Failed auth resolution for %r because %s", event, e)
raise e
- def _update_context_for_auth_events(self, context, auth_events,
+ @defer.inlineCallbacks
+ def _update_context_for_auth_events(self, event, context, auth_events,
event_key):
- """Update the state_ids in an event context after auth event resolution
+ """Update the state_ids in an event context after auth event resolution,
+ storing the changes as a new state group.
Args:
+ event (Event): The event we're handling the context for
+
context (synapse.events.snapshot.EventContext): event context
to be updated
@@ -1947,7 +1960,13 @@ class FederationHandler(BaseHandler):
context.prev_state_ids.update({
k: a.event_id for k, a in auth_events.iteritems()
})
- context.state_group = self.store.get_next_state_group()
+ context.state_group = yield self.store.store_state_group(
+ event.event_id,
+ event.room_id,
+ prev_group=context.prev_group,
+ delta_ids=context.delta_ids,
+ current_state_ids=context.current_state_ids,
+ )
@defer.inlineCallbacks
def construct_auth_difference(self, local_auth, remote_auth):
@@ -2117,8 +2136,7 @@ class FederationHandler(BaseHandler):
if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder
)
@@ -2133,7 +2151,7 @@ class FederationHandler(BaseHandler):
raise e
yield self._check_signature(event, context)
- member_handler = self.hs.get_handlers().room_member_handler
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
else:
destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -2156,8 +2174,7 @@ class FederationHandler(BaseHandler):
"""
builder = self.event_builder_factory.new(event_dict)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
@@ -2178,7 +2195,7 @@ class FederationHandler(BaseHandler):
# TODO: Make sure the signatures actually are correct.
event.signatures.update(returned_invite.signatures)
- member_handler = self.hs.get_handlers().room_member_handler
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
@defer.inlineCallbacks
@@ -2207,8 +2224,9 @@ class FederationHandler(BaseHandler):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(builder=builder)
+ event, context = yield self.event_creation_handler.create_new_client_event(
+ builder=builder,
+ )
defer.returnValue((event, context))
@defer.inlineCallbacks
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 7e5d3f148d..e4d0cc8b02 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -383,11 +383,12 @@ class GroupsLocalHandler(object):
defer.returnValue({"groups": result})
else:
- result = yield self.transport_client.get_publicised_groups_for_user(
- get_domain_from_id(user_id), user_id
+ bulk_result = yield self.transport_client.bulk_get_publicised_groups(
+ get_domain_from_id(user_id), [user_id],
)
+ result = bulk_result.get("users", {}).get(user_id)
# TODO: Verify attestations
- defer.returnValue(result)
+ defer.returnValue({"groups": result})
@defer.inlineCallbacks
def bulk_get_publicised_groups(self, user_ids, proxy=True):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d7413833ed..5a8ddc253e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2017 New Vector Ltd
+# Copyright 2017 - 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,7 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
@@ -24,10 +25,12 @@ from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import unfreeze
+from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
+from synapse.replication.http.send_event import send_event_to_master
from ._base import BaseHandler
@@ -40,6 +43,36 @@ import simplejson
logger = logging.getLogger(__name__)
+class PurgeStatus(object):
+ """Object tracking the status of a purge request
+
+ This class contains information on the progress of a purge request, for
+ return by get_purge_status.
+
+ Attributes:
+ status (int): Tracks whether this request has completed. One of
+ STATUS_{ACTIVE,COMPLETE,FAILED}
+ """
+
+ STATUS_ACTIVE = 0
+ STATUS_COMPLETE = 1
+ STATUS_FAILED = 2
+
+ STATUS_TEXT = {
+ STATUS_ACTIVE: "active",
+ STATUS_COMPLETE: "complete",
+ STATUS_FAILED: "failed",
+ }
+
+ def __init__(self):
+ self.status = PurgeStatus.STATUS_ACTIVE
+
+ def asdict(self):
+ return {
+ "status": PurgeStatus.STATUS_TEXT[self.status]
+ }
+
+
class MessageHandler(BaseHandler):
def __init__(self, hs):
@@ -47,32 +80,89 @@ class MessageHandler(BaseHandler):
self.hs = hs
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
- self.validator = EventValidator()
- self.profile_handler = hs.get_profile_handler()
self.pagination_lock = ReadWriteLock()
+ self._purges_in_progress_by_room = set()
+ # map from purge id to PurgeStatus
+ self._purges_by_id = {}
- self.pusher_pool = hs.get_pusherpool()
+ def start_purge_history(self, room_id, topological_ordering,
+ delete_local_events=False):
+ """Start off a history purge on a room.
- # We arbitrarily limit concurrent event creation for a room to 5.
- # This is to stop us from diverging history *too* much.
- self.limiter = Limiter(max_count=5)
+ Args:
+ room_id (str): The room to purge from
- self.action_generator = hs.get_action_generator()
+ topological_ordering (int): minimum topo ordering to preserve
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
- self.spam_checker = hs.get_spam_checker()
+ Returns:
+ str: unique ID for this purge transaction.
+ """
+ if room_id in self._purges_in_progress_by_room:
+ raise SynapseError(
+ 400,
+ "History purge already in progress for %s" % (room_id, ),
+ )
+
+ purge_id = random_string(16)
+
+ # we log the purge_id here so that it can be tied back to the
+ # request id in the log lines.
+ logger.info("[purge] starting purge_id %s", purge_id)
+
+ self._purges_by_id[purge_id] = PurgeStatus()
+ run_in_background(
+ self._purge_history,
+ purge_id, room_id, topological_ordering, delete_local_events,
+ )
+ return purge_id
@defer.inlineCallbacks
- def purge_history(self, room_id, event_id):
- event = yield self.store.get_event(event_id)
+ def _purge_history(self, purge_id, room_id, topological_ordering,
+ delete_local_events):
+ """Carry out a history purge on a room.
+
+ Args:
+ purge_id (str): The id for this purge
+ room_id (str): The room to purge from
+ topological_ordering (int): minimum topo ordering to preserve
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ Deferred
+ """
+ self._purges_in_progress_by_room.add(room_id)
+ try:
+ with (yield self.pagination_lock.write(room_id)):
+ yield self.store.purge_history(
+ room_id, topological_ordering, delete_local_events,
+ )
+ logger.info("[purge] complete")
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+ except Exception:
+ logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+ finally:
+ self._purges_in_progress_by_room.discard(room_id)
- if event.room_id != room_id:
- raise SynapseError(400, "Event is for wrong room.")
+ # remove the purge from the list 24 hours after it completes
+ def clear_purge():
+ del self._purges_by_id[purge_id]
+ reactor.callLater(24 * 3600, clear_purge)
- depth = event.depth
+ def get_purge_status(self, purge_id):
+ """Get the current status of an active purge
- with (yield self.pagination_lock.write(room_id)):
- yield self.store.delete_old_state(room_id, depth)
+ Args:
+ purge_id (str): purge_id returned by start_purge_history
+
+ Returns:
+ PurgeStatus|None
+ """
+ return self._purges_by_id.get(purge_id)
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
@@ -183,6 +273,165 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
+ def get_room_data(self, user_id=None, room_id=None,
+ event_type=None, state_key="", is_guest=False):
+ """ Get data from a room.
+
+ Args:
+ event : The room path event
+ Returns:
+ The path data content.
+ Raises:
+ SynapseError if something went wrong.
+ """
+ membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ room_id, user_id
+ )
+
+ if membership == Membership.JOIN:
+ data = yield self.state_handler.get_current_state(
+ room_id, event_type, state_key
+ )
+ elif membership == Membership.LEAVE:
+ key = (event_type, state_key)
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], [key]
+ )
+ data = room_state[membership_event_id].get(key)
+
+ defer.returnValue(data)
+
+ @defer.inlineCallbacks
+ def _check_in_room_or_world_readable(self, room_id, user_id):
+ try:
+ # check_user_was_in_room will return the most recent membership
+ # event for the user if:
+ # * The user is a non-guest user, and was ever in the room
+ # * The user is a guest user, and has joined the room
+ # else it will throw.
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+ defer.returnValue((member_event.membership, member_event.event_id))
+ return
+ except AuthError:
+ visibility = yield self.state_handler.get_current_state(
+ room_id, EventTypes.RoomHistoryVisibility, ""
+ )
+ if (
+ visibility and
+ visibility.content["history_visibility"] == "world_readable"
+ ):
+ defer.returnValue((Membership.JOIN, None))
+ return
+ raise AuthError(
+ 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+ )
+
+ @defer.inlineCallbacks
+ def get_state_events(self, user_id, room_id, is_guest=False):
+ """Retrieve all state events for a given room. If the user is
+ joined to the room then return the current state. If the user has
+ left the room return the state events from when they left.
+
+ Args:
+ user_id(str): The user requesting state events.
+ room_id(str): The room ID to get all state events from.
+ Returns:
+ A list of dicts representing state events. [{}, {}, {}]
+ """
+ membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ room_id, user_id
+ )
+
+ if membership == Membership.JOIN:
+ room_state = yield self.state_handler.get_current_state(room_id)
+ elif membership == Membership.LEAVE:
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], None
+ )
+ room_state = room_state[membership_event_id]
+
+ now = self.clock.time_msec()
+ defer.returnValue(
+ [serialize_event(c, now) for c in room_state.values()]
+ )
+
+ @defer.inlineCallbacks
+ def get_joined_members(self, requester, room_id):
+ """Get all the joined members in the room and their profile information.
+
+ If the user has left the room return the state events from when they left.
+
+ Args:
+ requester(Requester): The user requesting state events.
+ room_id(str): The room ID to get all state events from.
+ Returns:
+ A dict of user_id to profile info
+ """
+ user_id = requester.user.to_string()
+ if not requester.app_service:
+ # We check AS auth after fetching the room membership, as it
+ # requires us to pull out all joined members anyway.
+ membership, _ = yield self._check_in_room_or_world_readable(
+ room_id, user_id
+ )
+ if membership != Membership.JOIN:
+ raise NotImplementedError(
+ "Getting joined members after leaving is not implemented"
+ )
+
+ users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+ # If this is an AS, double check that they are allowed to see the members.
+ # This can either be because the AS user is in the room or becuase there
+ # is a user in the room that the AS is "interested in"
+ if requester.app_service and user_id not in users_with_profile:
+ for uid in users_with_profile:
+ if requester.app_service.is_interested_in_user(uid):
+ break
+ else:
+ # Loop fell through, AS has no interested users in room
+ raise AuthError(403, "Appservice not in room")
+
+ defer.returnValue({
+ user_id: {
+ "avatar_url": profile.avatar_url,
+ "display_name": profile.display_name,
+ }
+ for user_id, profile in users_with_profile.iteritems()
+ })
+
+
+class EventCreationHandler(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.state = hs.get_state_handler()
+ self.clock = hs.get_clock()
+ self.validator = EventValidator()
+ self.profile_handler = hs.get_profile_handler()
+ self.event_builder_factory = hs.get_event_builder_factory()
+ self.server_name = hs.hostname
+ self.ratelimiter = hs.get_ratelimiter()
+ self.notifier = hs.get_notifier()
+ self.config = hs.config
+
+ self.http_client = hs.get_simple_http_client()
+
+ # This is only used to get at ratelimit function, and maybe_kick_guest_users
+ self.base_handler = BaseHandler(hs)
+
+ self.pusher_pool = hs.get_pusherpool()
+
+ # We arbitrarily limit concurrent event creation for a room to 5.
+ # This is to stop us from diverging history *too* much.
+ self.limiter = Limiter(max_count=5)
+
+ self.action_generator = hs.get_action_generator()
+
+ self.spam_checker = hs.get_spam_checker()
+
+ @defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_event_ids=None):
"""
@@ -234,7 +483,7 @@ class MessageHandler(BaseHandler):
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
- event, context = yield self._create_new_client_event(
+ event, context = yield self.create_new_client_event(
builder=builder,
requester=requester,
prev_event_ids=prev_event_ids,
@@ -259,11 +508,6 @@ class MessageHandler(BaseHandler):
"Tried to send member event through non-member codepath"
)
- # We check here if we are currently being rate limited, so that we
- # don't do unnecessary work. We check again just before we actually
- # send the event.
- yield self.ratelimit(requester, update=False)
-
user = UserID.from_string(event.sender)
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
@@ -280,12 +524,6 @@ class MessageHandler(BaseHandler):
ratelimit=ratelimit,
)
- if event.type == EventTypes.Message:
- presence = self.hs.get_presence_handler()
- # We don't want to block sending messages on any presence code. This
- # matters as sometimes presence code can take a while.
- preserve_fn(presence.bump_presence_active_time)(user)
-
@defer.inlineCallbacks
def deduplicate_state_event(self, event, context):
"""
@@ -342,137 +580,9 @@ class MessageHandler(BaseHandler):
)
defer.returnValue(event)
+ @measure_func("create_new_client_event")
@defer.inlineCallbacks
- def get_room_data(self, user_id=None, room_id=None,
- event_type=None, state_key="", is_guest=False):
- """ Get data from a room.
-
- Args:
- event : The room path event
- Returns:
- The path data content.
- Raises:
- SynapseError if something went wrong.
- """
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
-
- if membership == Membership.JOIN:
- data = yield self.state_handler.get_current_state(
- room_id, event_type, state_key
- )
- elif membership == Membership.LEAVE:
- key = (event_type, state_key)
- room_state = yield self.store.get_state_for_events(
- [membership_event_id], [key]
- )
- data = room_state[membership_event_id].get(key)
-
- defer.returnValue(data)
-
- @defer.inlineCallbacks
- def _check_in_room_or_world_readable(self, room_id, user_id):
- try:
- # check_user_was_in_room will return the most recent membership
- # event for the user if:
- # * The user is a non-guest user, and was ever in the room
- # * The user is a guest user, and has joined the room
- # else it will throw.
- member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
- defer.returnValue((member_event.membership, member_event.event_id))
- return
- except AuthError:
- visibility = yield self.state_handler.get_current_state(
- room_id, EventTypes.RoomHistoryVisibility, ""
- )
- if (
- visibility and
- visibility.content["history_visibility"] == "world_readable"
- ):
- defer.returnValue((Membership.JOIN, None))
- return
- raise AuthError(
- 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
- )
-
- @defer.inlineCallbacks
- def get_state_events(self, user_id, room_id, is_guest=False):
- """Retrieve all state events for a given room. If the user is
- joined to the room then return the current state. If the user has
- left the room return the state events from when they left.
-
- Args:
- user_id(str): The user requesting state events.
- room_id(str): The room ID to get all state events from.
- Returns:
- A list of dicts representing state events. [{}, {}, {}]
- """
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
-
- if membership == Membership.JOIN:
- room_state = yield self.state_handler.get_current_state(room_id)
- elif membership == Membership.LEAVE:
- room_state = yield self.store.get_state_for_events(
- [membership_event_id], None
- )
- room_state = room_state[membership_event_id]
-
- now = self.clock.time_msec()
- defer.returnValue(
- [serialize_event(c, now) for c in room_state.values()]
- )
-
- @defer.inlineCallbacks
- def get_joined_members(self, requester, room_id):
- """Get all the joined members in the room and their profile information.
-
- If the user has left the room return the state events from when they left.
-
- Args:
- requester(Requester): The user requesting state events.
- room_id(str): The room ID to get all state events from.
- Returns:
- A dict of user_id to profile info
- """
- user_id = requester.user.to_string()
- if not requester.app_service:
- # We check AS auth after fetching the room membership, as it
- # requires us to pull out all joined members anyway.
- membership, _ = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
- if membership != Membership.JOIN:
- raise NotImplementedError(
- "Getting joined members after leaving is not implemented"
- )
-
- users_with_profile = yield self.state.get_current_user_in_room(room_id)
-
- # If this is an AS, double check that they are allowed to see the members.
- # This can either be because the AS user is in the room or becuase there
- # is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
- for uid in users_with_profile:
- if requester.app_service.is_interested_in_user(uid):
- break
- else:
- # Loop fell through, AS has no interested users in room
- raise AuthError(403, "Appservice not in room")
-
- defer.returnValue({
- user_id: {
- "avatar_url": profile.avatar_url,
- "display_name": profile.display_name,
- }
- for user_id, profile in users_with_profile.iteritems()
- })
-
- @measure_func("_create_new_client_event")
- @defer.inlineCallbacks
- def _create_new_client_event(self, builder, requester=None, prev_event_ids=None):
+ def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
if prev_event_ids:
prev_events = yield self.store.add_event_hashes(prev_event_ids)
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
@@ -509,9 +619,7 @@ class MessageHandler(BaseHandler):
builder.prev_events = prev_events
builder.depth = depth
- state_handler = self.state_handler
-
- context = yield state_handler.compute_event_context(builder)
+ context = yield self.state.compute_event_context(builder)
if requester:
context.app_service = requester.app_service
@@ -546,12 +654,21 @@ class MessageHandler(BaseHandler):
event,
context,
ratelimit=True,
- extra_users=[]
+ extra_users=[],
):
- # We now need to go and hit out to wherever we need to hit out to.
+ """Processes a new event. This includes checking auth, persisting it,
+ notifying users, sending to remote servers, etc.
- if ratelimit:
- yield self.ratelimit(requester)
+ If called from a worker will hit out to the master process for final
+ processing.
+
+ Args:
+ requester (Requester)
+ event (FrozenEvent)
+ context (EventContext)
+ ratelimit (bool)
+ extra_users (list(UserID)): Any extra users to notify about event
+ """
try:
yield self.auth.check_from_context(event, context)
@@ -567,7 +684,58 @@ class MessageHandler(BaseHandler):
logger.exception("Failed to encode content: %r", event.content)
raise
- yield self.maybe_kick_guest_users(event, context)
+ yield self.action_generator.handle_push_actions_for_event(
+ event, context
+ )
+
+ try:
+ # If we're a worker we need to hit out to the master.
+ if self.config.worker_app:
+ yield send_event_to_master(
+ self.http_client,
+ host=self.config.worker_replication_host,
+ port=self.config.worker_replication_http_port,
+ requester=requester,
+ event=event,
+ context=context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ )
+ return
+
+ yield self.persist_and_notify_client_event(
+ requester,
+ event,
+ context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
+ )
+ except: # noqa: E722, as we reraise the exception this is fine.
+ # Ensure that we actually remove the entries in the push actions
+ # staging area, if we calculated them.
+ preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
+ raise
+
+ @defer.inlineCallbacks
+ def persist_and_notify_client_event(
+ self,
+ requester,
+ event,
+ context,
+ ratelimit=True,
+ extra_users=[],
+ ):
+ """Called when we have fully built the event, have already
+ calculated the push actions for the event, and checked auth.
+
+ This should only be run on master.
+ """
+ assert not self.config.worker_app
+
+ if ratelimit:
+ yield self.base_handler.ratelimit(requester)
+
+ yield self.base_handler.maybe_kick_guest_users(event, context)
if event.type == EventTypes.CanonicalAlias:
# Check the alias is acually valid (at this time at least)
@@ -660,10 +828,6 @@ class MessageHandler(BaseHandler):
"Changing the room create event is forbidden",
)
- yield self.action_generator.handle_push_actions_for_event(
- event, context
- )
-
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
@@ -683,3 +847,9 @@ class MessageHandler(BaseHandler):
)
preserve_fn(_notify)()
+
+ if event.type == EventTypes.Message:
+ presence = self.hs.get_presence_handler()
+ # We don't want to block sending messages on any presence code. This
+ # matters as sometimes presence code can take a while.
+ preserve_fn(presence.bump_presence_active_time)(requester.user)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cb158ba962..a5e501897c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -93,29 +93,30 @@ class PresenceHandler(object):
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
- self.replication = hs.get_replication_layer()
self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler()
- self.replication.register_edu_handler(
+ federation_registry = hs.get_federation_registry()
+
+ federation_registry.register_edu_handler(
"m.presence", self.incoming_presence
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]),
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 9800e24453..3465a787ab 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -31,14 +31,17 @@ class ProfileHandler(BaseHandler):
def __init__(self, hs):
super(ProfileHandler, self).__init__(hs)
- self.federation = hs.get_replication_layer()
- self.federation.register_query_handler(
+ self.federation = hs.get_federation_client()
+ hs.get_federation_registry().register_query_handler(
"profile", self.on_profile_query
)
self.user_directory_handler = hs.get_user_directory_handler()
- self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)
+ if hs.config.worker_app is None:
+ self.clock.looping_call(
+ self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+ )
@defer.inlineCallbacks
def get_profile(self, user_id):
@@ -233,7 +236,7 @@ class ProfileHandler(BaseHandler):
)
for room_id in room_ids:
- handler = self.hs.get_handlers().room_member_handler
+ handler = self.hs.get_room_member_handler()
try:
# Assume the target_user isn't a guest,
# because we don't let guests set profile or avatar data.
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index b5b0303d54..5142ae153d 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -41,9 +41,9 @@ class ReadMarkerHandler(BaseHandler):
"""
with (yield self.read_marker_linearizer.queue((room_id, user_id))):
- account_data = yield self.store.get_account_data_for_room(user_id, room_id)
-
- existing_read_marker = account_data.get("m.fully_read", None)
+ existing_read_marker = yield self.store.get_account_data_for_room_and_type(
+ user_id, room_id, "m.fully_read",
+ )
should_update = True
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 0525765272..3f215c2b4e 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -35,7 +35,7 @@ class ReceiptsHandler(BaseHandler):
self.store = hs.get_datastore()
self.hs = hs
self.federation = hs.get_federation_sender()
- hs.get_replication_layer().register_edu_handler(
+ hs.get_federation_registry().register_edu_handler(
"m.receipt", self._received_remote_receipt
)
self.clock = self.hs.get_clock()
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 4bc6ef51fe..ed5939880a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -25,6 +25,7 @@ from synapse.http.client import CaptchaServerHttpClient
from synapse import types
from synapse.types import UserID
from synapse.util.async import run_on_reactor
+from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -131,7 +132,7 @@ class RegistrationHandler(BaseHandler):
yield run_on_reactor()
password_hash = None
if password:
- password_hash = self.auth_handler().hash(password)
+ password_hash = yield self.auth_handler().hash(password)
if localpart:
yield self.check_username(localpart, guest_access_token=guest_access_token)
@@ -293,7 +294,7 @@ class RegistrationHandler(BaseHandler):
"""
for c in threepidCreds:
- logger.info("validating theeepidcred sid %s on id server %s",
+ logger.info("validating threepidcred sid %s on id server %s",
c['sid'], c['idServer'])
try:
identity_handler = self.hs.get_handlers().identity_handler
@@ -307,6 +308,11 @@ class RegistrationHandler(BaseHandler):
logger.info("got threepid with medium '%s' and address '%s'",
threepid['medium'], threepid['address'])
+ if not check_3pid_allowed(self.hs, threepid['medium'], threepid['address']):
+ raise RegistrationError(
+ 403, "Third party identifier is not allowed"
+ )
+
@defer.inlineCallbacks
def bind_emails(self, user_id, threepidCreds):
"""Links emails with a user ID and informs an identity server.
@@ -440,16 +446,34 @@ class RegistrationHandler(BaseHandler):
return self.hs.get_auth_handler()
@defer.inlineCallbacks
- def guest_access_token_for(self, medium, address, inviter_user_id):
+ def get_or_register_3pid_guest(self, medium, address, inviter_user_id):
+ """Get a guest access token for a 3PID, creating a guest account if
+ one doesn't already exist.
+
+ Args:
+ medium (str)
+ address (str)
+ inviter_user_id (str): The user ID who is trying to invite the
+ 3PID
+
+ Returns:
+ Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+ 3PID guest account.
+ """
access_token = yield self.store.get_3pid_guest_access_token(medium, address)
if access_token:
- defer.returnValue(access_token)
+ user_info = yield self.auth.get_user_by_access_token(
+ access_token
+ )
- _, access_token = yield self.register(
+ defer.returnValue((user_info["user"].to_string(), access_token))
+
+ user_id, access_token = yield self.register(
generate_token=True,
make_guest=True
)
access_token = yield self.store.save_or_get_3pid_guest_access_token(
medium, address, access_token, inviter_user_id
)
- defer.returnValue(access_token)
+
+ defer.returnValue((user_id, access_token))
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d1cc87a016..8df8fcbbad 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -64,6 +65,7 @@ class RoomCreationHandler(BaseHandler):
super(RoomCreationHandler, self).__init__(hs)
self.spam_checker = hs.get_spam_checker()
+ self.event_creation_handler = hs.get_event_creation_handler()
@defer.inlineCallbacks
def create_room(self, requester, config, ratelimit=True):
@@ -163,13 +165,11 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {})
- msg_handler = self.hs.get_handlers().message_handler
- room_member_handler = self.hs.get_handlers().room_member_handler
+ room_member_handler = self.hs.get_room_member_handler()
yield self._send_events_for_new_room(
requester,
room_id,
- msg_handler,
room_member_handler,
preset_config=preset_config,
invite_list=invite_list,
@@ -181,7 +181,7 @@ class RoomCreationHandler(BaseHandler):
if "name" in config:
name = config["name"]
- yield msg_handler.create_and_send_nonmember_event(
+ yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.Name,
@@ -194,7 +194,7 @@ class RoomCreationHandler(BaseHandler):
if "topic" in config:
topic = config["topic"]
- yield msg_handler.create_and_send_nonmember_event(
+ yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.Topic,
@@ -224,7 +224,7 @@ class RoomCreationHandler(BaseHandler):
id_server = invite_3pid["id_server"]
address = invite_3pid["address"]
medium = invite_3pid["medium"]
- yield self.hs.get_handlers().room_member_handler.do_3pid_invite(
+ yield self.hs.get_room_member_handler().do_3pid_invite(
room_id,
requester.user,
medium,
@@ -249,7 +249,6 @@ class RoomCreationHandler(BaseHandler):
self,
creator, # A Requester object.
room_id,
- msg_handler,
room_member_handler,
preset_config,
invite_list,
@@ -272,7 +271,7 @@ class RoomCreationHandler(BaseHandler):
@defer.inlineCallbacks
def send(etype, content, **kwargs):
event = create(etype, content, **kwargs)
- yield msg_handler.create_and_send_nonmember_event(
+ yield self.event_creation_handler.create_and_send_nonmember_event(
creator,
event,
ratelimit=False
@@ -476,12 +475,9 @@ class RoomEventSource(object):
user.to_string()
)
if app_service:
- events, end_key = yield self.store.get_appservice_room_stream(
- service=app_service,
- from_key=from_key,
- to_key=to_key,
- limit=limit,
- )
+ # We no longer support AS users using /sync directly.
+ # See https://github.com/matrix-org/matrix-doc/issues/1144
+ raise NotImplementedError()
else:
room_events = yield self.store.get_membership_changes_for_user(
user.to_string(), from_key, to_key
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index bb40075387..5d81f59b44 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -203,7 +203,8 @@ class RoomListHandler(BaseHandler):
if limit:
step = limit + 1
else:
- step = len(rooms_to_scan)
+ # step cannot be zero
+ step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
chunk = []
for i in xrange(0, len(rooms_to_scan), step):
@@ -408,7 +409,7 @@ class RoomListHandler(BaseHandler):
def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
search_filter=None, include_all_networks=False,
third_party_instance_id=None,):
- repl_layer = self.hs.get_replication_layer()
+ repl_layer = self.hs.get_federation_client()
if search_filter:
# We can't cache when asking for search
return repl_layer.get_public_rooms(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 7e6467cd1d..9977be8831 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import abc
import logging
from signedjson.key import decode_verify_key_bytes
@@ -29,32 +30,121 @@ from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.types import UserID, RoomID
from synapse.util.async import Linearizer
from synapse.util.distributor import user_left_room, user_joined_room
-from ._base import BaseHandler
+
logger = logging.getLogger(__name__)
id_server_scheme = "https://"
-class RoomMemberHandler(BaseHandler):
+class RoomMemberHandler(object):
# TODO(paul): This handler currently contains a messy conflation of
# low-level API that works on UserID objects and so on, and REST-level
# API that takes ID strings and returns pagination chunks. These concerns
# ought to be separated out a lot better.
- def __init__(self, hs):
- super(RoomMemberHandler, self).__init__(hs)
+ __metaclass__ = abc.ABCMeta
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
+ self.state_handler = hs.get_state_handler()
+ self.config = hs.config
+ self.simple_http_client = hs.get_simple_http_client()
+
+ self.federation_handler = hs.get_handlers().federation_handler
+ self.directory_handler = hs.get_handlers().directory_handler
+ self.registration_handler = hs.get_handlers().registration_handler
self.profile_handler = hs.get_profile_handler()
+ self.event_creation_hander = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
- self.distributor = hs.get_distributor()
- self.distributor.declare("user_joined_room")
- self.distributor.declare("user_left_room")
+ @abc.abstractmethod
+ def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+ """Try and join a room that this server is not in
+
+ Args:
+ requester (Requester)
+ remote_room_hosts (list[str]): List of servers that can be used
+ to join via.
+ room_id (str): Room that we are trying to join
+ user (UserID): User who is trying to join
+ content (dict): A dict that should be used as the content of the
+ join event.
+
+ Returns:
+ Deferred
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def _remote_reject_invite(self, remote_room_hosts, room_id, target):
+ """Attempt to reject an invite for a room this server is not in. If we
+ fail to do so we locally mark the invite as rejected.
+
+ Args:
+ requester (Requester)
+ remote_room_hosts (list[str]): List of servers to use to try and
+ reject invite
+ room_id (str)
+ target (UserID): The user rejecting the invite
+
+ Returns:
+ Deferred[dict]: A dictionary to be returned to the client, may
+ include event_id etc, or nothing if we locally rejected
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
+ """Get a guest access token for a 3PID, creating a guest account if
+ one doesn't already exist.
+
+ Args:
+ requester (Requester)
+ medium (str)
+ address (str)
+ inviter_user_id (str): The user ID who is trying to invite the
+ 3PID
+
+ Returns:
+ Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+ 3PID guest account.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def _user_joined_room(self, target, room_id):
+ """Notifies distributor on master process that the user has joined the
+ room.
+
+ Args:
+ target (UserID)
+ room_id (str)
+
+ Returns:
+ Deferred|None
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def _user_left_room(self, target, room_id):
+ """Notifies distributor on master process that the user has left the
+ room.
+
+ Args:
+ target (UserID)
+ room_id (str)
+
+ Returns:
+ Deferred|None
+ """
+ raise NotImplementedError()
@defer.inlineCallbacks
def _local_membership_update(
@@ -66,13 +156,12 @@ class RoomMemberHandler(BaseHandler):
):
if content is None:
content = {}
- msg_handler = self.hs.get_handlers().message_handler
content["membership"] = membership
if requester.is_guest:
content["kind"] = "guest"
- event, context = yield msg_handler.create_event(
+ event, context = yield self.event_creation_hander.create_event(
requester,
{
"type": EventTypes.Member,
@@ -90,12 +179,14 @@ class RoomMemberHandler(BaseHandler):
)
# Check if this event matches the previous membership event for the user.
- duplicate = yield msg_handler.deduplicate_state_event(event, context)
+ duplicate = yield self.event_creation_hander.deduplicate_state_event(
+ event, context,
+ )
if duplicate is not None:
# Discard the new event since this membership change is a no-op.
defer.returnValue(duplicate)
- yield msg_handler.handle_new_client_event(
+ yield self.event_creation_hander.handle_new_client_event(
requester,
event,
context,
@@ -117,33 +208,16 @@ class RoomMemberHandler(BaseHandler):
prev_member_event = yield self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
- yield user_joined_room(self.distributor, target, room_id)
+ yield self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
- user_left_room(self.distributor, target, room_id)
+ yield self._user_left_room(target, room_id)
defer.returnValue(event)
@defer.inlineCallbacks
- def remote_join(self, remote_room_hosts, room_id, user, content):
- if len(remote_room_hosts) == 0:
- raise SynapseError(404, "No known servers")
-
- # We don't do an auth check if we are doing an invite
- # join dance for now, since we're kinda implicitly checking
- # that we are allowed to join when we decide whether or not we
- # need to do the invite/join dance.
- yield self.hs.get_handlers().federation_handler.do_invite_join(
- remote_room_hosts,
- room_id,
- user.to_string(),
- content,
- )
- yield user_joined_room(self.distributor, user, room_id)
-
- @defer.inlineCallbacks
def update_membership(
self,
requester,
@@ -201,8 +275,7 @@ class RoomMemberHandler(BaseHandler):
# if this is a join with a 3pid signature, we may need to turn a 3pid
# invite into a normal invite before we can handle the join.
if third_party_signed is not None:
- replication = self.hs.get_replication_layer()
- yield replication.exchange_third_party_invite(
+ yield self.federation_handler.exchange_third_party_invite(
third_party_signed["sender"],
target.to_string(),
room_id,
@@ -223,7 +296,7 @@ class RoomMemberHandler(BaseHandler):
requester.user,
)
if not is_requester_admin:
- if self.hs.config.block_non_admin_invites:
+ if self.config.block_non_admin_invites:
logger.info(
"Blocking invite: user is not admin and non-admin "
"invites disabled"
@@ -282,7 +355,7 @@ class RoomMemberHandler(BaseHandler):
raise AuthError(403, "Guest access not allowed")
if not is_host_in_room:
- inviter = yield self.get_inviter(target.to_string(), room_id)
+ inviter = yield self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
remote_room_hosts.append(inviter.domain)
@@ -296,15 +369,15 @@ class RoomMemberHandler(BaseHandler):
if requester.is_guest:
content["kind"] = "guest"
- ret = yield self.remote_join(
- remote_room_hosts, room_id, target, content
+ ret = yield self._remote_join(
+ requester, remote_room_hosts, room_id, target, content
)
defer.returnValue(ret)
elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room:
# perhaps we've been invited
- inviter = yield self.get_inviter(target.to_string(), room_id)
+ inviter = yield self._get_inviter(target.to_string(), room_id)
if not inviter:
raise SynapseError(404, "Not a known room")
@@ -318,28 +391,10 @@ class RoomMemberHandler(BaseHandler):
else:
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
- fed_handler = self.hs.get_handlers().federation_handler
- try:
- ret = yield fed_handler.do_remotely_reject_invite(
- remote_room_hosts,
- room_id,
- target.to_string(),
- )
- defer.returnValue(ret)
- except Exception as e:
- # if we were unable to reject the exception, just mark
- # it as rejected on our end and plough ahead.
- #
- # The 'except' clause is very broad, but we need to
- # capture everything from DNS failures upwards
- #
- logger.warn("Failed to reject invite: %s", e)
-
- yield self.store.locally_reject_invite(
- target.to_string(), room_id
- )
-
- defer.returnValue({})
+ res = yield self._remote_reject_invite(
+ requester, remote_room_hosts, room_id, target,
+ )
+ defer.returnValue(res)
res = yield self._local_membership_update(
requester=requester,
@@ -394,8 +449,9 @@ class RoomMemberHandler(BaseHandler):
else:
requester = synapse.types.create_requester(target_user)
- message_handler = self.hs.get_handlers().message_handler
- prev_event = yield message_handler.deduplicate_state_event(event, context)
+ prev_event = yield self.event_creation_hander.deduplicate_state_event(
+ event, context,
+ )
if prev_event is not None:
return
@@ -412,7 +468,7 @@ class RoomMemberHandler(BaseHandler):
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
- yield message_handler.handle_new_client_event(
+ yield self.event_creation_hander.handle_new_client_event(
requester,
event,
context,
@@ -434,12 +490,12 @@ class RoomMemberHandler(BaseHandler):
prev_member_event = yield self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
- yield user_joined_room(self.distributor, target_user, room_id)
+ yield self._user_joined_room(target_user, room_id)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN:
- user_left_room(self.distributor, target_user, room_id)
+ yield self._user_left_room(target_user, room_id)
@defer.inlineCallbacks
def _can_guest_join(self, current_state_ids):
@@ -473,7 +529,7 @@ class RoomMemberHandler(BaseHandler):
Raises:
SynapseError if room alias could not be found.
"""
- directory_handler = self.hs.get_handlers().directory_handler
+ directory_handler = self.directory_handler
mapping = yield directory_handler.get_association(room_alias)
if not mapping:
@@ -485,7 +541,7 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue((RoomID.from_string(room_id), servers))
@defer.inlineCallbacks
- def get_inviter(self, user_id, room_id):
+ def _get_inviter(self, user_id, room_id):
invite = yield self.store.get_invite_for_user_in_room(
user_id=user_id,
room_id=room_id,
@@ -504,7 +560,7 @@ class RoomMemberHandler(BaseHandler):
requester,
txn_id
):
- if self.hs.config.block_non_admin_invites:
+ if self.config.block_non_admin_invites:
is_requester_admin = yield self.auth.is_server_admin(
requester.user,
)
@@ -551,7 +607,7 @@ class RoomMemberHandler(BaseHandler):
str: the matrix ID of the 3pid, or None if it is not recognized.
"""
try:
- data = yield self.hs.get_simple_http_client().get_json(
+ data = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
{
"medium": medium,
@@ -562,7 +618,7 @@ class RoomMemberHandler(BaseHandler):
if "mxid" in data:
if "signatures" not in data:
raise AuthError(401, "No signatures on 3pid binding")
- self.verify_any_signature(data, id_server)
+ yield self._verify_any_signature(data, id_server)
defer.returnValue(data["mxid"])
except IOError as e:
@@ -570,11 +626,11 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue(None)
@defer.inlineCallbacks
- def verify_any_signature(self, data, server_hostname):
+ def _verify_any_signature(self, data, server_hostname):
if server_hostname not in data["signatures"]:
raise AuthError(401, "No signature from server %s" % (server_hostname,))
for key_name, signature in data["signatures"][server_hostname].items():
- key_data = yield self.hs.get_simple_http_client().get_json(
+ key_data = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/api/v1/pubkey/%s" %
(id_server_scheme, server_hostname, key_name,),
)
@@ -599,7 +655,7 @@ class RoomMemberHandler(BaseHandler):
user,
txn_id
):
- room_state = yield self.hs.get_state_handler().get_current_state(room_id)
+ room_state = yield self.state_handler.get_current_state(room_id)
inviter_display_name = ""
inviter_avatar_url = ""
@@ -630,6 +686,7 @@ class RoomMemberHandler(BaseHandler):
token, public_keys, fallback_public_key, display_name = (
yield self._ask_id_server_for_third_party_invite(
+ requester=requester,
id_server=id_server,
medium=medium,
address=address,
@@ -644,8 +701,7 @@ class RoomMemberHandler(BaseHandler):
)
)
- msg_handler = self.hs.get_handlers().message_handler
- yield msg_handler.create_and_send_nonmember_event(
+ yield self.event_creation_hander.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.ThirdPartyInvite,
@@ -667,6 +723,7 @@ class RoomMemberHandler(BaseHandler):
@defer.inlineCallbacks
def _ask_id_server_for_third_party_invite(
self,
+ requester,
id_server,
medium,
address,
@@ -683,6 +740,7 @@ class RoomMemberHandler(BaseHandler):
Asks an identity server for a third party invite.
Args:
+ requester (Requester)
id_server (str): hostname + optional port for the identity server.
medium (str): The literal string "email".
address (str): The third party address being invited.
@@ -724,24 +782,20 @@ class RoomMemberHandler(BaseHandler):
"sender_avatar_url": inviter_avatar_url,
}
- if self.hs.config.invite_3pid_guest:
- registration_handler = self.hs.get_handlers().registration_handler
- guest_access_token = yield registration_handler.guest_access_token_for(
+ if self.config.invite_3pid_guest:
+ guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest(
+ requester=requester,
medium=medium,
address=address,
inviter_user_id=inviter_user_id,
)
- guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
- guest_access_token
- )
-
invite_config.update({
"guest_access_token": guest_access_token,
- "guest_user_id": guest_user_info["user"].to_string(),
+ "guest_user_id": guest_user_id,
})
- data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+ data = yield self.simple_http_client.post_urlencoded_get_json(
is_url,
invite_config
)
@@ -764,27 +818,6 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue((token, public_keys, fallback_public_key, display_name))
@defer.inlineCallbacks
- def forget(self, user, room_id):
- user_id = user.to_string()
-
- member = yield self.state_handler.get_current_state(
- room_id=room_id,
- event_type=EventTypes.Member,
- state_key=user_id
- )
- membership = member.membership if member else None
-
- if membership is not None and membership not in [
- Membership.LEAVE, Membership.BAN
- ]:
- raise SynapseError(400, "User %s in room %s" % (
- user_id, room_id
- ))
-
- if membership:
- yield self.store.forget(user_id, room_id)
-
- @defer.inlineCallbacks
def _is_host_in_room(self, current_state_ids):
# Have we just created the room, and is this about to be the very
# first member event?
@@ -805,3 +838,94 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue(True)
defer.returnValue(False)
+
+
+class RoomMemberMasterHandler(RoomMemberHandler):
+ def __init__(self, hs):
+ super(RoomMemberMasterHandler, self).__init__(hs)
+
+ self.distributor = hs.get_distributor()
+ self.distributor.declare("user_joined_room")
+ self.distributor.declare("user_left_room")
+
+ @defer.inlineCallbacks
+ def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+ """Implements RoomMemberHandler._remote_join
+ """
+ if len(remote_room_hosts) == 0:
+ raise SynapseError(404, "No known servers")
+
+ # We don't do an auth check if we are doing an invite
+ # join dance for now, since we're kinda implicitly checking
+ # that we are allowed to join when we decide whether or not we
+ # need to do the invite/join dance.
+ yield self.federation_handler.do_invite_join(
+ remote_room_hosts,
+ room_id,
+ user.to_string(),
+ content,
+ )
+ yield self._user_joined_room(user, room_id)
+
+ @defer.inlineCallbacks
+ def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+ """Implements RoomMemberHandler._remote_reject_invite
+ """
+ fed_handler = self.federation_handler
+ try:
+ ret = yield fed_handler.do_remotely_reject_invite(
+ remote_room_hosts,
+ room_id,
+ target.to_string(),
+ )
+ defer.returnValue(ret)
+ except Exception as e:
+ # if we were unable to reject the exception, just mark
+ # it as rejected on our end and plough ahead.
+ #
+ # The 'except' clause is very broad, but we need to
+ # capture everything from DNS failures upwards
+ #
+ logger.warn("Failed to reject invite: %s", e)
+
+ yield self.store.locally_reject_invite(
+ target.to_string(), room_id
+ )
+ defer.returnValue({})
+
+ def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
+ """Implements RoomMemberHandler.get_or_register_3pid_guest
+ """
+ rg = self.registration_handler
+ return rg.get_or_register_3pid_guest(medium, address, inviter_user_id)
+
+ def _user_joined_room(self, target, room_id):
+ """Implements RoomMemberHandler._user_joined_room
+ """
+ return user_joined_room(self.distributor, target, room_id)
+
+ def _user_left_room(self, target, room_id):
+ """Implements RoomMemberHandler._user_left_room
+ """
+ return user_left_room(self.distributor, target, room_id)
+
+ @defer.inlineCallbacks
+ def forget(self, user, room_id):
+ user_id = user.to_string()
+
+ member = yield self.state_handler.get_current_state(
+ room_id=room_id,
+ event_type=EventTypes.Member,
+ state_key=user_id
+ )
+ membership = member.membership if member else None
+
+ if membership is not None and membership not in [
+ Membership.LEAVE, Membership.BAN
+ ]:
+ raise SynapseError(400, "User %s in room %s" % (
+ user_id, room_id
+ ))
+
+ if membership:
+ yield self.store.forget(user_id, room_id)
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
new file mode 100644
index 0000000000..493aec1e48
--- /dev/null
+++ b/synapse/handlers/room_member_worker.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+from synapse.handlers.room_member import RoomMemberHandler
+from synapse.replication.http.membership import (
+ remote_join, remote_reject_invite, get_or_register_3pid_guest,
+ notify_user_membership_change,
+)
+
+
+logger = logging.getLogger(__name__)
+
+
+class RoomMemberWorkerHandler(RoomMemberHandler):
+ @defer.inlineCallbacks
+ def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+ """Implements RoomMemberHandler._remote_join
+ """
+ if len(remote_room_hosts) == 0:
+ raise SynapseError(404, "No known servers")
+
+ ret = yield remote_join(
+ self.simple_http_client,
+ host=self.config.worker_replication_host,
+ port=self.config.worker_replication_http_port,
+ requester=requester,
+ remote_room_hosts=remote_room_hosts,
+ room_id=room_id,
+ user_id=user.to_string(),
+ content=content,
+ )
+
+ yield self._user_joined_room(user, room_id)
+
+ defer.returnValue(ret)
+
+ def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+ """Implements RoomMemberHandler._remote_reject_invite
+ """
+ return remote_reject_invite(
+ self.simple_http_client,
+ host=self.config.worker_replication_host,
+ port=self.config.worker_replication_http_port,
+ requester=requester,
+ remote_room_hosts=remote_room_hosts,
+ room_id=room_id,
+ user_id=target.to_string(),
+ )
+
+ def _user_joined_room(self, target, room_id):
+ """Implements RoomMemberHandler._user_joined_room
+ """
+ return notify_user_membership_change(
+ self.simple_http_client,
+ host=self.config.worker_replication_host,
+ port=self.config.worker_replication_http_port,
+ user_id=target.to_string(),
+ room_id=room_id,
+ change="joined",
+ )
+
+ def _user_left_room(self, target, room_id):
+ """Implements RoomMemberHandler._user_left_room
+ """
+ return notify_user_membership_change(
+ self.simple_http_client,
+ host=self.config.worker_replication_host,
+ port=self.config.worker_replication_http_port,
+ user_id=target.to_string(),
+ room_id=room_id,
+ change="left",
+ )
+
+ def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
+ """Implements RoomMemberHandler.get_or_register_3pid_guest
+ """
+ return get_or_register_3pid_guest(
+ self.simple_http_client,
+ host=self.config.worker_replication_host,
+ port=self.config.worker_replication_http_port,
+ requester=requester,
+ medium=medium,
+ address=address,
+ inviter_user_id=inviter_user_id,
+ )
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index 44414e1dc1..e057ae54c9 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -31,7 +31,7 @@ class SetPasswordHandler(BaseHandler):
@defer.inlineCallbacks
def set_password(self, user_id, newpassword, requester=None):
- password_hash = self._auth_handler.hash(newpassword)
+ password_hash = yield self._auth_handler.hash(newpassword)
except_device_id = requester.device_id if requester else None
except_access_token_id = requester.access_token_id if requester else None
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b12988f3c9..0f713ce038 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -235,10 +235,10 @@ class SyncHandler(object):
defer.returnValue(rules)
@defer.inlineCallbacks
- def ephemeral_by_room(self, sync_config, now_token, since_token=None):
+ def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
"""Get the ephemeral events for each room the user is in
Args:
- sync_config (SyncConfig): The flags, filters and user for the sync.
+ sync_result_builder(SyncResultBuilder)
now_token (StreamToken): Where the server is currently up to.
since_token (StreamToken): Where the server was when the client
last synced.
@@ -248,10 +248,12 @@ class SyncHandler(object):
typing events for that room.
"""
+ sync_config = sync_result_builder.sync_config
+
with Measure(self.clock, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else "0"
- room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = sync_result_builder.joined_room_ids
typing_source = self.event_sources.sources["typing"]
typing, typing_key = yield typing_source.get_new_events(
@@ -565,10 +567,22 @@ class SyncHandler(object):
# Always use the `now_token` in `SyncResultBuilder`
now_token = yield self.event_sources.get_current_token()
+ user_id = sync_config.user.to_string()
+ app_service = self.store.get_app_service_by_user_id(user_id)
+ if app_service:
+ # We no longer support AS users using /sync directly.
+ # See https://github.com/matrix-org/matrix-doc/issues/1144
+ raise NotImplementedError()
+ else:
+ joined_room_ids = yield self.get_rooms_for_user_at(
+ user_id, now_token.room_stream_id,
+ )
+
sync_result_builder = SyncResultBuilder(
sync_config, full_state,
since_token=since_token,
now_token=now_token,
+ joined_room_ids=joined_room_ids,
)
account_data_by_room = yield self._generate_sync_entry_for_account_data(
@@ -603,7 +617,6 @@ class SyncHandler(object):
device_id = sync_config.device_id
one_time_key_counts = {}
if device_id:
- user_id = sync_config.user.to_string()
one_time_key_counts = yield self.store.count_e2e_one_time_keys(
user_id, device_id
)
@@ -891,7 +904,7 @@ class SyncHandler(object):
ephemeral_by_room = {}
else:
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_result_builder.sync_config,
+ sync_result_builder,
now_token=sync_result_builder.now_token,
since_token=sync_result_builder.since_token,
)
@@ -996,15 +1009,8 @@ class SyncHandler(object):
if rooms_changed:
defer.returnValue(True)
- app_service = self.store.get_app_service_by_user_id(user_id)
- if app_service:
- rooms = yield self.store.get_app_service_rooms(app_service)
- joined_room_ids = set(r.room_id for r in rooms)
- else:
- joined_room_ids = yield self.store.get_rooms_for_user(user_id)
-
stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
- for room_id in joined_room_ids:
+ for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
defer.returnValue(True)
defer.returnValue(False)
@@ -1028,13 +1034,6 @@ class SyncHandler(object):
assert since_token
- app_service = self.store.get_app_service_by_user_id(user_id)
- if app_service:
- rooms = yield self.store.get_app_service_rooms(app_service)
- joined_room_ids = set(r.room_id for r in rooms)
- else:
- joined_room_ids = yield self.store.get_rooms_for_user(user_id)
-
# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
@@ -1057,7 +1056,7 @@ class SyncHandler(object):
# we do send down the room, and with full state, where necessary
old_state_ids = None
- if room_id in joined_room_ids and non_joins:
+ if room_id in sync_result_builder.joined_room_ids and non_joins:
# Always include if the user (re)joined the room, especially
# important so that device list changes are calculated correctly.
# If there are non join member events, but we are still in the room,
@@ -1067,7 +1066,7 @@ class SyncHandler(object):
# User is in the room so we don't need to do the invite/leave checks
continue
- if room_id in joined_room_ids or has_join:
+ if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = yield self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
old_mem_ev = None
@@ -1079,7 +1078,7 @@ class SyncHandler(object):
newly_joined_rooms.append(room_id)
# If user is in the room then we don't need to do the invite/leave checks
- if room_id in joined_room_ids:
+ if room_id in sync_result_builder.joined_room_ids:
continue
if not non_joins:
@@ -1146,7 +1145,7 @@ class SyncHandler(object):
# Get all events for rooms we're currently joined to.
room_to_events = yield self.store.get_room_events_stream_for_rooms(
- room_ids=joined_room_ids,
+ room_ids=sync_result_builder.joined_room_ids,
from_key=since_token.room_key,
to_key=now_token.room_key,
limit=timeline_limit + 1,
@@ -1154,7 +1153,7 @@ class SyncHandler(object):
# We loop through all room ids, even if there are no new events, in case
# there are non room events taht we need to notify about.
- for room_id in joined_room_ids:
+ for room_id in sync_result_builder.joined_room_ids:
room_entry = room_to_events.get(room_id, None)
if room_entry:
@@ -1362,6 +1361,54 @@ class SyncHandler(object):
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+ @defer.inlineCallbacks
+ def get_rooms_for_user_at(self, user_id, stream_ordering):
+ """Get set of joined rooms for a user at the given stream ordering.
+
+ The stream ordering *must* be recent, otherwise this may throw an
+ exception if older than a month. (This function is called with the
+ current token, which should be perfectly fine).
+
+ Args:
+ user_id (str)
+ stream_ordering (int)
+
+ ReturnValue:
+ Deferred[frozenset[str]]: Set of room_ids the user is in at given
+ stream_ordering.
+ """
+ joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(
+ user_id,
+ )
+
+ joined_room_ids = set()
+
+ # We need to check that the stream ordering of the join for each room
+ # is before the stream_ordering asked for. This might not be the case
+ # if the user joins a room between us getting the current token and
+ # calling `get_rooms_for_user_with_stream_ordering`.
+ # If the membership's stream ordering is after the given stream
+ # ordering, we need to go and work out if the user was in the room
+ # before.
+ for room_id, membership_stream_ordering in joined_rooms:
+ if membership_stream_ordering <= stream_ordering:
+ joined_room_ids.add(room_id)
+ continue
+
+ logger.info("User joined room after current token: %s", room_id)
+
+ extrems = yield self.store.get_forward_extremeties_for_room(
+ room_id, stream_ordering,
+ )
+ users_in_room = yield self.state.get_current_user_in_room(
+ room_id, extrems,
+ )
+ if user_id in users_in_room:
+ joined_room_ids.add(room_id)
+
+ joined_room_ids = frozenset(joined_room_ids)
+ defer.returnValue(joined_room_ids)
+
def _action_has_highlight(actions):
for action in actions:
@@ -1411,7 +1458,8 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
class SyncResultBuilder(object):
"Used to help build up a new SyncResult for a user"
- def __init__(self, sync_config, full_state, since_token, now_token):
+ def __init__(self, sync_config, full_state, since_token, now_token,
+ joined_room_ids):
"""
Args:
sync_config(SyncConfig)
@@ -1423,6 +1471,7 @@ class SyncResultBuilder(object):
self.full_state = full_state
self.since_token = since_token
self.now_token = now_token
+ self.joined_room_ids = joined_room_ids
self.presence = []
self.account_data = []
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 82dedbbc99..77c0cf146f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -56,7 +56,7 @@ class TypingHandler(object):
self.federation = hs.get_federation_sender()
- hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
+ hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
hs.get_distributor().observe("user_left_room", self.user_left_room)
|