diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 66d2c01123..d28e07f0d9 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,23 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.appservice.scheduler import AppServiceScheduler
-from synapse.appservice.api import ApplicationServiceApi
from .register import RegistrationHandler
from .room import (
- RoomCreationHandler, RoomMemberHandler, RoomListHandler, RoomContextHandler,
+ RoomCreationHandler, RoomContextHandler,
)
+from .room_member import RoomMemberHandler
from .message import MessageHandler
from .events import EventStreamHandler, EventHandler
from .federation import FederationHandler
from .profile import ProfileHandler
-from .presence import PresenceHandler
from .directory import DirectoryHandler
-from .typing import TypingNotificationHandler
from .admin import AdminHandler
-from .appservice import ApplicationServicesHandler
-from .sync import SyncHandler
-from .auth import AuthHandler
from .identity import IdentityHandler
from .receipts import ReceiptsHandler
from .search import SearchHandler
@@ -52,22 +46,9 @@ class Handlers(object):
self.event_handler = EventHandler(hs)
self.federation_handler = FederationHandler(hs)
self.profile_handler = ProfileHandler(hs)
- self.presence_handler = PresenceHandler(hs)
- self.room_list_handler = RoomListHandler(hs)
self.directory_handler = DirectoryHandler(hs)
- self.typing_notification_handler = TypingNotificationHandler(hs)
self.admin_handler = AdminHandler(hs)
self.receipts_handler = ReceiptsHandler(hs)
- asapi = ApplicationServiceApi(hs)
- self.appservice_handler = ApplicationServicesHandler(
- hs, asapi, AppServiceScheduler(
- clock=hs.get_clock(),
- store=hs.get_datastore(),
- as_api=asapi
- )
- )
- self.sync_handler = SyncHandler(hs)
- self.auth_handler = AuthHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 90eabb6eb7..c904c6c500 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -15,13 +15,10 @@
from twisted.internet import defer
-from synapse.api.errors import LimitExceededError, SynapseError, AuthError
-from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.api.errors import LimitExceededError
from synapse.api.constants import Membership, EventTypes
-from synapse.types import UserID, RoomAlias, Requester
-from synapse.push.action_generator import ActionGenerator
+from synapse.types import UserID, Requester
-from synapse.util.logcontext import PreserveLoggingContext
import logging
@@ -29,20 +26,13 @@ import logging
logger = logging.getLogger(__name__)
-VISIBILITY_PRIORITY = (
- "world_readable",
- "shared",
- "invited",
- "joined",
-)
-
-
class BaseHandler(object):
"""
Common base class for the event handlers.
- :type store: synapse.storage.events.StateStore
- :type state_handler: synapse.state.StateHandler
+ Attributes:
+ store (synapse.storage.events.StateStore):
+ state_handler (synapse.state.StateHandler):
"""
def __init__(self, hs):
@@ -55,137 +45,10 @@ class BaseHandler(object):
self.clock = hs.get_clock()
self.hs = hs
- self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
self.event_builder_factory = hs.get_event_builder_factory()
- @defer.inlineCallbacks
- def filter_events_for_clients(self, user_tuples, events, event_id_to_state):
- """ Returns dict of user_id -> list of events that user is allowed to
- see.
-
- :param (str, bool) user_tuples: (user id, is_peeking) for each
- user to be checked. is_peeking should be true if:
- * the user is not currently a member of the room, and:
- * the user has not been a member of the room since the given
- events
- """
- forgotten = yield defer.gatherResults([
- self.store.who_forgot_in_room(
- room_id,
- )
- for room_id in frozenset(e.room_id for e in events)
- ], consumeErrors=True)
-
- # Set of membership event_ids that have been forgotten
- event_id_forgotten = frozenset(
- row["event_id"] for rows in forgotten for row in rows
- )
-
- def allowed(event, user_id, is_peeking):
- state = event_id_to_state[event.event_id]
-
- # get the room_visibility at the time of the event.
- visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
- if visibility_event:
- visibility = visibility_event.content.get("history_visibility", "shared")
- else:
- visibility = "shared"
-
- if visibility not in VISIBILITY_PRIORITY:
- visibility = "shared"
-
- # if it was world_readable, it's easy: everyone can read it
- if visibility == "world_readable":
- return True
-
- # Always allow history visibility events on boundaries. This is done
- # by setting the effective visibility to the least restrictive
- # of the old vs new.
- if event.type == EventTypes.RoomHistoryVisibility:
- prev_content = event.unsigned.get("prev_content", {})
- prev_visibility = prev_content.get("history_visibility", None)
-
- if prev_visibility not in VISIBILITY_PRIORITY:
- prev_visibility = "shared"
-
- new_priority = VISIBILITY_PRIORITY.index(visibility)
- old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
- if old_priority < new_priority:
- visibility = prev_visibility
-
- # get the user's membership at the time of the event. (or rather,
- # just *after* the event. Which means that people can see their
- # own join events, but not (currently) their own leave events.)
- membership_event = state.get((EventTypes.Member, user_id), None)
- if membership_event:
- if membership_event.event_id in event_id_forgotten:
- membership = None
- else:
- membership = membership_event.membership
- else:
- membership = None
-
- # if the user was a member of the room at the time of the event,
- # they can see it.
- if membership == Membership.JOIN:
- return True
-
- if visibility == "joined":
- # we weren't a member at the time of the event, so we can't
- # see this event.
- return False
-
- elif visibility == "invited":
- # user can also see the event if they were *invited* at the time
- # of the event.
- return membership == Membership.INVITE
-
- else:
- # visibility is shared: user can also see the event if they have
- # become a member since the event
- #
- # XXX: if the user has subsequently joined and then left again,
- # ideally we would share history up to the point they left. But
- # we don't know when they left.
- return not is_peeking
-
- defer.returnValue({
- user_id: [
- event
- for event in events
- if allowed(event, user_id, is_peeking)
- ]
- for user_id, is_peeking in user_tuples
- })
-
- @defer.inlineCallbacks
- def _filter_events_for_client(self, user_id, events, is_peeking=False):
- """
- Check which events a user is allowed to see
-
- :param str user_id: user id to be checked
- :param [synapse.events.EventBase] events: list of events to be checked
- :param bool is_peeking should be True if:
- * the user is not currently a member of the room, and:
- * the user has not been a member of the room since the given
- events
- :rtype [synapse.events.EventBase]
- """
- types = (
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, user_id),
- )
- event_id_to_state = yield self.store.get_state_for_events(
- frozenset(e.event_id for e in events),
- types=types
- )
- res = yield self.filter_events_for_clients(
- [(user_id, is_peeking)], events, event_id_to_state
- )
- defer.returnValue(res.get(user_id, []))
-
def ratelimit(self, requester):
time_now = self.clock.time()
allowed, time_allowed = self.ratelimiter.send_message(
@@ -198,95 +61,6 @@ class BaseHandler(object):
retry_after_ms=int(1000 * (time_allowed - time_now)),
)
- @defer.inlineCallbacks
- def _create_new_client_event(self, builder):
- latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
- builder.room_id,
- )
-
- if latest_ret:
- depth = max([d for _, _, d in latest_ret]) + 1
- else:
- depth = 1
-
- prev_events = [
- (event_id, prev_hashes)
- for event_id, prev_hashes, _ in latest_ret
- ]
-
- builder.prev_events = prev_events
- builder.depth = depth
-
- state_handler = self.state_handler
-
- context = yield state_handler.compute_event_context(builder)
-
- # If we've received an invite over federation, there are no latest
- # events in the room, because we don't know enough about the graph
- # fragment we received to treat it like a graph, so the above returned
- # no relevant events. It may have returned some events (if we have
- # joined and left the room), but not useful ones, like the invite.
- if (
- not self.is_host_in_room(context.current_state) and
- builder.type == EventTypes.Member
- ):
- prev_member_event = yield self.store.get_room_member(
- builder.sender, builder.room_id
- )
-
- # The prev_member_event may already be in context.current_state,
- # despite us not being present in the room; in particular, if
- # inviting user, and all other local users, have already left.
- #
- # In that case, we have all the information we need, and we don't
- # want to drop "context" - not least because we may need to handle
- # the invite locally, which will require us to have the whole
- # context (not just prev_member_event) to auth it.
- #
- context_event_ids = (
- e.event_id for e in context.current_state.values()
- )
-
- if (
- prev_member_event and
- prev_member_event.event_id not in context_event_ids
- ):
- # The prev_member_event is missing from context, so it must
- # have arrived over federation and is an outlier. We forcibly
- # set our context to the invite we received over federation
- builder.prev_events = (
- prev_member_event.event_id,
- prev_member_event.prev_events
- )
-
- context = yield state_handler.compute_event_context(
- builder,
- old_state=(prev_member_event,),
- outlier=True
- )
-
- if builder.is_state():
- builder.prev_state = yield self.store.add_event_hashes(
- context.prev_state_events
- )
-
- yield self.auth.add_auth_events(builder, context)
-
- add_hashes_and_signatures(
- builder, self.server_name, self.signing_key
- )
-
- event = builder.build()
-
- logger.debug(
- "Created event %s with current state: %s",
- event.event_id, context.current_state,
- )
-
- defer.returnValue(
- (event, context,)
- )
-
def is_host_in_room(self, current_state):
room_members = [
(state_key, event.membership)
@@ -301,144 +75,13 @@ class BaseHandler(object):
return True
for (state_key, membership) in room_members:
if (
- UserID.from_string(state_key).domain == self.hs.hostname
+ self.hs.is_mine_id(state_key)
and membership == Membership.JOIN
):
return True
return False
@defer.inlineCallbacks
- def handle_new_client_event(
- self,
- requester,
- event,
- context,
- ratelimit=True,
- extra_users=[]
- ):
- # We now need to go and hit out to wherever we need to hit out to.
-
- if ratelimit:
- self.ratelimit(requester)
-
- self.auth.check(event, auth_events=context.current_state)
-
- yield self.maybe_kick_guest_users(event, context.current_state.values())
-
- if event.type == EventTypes.CanonicalAlias:
- # Check the alias is acually valid (at this time at least)
- room_alias_str = event.content.get("alias", None)
- if room_alias_str:
- room_alias = RoomAlias.from_string(room_alias_str)
- directory_handler = self.hs.get_handlers().directory_handler
- mapping = yield directory_handler.get_association(room_alias)
-
- if mapping["room_id"] != event.room_id:
- raise SynapseError(
- 400,
- "Room alias %s does not point to the room" % (
- room_alias_str,
- )
- )
-
- federation_handler = self.hs.get_handlers().federation_handler
-
- if event.type == EventTypes.Member:
- if event.content["membership"] == Membership.INVITE:
- def is_inviter_member_event(e):
- return (
- e.type == EventTypes.Member and
- e.sender == event.sender
- )
-
- event.unsigned["invite_room_state"] = [
- {
- "type": e.type,
- "state_key": e.state_key,
- "content": e.content,
- "sender": e.sender,
- }
- for k, e in context.current_state.items()
- if e.type in self.hs.config.room_invite_state_types
- or is_inviter_member_event(e)
- ]
-
- invitee = UserID.from_string(event.state_key)
- if not self.hs.is_mine(invitee):
- # TODO: Can we add signature from remote server in a nicer
- # way? If we have been invited by a remote server, we need
- # to get them to sign the event.
-
- returned_invite = yield federation_handler.send_invite(
- invitee.domain,
- event,
- )
-
- event.unsigned.pop("room_state", None)
-
- # TODO: Make sure the signatures actually are correct.
- event.signatures.update(
- returned_invite.signatures
- )
-
- if event.type == EventTypes.Redaction:
- if self.auth.check_redaction(event, auth_events=context.current_state):
- original_event = yield self.store.get_event(
- event.redacts,
- check_redacted=False,
- get_prev_content=False,
- allow_rejected=False,
- allow_none=False
- )
- if event.user_id != original_event.user_id:
- raise AuthError(
- 403,
- "You don't have permission to redact events"
- )
-
- if event.type == EventTypes.Create and context.current_state:
- raise AuthError(
- 403,
- "Changing the room create event is forbidden",
- )
-
- action_generator = ActionGenerator(self.hs)
- yield action_generator.handle_push_actions_for_event(
- event, context, self
- )
-
- (event_stream_id, max_stream_id) = yield self.store.persist_event(
- event, context=context
- )
-
- destinations = set()
- for k, s in context.current_state.items():
- try:
- if k[0] == EventTypes.Member:
- if s.content["membership"] == Membership.JOIN:
- destinations.add(
- UserID.from_string(s.state_key).domain
- )
- except SynapseError:
- logger.warn(
- "Failed to get destination from event %s", s.event_id
- )
-
- with PreserveLoggingContext():
- # Don't block waiting on waking up all the listeners.
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
-
- # If invite, remove room_state from unsigned before sending.
- event.unsigned.pop("invite_room_state", None)
-
- federation_handler.handle_new_event(
- event, destinations=destinations,
- )
-
- @defer.inlineCallbacks
def maybe_kick_guest_users(self, event, current_state):
# Technically this function invalidates current_state by changing it.
# Hopefully this isn't that important to the caller.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 75fc74c797..051ccdb380 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.appservice import ApplicationService
-from synapse.types import UserID
import logging
@@ -35,16 +34,13 @@ def log_failure(failure):
)
-# NB: Purposefully not inheriting BaseHandler since that contains way too much
-# setup code which this handler does not need or use. This makes testing a lot
-# easier.
class ApplicationServicesHandler(object):
- def __init__(self, hs, appservice_api, appservice_scheduler):
+ def __init__(self, hs):
self.store = hs.get_datastore()
- self.hs = hs
- self.appservice_api = appservice_api
- self.scheduler = appservice_scheduler
+ self.is_mine_id = hs.is_mine_id
+ self.appservice_api = hs.get_application_service_api()
+ self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
@defer.inlineCallbacks
@@ -169,8 +165,7 @@ class ApplicationServicesHandler(object):
@defer.inlineCallbacks
def _is_unknown_user(self, user_id):
- user = UserID.from_string(user_id)
- if not self.hs.is_mine(user):
+ if not self.is_mine_id(user_id):
# we don't know if they are unknown or not since it isn't one of our
# users. We can't poke ASes.
defer.returnValue(False)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 82d458b424..200793b5ed 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.constants import LoginType
from synapse.types import UserID
-from synapse.api.errors import AuthError, LoginError, Codes
+from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
from synapse.util.async import run_on_reactor
from twisted.web.client import PartialDownloadError
@@ -49,6 +49,21 @@ class AuthHandler(BaseHandler):
self.sessions = {}
self.INVALID_TOKEN_HTTP_STATUS = 401
+ self.ldap_enabled = hs.config.ldap_enabled
+ self.ldap_server = hs.config.ldap_server
+ self.ldap_port = hs.config.ldap_port
+ self.ldap_tls = hs.config.ldap_tls
+ self.ldap_search_base = hs.config.ldap_search_base
+ self.ldap_search_property = hs.config.ldap_search_property
+ self.ldap_email_property = hs.config.ldap_email_property
+ self.ldap_full_name_property = hs.config.ldap_full_name_property
+
+ if self.ldap_enabled is True:
+ import ldap
+ logger.info("Import ldap version: %s", ldap.__version__)
+
+ self.hs = hs # FIXME better possibility to access registrationHandler later?
+
@defer.inlineCallbacks
def check_auth(self, flows, clientdict, clientip):
"""
@@ -163,9 +178,13 @@ class AuthHandler(BaseHandler):
def get_session_id(self, clientdict):
"""
Gets the session ID for a client given the client dictionary
- :param clientdict: The dictionary sent by the client in the request
- :return: The string session ID the client sent. If the client did not
- send a session ID, returns None.
+
+ Args:
+ clientdict: The dictionary sent by the client in the request
+
+ Returns:
+ str|None: The string session ID the client sent. If the client did
+ not send a session ID, returns None.
"""
sid = None
if clientdict and 'auth' in clientdict:
@@ -179,9 +198,11 @@ class AuthHandler(BaseHandler):
Store a key-value pair into the sessions data associated with this
request. This data is stored server-side and cannot be modified by
the client.
- :param session_id: (string) The ID of this session as returned from check_auth
- :param key: (string) The key to store the data under
- :param value: (any) The data to store
+
+ Args:
+ session_id (string): The ID of this session as returned from check_auth
+ key (string): The key to store the data under
+ value (any): The data to store
"""
sess = self._get_session_info(session_id)
sess.setdefault('serverdict', {})[key] = value
@@ -190,9 +211,11 @@ class AuthHandler(BaseHandler):
def get_session_data(self, session_id, key, default=None):
"""
Retrieve data stored with set_session_data
- :param session_id: (string) The ID of this session as returned from check_auth
- :param key: (string) The key to store the data under
- :param default: (any) Value to return if the key has not been set
+
+ Args:
+ session_id (string): The ID of this session as returned from check_auth
+ key (string): The key to store the data under
+ default (any): Value to return if the key has not been set
"""
sess = self._get_session_info(session_id)
return sess.setdefault('serverdict', {}).get(key, default)
@@ -207,8 +230,10 @@ class AuthHandler(BaseHandler):
if not user_id.startswith('@'):
user_id = UserID.create(user_id, self.hs.hostname).to_string()
- user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
- self._check_password(user_id, password, password_hash)
+ if not (yield self._check_password(user_id, password)):
+ logger.warn("Failed password login for user %s", user_id)
+ raise LoginError(403, "", errcode=Codes.FORBIDDEN)
+
defer.returnValue(user_id)
@defer.inlineCallbacks
@@ -332,8 +357,10 @@ class AuthHandler(BaseHandler):
StoreError if there was a problem storing the token.
LoginError if there was an authentication problem.
"""
- user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
- self._check_password(user_id, password, password_hash)
+
+ if not (yield self._check_password(user_id, password)):
+ logger.warn("Failed password login for user %s", user_id)
+ raise LoginError(403, "", errcode=Codes.FORBIDDEN)
logger.info("Logging in user %s", user_id)
access_token = yield self.issue_access_token(user_id)
@@ -399,11 +426,67 @@ class AuthHandler(BaseHandler):
else:
defer.returnValue(user_infos.popitem())
- def _check_password(self, user_id, password, stored_hash):
- """Checks that user_id has passed password, raises LoginError if not."""
- if not self.validate_hash(password, stored_hash):
- logger.warn("Failed password login for user %s", user_id)
- raise LoginError(403, "", errcode=Codes.FORBIDDEN)
+ @defer.inlineCallbacks
+ def _check_password(self, user_id, password):
+ """
+ Returns:
+ True if the user_id successfully authenticated
+ """
+ valid_ldap = yield self._check_ldap_password(user_id, password)
+ if valid_ldap:
+ defer.returnValue(True)
+
+ valid_local_password = yield self._check_local_password(user_id, password)
+ if valid_local_password:
+ defer.returnValue(True)
+
+ defer.returnValue(False)
+
+ @defer.inlineCallbacks
+ def _check_local_password(self, user_id, password):
+ try:
+ user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
+ defer.returnValue(self.validate_hash(password, password_hash))
+ except LoginError:
+ defer.returnValue(False)
+
+ @defer.inlineCallbacks
+ def _check_ldap_password(self, user_id, password):
+ if not self.ldap_enabled:
+ logger.debug("LDAP not configured")
+ defer.returnValue(False)
+
+ import ldap
+
+ logger.info("Authenticating %s with LDAP" % user_id)
+ try:
+ ldap_url = "%s:%s" % (self.ldap_server, self.ldap_port)
+ logger.debug("Connecting LDAP server at %s" % ldap_url)
+ l = ldap.initialize(ldap_url)
+ if self.ldap_tls:
+ logger.debug("Initiating TLS")
+ self._connection.start_tls_s()
+
+ local_name = UserID.from_string(user_id).localpart
+
+ dn = "%s=%s, %s" % (
+ self.ldap_search_property,
+ local_name,
+ self.ldap_search_base)
+ logger.debug("DN for LDAP authentication: %s" % dn)
+
+ l.simple_bind_s(dn.encode('utf-8'), password.encode('utf-8'))
+
+ if not (yield self.does_user_exist(user_id)):
+ handler = self.hs.get_handlers().registration_handler
+ user_id, access_token = (
+ yield handler.register(localpart=local_name)
+ )
+
+ defer.returnValue(True)
+ except ldap.LDAPError, e:
+ logger.warn("LDAP error: %s", e)
+ defer.returnValue(False)
@defer.inlineCallbacks
def issue_access_token(self, user_id):
@@ -438,14 +521,19 @@ class AuthHandler(BaseHandler):
))
return m.serialize()
- def generate_short_term_login_token(self, user_id):
+ def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
macaroon = self._generate_base_macaroon(user_id)
macaroon.add_first_party_caveat("type = login")
now = self.hs.get_clock().time_msec()
- expiry = now + (2 * 60 * 1000)
+ expiry = now + duration_in_ms
macaroon.add_first_party_caveat("time < %d" % (expiry,))
return macaroon.serialize()
+ def generate_delete_pusher_token(self, user_id):
+ macaroon = self._generate_base_macaroon(user_id)
+ macaroon.add_first_party_caveat("type = delete_pusher")
+ return macaroon.serialize()
+
def validate_short_term_login_token_and_get_user_id(self, login_token):
try:
macaroon = pymacaroons.Macaroon.deserialize(login_token)
@@ -480,7 +568,12 @@ class AuthHandler(BaseHandler):
except_access_token_ids = [requester.access_token_id] if requester else []
- yield self.store.user_set_password_hash(user_id, password_hash)
+ try:
+ yield self.store.user_set_password_hash(user_id, password_hash)
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+ raise e
yield self.store.user_delete_access_tokens(
user_id, except_access_token_ids
)
@@ -532,4 +625,7 @@ class AuthHandler(BaseHandler):
Returns:
Whether self.hash(password) == stored_hash (bool).
"""
- return bcrypt.hashpw(password, stored_hash) == stored_hash
+ if stored_hash:
+ return bcrypt.hashpw(password, stored_hash) == stored_hash
+ else:
+ return False
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8eeb225811..4bea7f2b19 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler):
super(DirectoryHandler, self).__init__(hs)
self.state = hs.get_state_handler()
+ self.appservice_handler = hs.get_application_service_handler()
self.federation = hs.get_replication_layer()
self.federation.register_query_handler(
@@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler):
)
if not result:
# Query AS to see if it exists
- as_handler = self.hs.get_handlers().appservice_handler
+ as_handler = self.appservice_handler
result = yield as_handler.query_room_alias_exists(room_alias)
defer.returnValue(result)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f25a252523..3a3a1257d3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -58,7 +58,7 @@ class EventStreamHandler(BaseHandler):
If `only_keys` is not None, events from keys will be sent down.
"""
auth_user = UserID.from_string(auth_user_id)
- presence_handler = self.hs.get_handlers().presence_handler
+ presence_handler = self.hs.get_presence_handler()
context = yield presence_handler.user_syncing(
auth_user_id, affect_presence=affect_presence,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 267fedf114..ff83c608e7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -26,20 +26,21 @@ from synapse.api.errors import (
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
)
-from synapse.types import UserID
+from synapse.types import UserID, get_domain_from_id
from synapse.events.utils import prune_event
from synapse.util.retryutils import NotRetryingDestination
from synapse.push.action_generator import ActionGenerator
+from synapse.util.distributor import user_joined_room
from twisted.internet import defer
@@ -49,10 +50,6 @@ import logging
logger = logging.getLogger(__name__)
-def user_joined_room(distributor, user, room_id):
- return distributor.fire("user_joined_room", user, room_id)
-
-
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
@@ -69,10 +66,6 @@ class FederationHandler(BaseHandler):
self.hs = hs
- self.distributor.observe("user_joined_room", self.user_joined_room)
-
- self.waiting_for_join_list = {}
-
self.store = hs.get_datastore()
self.replication_layer = hs.get_replication_layer()
self.state_handler = hs.get_state_handler()
@@ -102,8 +95,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, origin, pdu, state=None,
- auth_chain=None):
+ def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
"""
@@ -174,11 +166,7 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
- yield self._handle_new_events(
- origin,
- event_infos,
- outliers=True
- )
+ yield self._handle_new_events(origin, event_infos)
try:
context, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -288,7 +276,14 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit, extremities=[]):
""" Trigger a backfill request to `dest` for the given `room_id`
+
+ This will attempt to get more events from the remote. This may return
+ be successfull and still return no events if the other side has no new
+ events to offer.
"""
+ if dest == self.server_name:
+ raise SynapseError(400, "Can't backfill from self.")
+
if not extremities:
extremities = yield self.store.get_oldest_events_in_room(room_id)
@@ -299,6 +294,16 @@ class FederationHandler(BaseHandler):
extremities=extremities,
)
+ # Don't bother processing events we already have.
+ seen_events = yield self.store.have_events_in_timeline(
+ set(e.event_id for e in events)
+ )
+
+ events = [e for e in events if e.event_id not in seen_events]
+
+ if not events:
+ defer.returnValue([])
+
event_map = {e.event_id: e for e in events}
event_ids = set(e.event_id for e in events)
@@ -358,6 +363,7 @@ class FederationHandler(BaseHandler):
for a in auth_events.values():
if a.event_id in seen_events:
continue
+ a.internal_metadata.outlier = True
ev_infos.append({
"event": a,
"auth_events": {
@@ -378,20 +384,23 @@ class FederationHandler(BaseHandler):
}
})
+ yield self._handle_new_events(
+ dest, ev_infos,
+ backfilled=True,
+ )
+
events.sort(key=lambda e: e.depth)
for event in events:
if event in events_to_state:
continue
- ev_infos.append({
- "event": event,
- })
-
- yield self._handle_new_events(
- dest, ev_infos,
- backfilled=True,
- )
+ # We store these one at a time since each event depends on the
+ # previous to work out the state.
+ # TODO: We can probably do something more clever here.
+ yield self._handle_new_event(
+ dest, event
+ )
defer.returnValue(events)
@@ -440,7 +449,7 @@ class FederationHandler(BaseHandler):
joined_domains = {}
for u, d in joined_users:
try:
- dom = UserID.from_string(u).domain
+ dom = get_domain_from_id(u)
old_d = joined_domains.get(dom)
if old_d:
joined_domains[dom] = min(d, old_d)
@@ -455,7 +464,7 @@ class FederationHandler(BaseHandler):
likely_domains = [
domain for domain, depth in curr_domains
- if domain is not self.server_name
+ if domain != self.server_name
]
@defer.inlineCallbacks
@@ -463,11 +472,15 @@ class FederationHandler(BaseHandler):
# TODO: Should we try multiple of these at a time?
for dom in domains:
try:
- events = yield self.backfill(
+ yield self.backfill(
dom, room_id,
limit=100,
extremities=[e for e in extremities.keys()]
)
+ # If this succeeded then we probably already have the
+ # appropriate stuff.
+ # TODO: We can probably do something more intelligent here.
+ defer.returnValue(True)
except SynapseError as e:
logger.info(
"Failed to backfill from %s because %s",
@@ -493,8 +506,6 @@ class FederationHandler(BaseHandler):
)
continue
- if events:
- defer.returnValue(True)
defer.returnValue(False)
success = yield try_backfill(likely_domains)
@@ -666,9 +677,14 @@ class FederationHandler(BaseHandler):
"state_key": user_id,
})
- event, context = yield self._create_new_client_event(
- builder=builder,
- )
+ try:
+ message_handler = self.hs.get_handlers().message_handler
+ event, context = yield message_handler._create_new_client_event(
+ builder=builder,
+ )
+ except AuthError as e:
+ logger.warn("Failed to create join %r because %s", event, e)
+ raise e
self.auth.check(event, auth_events=context.current_state)
@@ -724,9 +740,7 @@ class FederationHandler(BaseHandler):
try:
if k[0] == EventTypes.Member:
if s.content["membership"] == Membership.JOIN:
- destinations.add(
- UserID.from_string(s.state_key).domain
- )
+ destinations.add(get_domain_from_id(s.state_key))
except:
logger.warn(
"Failed to get destination from event %s", s.event_id
@@ -761,6 +775,7 @@ class FederationHandler(BaseHandler):
event = pdu
event.internal_metadata.outlier = True
+ event.internal_metadata.invite_from_remote = True
event.signatures.update(
compute_event_signature(
@@ -788,13 +803,19 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
- origin, event = yield self._make_and_verify_event(
- target_hosts,
- room_id,
- user_id,
- "leave"
- )
- signed_event = self._sign_event(event)
+ try:
+ origin, event = yield self._make_and_verify_event(
+ target_hosts,
+ room_id,
+ user_id,
+ "leave"
+ )
+ signed_event = self._sign_event(event)
+ except SynapseError:
+ raise
+ except CodeMessageException as e:
+ logger.warn("Failed to reject invite: %s", e)
+ raise SynapseError(500, "Failed to reject invite")
# Try the host we successfully got a response to /make_join/
# request first.
@@ -804,10 +825,16 @@ class FederationHandler(BaseHandler):
except ValueError:
pass
- yield self.replication_layer.send_leave(
- target_hosts,
- signed_event
- )
+ try:
+ yield self.replication_layer.send_leave(
+ target_hosts,
+ signed_event
+ )
+ except SynapseError:
+ raise
+ except CodeMessageException as e:
+ logger.warn("Failed to reject invite: %s", e)
+ raise SynapseError(500, "Failed to reject invite")
context = yield self.state_handler.compute_event_context(event)
@@ -883,11 +910,16 @@ class FederationHandler(BaseHandler):
"state_key": user_id,
})
- event, context = yield self._create_new_client_event(
+ message_handler = self.hs.get_handlers().message_handler
+ event, context = yield message_handler._create_new_client_event(
builder=builder,
)
- self.auth.check(event, auth_events=context.current_state)
+ try:
+ self.auth.check(event, auth_events=context.current_state)
+ except AuthError as e:
+ logger.warn("Failed to create new leave %r because %s", event, e)
+ raise e
defer.returnValue(event)
@@ -934,9 +966,7 @@ class FederationHandler(BaseHandler):
try:
if k[0] == EventTypes.Member:
if s.content["membership"] == Membership.LEAVE:
- destinations.add(
- UserID.from_string(s.state_key).domain
- )
+ destinations.add(get_domain_from_id(s.state_key))
except:
logger.warn(
"Failed to get destination from event %s", s.event_id
@@ -1057,21 +1087,10 @@ class FederationHandler(BaseHandler):
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
- @log_function
- def user_joined_room(self, user, room_id):
- waiters = self.waiting_for_join_list.get(
- (user.to_string(), room_id),
- []
- )
- while waiters:
- waiters.pop().callback(None)
-
@defer.inlineCallbacks
@log_function
- def _handle_new_event(self, origin, event, state=None, auth_events=None):
-
- outlier = event.internal_metadata.is_outlier()
-
+ def _handle_new_event(self, origin, event, state=None, auth_events=None,
+ backfilled=False):
context = yield self._prep_event(
origin, event,
state=state,
@@ -1081,20 +1100,30 @@ class FederationHandler(BaseHandler):
if not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
- event, context, self
+ event, context
)
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
- is_new_state=not outlier,
+ backfilled=backfilled,
+ )
+
+ # this intentionally does not yield: we don't care about the result
+ # and don't need to wait for it.
+ preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ event_stream_id, max_stream_id
)
defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks
- def _handle_new_events(self, origin, event_infos, backfilled=False,
- outliers=False):
+ def _handle_new_events(self, origin, event_infos, backfilled=False):
+ """Creates the appropriate contexts and persists events. The events
+ should not depend on one another, e.g. this should be used to persist
+ a bunch of outliers, but not a chunk of individual events that depend
+ on each other for state calculations.
+ """
contexts = yield defer.gatherResults(
[
self._prep_event(
@@ -1113,7 +1142,6 @@ class FederationHandler(BaseHandler):
for ev_info, context in itertools.izip(event_infos, contexts)
],
backfilled=backfilled,
- is_new_state=(not outliers and not backfilled),
)
@defer.inlineCallbacks
@@ -1128,11 +1156,9 @@ class FederationHandler(BaseHandler):
"""
events_to_context = {}
for e in itertools.chain(auth_events, state):
- ctx = yield self.state_handler.compute_event_context(
- e, outlier=True,
- )
- events_to_context[e.event_id] = ctx
e.internal_metadata.outlier = True
+ ctx = yield self.state_handler.compute_event_context(e)
+ events_to_context[e.event_id] = ctx
event_map = {
e.event_id: e
@@ -1176,16 +1202,14 @@ class FederationHandler(BaseHandler):
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
],
- is_new_state=False,
)
new_event_context = yield self.state_handler.compute_event_context(
- event, old_state=state, outlier=False,
+ event, old_state=state
)
event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context,
- is_new_state=True,
current_state=state,
)
@@ -1193,10 +1217,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
- outlier = event.internal_metadata.is_outlier()
context = yield self.state_handler.compute_event_context(
- event, old_state=state, outlier=outlier,
+ event, old_state=state,
)
if not auth_events:
@@ -1482,8 +1505,9 @@ class FederationHandler(BaseHandler):
try:
self.auth.check(event, auth_events=auth_events)
- except AuthError:
- raise
+ except AuthError as e:
+ logger.warn("Failed auth resolution for %r because %s", event, e)
+ raise e
@defer.inlineCallbacks
def construct_auth_difference(self, local_auth, remote_auth):
@@ -1653,13 +1677,21 @@ class FederationHandler(BaseHandler):
if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- event, context = yield self._create_new_client_event(builder=builder)
+ message_handler = self.hs.get_handlers().message_handler
+ event, context = yield message_handler._create_new_client_event(
+ builder=builder
+ )
event, context = yield self.add_display_name_to_third_party_invite(
event_dict, event, context
)
- self.auth.check(event, context.current_state)
+ try:
+ self.auth.check(event, context.current_state)
+ except AuthError as e:
+ logger.warn("Denying new third party invite %r because %s", event, e)
+ raise e
+
yield self._check_signature(event, auth_events=context.current_state)
member_handler = self.hs.get_handlers().room_member_handler
yield member_handler.send_membership_event(None, event, context)
@@ -1676,7 +1708,8 @@ class FederationHandler(BaseHandler):
def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
builder = self.event_builder_factory.new(event_dict)
- event, context = yield self._create_new_client_event(
+ message_handler = self.hs.get_handlers().message_handler
+ event, context = yield message_handler._create_new_client_event(
builder=builder,
)
@@ -1684,7 +1717,11 @@ class FederationHandler(BaseHandler):
event_dict, event, context
)
- self.auth.check(event, auth_events=context.current_state)
+ try:
+ self.auth.check(event, auth_events=context.current_state)
+ except AuthError as e:
+ logger.warn("Denying third party invite %r because %s", event, e)
+ raise e
yield self._check_signature(event, auth_events=context.current_state)
returned_invite = yield self.send_invite(origin, event)
@@ -1711,20 +1748,23 @@ class FederationHandler(BaseHandler):
event_dict["content"]["third_party_invite"]["display_name"] = display_name
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- event, context = yield self._create_new_client_event(builder=builder)
+ message_handler = self.hs.get_handlers().message_handler
+ event, context = yield message_handler._create_new_client_event(builder=builder)
defer.returnValue((event, context))
@defer.inlineCallbacks
def _check_signature(self, event, auth_events):
"""
Checks that the signature in the event is consistent with its invite.
- :param event (Event): The m.room.member event to check
- :param auth_events (dict<(event type, state_key), event>)
- :raises
- AuthError if signature didn't match any keys, or key has been
+ Args:
+ event (Event): The m.room.member event to check
+ auth_events (dict<(event type, state_key), event>):
+
+ Raises:
+ AuthError: if signature didn't match any keys, or key has been
revoked,
- SynapseError if a transient error meant a key couldn't be checked
+ SynapseError: if a transient error meant a key couldn't be checked
for revocation.
"""
signed = event.content["third_party_invite"]["signed"]
@@ -1766,12 +1806,13 @@ class FederationHandler(BaseHandler):
"""
Checks whether public_key has been revoked.
- :param public_key (str): base-64 encoded public key.
- :param url (str): Key revocation URL.
+ Args:
+ public_key (str): base-64 encoded public key.
+ url (str): Key revocation URL.
- :raises
- AuthError if they key has been revoked.
- SynapseError if a transient error meant a key couldn't be checked
+ Raises:
+ AuthError: if they key has been revoked.
+ SynapseError: if a transient error meant a key couldn't be checked
for revocation.
"""
try:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5c50c611ba..15caf1950a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,12 +17,19 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.streams.config import PaginationConfig
+from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
+from synapse.push.action_generator import ActionGenerator
+from synapse.streams.config import PaginationConfig
+from synapse.types import (
+ UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
+)
from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute, run_on_reactor
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.types import UserID, RoomStreamToken, StreamToken
+from synapse.util.logcontext import preserve_fn
+from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -33,10 +40,6 @@ import logging
logger = logging.getLogger(__name__)
-def collect_presencelike_data(distributor, user, content):
- return distributor.fire("collect_presencelike_data", user, content)
-
-
class MessageHandler(BaseHandler):
def __init__(self, hs):
@@ -48,35 +51,6 @@ class MessageHandler(BaseHandler):
self.snapshot_cache = SnapshotCache()
@defer.inlineCallbacks
- def get_message(self, msg_id=None, room_id=None, sender_id=None,
- user_id=None):
- """ Retrieve a message.
-
- Args:
- msg_id (str): The message ID to obtain.
- room_id (str): The room where the message resides.
- sender_id (str): The user ID of the user who sent the message.
- user_id (str): The user ID of the user making this request.
- Returns:
- The message, or None if no message exists.
- Raises:
- SynapseError if something went wrong.
- """
- yield self.auth.check_joined_room(room_id, user_id)
-
- # Pull out the message from the db
-# msg = yield self.store.get_message(
-# room_id=room_id,
-# msg_id=msg_id,
-# user_id=sender_id
-# )
-
- # TODO (erikj): Once we work out the correct c-s api we need to think
- # on how to do this.
-
- defer.returnValue(None)
-
- @defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True):
"""Get messages in a room.
@@ -155,7 +129,8 @@ class MessageHandler(BaseHandler):
"end": next_token.to_string(),
})
- events = yield self._filter_events_for_client(
+ events = yield filter_events_for_client(
+ self.store,
user_id,
events,
is_peeking=(member_event_id is None),
@@ -175,7 +150,7 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
- def create_event(self, event_dict, token_id=None, txn_id=None):
+ def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None):
"""
Given a dict from a client, create a new event.
@@ -186,6 +161,9 @@ class MessageHandler(BaseHandler):
Args:
event_dict (dict): An entire event
+ token_id (str)
+ txn_id (str)
+ prev_event_ids (list): The prev event ids to use when creating the event
Returns:
Tuple of created event (FrozenEvent), Context
@@ -198,12 +176,8 @@ class MessageHandler(BaseHandler):
membership = builder.content.get("membership", None)
target = UserID.from_string(builder.state_key)
- if membership == Membership.JOIN:
+ if membership in {Membership.JOIN, Membership.INVITE}:
# If event doesn't include a display name, add one.
- yield collect_presencelike_data(
- self.distributor, target, builder.content
- )
- elif membership == Membership.INVITE:
profile = self.hs.get_handlers().profile_handler
content = builder.content
@@ -224,6 +198,7 @@ class MessageHandler(BaseHandler):
event, context = yield self._create_new_client_event(
builder=builder,
+ prev_event_ids=prev_event_ids,
)
defer.returnValue((event, context))
@@ -261,7 +236,7 @@ class MessageHandler(BaseHandler):
)
if event.type == EventTypes.Message:
- presence = self.hs.get_handlers().presence_handler
+ presence = self.hs.get_presence_handler()
yield presence.bump_presence_active_time(user)
def deduplicate_state_event(self, event, context):
@@ -515,8 +490,8 @@ class MessageHandler(BaseHandler):
]
).addErrback(unwrapFirstError)
- messages = yield self._filter_events_for_client(
- user_id, messages
+ messages = yield filter_events_for_client(
+ self.store, user_id, messages
)
start_token = now_token.copy_and_replace("room_key", token[0])
@@ -556,14 +531,7 @@ class MessageHandler(BaseHandler):
except:
logger.exception("Failed to get snapshot")
- # Only do N rooms at once
- n = 5
- d_list = [handle_room(e) for e in room_list]
- for i in range(0, len(d_list), n):
- yield defer.gatherResults(
- d_list[i:i + n],
- consumeErrors=True
- ).addErrback(unwrapFirstError)
+ yield concurrently_execute(handle_room, room_list, 10)
account_data_events = []
for account_data_type, content in account_data.items():
@@ -658,8 +626,8 @@ class MessageHandler(BaseHandler):
end_token=stream_token
)
- messages = yield self._filter_events_for_client(
- user_id, messages, is_peeking=is_peeking
+ messages = yield filter_events_for_client(
+ self.store, user_id, messages, is_peeking=is_peeking
)
start_token = StreamToken.START.copy_and_replace("room_key", token[0])
@@ -706,7 +674,7 @@ class MessageHandler(BaseHandler):
and m.content["membership"] == Membership.JOIN
]
- presence_handler = self.hs.get_handlers().presence_handler
+ presence_handler = self.hs.get_presence_handler()
@defer.inlineCallbacks
def get_presence():
@@ -739,8 +707,8 @@ class MessageHandler(BaseHandler):
consumeErrors=True,
).addErrback(unwrapFirstError)
- messages = yield self._filter_events_for_client(
- user_id, messages, is_peeking=is_peeking,
+ messages = yield filter_events_for_client(
+ self.store, user_id, messages, is_peeking=is_peeking,
)
start_token = now_token.copy_and_replace("room_key", token[0])
@@ -763,3 +731,196 @@ class MessageHandler(BaseHandler):
ret["membership"] = membership
defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def _create_new_client_event(self, builder, prev_event_ids=None):
+ if prev_event_ids:
+ prev_events = yield self.store.add_event_hashes(prev_event_ids)
+ prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
+ depth = prev_max_depth + 1
+ else:
+ latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
+ builder.room_id,
+ )
+
+ if latest_ret:
+ depth = max([d for _, _, d in latest_ret]) + 1
+ else:
+ depth = 1
+
+ prev_events = [
+ (event_id, prev_hashes)
+ for event_id, prev_hashes, _ in latest_ret
+ ]
+
+ builder.prev_events = prev_events
+ builder.depth = depth
+
+ state_handler = self.state_handler
+
+ context = yield state_handler.compute_event_context(builder)
+
+ if builder.is_state():
+ builder.prev_state = yield self.store.add_event_hashes(
+ context.prev_state_events
+ )
+
+ yield self.auth.add_auth_events(builder, context)
+
+ signing_key = self.hs.config.signing_key[0]
+ add_hashes_and_signatures(
+ builder, self.server_name, signing_key
+ )
+
+ event = builder.build()
+
+ logger.debug(
+ "Created event %s with current state: %s",
+ event.event_id, context.current_state,
+ )
+
+ defer.returnValue(
+ (event, context,)
+ )
+
+ @defer.inlineCallbacks
+ def handle_new_client_event(
+ self,
+ requester,
+ event,
+ context,
+ ratelimit=True,
+ extra_users=[]
+ ):
+ # We now need to go and hit out to wherever we need to hit out to.
+
+ if ratelimit:
+ self.ratelimit(requester)
+
+ try:
+ self.auth.check(event, auth_events=context.current_state)
+ except AuthError as err:
+ logger.warn("Denying new event %r because %s", event, err)
+ raise err
+
+ yield self.maybe_kick_guest_users(event, context.current_state.values())
+
+ if event.type == EventTypes.CanonicalAlias:
+ # Check the alias is acually valid (at this time at least)
+ room_alias_str = event.content.get("alias", None)
+ if room_alias_str:
+ room_alias = RoomAlias.from_string(room_alias_str)
+ directory_handler = self.hs.get_handlers().directory_handler
+ mapping = yield directory_handler.get_association(room_alias)
+
+ if mapping["room_id"] != event.room_id:
+ raise SynapseError(
+ 400,
+ "Room alias %s does not point to the room" % (
+ room_alias_str,
+ )
+ )
+
+ federation_handler = self.hs.get_handlers().federation_handler
+
+ if event.type == EventTypes.Member:
+ if event.content["membership"] == Membership.INVITE:
+ def is_inviter_member_event(e):
+ return (
+ e.type == EventTypes.Member and
+ e.sender == event.sender
+ )
+
+ event.unsigned["invite_room_state"] = [
+ {
+ "type": e.type,
+ "state_key": e.state_key,
+ "content": e.content,
+ "sender": e.sender,
+ }
+ for k, e in context.current_state.items()
+ if e.type in self.hs.config.room_invite_state_types
+ or is_inviter_member_event(e)
+ ]
+
+ invitee = UserID.from_string(event.state_key)
+ if not self.hs.is_mine(invitee):
+ # TODO: Can we add signature from remote server in a nicer
+ # way? If we have been invited by a remote server, we need
+ # to get them to sign the event.
+
+ returned_invite = yield federation_handler.send_invite(
+ invitee.domain,
+ event,
+ )
+
+ event.unsigned.pop("room_state", None)
+
+ # TODO: Make sure the signatures actually are correct.
+ event.signatures.update(
+ returned_invite.signatures
+ )
+
+ if event.type == EventTypes.Redaction:
+ if self.auth.check_redaction(event, auth_events=context.current_state):
+ original_event = yield self.store.get_event(
+ event.redacts,
+ check_redacted=False,
+ get_prev_content=False,
+ allow_rejected=False,
+ allow_none=False
+ )
+ if event.user_id != original_event.user_id:
+ raise AuthError(
+ 403,
+ "You don't have permission to redact events"
+ )
+
+ if event.type == EventTypes.Create and context.current_state:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
+
+ action_generator = ActionGenerator(self.hs)
+ yield action_generator.handle_push_actions_for_event(
+ event, context
+ )
+
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
+ )
+
+ # this intentionally does not yield: we don't care about the result
+ # and don't need to wait for it.
+ preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ event_stream_id, max_stream_id
+ )
+
+ destinations = set()
+ for k, s in context.current_state.items():
+ try:
+ if k[0] == EventTypes.Member:
+ if s.content["membership"] == Membership.JOIN:
+ destinations.add(get_domain_from_id(s.state_key))
+ except SynapseError:
+ logger.warn(
+ "Failed to get destination from event %s", s.event_id
+ )
+
+ @defer.inlineCallbacks
+ def _notify():
+ yield run_on_reactor()
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
+
+ preserve_fn(_notify)()
+
+ # If invite, remove room_state from unsigned before sending.
+ event.unsigned.pop("invite_room_state", None)
+
+ federation_handler.handle_new_event(
+ event, destinations=destinations,
+ )
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d0c8f1328b..6b70fa3817 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,11 +33,9 @@ from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID
+from synapse.types import UserID, get_domain_from_id
import synapse.metrics
-from ._base import BaseHandler
-
import logging
@@ -52,6 +50,8 @@ timers_fired_counter = metrics.register_counter("timers_fired")
federation_presence_counter = metrics.register_counter("federation_presence")
bump_active_time_counter = metrics.register_counter("bump_active_time")
+get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
+
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
@@ -70,14 +70,18 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000
# How often to resend presence to remote servers
FEDERATION_PING_INTERVAL = 25 * 60 * 1000
+# How long we will wait before assuming that the syncs from an external process
+# are dead.
+EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
+
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
-class PresenceHandler(BaseHandler):
+class PresenceHandler(object):
def __init__(self, hs):
- super(PresenceHandler, self).__init__(hs)
- self.hs = hs
+ self.is_mine = hs.is_mine
+ self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
@@ -138,7 +142,7 @@ class PresenceHandler(BaseHandler):
obj=state.user_id,
then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
)
- if self.hs.is_mine_id(state.user_id):
+ if self.is_mine_id(state.user_id):
self.wheel_timer.insert(
now=now,
obj=state.user_id,
@@ -160,15 +164,26 @@ class PresenceHandler(BaseHandler):
self.serial_to_user = {}
self._next_serial = 1
- # Keeps track of the number of *ongoing* syncs. While this is non zero
- # a user will never go offline.
+ # Keeps track of the number of *ongoing* syncs on this process. While
+ # this is non zero a user will never go offline.
self.user_to_num_current_syncs = {}
+ # Keeps track of the number of *ongoing* syncs on other processes.
+ # While any sync is ongoing on another process the user will never
+ # go offline.
+ # Each process has a unique identifier and an update frequency. If
+ # no update is received from that process within the update period then
+ # we assume that all the sync requests on that process have stopped.
+ # Stored as a dict from process_id to set of user_id, and a dict of
+ # process_id to millisecond timestamp last updated.
+ self.external_process_to_current_syncs = {}
+ self.external_process_last_updated_ms = {}
+
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
self.clock.call_later(
- 0 * 1000,
+ 30,
self.clock.looping_call,
self._handle_timeouts,
5000,
@@ -228,7 +243,7 @@ class PresenceHandler(BaseHandler):
new_state, should_notify, should_ping = handle_update(
prev_state, new_state,
- is_mine=self.hs.is_mine_id(user_id),
+ is_mine=self.is_mine_id(user_id),
wheel_timer=self.wheel_timer,
now=now
)
@@ -268,31 +283,48 @@ class PresenceHandler(BaseHandler):
"""Checks the presence of users that have timed out and updates as
appropriate.
"""
+ logger.info("Handling presence timeouts")
now = self.clock.time_msec()
- with Measure(self.clock, "presence_handle_timeouts"):
- # Fetch the list of users that *may* have timed out. Things may have
- # changed since the timeout was set, so we won't necessarily have to
- # take any action.
- users_to_check = self.wheel_timer.fetch(now)
+ try:
+ with Measure(self.clock, "presence_handle_timeouts"):
+ # Fetch the list of users that *may* have timed out. Things may have
+ # changed since the timeout was set, so we won't necessarily have to
+ # take any action.
+ users_to_check = set(self.wheel_timer.fetch(now))
+
+ # Check whether the lists of syncing processes from an external
+ # process have expired.
+ expired_process_ids = [
+ process_id for process_id, last_update
+ in self.external_process_last_updated_ms.items()
+ if now - last_update > EXTERNAL_PROCESS_EXPIRY
+ ]
+ for process_id in expired_process_ids:
+ users_to_check.update(
+ self.external_process_last_updated_ms.pop(process_id, ())
+ )
+ self.external_process_last_update.pop(process_id)
- states = [
- self.user_to_current_state.get(
- user_id, UserPresenceState.default(user_id)
- )
- for user_id in set(users_to_check)
- ]
+ states = [
+ self.user_to_current_state.get(
+ user_id, UserPresenceState.default(user_id)
+ )
+ for user_id in users_to_check
+ ]
- timers_fired_counter.inc_by(len(states))
+ timers_fired_counter.inc_by(len(states))
- changes = handle_timeouts(
- states,
- is_mine_fn=self.hs.is_mine_id,
- user_to_num_current_syncs=self.user_to_num_current_syncs,
- now=now,
- )
+ changes = handle_timeouts(
+ states,
+ is_mine_fn=self.is_mine_id,
+ syncing_user_ids=self.get_currently_syncing_users(),
+ now=now,
+ )
- preserve_fn(self._update_states)(changes)
+ preserve_fn(self._update_states)(changes)
+ except:
+ logger.exception("Exception in _handle_timeouts loop")
@defer.inlineCallbacks
def bump_presence_active_time(self, user):
@@ -365,6 +397,74 @@ class PresenceHandler(BaseHandler):
defer.returnValue(_user_syncing())
+ def get_currently_syncing_users(self):
+ """Get the set of user ids that are currently syncing on this HS.
+ Returns:
+ set(str): A set of user_id strings.
+ """
+ syncing_user_ids = {
+ user_id for user_id, count in self.user_to_num_current_syncs.items()
+ if count
+ }
+ for user_ids in self.external_process_to_current_syncs.values():
+ syncing_user_ids.update(user_ids)
+ return syncing_user_ids
+
+ @defer.inlineCallbacks
+ def update_external_syncs(self, process_id, syncing_user_ids):
+ """Update the syncing users for an external process
+
+ Args:
+ process_id(str): An identifier for the process the users are
+ syncing against. This allows synapse to process updates
+ as user start and stop syncing against a given process.
+ syncing_user_ids(set(str)): The set of user_ids that are
+ currently syncing on that server.
+ """
+
+ # Grab the previous list of user_ids that were syncing on that process
+ prev_syncing_user_ids = (
+ self.external_process_to_current_syncs.get(process_id, set())
+ )
+ # Grab the current presence state for both the users that are syncing
+ # now and the users that were syncing before this update.
+ prev_states = yield self.current_state_for_users(
+ syncing_user_ids | prev_syncing_user_ids
+ )
+ updates = []
+ time_now_ms = self.clock.time_msec()
+
+ # For each new user that is syncing check if we need to mark them as
+ # being online.
+ for new_user_id in syncing_user_ids - prev_syncing_user_ids:
+ prev_state = prev_states[new_user_id]
+ if prev_state.state == PresenceState.OFFLINE:
+ updates.append(prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=time_now_ms,
+ last_user_sync_ts=time_now_ms,
+ ))
+ else:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ ))
+
+ # For each user that is still syncing or stopped syncing update the
+ # last sync time so that we will correctly apply the grace period when
+ # they stop syncing.
+ for old_user_id in prev_syncing_user_ids:
+ prev_state = prev_states[old_user_id]
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ ))
+
+ yield self._update_states(updates)
+
+ # Update the last updated time for the process. We expire the entries
+ # if we don't receive an update in the given timeframe.
+ self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+ self.external_process_to_current_syncs[process_id] = syncing_user_ids
+
@defer.inlineCallbacks
def current_state_for_user(self, user_id):
"""Get the current presence state for a user.
@@ -427,7 +527,7 @@ class PresenceHandler(BaseHandler):
hosts_to_states = {}
for room_id, states in room_ids_to_states.items():
- local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue
@@ -436,11 +536,11 @@ class PresenceHandler(BaseHandler):
hosts_to_states.setdefault(host, []).extend(local_states)
for user_id, states in users_to_states.items():
- local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue
- host = UserID.from_string(user_id).domain
+ host = get_domain_from_id(user_id)
hosts_to_states.setdefault(host, []).extend(local_states)
# TODO: de-dup hosts_to_states, as a single host might have multiple
@@ -611,14 +711,14 @@ class PresenceHandler(BaseHandler):
# don't need to send to local clients here, as that is done as part
# of the event stream/sync.
# TODO: Only send to servers not already in the room.
- if self.hs.is_mine(user):
+ if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())
hosts = yield self.store.get_joined_hosts_for_room(room_id)
self._push_to_remotes({host: (state,) for host in hosts})
else:
user_ids = yield self.store.get_users_in_room(room_id)
- user_ids = filter(self.hs.is_mine_id, user_ids)
+ user_ids = filter(self.is_mine_id, user_ids)
states = yield self.current_state_for_users(user_ids)
@@ -628,7 +728,7 @@ class PresenceHandler(BaseHandler):
def get_presence_list(self, observer_user, accepted=None):
"""Returns the presence for all users in their presence list.
"""
- if not self.hs.is_mine(observer_user):
+ if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
presence_list = yield self.store.get_presence_list(
@@ -659,7 +759,7 @@ class PresenceHandler(BaseHandler):
observer_user.localpart, observed_user.to_string()
)
- if self.hs.is_mine(observed_user):
+ if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
yield self.federation.send_edu(
@@ -675,11 +775,11 @@ class PresenceHandler(BaseHandler):
def invite_presence(self, observed_user, observer_user):
"""Handles new presence invites.
"""
- if not self.hs.is_mine(observed_user):
+ if not self.is_mine(observed_user):
raise SynapseError(400, "User is not hosted on this Home Server")
# TODO: Don't auto accept
- if self.hs.is_mine(observer_user):
+ if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
self.federation.send_edu(
@@ -742,7 +842,7 @@ class PresenceHandler(BaseHandler):
Returns:
A Deferred.
"""
- if not self.hs.is_mine(observer_user):
+ if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
yield self.store.del_presence_list(
@@ -834,7 +934,11 @@ def _format_user_presence_state(state, now):
class PresenceEventSource(object):
def __init__(self, hs):
- self.hs = hs
+ # We can't call get_presence_handler here because there's a cycle:
+ #
+ # Presence -> Notifier -> PresenceEventSource -> Presence
+ #
+ self.get_presence_handler = hs.get_presence_handler
self.clock = hs.get_clock()
self.store = hs.get_datastore()
@@ -860,7 +964,7 @@ class PresenceEventSource(object):
from_key = int(from_key)
room_ids = room_ids or []
- presence = self.hs.get_handlers().presence_handler
+ presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
if not room_ids:
@@ -877,13 +981,13 @@ class PresenceEventSource(object):
user_ids_changed = set()
changed = None
- if from_key and max_token - from_key < 100:
- # For small deltas, its quicker to get all changes and then
- # work out if we share a room or they're in our presence list
+ if from_key:
changed = stream_change_cache.get_all_entities_changed(from_key)
- # get_all_entities_changed can return None
- if changed is not None:
+ if changed is not None and len(changed) < 500:
+ # For small deltas, its quicker to get all changes and then
+ # work out if we share a room or they're in our presence list
+ get_updates_counter.inc("stream")
for other_user_id in changed:
if other_user_id in friends:
user_ids_changed.add(other_user_id)
@@ -895,6 +999,8 @@ class PresenceEventSource(object):
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
+ get_updates_counter.inc("full")
+
user_ids_to_check = set()
for room_id in room_ids:
users = yield self.store.get_users_in_room(room_id)
@@ -933,15 +1039,14 @@ class PresenceEventSource(object):
return self.get_new_events(user, from_key=None, include_offline=False)
-def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as
appropriate.
Args:
user_states(list): List of UserPresenceState's to check.
is_mine_fn (fn): Function that returns if a user_id is ours
- user_to_num_current_syncs (dict): Mapping of user_id to number of currently
- active syncs.
+ syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
Returns:
@@ -952,21 +1057,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
for state in user_states:
is_mine = is_mine_fn(state.user_id)
- new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+ new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
if new_state:
changes[state.user_id] = new_state
return changes.values()
-def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+def handle_timeout(state, is_mine, syncing_user_ids, now):
"""Checks the presence of the user to see if any of the timers have elapsed
Args:
state (UserPresenceState)
is_mine (bool): Whether the user is ours
- user_to_num_current_syncs (dict): Mapping of user_id to number of currently
- active syncs.
+ syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
Returns:
@@ -1000,7 +1104,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
# If there are have been no sync for a while (and none ongoing),
# set presence to offline
- if not user_to_num_current_syncs.get(user_id, 0):
+ if user_id not in syncing_user_ids:
if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(
state=PresenceState.OFFLINE,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index b45eafbb49..e37409170d 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.types import UserID, Requester
-from synapse.util import unwrapFirstError
from ._base import BaseHandler
@@ -27,14 +26,6 @@ import logging
logger = logging.getLogger(__name__)
-def changed_presencelike_data(distributor, user, state):
- return distributor.fire("changed_presencelike_data", user, state)
-
-
-def collect_presencelike_data(distributor, user, content):
- return distributor.fire("collect_presencelike_data", user, content)
-
-
class ProfileHandler(BaseHandler):
def __init__(self, hs):
@@ -46,17 +37,9 @@ class ProfileHandler(BaseHandler):
)
distributor = hs.get_distributor()
- self.distributor = distributor
-
- distributor.declare("collect_presencelike_data")
- distributor.declare("changed_presencelike_data")
distributor.observe("registered_user", self.registered_user)
- distributor.observe(
- "collect_presencelike_data", self.collect_presencelike_data
- )
-
def registered_user(self, user):
return self.store.create_profile(user.localpart)
@@ -105,10 +88,6 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_displayname
)
- yield changed_presencelike_data(self.distributor, target_user, {
- "displayname": new_displayname,
- })
-
yield self._update_join_states(requester)
@defer.inlineCallbacks
@@ -152,31 +131,9 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_avatar_url
)
- yield changed_presencelike_data(self.distributor, target_user, {
- "avatar_url": new_avatar_url,
- })
-
yield self._update_join_states(requester)
@defer.inlineCallbacks
- def collect_presencelike_data(self, user, state):
- if not self.hs.is_mine(user):
- defer.returnValue(None)
-
- (displayname, avatar_url) = yield defer.gatherResults(
- [
- self.store.get_profile_displayname(user.localpart),
- self.store.get_profile_avatar_url(user.localpart),
- ],
- consumeErrors=True
- ).addErrback(unwrapFirstError)
-
- state["displayname"] = displayname
- state["avatar_url"] = avatar_url
-
- defer.returnValue(None)
-
- @defer.inlineCallbacks
def on_profile_query(self, args):
user = UserID.from_string(args["user_id"])
if not self.hs.is_mine(user):
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 935c339707..e62722d78d 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -29,6 +29,8 @@ class ReceiptsHandler(BaseHandler):
def __init__(self, hs):
super(ReceiptsHandler, self).__init__(hs)
+ self.server_name = hs.config.server_name
+ self.store = hs.get_datastore()
self.hs = hs
self.federation = hs.get_replication_layer()
self.federation.register_edu_handler(
@@ -80,6 +82,9 @@ class ReceiptsHandler(BaseHandler):
def _handle_new_receipts(self, receipts):
"""Takes a list of receipts, stores them and informs the notifier.
"""
+ min_batch_id = None
+ max_batch_id = None
+
for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
@@ -97,10 +102,21 @@ class ReceiptsHandler(BaseHandler):
stream_id, max_persisted_id = res
- with PreserveLoggingContext():
- self.notifier.on_new_event(
- "receipt_key", max_persisted_id, rooms=[room_id]
- )
+ if min_batch_id is None or stream_id < min_batch_id:
+ min_batch_id = stream_id
+ if max_batch_id is None or max_persisted_id > max_batch_id:
+ max_batch_id = max_persisted_id
+
+ affected_room_ids = list(set([r["room_id"] for r in receipts]))
+
+ with PreserveLoggingContext():
+ self.notifier.on_new_event(
+ "receipt_key", max_batch_id, rooms=affected_room_ids
+ )
+ # Note that the min here shouldn't be relied upon to be accurate.
+ self.hs.get_pusherpool().on_new_receipts(
+ min_batch_id, max_batch_id, affected_room_ids
+ )
defer.returnValue(True)
@@ -117,12 +133,9 @@ class ReceiptsHandler(BaseHandler):
event_ids = receipt["event_ids"]
data = receipt["data"]
- remotedomains = set()
-
- rm_handler = self.hs.get_handlers().room_member_handler
- yield rm_handler.fetch_room_distributions_into(
- room_id, localusers=None, remotedomains=remotedomains
- )
+ remotedomains = yield self.store.get_joined_hosts_for_room(room_id)
+ remotedomains = remotedomains.copy()
+ remotedomains.discard(self.server_name)
logger.debug("Sending receipt to: %r", remotedomains)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index f287ee247b..bbc07b045e 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -16,13 +16,14 @@
"""Contains functions for registering clients."""
from twisted.internet import defer
-from synapse.types import UserID
+from synapse.types import UserID, Requester
from synapse.api.errors import (
AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
)
from ._base import BaseHandler
from synapse.util.async import run_on_reactor
from synapse.http.client import CaptchaServerHttpClient
+from synapse.util.distributor import registered_user
import logging
import urllib
@@ -30,10 +31,6 @@ import urllib
logger = logging.getLogger(__name__)
-def registered_user(distributor, user):
- return distributor.fire("registered_user", user)
-
-
class RegistrationHandler(BaseHandler):
def __init__(self, hs):
@@ -361,8 +358,62 @@ class RegistrationHandler(BaseHandler):
)
defer.returnValue(data)
+ @defer.inlineCallbacks
+ def get_or_create_user(self, localpart, displayname, duration_seconds):
+ """Creates a new user if the user does not exist,
+ else revokes all previous access tokens and generates a new one.
+
+ Args:
+ localpart : The local part of the user ID to register. If None,
+ one will be randomly generated.
+ Returns:
+ A tuple of (user_id, access_token).
+ Raises:
+ RegistrationError if there was a problem registering.
+ """
+ yield run_on_reactor()
+
+ if localpart is None:
+ raise SynapseError(400, "Request must include user id")
+
+ need_register = True
+
+ try:
+ yield self.check_username(localpart)
+ except SynapseError as e:
+ if e.errcode == Codes.USER_IN_USE:
+ need_register = False
+ else:
+ raise
+
+ user = UserID(localpart, self.hs.hostname)
+ user_id = user.to_string()
+ auth_handler = self.hs.get_handlers().auth_handler
+ token = auth_handler.generate_short_term_login_token(user_id, duration_seconds)
+
+ if need_register:
+ yield self.store.register(
+ user_id=user_id,
+ token=token,
+ password_hash=None
+ )
+
+ yield registered_user(self.distributor, user)
+ else:
+ yield self.store.user_delete_access_tokens(user_id=user_id)
+ yield self.store.add_access_token_to_user(user_id=user_id, token=token)
+
+ if displayname is not None:
+ logger.info("setting user display name: %s -> %s", user_id, displayname)
+ profile_handler = self.hs.get_handlers().profile_handler
+ yield profile_handler.set_displayname(
+ user, Requester(user, token, False), displayname
+ )
+
+ defer.returnValue((user_id, token))
+
def auth_handler(self):
- return self.hs.get_handlers().auth_handler
+ return self.hs.get_auth_handler()
@defer.inlineCallbacks
def guest_access_token_for(self, medium, address, inviter_user_id):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d5c56ce0d6..9fd34588dd 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -18,19 +18,17 @@ from twisted.internet import defer
from ._base import BaseHandler
-from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken, Requester
+from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
from synapse.api.constants import (
- EventTypes, Membership, JoinRules, RoomCreationPreset,
+ EventTypes, JoinRules, RoomCreationPreset,
)
-from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
-from synapse.util import stringutils, unwrapFirstError
-from synapse.util.logcontext import preserve_context_over_fn
-
-from signedjson.sign import verify_signed_json
-from signedjson.key import decode_verify_key_bytes
+from synapse.api.errors import AuthError, StoreError, SynapseError
+from synapse.util import stringutils
+from synapse.util.async import concurrently_execute
+from synapse.util.caches.response_cache import ResponseCache
+from synapse.visibility import filter_events_for_client
from collections import OrderedDict
-from unpaddedbase64 import decode_base64
import logging
import math
@@ -38,21 +36,9 @@ import string
logger = logging.getLogger(__name__)
-id_server_scheme = "https://"
-
-
-def user_left_room(distributor, user, room_id):
- return preserve_context_over_fn(
- distributor.fire,
- "user_left_room", user=user, room_id=room_id
- )
+REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
-
-def user_joined_room(distributor, user, room_id):
- return preserve_context_over_fn(
- distributor.fire,
- "user_joined_room", user=user, room_id=room_id
- )
+id_server_scheme = "https://"
class RoomCreationHandler(BaseHandler):
@@ -356,598 +342,31 @@ class RoomCreationHandler(BaseHandler):
)
-class RoomMemberHandler(BaseHandler):
- # TODO(paul): This handler currently contains a messy conflation of
- # low-level API that works on UserID objects and so on, and REST-level
- # API that takes ID strings and returns pagination chunks. These concerns
- # ought to be separated out a lot better.
-
+class RoomListHandler(BaseHandler):
def __init__(self, hs):
- super(RoomMemberHandler, self).__init__(hs)
-
- self.clock = hs.get_clock()
-
- self.distributor = hs.get_distributor()
- self.distributor.declare("user_joined_room")
- self.distributor.declare("user_left_room")
-
- @defer.inlineCallbacks
- def get_room_members(self, room_id):
- users = yield self.store.get_users_in_room(room_id)
-
- defer.returnValue([UserID.from_string(u) for u in users])
-
- @defer.inlineCallbacks
- def fetch_room_distributions_into(self, room_id, localusers=None,
- remotedomains=None, ignore_user=None):
- """Fetch the distribution of a room, adding elements to either
- 'localusers' or 'remotedomains', which should be a set() if supplied.
- If ignore_user is set, ignore that user.
-
- This function returns nothing; its result is performed by the
- side-effect on the two passed sets. This allows easy accumulation of
- member lists of multiple rooms at once if required.
- """
- members = yield self.get_room_members(room_id)
- for member in members:
- if ignore_user is not None and member == ignore_user:
- continue
-
- if self.hs.is_mine(member):
- if localusers is not None:
- localusers.add(member)
- else:
- if remotedomains is not None:
- remotedomains.add(member.domain)
-
- @defer.inlineCallbacks
- def update_membership(
- self,
- requester,
- target,
- room_id,
- action,
- txn_id=None,
- remote_room_hosts=None,
- third_party_signed=None,
- ratelimit=True,
- ):
- effective_membership_state = action
- if action in ["kick", "unban"]:
- effective_membership_state = "leave"
- elif action == "forget":
- effective_membership_state = "leave"
-
- if third_party_signed is not None:
- replication = self.hs.get_replication_layer()
- yield replication.exchange_third_party_invite(
- third_party_signed["sender"],
- target.to_string(),
- room_id,
- third_party_signed,
- )
-
- msg_handler = self.hs.get_handlers().message_handler
-
- content = {"membership": effective_membership_state}
- if requester.is_guest:
- content["kind"] = "guest"
-
- event, context = yield msg_handler.create_event(
- {
- "type": EventTypes.Member,
- "content": content,
- "room_id": room_id,
- "sender": requester.user.to_string(),
- "state_key": target.to_string(),
-
- # For backwards compatibility:
- "membership": effective_membership_state,
- },
- token_id=requester.access_token_id,
- txn_id=txn_id,
- )
-
- old_state = context.current_state.get((EventTypes.Member, event.state_key))
- old_membership = old_state.content.get("membership") if old_state else None
- if action == "unban" and old_membership != "ban":
- raise SynapseError(
- 403,
- "Cannot unban user who was not banned (membership=%s)" % old_membership,
- errcode=Codes.BAD_STATE
- )
- if old_membership == "ban" and action != "unban":
- raise SynapseError(
- 403,
- "Cannot %s user who was is banned" % (action,),
- errcode=Codes.BAD_STATE
- )
-
- member_handler = self.hs.get_handlers().room_member_handler
- yield member_handler.send_membership_event(
- requester,
- event,
- context,
- ratelimit=ratelimit,
- remote_room_hosts=remote_room_hosts,
- )
-
- if action == "forget":
- yield self.forget(requester.user, room_id)
-
- @defer.inlineCallbacks
- def send_membership_event(
- self,
- requester,
- event,
- context,
- remote_room_hosts=None,
- ratelimit=True,
- ):
- """
- Change the membership status of a user in a room.
-
- Args:
- requester (Requester): The local user who requested the membership
- event. If None, certain checks, like whether this homeserver can
- act as the sender, will be skipped.
- event (SynapseEvent): The membership event.
- context: The context of the event.
- is_guest (bool): Whether the sender is a guest.
- room_hosts ([str]): Homeservers which are likely to already be in
- the room, and could be danced with in order to join this
- homeserver for the first time.
- ratelimit (bool): Whether to rate limit this request.
- Raises:
- SynapseError if there was a problem changing the membership.
- """
- remote_room_hosts = remote_room_hosts or []
-
- target_user = UserID.from_string(event.state_key)
- room_id = event.room_id
-
- if requester is not None:
- sender = UserID.from_string(event.sender)
- assert sender == requester.user, (
- "Sender (%s) must be same as requester (%s)" %
- (sender, requester.user)
- )
- assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
- else:
- requester = Requester(target_user, None, False)
-
- message_handler = self.hs.get_handlers().message_handler
- prev_event = message_handler.deduplicate_state_event(event, context)
- if prev_event is not None:
- return
-
- action = "send"
-
- if event.membership == Membership.JOIN:
- if requester.is_guest and not self._can_guest_join(context.current_state):
- # This should be an auth check, but guests are a local concept,
- # so don't really fit into the general auth process.
- raise AuthError(403, "Guest access not allowed")
- do_remote_join_dance, remote_room_hosts = self._should_do_dance(
- context,
- (self.get_inviter(event.state_key, context.current_state)),
- remote_room_hosts,
- )
- if do_remote_join_dance:
- action = "remote_join"
- elif event.membership == Membership.LEAVE:
- is_host_in_room = self.is_host_in_room(context.current_state)
-
- if not is_host_in_room:
- # perhaps we've been invited
- inviter = self.get_inviter(target_user.to_string(), context.current_state)
- if not inviter:
- raise SynapseError(404, "Not a known room")
-
- if self.hs.is_mine(inviter):
- # the inviter was on our server, but has now left. Carry on
- # with the normal rejection codepath.
- #
- # This is a bit of a hack, because the room might still be
- # active on other servers.
- pass
- else:
- # send the rejection to the inviter's HS.
- remote_room_hosts = remote_room_hosts + [inviter.domain]
- action = "remote_reject"
-
- federation_handler = self.hs.get_handlers().federation_handler
-
- if action == "remote_join":
- if len(remote_room_hosts) == 0:
- raise SynapseError(404, "No known servers")
-
- # We don't do an auth check if we are doing an invite
- # join dance for now, since we're kinda implicitly checking
- # that we are allowed to join when we decide whether or not we
- # need to do the invite/join dance.
- yield federation_handler.do_invite_join(
- remote_room_hosts,
- event.room_id,
- event.user_id,
- event.content,
- )
- elif action == "remote_reject":
- yield federation_handler.do_remotely_reject_invite(
- remote_room_hosts,
- room_id,
- event.user_id
- )
- else:
- yield self.handle_new_client_event(
- requester,
- event,
- context,
- extra_users=[target_user],
- ratelimit=ratelimit,
- )
-
- prev_member_event = context.current_state.get(
- (EventTypes.Member, target_user.to_string()),
- None
- )
-
- if event.membership == Membership.JOIN:
- if not prev_member_event or prev_member_event.membership != Membership.JOIN:
- # Only fire user_joined_room if the user has acutally joined the
- # room. Don't bother if the user is just changing their profile
- # info.
- yield user_joined_room(self.distributor, target_user, room_id)
- elif event.membership == Membership.LEAVE:
- if prev_member_event and prev_member_event.membership == Membership.JOIN:
- user_left_room(self.distributor, target_user, room_id)
-
- def _can_guest_join(self, current_state):
- """
- Returns whether a guest can join a room based on its current state.
- """
- guest_access = current_state.get((EventTypes.GuestAccess, ""), None)
- return (
- guest_access
- and guest_access.content
- and "guest_access" in guest_access.content
- and guest_access.content["guest_access"] == "can_join"
- )
-
- def _should_do_dance(self, context, inviter, room_hosts=None):
- # TODO: Shouldn't this be remote_room_host?
- room_hosts = room_hosts or []
-
- is_host_in_room = self.is_host_in_room(context.current_state)
- if is_host_in_room:
- return False, room_hosts
-
- if inviter and not self.hs.is_mine(inviter):
- room_hosts.append(inviter.domain)
-
- return True, room_hosts
-
- @defer.inlineCallbacks
- def lookup_room_alias(self, room_alias):
- """
- Get the room ID associated with a room alias.
-
- Args:
- room_alias (RoomAlias): The alias to look up.
- Returns:
- A tuple of:
- The room ID as a RoomID object.
- Hosts likely to be participating in the room ([str]).
- Raises:
- SynapseError if room alias could not be found.
- """
- directory_handler = self.hs.get_handlers().directory_handler
- mapping = yield directory_handler.get_association(room_alias)
-
- if not mapping:
- raise SynapseError(404, "No such room alias")
-
- room_id = mapping["room_id"]
- servers = mapping["servers"]
-
- defer.returnValue((RoomID.from_string(room_id), servers))
-
- def get_inviter(self, user_id, current_state):
- prev_state = current_state.get((EventTypes.Member, user_id))
- if prev_state and prev_state.membership == Membership.INVITE:
- return UserID.from_string(prev_state.user_id)
- return None
-
- @defer.inlineCallbacks
- def get_joined_rooms_for_user(self, user):
- """Returns a list of roomids that the user has any of the given
- membership states in."""
-
- rooms = yield self.store.get_rooms_for_user(
- user.to_string(),
- )
-
- # For some reason the list of events contains duplicates
- # TODO(paul): work out why because I really don't think it should
- room_ids = set(r.room_id for r in rooms)
-
- defer.returnValue(room_ids)
-
- @defer.inlineCallbacks
- def do_3pid_invite(
- self,
- room_id,
- inviter,
- medium,
- address,
- id_server,
- requester,
- txn_id
- ):
- invitee = yield self._lookup_3pid(
- id_server, medium, address
- )
-
- if invitee:
- handler = self.hs.get_handlers().room_member_handler
- yield handler.update_membership(
- requester,
- UserID.from_string(invitee),
- room_id,
- "invite",
- txn_id=txn_id,
- )
- else:
- yield self._make_and_store_3pid_invite(
- requester,
- id_server,
- medium,
- address,
- room_id,
- inviter,
- txn_id=txn_id
- )
-
- @defer.inlineCallbacks
- def _lookup_3pid(self, id_server, medium, address):
- """Looks up a 3pid in the passed identity server.
-
- Args:
- id_server (str): The server name (including port, if required)
- of the identity server to use.
- medium (str): The type of the third party identifier (e.g. "email").
- address (str): The third party identifier (e.g. "foo@example.com").
-
- Returns:
- (str) the matrix ID of the 3pid, or None if it is not recognized.
- """
- try:
- data = yield self.hs.get_simple_http_client().get_json(
- "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
- {
- "medium": medium,
- "address": address,
- }
- )
-
- if "mxid" in data:
- if "signatures" not in data:
- raise AuthError(401, "No signatures on 3pid binding")
- self.verify_any_signature(data, id_server)
- defer.returnValue(data["mxid"])
-
- except IOError as e:
- logger.warn("Error from identity server lookup: %s" % (e,))
- defer.returnValue(None)
-
- @defer.inlineCallbacks
- def verify_any_signature(self, data, server_hostname):
- if server_hostname not in data["signatures"]:
- raise AuthError(401, "No signature from server %s" % (server_hostname,))
- for key_name, signature in data["signatures"][server_hostname].items():
- key_data = yield self.hs.get_simple_http_client().get_json(
- "%s%s/_matrix/identity/api/v1/pubkey/%s" %
- (id_server_scheme, server_hostname, key_name,),
- )
- if "public_key" not in key_data:
- raise AuthError(401, "No public key named %s from %s" %
- (key_name, server_hostname,))
- verify_signed_json(
- data,
- server_hostname,
- decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"]))
- )
- return
-
- @defer.inlineCallbacks
- def _make_and_store_3pid_invite(
- self,
- requester,
- id_server,
- medium,
- address,
- room_id,
- user,
- txn_id
- ):
- room_state = yield self.hs.get_state_handler().get_current_state(room_id)
-
- inviter_display_name = ""
- inviter_avatar_url = ""
- member_event = room_state.get((EventTypes.Member, user.to_string()))
- if member_event:
- inviter_display_name = member_event.content.get("displayname", "")
- inviter_avatar_url = member_event.content.get("avatar_url", "")
-
- canonical_room_alias = ""
- canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
- if canonical_alias_event:
- canonical_room_alias = canonical_alias_event.content.get("alias", "")
-
- room_name = ""
- room_name_event = room_state.get((EventTypes.Name, ""))
- if room_name_event:
- room_name = room_name_event.content.get("name", "")
-
- room_join_rules = ""
- join_rules_event = room_state.get((EventTypes.JoinRules, ""))
- if join_rules_event:
- room_join_rules = join_rules_event.content.get("join_rule", "")
-
- room_avatar_url = ""
- room_avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
- if room_avatar_event:
- room_avatar_url = room_avatar_event.content.get("url", "")
-
- token, public_keys, fallback_public_key, display_name = (
- yield self._ask_id_server_for_third_party_invite(
- id_server=id_server,
- medium=medium,
- address=address,
- room_id=room_id,
- inviter_user_id=user.to_string(),
- room_alias=canonical_room_alias,
- room_avatar_url=room_avatar_url,
- room_join_rules=room_join_rules,
- room_name=room_name,
- inviter_display_name=inviter_display_name,
- inviter_avatar_url=inviter_avatar_url
- )
- )
-
- msg_handler = self.hs.get_handlers().message_handler
- yield msg_handler.create_and_send_nonmember_event(
- requester,
- {
- "type": EventTypes.ThirdPartyInvite,
- "content": {
- "display_name": display_name,
- "public_keys": public_keys,
-
- # For backwards compatibility:
- "key_validity_url": fallback_public_key["key_validity_url"],
- "public_key": fallback_public_key["public_key"],
- },
- "room_id": room_id,
- "sender": user.to_string(),
- "state_key": token,
- },
- txn_id=txn_id,
- )
-
- @defer.inlineCallbacks
- def _ask_id_server_for_third_party_invite(
- self,
- id_server,
- medium,
- address,
- room_id,
- inviter_user_id,
- room_alias,
- room_avatar_url,
- room_join_rules,
- room_name,
- inviter_display_name,
- inviter_avatar_url
- ):
- """
- Asks an identity server for a third party invite.
-
- :param id_server (str): hostname + optional port for the identity server.
- :param medium (str): The literal string "email".
- :param address (str): The third party address being invited.
- :param room_id (str): The ID of the room to which the user is invited.
- :param inviter_user_id (str): The user ID of the inviter.
- :param room_alias (str): An alias for the room, for cosmetic
- notifications.
- :param room_avatar_url (str): The URL of the room's avatar, for cosmetic
- notifications.
- :param room_join_rules (str): The join rules of the email
- (e.g. "public").
- :param room_name (str): The m.room.name of the room.
- :param inviter_display_name (str): The current display name of the
- inviter.
- :param inviter_avatar_url (str): The URL of the inviter's avatar.
-
- :return: A deferred tuple containing:
- token (str): The token which must be signed to prove authenticity.
- public_keys ([{"public_key": str, "key_validity_url": str}]):
- public_key is a base64-encoded ed25519 public key.
- fallback_public_key: One element from public_keys.
- display_name (str): A user-friendly name to represent the invited
- user.
- """
-
- is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
- id_server_scheme, id_server,
+ super(RoomListHandler, self).__init__(hs)
+ self.response_cache = ResponseCache()
+ self.remote_list_request_cache = ResponseCache()
+ self.remote_list_cache = {}
+ self.fetch_looping_call = hs.get_clock().looping_call(
+ self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
)
+ self.fetch_all_remote_lists()
- invite_config = {
- "medium": medium,
- "address": address,
- "room_id": room_id,
- "room_alias": room_alias,
- "room_avatar_url": room_avatar_url,
- "room_join_rules": room_join_rules,
- "room_name": room_name,
- "sender": inviter_user_id,
- "sender_display_name": inviter_display_name,
- "sender_avatar_url": inviter_avatar_url,
- }
-
- if self.hs.config.invite_3pid_guest:
- registration_handler = self.hs.get_handlers().registration_handler
- guest_access_token = yield registration_handler.guest_access_token_for(
- medium=medium,
- address=address,
- inviter_user_id=inviter_user_id,
- )
-
- guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
- guest_access_token
- )
-
- invite_config.update({
- "guest_access_token": guest_access_token,
- "guest_user_id": guest_user_info["user"].to_string(),
- })
-
- data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
- is_url,
- invite_config
- )
- # TODO: Check for success
- token = data["token"]
- public_keys = data.get("public_keys", [])
- if "public_key" in data:
- fallback_public_key = {
- "public_key": data["public_key"],
- "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
- id_server_scheme, id_server,
- ),
- }
- else:
- fallback_public_key = public_keys[0]
-
- if not public_keys:
- public_keys.append(fallback_public_key)
- display_name = data["display_name"]
- defer.returnValue((token, public_keys, fallback_public_key, display_name))
-
- def forget(self, user, room_id):
- return self.store.forget(user.to_string(), room_id)
-
-
-class RoomListHandler(BaseHandler):
+ def get_local_public_room_list(self):
+ result = self.response_cache.get(())
+ if not result:
+ result = self.response_cache.set((), self._get_public_room_list())
+ return result
@defer.inlineCallbacks
- def get_public_room_list(self):
+ def _get_public_room_list(self):
room_ids = yield self.store.get_public_room_ids()
+ results = []
+
@defer.inlineCallbacks
def handle_room(room_id):
- aliases = yield self.store.get_aliases_for_room(room_id)
-
# We pull each bit of state out indvidually to avoid pulling the
# full state into memory. Due to how the caching works this should
# be fairly quick, even if not originally in the cache.
@@ -962,6 +381,14 @@ class RoomListHandler(BaseHandler):
defer.returnValue(None)
result = {"room_id": room_id}
+
+ joined_users = yield self.store.get_users_in_room(room_id)
+ if len(joined_users) == 0:
+ return
+
+ result["num_joined_members"] = len(joined_users)
+
+ aliases = yield self.store.get_aliases_for_room(room_id)
if aliases:
result["aliases"] = aliases
@@ -1001,21 +428,61 @@ class RoomListHandler(BaseHandler):
if avatar_url:
result["avatar_url"] = avatar_url
- joined_users = yield self.store.get_users_in_room(room_id)
- result["num_joined_members"] = len(joined_users)
-
- defer.returnValue(result)
+ results.append(result)
- result = []
- for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)):
- chunk_result = yield defer.gatherResults([
- handle_room(room_id)
- for room_id in chunk
- ], consumeErrors=True).addErrback(unwrapFirstError)
- result.extend(v for v in chunk_result if v)
+ yield concurrently_execute(handle_room, room_ids, 10)
# FIXME (erikj): START is no longer a valid value
- defer.returnValue({"start": "START", "end": "END", "chunk": result})
+ defer.returnValue({"start": "START", "end": "END", "chunk": results})
+
+ @defer.inlineCallbacks
+ def fetch_all_remote_lists(self):
+ deferred = self.hs.get_replication_layer().get_public_rooms(
+ self.hs.config.secondary_directory_servers
+ )
+ self.remote_list_request_cache.set((), deferred)
+ self.remote_list_cache = yield deferred
+
+ @defer.inlineCallbacks
+ def get_aggregated_public_room_list(self):
+ """
+ Get the public room list from this server and the servers
+ specified in the secondary_directory_servers config option.
+ XXX: Pagination...
+ """
+ # We return the results from out cache which is updated by a looping call,
+ # unless we're missing a cache entry, in which case wait for the result
+ # of the fetch if there's one in progress. If not, omit that server.
+ wait = False
+ for s in self.hs.config.secondary_directory_servers:
+ if s not in self.remote_list_cache:
+ logger.warn("No cached room list from %s: waiting for fetch", s)
+ wait = True
+ break
+
+ if wait and self.remote_list_request_cache.get(()):
+ yield self.remote_list_request_cache.get(())
+
+ public_rooms = yield self.get_local_public_room_list()
+
+ # keep track of which room IDs we've seen so we can de-dup
+ room_ids = set()
+
+ # tag all the ones in our list with our server name.
+ # Also add the them to the de-deping set
+ for room in public_rooms['chunk']:
+ room["server_name"] = self.hs.hostname
+ room_ids.add(room["room_id"])
+
+ # Now add the results from federation
+ for server_name, server_result in self.remote_list_cache.items():
+ for room in server_result["chunk"]:
+ if room["room_id"] not in room_ids:
+ room["server_name"] = server_name
+ public_rooms["chunk"].append(room)
+ room_ids.add(room["room_id"])
+
+ defer.returnValue(public_rooms)
class RoomContextHandler(BaseHandler):
@@ -1040,10 +507,12 @@ class RoomContextHandler(BaseHandler):
now_token = yield self.hs.get_event_sources().get_current_token()
def filter_evts(events):
- return self._filter_events_for_client(
+ return filter_events_for_client(
+ self.store,
user.to_string(),
events,
- is_peeking=is_guest)
+ is_peeking=is_guest
+ )
event = yield self.store.get_event(event_id, get_prev_content=True,
allow_none=True)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
new file mode 100644
index 0000000000..7e616f44fd
--- /dev/null
+++ b/synapse/handlers/room_member.py
@@ -0,0 +1,677 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+from synapse.types import UserID, RoomID, Requester
+from synapse.api.constants import (
+ EventTypes, Membership,
+)
+from synapse.api.errors import AuthError, SynapseError, Codes
+from synapse.util.async import Linearizer
+from synapse.util.distributor import user_left_room, user_joined_room
+
+from signedjson.sign import verify_signed_json
+from signedjson.key import decode_verify_key_bytes
+
+from unpaddedbase64 import decode_base64
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+id_server_scheme = "https://"
+
+
+class RoomMemberHandler(BaseHandler):
+ # TODO(paul): This handler currently contains a messy conflation of
+ # low-level API that works on UserID objects and so on, and REST-level
+ # API that takes ID strings and returns pagination chunks. These concerns
+ # ought to be separated out a lot better.
+
+ def __init__(self, hs):
+ super(RoomMemberHandler, self).__init__(hs)
+
+ self.member_linearizer = Linearizer()
+
+ self.clock = hs.get_clock()
+
+ self.distributor = hs.get_distributor()
+ self.distributor.declare("user_joined_room")
+ self.distributor.declare("user_left_room")
+
+ @defer.inlineCallbacks
+ def _local_membership_update(
+ self, requester, target, room_id, membership,
+ prev_event_ids,
+ txn_id=None,
+ ratelimit=True,
+ ):
+ msg_handler = self.hs.get_handlers().message_handler
+
+ content = {"membership": membership}
+ if requester.is_guest:
+ content["kind"] = "guest"
+
+ event, context = yield msg_handler.create_event(
+ {
+ "type": EventTypes.Member,
+ "content": content,
+ "room_id": room_id,
+ "sender": requester.user.to_string(),
+ "state_key": target.to_string(),
+
+ # For backwards compatibility:
+ "membership": membership,
+ },
+ token_id=requester.access_token_id,
+ txn_id=txn_id,
+ prev_event_ids=prev_event_ids,
+ )
+
+ yield msg_handler.handle_new_client_event(
+ requester,
+ event,
+ context,
+ extra_users=[target],
+ ratelimit=ratelimit,
+ )
+
+ prev_member_event = context.current_state.get(
+ (EventTypes.Member, target.to_string()),
+ None
+ )
+
+ if event.membership == Membership.JOIN:
+ if not prev_member_event or prev_member_event.membership != Membership.JOIN:
+ # Only fire user_joined_room if the user has acutally joined the
+ # room. Don't bother if the user is just changing their profile
+ # info.
+ yield user_joined_room(self.distributor, target, room_id)
+ elif event.membership == Membership.LEAVE:
+ if prev_member_event and prev_member_event.membership == Membership.JOIN:
+ user_left_room(self.distributor, target, room_id)
+
+ @defer.inlineCallbacks
+ def remote_join(self, remote_room_hosts, room_id, user, content):
+ if len(remote_room_hosts) == 0:
+ raise SynapseError(404, "No known servers")
+
+ # We don't do an auth check if we are doing an invite
+ # join dance for now, since we're kinda implicitly checking
+ # that we are allowed to join when we decide whether or not we
+ # need to do the invite/join dance.
+ yield self.hs.get_handlers().federation_handler.do_invite_join(
+ remote_room_hosts,
+ room_id,
+ user.to_string(),
+ content,
+ )
+ yield user_joined_room(self.distributor, user, room_id)
+
+ def reject_remote_invite(self, user_id, room_id, remote_room_hosts):
+ return self.hs.get_handlers().federation_handler.do_remotely_reject_invite(
+ remote_room_hosts,
+ room_id,
+ user_id
+ )
+
+ @defer.inlineCallbacks
+ def update_membership(
+ self,
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=None,
+ remote_room_hosts=None,
+ third_party_signed=None,
+ ratelimit=True,
+ ):
+ key = (target, room_id,)
+
+ with (yield self.member_linearizer.queue(key)):
+ result = yield self._update_membership(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ )
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _update_membership(
+ self,
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=None,
+ remote_room_hosts=None,
+ third_party_signed=None,
+ ratelimit=True,
+ ):
+ effective_membership_state = action
+ if action in ["kick", "unban"]:
+ effective_membership_state = "leave"
+
+ if third_party_signed is not None:
+ replication = self.hs.get_replication_layer()
+ yield replication.exchange_third_party_invite(
+ third_party_signed["sender"],
+ target.to_string(),
+ room_id,
+ third_party_signed,
+ )
+
+ if not remote_room_hosts:
+ remote_room_hosts = []
+
+ latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+ current_state = yield self.state_handler.get_current_state(
+ room_id, latest_event_ids=latest_event_ids,
+ )
+
+ old_state = current_state.get((EventTypes.Member, target.to_string()))
+ old_membership = old_state.content.get("membership") if old_state else None
+ if action == "unban" and old_membership != "ban":
+ raise SynapseError(
+ 403,
+ "Cannot unban user who was not banned (membership=%s)" % old_membership,
+ errcode=Codes.BAD_STATE
+ )
+ if old_membership == "ban" and action != "unban":
+ raise SynapseError(
+ 403,
+ "Cannot %s user who was banned" % (action,),
+ errcode=Codes.BAD_STATE
+ )
+
+ is_host_in_room = self.is_host_in_room(current_state)
+
+ if effective_membership_state == Membership.JOIN:
+ if requester.is_guest and not self._can_guest_join(current_state):
+ # This should be an auth check, but guests are a local concept,
+ # so don't really fit into the general auth process.
+ raise AuthError(403, "Guest access not allowed")
+
+ if not is_host_in_room:
+ inviter = yield self.get_inviter(target.to_string(), room_id)
+ if inviter and not self.hs.is_mine(inviter):
+ remote_room_hosts.append(inviter.domain)
+
+ content = {"membership": Membership.JOIN}
+
+ profile = self.hs.get_handlers().profile_handler
+ content["displayname"] = yield profile.get_displayname(target)
+ content["avatar_url"] = yield profile.get_avatar_url(target)
+
+ if requester.is_guest:
+ content["kind"] = "guest"
+
+ ret = yield self.remote_join(
+ remote_room_hosts, room_id, target, content
+ )
+ defer.returnValue(ret)
+
+ elif effective_membership_state == Membership.LEAVE:
+ if not is_host_in_room:
+ # perhaps we've been invited
+ inviter = yield self.get_inviter(target.to_string(), room_id)
+ if not inviter:
+ raise SynapseError(404, "Not a known room")
+
+ if self.hs.is_mine(inviter):
+ # the inviter was on our server, but has now left. Carry on
+ # with the normal rejection codepath.
+ #
+ # This is a bit of a hack, because the room might still be
+ # active on other servers.
+ pass
+ else:
+ # send the rejection to the inviter's HS.
+ remote_room_hosts = remote_room_hosts + [inviter.domain]
+
+ try:
+ ret = yield self.reject_remote_invite(
+ target.to_string(), room_id, remote_room_hosts
+ )
+ defer.returnValue(ret)
+ except SynapseError as e:
+ logger.warn("Failed to reject invite: %s", e)
+
+ yield self.store.locally_reject_invite(
+ target.to_string(), room_id
+ )
+
+ defer.returnValue({})
+
+ yield self._local_membership_update(
+ requester=requester,
+ target=target,
+ room_id=room_id,
+ membership=effective_membership_state,
+ txn_id=txn_id,
+ ratelimit=ratelimit,
+ prev_event_ids=latest_event_ids,
+ )
+
+ @defer.inlineCallbacks
+ def send_membership_event(
+ self,
+ requester,
+ event,
+ context,
+ remote_room_hosts=None,
+ ratelimit=True,
+ ):
+ """
+ Change the membership status of a user in a room.
+
+ Args:
+ requester (Requester): The local user who requested the membership
+ event. If None, certain checks, like whether this homeserver can
+ act as the sender, will be skipped.
+ event (SynapseEvent): The membership event.
+ context: The context of the event.
+ is_guest (bool): Whether the sender is a guest.
+ room_hosts ([str]): Homeservers which are likely to already be in
+ the room, and could be danced with in order to join this
+ homeserver for the first time.
+ ratelimit (bool): Whether to rate limit this request.
+ Raises:
+ SynapseError if there was a problem changing the membership.
+ """
+ remote_room_hosts = remote_room_hosts or []
+
+ target_user = UserID.from_string(event.state_key)
+ room_id = event.room_id
+
+ if requester is not None:
+ sender = UserID.from_string(event.sender)
+ assert sender == requester.user, (
+ "Sender (%s) must be same as requester (%s)" %
+ (sender, requester.user)
+ )
+ assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
+ else:
+ requester = Requester(target_user, None, False)
+
+ message_handler = self.hs.get_handlers().message_handler
+ prev_event = message_handler.deduplicate_state_event(event, context)
+ if prev_event is not None:
+ return
+
+ if event.membership == Membership.JOIN:
+ if requester.is_guest and not self._can_guest_join(context.current_state):
+ # This should be an auth check, but guests are a local concept,
+ # so don't really fit into the general auth process.
+ raise AuthError(403, "Guest access not allowed")
+
+ yield message_handler.handle_new_client_event(
+ requester,
+ event,
+ context,
+ extra_users=[target_user],
+ ratelimit=ratelimit,
+ )
+
+ prev_member_event = context.current_state.get(
+ (EventTypes.Member, target_user.to_string()),
+ None
+ )
+
+ if event.membership == Membership.JOIN:
+ if not prev_member_event or prev_member_event.membership != Membership.JOIN:
+ # Only fire user_joined_room if the user has acutally joined the
+ # room. Don't bother if the user is just changing their profile
+ # info.
+ yield user_joined_room(self.distributor, target_user, room_id)
+ elif event.membership == Membership.LEAVE:
+ if prev_member_event and prev_member_event.membership == Membership.JOIN:
+ user_left_room(self.distributor, target_user, room_id)
+
+ def _can_guest_join(self, current_state):
+ """
+ Returns whether a guest can join a room based on its current state.
+ """
+ guest_access = current_state.get((EventTypes.GuestAccess, ""), None)
+ return (
+ guest_access
+ and guest_access.content
+ and "guest_access" in guest_access.content
+ and guest_access.content["guest_access"] == "can_join"
+ )
+
+ @defer.inlineCallbacks
+ def lookup_room_alias(self, room_alias):
+ """
+ Get the room ID associated with a room alias.
+
+ Args:
+ room_alias (RoomAlias): The alias to look up.
+ Returns:
+ A tuple of:
+ The room ID as a RoomID object.
+ Hosts likely to be participating in the room ([str]).
+ Raises:
+ SynapseError if room alias could not be found.
+ """
+ directory_handler = self.hs.get_handlers().directory_handler
+ mapping = yield directory_handler.get_association(room_alias)
+
+ if not mapping:
+ raise SynapseError(404, "No such room alias")
+
+ room_id = mapping["room_id"]
+ servers = mapping["servers"]
+
+ defer.returnValue((RoomID.from_string(room_id), servers))
+
+ @defer.inlineCallbacks
+ def get_inviter(self, user_id, room_id):
+ invite = yield self.store.get_invite_for_user_in_room(
+ user_id=user_id,
+ room_id=room_id,
+ )
+ if invite:
+ defer.returnValue(UserID.from_string(invite.sender))
+
+ @defer.inlineCallbacks
+ def do_3pid_invite(
+ self,
+ room_id,
+ inviter,
+ medium,
+ address,
+ id_server,
+ requester,
+ txn_id
+ ):
+ invitee = yield self._lookup_3pid(
+ id_server, medium, address
+ )
+
+ if invitee:
+ yield self.update_membership(
+ requester,
+ UserID.from_string(invitee),
+ room_id,
+ "invite",
+ txn_id=txn_id,
+ )
+ else:
+ yield self._make_and_store_3pid_invite(
+ requester,
+ id_server,
+ medium,
+ address,
+ room_id,
+ inviter,
+ txn_id=txn_id
+ )
+
+ @defer.inlineCallbacks
+ def _lookup_3pid(self, id_server, medium, address):
+ """Looks up a 3pid in the passed identity server.
+
+ Args:
+ id_server (str): The server name (including port, if required)
+ of the identity server to use.
+ medium (str): The type of the third party identifier (e.g. "email").
+ address (str): The third party identifier (e.g. "foo@example.com").
+
+ Returns:
+ str: the matrix ID of the 3pid, or None if it is not recognized.
+ """
+ try:
+ data = yield self.hs.get_simple_http_client().get_json(
+ "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
+ {
+ "medium": medium,
+ "address": address,
+ }
+ )
+
+ if "mxid" in data:
+ if "signatures" not in data:
+ raise AuthError(401, "No signatures on 3pid binding")
+ self.verify_any_signature(data, id_server)
+ defer.returnValue(data["mxid"])
+
+ except IOError as e:
+ logger.warn("Error from identity server lookup: %s" % (e,))
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def verify_any_signature(self, data, server_hostname):
+ if server_hostname not in data["signatures"]:
+ raise AuthError(401, "No signature from server %s" % (server_hostname,))
+ for key_name, signature in data["signatures"][server_hostname].items():
+ key_data = yield self.hs.get_simple_http_client().get_json(
+ "%s%s/_matrix/identity/api/v1/pubkey/%s" %
+ (id_server_scheme, server_hostname, key_name,),
+ )
+ if "public_key" not in key_data:
+ raise AuthError(401, "No public key named %s from %s" %
+ (key_name, server_hostname,))
+ verify_signed_json(
+ data,
+ server_hostname,
+ decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"]))
+ )
+ return
+
+ @defer.inlineCallbacks
+ def _make_and_store_3pid_invite(
+ self,
+ requester,
+ id_server,
+ medium,
+ address,
+ room_id,
+ user,
+ txn_id
+ ):
+ room_state = yield self.hs.get_state_handler().get_current_state(room_id)
+
+ inviter_display_name = ""
+ inviter_avatar_url = ""
+ member_event = room_state.get((EventTypes.Member, user.to_string()))
+ if member_event:
+ inviter_display_name = member_event.content.get("displayname", "")
+ inviter_avatar_url = member_event.content.get("avatar_url", "")
+
+ canonical_room_alias = ""
+ canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
+ if canonical_alias_event:
+ canonical_room_alias = canonical_alias_event.content.get("alias", "")
+
+ room_name = ""
+ room_name_event = room_state.get((EventTypes.Name, ""))
+ if room_name_event:
+ room_name = room_name_event.content.get("name", "")
+
+ room_join_rules = ""
+ join_rules_event = room_state.get((EventTypes.JoinRules, ""))
+ if join_rules_event:
+ room_join_rules = join_rules_event.content.get("join_rule", "")
+
+ room_avatar_url = ""
+ room_avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
+ if room_avatar_event:
+ room_avatar_url = room_avatar_event.content.get("url", "")
+
+ token, public_keys, fallback_public_key, display_name = (
+ yield self._ask_id_server_for_third_party_invite(
+ id_server=id_server,
+ medium=medium,
+ address=address,
+ room_id=room_id,
+ inviter_user_id=user.to_string(),
+ room_alias=canonical_room_alias,
+ room_avatar_url=room_avatar_url,
+ room_join_rules=room_join_rules,
+ room_name=room_name,
+ inviter_display_name=inviter_display_name,
+ inviter_avatar_url=inviter_avatar_url
+ )
+ )
+
+ msg_handler = self.hs.get_handlers().message_handler
+ yield msg_handler.create_and_send_nonmember_event(
+ requester,
+ {
+ "type": EventTypes.ThirdPartyInvite,
+ "content": {
+ "display_name": display_name,
+ "public_keys": public_keys,
+
+ # For backwards compatibility:
+ "key_validity_url": fallback_public_key["key_validity_url"],
+ "public_key": fallback_public_key["public_key"],
+ },
+ "room_id": room_id,
+ "sender": user.to_string(),
+ "state_key": token,
+ },
+ txn_id=txn_id,
+ )
+
+ @defer.inlineCallbacks
+ def _ask_id_server_for_third_party_invite(
+ self,
+ id_server,
+ medium,
+ address,
+ room_id,
+ inviter_user_id,
+ room_alias,
+ room_avatar_url,
+ room_join_rules,
+ room_name,
+ inviter_display_name,
+ inviter_avatar_url
+ ):
+ """
+ Asks an identity server for a third party invite.
+
+ Args:
+ id_server (str): hostname + optional port for the identity server.
+ medium (str): The literal string "email".
+ address (str): The third party address being invited.
+ room_id (str): The ID of the room to which the user is invited.
+ inviter_user_id (str): The user ID of the inviter.
+ room_alias (str): An alias for the room, for cosmetic notifications.
+ room_avatar_url (str): The URL of the room's avatar, for cosmetic
+ notifications.
+ room_join_rules (str): The join rules of the email (e.g. "public").
+ room_name (str): The m.room.name of the room.
+ inviter_display_name (str): The current display name of the
+ inviter.
+ inviter_avatar_url (str): The URL of the inviter's avatar.
+
+ Returns:
+ A deferred tuple containing:
+ token (str): The token which must be signed to prove authenticity.
+ public_keys ([{"public_key": str, "key_validity_url": str}]):
+ public_key is a base64-encoded ed25519 public key.
+ fallback_public_key: One element from public_keys.
+ display_name (str): A user-friendly name to represent the invited
+ user.
+ """
+
+ is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
+ id_server_scheme, id_server,
+ )
+
+ invite_config = {
+ "medium": medium,
+ "address": address,
+ "room_id": room_id,
+ "room_alias": room_alias,
+ "room_avatar_url": room_avatar_url,
+ "room_join_rules": room_join_rules,
+ "room_name": room_name,
+ "sender": inviter_user_id,
+ "sender_display_name": inviter_display_name,
+ "sender_avatar_url": inviter_avatar_url,
+ }
+
+ if self.hs.config.invite_3pid_guest:
+ registration_handler = self.hs.get_handlers().registration_handler
+ guest_access_token = yield registration_handler.guest_access_token_for(
+ medium=medium,
+ address=address,
+ inviter_user_id=inviter_user_id,
+ )
+
+ guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
+ guest_access_token
+ )
+
+ invite_config.update({
+ "guest_access_token": guest_access_token,
+ "guest_user_id": guest_user_info["user"].to_string(),
+ })
+
+ data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+ is_url,
+ invite_config
+ )
+ # TODO: Check for success
+ token = data["token"]
+ public_keys = data.get("public_keys", [])
+ if "public_key" in data:
+ fallback_public_key = {
+ "public_key": data["public_key"],
+ "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+ id_server_scheme, id_server,
+ ),
+ }
+ else:
+ fallback_public_key = public_keys[0]
+
+ if not public_keys:
+ public_keys.append(fallback_public_key)
+ display_name = data["display_name"]
+ defer.returnValue((token, public_keys, fallback_public_key, display_name))
+
+ @defer.inlineCallbacks
+ def forget(self, user, room_id):
+ user_id = user.to_string()
+
+ member = yield self.state_handler.get_current_state(
+ room_id=room_id,
+ event_type=EventTypes.Member,
+ state_key=user_id
+ )
+ membership = member.membership if member else None
+
+ if membership is not None and membership != Membership.LEAVE:
+ raise SynapseError(400, "User %s in room %s" % (
+ user_id, room_id
+ ))
+
+ if membership:
+ yield self.store.forget(user_id, room_id)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9937d8dd7f..df75d70fac 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.api.filtering import Filter
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
+from synapse.visibility import filter_events_for_client
from unpaddedbase64 import decode_base64, encode_base64
@@ -172,8 +173,8 @@ class SearchHandler(BaseHandler):
filtered_events = search_filter.filter([r["event"] for r in results])
- events = yield self._filter_events_for_client(
- user.to_string(), filtered_events
+ events = yield filter_events_for_client(
+ self.store, user.to_string(), filtered_events
)
events.sort(key=lambda e: -rank_map[e.event_id])
@@ -223,8 +224,8 @@ class SearchHandler(BaseHandler):
r["event"] for r in results
])
- events = yield self._filter_events_for_client(
- user.to_string(), filtered_events
+ events = yield filter_events_for_client(
+ self.store, user.to_string(), filtered_events
)
room_events.extend(events)
@@ -281,12 +282,12 @@ class SearchHandler(BaseHandler):
event.room_id, event.event_id, before_limit, after_limit
)
- res["events_before"] = yield self._filter_events_for_client(
- user.to_string(), res["events_before"]
+ res["events_before"] = yield filter_events_for_client(
+ self.store, user.to_string(), res["events_before"]
)
- res["events_after"] = yield self._filter_events_for_client(
- user.to_string(), res["events_after"]
+ res["events_after"] = yield filter_events_for_client(
+ self.store, user.to_string(), res["events_after"]
)
res["start"] = now_token.copy_and_replace(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 1f6fde8e8a..be26a491ff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,14 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseHandler
-
-from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.async import concurrently_execute
+from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
+from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
+from synapse.visibility import filter_events_for_client
from twisted.internet import defer
@@ -35,6 +34,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"user",
"filter_collection",
"is_guest",
+ "request_key",
])
@@ -130,14 +130,16 @@ class SyncResult(collections.namedtuple("SyncResult", [
)
-class SyncHandler(BaseHandler):
+class SyncHandler(object):
def __init__(self, hs):
- super(SyncHandler, self).__init__(hs)
+ self.store = hs.get_datastore()
+ self.notifier = hs.get_notifier()
+ self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
+ self.response_cache = ResponseCache()
- @defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -146,7 +148,19 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
+ result = self.response_cache.get(sync_config.request_key)
+ if not result:
+ result = self.response_cache.set(
+ sync_config.request_key,
+ self._wait_for_sync_for_user(
+ sync_config, since_token, timeout, full_state
+ )
+ )
+ return result
+ @defer.inlineCallbacks
+ def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
+ full_state):
context = LoggingContext.current_context()
if context:
if since_token is None:
@@ -179,197 +193,15 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
- if since_token is None or full_state:
- return self.full_state_sync(sync_config, since_token)
- else:
- return self.incremental_sync_with_gap(sync_config, since_token)
-
- @defer.inlineCallbacks
- def full_state_sync(self, sync_config, timeline_since_token):
- """Get a sync for a client which is starting without any state.
-
- If a 'message_since_token' is given, only timeline events which have
- happened since that token will be returned.
-
- Returns:
- A Deferred SyncResult.
- """
- now_token = yield self.event_sources.get_current_token()
-
- now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token
- )
-
- presence_stream = self.event_sources.sources["presence"]
- # TODO (mjark): This looks wrong, shouldn't we be getting the presence
- # UP to the present rather than after the present?
- pagination_config = PaginationConfig(from_token=now_token)
- presence, _ = yield presence_stream.get_pagination_rows(
- user=sync_config.user,
- pagination_config=pagination_config.get_source_config("presence"),
- key=None
- )
-
- membership_list = (
- Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
- )
-
- room_list = yield self.store.get_rooms_for_user_where_membership_is(
- user_id=sync_config.user.to_string(),
- membership_list=membership_list
- )
-
- account_data, account_data_by_room = (
- yield self.store.get_account_data_for_user(
- sync_config.user.to_string()
- )
- )
-
- account_data['m.push_rules'] = yield self.push_rules_for_user(
- sync_config.user
- )
-
- tags_by_room = yield self.store.get_tags_for_user(
- sync_config.user.to_string()
- )
-
- joined = []
- invited = []
- archived = []
- deferreds = []
-
- room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
- for room_list_chunk in room_list_chunks:
- for event in room_list_chunk:
- if event.membership == Membership.JOIN:
- room_sync_deferred = preserve_fn(
- self.full_state_sync_for_joined_room
- )(
- room_id=event.room_id,
- sync_config=sync_config,
- now_token=now_token,
- timeline_since_token=timeline_since_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- room_sync_deferred.addCallback(joined.append)
- deferreds.append(room_sync_deferred)
- elif event.membership == Membership.INVITE:
- invite = yield self.store.get_event(event.event_id)
- invited.append(InvitedSyncResult(
- room_id=event.room_id,
- invite=invite,
- ))
- elif event.membership in (Membership.LEAVE, Membership.BAN):
- # Always send down rooms we were banned or kicked from.
- if not sync_config.filter_collection.include_leave:
- if event.membership == Membership.LEAVE:
- if sync_config.user.to_string() == event.sender:
- continue
-
- leave_token = now_token.copy_and_replace(
- "room_key", "s%d" % (event.stream_ordering,)
- )
- room_sync_deferred = preserve_fn(
- self.full_state_sync_for_archived_room
- )(
- sync_config=sync_config,
- room_id=event.room_id,
- leave_event_id=event.event_id,
- leave_token=leave_token,
- timeline_since_token=timeline_since_token,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- room_sync_deferred.addCallback(archived.append)
- deferreds.append(room_sync_deferred)
-
- yield defer.gatherResults(
- deferreds, consumeErrors=True
- ).addErrback(unwrapFirstError)
-
- account_data_for_user = sync_config.filter_collection.filter_account_data(
- self.account_data_for_user(account_data)
- )
-
- presence = sync_config.filter_collection.filter_presence(
- presence
- )
-
- defer.returnValue(SyncResult(
- presence=presence,
- account_data=account_data_for_user,
- joined=joined,
- invited=invited,
- archived=archived,
- next_batch=now_token,
- ))
-
- @defer.inlineCallbacks
- def full_state_sync_for_joined_room(self, room_id, sync_config,
- now_token, timeline_since_token,
- ephemeral_by_room, tags_by_room,
- account_data_by_room):
- """Sync a room for a client which is starting without any state
- Returns:
- A Deferred JoinedSyncResult.
- """
-
- batch = yield self.load_filtered_recents(
- room_id, sync_config, now_token, since_token=timeline_since_token
- )
-
- room_sync = yield self.incremental_sync_with_gap_for_room(
- room_id, sync_config,
- now_token=now_token,
- since_token=timeline_since_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- batch=batch,
- full_state=True,
- )
-
- defer.returnValue(room_sync)
+ return self.generate_sync_result(sync_config, since_token, full_state)
@defer.inlineCallbacks
def push_rules_for_user(self, user):
user_id = user.to_string()
- rawrules = yield self.store.get_push_rules_for_user(user_id)
- enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
- rules = format_push_rules_for_user(user, rawrules, enabled_map)
+ rules = yield self.store.get_push_rules_for_user(user_id)
+ rules = format_push_rules_for_user(user, rules)
defer.returnValue(rules)
- def account_data_for_user(self, account_data):
- account_data_events = []
-
- for account_data_type, content in account_data.items():
- account_data_events.append({
- "type": account_data_type,
- "content": content,
- })
-
- return account_data_events
-
- def account_data_for_room(self, room_id, tags_by_room, account_data_by_room):
- account_data_events = []
- tags = tags_by_room.get(room_id)
- if tags is not None:
- account_data_events.append({
- "type": "m.tag",
- "content": {"tags": tags},
- })
-
- account_data = account_data_by_room.get(room_id, {})
- for account_data_type, content in account_data.items():
- account_data_events.append({
- "type": account_data_type,
- "content": content,
- })
-
- return account_data_events
-
@defer.inlineCallbacks
def ephemeral_by_room(self, sync_config, now_token, since_token=None):
"""Get the ephemeral events for each room the user is in
@@ -432,255 +264,44 @@ class SyncHandler(BaseHandler):
defer.returnValue((now_token, ephemeral_by_room))
- def full_state_sync_for_archived_room(self, room_id, sync_config,
- leave_event_id, leave_token,
- timeline_since_token, tags_by_room,
- account_data_by_room):
- """Sync a room for a client which is starting without any state
- Returns:
- A Deferred ArchivedSyncResult.
- """
-
- return self.incremental_sync_for_archived_room(
- sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
- account_data_by_room, full_state=True, leave_token=leave_token,
- )
-
@defer.inlineCallbacks
- def incremental_sync_with_gap(self, sync_config, since_token):
- """ Get the incremental delta needed to bring the client up to
- date with the server.
- Returns:
- A Deferred SyncResult.
+ def _load_filtered_recents(self, room_id, sync_config, now_token,
+ since_token=None, recents=None, newly_joined_room=False):
"""
- now_token = yield self.event_sources.get_current_token()
-
- rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
- room_ids = [room.room_id for room in rooms]
-
- presence_source = self.event_sources.sources["presence"]
- presence, presence_key = yield presence_source.get_new_events(
- user=sync_config.user,
- from_key=since_token.presence_key,
- limit=sync_config.filter_collection.presence_limit(),
- room_ids=room_ids,
- is_guest=sync_config.is_guest,
- )
- now_token = now_token.copy_and_replace("presence_key", presence_key)
-
- now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token, since_token
- )
-
- rm_handler = self.hs.get_handlers().room_member_handler
- app_service = yield self.store.get_app_service_by_user_id(
- sync_config.user.to_string()
- )
- if app_service:
- rooms = yield self.store.get_app_service_rooms(app_service)
- joined_room_ids = set(r.room_id for r in rooms)
- else:
- joined_room_ids = yield rm_handler.get_joined_rooms_for_user(
- sync_config.user
- )
-
- user_id = sync_config.user.to_string()
-
- timeline_limit = sync_config.filter_collection.timeline_limit()
-
- tags_by_room = yield self.store.get_updated_tags(
- user_id,
- since_token.account_data_key,
- )
-
- account_data, account_data_by_room = (
- yield self.store.get_updated_account_data_for_user(
- user_id,
- since_token.account_data_key,
- )
- )
-
- push_rules_changed = yield self.store.have_push_rules_changed_for_user(
- user_id, int(since_token.push_rules_key)
- )
-
- if push_rules_changed:
- account_data["m.push_rules"] = yield self.push_rules_for_user(
- sync_config.user
- )
-
- # Get a list of membership change events that have happened.
- rooms_changed = yield self.store.get_membership_changes_for_user(
- user_id, since_token.room_key, now_token.room_key
- )
-
- mem_change_events_by_room_id = {}
- for event in rooms_changed:
- mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
-
- newly_joined_rooms = []
- archived = []
- invited = []
- for room_id, events in mem_change_events_by_room_id.items():
- non_joins = [e for e in events if e.membership != Membership.JOIN]
- has_join = len(non_joins) != len(events)
-
- # We want to figure out if we joined the room at some point since
- # the last sync (even if we have since left). This is to make sure
- # we do send down the room, and with full state, where necessary
- if room_id in joined_room_ids or has_join:
- old_state = yield self.get_state_at(room_id, since_token)
- old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
- if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
- newly_joined_rooms.append(room_id)
-
- if room_id in joined_room_ids:
- continue
-
- if not non_joins:
- continue
-
- # Only bother if we're still currently invited
- should_invite = non_joins[-1].membership == Membership.INVITE
- if should_invite:
- room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
- if room_sync:
- invited.append(room_sync)
-
- # Always include leave/ban events. Just take the last one.
- # TODO: How do we handle ban -> leave in same batch?
- leave_events = [
- e for e in non_joins
- if e.membership in (Membership.LEAVE, Membership.BAN)
- ]
-
- if leave_events:
- leave_event = leave_events[-1]
- room_sync = yield self.incremental_sync_for_archived_room(
- sync_config, room_id, leave_event.event_id, since_token,
- tags_by_room, account_data_by_room,
- full_state=room_id in newly_joined_rooms
- )
- if room_sync:
- archived.append(room_sync)
-
- # Get all events for rooms we're currently joined to.
- room_to_events = yield self.store.get_room_events_stream_for_rooms(
- room_ids=joined_room_ids,
- from_key=since_token.room_key,
- to_key=now_token.room_key,
- limit=timeline_limit + 1,
- )
-
- joined = []
- # We loop through all room ids, even if there are no new events, in case
- # there are non room events taht we need to notify about.
- for room_id in joined_room_ids:
- room_entry = room_to_events.get(room_id, None)
-
- if room_entry:
- events, start_key = room_entry
-
- prev_batch_token = now_token.copy_and_replace("room_key", start_key)
-
- newly_joined_room = room_id in newly_joined_rooms
- full_state = newly_joined_room
-
- batch = yield self.load_filtered_recents(
- room_id, sync_config, prev_batch_token,
- since_token=since_token,
- recents=events,
- newly_joined_room=newly_joined_room,
- )
- else:
- batch = TimelineBatch(
- events=[],
- prev_batch=since_token,
- limited=False,
- )
- full_state = False
-
- room_sync = yield self.incremental_sync_with_gap_for_room(
- room_id=room_id,
- sync_config=sync_config,
- since_token=since_token,
- now_token=now_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- batch=batch,
- full_state=full_state,
- )
- if room_sync:
- joined.append(room_sync)
-
- # For each newly joined room, we want to send down presence of
- # existing users.
- presence_handler = self.hs.get_handlers().presence_handler
- extra_presence_users = set()
- for room_id in newly_joined_rooms:
- users = yield self.store.get_users_in_room(event.room_id)
- extra_presence_users.update(users)
-
- # For each new member, send down presence.
- for joined_sync in joined:
- it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values())
- for event in it:
- if event.type == EventTypes.Member:
- if event.membership == Membership.JOIN:
- extra_presence_users.add(event.state_key)
-
- states = yield presence_handler.get_states(
- [u for u in extra_presence_users if u != user_id],
- as_event=True,
- )
- presence.extend(states)
-
- account_data_for_user = sync_config.filter_collection.filter_account_data(
- self.account_data_for_user(account_data)
- )
-
- presence = sync_config.filter_collection.filter_presence(
- presence
- )
-
- defer.returnValue(SyncResult(
- presence=presence,
- account_data=account_data_for_user,
- joined=joined,
- invited=invited,
- archived=archived,
- next_batch=now_token,
- ))
-
- @defer.inlineCallbacks
- def load_filtered_recents(self, room_id, sync_config, now_token,
- since_token=None, recents=None, newly_joined_room=False):
- """
- :returns a Deferred TimelineBatch
+ Returns:
+ a Deferred TimelineBatch
"""
with Measure(self.clock, "load_filtered_recents"):
- filtering_factor = 2
timeline_limit = sync_config.filter_collection.timeline_limit()
- load_limit = max(timeline_limit * filtering_factor, 10)
- max_repeat = 5 # Only try a few times per room, otherwise
- room_key = now_token.room_key
- end_key = room_key
if recents is None or newly_joined_room or timeline_limit < len(recents):
limited = True
else:
limited = False
- if recents is not None:
+ if recents:
recents = sync_config.filter_collection.filter_room_timeline(recents)
- recents = yield self._filter_events_for_client(
+ recents = yield filter_events_for_client(
+ self.store,
sync_config.user.to_string(),
recents,
)
else:
recents = []
+ if not limited:
+ defer.returnValue(TimelineBatch(
+ events=recents,
+ prev_batch=now_token,
+ limited=False
+ ))
+
+ filtering_factor = 2
+ load_limit = max(timeline_limit * filtering_factor, 10)
+ max_repeat = 5 # Only try a few times per room, otherwise
+ room_key = now_token.room_key
+ end_key = room_key
+
since_key = None
if since_token and not newly_joined_room:
since_key = since_token.room_key
@@ -695,7 +316,8 @@ class SyncHandler(BaseHandler):
loaded_recents = sync_config.filter_collection.filter_room_timeline(
events
)
- loaded_recents = yield self._filter_events_for_client(
+ loaded_recents = yield filter_events_for_client(
+ self.store,
sync_config.user.to_string(),
loaded_recents,
)
@@ -723,109 +345,15 @@ class SyncHandler(BaseHandler):
))
@defer.inlineCallbacks
- def incremental_sync_with_gap_for_room(self, room_id, sync_config,
- since_token, now_token,
- ephemeral_by_room, tags_by_room,
- account_data_by_room,
- batch, full_state=False):
- state = yield self.compute_state_delta(
- room_id, batch, sync_config, since_token, now_token,
- full_state=full_state
- )
-
- account_data = self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- )
-
- account_data = sync_config.filter_collection.filter_room_account_data(
- account_data
- )
-
- ephemeral = sync_config.filter_collection.filter_room_ephemeral(
- ephemeral_by_room.get(room_id, [])
- )
-
- unread_notifications = {}
- room_sync = JoinedSyncResult(
- room_id=room_id,
- timeline=batch,
- state=state,
- ephemeral=ephemeral,
- account_data=account_data,
- unread_notifications=unread_notifications,
- )
-
- if room_sync:
- notifs = yield self.unread_notifs_for_room_id(
- room_id, sync_config
- )
-
- if notifs is not None:
- unread_notifications["notification_count"] = notifs["notify_count"]
- unread_notifications["highlight_count"] = notifs["highlight_count"]
-
- logger.debug("Room sync: %r", room_sync)
-
- defer.returnValue(room_sync)
-
- @defer.inlineCallbacks
- def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
- since_token, tags_by_room,
- account_data_by_room, full_state,
- leave_token=None):
- """ Get the incremental delta needed to bring the client up to date for
- the archived room.
- Returns:
- A Deferred ArchivedSyncResult
- """
-
- if not leave_token:
- stream_token = yield self.store.get_stream_token_for_event(
- leave_event_id
- )
-
- leave_token = since_token.copy_and_replace("room_key", stream_token)
-
- if since_token and since_token.is_after(leave_token):
- defer.returnValue(None)
-
- batch = yield self.load_filtered_recents(
- room_id, sync_config, leave_token, since_token,
- )
-
- logger.debug("Recents %r", batch)
-
- state_events_delta = yield self.compute_state_delta(
- room_id, batch, sync_config, since_token, leave_token,
- full_state=full_state
- )
-
- account_data = self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- )
-
- account_data = sync_config.filter_collection.filter_room_account_data(
- account_data
- )
-
- room_sync = ArchivedSyncResult(
- room_id=room_id,
- timeline=batch,
- state=state_events_delta,
- account_data=account_data,
- )
-
- logger.debug("Room sync: %r", room_sync)
-
- defer.returnValue(room_sync)
-
- @defer.inlineCallbacks
def get_state_after_event(self, event):
"""
Get the room state after the given event
- :param synapse.events.EventBase event: event of interest
- :return: A Deferred map from ((type, state_key)->Event)
+ Args:
+ event(synapse.events.EventBase): event of interest
+
+ Returns:
+ A Deferred map from ((type, state_key)->Event)
"""
state = yield self.store.get_state_for_event(event.event_id)
if event.is_state():
@@ -836,9 +364,13 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_state_at(self, room_id, stream_position):
""" Get the room state at a particular stream position
- :param str room_id: room for which to get state
- :param StreamToken stream_position: point at which to get state
- :returns: A Deferred map from ((type, state_key)->Event)
+
+ Args:
+ room_id(str): room for which to get state
+ stream_position(StreamToken): point at which to get state
+
+ Returns:
+ A Deferred map from ((type, state_key)->Event)
"""
last_events, token = yield self.store.get_recent_events_for_room(
room_id, end_token=stream_position.room_key, limit=1,
@@ -859,15 +391,18 @@ class SyncHandler(BaseHandler):
""" Works out the differnce in state between the start of the timeline
and the previous sync.
- :param str room_id
- :param TimelineBatch batch: The timeline batch for the room that will
- be sent to the user.
- :param sync_config
- :param str since_token: Token of the end of the previous batch. May be None.
- :param str now_token: Token of the end of the current batch.
- :param bool full_state: Whether to force returning the full state.
+ Args:
+ room_id(str):
+ batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+ the room that will be sent to the user.
+ sync_config(synapse.handlers.sync.SyncConfig):
+ since_token(str|None): Token of the end of the previous batch. May
+ be None.
+ now_token(str): Token of the end of the current batch.
+ full_state(bool): Whether to force returning the full state.
- :returns A new event dictionary
+ Returns:
+ A deferred new event dictionary
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
@@ -934,24 +469,6 @@ class SyncHandler(BaseHandler):
for e in sync_config.filter_collection.filter_room_state(state.values())
})
- def check_joined_room(self, sync_config, state_delta):
- """
- Check if the user has just joined the given room (so should
- be given the full state)
-
- :param sync_config:
- :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the
- difference in state since the last sync
-
- :returns A deferred Tuple (state_delta, limited)
- """
- join_event = state_delta.get((
- EventTypes.Member, sync_config.user.to_string()), None)
- if join_event is not None:
- if join_event.content["membership"] == Membership.JOIN:
- return True
- return False
-
@defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config):
with Measure(self.clock, "unread_notifs_for_room_id"):
@@ -972,6 +489,551 @@ class SyncHandler(BaseHandler):
# count is whatever it was last time.
defer.returnValue(None)
+ @defer.inlineCallbacks
+ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+ """Generates a sync result.
+
+ Args:
+ sync_config (SyncConfig)
+ since_token (StreamToken)
+ full_state (bool)
+
+ Returns:
+ Deferred(SyncResult)
+ """
+
+ # NB: The now_token gets changed by some of the generate_sync_* methods,
+ # this is due to some of the underlying streams not supporting the ability
+ # to query up to a given point.
+ # Always use the `now_token` in `SyncResultBuilder`
+ now_token = yield self.event_sources.get_current_token()
+
+ sync_result_builder = SyncResultBuilder(
+ sync_config, full_state,
+ since_token=since_token,
+ now_token=now_token,
+ )
+
+ account_data_by_room = yield self._generate_sync_entry_for_account_data(
+ sync_result_builder
+ )
+
+ res = yield self._generate_sync_entry_for_rooms(
+ sync_result_builder, account_data_by_room
+ )
+ newly_joined_rooms, newly_joined_users = res
+
+ yield self._generate_sync_entry_for_presence(
+ sync_result_builder, newly_joined_rooms, newly_joined_users
+ )
+
+ defer.returnValue(SyncResult(
+ presence=sync_result_builder.presence,
+ account_data=sync_result_builder.account_data,
+ joined=sync_result_builder.joined,
+ invited=sync_result_builder.invited,
+ archived=sync_result_builder.archived,
+ next_batch=sync_result_builder.now_token,
+ ))
+
+ @defer.inlineCallbacks
+ def _generate_sync_entry_for_account_data(self, sync_result_builder):
+ """Generates the account data portion of the sync response. Populates
+ `sync_result_builder` with the result.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+
+ Returns:
+ Deferred(dict): A dictionary containing the per room account data.
+ """
+ sync_config = sync_result_builder.sync_config
+ user_id = sync_result_builder.sync_config.user.to_string()
+ since_token = sync_result_builder.since_token
+
+ if since_token and not sync_result_builder.full_state:
+ account_data, account_data_by_room = (
+ yield self.store.get_updated_account_data_for_user(
+ user_id,
+ since_token.account_data_key,
+ )
+ )
+
+ push_rules_changed = yield self.store.have_push_rules_changed_for_user(
+ user_id, int(since_token.push_rules_key)
+ )
+
+ if push_rules_changed:
+ account_data["m.push_rules"] = yield self.push_rules_for_user(
+ sync_config.user
+ )
+ else:
+ account_data, account_data_by_room = (
+ yield self.store.get_account_data_for_user(
+ sync_config.user.to_string()
+ )
+ )
+
+ account_data['m.push_rules'] = yield self.push_rules_for_user(
+ sync_config.user
+ )
+
+ account_data_for_user = sync_config.filter_collection.filter_account_data([
+ {"type": account_data_type, "content": content}
+ for account_data_type, content in account_data.items()
+ ])
+
+ sync_result_builder.account_data = account_data_for_user
+
+ defer.returnValue(account_data_by_room)
+
+ @defer.inlineCallbacks
+ def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
+ newly_joined_users):
+ """Generates the presence portion of the sync response. Populates the
+ `sync_result_builder` with the result.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+ newly_joined_rooms(list): List of rooms that the user has joined
+ since the last sync (or empty if an initial sync)
+ newly_joined_users(list): List of users that have joined rooms
+ since the last sync (or empty if an initial sync)
+ """
+ now_token = sync_result_builder.now_token
+ sync_config = sync_result_builder.sync_config
+ user = sync_result_builder.sync_config.user
+
+ presence_source = self.event_sources.sources["presence"]
+
+ since_token = sync_result_builder.since_token
+ if since_token and not sync_result_builder.full_state:
+ presence_key = since_token.presence_key
+ include_offline = True
+ else:
+ presence_key = None
+ include_offline = False
+
+ presence, presence_key = yield presence_source.get_new_events(
+ user=user,
+ from_key=presence_key,
+ is_guest=sync_config.is_guest,
+ include_offline=include_offline,
+ )
+ sync_result_builder.now_token = now_token.copy_and_replace(
+ "presence_key", presence_key
+ )
+
+ extra_users_ids = set(newly_joined_users)
+ for room_id in newly_joined_rooms:
+ users = yield self.store.get_users_in_room(room_id)
+ extra_users_ids.update(users)
+ extra_users_ids.discard(user.to_string())
+
+ states = yield self.presence_handler.get_states(
+ extra_users_ids,
+ as_event=True,
+ )
+ presence.extend(states)
+
+ # Deduplicate the presence entries so that there's at most one per user
+ presence = {p["content"]["user_id"]: p for p in presence}.values()
+
+ presence = sync_config.filter_collection.filter_presence(
+ presence
+ )
+
+ sync_result_builder.presence = presence
+
+ @defer.inlineCallbacks
+ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
+ """Generates the rooms portion of the sync response. Populates the
+ `sync_result_builder` with the result.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+ account_data_by_room(dict): Dictionary of per room account data
+
+ Returns:
+ Deferred(tuple): Returns a 2-tuple of
+ `(newly_joined_rooms, newly_joined_users)`
+ """
+ user_id = sync_result_builder.sync_config.user.to_string()
+
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_result_builder.sync_config,
+ now_token=sync_result_builder.now_token,
+ since_token=sync_result_builder.since_token,
+ )
+ sync_result_builder.now_token = now_token
+
+ ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
+ "m.ignored_user_list", user_id=user_id,
+ )
+
+ if ignored_account_data:
+ ignored_users = ignored_account_data.get("ignored_users", {}).keys()
+ else:
+ ignored_users = frozenset()
+
+ if sync_result_builder.since_token:
+ res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
+ room_entries, invited, newly_joined_rooms = res
+
+ tags_by_room = yield self.store.get_updated_tags(
+ user_id,
+ sync_result_builder.since_token.account_data_key,
+ )
+ else:
+ res = yield self._get_all_rooms(sync_result_builder, ignored_users)
+ room_entries, invited, newly_joined_rooms = res
+
+ tags_by_room = yield self.store.get_tags_for_user(user_id)
+
+ def handle_room_entries(room_entry):
+ return self._generate_room_entry(
+ sync_result_builder,
+ ignored_users,
+ room_entry,
+ ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
+ tags=tags_by_room.get(room_entry.room_id),
+ account_data=account_data_by_room.get(room_entry.room_id, {}),
+ always_include=sync_result_builder.full_state,
+ )
+
+ yield concurrently_execute(handle_room_entries, room_entries, 10)
+
+ sync_result_builder.invited.extend(invited)
+
+ # Now we want to get any newly joined users
+ newly_joined_users = set()
+ if sync_result_builder.since_token:
+ for joined_sync in sync_result_builder.joined:
+ it = itertools.chain(
+ joined_sync.timeline.events, joined_sync.state.values()
+ )
+ for event in it:
+ if event.type == EventTypes.Member:
+ if event.membership == Membership.JOIN:
+ newly_joined_users.add(event.state_key)
+
+ defer.returnValue((newly_joined_rooms, newly_joined_users))
+
+ @defer.inlineCallbacks
+ def _get_rooms_changed(self, sync_result_builder, ignored_users):
+ """Gets the the changes that have happened since the last sync.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+ ignored_users(set(str)): Set of users ignored by user.
+
+ Returns:
+ Deferred(tuple): Returns a tuple of the form:
+ `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)`
+ """
+ user_id = sync_result_builder.sync_config.user.to_string()
+ since_token = sync_result_builder.since_token
+ now_token = sync_result_builder.now_token
+ sync_config = sync_result_builder.sync_config
+
+ assert since_token
+
+ app_service = yield self.store.get_app_service_by_user_id(user_id)
+ if app_service:
+ rooms = yield self.store.get_app_service_rooms(app_service)
+ joined_room_ids = set(r.room_id for r in rooms)
+ else:
+ rooms = yield self.store.get_rooms_for_user(user_id)
+ joined_room_ids = set(r.room_id for r in rooms)
+
+ # Get a list of membership change events that have happened.
+ rooms_changed = yield self.store.get_membership_changes_for_user(
+ user_id, since_token.room_key, now_token.room_key
+ )
+
+ mem_change_events_by_room_id = {}
+ for event in rooms_changed:
+ mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
+
+ newly_joined_rooms = []
+ room_entries = []
+ invited = []
+ for room_id, events in mem_change_events_by_room_id.items():
+ non_joins = [e for e in events if e.membership != Membership.JOIN]
+ has_join = len(non_joins) != len(events)
+
+ # We want to figure out if we joined the room at some point since
+ # the last sync (even if we have since left). This is to make sure
+ # we do send down the room, and with full state, where necessary
+ if room_id in joined_room_ids or has_join:
+ old_state = yield self.get_state_at(room_id, since_token)
+ old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+ if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
+ newly_joined_rooms.append(room_id)
+
+ if room_id in joined_room_ids:
+ continue
+
+ if not non_joins:
+ continue
+
+ # Only bother if we're still currently invited
+ should_invite = non_joins[-1].membership == Membership.INVITE
+ if should_invite:
+ if event.sender not in ignored_users:
+ room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+ if room_sync:
+ invited.append(room_sync)
+
+ # Always include leave/ban events. Just take the last one.
+ # TODO: How do we handle ban -> leave in same batch?
+ leave_events = [
+ e for e in non_joins
+ if e.membership in (Membership.LEAVE, Membership.BAN)
+ ]
+
+ if leave_events:
+ leave_event = leave_events[-1]
+ leave_stream_token = yield self.store.get_stream_token_for_event(
+ leave_event.event_id
+ )
+ leave_token = since_token.copy_and_replace(
+ "room_key", leave_stream_token
+ )
+
+ if since_token and since_token.is_after(leave_token):
+ continue
+
+ room_entries.append(RoomSyncResultBuilder(
+ room_id=room_id,
+ rtype="archived",
+ events=None,
+ newly_joined=room_id in newly_joined_rooms,
+ full_state=False,
+ since_token=since_token,
+ upto_token=leave_token,
+ ))
+
+ timeline_limit = sync_config.filter_collection.timeline_limit()
+
+ # Get all events for rooms we're currently joined to.
+ room_to_events = yield self.store.get_room_events_stream_for_rooms(
+ room_ids=joined_room_ids,
+ from_key=since_token.room_key,
+ to_key=now_token.room_key,
+ limit=timeline_limit + 1,
+ )
+
+ # We loop through all room ids, even if there are no new events, in case
+ # there are non room events taht we need to notify about.
+ for room_id in joined_room_ids:
+ room_entry = room_to_events.get(room_id, None)
+
+ if room_entry:
+ events, start_key = room_entry
+
+ prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+
+ room_entries.append(RoomSyncResultBuilder(
+ room_id=room_id,
+ rtype="joined",
+ events=events,
+ newly_joined=room_id in newly_joined_rooms,
+ full_state=False,
+ since_token=None if room_id in newly_joined_rooms else since_token,
+ upto_token=prev_batch_token,
+ ))
+ else:
+ room_entries.append(RoomSyncResultBuilder(
+ room_id=room_id,
+ rtype="joined",
+ events=[],
+ newly_joined=room_id in newly_joined_rooms,
+ full_state=False,
+ since_token=since_token,
+ upto_token=since_token,
+ ))
+
+ defer.returnValue((room_entries, invited, newly_joined_rooms))
+
+ @defer.inlineCallbacks
+ def _get_all_rooms(self, sync_result_builder, ignored_users):
+ """Returns entries for all rooms for the user.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+ ignored_users(set(str)): Set of users ignored by user.
+
+ Returns:
+ Deferred(tuple): Returns a tuple of the form:
+ `([RoomSyncResultBuilder], [InvitedSyncResult], [])`
+ """
+
+ user_id = sync_result_builder.sync_config.user.to_string()
+ since_token = sync_result_builder.since_token
+ now_token = sync_result_builder.now_token
+ sync_config = sync_result_builder.sync_config
+
+ membership_list = (
+ Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
+ )
+
+ room_list = yield self.store.get_rooms_for_user_where_membership_is(
+ user_id=user_id,
+ membership_list=membership_list
+ )
+
+ room_entries = []
+ invited = []
+
+ for event in room_list:
+ if event.membership == Membership.JOIN:
+ room_entries.append(RoomSyncResultBuilder(
+ room_id=event.room_id,
+ rtype="joined",
+ events=None,
+ newly_joined=False,
+ full_state=True,
+ since_token=since_token,
+ upto_token=now_token,
+ ))
+ elif event.membership == Membership.INVITE:
+ if event.sender in ignored_users:
+ continue
+ invite = yield self.store.get_event(event.event_id)
+ invited.append(InvitedSyncResult(
+ room_id=event.room_id,
+ invite=invite,
+ ))
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ # Always send down rooms we were banned or kicked from.
+ if not sync_config.filter_collection.include_leave:
+ if event.membership == Membership.LEAVE:
+ if user_id == event.sender:
+ continue
+
+ leave_token = now_token.copy_and_replace(
+ "room_key", "s%d" % (event.stream_ordering,)
+ )
+ room_entries.append(RoomSyncResultBuilder(
+ room_id=event.room_id,
+ rtype="archived",
+ events=None,
+ newly_joined=False,
+ full_state=True,
+ since_token=since_token,
+ upto_token=leave_token,
+ ))
+
+ defer.returnValue((room_entries, invited, []))
+
+ @defer.inlineCallbacks
+ def _generate_room_entry(self, sync_result_builder, ignored_users,
+ room_builder, ephemeral, tags, account_data,
+ always_include=False):
+ """Populates the `joined` and `archived` section of `sync_result_builder`
+ based on the `room_builder`.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+ ignored_users(set(str)): Set of users ignored by user.
+ room_builder(RoomSyncResultBuilder)
+ ephemeral(list): List of new ephemeral events for room
+ tags(list): List of *all* tags for room, or None if there has been
+ no change.
+ account_data(list): List of new account data for room
+ always_include(bool): Always include this room in the sync response,
+ even if empty.
+ """
+ newly_joined = room_builder.newly_joined
+ full_state = (
+ room_builder.full_state
+ or newly_joined
+ or sync_result_builder.full_state
+ )
+ events = room_builder.events
+
+ # We want to shortcut out as early as possible.
+ if not (always_include or account_data or ephemeral or full_state):
+ if events == [] and tags is None:
+ return
+
+ since_token = sync_result_builder.since_token
+ now_token = sync_result_builder.now_token
+ sync_config = sync_result_builder.sync_config
+
+ room_id = room_builder.room_id
+ since_token = room_builder.since_token
+ upto_token = room_builder.upto_token
+
+ batch = yield self._load_filtered_recents(
+ room_id, sync_config,
+ now_token=upto_token,
+ since_token=since_token,
+ recents=events,
+ newly_joined_room=newly_joined,
+ )
+
+ account_data_events = []
+ if tags is not None:
+ account_data_events.append({
+ "type": "m.tag",
+ "content": {"tags": tags},
+ })
+
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ account_data = sync_config.filter_collection.filter_room_account_data(
+ account_data_events
+ )
+
+ ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+
+ if not (always_include or batch or account_data or ephemeral or full_state):
+ return
+
+ state = yield self.compute_state_delta(
+ room_id, batch, sync_config, since_token, now_token,
+ full_state=full_state
+ )
+
+ if room_builder.rtype == "joined":
+ unread_notifications = {}
+ room_sync = JoinedSyncResult(
+ room_id=room_id,
+ timeline=batch,
+ state=state,
+ ephemeral=ephemeral,
+ account_data=account_data_events,
+ unread_notifications=unread_notifications,
+ )
+
+ if room_sync or always_include:
+ notifs = yield self.unread_notifs_for_room_id(
+ room_id, sync_config
+ )
+
+ if notifs is not None:
+ unread_notifications["notification_count"] = notifs["notify_count"]
+ unread_notifications["highlight_count"] = notifs["highlight_count"]
+
+ sync_result_builder.joined.append(room_sync)
+ elif room_builder.rtype == "archived":
+ room_sync = ArchivedSyncResult(
+ room_id=room_id,
+ timeline=batch,
+ state=state,
+ account_data=account_data,
+ )
+ if room_sync or always_include:
+ sync_result_builder.archived.append(room_sync)
+ else:
+ raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+
def _action_has_highlight(actions):
for action in actions:
@@ -1019,3 +1081,51 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
(e.type, e.state_key): e
for e in evs
}
+
+
+class SyncResultBuilder(object):
+ "Used to help build up a new SyncResult for a user"
+ def __init__(self, sync_config, full_state, since_token, now_token):
+ """
+ Args:
+ sync_config(SyncConfig)
+ full_state(bool): The full_state flag as specified by user
+ since_token(StreamToken): The token supplied by user, or None.
+ now_token(StreamToken): The token to sync up to.
+ """
+ self.sync_config = sync_config
+ self.full_state = full_state
+ self.since_token = since_token
+ self.now_token = now_token
+
+ self.presence = []
+ self.account_data = []
+ self.joined = []
+ self.invited = []
+ self.archived = []
+
+
+class RoomSyncResultBuilder(object):
+ """Stores information needed to create either a `JoinedSyncResult` or
+ `ArchivedSyncResult`.
+ """
+ def __init__(self, room_id, rtype, events, newly_joined, full_state,
+ since_token, upto_token):
+ """
+ Args:
+ room_id(str)
+ rtype(str): One of `"joined"` or `"archived"`
+ events(list): List of events to include in the room, (more events
+ may be added when generating result).
+ newly_joined(bool): If the user has newly joined the room
+ full_state(bool): Whether the full state should be sent in result
+ since_token(StreamToken): Earliest point to return events from, or None
+ upto_token(StreamToken): Latest point to return events from.
+ """
+ self.room_id = room_id
+ self.rtype = rtype
+ self.events = events
+ self.newly_joined = newly_joined
+ self.full_state = full_state
+ self.since_token = since_token
+ self.upto_token = upto_token
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 8ce27f49ec..861b8f7989 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,8 +15,6 @@
from twisted.internet import defer
-from ._base import BaseHandler
-
from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.metrics import Measure
@@ -32,14 +30,16 @@ logger = logging.getLogger(__name__)
# A tiny object useful for storing a user's membership in a room, as a mapping
# key
-RoomMember = namedtuple("RoomMember", ("room_id", "user"))
+RoomMember = namedtuple("RoomMember", ("room_id", "user_id"))
-class TypingNotificationHandler(BaseHandler):
+class TypingHandler(object):
def __init__(self, hs):
- super(TypingNotificationHandler, self).__init__(hs)
-
- self.homeserver = hs
+ self.store = hs.get_datastore()
+ self.server_name = hs.config.server_name
+ self.auth = hs.get_auth()
+ self.is_mine_id = hs.is_mine_id
+ self.notifier = hs.get_notifier()
self.clock = hs.get_clock()
@@ -67,20 +67,23 @@ class TypingNotificationHandler(BaseHandler):
@defer.inlineCallbacks
def started_typing(self, target_user, auth_user, room_id, timeout):
- if not self.hs.is_mine(target_user):
+ target_user_id = target_user.to_string()
+ auth_user_id = auth_user.to_string()
+
+ if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this Home Server")
- if target_user != auth_user:
+ if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
- yield self.auth.check_joined_room(room_id, target_user.to_string())
+ yield self.auth.check_joined_room(room_id, target_user_id)
logger.debug(
- "%s has started typing in %s", target_user.to_string(), room_id
+ "%s has started typing in %s", target_user_id, room_id
)
until = self.clock.time_msec() + timeout
- member = RoomMember(room_id=room_id, user=target_user)
+ member = RoomMember(room_id=room_id, user_id=target_user_id)
was_present = member in self._member_typing_until
@@ -104,25 +107,28 @@ class TypingNotificationHandler(BaseHandler):
yield self._push_update(
room_id=room_id,
- user=target_user,
+ user_id=target_user_id,
typing=True,
)
@defer.inlineCallbacks
def stopped_typing(self, target_user, auth_user, room_id):
- if not self.hs.is_mine(target_user):
+ target_user_id = target_user.to_string()
+ auth_user_id = auth_user.to_string()
+
+ if not self.is_mine_id(target_user_id):
raise SynapseError(400, "User is not hosted on this Home Server")
- if target_user != auth_user:
+ if target_user_id != auth_user_id:
raise AuthError(400, "Cannot set another user's typing state")
- yield self.auth.check_joined_room(room_id, target_user.to_string())
+ yield self.auth.check_joined_room(room_id, target_user_id)
logger.debug(
- "%s has stopped typing in %s", target_user.to_string(), room_id
+ "%s has stopped typing in %s", target_user_id, room_id
)
- member = RoomMember(room_id=room_id, user=target_user)
+ member = RoomMember(room_id=room_id, user_id=target_user_id)
if member in self._member_typing_timer:
self.clock.cancel_call_later(self._member_typing_timer[member])
@@ -132,8 +138,9 @@ class TypingNotificationHandler(BaseHandler):
@defer.inlineCallbacks
def user_left_room(self, user, room_id):
- if self.hs.is_mine(user):
- member = RoomMember(room_id=room_id, user=user)
+ user_id = user.to_string()
+ if self.is_mine_id(user_id):
+ member = RoomMember(room_id=room_id, user_id=user_id)
yield self._stopped_typing(member)
@defer.inlineCallbacks
@@ -144,7 +151,7 @@ class TypingNotificationHandler(BaseHandler):
yield self._push_update(
room_id=member.room_id,
- user=member.user,
+ user_id=member.user_id,
typing=False,
)
@@ -156,61 +163,53 @@ class TypingNotificationHandler(BaseHandler):
del self._member_typing_timer[member]
@defer.inlineCallbacks
- def _push_update(self, room_id, user, typing):
- localusers = set()
- remotedomains = set()
-
- rm_handler = self.homeserver.get_handlers().room_member_handler
- yield rm_handler.fetch_room_distributions_into(
- room_id, localusers=localusers, remotedomains=remotedomains
- )
-
- if localusers:
- self._push_update_local(
- room_id=room_id,
- user=user,
- typing=typing
- )
+ def _push_update(self, room_id, user_id, typing):
+ domains = yield self.store.get_joined_hosts_for_room(room_id)
deferreds = []
- for domain in remotedomains:
- deferreds.append(self.federation.send_edu(
- destination=domain,
- edu_type="m.typing",
- content={
- "room_id": room_id,
- "user_id": user.to_string(),
- "typing": typing,
- },
- ))
+ for domain in domains:
+ if domain == self.server_name:
+ self._push_update_local(
+ room_id=room_id,
+ user_id=user_id,
+ typing=typing
+ )
+ else:
+ deferreds.append(self.federation.send_edu(
+ destination=domain,
+ edu_type="m.typing",
+ content={
+ "room_id": room_id,
+ "user_id": user_id,
+ "typing": typing,
+ },
+ ))
yield defer.DeferredList(deferreds, consumeErrors=True)
@defer.inlineCallbacks
def _recv_edu(self, origin, content):
room_id = content["room_id"]
- user = UserID.from_string(content["user_id"])
+ user_id = content["user_id"]
- localusers = set()
+ # Check that the string is a valid user id
+ UserID.from_string(user_id)
- rm_handler = self.homeserver.get_handlers().room_member_handler
- yield rm_handler.fetch_room_distributions_into(
- room_id, localusers=localusers
- )
+ domains = yield self.store.get_joined_hosts_for_room(room_id)
- if localusers:
+ if self.server_name in domains:
self._push_update_local(
room_id=room_id,
- user=user,
+ user_id=user_id,
typing=content["typing"]
)
- def _push_update_local(self, room_id, user, typing):
+ def _push_update_local(self, room_id, user_id, typing):
room_set = self._room_typing.setdefault(room_id, set())
if typing:
- room_set.add(user)
+ room_set.add(user_id)
else:
- room_set.discard(user)
+ room_set.discard(user_id)
self._latest_room_serial += 1
self._room_serials[room_id] = self._latest_room_serial
@@ -226,9 +225,7 @@ class TypingNotificationHandler(BaseHandler):
for room_id, serial in self._room_serials.items():
if last_id < serial and serial <= current_id:
typing = self._room_typing[room_id]
- typing_bytes = json.dumps([
- u.to_string() for u in typing
- ], ensure_ascii=False)
+ typing_bytes = json.dumps(list(typing), ensure_ascii=False)
rows.append((serial, room_id, typing_bytes))
rows.sort()
return rows
@@ -238,34 +235,26 @@ class TypingNotificationEventSource(object):
def __init__(self, hs):
self.hs = hs
self.clock = hs.get_clock()
- self._handler = None
- self._room_member_handler = None
-
- def handler(self):
- # Avoid cyclic dependency in handler setup
- if not self._handler:
- self._handler = self.hs.get_handlers().typing_notification_handler
- return self._handler
-
- def room_member_handler(self):
- if not self._room_member_handler:
- self._room_member_handler = self.hs.get_handlers().room_member_handler
- return self._room_member_handler
+ # We can't call get_typing_handler here because there's a cycle:
+ #
+ # Typing -> Notifier -> TypingNotificationEventSource -> Typing
+ #
+ self.get_typing_handler = hs.get_typing_handler
def _make_event_for(self, room_id):
- typing = self.handler()._room_typing[room_id]
+ typing = self.get_typing_handler()._room_typing[room_id]
return {
"type": "m.typing",
"room_id": room_id,
"content": {
- "user_ids": [u.to_string() for u in typing],
+ "user_ids": list(typing),
},
}
def get_new_events(self, from_key, room_ids, **kwargs):
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
- handler = self.handler()
+ handler = self.get_typing_handler()
events = []
for room_id in room_ids:
@@ -279,7 +268,7 @@ class TypingNotificationEventSource(object):
return events, handler._latest_room_serial
def get_current_key(self):
- return self.handler()._latest_room_serial
+ return self.get_typing_handler()._latest_room_serial
def get_pagination_rows(self, user, pagination_config, key):
return ([], pagination_config.from_key)
|