diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 8725c3c420..6a2339f2eb 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,7 +17,7 @@ from synapse.appservice.scheduler import AppServiceScheduler
from synapse.appservice.api import ApplicationServiceApi
from .register import RegistrationHandler
from .room import (
- RoomCreationHandler, RoomMemberHandler, RoomListHandler
+ RoomCreationHandler, RoomMemberHandler, RoomListHandler, RoomContextHandler,
)
from .message import MessageHandler
from .events import EventStreamHandler, EventHandler
@@ -32,6 +32,7 @@ from .sync import SyncHandler
from .auth import AuthHandler
from .identity import IdentityHandler
from .receipts import ReceiptsHandler
+from .search import SearchHandler
class Handlers(object):
@@ -68,3 +69,5 @@ class Handlers(object):
self.sync_handler = SyncHandler(hs)
self.auth_handler = AuthHandler(hs)
self.identity_handler = IdentityHandler(hs)
+ self.search_handler = SearchHandler(hs)
+ self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index cb992143f5..6519f183df 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -15,7 +15,7 @@
from twisted.internet import defer
-from synapse.api.errors import LimitExceededError, SynapseError
+from synapse.api.errors import LimitExceededError, SynapseError, AuthError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID, RoomAlias
@@ -29,6 +29,12 @@ logger = logging.getLogger(__name__)
class BaseHandler(object):
+ """
+ Common base class for the event handlers.
+
+ :type store: synapse.storage.events.StateStore
+ :type state_handler: synapse.state.StateHandler
+ """
def __init__(self, hs):
self.store = hs.get_datastore()
@@ -45,6 +51,74 @@ class BaseHandler(object):
self.event_builder_factory = hs.get_event_builder_factory()
+ @defer.inlineCallbacks
+ def _filter_events_for_client(self, user_id, events, is_guest=False,
+ require_all_visible_for_guests=True):
+ # Assumes that user has at some point joined the room if not is_guest.
+
+ def allowed(event, membership, visibility):
+ if visibility == "world_readable":
+ return True
+
+ if is_guest:
+ return False
+
+ if membership == Membership.JOIN:
+ return True
+
+ if event.type == EventTypes.RoomHistoryVisibility:
+ return not is_guest
+
+ if visibility == "shared":
+ return True
+ elif visibility == "joined":
+ return membership == Membership.JOIN
+ elif visibility == "invited":
+ return membership == Membership.INVITE
+
+ return True
+
+ event_id_to_state = yield self.store.get_state_for_events(
+ frozenset(e.event_id for e in events),
+ types=(
+ (EventTypes.RoomHistoryVisibility, ""),
+ (EventTypes.Member, user_id),
+ )
+ )
+
+ events_to_return = []
+ for event in events:
+ state = event_id_to_state[event.event_id]
+
+ membership_event = state.get((EventTypes.Member, user_id), None)
+ if membership_event:
+ membership = membership_event.membership
+ else:
+ membership = None
+
+ visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+ if visibility_event:
+ visibility = visibility_event.content.get("history_visibility", "shared")
+ else:
+ visibility = "shared"
+
+ should_include = allowed(event, membership, visibility)
+ if should_include:
+ events_to_return.append(event)
+
+ if (require_all_visible_for_guests
+ and is_guest
+ and len(events_to_return) < len(events)):
+ # This indicates that some events in the requested range were not
+ # visible to guest users. To be safe, we reject the entire request,
+ # so that we don't have to worry about interpreting visibility
+ # boundaries.
+ raise AuthError(403, "User %s does not have permission" % (
+ user_id
+ ))
+
+ defer.returnValue(events_to_return)
+
def ratelimit(self, user_id):
time_now = self.clock.time()
allowed, time_allowed = self.ratelimiter.send_message(
@@ -107,6 +181,8 @@ class BaseHandler(object):
if not suppress_auth:
self.auth.check(event, auth_events=context.current_state)
+ yield self.maybe_kick_guest_users(event, context.current_state.values())
+
if event.type == EventTypes.CanonicalAlias:
# Check the alias is acually valid (at this time at least)
room_alias_str = event.content.get("alias", None)
@@ -123,29 +199,63 @@ class BaseHandler(object):
)
)
- (event_stream_id, max_stream_id) = yield self.store.persist_event(
- event, context=context
- )
-
federation_handler = self.hs.get_handlers().federation_handler
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.INVITE:
+ event.unsigned["invite_room_state"] = [
+ {
+ "type": e.type,
+ "state_key": e.state_key,
+ "content": e.content,
+ "sender": e.sender,
+ }
+ for k, e in context.current_state.items()
+ if e.type in (
+ EventTypes.JoinRules,
+ EventTypes.CanonicalAlias,
+ EventTypes.RoomAvatar,
+ EventTypes.Name,
+ )
+ ]
+
invitee = UserID.from_string(event.state_key)
if not self.hs.is_mine(invitee):
# TODO: Can we add signature from remote server in a nicer
# way? If we have been invited by a remote server, we need
# to get them to sign the event.
+
returned_invite = yield federation_handler.send_invite(
invitee.domain,
event,
)
+ event.unsigned.pop("room_state", None)
+
# TODO: Make sure the signatures actually are correct.
event.signatures.update(
returned_invite.signatures
)
+ if event.type == EventTypes.Redaction:
+ if self.auth.check_redaction(event, auth_events=context.current_state):
+ original_event = yield self.store.get_event(
+ event.redacts,
+ check_redacted=False,
+ get_prev_content=False,
+ allow_rejected=False,
+ allow_none=False
+ )
+ if event.user_id != original_event.user_id:
+ raise AuthError(
+ 403,
+ "You don't have permission to redact events"
+ )
+
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
+ )
+
destinations = set(extra_destinations)
for k, s in context.current_state.items():
try:
@@ -174,6 +284,64 @@ class BaseHandler(object):
notify_d.addErrback(log_failure)
+ # If invite, remove room_state from unsigned before sending.
+ event.unsigned.pop("invite_room_state", None)
+
federation_handler.handle_new_event(
event, destinations=destinations,
)
+
+ @defer.inlineCallbacks
+ def maybe_kick_guest_users(self, event, current_state):
+ # Technically this function invalidates current_state by changing it.
+ # Hopefully this isn't that important to the caller.
+ if event.type == EventTypes.GuestAccess:
+ guest_access = event.content.get("guest_access", "forbidden")
+ if guest_access != "can_join":
+ yield self.kick_guest_users(current_state)
+
+ @defer.inlineCallbacks
+ def kick_guest_users(self, current_state):
+ for member_event in current_state:
+ try:
+ if member_event.type != EventTypes.Member:
+ continue
+
+ if not self.hs.is_mine(UserID.from_string(member_event.state_key)):
+ continue
+
+ if member_event.content["membership"] not in {
+ Membership.JOIN,
+ Membership.INVITE
+ }:
+ continue
+
+ if (
+ "kind" not in member_event.content
+ or member_event.content["kind"] != "guest"
+ ):
+ continue
+
+ # We make the user choose to leave, rather than have the
+ # event-sender kick them. This is partially because we don't
+ # need to worry about power levels, and partially because guest
+ # users are a concept which doesn't hugely work over federation,
+ # and having homeservers have their own users leave keeps more
+ # of that decision-making and control local to the guest-having
+ # homeserver.
+ message_handler = self.hs.get_handlers().message_handler
+ yield message_handler.create_and_send_event(
+ {
+ "type": EventTypes.Member,
+ "state_key": member_event.state_key,
+ "content": {
+ "membership": Membership.LEAVE,
+ "kind": "guest"
+ },
+ "room_id": member_event.room_id,
+ "sender": member_event.state_key
+ },
+ ratelimit=False,
+ )
+ except Exception as e:
+ logger.warn("Error kicking guest user: %s" % (e,))
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 1c9e7152c7..d852a18555 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -34,6 +34,7 @@ class AdminHandler(BaseHandler):
d = {}
for r in res:
+ # Note that device_id is always None
device = d.setdefault(r["device_id"], {})
session = device.setdefault(r["access_token"], [])
session.append({
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 602c5bcd89..be157e2bb7 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,14 +18,14 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.constants import LoginType
from synapse.types import UserID
-from synapse.api.errors import LoginError, Codes
-from synapse.http.client import SimpleHttpClient
+from synapse.api.errors import AuthError, LoginError, Codes
from synapse.util.async import run_on_reactor
from twisted.web.client import PartialDownloadError
import logging
import bcrypt
+import pymacaroons
import simplejson
import synapse.util.stringutils as stringutils
@@ -44,7 +44,9 @@ class AuthHandler(BaseHandler):
LoginType.EMAIL_IDENTITY: self._check_email_identity,
LoginType.DUMMY: self._check_dummy_auth,
}
+ self.bcrypt_rounds = hs.config.bcrypt_rounds
self.sessions = {}
+ self.INVALID_TOKEN_HTTP_STATUS = 401
@defer.inlineCallbacks
def check_auth(self, flows, clientdict, clientip):
@@ -186,7 +188,7 @@ class AuthHandler(BaseHandler):
# TODO: get this from the homeserver rather than creating a new one for
# each request
try:
- client = SimpleHttpClient(self.hs)
+ client = self.hs.get_simple_http_client()
resp_body = yield client.post_urlencoded_get_json(
self.hs.config.recaptcha_siteverify_api,
args={
@@ -279,7 +281,10 @@ class AuthHandler(BaseHandler):
user_id (str): User ID
password (str): Password
Returns:
- The access token for the user's session.
+ A tuple of:
+ The user's ID.
+ The access token for the user's session.
+ The refresh token for the user's session.
Raises:
StoreError if there was a problem storing the token.
LoginError if there was an authentication problem.
@@ -287,11 +292,43 @@ class AuthHandler(BaseHandler):
user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
self._check_password(user_id, password, password_hash)
- reg_handler = self.hs.get_handlers().registration_handler
- access_token = reg_handler.generate_token(user_id)
logger.info("Logging in user %s", user_id)
- yield self.store.add_access_token_to_user(user_id, access_token)
- defer.returnValue((user_id, access_token))
+ access_token = yield self.issue_access_token(user_id)
+ refresh_token = yield self.issue_refresh_token(user_id)
+ defer.returnValue((user_id, access_token, refresh_token))
+
+ @defer.inlineCallbacks
+ def get_login_tuple_for_user_id(self, user_id):
+ """
+ Gets login tuple for the user with the given user ID.
+ The user is assumed to have been authenticated by some other
+ machanism (e.g. CAS)
+
+ Args:
+ user_id (str): User ID
+ Returns:
+ A tuple of:
+ The user's ID.
+ The access token for the user's session.
+ The refresh token for the user's session.
+ Raises:
+ StoreError if there was a problem storing the token.
+ LoginError if there was an authentication problem.
+ """
+ user_id, ignored = yield self._find_user_id_and_pwd_hash(user_id)
+
+ logger.info("Logging in user %s", user_id)
+ access_token = yield self.issue_access_token(user_id)
+ refresh_token = yield self.issue_refresh_token(user_id)
+ defer.returnValue((user_id, access_token, refresh_token))
+
+ @defer.inlineCallbacks
+ def does_user_exist(self, user_id):
+ try:
+ yield self._find_user_id_and_pwd_hash(user_id)
+ defer.returnValue(True)
+ except LoginError:
+ defer.returnValue(False)
@defer.inlineCallbacks
def _find_user_id_and_pwd_hash(self, user_id):
@@ -321,13 +358,82 @@ class AuthHandler(BaseHandler):
def _check_password(self, user_id, password, stored_hash):
"""Checks that user_id has passed password, raises LoginError if not."""
- if not bcrypt.checkpw(password, stored_hash):
+ if not self.validate_hash(password, stored_hash):
logger.warn("Failed password login for user %s", user_id)
raise LoginError(403, "", errcode=Codes.FORBIDDEN)
@defer.inlineCallbacks
+ def issue_access_token(self, user_id):
+ access_token = self.generate_access_token(user_id)
+ yield self.store.add_access_token_to_user(user_id, access_token)
+ defer.returnValue(access_token)
+
+ @defer.inlineCallbacks
+ def issue_refresh_token(self, user_id):
+ refresh_token = self.generate_refresh_token(user_id)
+ yield self.store.add_refresh_token_to_user(user_id, refresh_token)
+ defer.returnValue(refresh_token)
+
+ def generate_access_token(self, user_id, extra_caveats=None):
+ extra_caveats = extra_caveats or []
+ macaroon = self._generate_base_macaroon(user_id)
+ macaroon.add_first_party_caveat("type = access")
+ now = self.hs.get_clock().time_msec()
+ expiry = now + (60 * 60 * 1000)
+ macaroon.add_first_party_caveat("time < %d" % (expiry,))
+ for caveat in extra_caveats:
+ macaroon.add_first_party_caveat(caveat)
+ return macaroon.serialize()
+
+ def generate_refresh_token(self, user_id):
+ m = self._generate_base_macaroon(user_id)
+ m.add_first_party_caveat("type = refresh")
+ # Important to add a nonce, because otherwise every refresh token for a
+ # user will be the same.
+ m.add_first_party_caveat("nonce = %s" % (
+ stringutils.random_string_with_symbols(16),
+ ))
+ return m.serialize()
+
+ def generate_short_term_login_token(self, user_id):
+ macaroon = self._generate_base_macaroon(user_id)
+ macaroon.add_first_party_caveat("type = login")
+ now = self.hs.get_clock().time_msec()
+ expiry = now + (2 * 60 * 1000)
+ macaroon.add_first_party_caveat("time < %d" % (expiry,))
+ return macaroon.serialize()
+
+ def validate_short_term_login_token_and_get_user_id(self, login_token):
+ try:
+ macaroon = pymacaroons.Macaroon.deserialize(login_token)
+ auth_api = self.hs.get_auth()
+ auth_api.validate_macaroon(macaroon, "login", [auth_api.verify_expiry])
+ return self._get_user_from_macaroon(macaroon)
+ except (pymacaroons.exceptions.MacaroonException, TypeError, ValueError):
+ raise AuthError(401, "Invalid token", errcode=Codes.UNKNOWN_TOKEN)
+
+ def _generate_base_macaroon(self, user_id):
+ macaroon = pymacaroons.Macaroon(
+ location=self.hs.config.server_name,
+ identifier="key",
+ key=self.hs.config.macaroon_secret_key)
+ macaroon.add_first_party_caveat("gen = 1")
+ macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
+ return macaroon
+
+ def _get_user_from_macaroon(self, macaroon):
+ user_prefix = "user_id = "
+ for caveat in macaroon.caveats:
+ if caveat.caveat_id.startswith(user_prefix):
+ return caveat.caveat_id[len(user_prefix):]
+ raise AuthError(
+ self.INVALID_TOKEN_HTTP_STATUS, "No user_id found in token",
+ errcode=Codes.UNKNOWN_TOKEN
+ )
+
+ @defer.inlineCallbacks
def set_password(self, user_id, newpassword):
- password_hash = bcrypt.hashpw(newpassword, bcrypt.gensalt())
+ password_hash = self.hash(newpassword)
yield self.store.user_set_password_hash(user_id, password_hash)
yield self.store.user_delete_access_tokens(user_id)
@@ -349,3 +455,26 @@ class AuthHandler(BaseHandler):
def _remove_session(self, session):
logger.debug("Removing session %s", session)
del self.sessions[session["id"]]
+
+ def hash(self, password):
+ """Computes a secure hash of password.
+
+ Args:
+ password (str): Password to hash.
+
+ Returns:
+ Hashed password (str).
+ """
+ return bcrypt.hashpw(password, bcrypt.gensalt(self.bcrypt_rounds))
+
+ def validate_hash(self, password, stored_hash):
+ """Validates that self.hash(password) == stored_hash.
+
+ Args:
+ password (str): Password to hash.
+ stored_hash (str): Expected hash value.
+
+ Returns:
+ Whether self.hash(password) == stored_hash (bool).
+ """
+ return bcrypt.checkpw(password, stored_hash)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 891502c04f..0e4c0d4d06 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -47,10 +47,60 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier()
@defer.inlineCallbacks
+ def started_stream(self, user):
+ """Tells the presence handler that we have started an eventstream for
+ the user:
+
+ Args:
+ user (User): The user who started a stream.
+ Returns:
+ A deferred that completes once their presence has been updated.
+ """
+ if user not in self._streams_per_user:
+ self._streams_per_user[user] = 0
+ if user in self._stop_timer_per_user:
+ try:
+ self.clock.cancel_call_later(
+ self._stop_timer_per_user.pop(user)
+ )
+ except:
+ logger.exception("Failed to cancel event timer")
+ else:
+ yield self.distributor.fire("started_user_eventstream", user)
+
+ self._streams_per_user[user] += 1
+
+ def stopped_stream(self, user):
+ """If there are no streams for a user this starts a timer that will
+ notify the presence handler that we haven't got an event stream for
+ the user unless the user starts a new stream in 30 seconds.
+
+ Args:
+ user (User): The user who stopped a stream.
+ """
+ self._streams_per_user[user] -= 1
+ if not self._streams_per_user[user]:
+ del self._streams_per_user[user]
+
+ # 30 seconds of grace to allow the client to reconnect again
+ # before we think they're gone
+ def _later():
+ logger.debug("_later stopped_user_eventstream %s", user)
+
+ self._stop_timer_per_user.pop(user, None)
+
+ return self.distributor.fire("stopped_user_eventstream", user)
+
+ logger.debug("Scheduling _later: for %s", user)
+ self._stop_timer_per_user[user] = (
+ self.clock.call_later(30, _later)
+ )
+
+ @defer.inlineCallbacks
@log_function
def get_stream(self, auth_user_id, pagin_config, timeout=0,
as_client_event=True, affect_presence=True,
- only_room_events=False):
+ only_room_events=False, room_id=None, is_guest=False):
"""Fetches the events stream for a given user.
If `only_room_events` is `True` only room events will be returned.
@@ -59,31 +109,7 @@ class EventStreamHandler(BaseHandler):
try:
if affect_presence:
- if auth_user not in self._streams_per_user:
- self._streams_per_user[auth_user] = 0
- if auth_user in self._stop_timer_per_user:
- try:
- self.clock.cancel_call_later(
- self._stop_timer_per_user.pop(auth_user)
- )
- except:
- logger.exception("Failed to cancel event timer")
- else:
- yield self.distributor.fire(
- "started_user_eventstream", auth_user
- )
- self._streams_per_user[auth_user] += 1
-
- rm_handler = self.hs.get_handlers().room_member_handler
-
- app_service = yield self.store.get_app_service_by_user_id(
- auth_user.to_string()
- )
- if app_service:
- rooms = yield self.store.get_app_service_rooms(app_service)
- room_ids = set(r.room_id for r in rooms)
- else:
- room_ids = yield rm_handler.get_joined_rooms_for_user(auth_user)
+ yield self.started_stream(auth_user)
if timeout:
# If they've set a timeout set a minimum limit.
@@ -93,9 +119,15 @@ class EventStreamHandler(BaseHandler):
# thundering herds on restart.
timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
+ if is_guest:
+ yield self.distributor.fire(
+ "user_joined_room", user=auth_user, room_id=room_id
+ )
+
events, tokens = yield self.notifier.get_events_for(
- auth_user, room_ids, pagin_config, timeout,
- only_room_events=only_room_events
+ auth_user, pagin_config, timeout,
+ only_room_events=only_room_events,
+ is_guest=is_guest, guest_room_id=room_id
)
time_now = self.clock.time_msec()
@@ -114,27 +146,7 @@ class EventStreamHandler(BaseHandler):
finally:
if affect_presence:
- self._streams_per_user[auth_user] -= 1
- if not self._streams_per_user[auth_user]:
- del self._streams_per_user[auth_user]
-
- # 10 seconds of grace to allow the client to reconnect again
- # before we think they're gone
- def _later():
- logger.debug(
- "_later stopped_user_eventstream %s", auth_user
- )
-
- self._stop_timer_per_user.pop(auth_user, None)
-
- return self.distributor.fire(
- "stopped_user_eventstream", auth_user
- )
-
- logger.debug("Scheduling _later: for %s", auth_user)
- self._stop_timer_per_user[auth_user] = (
- self.clock.call_later(30, _later)
- )
+ self.stopped_stream(auth_user)
class EventHandler(BaseHandler):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 4ff20599d6..c1bce07e31 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,6 +21,7 @@ from synapse.api.errors import (
AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
)
from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
@@ -40,7 +41,6 @@ from twisted.internet import defer
import itertools
import logging
-
logger = logging.getLogger(__name__)
@@ -58,6 +58,8 @@ class FederationHandler(BaseHandler):
def __init__(self, hs):
super(FederationHandler, self).__init__(hs)
+ self.hs = hs
+
self.distributor.observe(
"user_joined_room",
self._on_user_joined
@@ -68,12 +70,9 @@ class FederationHandler(BaseHandler):
self.store = hs.get_datastore()
self.replication_layer = hs.get_replication_layer()
self.state_handler = hs.get_state_handler()
- # self.auth_handler = gs.get_auth_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
- self.lock_manager = hs.get_room_lock_manager()
-
self.replication_layer.set_handler(self)
# When joining a room we need to queue any events for that room up
@@ -125,60 +124,72 @@ class FederationHandler(BaseHandler):
)
if not is_in_room and not event.internal_metadata.is_outlier():
logger.debug("Got event for room we're not in.")
- current_state = state
- event_ids = set()
- if state:
- event_ids |= {e.event_id for e in state}
- if auth_chain:
- event_ids |= {e.event_id for e in auth_chain}
+ try:
+ event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ auth_chain, state, event
+ )
+ except AuthError as e:
+ raise FederationError(
+ "ERROR",
+ e.code,
+ e.msg,
+ affected=event.event_id,
+ )
- seen_ids = set(
- (yield self.store.have_events(event_ids)).keys()
- )
+ else:
+ event_ids = set()
+ if state:
+ event_ids |= {e.event_id for e in state}
+ if auth_chain:
+ event_ids |= {e.event_id for e in auth_chain}
+
+ seen_ids = set(
+ (yield self.store.have_events(event_ids)).keys()
+ )
- if state and auth_chain is not None:
- # If we have any state or auth_chain given to us by the replication
- # layer, then we should handle them (if we haven't before.)
+ if state and auth_chain is not None:
+ # If we have any state or auth_chain given to us by the replication
+ # layer, then we should handle them (if we haven't before.)
- event_infos = []
+ event_infos = []
- for e in itertools.chain(auth_chain, state):
- if e.event_id in seen_ids:
- continue
- e.internal_metadata.outlier = True
- auth_ids = [e_id for e_id, _ in e.auth_events]
- auth = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
- event_infos.append({
- "event": e,
- "auth_events": auth,
- })
- seen_ids.add(e.event_id)
+ for e in itertools.chain(auth_chain, state):
+ if e.event_id in seen_ids:
+ continue
+ e.internal_metadata.outlier = True
+ auth_ids = [e_id for e_id, _ in e.auth_events]
+ auth = {
+ (e.type, e.state_key): e for e in auth_chain
+ if e.event_id in auth_ids or e.type == EventTypes.Create
+ }
+ event_infos.append({
+ "event": e,
+ "auth_events": auth,
+ })
+ seen_ids.add(e.event_id)
- yield self._handle_new_events(
- origin,
- event_infos,
- outliers=True
- )
+ yield self._handle_new_events(
+ origin,
+ event_infos,
+ outliers=True
+ )
- try:
- _, event_stream_id, max_stream_id = yield self._handle_new_event(
- origin,
- event,
- state=state,
- backfilled=backfilled,
- current_state=current_state,
- )
- except AuthError as e:
- raise FederationError(
- "ERROR",
- e.code,
- e.msg,
- affected=event.event_id,
- )
+ try:
+ _, event_stream_id, max_stream_id = yield self._handle_new_event(
+ origin,
+ event,
+ state=state,
+ backfilled=backfilled,
+ current_state=current_state,
+ )
+ except AuthError as e:
+ raise FederationError(
+ "ERROR",
+ e.code,
+ e.msg,
+ affected=event.event_id,
+ )
# if we're receiving valid events from an origin,
# it's probably a good idea to mark it as not in retry-state
@@ -230,7 +241,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events(
- room_id, frozenset(e.event_id for e in events),
+ frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, None),
@@ -553,7 +564,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
+ def do_invite_join(self, target_hosts, room_id, joinee, content):
""" Attempts to join the `joinee` to the room `room_id` via the
server `target_host`.
@@ -569,49 +580,19 @@ class FederationHandler(BaseHandler):
yield self.store.clean_room_for_join(room_id)
- origin, pdu = yield self.replication_layer.make_join(
+ origin, event = yield self._make_and_verify_event(
target_hosts,
room_id,
- joinee
+ joinee,
+ "join",
+ content,
)
- logger.debug("Got response to make_join: %s", pdu)
-
- event = pdu
-
- # We should assert some things.
- # FIXME: Do this in a nicer way
- assert(event.type == EventTypes.Member)
- assert(event.user_id == joinee)
- assert(event.state_key == joinee)
- assert(event.room_id == room_id)
-
- event.internal_metadata.outlier = False
-
self.room_queues[room_id] = []
-
- builder = self.event_builder_factory.new(
- unfreeze(event.get_pdu_json())
- )
-
handled_events = set()
try:
- builder.event_id = self.event_builder_factory.create_event_id()
- builder.origin = self.hs.hostname
- builder.content = content
-
- if not hasattr(event, "signatures"):
- builder.signatures = {}
-
- add_hashes_and_signatures(
- builder,
- self.hs.hostname,
- self.hs.config.signing_key[0],
- )
-
- new_event = builder.build()
-
+ new_event = self._sign_event(event)
# Try the host we successfully got a response to /make_join/
# request first.
try:
@@ -619,11 +600,7 @@ class FederationHandler(BaseHandler):
target_hosts.insert(0, origin)
except ValueError:
pass
-
- ret = yield self.replication_layer.send_join(
- target_hosts,
- new_event
- )
+ ret = yield self.replication_layer.send_join(target_hosts, new_event)
origin = ret["origin"]
state = ret["state"]
@@ -649,35 +626,8 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- ev_infos = []
- for e in itertools.chain(state, auth_chain):
- if e.event_id == event.event_id:
- continue
-
- e.internal_metadata.outlier = True
- auth_ids = [e_id for e_id, _ in e.auth_events]
- ev_infos.append({
- "event": e,
- "auth_events": {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
- })
-
- yield self._handle_new_events(origin, ev_infos, outliers=True)
-
- auth_ids = [e_id for e_id, _ in event.auth_events]
- auth_events = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
-
- _, event_stream_id, max_stream_id = yield self._handle_new_event(
- origin,
- new_event,
- state=state,
- current_state=state,
- auth_events=auth_events,
+ event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ auth_chain, state, event
)
with PreserveLoggingContext():
@@ -714,12 +664,14 @@ class FederationHandler(BaseHandler):
@log_function
def on_make_join_request(self, room_id, user_id):
""" We've received a /make_join/ request, so we create a partial
- join event for the room and return that. We don *not* persist or
+ join event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
"""
+ event_content = {"membership": Membership.JOIN}
+
builder = self.event_builder_factory.new({
"type": EventTypes.Member,
- "content": {"membership": Membership.JOIN},
+ "content": event_content,
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
@@ -865,6 +817,168 @@ class FederationHandler(BaseHandler):
defer.returnValue(event)
@defer.inlineCallbacks
+ def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
+ origin, event = yield self._make_and_verify_event(
+ target_hosts,
+ room_id,
+ user_id,
+ "leave"
+ )
+ signed_event = self._sign_event(event)
+
+ # Try the host we successfully got a response to /make_join/
+ # request first.
+ try:
+ target_hosts.remove(origin)
+ target_hosts.insert(0, origin)
+ except ValueError:
+ pass
+
+ yield self.replication_layer.send_leave(
+ target_hosts,
+ signed_event
+ )
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
+ content={},):
+ origin, pdu = yield self.replication_layer.make_membership_event(
+ target_hosts,
+ room_id,
+ user_id,
+ membership,
+ content,
+ )
+
+ logger.debug("Got response to make_%s: %s", membership, pdu)
+
+ event = pdu
+
+ # We should assert some things.
+ # FIXME: Do this in a nicer way
+ assert(event.type == EventTypes.Member)
+ assert(event.user_id == user_id)
+ assert(event.state_key == user_id)
+ assert(event.room_id == room_id)
+ defer.returnValue((origin, event))
+
+ def _sign_event(self, event):
+ event.internal_metadata.outlier = False
+
+ builder = self.event_builder_factory.new(
+ unfreeze(event.get_pdu_json())
+ )
+
+ builder.event_id = self.event_builder_factory.create_event_id()
+ builder.origin = self.hs.hostname
+
+ if not hasattr(event, "signatures"):
+ builder.signatures = {}
+
+ add_hashes_and_signatures(
+ builder,
+ self.hs.hostname,
+ self.hs.config.signing_key[0],
+ )
+
+ return builder.build()
+
+ @defer.inlineCallbacks
+ @log_function
+ def on_make_leave_request(self, room_id, user_id):
+ """ We've received a /make_leave/ request, so we create a partial
+ join event for the room and return that. We do *not* persist or
+ process it until the other server has signed it and sent it back.
+ """
+ builder = self.event_builder_factory.new({
+ "type": EventTypes.Member,
+ "content": {"membership": Membership.LEAVE},
+ "room_id": room_id,
+ "sender": user_id,
+ "state_key": user_id,
+ })
+
+ event, context = yield self._create_new_client_event(
+ builder=builder,
+ )
+
+ self.auth.check(event, auth_events=context.current_state)
+
+ defer.returnValue(event)
+
+ @defer.inlineCallbacks
+ @log_function
+ def on_send_leave_request(self, origin, pdu):
+ """ We have received a leave event for a room. Fully process it."""
+ event = pdu
+
+ logger.debug(
+ "on_send_leave_request: Got event: %s, signatures: %s",
+ event.event_id,
+ event.signatures,
+ )
+
+ event.internal_metadata.outlier = False
+
+ context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ origin, event
+ )
+
+ logger.debug(
+ "on_send_leave_request: After _handle_new_event: %s, sigs: %s",
+ event.event_id,
+ event.signatures,
+ )
+
+ extra_users = []
+ if event.type == EventTypes.Member:
+ target_user_id = event.state_key
+ target_user = UserID.from_string(target_user_id)
+ extra_users.append(target_user)
+
+ with PreserveLoggingContext():
+ d = self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id, extra_users=extra_users
+ )
+
+ def log_failure(f):
+ logger.warn(
+ "Failed to notify about %s: %s",
+ event.event_id, f.value
+ )
+
+ d.addErrback(log_failure)
+
+ new_pdu = event
+
+ destinations = set()
+
+ for k, s in context.current_state.items():
+ try:
+ if k[0] == EventTypes.Member:
+ if s.content["membership"] == Membership.LEAVE:
+ destinations.add(
+ UserID.from_string(s.state_key).domain
+ )
+ except:
+ logger.warn(
+ "Failed to get destination from event %s", s.event_id
+ )
+
+ destinations.discard(origin)
+
+ logger.debug(
+ "on_send_leave_request: Sending event: %s, signatures: %s",
+ event.event_id,
+ event.signatures,
+ )
+
+ self.replication_layer.send_pdu(new_pdu, destinations)
+
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
yield run_on_reactor()
@@ -986,8 +1100,6 @@ class FederationHandler(BaseHandler):
context = yield self._prep_event(
origin, event,
state=state,
- backfilled=backfilled,
- current_state=current_state,
auth_events=auth_events,
)
@@ -1010,7 +1122,6 @@ class FederationHandler(BaseHandler):
origin,
ev_info["event"],
state=ev_info.get("state"),
- backfilled=backfilled,
auth_events=ev_info.get("auth_events"),
)
for ev_info in event_infos
@@ -1027,8 +1138,77 @@ class FederationHandler(BaseHandler):
)
@defer.inlineCallbacks
- def _prep_event(self, origin, event, state=None, backfilled=False,
- current_state=None, auth_events=None):
+ def _persist_auth_tree(self, auth_events, state, event):
+ """Checks the auth chain is valid (and passes auth checks) for the
+ state and event. Then persists the auth chain and state atomically.
+ Persists the event seperately.
+
+ Returns:
+ 2-tuple of (event_stream_id, max_stream_id) from the persist_event
+ call for `event`
+ """
+ events_to_context = {}
+ for e in itertools.chain(auth_events, state):
+ ctx = yield self.state_handler.compute_event_context(
+ e, outlier=True,
+ )
+ events_to_context[e.event_id] = ctx
+ e.internal_metadata.outlier = True
+
+ event_map = {
+ e.event_id: e
+ for e in auth_events
+ }
+
+ create_event = None
+ for e in auth_events:
+ if (e.type, e.state_key) == (EventTypes.Create, ""):
+ create_event = e
+ break
+
+ for e in itertools.chain(auth_events, state, [event]):
+ auth_for_e = {
+ (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
+ for e_id, _ in e.auth_events
+ }
+ if create_event:
+ auth_for_e[(EventTypes.Create, "")] = create_event
+
+ try:
+ self.auth.check(e, auth_events=auth_for_e)
+ except AuthError as err:
+ logger.warn(
+ "Rejecting %s because %s",
+ e.event_id, err.msg
+ )
+
+ if e == event:
+ raise
+ events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
+
+ yield self.store.persist_events(
+ [
+ (e, events_to_context[e.event_id])
+ for e in itertools.chain(auth_events, state)
+ ],
+ is_new_state=False,
+ )
+
+ new_event_context = yield self.state_handler.compute_event_context(
+ event, old_state=state, outlier=False,
+ )
+
+ event_stream_id, max_stream_id = yield self.store.persist_event(
+ event, new_event_context,
+ backfilled=False,
+ is_new_state=True,
+ current_state=state,
+ )
+
+ defer.returnValue((event_stream_id, max_stream_id))
+
+ @defer.inlineCallbacks
+ def _prep_event(self, origin, event, state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()
context = yield self.state_handler.compute_event_context(
@@ -1061,6 +1241,10 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
+ if event.type == EventTypes.GuestAccess:
+ full_context = yield self.store.get_current_state(room_id=event.room_id)
+ yield self.maybe_kick_guest_users(event, full_context)
+
defer.returnValue(context)
@defer.inlineCallbacks
@@ -1166,7 +1350,7 @@ class FederationHandler(BaseHandler):
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in remote_auth_chain
- if e.event_id in auth_ids
+ if e.event_id in auth_ids or e.type == EventTypes.Create
}
e.internal_metadata.outlier = True
@@ -1284,6 +1468,7 @@ class FederationHandler(BaseHandler):
(e.type, e.state_key): e
for e in result["auth_chain"]
if e.event_id in auth_ids
+ or event.type == EventTypes.Create
}
ev.internal_metadata.outlier = True
@@ -1458,50 +1643,73 @@ class FederationHandler(BaseHandler):
})
@defer.inlineCallbacks
- def _handle_auth_events(self, origin, auth_events):
- auth_ids_to_deferred = {}
-
- def process_auth_ev(ev):
- auth_ids = [e_id for e_id, _ in ev.auth_events]
-
- prev_ds = [
- auth_ids_to_deferred[i]
- for i in auth_ids
- if i in auth_ids_to_deferred
- ]
-
- d = defer.Deferred()
+ @log_function
+ def exchange_third_party_invite(self, invite):
+ sender = invite["sender"]
+ room_id = invite["room_id"]
- auth_ids_to_deferred[ev.event_id] = d
+ event_dict = {
+ "type": EventTypes.Member,
+ "content": {
+ "membership": Membership.INVITE,
+ "third_party_invite": invite,
+ },
+ "room_id": room_id,
+ "sender": sender,
+ "state_key": invite["mxid"],
+ }
+
+ if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
+ builder = self.event_builder_factory.new(event_dict)
+ EventValidator().validate_new(builder)
+ event, context = yield self._create_new_client_event(builder=builder)
+ self.auth.check(event, context.current_state)
+ yield self._validate_keyserver(event, auth_events=context.current_state)
+ member_handler = self.hs.get_handlers().room_member_handler
+ yield member_handler.change_membership(event, context)
+ else:
+ destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)])
+ yield self.replication_layer.forward_third_party_invite(
+ destinations,
+ room_id,
+ event_dict,
+ )
- @defer.inlineCallbacks
- def f(*_):
- ev.internal_metadata.outlier = True
+ @defer.inlineCallbacks
+ @log_function
+ def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+ builder = self.event_builder_factory.new(event_dict)
- try:
- auth = {
- (e.type, e.state_key): e for e in auth_events
- if e.event_id in auth_ids
- }
+ event, context = yield self._create_new_client_event(
+ builder=builder,
+ )
- yield self._handle_new_event(
- origin, ev, auth_events=auth
- )
- except:
- logger.exception(
- "Failed to handle auth event %s",
- ev.event_id,
- )
+ self.auth.check(event, auth_events=context.current_state)
+ yield self._validate_keyserver(event, auth_events=context.current_state)
- d.callback(None)
+ returned_invite = yield self.send_invite(origin, event)
+ # TODO: Make sure the signatures actually are correct.
+ event.signatures.update(returned_invite.signatures)
+ member_handler = self.hs.get_handlers().room_member_handler
+ yield member_handler.change_membership(event, context)
- if prev_ds:
- dx = defer.DeferredList(prev_ds)
- dx.addBoth(f)
- else:
- f()
+ @defer.inlineCallbacks
+ def _validate_keyserver(self, event, auth_events):
+ token = event.content["third_party_invite"]["signed"]["token"]
- for e in auth_events:
- process_auth_ev(e)
+ invite_event = auth_events.get(
+ (EventTypes.ThirdPartyInvite, token,)
+ )
- yield defer.DeferredList(auth_ids_to_deferred.values())
+ try:
+ response = yield self.hs.get_simple_http_client().get_json(
+ invite_event.content["key_validity_url"],
+ {"public_key": invite_event.content["public_key"]}
+ )
+ except Exception:
+ raise SynapseError(
+ 502,
+ "Third party certificate could not be checked"
+ )
+ if "valid" not in response or not response["valid"]:
+ raise AuthError(403, "Third party certificate was invalid")
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f12465fa2c..14051aee99 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,13 +16,13 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import RoomError, SynapseError
+from synapse.api.errors import SynapseError, AuthError, Codes
from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
-from synapse.types import UserID, RoomStreamToken
+from synapse.types import UserID, RoomStreamToken, StreamToken
from ._base import BaseHandler
@@ -71,34 +71,64 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_messages(self, user_id=None, room_id=None, pagin_config=None,
- feedback=False, as_client_event=True):
+ as_client_event=True, is_guest=False):
"""Get messages in a room.
Args:
user_id (str): The user requesting messages.
room_id (str): The room they want messages from.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- feedback (bool): True to get compressed feedback with the messages
+ config rules to apply, if any.
as_client_event (bool): True to get events in client-server format.
+ is_guest (bool): Whether the requesting user is a guest (as opposed
+ to a fully registered user).
Returns:
dict: Pagination API results
"""
- yield self.auth.check_joined_room(room_id, user_id)
-
data_source = self.hs.get_event_sources().sources["room"]
- if not pagin_config.from_token:
+ if pagin_config.from_token:
+ room_token = pagin_config.from_token.room_key
+ else:
pagin_config.from_token = (
yield self.hs.get_event_sources().get_current_token(
direction='b'
)
)
+ room_token = pagin_config.from_token.room_key
- room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
+ room_token = RoomStreamToken.parse(room_token)
if room_token.topological is None:
raise SynapseError(400, "Invalid token")
+ pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+ "room_key", str(room_token)
+ )
+
+ source_config = pagin_config.get_source_config("room")
+
+ if not is_guest:
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+ if member_event.membership == Membership.LEAVE:
+ # If they have left the room then clamp the token to be before
+ # they left the room.
+ # If they're a guest, we'll just 403 them if they're asking for
+ # events they can't see.
+ leave_token = yield self.store.get_topological_token_for_event(
+ member_event.event_id
+ )
+ leave_token = RoomStreamToken.parse(leave_token)
+ if leave_token.topological < room_token.topological:
+ source_config.from_key = str(leave_token)
+
+ if source_config.direction == "f":
+ if source_config.to_key is None:
+ source_config.to_key = str(leave_token)
+ else:
+ to_token = RoomStreamToken.parse(source_config.to_key)
+ if leave_token.topological < to_token.topological:
+ source_config.to_key = str(leave_token)
+
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, room_token.topological
)
@@ -106,7 +136,7 @@ class MessageHandler(BaseHandler):
user = UserID.from_string(user_id)
events, next_key = yield data_source.get_pagination_rows(
- user, pagin_config.get_source_config("room"), room_id
+ user, source_config, room_id
)
next_token = pagin_config.from_token.copy_and_replace(
@@ -120,7 +150,7 @@ class MessageHandler(BaseHandler):
"end": next_token.to_string(),
})
- events = yield self._filter_events_for_client(user_id, room_id, events)
+ events = yield self._filter_events_for_client(user_id, events, is_guest=is_guest)
time_now = self.clock.time_msec()
@@ -136,54 +166,8 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
- def _filter_events_for_client(self, user_id, room_id, events):
- event_id_to_state = yield self.store.get_state_for_events(
- room_id, frozenset(e.event_id for e in events),
- types=(
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, user_id),
- )
- )
-
- def allowed(event, state):
- if event.type == EventTypes.RoomHistoryVisibility:
- return True
-
- membership_ev = state.get((EventTypes.Member, user_id), None)
- if membership_ev:
- membership = membership_ev.membership
- else:
- membership = Membership.LEAVE
-
- if membership == Membership.JOIN:
- return True
-
- history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
- if history:
- visibility = history.content.get("history_visibility", "shared")
- else:
- visibility = "shared"
-
- if visibility == "public":
- return True
- elif visibility == "shared":
- return True
- elif visibility == "joined":
- return membership == Membership.JOIN
- elif visibility == "invited":
- return membership == Membership.INVITE
-
- return True
-
- defer.returnValue([
- event
- for event in events
- if allowed(event, event_id_to_state[event.event_id])
- ])
-
- @defer.inlineCallbacks
def create_and_send_event(self, event_dict, ratelimit=True,
- client=None, txn_id=None):
+ token_id=None, txn_id=None, is_guest=False):
""" Given a dict from a client, create and handle a new event.
Creates an FrozenEvent object, filling out auth_events, prev_events,
@@ -217,11 +201,8 @@ class MessageHandler(BaseHandler):
builder.content
)
- if client is not None:
- if client.token_id is not None:
- builder.internal_metadata.token_id = client.token_id
- if client.device_id is not None:
- builder.internal_metadata.device_id = client.device_id
+ if token_id is not None:
+ builder.internal_metadata.token_id = token_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
@@ -232,7 +213,7 @@ class MessageHandler(BaseHandler):
if event.type == EventTypes.Member:
member_handler = self.hs.get_handlers().room_member_handler
- yield member_handler.change_membership(event, context)
+ yield member_handler.change_membership(event, context, is_guest=is_guest)
else:
yield self.handle_new_client_event(
event=event,
@@ -248,7 +229,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
- event_type=None, state_key=""):
+ event_type=None, state_key="", is_guest=False):
""" Get data from a room.
Args:
@@ -258,29 +239,55 @@ class MessageHandler(BaseHandler):
Raises:
SynapseError if something went wrong.
"""
- have_joined = yield self.auth.check_joined_room(room_id, user_id)
- if not have_joined:
- raise RoomError(403, "User not in room.")
-
- data = yield self.state_handler.get_current_state(
- room_id, event_type, state_key
+ membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ room_id, user_id, is_guest
)
- defer.returnValue(data)
- @defer.inlineCallbacks
- def get_feedback(self, event_id):
- # yield self.auth.check_joined_room(room_id, user_id)
+ if membership == Membership.JOIN:
+ data = yield self.state_handler.get_current_state(
+ room_id, event_type, state_key
+ )
+ elif membership == Membership.LEAVE:
+ key = (event_type, state_key)
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], [key]
+ )
+ data = room_state[membership_event_id].get(key)
- # Pull out the feedback from the db
- fb = yield self.store.get_feedback(event_id)
+ defer.returnValue(data)
- if fb:
- defer.returnValue(fb)
- defer.returnValue(None)
+ @defer.inlineCallbacks
+ def _check_in_room_or_world_readable(self, room_id, user_id, is_guest):
+ try:
+ # check_user_was_in_room will return the most recent membership
+ # event for the user if:
+ # * The user is a non-guest user, and was ever in the room
+ # * The user is a guest user, and has joined the room
+ # else it will throw.
+ member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+ defer.returnValue((member_event.membership, member_event.event_id))
+ return
+ except AuthError, auth_error:
+ visibility = yield self.state_handler.get_current_state(
+ room_id, EventTypes.RoomHistoryVisibility, ""
+ )
+ if (
+ visibility and
+ visibility.content["history_visibility"] == "world_readable"
+ ):
+ defer.returnValue((Membership.JOIN, None))
+ return
+ if not is_guest:
+ raise auth_error
+ raise AuthError(
+ 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+ )
@defer.inlineCallbacks
- def get_state_events(self, user_id, room_id):
- """Retrieve all state events for a given room.
+ def get_state_events(self, user_id, room_id, is_guest=False):
+ """Retrieve all state events for a given room. If the user is
+ joined to the room then return the current state. If the user has
+ left the room return the state events from when they left.
Args:
user_id(str): The user requesting state events.
@@ -288,18 +295,26 @@ class MessageHandler(BaseHandler):
Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
- yield self.auth.check_joined_room(room_id, user_id)
+ membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ room_id, user_id, is_guest
+ )
+
+ if membership == Membership.JOIN:
+ room_state = yield self.state_handler.get_current_state(room_id)
+ elif membership == Membership.LEAVE:
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], None
+ )
+ room_state = room_state[membership_event_id]
- # TODO: This is duplicating logic from snapshot_all_rooms
- current_state = yield self.state_handler.get_current_state(room_id)
now = self.clock.time_msec()
defer.returnValue(
- [serialize_event(c, now) for c in current_state.values()]
+ [serialize_event(c, now) for c in room_state.values()]
)
@defer.inlineCallbacks
def snapshot_all_rooms(self, user_id=None, pagin_config=None,
- feedback=False, as_client_event=True):
+ as_client_event=True, include_archived=False):
"""Retrieve a snapshot of all rooms the user is invited or has joined.
This snapshot may include messages for all rooms where the user is
@@ -309,17 +324,20 @@ class MessageHandler(BaseHandler):
user_id (str): The ID of the user making the request.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config used to determine how many messages *PER ROOM* to return.
- feedback (bool): True to get feedback along with these messages.
as_client_event (bool): True to get events in client-server format.
+ include_archived (bool): True to get rooms that the user has left
Returns:
A list of dicts with "room_id" and "membership" keys for all rooms
the user is currently invited or joined in on. Rooms where the user
is joined on, may return a "messages" key with messages, depending
on the specified PaginationConfig.
"""
+ memberships = [Membership.INVITE, Membership.JOIN]
+ if include_archived:
+ memberships.append(Membership.LEAVE)
+
room_list = yield self.store.get_rooms_for_user_where_membership_is(
- user_id=user_id,
- membership_list=[Membership.INVITE, Membership.JOIN]
+ user_id=user_id, membership_list=memberships
)
user = UserID.from_string(user_id)
@@ -339,6 +357,8 @@ class MessageHandler(BaseHandler):
user, pagination_config.get_source_config("receipt"), None
)
+ tags_by_room = yield self.store.get_tags_for_user(user_id)
+
public_room_ids = yield self.store.get_public_room_ids()
limit = pagin_config.limit
@@ -357,28 +377,45 @@ class MessageHandler(BaseHandler):
}
if event.membership == Membership.INVITE:
+ time_now = self.clock.time_msec()
d["inviter"] = event.sender
+ invite_event = yield self.store.get_event(event.event_id)
+ d["invite"] = serialize_event(invite_event, time_now, as_client_event)
+
rooms_ret.append(d)
- if event.membership != Membership.JOIN:
+ if event.membership not in (Membership.JOIN, Membership.LEAVE):
return
+
try:
+ if event.membership == Membership.JOIN:
+ room_end_token = now_token.room_key
+ deferred_room_state = self.state_handler.get_current_state(
+ event.room_id
+ )
+ elif event.membership == Membership.LEAVE:
+ room_end_token = "s%d" % (event.stream_ordering,)
+ deferred_room_state = self.store.get_state_for_events(
+ [event.event_id], None
+ )
+ deferred_room_state.addCallback(
+ lambda states: states[event.event_id]
+ )
+
(messages, token), current_state = yield defer.gatherResults(
[
self.store.get_recent_events_for_room(
event.room_id,
limit=limit,
- end_token=now_token.room_key,
- ),
- self.state_handler.get_current_state(
- event.room_id
+ end_token=room_end_token,
),
+ deferred_room_state,
]
).addErrback(unwrapFirstError)
messages = yield self._filter_events_for_client(
- user_id, event.room_id, messages
+ user_id, messages
)
start_token = now_token.copy_and_replace("room_key", token[0])
@@ -398,6 +435,15 @@ class MessageHandler(BaseHandler):
serialize_event(c, time_now, as_client_event)
for c in current_state.values()
]
+
+ private_user_data = []
+ tags = tags_by_room.get(event.room_id)
+ if tags:
+ private_user_data.append({
+ "type": "m.tag",
+ "content": {"tags": tags},
+ })
+ d["private_user_data"] = private_user_data
except:
logger.exception("Failed to get snapshot")
@@ -420,15 +466,99 @@ class MessageHandler(BaseHandler):
defer.returnValue(ret)
@defer.inlineCallbacks
- def room_initial_sync(self, user_id, room_id, pagin_config=None,
- feedback=False):
- current_state = yield self.state.get_current_state(
- room_id=room_id,
+ def room_initial_sync(self, user_id, room_id, pagin_config=None, is_guest=False):
+ """Capture the a snapshot of a room. If user is currently a member of
+ the room this will be what is currently in the room. If the user left
+ the room this will be what was in the room when they left.
+
+ Args:
+ user_id(str): The user to get a snapshot for.
+ room_id(str): The room to get a snapshot of.
+ pagin_config(synapse.streams.config.PaginationConfig):
+ The pagination config used to determine how many messages to
+ return.
+ Raises:
+ AuthError if the user wasn't in the room.
+ Returns:
+ A JSON serialisable dict with the snapshot of the room.
+ """
+
+ membership, member_event_id = yield self._check_in_room_or_world_readable(
+ room_id,
+ user_id,
+ is_guest
)
- yield self.auth.check_joined_room(
- room_id, user_id,
- current_state=current_state
+ if membership == Membership.JOIN:
+ result = yield self._room_initial_sync_joined(
+ user_id, room_id, pagin_config, membership, is_guest
+ )
+ elif membership == Membership.LEAVE:
+ result = yield self._room_initial_sync_parted(
+ user_id, room_id, pagin_config, membership, member_event_id, is_guest
+ )
+
+ private_user_data = []
+ tags = yield self.store.get_tags_for_room(user_id, room_id)
+ if tags:
+ private_user_data.append({
+ "type": "m.tag",
+ "content": {"tags": tags},
+ })
+ result["private_user_data"] = private_user_data
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
+ membership, member_event_id, is_guest):
+ room_state = yield self.store.get_state_for_events(
+ [member_event_id], None
+ )
+
+ room_state = room_state[member_event_id]
+
+ limit = pagin_config.limit if pagin_config else None
+ if limit is None:
+ limit = 10
+
+ stream_token = yield self.store.get_stream_token_for_event(
+ member_event_id
+ )
+
+ messages, token = yield self.store.get_recent_events_for_room(
+ room_id,
+ limit=limit,
+ end_token=stream_token
+ )
+
+ messages = yield self._filter_events_for_client(
+ user_id, messages, is_guest=is_guest
+ )
+
+ start_token = StreamToken(token[0], 0, 0, 0, 0)
+ end_token = StreamToken(token[1], 0, 0, 0, 0)
+
+ time_now = self.clock.time_msec()
+
+ defer.returnValue({
+ "membership": membership,
+ "room_id": room_id,
+ "messages": {
+ "chunk": [serialize_event(m, time_now) for m in messages],
+ "start": start_token.to_string(),
+ "end": end_token.to_string(),
+ },
+ "state": [serialize_event(s, time_now) for s in room_state.values()],
+ "presence": [],
+ "receipts": [],
+ })
+
+ @defer.inlineCallbacks
+ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
+ membership, is_guest):
+ current_state = yield self.state.get_current_state(
+ room_id=room_id,
)
# TODO(paul): I wish I was called with user objects not user_id
@@ -442,8 +572,6 @@ class MessageHandler(BaseHandler):
for x in current_state.values()
]
- member_event = current_state.get((EventTypes.Member, user_id,))
-
now_token = yield self.hs.get_event_sources().get_current_token()
limit = pagin_config.limit if pagin_config else None
@@ -460,12 +588,14 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
- states = yield presence_handler.get_states(
- target_users=[UserID.from_string(m.user_id) for m in room_members],
- auth_user=auth_user,
- as_event=True,
- check_auth=False,
- )
+ states = {}
+ if not is_guest:
+ states = yield presence_handler.get_states(
+ target_users=[UserID.from_string(m.user_id) for m in room_members],
+ auth_user=auth_user,
+ as_event=True,
+ check_auth=False,
+ )
defer.returnValue(states.values())
@@ -485,7 +615,7 @@ class MessageHandler(BaseHandler):
).addErrback(unwrapFirstError)
messages = yield self._filter_events_for_client(
- user_id, room_id, messages
+ user_id, messages, is_guest=is_guest, require_all_visible_for_guests=False
)
start_token = now_token.copy_and_replace("room_key", token[0])
@@ -493,8 +623,7 @@ class MessageHandler(BaseHandler):
time_now = self.clock.time_msec()
- defer.returnValue({
- "membership": member_event.membership,
+ ret = {
"room_id": room_id,
"messages": {
"chunk": [serialize_event(m, time_now) for m in messages],
@@ -504,4 +633,8 @@ class MessageHandler(BaseHandler):
"state": state,
"presence": presence,
"receipts": receipts,
- })
+ }
+ if not is_guest:
+ ret["membership"] = membership
+
+ defer.returnValue(ret)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e91e81831e..aca65096fc 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -378,7 +378,7 @@ class PresenceHandler(BaseHandler):
# TODO(paul): perform a presence push as part of start/stop poll so
# we don't have to do this all the time
- self.changed_presencelike_data(target_user, state)
+ yield self.changed_presencelike_data(target_user, state)
def bump_presence_active_time(self, user, now=None):
if now is None:
@@ -422,12 +422,12 @@ class PresenceHandler(BaseHandler):
@log_function
def started_user_eventstream(self, user):
# TODO(paul): Use "last online" state
- self.set_state(user, user, {"presence": PresenceState.ONLINE})
+ return self.set_state(user, user, {"presence": PresenceState.ONLINE})
@log_function
def stopped_user_eventstream(self, user):
# TODO(paul): Save current state as "last online" state
- self.set_state(user, user, {"presence": PresenceState.OFFLINE})
+ return self.set_state(user, user, {"presence": PresenceState.OFFLINE})
@defer.inlineCallbacks
def user_joined_room(self, user, room_id):
@@ -950,7 +950,8 @@ class PresenceHandler(BaseHandler):
)
while len(self._remote_offline_serials) > MAX_OFFLINE_SERIALS:
self._remote_offline_serials.pop() # remove the oldest
- del self._user_cachemap[user]
+ if user in self._user_cachemap:
+ del self._user_cachemap[user]
else:
# Remove the user from remote_offline_serials now that they're
# no longer offline
@@ -1142,8 +1143,9 @@ class PresenceEventSource(object):
@defer.inlineCallbacks
@log_function
- def get_new_events_for_user(self, user, from_key, limit):
+ def get_new_events(self, user, from_key, room_ids=None, **kwargs):
from_key = int(from_key)
+ room_ids = room_ids or []
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
@@ -1161,7 +1163,6 @@ class PresenceEventSource(object):
user_ids_to_check |= set(
UserID.from_string(p["observed_user_id"]) for p in presence_list
)
- room_ids = yield presence.get_joined_rooms_for_user(user)
for room_id in set(room_ids) & set(presence._room_serials):
if presence._room_serials[room_id] > from_key:
joined = yield presence.get_joined_users_for_room_id(room_id)
@@ -1263,6 +1264,11 @@ class UserPresenceCache(object):
self.state = {"presence": PresenceState.OFFLINE}
self.serial = None
+ def __repr__(self):
+ return "UserPresenceCache(state=%r, serial=%r)" % (
+ self.state, self.serial
+ )
+
def update(self, state, serial):
assert("mtime_age" not in state)
diff --git a/synapse/handlers/private_user_data.py b/synapse/handlers/private_user_data.py
new file mode 100644
index 0000000000..1abe45ed7b
--- /dev/null
+++ b/synapse/handlers/private_user_data.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+
+class PrivateUserDataEventSource(object):
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+
+ def get_current_key(self, direction='f'):
+ return self.store.get_max_private_user_data_stream_id()
+
+ @defer.inlineCallbacks
+ def get_new_events(self, user, from_key, **kwargs):
+ user_id = user.to_string()
+ last_stream_id = from_key
+
+ current_stream_id = yield self.store.get_max_private_user_data_stream_id()
+ tags = yield self.store.get_updated_tags(user_id, last_stream_id)
+
+ results = []
+ for room_id, room_tags in tags.items():
+ results.append({
+ "type": "m.tag",
+ "content": {"tags": room_tags},
+ "room_id": room_id,
+ })
+
+ defer.returnValue((results, current_stream_id))
+
+ @defer.inlineCallbacks
+ def get_pagination_rows(self, user, config, key):
+ defer.returnValue(([], config.to_id))
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 86c911c4bf..973f4d5cae 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -156,13 +156,7 @@ class ReceiptsHandler(BaseHandler):
if not result:
defer.returnValue([])
- event = {
- "type": "m.receipt",
- "room_id": room_id,
- "content": result,
- }
-
- defer.returnValue([event])
+ defer.returnValue(result)
class ReceiptEventSource(object):
@@ -170,17 +164,15 @@ class ReceiptEventSource(object):
self.store = hs.get_datastore()
@defer.inlineCallbacks
- def get_new_events_for_user(self, user, from_key, limit):
+ def get_new_events(self, from_key, room_ids, **kwargs):
from_key = int(from_key)
to_key = yield self.get_current_key()
if from_key == to_key:
defer.returnValue(([], to_key))
- rooms = yield self.store.get_rooms_for_user(user.to_string())
- rooms = [room.room_id for room in rooms]
events = yield self.store.get_linearized_receipts_for_rooms(
- rooms,
+ room_ids,
from_key=from_key,
to_key=to_key,
)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 86390a3671..493a087031 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -25,8 +25,6 @@ import synapse.util.stringutils as stringutils
from synapse.util.async import run_on_reactor
from synapse.http.client import CaptchaServerHttpClient
-import base64
-import bcrypt
import logging
import urllib
@@ -66,7 +64,7 @@ class RegistrationHandler(BaseHandler):
)
@defer.inlineCallbacks
- def register(self, localpart=None, password=None):
+ def register(self, localpart=None, password=None, generate_token=True):
"""Registers a new client on the server.
Args:
@@ -83,7 +81,7 @@ class RegistrationHandler(BaseHandler):
yield run_on_reactor()
password_hash = None
if password:
- password_hash = bcrypt.hashpw(password, bcrypt.gensalt())
+ password_hash = self.auth_handler().hash(password)
if localpart:
yield self.check_username(localpart)
@@ -91,7 +89,9 @@ class RegistrationHandler(BaseHandler):
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
- token = self.generate_token(user_id)
+ token = None
+ if generate_token:
+ token = self.auth_handler().generate_access_token(user_id)
yield self.store.register(
user_id=user_id,
token=token,
@@ -104,14 +104,14 @@ class RegistrationHandler(BaseHandler):
attempts = 0
user_id = None
token = None
- while not user_id and not token:
+ while not user_id:
try:
localpart = self._generate_user_id()
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
yield self.check_user_id_is_valid(user_id)
-
- token = self.generate_token(user_id)
+ if generate_token:
+ token = self.auth_handler().generate_access_token(user_id)
yield self.store.register(
user_id=user_id,
token=token,
@@ -161,7 +161,7 @@ class RegistrationHandler(BaseHandler):
400, "Invalid user localpart for this application service.",
errcode=Codes.EXCLUSIVE
)
- token = self.generate_token(user_id)
+ token = self.auth_handler().generate_access_token(user_id)
yield self.store.register(
user_id=user_id,
token=token,
@@ -208,7 +208,7 @@ class RegistrationHandler(BaseHandler):
user_id = user.to_string()
yield self.check_user_id_is_valid(user_id)
- token = self.generate_token(user_id)
+ token = self.auth_handler().generate_access_token(user_id)
try:
yield self.store.register(
user_id=user_id,
@@ -273,13 +273,6 @@ class RegistrationHandler(BaseHandler):
errcode=Codes.EXCLUSIVE
)
- def generate_token(self, user_id):
- # urlsafe variant uses _ and - so use . as the separator and replace
- # all =s with .s so http clients don't quote =s when it is used as
- # query params.
- return (base64.urlsafe_b64encode(user_id).replace('=', '.') + '.' +
- stringutils.random_string(18))
-
def _generate_user_id(self):
return "-" + stringutils.random_string(18)
@@ -322,3 +315,6 @@ class RegistrationHandler(BaseHandler):
}
)
defer.returnValue(data)
+
+ def auth_handler(self):
+ return self.hs.get_handlers().auth_handler
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c5d1001b50..3f04752581 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,26 +22,38 @@ from synapse.types import UserID, RoomAlias, RoomID
from synapse.api.constants import (
EventTypes, Membership, JoinRules, RoomCreationPreset,
)
-from synapse.api.errors import StoreError, SynapseError
+from synapse.api.errors import AuthError, StoreError, SynapseError
from synapse.util import stringutils, unwrapFirstError
from synapse.util.async import run_on_reactor
-from synapse.events.utils import serialize_event
+
+from signedjson.sign import verify_signed_json
+from signedjson.key import decode_verify_key_bytes
from collections import OrderedDict
+from unpaddedbase64 import decode_base64
+
import logging
+import math
import string
logger = logging.getLogger(__name__)
+id_server_scheme = "https://"
+
class RoomCreationHandler(BaseHandler):
PRESETS_DICT = {
RoomCreationPreset.PRIVATE_CHAT: {
"join_rules": JoinRules.INVITE,
- "history_visibility": "invited",
+ "history_visibility": "shared",
"original_invitees_have_ops": False,
},
+ RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
+ "join_rules": JoinRules.INVITE,
+ "history_visibility": "shared",
+ "original_invitees_have_ops": True,
+ },
RoomCreationPreset.PUBLIC_CHAT: {
"join_rules": JoinRules.PUBLIC,
"history_visibility": "shared",
@@ -150,12 +162,16 @@ class RoomCreationHandler(BaseHandler):
for val in raw_initial_state:
initial_state[(val["type"], val.get("state_key", ""))] = val["content"]
+ creation_content = config.get("creation_content", {})
+
user = UserID.from_string(user_id)
creation_events = self._create_events_for_new_room(
user, room_id,
preset_config=preset_config,
invite_list=invite_list,
initial_state=initial_state,
+ creation_content=creation_content,
+ room_alias=room_alias,
)
msg_handler = self.hs.get_handlers().message_handler
@@ -203,7 +219,8 @@ class RoomCreationHandler(BaseHandler):
defer.returnValue(result)
def _create_events_for_new_room(self, creator, room_id, preset_config,
- invite_list, initial_state):
+ invite_list, initial_state, creation_content,
+ room_alias):
config = RoomCreationHandler.PRESETS_DICT[preset_config]
creator_id = creator.to_string()
@@ -225,9 +242,10 @@ class RoomCreationHandler(BaseHandler):
return e
+ creation_content.update({"creator": creator.to_string()})
creation_event = create(
etype=EventTypes.Create,
- content={"creator": creator.to_string()},
+ content=creation_content,
)
join_event = create(
@@ -272,6 +290,14 @@ class RoomCreationHandler(BaseHandler):
returned_events.append(power_levels_event)
+ if room_alias and (EventTypes.CanonicalAlias, '') not in initial_state:
+ room_alias_event = create(
+ etype=EventTypes.CanonicalAlias,
+ content={"alias": room_alias.to_string()},
+ )
+
+ returned_events.append(room_alias_event)
+
if (EventTypes.JoinRules, '') not in initial_state:
join_rules_event = create(
etype=EventTypes.JoinRules,
@@ -343,42 +369,7 @@ class RoomMemberHandler(BaseHandler):
remotedomains.add(member.domain)
@defer.inlineCallbacks
- def get_room_members_as_pagination_chunk(self, room_id=None, user_id=None,
- limit=0, start_tok=None,
- end_tok=None):
- """Retrieve a list of room members in the room.
-
- Args:
- room_id (str): The room to get the member list for.
- user_id (str): The ID of the user making the request.
- limit (int): The max number of members to return.
- start_tok (str): Optional. The start token if known.
- end_tok (str): Optional. The end token if known.
- Returns:
- dict: A Pagination streamable dict.
- Raises:
- SynapseError if something goes wrong.
- """
- yield self.auth.check_joined_room(room_id, user_id)
-
- member_list = yield self.store.get_room_members(room_id=room_id)
- time_now = self.clock.time_msec()
- event_list = [
- serialize_event(entry, time_now)
- for entry in member_list
- ]
- chunk_data = {
- "start": "START", # FIXME (erikj): START is no longer valid
- "end": "END",
- "chunk": event_list
- }
- # TODO honor Pagination stream params
- # TODO snapshot this list to return on subsequent requests when
- # paginating
- defer.returnValue(chunk_data)
-
- @defer.inlineCallbacks
- def change_membership(self, event, context, do_auth=True):
+ def change_membership(self, event, context, do_auth=True, is_guest=False):
""" Change the membership status of a user in a room.
Args:
@@ -399,9 +390,38 @@ class RoomMemberHandler(BaseHandler):
# if this HS is not currently in the room, i.e. we have to do the
# invite/join dance.
if event.membership == Membership.JOIN:
+ if is_guest:
+ guest_access = context.current_state.get(
+ (EventTypes.GuestAccess, ""),
+ None
+ )
+ is_guest_access_allowed = (
+ guest_access
+ and guest_access.content
+ and "guest_access" in guest_access.content
+ and guest_access.content["guest_access"] == "can_join"
+ )
+ if not is_guest_access_allowed:
+ raise AuthError(403, "Guest access not allowed")
+
yield self._do_join(event, context, do_auth=do_auth)
else:
- # This is not a JOIN, so we can handle it normally.
+ if event.membership == Membership.LEAVE:
+ is_host_in_room = yield self.is_host_in_room(room_id, context)
+ if not is_host_in_room:
+ # Rejecting an invite, rather than leaving a joined room
+ handler = self.hs.get_handlers().federation_handler
+ inviter = yield self.get_inviter(event)
+ if not inviter:
+ # return the same error as join_room_alias does
+ raise SynapseError(404, "No known servers")
+ yield handler.do_remotely_reject_invite(
+ [inviter.domain],
+ room_id,
+ event.user_id
+ )
+ defer.returnValue({"room_id": room_id})
+ return
# FIXME: This isn't idempotency.
if prev_state and prev_state.membership == event.membership:
@@ -425,7 +445,7 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue({"room_id": room_id})
@defer.inlineCallbacks
- def join_room_alias(self, joinee, room_alias, do_auth=True, content={}):
+ def join_room_alias(self, joinee, room_alias, content={}):
directory_handler = self.hs.get_handlers().directory_handler
mapping = yield directory_handler.get_association(room_alias)
@@ -459,8 +479,6 @@ class RoomMemberHandler(BaseHandler):
@defer.inlineCallbacks
def _do_join(self, event, context, room_hosts=None, do_auth=True):
- joinee = UserID.from_string(event.state_key)
- # room_id = RoomID.from_string(event.room_id, self.hs)
room_id = event.room_id
# XXX: We don't do an auth check if we are doing an invite
@@ -468,41 +486,18 @@ class RoomMemberHandler(BaseHandler):
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
- is_host_in_room = yield self.auth.check_host_in_room(
- event.room_id,
- self.hs.hostname
- )
- if not is_host_in_room:
- # is *anyone* in the room?
- room_member_keys = [
- v for (k, v) in context.current_state.keys() if (
- k == "m.room.member"
- )
- ]
- if len(room_member_keys) == 0:
- # has the room been created so we can join it?
- create_event = context.current_state.get(("m.room.create", ""))
- if create_event:
- is_host_in_room = True
-
+ is_host_in_room = yield self.is_host_in_room(room_id, context)
if is_host_in_room:
should_do_dance = False
elif room_hosts: # TODO: Shouldn't this be remote_room_host?
should_do_dance = True
else:
- # TODO(markjh): get prev_state from snapshot
- prev_state = yield self.store.get_room_member(
- joinee.to_string(), room_id
- )
-
- if prev_state and prev_state.membership == Membership.INVITE:
- inviter = UserID.from_string(prev_state.user_id)
-
- should_do_dance = not self.hs.is_mine(inviter)
- room_hosts = [inviter.domain]
- else:
+ inviter = yield self.get_inviter(event)
+ if not inviter:
# return the same error as join_room_alias does
raise SynapseError(404, "No known servers")
+ should_do_dance = not self.hs.is_mine(inviter)
+ room_hosts = [inviter.domain]
if should_do_dance:
handler = self.hs.get_handlers().federation_handler
@@ -510,8 +505,7 @@ class RoomMemberHandler(BaseHandler):
room_hosts,
room_id,
event.user_id,
- event.content, # FIXME To get a non-frozen dict
- context
+ event.content,
)
else:
logger.debug("Doing normal join")
@@ -529,30 +523,42 @@ class RoomMemberHandler(BaseHandler):
)
@defer.inlineCallbacks
- def _should_invite_join(self, room_id, prev_state, do_auth):
- logger.debug("_should_invite_join: room_id: %s", room_id)
-
- # XXX: We don't do an auth check if we are doing an invite
- # join dance for now, since we're kinda implicitly checking
- # that we are allowed to join when we decide whether or not we
- # need to do the invite/join dance.
+ def get_inviter(self, event):
+ # TODO(markjh): get prev_state from snapshot
+ prev_state = yield self.store.get_room_member(
+ event.user_id, event.room_id
+ )
- # Only do an invite join dance if a) we were invited,
- # b) the person inviting was from a differnt HS and c) we are
- # not currently in the room
- room_host = None
if prev_state and prev_state.membership == Membership.INVITE:
- room = yield self.store.get_room(room_id)
- inviter = UserID.from_string(
- prev_state.sender
- )
-
- is_remote_invite_join = not self.hs.is_mine(inviter) and not room
- room_host = inviter.domain
- else:
- is_remote_invite_join = False
+ defer.returnValue(UserID.from_string(prev_state.user_id))
+ return
+ elif "third_party_invite" in event.content:
+ if "sender" in event.content["third_party_invite"]:
+ inviter = UserID.from_string(
+ event.content["third_party_invite"]["sender"]
+ )
+ defer.returnValue(inviter)
+ defer.returnValue(None)
- defer.returnValue((is_remote_invite_join, room_host))
+ @defer.inlineCallbacks
+ def is_host_in_room(self, room_id, context):
+ is_host_in_room = yield self.auth.check_host_in_room(
+ room_id,
+ self.hs.hostname
+ )
+ if not is_host_in_room:
+ # is *anyone* in the room?
+ room_member_keys = [
+ v for (k, v) in context.current_state.keys() if (
+ k == "m.room.member"
+ )
+ ]
+ if len(room_member_keys) == 0:
+ # has the room been created so we can join it?
+ create_event = context.current_state.get(("m.room.create", ""))
+ if create_event:
+ is_host_in_room = True
+ defer.returnValue(is_host_in_room)
@defer.inlineCallbacks
def get_joined_rooms_for_user(self, user):
@@ -583,6 +589,160 @@ class RoomMemberHandler(BaseHandler):
suppress_auth=(not do_auth),
)
+ @defer.inlineCallbacks
+ def do_3pid_invite(
+ self,
+ room_id,
+ inviter,
+ medium,
+ address,
+ id_server,
+ token_id,
+ txn_id
+ ):
+ invitee = yield self._lookup_3pid(
+ id_server, medium, address
+ )
+
+ if invitee:
+ # make sure it looks like a user ID; it'll throw if it's invalid.
+ UserID.from_string(invitee)
+ yield self.hs.get_handlers().message_handler.create_and_send_event(
+ {
+ "type": EventTypes.Member,
+ "content": {
+ "membership": unicode("invite")
+ },
+ "room_id": room_id,
+ "sender": inviter.to_string(),
+ "state_key": invitee,
+ },
+ token_id=token_id,
+ txn_id=txn_id,
+ )
+ else:
+ yield self._make_and_store_3pid_invite(
+ id_server,
+ medium,
+ address,
+ room_id,
+ inviter,
+ token_id,
+ txn_id=txn_id
+ )
+
+ @defer.inlineCallbacks
+ def _lookup_3pid(self, id_server, medium, address):
+ """Looks up a 3pid in the passed identity server.
+
+ Args:
+ id_server (str): The server name (including port, if required)
+ of the identity server to use.
+ medium (str): The type of the third party identifier (e.g. "email").
+ address (str): The third party identifier (e.g. "foo@example.com").
+
+ Returns:
+ (str) the matrix ID of the 3pid, or None if it is not recognized.
+ """
+ try:
+ data = yield self.hs.get_simple_http_client().get_json(
+ "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
+ {
+ "medium": medium,
+ "address": address,
+ }
+ )
+
+ if "mxid" in data:
+ if "signatures" not in data:
+ raise AuthError(401, "No signatures on 3pid binding")
+ self.verify_any_signature(data, id_server)
+ defer.returnValue(data["mxid"])
+
+ except IOError as e:
+ logger.warn("Error from identity server lookup: %s" % (e,))
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def verify_any_signature(self, data, server_hostname):
+ if server_hostname not in data["signatures"]:
+ raise AuthError(401, "No signature from server %s" % (server_hostname,))
+ for key_name, signature in data["signatures"][server_hostname].items():
+ key_data = yield self.hs.get_simple_http_client().get_json(
+ "%s%s/_matrix/identity/api/v1/pubkey/%s" %
+ (id_server_scheme, server_hostname, key_name,),
+ )
+ if "public_key" not in key_data:
+ raise AuthError(401, "No public key named %s from %s" %
+ (key_name, server_hostname,))
+ verify_signed_json(
+ data,
+ server_hostname,
+ decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"]))
+ )
+ return
+
+ @defer.inlineCallbacks
+ def _make_and_store_3pid_invite(
+ self,
+ id_server,
+ medium,
+ address,
+ room_id,
+ user,
+ token_id,
+ txn_id
+ ):
+ token, public_key, key_validity_url, display_name = (
+ yield self._ask_id_server_for_third_party_invite(
+ id_server,
+ medium,
+ address,
+ room_id,
+ user.to_string()
+ )
+ )
+ msg_handler = self.hs.get_handlers().message_handler
+ yield msg_handler.create_and_send_event(
+ {
+ "type": EventTypes.ThirdPartyInvite,
+ "content": {
+ "display_name": display_name,
+ "key_validity_url": key_validity_url,
+ "public_key": public_key,
+ },
+ "room_id": room_id,
+ "sender": user.to_string(),
+ "state_key": token,
+ },
+ token_id=token_id,
+ txn_id=txn_id,
+ )
+
+ @defer.inlineCallbacks
+ def _ask_id_server_for_third_party_invite(
+ self, id_server, medium, address, room_id, sender):
+ is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
+ id_server_scheme, id_server,
+ )
+ data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+ is_url,
+ {
+ "medium": medium,
+ "address": address,
+ "room_id": room_id,
+ "sender": sender,
+ }
+ )
+ # TODO: Check for success
+ token = data["token"]
+ public_key = data["public_key"]
+ display_name = data["display_name"]
+ key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+ id_server_scheme, id_server,
+ )
+ defer.returnValue((token, public_key, key_validity_url, display_name))
+
class RoomListHandler(BaseHandler):
@@ -604,12 +764,79 @@ class RoomListHandler(BaseHandler):
defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
+class RoomContextHandler(BaseHandler):
+ @defer.inlineCallbacks
+ def get_event_context(self, user, room_id, event_id, limit, is_guest):
+ """Retrieves events, pagination tokens and state around a given event
+ in a room.
+
+ Args:
+ user (UserID)
+ room_id (str)
+ event_id (str)
+ limit (int): The maximum number of events to return in total
+ (excluding state).
+
+ Returns:
+ dict
+ """
+ before_limit = math.floor(limit/2.)
+ after_limit = limit - before_limit
+
+ now_token = yield self.hs.get_event_sources().get_current_token()
+
+ results = yield self.store.get_events_around(
+ room_id, event_id, before_limit, after_limit
+ )
+
+ results["events_before"] = yield self._filter_events_for_client(
+ user.to_string(),
+ results["events_before"],
+ is_guest=is_guest,
+ require_all_visible_for_guests=False
+ )
+
+ results["events_after"] = yield self._filter_events_for_client(
+ user.to_string(),
+ results["events_after"],
+ is_guest=is_guest,
+ require_all_visible_for_guests=False
+ )
+
+ if results["events_after"]:
+ last_event_id = results["events_after"][-1].event_id
+ else:
+ last_event_id = event_id
+
+ state = yield self.store.get_state_for_events(
+ [last_event_id], None
+ )
+ results["state"] = state[last_event_id].values()
+
+ results["start"] = now_token.copy_and_replace(
+ "room_key", results["start"]
+ ).to_string()
+
+ results["end"] = now_token.copy_and_replace(
+ "room_key", results["end"]
+ ).to_string()
+
+ defer.returnValue(results)
+
+
class RoomEventSource(object):
def __init__(self, hs):
self.store = hs.get_datastore()
@defer.inlineCallbacks
- def get_new_events_for_user(self, user, from_key, limit):
+ def get_new_events(
+ self,
+ user,
+ from_key,
+ limit,
+ room_ids,
+ is_guest,
+ ):
# We just ignore the key for now.
to_key = yield self.get_current_key()
@@ -629,8 +856,9 @@ class RoomEventSource(object):
user_id=user.to_string(),
from_key=from_key,
to_key=to_key,
- room_id=None,
limit=limit,
+ room_ids=room_ids,
+ is_guest=is_guest,
)
defer.returnValue((events, end_key))
@@ -646,7 +874,6 @@ class RoomEventSource(object):
to_key=config.to_key,
direction=config.direction,
limit=config.limit,
- with_feedback=True
)
defer.returnValue((events, next_key))
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
new file mode 100644
index 0000000000..b7545c111f
--- /dev/null
+++ b/synapse/handlers/search.py
@@ -0,0 +1,319 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+from synapse.api.constants import Membership
+from synapse.api.filtering import Filter
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+
+from unpaddedbase64 import decode_base64, encode_base64
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class SearchHandler(BaseHandler):
+
+ def __init__(self, hs):
+ super(SearchHandler, self).__init__(hs)
+
+ @defer.inlineCallbacks
+ def search(self, user, content, batch=None):
+ """Performs a full text search for a user.
+
+ Args:
+ user (UserID)
+ content (dict): Search parameters
+ batch (str): The next_batch parameter. Used for pagination.
+
+ Returns:
+ dict to be returned to the client with results of search
+ """
+
+ batch_group = None
+ batch_group_key = None
+ batch_token = None
+ if batch:
+ try:
+ b = decode_base64(batch)
+ batch_group, batch_group_key, batch_token = b.split("\n")
+
+ assert batch_group is not None
+ assert batch_group_key is not None
+ assert batch_token is not None
+ except:
+ raise SynapseError(400, "Invalid batch")
+
+ try:
+ room_cat = content["search_categories"]["room_events"]
+
+ # The actual thing to query in FTS
+ search_term = room_cat["search_term"]
+
+ # Which "keys" to search over in FTS query
+ keys = room_cat.get("keys", [
+ "content.body", "content.name", "content.topic",
+ ])
+
+ # Filter to apply to results
+ filter_dict = room_cat.get("filter", {})
+
+ # What to order results by (impacts whether pagination can be doen)
+ order_by = room_cat.get("order_by", "rank")
+
+ # Include context around each event?
+ event_context = room_cat.get(
+ "event_context", None
+ )
+
+ # Group results together? May allow clients to paginate within a
+ # group
+ group_by = room_cat.get("groupings", {}).get("group_by", {})
+ group_keys = [g["key"] for g in group_by]
+
+ if event_context is not None:
+ before_limit = int(event_context.get(
+ "before_limit", 5
+ ))
+ after_limit = int(event_context.get(
+ "after_limit", 5
+ ))
+ except KeyError:
+ raise SynapseError(400, "Invalid search query")
+
+ if order_by not in ("rank", "recent"):
+ raise SynapseError(400, "Invalid order by: %r" % (order_by,))
+
+ if set(group_keys) - {"room_id", "sender"}:
+ raise SynapseError(
+ 400,
+ "Invalid group by keys: %r" % (set(group_keys) - {"room_id", "sender"},)
+ )
+
+ search_filter = Filter(filter_dict)
+
+ # TODO: Search through left rooms too
+ rooms = yield self.store.get_rooms_for_user_where_membership_is(
+ user.to_string(),
+ membership_list=[Membership.JOIN],
+ # membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban],
+ )
+ room_ids = set(r.room_id for r in rooms)
+
+ room_ids = search_filter.filter_rooms(room_ids)
+
+ if batch_group == "room_id":
+ room_ids.intersection_update({batch_group_key})
+
+ rank_map = {} # event_id -> rank of event
+ allowed_events = []
+ room_groups = {} # Holds result of grouping by room, if applicable
+ sender_group = {} # Holds result of grouping by sender, if applicable
+
+ # Holds the next_batch for the entire result set if one of those exists
+ global_next_batch = None
+
+ if order_by == "rank":
+ results = yield self.store.search_msgs(
+ room_ids, search_term, keys
+ )
+
+ results_map = {r["event"].event_id: r for r in results}
+
+ rank_map.update({r["event"].event_id: r["rank"] for r in results})
+
+ filtered_events = search_filter.filter([r["event"] for r in results])
+
+ events = yield self._filter_events_for_client(
+ user.to_string(), filtered_events
+ )
+
+ events.sort(key=lambda e: -rank_map[e.event_id])
+ allowed_events = events[:search_filter.limit()]
+
+ for e in allowed_events:
+ rm = room_groups.setdefault(e.room_id, {
+ "results": [],
+ "order": rank_map[e.event_id],
+ })
+ rm["results"].append(e.event_id)
+
+ s = sender_group.setdefault(e.sender, {
+ "results": [],
+ "order": rank_map[e.event_id],
+ })
+ s["results"].append(e.event_id)
+
+ elif order_by == "recent":
+ # In this case we specifically loop through each room as the given
+ # limit applies to each room, rather than a global list.
+ # This is not necessarilly a good idea.
+ for room_id in room_ids:
+ room_events = []
+ if batch_group == "room_id" and batch_group_key == room_id:
+ pagination_token = batch_token
+ else:
+ pagination_token = None
+ i = 0
+
+ # We keep looping and we keep filtering until we reach the limit
+ # or we run out of things.
+ # But only go around 5 times since otherwise synapse will be sad.
+ while len(room_events) < search_filter.limit() and i < 5:
+ i += 1
+ results = yield self.store.search_room(
+ room_id, search_term, keys, search_filter.limit() * 2,
+ pagination_token=pagination_token,
+ )
+
+ results_map = {r["event"].event_id: r for r in results}
+
+ rank_map.update({r["event"].event_id: r["rank"] for r in results})
+
+ filtered_events = search_filter.filter([
+ r["event"] for r in results
+ ])
+
+ events = yield self._filter_events_for_client(
+ user.to_string(), filtered_events
+ )
+
+ room_events.extend(events)
+ room_events = room_events[:search_filter.limit()]
+
+ if len(results) < search_filter.limit() * 2:
+ pagination_token = None
+ break
+ else:
+ pagination_token = results[-1]["pagination_token"]
+
+ if room_events:
+ res = results_map[room_events[-1].event_id]
+ pagination_token = res["pagination_token"]
+
+ group = room_groups.setdefault(room_id, {})
+ if pagination_token:
+ next_batch = encode_base64("%s\n%s\n%s" % (
+ "room_id", room_id, pagination_token
+ ))
+ group["next_batch"] = next_batch
+
+ if batch_token:
+ global_next_batch = next_batch
+
+ group["results"] = [e.event_id for e in room_events]
+ group["order"] = max(
+ e.origin_server_ts/1000 for e in room_events
+ if hasattr(e, "origin_server_ts")
+ )
+
+ allowed_events.extend(room_events)
+
+ # Normalize the group orders
+ if room_groups:
+ if len(room_groups) > 1:
+ mx = max(g["order"] for g in room_groups.values())
+ mn = min(g["order"] for g in room_groups.values())
+
+ for g in room_groups.values():
+ g["order"] = (g["order"] - mn) * 1.0 / (mx - mn)
+ else:
+ room_groups.values()[0]["order"] = 1
+
+ else:
+ # We should never get here due to the guard earlier.
+ raise NotImplementedError()
+
+ # If client has asked for "context" for each event (i.e. some surrounding
+ # events and state), fetch that
+ if event_context is not None:
+ now_token = yield self.hs.get_event_sources().get_current_token()
+
+ contexts = {}
+ for event in allowed_events:
+ res = yield self.store.get_events_around(
+ event.room_id, event.event_id, before_limit, after_limit
+ )
+
+ res["events_before"] = yield self._filter_events_for_client(
+ user.to_string(), res["events_before"]
+ )
+
+ res["events_after"] = yield self._filter_events_for_client(
+ user.to_string(), res["events_after"]
+ )
+
+ res["start"] = now_token.copy_and_replace(
+ "room_key", res["start"]
+ ).to_string()
+
+ res["end"] = now_token.copy_and_replace(
+ "room_key", res["end"]
+ ).to_string()
+
+ contexts[event.event_id] = res
+ else:
+ contexts = {}
+
+ # TODO: Add a limit
+
+ time_now = self.clock.time_msec()
+
+ for context in contexts.values():
+ context["events_before"] = [
+ serialize_event(e, time_now)
+ for e in context["events_before"]
+ ]
+ context["events_after"] = [
+ serialize_event(e, time_now)
+ for e in context["events_after"]
+ ]
+
+ results = {
+ e.event_id: {
+ "rank": rank_map[e.event_id],
+ "result": serialize_event(e, time_now),
+ "context": contexts.get(e.event_id, {}),
+ }
+ for e in allowed_events
+ }
+
+ logger.info("Found %d results", len(results))
+
+ rooms_cat_res = {
+ "results": results,
+ "count": len(results)
+ }
+
+ if room_groups and "room_id" in group_keys:
+ rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups
+
+ if sender_group and "sender" in group_keys:
+ rooms_cat_res.setdefault("groups", {})["sender"] = sender_group
+
+ if global_next_batch:
+ rooms_cat_res["next_batch"] = global_next_batch
+
+ defer.returnValue({
+ "search_categories": {
+ "room_events": rooms_cat_res
+ }
+ })
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 353a416054..6dc9d0fb92 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -28,23 +28,30 @@ logger = logging.getLogger(__name__)
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
- "client_info",
- "limit",
- "gap",
- "sort",
- "backfill",
"filter",
])
-class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
- "room_id",
- "limited",
- "published",
- "events",
- "state",
+class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch",
+ "events",
+ "limited",
+])):
+ __slots__ = []
+
+ def __nonzero__(self):
+ """Make the result appear empty if there are no updates. This is used
+ to tell if room needs to be part of the sync result.
+ """
+ return bool(self.events)
+
+
+class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
+ "room_id", # str
+ "timeline", # TimelineBatch
+ "state", # dict[(str, str), FrozenEvent]
"ephemeral",
+ "private_user_data",
])):
__slots__ = []
@@ -52,14 +59,50 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
- return bool(self.events or self.state or self.ephemeral)
+ return bool(
+ self.timeline
+ or self.state
+ or self.ephemeral
+ or self.private_user_data
+ )
+
+
+class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
+ "room_id", # str
+ "timeline", # TimelineBatch
+ "state", # dict[(str, str), FrozenEvent]
+ "private_user_data",
+])):
+ __slots__ = []
+
+ def __nonzero__(self):
+ """Make the result appear empty if there are no updates. This is used
+ to tell if room needs to be part of the sync result.
+ """
+ return bool(
+ self.timeline
+ or self.state
+ or self.private_user_data
+ )
+
+
+class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
+ "room_id", # str
+ "invite", # FrozenEvent: the invite event
+])):
+ __slots__ = []
+
+ def __nonzero__(self):
+ """Invited rooms should always be reported to the client"""
+ return True
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
- "private_user_data", # List of private events for the user.
- "public_user_data", # List of public events for all users.
- "rooms", # RoomSyncResult for each room.
+ "presence", # List of presence events for the user.
+ "joined", # JoinedSyncResult for each joined room.
+ "invited", # InvitedSyncResult for each invited room.
+ "archived", # ArchivedSyncResult for each archived room.
])):
__slots__ = []
@@ -69,7 +112,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
events.
"""
return bool(
- self.private_user_data or self.public_user_data or self.rooms
+ self.presence or self.joined or self.invited
)
@@ -81,67 +124,58 @@ class SyncHandler(BaseHandler):
self.clock = hs.get_clock()
@defer.inlineCallbacks
- def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
+ def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
+ full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Returns:
A Deferred SyncResult.
"""
- if timeout == 0 or since_token is None:
- result = yield self.current_sync_for_user(sync_config, since_token)
+
+ if timeout == 0 or since_token is None or full_state:
+ # we are going to return immediately, so don't bother calling
+ # notifier.wait_for_events.
+ result = yield self.current_sync_for_user(sync_config, since_token,
+ full_state=full_state)
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
- rm_handler = self.hs.get_handlers().room_member_handler
-
- app_service = yield self.store.get_app_service_by_user_id(
- sync_config.user.to_string()
- )
- if app_service:
- rooms = yield self.store.get_app_service_rooms(app_service)
- room_ids = set(r.room_id for r in rooms)
- else:
- room_ids = yield rm_handler.get_joined_rooms_for_user(
- sync_config.user
- )
-
result = yield self.notifier.wait_for_events(
- sync_config.user, room_ids,
- sync_config.filter, timeout, current_sync_callback
+ sync_config.user, timeout, current_sync_callback,
+ from_token=since_token
)
defer.returnValue(result)
- def current_sync_for_user(self, sync_config, since_token=None):
+ def current_sync_for_user(self, sync_config, since_token=None,
+ full_state=False):
"""Get the sync for client needed to match what the server has now.
Returns:
A Deferred SyncResult.
"""
- if since_token is None:
- return self.initial_sync(sync_config)
+ if since_token is None or full_state:
+ return self.full_state_sync(sync_config, since_token)
else:
- if sync_config.gap:
- return self.incremental_sync_with_gap(sync_config, since_token)
- else:
- # TODO(mjark): Handle gapless sync
- raise NotImplementedError()
+ return self.incremental_sync_with_gap(sync_config, since_token)
@defer.inlineCallbacks
- def initial_sync(self, sync_config):
- """Get a sync for a client which is starting without any state
+ def full_state_sync(self, sync_config, timeline_since_token):
+ """Get a sync for a client which is starting without any state.
+
+ If a 'message_since_token' is given, only timeline events which have
+ happened since that token will be returned.
+
Returns:
A Deferred SyncResult.
"""
- if sync_config.sort == "timeline,desc":
- # TODO(mjark): Handle going through events in reverse order?.
- # What does "most recent events" mean when applying the limits mean
- # in this case?
- raise NotImplementedError()
-
now_token = yield self.event_sources.get_current_token()
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_config, now_token
+ )
+
presence_stream = self.event_sources.sources["presence"]
# TODO (mjark): This looks wrong, shouldn't we be getting the presence
# UP to the present rather than after the present?
@@ -153,52 +187,179 @@ class SyncHandler(BaseHandler):
)
room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=sync_config.user.to_string(),
- membership_list=[Membership.INVITE, Membership.JOIN]
+ membership_list=(
+ Membership.INVITE,
+ Membership.JOIN,
+ Membership.LEAVE,
+ Membership.BAN
+ )
)
- # TODO (mjark): Does public mean "published"?
- published_rooms = yield self.store.get_rooms(is_public=True)
- published_room_ids = set(r["room_id"] for r in published_rooms)
+ tags_by_room = yield self.store.get_tags_for_user(
+ sync_config.user.to_string()
+ )
- rooms = []
+ joined = []
+ invited = []
+ archived = []
for event in room_list:
- room_sync = yield self.initial_sync_for_room(
- event.room_id, sync_config, now_token, published_room_ids
- )
- rooms.append(room_sync)
+ if event.membership == Membership.JOIN:
+ room_sync = yield self.full_state_sync_for_joined_room(
+ room_id=event.room_id,
+ sync_config=sync_config,
+ now_token=now_token,
+ timeline_since_token=timeline_since_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ )
+ joined.append(room_sync)
+ elif event.membership == Membership.INVITE:
+ invite = yield self.store.get_event(event.event_id)
+ invited.append(InvitedSyncResult(
+ room_id=event.room_id,
+ invite=invite,
+ ))
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ leave_token = now_token.copy_and_replace(
+ "room_key", "s%d" % (event.stream_ordering,)
+ )
+ room_sync = yield self.full_state_sync_for_archived_room(
+ sync_config=sync_config,
+ room_id=event.room_id,
+ leave_event_id=event.event_id,
+ leave_token=leave_token,
+ timeline_since_token=timeline_since_token,
+ tags_by_room=tags_by_room,
+ )
+ archived.append(room_sync)
defer.returnValue(SyncResult(
- public_user_data=presence,
- private_user_data=[],
- rooms=rooms,
+ presence=presence,
+ joined=joined,
+ invited=invited,
+ archived=archived,
next_batch=now_token,
))
@defer.inlineCallbacks
- def initial_sync_for_room(self, room_id, sync_config, now_token,
- published_room_ids):
+ def full_state_sync_for_joined_room(self, room_id, sync_config,
+ now_token, timeline_since_token,
+ ephemeral_by_room, tags_by_room):
"""Sync a room for a client which is starting without any state
Returns:
- A Deferred RoomSyncResult.
+ A Deferred JoinedSyncResult.
"""
- recents, prev_batch_token, limited = yield self.load_filtered_recents(
- room_id, sync_config, now_token,
+ batch = yield self.load_filtered_recents(
+ room_id, sync_config, now_token, since_token=timeline_since_token
)
- current_state = yield self.state_handler.get_current_state(
- room_id
+ current_state = yield self.get_state_at(room_id, now_token)
+
+ defer.returnValue(JoinedSyncResult(
+ room_id=room_id,
+ timeline=batch,
+ state=current_state,
+ ephemeral=ephemeral_by_room.get(room_id, []),
+ private_user_data=self.private_user_data_for_room(
+ room_id, tags_by_room
+ ),
+ ))
+
+ def private_user_data_for_room(self, room_id, tags_by_room):
+ private_user_data = []
+ tags = tags_by_room.get(room_id)
+ if tags is not None:
+ private_user_data.append({
+ "type": "m.tag",
+ "content": {"tags": tags},
+ })
+ return private_user_data
+
+ @defer.inlineCallbacks
+ def ephemeral_by_room(self, sync_config, now_token, since_token=None):
+ """Get the ephemeral events for each room the user is in
+ Args:
+ sync_config (SyncConfig): The flags, filters and user for the sync.
+ now_token (StreamToken): Where the server is currently up to.
+ since_token (StreamToken): Where the server was when the client
+ last synced.
+ Returns:
+ A tuple of the now StreamToken, updated to reflect the which typing
+ events are included, and a dict mapping from room_id to a list of
+ typing events for that room.
+ """
+
+ typing_key = since_token.typing_key if since_token else "0"
+
+ rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = [room.room_id for room in rooms]
+
+ typing_source = self.event_sources.sources["typing"]
+ typing, typing_key = yield typing_source.get_new_events(
+ user=sync_config.user,
+ from_key=typing_key,
+ limit=sync_config.filter.ephemeral_limit(),
+ room_ids=room_ids,
+ is_guest=False,
)
- current_state_events = current_state.values()
+ now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+ ephemeral_by_room = {}
- defer.returnValue(RoomSyncResult(
+ for event in typing:
+ # we want to exclude the room_id from the event, but modifying the
+ # result returned by the event source is poor form (it might cache
+ # the object)
+ room_id = event["room_id"]
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+ receipt_key = since_token.receipt_key if since_token else "0"
+
+ receipt_source = self.event_sources.sources["receipt"]
+ receipts, receipt_key = yield receipt_source.get_new_events(
+ user=sync_config.user,
+ from_key=receipt_key,
+ limit=sync_config.filter.ephemeral_limit(),
+ room_ids=room_ids,
+ # /sync doesn't support guest access, they can't get to this point in code
+ is_guest=False,
+ )
+ now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+
+ for event in receipts:
+ room_id = event["room_id"]
+ # exclude room id, as above
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+ defer.returnValue((now_token, ephemeral_by_room))
+
+ @defer.inlineCallbacks
+ def full_state_sync_for_archived_room(self, room_id, sync_config,
+ leave_event_id, leave_token,
+ timeline_since_token, tags_by_room):
+ """Sync a room for a client which is starting without any state
+ Returns:
+ A Deferred JoinedSyncResult.
+ """
+
+ batch = yield self.load_filtered_recents(
+ room_id, sync_config, leave_token, since_token=timeline_since_token
+ )
+
+ leave_state = yield self.store.get_state_for_event(leave_event_id)
+
+ defer.returnValue(ArchivedSyncResult(
room_id=room_id,
- published=room_id in published_room_ids,
- events=recents,
- prev_batch=prev_batch_token,
- state=current_state_events,
- limited=limited,
- ephemeral=[],
+ timeline=batch,
+ state=leave_state,
+ private_user_data=self.private_user_data_for_room(
+ room_id, tags_by_room
+ ),
))
@defer.inlineCallbacks
@@ -208,34 +369,25 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
- if sync_config.sort == "timeline,desc":
- # TODO(mjark): Handle going through events in reverse order?.
- # What does "most recent events" mean when applying the limits mean
- # in this case?
- raise NotImplementedError()
-
now_token = yield self.event_sources.get_current_token()
+ rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = [room.room_id for room in rooms]
+
presence_source = self.event_sources.sources["presence"]
- presence, presence_key = yield presence_source.get_new_events_for_user(
+ presence, presence_key = yield presence_source.get_new_events(
user=sync_config.user,
from_key=since_token.presence_key,
- limit=sync_config.limit,
+ limit=sync_config.filter.presence_limit(),
+ room_ids=room_ids,
+ # /sync doesn't support guest access, they can't get to this point in code
+ is_guest=False,
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
- typing_source = self.event_sources.sources["typing"]
- typing, typing_key = yield typing_source.get_new_events_for_user(
- user=sync_config.user,
- from_key=since_token.typing_key,
- limit=sync_config.limit,
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_config, now_token, since_token
)
- now_token = now_token.copy_and_replace("typing_key", typing_key)
-
- typing_by_room = {event["room_id"]: [event] for event in typing}
- for event in typing:
- event.pop("room_id")
- logger.debug("Typing %r", typing_by_room)
rm_handler = self.hs.get_handlers().room_member_handler
app_service = yield self.store.get_app_service_by_user_id(
@@ -243,35 +395,55 @@ class SyncHandler(BaseHandler):
)
if app_service:
rooms = yield self.store.get_app_service_rooms(app_service)
- room_ids = set(r.room_id for r in rooms)
+ joined_room_ids = set(r.room_id for r in rooms)
else:
- room_ids = yield rm_handler.get_joined_rooms_for_user(
+ joined_room_ids = yield rm_handler.get_joined_rooms_for_user(
sync_config.user
)
- # TODO (mjark): Does public mean "published"?
- published_rooms = yield self.store.get_rooms(is_public=True)
- published_room_ids = set(r["room_id"] for r in published_rooms)
+ timeline_limit = sync_config.filter.timeline_limit()
room_events, _ = yield self.store.get_room_events_stream(
sync_config.user.to_string(),
from_key=since_token.room_key,
to_key=now_token.room_key,
- room_id=None,
- limit=sync_config.limit + 1,
+ limit=timeline_limit + 1,
)
- rooms = []
- if len(room_events) <= sync_config.limit:
+ tags_by_room = yield self.store.get_updated_tags(
+ sync_config.user.to_string(),
+ since_token.private_user_data_key,
+ )
+
+ joined = []
+ archived = []
+ if len(room_events) <= timeline_limit:
# There is no gap in any of the rooms. Therefore we can just
# partition the new events by room and return them.
+ logger.debug("Got %i events for incremental sync - not limited",
+ len(room_events))
+
+ invite_events = []
+ leave_events = []
events_by_room_id = {}
for event in room_events:
events_by_room_id.setdefault(event.room_id, []).append(event)
-
- for room_id in room_ids:
+ if event.room_id not in joined_room_ids:
+ if (event.type == EventTypes.Member
+ and event.state_key == sync_config.user.to_string()):
+ if event.membership == Membership.INVITE:
+ invite_events.append(event)
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ leave_events.append(event)
+
+ for room_id in joined_room_ids:
recents = events_by_room_id.get(room_id, [])
- state = [event for event in recents if event.is_state()]
+ logger.debug("Events for room %s: %r", room_id, recents)
+ state = {
+ (event.type, event.state_key): event
+ for event in recents if event.is_state()}
+ limited = False
+
if recents:
prev_batch = now_token.copy_and_replace(
"room_key", recents[0].internal_metadata.before
@@ -279,95 +451,87 @@ class SyncHandler(BaseHandler):
else:
prev_batch = now_token
- state = yield self.check_joined_room(
- sync_config, room_id, state
- )
+ just_joined = yield self.check_joined_room(sync_config, state)
+ if just_joined:
+ logger.debug("User has just joined %s: needs full state",
+ room_id)
+ state = yield self.get_state_at(room_id, now_token)
+ # the timeline is inherently limited if we've just joined
+ limited = True
- room_sync = RoomSyncResult(
+ room_sync = JoinedSyncResult(
room_id=room_id,
- published=room_id in published_room_ids,
- events=recents,
- prev_batch=prev_batch,
+ timeline=TimelineBatch(
+ events=recents,
+ prev_batch=prev_batch,
+ limited=limited,
+ ),
state=state,
- limited=False,
- ephemeral=typing_by_room.get(room_id, [])
+ ephemeral=ephemeral_by_room.get(room_id, []),
+ private_user_data=self.private_user_data_for_room(
+ room_id, tags_by_room
+ ),
)
+ logger.debug("Result for room %s: %r", room_id, room_sync)
+
if room_sync:
- rooms.append(room_sync)
+ joined.append(room_sync)
+
else:
- for room_id in room_ids:
+ logger.debug("Got %i events for incremental sync - hit limit",
+ len(room_events))
+
+ invite_events = yield self.store.get_invites_for_user(
+ sync_config.user.to_string()
+ )
+
+ leave_events = yield self.store.get_leave_and_ban_events_for_user(
+ sync_config.user.to_string()
+ )
+
+ for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token,
- published_room_ids, typing_by_room
+ ephemeral_by_room, tags_by_room
)
if room_sync:
- rooms.append(room_sync)
+ joined.append(room_sync)
- defer.returnValue(SyncResult(
- public_user_data=presence,
- private_user_data=[],
- rooms=rooms,
- next_batch=now_token,
- ))
-
- @defer.inlineCallbacks
- def _filter_events_for_client(self, user_id, room_id, events):
- event_id_to_state = yield self.store.get_state_for_events(
- room_id, frozenset(e.event_id for e in events),
- types=(
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, user_id),
+ for leave_event in leave_events:
+ room_sync = yield self.incremental_sync_for_archived_room(
+ sync_config, leave_event, since_token, tags_by_room
)
- )
-
- def allowed(event, state):
- if event.type == EventTypes.RoomHistoryVisibility:
- return True
-
- membership_ev = state.get((EventTypes.Member, user_id), None)
- if membership_ev:
- membership = membership_ev.membership
- else:
- membership = Membership.LEAVE
-
- if membership == Membership.JOIN:
- return True
+ archived.append(room_sync)
- history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
- if history:
- visibility = history.content.get("history_visibility", "shared")
- else:
- visibility = "shared"
+ invited = [
+ InvitedSyncResult(room_id=event.room_id, invite=event)
+ for event in invite_events
+ ]
- if visibility == "public":
- return True
- elif visibility == "shared":
- return True
- elif visibility == "joined":
- return membership == Membership.JOIN
- elif visibility == "invited":
- return membership == Membership.INVITE
-
- return True
-
- defer.returnValue([
- event
- for event in events
- if allowed(event, event_id_to_state[event.event_id])
- ])
+ defer.returnValue(SyncResult(
+ presence=presence,
+ joined=joined,
+ invited=invited,
+ archived=archived,
+ next_batch=now_token,
+ ))
@defer.inlineCallbacks
def load_filtered_recents(self, room_id, sync_config, now_token,
since_token=None):
+ """
+ :returns a Deferred TimelineBatch
+ """
limited = True
recents = []
filtering_factor = 2
- load_limit = max(sync_config.limit * filtering_factor, 100)
+ timeline_limit = sync_config.filter.timeline_limit()
+ load_limit = max(timeline_limit * filtering_factor, 100)
max_repeat = 3 # Only try a few times per room, otherwise
room_key = now_token.room_key
end_key = room_key
- while limited and len(recents) < sync_config.limit and max_repeat:
+ while limited and len(recents) < timeline_limit and max_repeat:
events, keys = yield self.store.get_recent_events_for_room(
room_id,
limit=load_limit + 1,
@@ -376,9 +540,9 @@ class SyncHandler(BaseHandler):
)
(room_key, _) = keys
end_key = "s" + room_key.split('-')[-1]
- loaded_recents = sync_config.filter.filter_room_events(events)
+ loaded_recents = sync_config.filter.filter_room_timeline(events)
loaded_recents = yield self._filter_events_for_client(
- sync_config.user.to_string(), room_id, loaded_recents,
+ sync_config.user.to_string(), loaded_recents,
)
loaded_recents.extend(recents)
recents = loaded_recents
@@ -386,64 +550,64 @@ class SyncHandler(BaseHandler):
limited = False
max_repeat -= 1
- if len(recents) > sync_config.limit:
- recents = recents[-sync_config.limit:]
+ if len(recents) > timeline_limit:
+ limited = True
+ recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace(
"room_key", room_key
)
- defer.returnValue((recents, prev_batch_token, limited))
+ defer.returnValue(TimelineBatch(
+ events=recents, prev_batch=prev_batch_token, limited=limited
+ ))
@defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token,
- published_room_ids, typing_by_room):
+ ephemeral_by_room, tags_by_room):
""" Get the incremental delta needed to bring the client up to date for
the room. Gives the client the most recent events and the changes to
state.
Returns:
- A Deferred RoomSyncResult
+ A Deferred JoinedSyncResult
"""
+ logger.debug("Doing incremental sync for room %s between %s and %s",
+ room_id, since_token, now_token)
# TODO(mjark): Check for redactions we might have missed.
- recents, prev_batch_token, limited = yield self.load_filtered_recents(
+ batch = yield self.load_filtered_recents(
room_id, sync_config, now_token, since_token,
)
- logging.debug("Recents %r", recents)
+ logging.debug("Recents %r", batch)
- # TODO(mjark): This seems racy since this isn't being passed a
- # token to indicate what point in the stream this is
- current_state = yield self.state_handler.get_current_state(
- room_id
- )
- current_state_events = current_state.values()
+ current_state = yield self.get_state_at(room_id, now_token)
- state_at_previous_sync = yield self.get_state_at_previous_sync(
- room_id, since_token=since_token
+ state_at_previous_sync = yield self.get_state_at(
+ room_id, stream_position=since_token
)
- state_events_delta = yield self.compute_state_delta(
+ state = yield self.compute_state_delta(
since_token=since_token,
previous_state=state_at_previous_sync,
- current_state=current_state_events,
+ current_state=current_state,
)
- state_events_delta = yield self.check_joined_room(
- sync_config, room_id, state_events_delta
- )
+ just_joined = yield self.check_joined_room(sync_config, state)
+ if just_joined:
+ state = yield self.get_state_at(room_id, now_token)
- room_sync = RoomSyncResult(
+ room_sync = JoinedSyncResult(
room_id=room_id,
- published=room_id in published_room_ids,
- events=recents,
- prev_batch=prev_batch_token,
- state=state_events_delta,
- limited=limited,
- ephemeral=typing_by_room.get(room_id, [])
+ timeline=batch,
+ state=state,
+ ephemeral=ephemeral_by_room.get(room_id, []),
+ private_user_data=self.private_user_data_for_room(
+ room_id, tags_by_room
+ ),
)
logging.debug("Room sync: %r", room_sync)
@@ -451,58 +615,125 @@ class SyncHandler(BaseHandler):
defer.returnValue(room_sync)
@defer.inlineCallbacks
- def get_state_at_previous_sync(self, room_id, since_token):
- """ Get the room state at the previous sync the client made.
+ def incremental_sync_for_archived_room(self, sync_config, leave_event,
+ since_token, tags_by_room):
+ """ Get the incremental delta needed to bring the client up to date for
+ the archived room.
Returns:
- A Deferred list of Events.
+ A Deferred ArchivedSyncResult
+ """
+
+ stream_token = yield self.store.get_stream_token_for_event(
+ leave_event.event_id
+ )
+
+ leave_token = since_token.copy_and_replace("room_key", stream_token)
+
+ batch = yield self.load_filtered_recents(
+ leave_event.room_id, sync_config, leave_token, since_token,
+ )
+
+ logging.debug("Recents %r", batch)
+
+ state_events_at_leave = yield self.store.get_state_for_event(
+ leave_event.event_id
+ )
+
+ state_at_previous_sync = yield self.get_state_at(
+ leave_event.room_id, stream_position=since_token
+ )
+
+ state_events_delta = yield self.compute_state_delta(
+ since_token=since_token,
+ previous_state=state_at_previous_sync,
+ current_state=state_events_at_leave,
+ )
+
+ room_sync = ArchivedSyncResult(
+ room_id=leave_event.room_id,
+ timeline=batch,
+ state=state_events_delta,
+ private_user_data=self.private_user_data_for_room(
+ leave_event.room_id, tags_by_room
+ ),
+ )
+
+ logging.debug("Room sync: %r", room_sync)
+
+ defer.returnValue(room_sync)
+
+ @defer.inlineCallbacks
+ def get_state_after_event(self, event):
+ """
+ Get the room state after the given event
+
+ :param synapse.events.EventBase event: event of interest
+ :return: A Deferred map from ((type, state_key)->Event)
+ """
+ state = yield self.store.get_state_for_event(event.event_id)
+ if event.is_state():
+ state = state.copy()
+ state[(event.type, event.state_key)] = event
+ defer.returnValue(state)
+
+ @defer.inlineCallbacks
+ def get_state_at(self, room_id, stream_position):
+ """ Get the room state at a particular stream position
+ :param str room_id: room for which to get state
+ :param StreamToken stream_position: point at which to get state
+ :returns: A Deferred map from ((type, state_key)->Event)
"""
last_events, token = yield self.store.get_recent_events_for_room(
- room_id, end_token=since_token.room_key, limit=1,
+ room_id, end_token=stream_position.room_key, limit=1,
)
if last_events:
- last_event = last_events[0]
- last_context = yield self.state_handler.compute_event_context(
- last_event
- )
- if last_event.is_state():
- state = [last_event] + last_context.current_state.values()
- else:
- state = last_context.current_state.values()
+ last_event = last_events[-1]
+ state = yield self.get_state_after_event(last_event)
+
else:
- state = ()
+ # no events in this room - so presumably no state
+ state = {}
defer.returnValue(state)
def compute_state_delta(self, since_token, previous_state, current_state):
""" Works out the differnce in state between the current state and the
state the client got when it last performed a sync.
- Returns:
- A list of events.
+
+ :param str since_token: the point we are comparing against
+ :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the
+ state to compare to
+ :param dict[(str,str), synapse.events.FrozenEvent] current_state: the
+ new state
+
+ :returns A new event dictionary
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
# updates even if they occured logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.
- previous_dict = {event.event_id: event for event in previous_state}
- state_delta = []
- for event in current_state:
- if event.event_id not in previous_dict:
- state_delta.append(event)
+
+ state_delta = {}
+ for key, event in current_state.iteritems():
+ if (key not in previous_state or
+ previous_state[key].event_id != event.event_id):
+ state_delta[key] = event
return state_delta
- @defer.inlineCallbacks
- def check_joined_room(self, sync_config, room_id, state_delta):
- joined = False
- for event in state_delta:
- if (
- event.type == EventTypes.Member
- and event.state_key == sync_config.user.to_string()
- ):
- if event.content["membership"] == Membership.JOIN:
- joined = True
-
- if joined:
- res = yield self.state_handler.get_current_state(room_id)
- state_delta = res.values()
-
- defer.returnValue(state_delta)
+ def check_joined_room(self, sync_config, state_delta):
+ """
+ Check if the user has just joined the given room (so should
+ be given the full state)
+
+ :param sync_config:
+ :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the
+ difference in state since the last sync
+
+ :returns A deferred Tuple (state_delta, limited)
+ """
+ join_event = state_delta.get((
+ EventTypes.Member, sync_config.user.to_string()), None)
+ if join_event is not None:
+ if join_event.content["membership"] == Membership.JOIN:
+ return True
+ return False
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d7096aab8c..2846f3e6e8 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -246,17 +246,12 @@ class TypingNotificationEventSource(object):
},
}
- @defer.inlineCallbacks
- def get_new_events_for_user(self, user, from_key, limit):
+ def get_new_events(self, from_key, room_ids, **kwargs):
from_key = int(from_key)
handler = self.handler()
- joined_room_ids = (
- yield self.room_member_handler().get_joined_rooms_for_user(user)
- )
-
events = []
- for room_id in joined_room_ids:
+ for room_id in room_ids:
if room_id not in handler._room_serials:
continue
if handler._room_serials[room_id] <= from_key:
@@ -264,7 +259,7 @@ class TypingNotificationEventSource(object):
events.append(self._make_event_for(room_id))
- defer.returnValue((events, handler._latest_room_serial))
+ return events, handler._latest_room_serial
def get_current_key(self):
return self.handler()._latest_room_serial
|