diff --git a/synapse/__init__.py b/synapse/__init__.py
index bc50bec9db..7de51fbe8d 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.13.3"
+__version__ = "0.14.0"
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index a08c170f1d..acf74c8761 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -29,13 +29,14 @@ from .key import KeyConfig
from .saml2 import SAML2Config
from .cas import CasConfig
from .password import PasswordConfig
+from .jwt import JWTConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
- PasswordConfig,):
+ JWTConfig, PasswordConfig,):
pass
diff --git a/synapse/config/jwt.py b/synapse/config/jwt.py
new file mode 100644
index 0000000000..4cb092bbec
--- /dev/null
+++ b/synapse/config/jwt.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 Niklas Riekenbrauck
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class JWTConfig(Config):
+ def read_config(self, config):
+ jwt_config = config.get("jwt_config", None)
+ if jwt_config:
+ self.jwt_enabled = jwt_config.get("enabled", False)
+ self.jwt_secret = jwt_config["secret"]
+ self.jwt_algorithm = jwt_config["algorithm"]
+ else:
+ self.jwt_enabled = False
+ self.jwt_secret = None
+ self.jwt_algorithm = None
+
+ def default_config(self, **kwargs):
+ return """\
+ # jwt_config:
+ # enabled: true
+ # secret: "a secret"
+ # algorithm: "HS256"
+ """
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 23f8b612ae..13154b1723 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -31,7 +31,10 @@ class _EventInternalMetadata(object):
return dict(self.__dict__)
def is_outlier(self):
- return hasattr(self, "outlier") and self.outlier
+ return getattr(self, "outlier", False)
+
+ def is_invite_from_remote(self):
+ return getattr(self, "invite_from_remote", False)
def _event_dict_property(key):
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 66d2c01123..f4dbf47c1d 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,8 +17,9 @@ from synapse.appservice.scheduler import AppServiceScheduler
from synapse.appservice.api import ApplicationServiceApi
from .register import RegistrationHandler
from .room import (
- RoomCreationHandler, RoomMemberHandler, RoomListHandler, RoomContextHandler,
+ RoomCreationHandler, RoomListHandler, RoomContextHandler,
)
+from .room_member import RoomMemberHandler
from .message import MessageHandler
from .events import EventStreamHandler, EventHandler
from .federation import FederationHandler
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 90eabb6eb7..5eeb7042c6 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -41,8 +41,9 @@ class BaseHandler(object):
"""
Common base class for the event handlers.
- :type store: synapse.storage.events.StateStore
- :type state_handler: synapse.state.StateHandler
+ Attributes:
+ store (synapse.storage.events.StateStore):
+ state_handler (synapse.state.StateHandler):
"""
def __init__(self, hs):
@@ -65,11 +66,12 @@ class BaseHandler(object):
""" Returns dict of user_id -> list of events that user is allowed to
see.
- :param (str, bool) user_tuples: (user id, is_peeking) for each
- user to be checked. is_peeking should be true if:
- * the user is not currently a member of the room, and:
- * the user has not been a member of the room since the given
- events
+ Args:
+ user_tuples (str, bool): (user id, is_peeking) for each user to be
+ checked. is_peeking should be true if:
+ * the user is not currently a member of the room, and:
+ * the user has not been a member of the room since the
+ given events
"""
forgotten = yield defer.gatherResults([
self.store.who_forgot_in_room(
@@ -165,13 +167,16 @@ class BaseHandler(object):
"""
Check which events a user is allowed to see
- :param str user_id: user id to be checked
- :param [synapse.events.EventBase] events: list of events to be checked
- :param bool is_peeking should be True if:
+ Args:
+ user_id(str): user id to be checked
+ events([synapse.events.EventBase]): list of events to be checked
+ is_peeking(bool): should be True if:
* the user is not currently a member of the room, and:
* the user has not been a member of the room since the given
events
- :rtype [synapse.events.EventBase]
+
+ Returns:
+ [synapse.events.EventBase]
"""
types = (
(EventTypes.RoomHistoryVisibility, ""),
@@ -261,8 +266,7 @@ class BaseHandler(object):
context = yield state_handler.compute_event_context(
builder,
- old_state=(prev_member_event,),
- outlier=True
+ old_state=(prev_member_event,)
)
if builder.is_state():
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 82d458b424..d5d6faa85f 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -163,9 +163,13 @@ class AuthHandler(BaseHandler):
def get_session_id(self, clientdict):
"""
Gets the session ID for a client given the client dictionary
- :param clientdict: The dictionary sent by the client in the request
- :return: The string session ID the client sent. If the client did not
- send a session ID, returns None.
+
+ Args:
+ clientdict: The dictionary sent by the client in the request
+
+ Returns:
+ str|None: The string session ID the client sent. If the client did
+ not send a session ID, returns None.
"""
sid = None
if clientdict and 'auth' in clientdict:
@@ -179,9 +183,11 @@ class AuthHandler(BaseHandler):
Store a key-value pair into the sessions data associated with this
request. This data is stored server-side and cannot be modified by
the client.
- :param session_id: (string) The ID of this session as returned from check_auth
- :param key: (string) The key to store the data under
- :param value: (any) The data to store
+
+ Args:
+ session_id (string): The ID of this session as returned from check_auth
+ key (string): The key to store the data under
+ value (any): The data to store
"""
sess = self._get_session_info(session_id)
sess.setdefault('serverdict', {})[key] = value
@@ -190,9 +196,11 @@ class AuthHandler(BaseHandler):
def get_session_data(self, session_id, key, default=None):
"""
Retrieve data stored with set_session_data
- :param session_id: (string) The ID of this session as returned from check_auth
- :param key: (string) The key to store the data under
- :param default: (any) Value to return if the key has not been set
+
+ Args:
+ session_id (string): The ID of this session as returned from check_auth
+ key (string): The key to store the data under
+ default (any): Value to return if the key has not been set
"""
sess = self._get_session_info(session_id)
return sess.setdefault('serverdict', {}).get(key, default)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 267fedf114..adafd06b24 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -102,8 +102,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, origin, pdu, state=None,
- auth_chain=None):
+ def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
"""
@@ -174,11 +173,7 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
- yield self._handle_new_events(
- origin,
- event_infos,
- outliers=True
- )
+ yield self._handle_new_events(origin, event_infos)
try:
context, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -761,6 +756,7 @@ class FederationHandler(BaseHandler):
event = pdu
event.internal_metadata.outlier = True
+ event.internal_metadata.invite_from_remote = True
event.signatures.update(
compute_event_signature(
@@ -1069,9 +1065,6 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None):
-
- outlier = event.internal_metadata.is_outlier()
-
context = yield self._prep_event(
origin, event,
state=state,
@@ -1087,14 +1080,12 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
- is_new_state=not outlier,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks
- def _handle_new_events(self, origin, event_infos, backfilled=False,
- outliers=False):
+ def _handle_new_events(self, origin, event_infos, backfilled=False):
contexts = yield defer.gatherResults(
[
self._prep_event(
@@ -1113,7 +1104,6 @@ class FederationHandler(BaseHandler):
for ev_info, context in itertools.izip(event_infos, contexts)
],
backfilled=backfilled,
- is_new_state=(not outliers and not backfilled),
)
@defer.inlineCallbacks
@@ -1128,11 +1118,9 @@ class FederationHandler(BaseHandler):
"""
events_to_context = {}
for e in itertools.chain(auth_events, state):
- ctx = yield self.state_handler.compute_event_context(
- e, outlier=True,
- )
- events_to_context[e.event_id] = ctx
e.internal_metadata.outlier = True
+ ctx = yield self.state_handler.compute_event_context(e)
+ events_to_context[e.event_id] = ctx
event_map = {
e.event_id: e
@@ -1176,16 +1164,14 @@ class FederationHandler(BaseHandler):
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
],
- is_new_state=False,
)
new_event_context = yield self.state_handler.compute_event_context(
- event, old_state=state, outlier=False,
+ event, old_state=state
)
event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context,
- is_new_state=True,
current_state=state,
)
@@ -1193,10 +1179,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
- outlier = event.internal_metadata.is_outlier()
context = yield self.state_handler.compute_event_context(
- event, old_state=state, outlier=outlier,
+ event, old_state=state,
)
if not auth_events:
@@ -1718,13 +1703,15 @@ class FederationHandler(BaseHandler):
def _check_signature(self, event, auth_events):
"""
Checks that the signature in the event is consistent with its invite.
- :param event (Event): The m.room.member event to check
- :param auth_events (dict<(event type, state_key), event>)
- :raises
- AuthError if signature didn't match any keys, or key has been
+ Args:
+ event (Event): The m.room.member event to check
+ auth_events (dict<(event type, state_key), event>):
+
+ Raises:
+ AuthError: if signature didn't match any keys, or key has been
revoked,
- SynapseError if a transient error meant a key couldn't be checked
+ SynapseError: if a transient error meant a key couldn't be checked
for revocation.
"""
signed = event.content["third_party_invite"]["signed"]
@@ -1766,12 +1753,13 @@ class FederationHandler(BaseHandler):
"""
Checks whether public_key has been revoked.
- :param public_key (str): base-64 encoded public key.
- :param url (str): Key revocation URL.
+ Args:
+ public_key (str): base-64 encoded public key.
+ url (str): Key revocation URL.
- :raises
- AuthError if they key has been revoked.
- SynapseError if a transient error meant a key couldn't be checked
+ Raises:
+ AuthError: if they key has been revoked.
+ SynapseError: if a transient error meant a key couldn't be checked
for revocation.
"""
try:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5c50c611ba..0bb111d047 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -21,6 +21,7 @@ from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.types import UserID, RoomStreamToken, StreamToken
@@ -556,14 +557,7 @@ class MessageHandler(BaseHandler):
except:
logger.exception("Failed to get snapshot")
- # Only do N rooms at once
- n = 5
- d_list = [handle_room(e) for e in room_list]
- for i in range(0, len(d_list), n):
- yield defer.gatherResults(
- d_list[i:i + n],
- consumeErrors=True
- ).addErrback(unwrapFirstError)
+ yield concurrently_execute(handle_room, room_list, 10)
account_data_events = []
for account_data_type, content in account_data.items():
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 133183a257..3e1d9282d7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -18,20 +18,17 @@ from twisted.internet import defer
from ._base import BaseHandler
-from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken, Requester
+from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
from synapse.api.constants import (
- EventTypes, Membership, JoinRules, RoomCreationPreset,
+ EventTypes, JoinRules, RoomCreationPreset,
)
-from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
-from synapse.util import stringutils, unwrapFirstError
+from synapse.api.errors import AuthError, StoreError, SynapseError
+from synapse.util import stringutils
+from synapse.util.async import concurrently_execute
from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.caches.response_cache import ResponseCache
-from signedjson.sign import verify_signed_json
-from signedjson.key import decode_verify_key_bytes
-
from collections import OrderedDict
-from unpaddedbase64 import decode_base64
import logging
import math
@@ -357,588 +354,6 @@ class RoomCreationHandler(BaseHandler):
)
-class RoomMemberHandler(BaseHandler):
- # TODO(paul): This handler currently contains a messy conflation of
- # low-level API that works on UserID objects and so on, and REST-level
- # API that takes ID strings and returns pagination chunks. These concerns
- # ought to be separated out a lot better.
-
- def __init__(self, hs):
- super(RoomMemberHandler, self).__init__(hs)
-
- self.clock = hs.get_clock()
-
- self.distributor = hs.get_distributor()
- self.distributor.declare("user_joined_room")
- self.distributor.declare("user_left_room")
-
- @defer.inlineCallbacks
- def get_room_members(self, room_id):
- users = yield self.store.get_users_in_room(room_id)
-
- defer.returnValue([UserID.from_string(u) for u in users])
-
- @defer.inlineCallbacks
- def fetch_room_distributions_into(self, room_id, localusers=None,
- remotedomains=None, ignore_user=None):
- """Fetch the distribution of a room, adding elements to either
- 'localusers' or 'remotedomains', which should be a set() if supplied.
- If ignore_user is set, ignore that user.
-
- This function returns nothing; its result is performed by the
- side-effect on the two passed sets. This allows easy accumulation of
- member lists of multiple rooms at once if required.
- """
- members = yield self.get_room_members(room_id)
- for member in members:
- if ignore_user is not None and member == ignore_user:
- continue
-
- if self.hs.is_mine(member):
- if localusers is not None:
- localusers.add(member)
- else:
- if remotedomains is not None:
- remotedomains.add(member.domain)
-
- @defer.inlineCallbacks
- def update_membership(
- self,
- requester,
- target,
- room_id,
- action,
- txn_id=None,
- remote_room_hosts=None,
- third_party_signed=None,
- ratelimit=True,
- ):
- effective_membership_state = action
- if action in ["kick", "unban"]:
- effective_membership_state = "leave"
- elif action == "forget":
- effective_membership_state = "leave"
-
- if third_party_signed is not None:
- replication = self.hs.get_replication_layer()
- yield replication.exchange_third_party_invite(
- third_party_signed["sender"],
- target.to_string(),
- room_id,
- third_party_signed,
- )
-
- msg_handler = self.hs.get_handlers().message_handler
-
- content = {"membership": effective_membership_state}
- if requester.is_guest:
- content["kind"] = "guest"
-
- event, context = yield msg_handler.create_event(
- {
- "type": EventTypes.Member,
- "content": content,
- "room_id": room_id,
- "sender": requester.user.to_string(),
- "state_key": target.to_string(),
-
- # For backwards compatibility:
- "membership": effective_membership_state,
- },
- token_id=requester.access_token_id,
- txn_id=txn_id,
- )
-
- old_state = context.current_state.get((EventTypes.Member, event.state_key))
- old_membership = old_state.content.get("membership") if old_state else None
- if action == "unban" and old_membership != "ban":
- raise SynapseError(
- 403,
- "Cannot unban user who was not banned (membership=%s)" % old_membership,
- errcode=Codes.BAD_STATE
- )
- if old_membership == "ban" and action != "unban":
- raise SynapseError(
- 403,
- "Cannot %s user who was is banned" % (action,),
- errcode=Codes.BAD_STATE
- )
-
- member_handler = self.hs.get_handlers().room_member_handler
- yield member_handler.send_membership_event(
- requester,
- event,
- context,
- ratelimit=ratelimit,
- remote_room_hosts=remote_room_hosts,
- )
-
- if action == "forget":
- yield self.forget(requester.user, room_id)
-
- @defer.inlineCallbacks
- def send_membership_event(
- self,
- requester,
- event,
- context,
- remote_room_hosts=None,
- ratelimit=True,
- ):
- """
- Change the membership status of a user in a room.
-
- Args:
- requester (Requester): The local user who requested the membership
- event. If None, certain checks, like whether this homeserver can
- act as the sender, will be skipped.
- event (SynapseEvent): The membership event.
- context: The context of the event.
- is_guest (bool): Whether the sender is a guest.
- room_hosts ([str]): Homeservers which are likely to already be in
- the room, and could be danced with in order to join this
- homeserver for the first time.
- ratelimit (bool): Whether to rate limit this request.
- Raises:
- SynapseError if there was a problem changing the membership.
- """
- remote_room_hosts = remote_room_hosts or []
-
- target_user = UserID.from_string(event.state_key)
- room_id = event.room_id
-
- if requester is not None:
- sender = UserID.from_string(event.sender)
- assert sender == requester.user, (
- "Sender (%s) must be same as requester (%s)" %
- (sender, requester.user)
- )
- assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
- else:
- requester = Requester(target_user, None, False)
-
- message_handler = self.hs.get_handlers().message_handler
- prev_event = message_handler.deduplicate_state_event(event, context)
- if prev_event is not None:
- return
-
- action = "send"
-
- if event.membership == Membership.JOIN:
- if requester.is_guest and not self._can_guest_join(context.current_state):
- # This should be an auth check, but guests are a local concept,
- # so don't really fit into the general auth process.
- raise AuthError(403, "Guest access not allowed")
- do_remote_join_dance, remote_room_hosts = self._should_do_dance(
- context,
- (self.get_inviter(event.state_key, context.current_state)),
- remote_room_hosts,
- )
- if do_remote_join_dance:
- action = "remote_join"
- elif event.membership == Membership.LEAVE:
- is_host_in_room = self.is_host_in_room(context.current_state)
-
- if not is_host_in_room:
- # perhaps we've been invited
- inviter = self.get_inviter(target_user.to_string(), context.current_state)
- if not inviter:
- raise SynapseError(404, "Not a known room")
-
- if self.hs.is_mine(inviter):
- # the inviter was on our server, but has now left. Carry on
- # with the normal rejection codepath.
- #
- # This is a bit of a hack, because the room might still be
- # active on other servers.
- pass
- else:
- # send the rejection to the inviter's HS.
- remote_room_hosts = remote_room_hosts + [inviter.domain]
- action = "remote_reject"
-
- federation_handler = self.hs.get_handlers().federation_handler
-
- if action == "remote_join":
- if len(remote_room_hosts) == 0:
- raise SynapseError(404, "No known servers")
-
- # We don't do an auth check if we are doing an invite
- # join dance for now, since we're kinda implicitly checking
- # that we are allowed to join when we decide whether or not we
- # need to do the invite/join dance.
- yield federation_handler.do_invite_join(
- remote_room_hosts,
- event.room_id,
- event.user_id,
- event.content,
- )
- elif action == "remote_reject":
- yield federation_handler.do_remotely_reject_invite(
- remote_room_hosts,
- room_id,
- event.user_id
- )
- else:
- yield self.handle_new_client_event(
- requester,
- event,
- context,
- extra_users=[target_user],
- ratelimit=ratelimit,
- )
-
- prev_member_event = context.current_state.get(
- (EventTypes.Member, target_user.to_string()),
- None
- )
-
- if event.membership == Membership.JOIN:
- if not prev_member_event or prev_member_event.membership != Membership.JOIN:
- # Only fire user_joined_room if the user has acutally joined the
- # room. Don't bother if the user is just changing their profile
- # info.
- yield user_joined_room(self.distributor, target_user, room_id)
- elif event.membership == Membership.LEAVE:
- if prev_member_event and prev_member_event.membership == Membership.JOIN:
- user_left_room(self.distributor, target_user, room_id)
-
- def _can_guest_join(self, current_state):
- """
- Returns whether a guest can join a room based on its current state.
- """
- guest_access = current_state.get((EventTypes.GuestAccess, ""), None)
- return (
- guest_access
- and guest_access.content
- and "guest_access" in guest_access.content
- and guest_access.content["guest_access"] == "can_join"
- )
-
- def _should_do_dance(self, context, inviter, room_hosts=None):
- # TODO: Shouldn't this be remote_room_host?
- room_hosts = room_hosts or []
-
- is_host_in_room = self.is_host_in_room(context.current_state)
- if is_host_in_room:
- return False, room_hosts
-
- if inviter and not self.hs.is_mine(inviter):
- room_hosts.append(inviter.domain)
-
- return True, room_hosts
-
- @defer.inlineCallbacks
- def lookup_room_alias(self, room_alias):
- """
- Get the room ID associated with a room alias.
-
- Args:
- room_alias (RoomAlias): The alias to look up.
- Returns:
- A tuple of:
- The room ID as a RoomID object.
- Hosts likely to be participating in the room ([str]).
- Raises:
- SynapseError if room alias could not be found.
- """
- directory_handler = self.hs.get_handlers().directory_handler
- mapping = yield directory_handler.get_association(room_alias)
-
- if not mapping:
- raise SynapseError(404, "No such room alias")
-
- room_id = mapping["room_id"]
- servers = mapping["servers"]
-
- defer.returnValue((RoomID.from_string(room_id), servers))
-
- def get_inviter(self, user_id, current_state):
- prev_state = current_state.get((EventTypes.Member, user_id))
- if prev_state and prev_state.membership == Membership.INVITE:
- return UserID.from_string(prev_state.user_id)
- return None
-
- @defer.inlineCallbacks
- def get_joined_rooms_for_user(self, user):
- """Returns a list of roomids that the user has any of the given
- membership states in."""
-
- rooms = yield self.store.get_rooms_for_user(
- user.to_string(),
- )
-
- # For some reason the list of events contains duplicates
- # TODO(paul): work out why because I really don't think it should
- room_ids = set(r.room_id for r in rooms)
-
- defer.returnValue(room_ids)
-
- @defer.inlineCallbacks
- def do_3pid_invite(
- self,
- room_id,
- inviter,
- medium,
- address,
- id_server,
- requester,
- txn_id
- ):
- invitee = yield self._lookup_3pid(
- id_server, medium, address
- )
-
- if invitee:
- handler = self.hs.get_handlers().room_member_handler
- yield handler.update_membership(
- requester,
- UserID.from_string(invitee),
- room_id,
- "invite",
- txn_id=txn_id,
- )
- else:
- yield self._make_and_store_3pid_invite(
- requester,
- id_server,
- medium,
- address,
- room_id,
- inviter,
- txn_id=txn_id
- )
-
- @defer.inlineCallbacks
- def _lookup_3pid(self, id_server, medium, address):
- """Looks up a 3pid in the passed identity server.
-
- Args:
- id_server (str): The server name (including port, if required)
- of the identity server to use.
- medium (str): The type of the third party identifier (e.g. "email").
- address (str): The third party identifier (e.g. "foo@example.com").
-
- Returns:
- (str) the matrix ID of the 3pid, or None if it is not recognized.
- """
- try:
- data = yield self.hs.get_simple_http_client().get_json(
- "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
- {
- "medium": medium,
- "address": address,
- }
- )
-
- if "mxid" in data:
- if "signatures" not in data:
- raise AuthError(401, "No signatures on 3pid binding")
- self.verify_any_signature(data, id_server)
- defer.returnValue(data["mxid"])
-
- except IOError as e:
- logger.warn("Error from identity server lookup: %s" % (e,))
- defer.returnValue(None)
-
- @defer.inlineCallbacks
- def verify_any_signature(self, data, server_hostname):
- if server_hostname not in data["signatures"]:
- raise AuthError(401, "No signature from server %s" % (server_hostname,))
- for key_name, signature in data["signatures"][server_hostname].items():
- key_data = yield self.hs.get_simple_http_client().get_json(
- "%s%s/_matrix/identity/api/v1/pubkey/%s" %
- (id_server_scheme, server_hostname, key_name,),
- )
- if "public_key" not in key_data:
- raise AuthError(401, "No public key named %s from %s" %
- (key_name, server_hostname,))
- verify_signed_json(
- data,
- server_hostname,
- decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"]))
- )
- return
-
- @defer.inlineCallbacks
- def _make_and_store_3pid_invite(
- self,
- requester,
- id_server,
- medium,
- address,
- room_id,
- user,
- txn_id
- ):
- room_state = yield self.hs.get_state_handler().get_current_state(room_id)
-
- inviter_display_name = ""
- inviter_avatar_url = ""
- member_event = room_state.get((EventTypes.Member, user.to_string()))
- if member_event:
- inviter_display_name = member_event.content.get("displayname", "")
- inviter_avatar_url = member_event.content.get("avatar_url", "")
-
- canonical_room_alias = ""
- canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
- if canonical_alias_event:
- canonical_room_alias = canonical_alias_event.content.get("alias", "")
-
- room_name = ""
- room_name_event = room_state.get((EventTypes.Name, ""))
- if room_name_event:
- room_name = room_name_event.content.get("name", "")
-
- room_join_rules = ""
- join_rules_event = room_state.get((EventTypes.JoinRules, ""))
- if join_rules_event:
- room_join_rules = join_rules_event.content.get("join_rule", "")
-
- room_avatar_url = ""
- room_avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
- if room_avatar_event:
- room_avatar_url = room_avatar_event.content.get("url", "")
-
- token, public_keys, fallback_public_key, display_name = (
- yield self._ask_id_server_for_third_party_invite(
- id_server=id_server,
- medium=medium,
- address=address,
- room_id=room_id,
- inviter_user_id=user.to_string(),
- room_alias=canonical_room_alias,
- room_avatar_url=room_avatar_url,
- room_join_rules=room_join_rules,
- room_name=room_name,
- inviter_display_name=inviter_display_name,
- inviter_avatar_url=inviter_avatar_url
- )
- )
-
- msg_handler = self.hs.get_handlers().message_handler
- yield msg_handler.create_and_send_nonmember_event(
- requester,
- {
- "type": EventTypes.ThirdPartyInvite,
- "content": {
- "display_name": display_name,
- "public_keys": public_keys,
-
- # For backwards compatibility:
- "key_validity_url": fallback_public_key["key_validity_url"],
- "public_key": fallback_public_key["public_key"],
- },
- "room_id": room_id,
- "sender": user.to_string(),
- "state_key": token,
- },
- txn_id=txn_id,
- )
-
- @defer.inlineCallbacks
- def _ask_id_server_for_third_party_invite(
- self,
- id_server,
- medium,
- address,
- room_id,
- inviter_user_id,
- room_alias,
- room_avatar_url,
- room_join_rules,
- room_name,
- inviter_display_name,
- inviter_avatar_url
- ):
- """
- Asks an identity server for a third party invite.
-
- :param id_server (str): hostname + optional port for the identity server.
- :param medium (str): The literal string "email".
- :param address (str): The third party address being invited.
- :param room_id (str): The ID of the room to which the user is invited.
- :param inviter_user_id (str): The user ID of the inviter.
- :param room_alias (str): An alias for the room, for cosmetic
- notifications.
- :param room_avatar_url (str): The URL of the room's avatar, for cosmetic
- notifications.
- :param room_join_rules (str): The join rules of the email
- (e.g. "public").
- :param room_name (str): The m.room.name of the room.
- :param inviter_display_name (str): The current display name of the
- inviter.
- :param inviter_avatar_url (str): The URL of the inviter's avatar.
-
- :return: A deferred tuple containing:
- token (str): The token which must be signed to prove authenticity.
- public_keys ([{"public_key": str, "key_validity_url": str}]):
- public_key is a base64-encoded ed25519 public key.
- fallback_public_key: One element from public_keys.
- display_name (str): A user-friendly name to represent the invited
- user.
- """
-
- is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
- id_server_scheme, id_server,
- )
-
- invite_config = {
- "medium": medium,
- "address": address,
- "room_id": room_id,
- "room_alias": room_alias,
- "room_avatar_url": room_avatar_url,
- "room_join_rules": room_join_rules,
- "room_name": room_name,
- "sender": inviter_user_id,
- "sender_display_name": inviter_display_name,
- "sender_avatar_url": inviter_avatar_url,
- }
-
- if self.hs.config.invite_3pid_guest:
- registration_handler = self.hs.get_handlers().registration_handler
- guest_access_token = yield registration_handler.guest_access_token_for(
- medium=medium,
- address=address,
- inviter_user_id=inviter_user_id,
- )
-
- guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
- guest_access_token
- )
-
- invite_config.update({
- "guest_access_token": guest_access_token,
- "guest_user_id": guest_user_info["user"].to_string(),
- })
-
- data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
- is_url,
- invite_config
- )
- # TODO: Check for success
- token = data["token"]
- public_keys = data.get("public_keys", [])
- if "public_key" in data:
- fallback_public_key = {
- "public_key": data["public_key"],
- "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
- id_server_scheme, id_server,
- ),
- }
- else:
- fallback_public_key = public_keys[0]
-
- if not public_keys:
- public_keys.append(fallback_public_key)
- display_name = data["display_name"]
- defer.returnValue((token, public_keys, fallback_public_key, display_name))
-
- def forget(self, user, room_id):
- return self.store.forget(user.to_string(), room_id)
-
-
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
@@ -954,6 +369,8 @@ class RoomListHandler(BaseHandler):
def _get_public_room_list(self):
room_ids = yield self.store.get_public_room_ids()
+ results = []
+
@defer.inlineCallbacks
def handle_room(room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
@@ -1014,18 +431,12 @@ class RoomListHandler(BaseHandler):
joined_users = yield self.store.get_users_in_room(room_id)
result["num_joined_members"] = len(joined_users)
- defer.returnValue(result)
+ results.append(result)
- result = []
- for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)):
- chunk_result = yield defer.gatherResults([
- handle_room(room_id)
- for room_id in chunk
- ], consumeErrors=True).addErrback(unwrapFirstError)
- result.extend(v for v in chunk_result if v)
+ yield concurrently_execute(handle_room, room_ids, 10)
# FIXME (erikj): START is no longer a valid value
- defer.returnValue({"start": "START", "end": "END", "chunk": result})
+ defer.returnValue({"start": "START", "end": "END", "chunk": results})
class RoomContextHandler(BaseHandler):
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
new file mode 100644
index 0000000000..01f833c371
--- /dev/null
+++ b/synapse/handlers/room_member.py
@@ -0,0 +1,646 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+from synapse.types import UserID, RoomID, Requester
+from synapse.api.constants import (
+ EventTypes, Membership,
+)
+from synapse.api.errors import AuthError, SynapseError, Codes
+from synapse.util.logcontext import preserve_context_over_fn
+
+from signedjson.sign import verify_signed_json
+from signedjson.key import decode_verify_key_bytes
+
+from unpaddedbase64 import decode_base64
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+id_server_scheme = "https://"
+
+
+def user_left_room(distributor, user, room_id):
+ return preserve_context_over_fn(
+ distributor.fire,
+ "user_left_room", user=user, room_id=room_id
+ )
+
+
+def user_joined_room(distributor, user, room_id):
+ return preserve_context_over_fn(
+ distributor.fire,
+ "user_joined_room", user=user, room_id=room_id
+ )
+
+
+class RoomMemberHandler(BaseHandler):
+ # TODO(paul): This handler currently contains a messy conflation of
+ # low-level API that works on UserID objects and so on, and REST-level
+ # API that takes ID strings and returns pagination chunks. These concerns
+ # ought to be separated out a lot better.
+
+ def __init__(self, hs):
+ super(RoomMemberHandler, self).__init__(hs)
+
+ self.clock = hs.get_clock()
+
+ self.distributor = hs.get_distributor()
+ self.distributor.declare("user_joined_room")
+ self.distributor.declare("user_left_room")
+
+ @defer.inlineCallbacks
+ def get_room_members(self, room_id):
+ users = yield self.store.get_users_in_room(room_id)
+
+ defer.returnValue([UserID.from_string(u) for u in users])
+
+ @defer.inlineCallbacks
+ def fetch_room_distributions_into(self, room_id, localusers=None,
+ remotedomains=None, ignore_user=None):
+ """Fetch the distribution of a room, adding elements to either
+ 'localusers' or 'remotedomains', which should be a set() if supplied.
+ If ignore_user is set, ignore that user.
+
+ This function returns nothing; its result is performed by the
+ side-effect on the two passed sets. This allows easy accumulation of
+ member lists of multiple rooms at once if required.
+ """
+ members = yield self.get_room_members(room_id)
+ for member in members:
+ if ignore_user is not None and member == ignore_user:
+ continue
+
+ if self.hs.is_mine(member):
+ if localusers is not None:
+ localusers.add(member)
+ else:
+ if remotedomains is not None:
+ remotedomains.add(member.domain)
+
+ @defer.inlineCallbacks
+ def update_membership(
+ self,
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=None,
+ remote_room_hosts=None,
+ third_party_signed=None,
+ ratelimit=True,
+ ):
+ effective_membership_state = action
+ if action in ["kick", "unban"]:
+ effective_membership_state = "leave"
+
+ if third_party_signed is not None:
+ replication = self.hs.get_replication_layer()
+ yield replication.exchange_third_party_invite(
+ third_party_signed["sender"],
+ target.to_string(),
+ room_id,
+ third_party_signed,
+ )
+
+ msg_handler = self.hs.get_handlers().message_handler
+
+ content = {"membership": effective_membership_state}
+ if requester.is_guest:
+ content["kind"] = "guest"
+
+ event, context = yield msg_handler.create_event(
+ {
+ "type": EventTypes.Member,
+ "content": content,
+ "room_id": room_id,
+ "sender": requester.user.to_string(),
+ "state_key": target.to_string(),
+
+ # For backwards compatibility:
+ "membership": effective_membership_state,
+ },
+ token_id=requester.access_token_id,
+ txn_id=txn_id,
+ )
+
+ old_state = context.current_state.get((EventTypes.Member, event.state_key))
+ old_membership = old_state.content.get("membership") if old_state else None
+ if action == "unban" and old_membership != "ban":
+ raise SynapseError(
+ 403,
+ "Cannot unban user who was not banned (membership=%s)" % old_membership,
+ errcode=Codes.BAD_STATE
+ )
+ if old_membership == "ban" and action != "unban":
+ raise SynapseError(
+ 403,
+ "Cannot %s user who was is banned" % (action,),
+ errcode=Codes.BAD_STATE
+ )
+
+ member_handler = self.hs.get_handlers().room_member_handler
+ yield member_handler.send_membership_event(
+ requester,
+ event,
+ context,
+ ratelimit=ratelimit,
+ remote_room_hosts=remote_room_hosts,
+ )
+
+ @defer.inlineCallbacks
+ def send_membership_event(
+ self,
+ requester,
+ event,
+ context,
+ remote_room_hosts=None,
+ ratelimit=True,
+ ):
+ """
+ Change the membership status of a user in a room.
+
+ Args:
+ requester (Requester): The local user who requested the membership
+ event. If None, certain checks, like whether this homeserver can
+ act as the sender, will be skipped.
+ event (SynapseEvent): The membership event.
+ context: The context of the event.
+ is_guest (bool): Whether the sender is a guest.
+ room_hosts ([str]): Homeservers which are likely to already be in
+ the room, and could be danced with in order to join this
+ homeserver for the first time.
+ ratelimit (bool): Whether to rate limit this request.
+ Raises:
+ SynapseError if there was a problem changing the membership.
+ """
+ remote_room_hosts = remote_room_hosts or []
+
+ target_user = UserID.from_string(event.state_key)
+ room_id = event.room_id
+
+ if requester is not None:
+ sender = UserID.from_string(event.sender)
+ assert sender == requester.user, (
+ "Sender (%s) must be same as requester (%s)" %
+ (sender, requester.user)
+ )
+ assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
+ else:
+ requester = Requester(target_user, None, False)
+
+ message_handler = self.hs.get_handlers().message_handler
+ prev_event = message_handler.deduplicate_state_event(event, context)
+ if prev_event is not None:
+ return
+
+ action = "send"
+
+ if event.membership == Membership.JOIN:
+ if requester.is_guest and not self._can_guest_join(context.current_state):
+ # This should be an auth check, but guests are a local concept,
+ # so don't really fit into the general auth process.
+ raise AuthError(403, "Guest access not allowed")
+ do_remote_join_dance, remote_room_hosts = self._should_do_dance(
+ context,
+ (self.get_inviter(event.state_key, context.current_state)),
+ remote_room_hosts,
+ )
+ if do_remote_join_dance:
+ action = "remote_join"
+ elif event.membership == Membership.LEAVE:
+ is_host_in_room = self.is_host_in_room(context.current_state)
+
+ if not is_host_in_room:
+ # perhaps we've been invited
+ inviter = self.get_inviter(
+ target_user.to_string(), context.current_state
+ )
+ if not inviter:
+ raise SynapseError(404, "Not a known room")
+
+ if self.hs.is_mine(inviter):
+ # the inviter was on our server, but has now left. Carry on
+ # with the normal rejection codepath.
+ #
+ # This is a bit of a hack, because the room might still be
+ # active on other servers.
+ pass
+ else:
+ # send the rejection to the inviter's HS.
+ remote_room_hosts = remote_room_hosts + [inviter.domain]
+ action = "remote_reject"
+
+ federation_handler = self.hs.get_handlers().federation_handler
+
+ if action == "remote_join":
+ if len(remote_room_hosts) == 0:
+ raise SynapseError(404, "No known servers")
+
+ # We don't do an auth check if we are doing an invite
+ # join dance for now, since we're kinda implicitly checking
+ # that we are allowed to join when we decide whether or not we
+ # need to do the invite/join dance.
+ yield federation_handler.do_invite_join(
+ remote_room_hosts,
+ event.room_id,
+ event.user_id,
+ event.content,
+ )
+ elif action == "remote_reject":
+ yield federation_handler.do_remotely_reject_invite(
+ remote_room_hosts,
+ room_id,
+ event.user_id
+ )
+ else:
+ yield self.handle_new_client_event(
+ requester,
+ event,
+ context,
+ extra_users=[target_user],
+ ratelimit=ratelimit,
+ )
+
+ prev_member_event = context.current_state.get(
+ (EventTypes.Member, target_user.to_string()),
+ None
+ )
+
+ if event.membership == Membership.JOIN:
+ if not prev_member_event or prev_member_event.membership != Membership.JOIN:
+ # Only fire user_joined_room if the user has acutally joined the
+ # room. Don't bother if the user is just changing their profile
+ # info.
+ yield user_joined_room(self.distributor, target_user, room_id)
+ elif event.membership == Membership.LEAVE:
+ if prev_member_event and prev_member_event.membership == Membership.JOIN:
+ user_left_room(self.distributor, target_user, room_id)
+
+ def _can_guest_join(self, current_state):
+ """
+ Returns whether a guest can join a room based on its current state.
+ """
+ guest_access = current_state.get((EventTypes.GuestAccess, ""), None)
+ return (
+ guest_access
+ and guest_access.content
+ and "guest_access" in guest_access.content
+ and guest_access.content["guest_access"] == "can_join"
+ )
+
+ def _should_do_dance(self, context, inviter, room_hosts=None):
+ # TODO: Shouldn't this be remote_room_host?
+ room_hosts = room_hosts or []
+
+ is_host_in_room = self.is_host_in_room(context.current_state)
+ if is_host_in_room:
+ return False, room_hosts
+
+ if inviter and not self.hs.is_mine(inviter):
+ room_hosts.append(inviter.domain)
+
+ return True, room_hosts
+
+ @defer.inlineCallbacks
+ def lookup_room_alias(self, room_alias):
+ """
+ Get the room ID associated with a room alias.
+
+ Args:
+ room_alias (RoomAlias): The alias to look up.
+ Returns:
+ A tuple of:
+ The room ID as a RoomID object.
+ Hosts likely to be participating in the room ([str]).
+ Raises:
+ SynapseError if room alias could not be found.
+ """
+ directory_handler = self.hs.get_handlers().directory_handler
+ mapping = yield directory_handler.get_association(room_alias)
+
+ if not mapping:
+ raise SynapseError(404, "No such room alias")
+
+ room_id = mapping["room_id"]
+ servers = mapping["servers"]
+
+ defer.returnValue((RoomID.from_string(room_id), servers))
+
+ def get_inviter(self, user_id, current_state):
+ prev_state = current_state.get((EventTypes.Member, user_id))
+ if prev_state and prev_state.membership == Membership.INVITE:
+ return UserID.from_string(prev_state.user_id)
+ return None
+
+ @defer.inlineCallbacks
+ def get_joined_rooms_for_user(self, user):
+ """Returns a list of roomids that the user has any of the given
+ membership states in."""
+
+ rooms = yield self.store.get_rooms_for_user(
+ user.to_string(),
+ )
+
+ # For some reason the list of events contains duplicates
+ # TODO(paul): work out why because I really don't think it should
+ room_ids = set(r.room_id for r in rooms)
+
+ defer.returnValue(room_ids)
+
+ @defer.inlineCallbacks
+ def do_3pid_invite(
+ self,
+ room_id,
+ inviter,
+ medium,
+ address,
+ id_server,
+ requester,
+ txn_id
+ ):
+ invitee = yield self._lookup_3pid(
+ id_server, medium, address
+ )
+
+ if invitee:
+ handler = self.hs.get_handlers().room_member_handler
+ yield handler.update_membership(
+ requester,
+ UserID.from_string(invitee),
+ room_id,
+ "invite",
+ txn_id=txn_id,
+ )
+ else:
+ yield self._make_and_store_3pid_invite(
+ requester,
+ id_server,
+ medium,
+ address,
+ room_id,
+ inviter,
+ txn_id=txn_id
+ )
+
+ @defer.inlineCallbacks
+ def _lookup_3pid(self, id_server, medium, address):
+ """Looks up a 3pid in the passed identity server.
+
+ Args:
+ id_server (str): The server name (including port, if required)
+ of the identity server to use.
+ medium (str): The type of the third party identifier (e.g. "email").
+ address (str): The third party identifier (e.g. "foo@example.com").
+
+ Returns:
+ str: the matrix ID of the 3pid, or None if it is not recognized.
+ """
+ try:
+ data = yield self.hs.get_simple_http_client().get_json(
+ "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
+ {
+ "medium": medium,
+ "address": address,
+ }
+ )
+
+ if "mxid" in data:
+ if "signatures" not in data:
+ raise AuthError(401, "No signatures on 3pid binding")
+ self.verify_any_signature(data, id_server)
+ defer.returnValue(data["mxid"])
+
+ except IOError as e:
+ logger.warn("Error from identity server lookup: %s" % (e,))
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def verify_any_signature(self, data, server_hostname):
+ if server_hostname not in data["signatures"]:
+ raise AuthError(401, "No signature from server %s" % (server_hostname,))
+ for key_name, signature in data["signatures"][server_hostname].items():
+ key_data = yield self.hs.get_simple_http_client().get_json(
+ "%s%s/_matrix/identity/api/v1/pubkey/%s" %
+ (id_server_scheme, server_hostname, key_name,),
+ )
+ if "public_key" not in key_data:
+ raise AuthError(401, "No public key named %s from %s" %
+ (key_name, server_hostname,))
+ verify_signed_json(
+ data,
+ server_hostname,
+ decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"]))
+ )
+ return
+
+ @defer.inlineCallbacks
+ def _make_and_store_3pid_invite(
+ self,
+ requester,
+ id_server,
+ medium,
+ address,
+ room_id,
+ user,
+ txn_id
+ ):
+ room_state = yield self.hs.get_state_handler().get_current_state(room_id)
+
+ inviter_display_name = ""
+ inviter_avatar_url = ""
+ member_event = room_state.get((EventTypes.Member, user.to_string()))
+ if member_event:
+ inviter_display_name = member_event.content.get("displayname", "")
+ inviter_avatar_url = member_event.content.get("avatar_url", "")
+
+ canonical_room_alias = ""
+ canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
+ if canonical_alias_event:
+ canonical_room_alias = canonical_alias_event.content.get("alias", "")
+
+ room_name = ""
+ room_name_event = room_state.get((EventTypes.Name, ""))
+ if room_name_event:
+ room_name = room_name_event.content.get("name", "")
+
+ room_join_rules = ""
+ join_rules_event = room_state.get((EventTypes.JoinRules, ""))
+ if join_rules_event:
+ room_join_rules = join_rules_event.content.get("join_rule", "")
+
+ room_avatar_url = ""
+ room_avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
+ if room_avatar_event:
+ room_avatar_url = room_avatar_event.content.get("url", "")
+
+ token, public_keys, fallback_public_key, display_name = (
+ yield self._ask_id_server_for_third_party_invite(
+ id_server=id_server,
+ medium=medium,
+ address=address,
+ room_id=room_id,
+ inviter_user_id=user.to_string(),
+ room_alias=canonical_room_alias,
+ room_avatar_url=room_avatar_url,
+ room_join_rules=room_join_rules,
+ room_name=room_name,
+ inviter_display_name=inviter_display_name,
+ inviter_avatar_url=inviter_avatar_url
+ )
+ )
+
+ msg_handler = self.hs.get_handlers().message_handler
+ yield msg_handler.create_and_send_nonmember_event(
+ requester,
+ {
+ "type": EventTypes.ThirdPartyInvite,
+ "content": {
+ "display_name": display_name,
+ "public_keys": public_keys,
+
+ # For backwards compatibility:
+ "key_validity_url": fallback_public_key["key_validity_url"],
+ "public_key": fallback_public_key["public_key"],
+ },
+ "room_id": room_id,
+ "sender": user.to_string(),
+ "state_key": token,
+ },
+ txn_id=txn_id,
+ )
+
+ @defer.inlineCallbacks
+ def _ask_id_server_for_third_party_invite(
+ self,
+ id_server,
+ medium,
+ address,
+ room_id,
+ inviter_user_id,
+ room_alias,
+ room_avatar_url,
+ room_join_rules,
+ room_name,
+ inviter_display_name,
+ inviter_avatar_url
+ ):
+ """
+ Asks an identity server for a third party invite.
+
+ Args:
+ id_server (str): hostname + optional port for the identity server.
+ medium (str): The literal string "email".
+ address (str): The third party address being invited.
+ room_id (str): The ID of the room to which the user is invited.
+ inviter_user_id (str): The user ID of the inviter.
+ room_alias (str): An alias for the room, for cosmetic notifications.
+ room_avatar_url (str): The URL of the room's avatar, for cosmetic
+ notifications.
+ room_join_rules (str): The join rules of the email (e.g. "public").
+ room_name (str): The m.room.name of the room.
+ inviter_display_name (str): The current display name of the
+ inviter.
+ inviter_avatar_url (str): The URL of the inviter's avatar.
+
+ Returns:
+ A deferred tuple containing:
+ token (str): The token which must be signed to prove authenticity.
+ public_keys ([{"public_key": str, "key_validity_url": str}]):
+ public_key is a base64-encoded ed25519 public key.
+ fallback_public_key: One element from public_keys.
+ display_name (str): A user-friendly name to represent the invited
+ user.
+ """
+
+ is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
+ id_server_scheme, id_server,
+ )
+
+ invite_config = {
+ "medium": medium,
+ "address": address,
+ "room_id": room_id,
+ "room_alias": room_alias,
+ "room_avatar_url": room_avatar_url,
+ "room_join_rules": room_join_rules,
+ "room_name": room_name,
+ "sender": inviter_user_id,
+ "sender_display_name": inviter_display_name,
+ "sender_avatar_url": inviter_avatar_url,
+ }
+
+ if self.hs.config.invite_3pid_guest:
+ registration_handler = self.hs.get_handlers().registration_handler
+ guest_access_token = yield registration_handler.guest_access_token_for(
+ medium=medium,
+ address=address,
+ inviter_user_id=inviter_user_id,
+ )
+
+ guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
+ guest_access_token
+ )
+
+ invite_config.update({
+ "guest_access_token": guest_access_token,
+ "guest_user_id": guest_user_info["user"].to_string(),
+ })
+
+ data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+ is_url,
+ invite_config
+ )
+ # TODO: Check for success
+ token = data["token"]
+ public_keys = data.get("public_keys", [])
+ if "public_key" in data:
+ fallback_public_key = {
+ "public_key": data["public_key"],
+ "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+ id_server_scheme, id_server,
+ ),
+ }
+ else:
+ fallback_public_key = public_keys[0]
+
+ if not public_keys:
+ public_keys.append(fallback_public_key)
+ display_name = data["display_name"]
+ defer.returnValue((token, public_keys, fallback_public_key, display_name))
+
+ @defer.inlineCallbacks
+ def forget(self, user, room_id):
+ user_id = user.to_string()
+
+ member = yield self.state_handler.get_current_state(
+ room_id=room_id,
+ event_type=EventTypes.Member,
+ state_key=user_id
+ )
+ membership = member.membership if member else None
+
+ if membership is not None and membership != Membership.LEAVE:
+ raise SynapseError(400, "User %s in room %s" % (
+ user_id, room_id
+ ))
+
+ if membership:
+ yield self.store.forget(user_id, room_id)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 48ab5707e1..231140b655 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -17,8 +17,8 @@ from ._base import BaseHandler
from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.async import concurrently_execute
+from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
@@ -250,58 +250,50 @@ class SyncHandler(BaseHandler):
joined = []
invited = []
archived = []
- deferreds = []
-
- room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
- for room_list_chunk in room_list_chunks:
- for event in room_list_chunk:
- if event.membership == Membership.JOIN:
- room_sync_deferred = preserve_fn(
- self.full_state_sync_for_joined_room
- )(
- room_id=event.room_id,
- sync_config=sync_config,
- now_token=now_token,
- timeline_since_token=timeline_since_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- room_sync_deferred.addCallback(joined.append)
- deferreds.append(room_sync_deferred)
- elif event.membership == Membership.INVITE:
- invite = yield self.store.get_event(event.event_id)
- invited.append(InvitedSyncResult(
- room_id=event.room_id,
- invite=invite,
- ))
- elif event.membership in (Membership.LEAVE, Membership.BAN):
- # Always send down rooms we were banned or kicked from.
- if not sync_config.filter_collection.include_leave:
- if event.membership == Membership.LEAVE:
- if sync_config.user.to_string() == event.sender:
- continue
-
- leave_token = now_token.copy_and_replace(
- "room_key", "s%d" % (event.stream_ordering,)
- )
- room_sync_deferred = preserve_fn(
- self.full_state_sync_for_archived_room
- )(
- sync_config=sync_config,
- room_id=event.room_id,
- leave_event_id=event.event_id,
- leave_token=leave_token,
- timeline_since_token=timeline_since_token,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- room_sync_deferred.addCallback(archived.append)
- deferreds.append(room_sync_deferred)
- yield defer.gatherResults(
- deferreds, consumeErrors=True
- ).addErrback(unwrapFirstError)
+ user_id = sync_config.user.to_string()
+
+ @defer.inlineCallbacks
+ def _generate_room_entry(event):
+ if event.membership == Membership.JOIN:
+ room_result = yield self.full_state_sync_for_joined_room(
+ room_id=event.room_id,
+ sync_config=sync_config,
+ now_token=now_token,
+ timeline_since_token=timeline_since_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ )
+ joined.append(room_result)
+ elif event.membership == Membership.INVITE:
+ invite = yield self.store.get_event(event.event_id)
+ invited.append(InvitedSyncResult(
+ room_id=event.room_id,
+ invite=invite,
+ ))
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ # Always send down rooms we were banned or kicked from.
+ if not sync_config.filter_collection.include_leave:
+ if event.membership == Membership.LEAVE:
+ if user_id == event.sender:
+ return
+
+ leave_token = now_token.copy_and_replace(
+ "room_key", "s%d" % (event.stream_ordering,)
+ )
+ room_result = yield self.full_state_sync_for_archived_room(
+ sync_config=sync_config,
+ room_id=event.room_id,
+ leave_event_id=event.event_id,
+ leave_token=leave_token,
+ timeline_since_token=timeline_since_token,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ )
+ archived.append(room_result)
+
+ yield concurrently_execute(_generate_room_entry, room_list, 10)
account_data_for_user = sync_config.filter_collection.filter_account_data(
self.account_data_for_user(account_data)
@@ -671,7 +663,8 @@ class SyncHandler(BaseHandler):
def load_filtered_recents(self, room_id, sync_config, now_token,
since_token=None, recents=None, newly_joined_room=False):
"""
- :returns a Deferred TimelineBatch
+ Returns:
+ a Deferred TimelineBatch
"""
with Measure(self.clock, "load_filtered_recents"):
filtering_factor = 2
@@ -838,8 +831,11 @@ class SyncHandler(BaseHandler):
"""
Get the room state after the given event
- :param synapse.events.EventBase event: event of interest
- :return: A Deferred map from ((type, state_key)->Event)
+ Args:
+ event(synapse.events.EventBase): event of interest
+
+ Returns:
+ A Deferred map from ((type, state_key)->Event)
"""
state = yield self.store.get_state_for_event(event.event_id)
if event.is_state():
@@ -850,9 +846,13 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_state_at(self, room_id, stream_position):
""" Get the room state at a particular stream position
- :param str room_id: room for which to get state
- :param StreamToken stream_position: point at which to get state
- :returns: A Deferred map from ((type, state_key)->Event)
+
+ Args:
+ room_id(str): room for which to get state
+ stream_position(StreamToken): point at which to get state
+
+ Returns:
+ A Deferred map from ((type, state_key)->Event)
"""
last_events, token = yield self.store.get_recent_events_for_room(
room_id, end_token=stream_position.room_key, limit=1,
@@ -873,15 +873,18 @@ class SyncHandler(BaseHandler):
""" Works out the differnce in state between the start of the timeline
and the previous sync.
- :param str room_id
- :param TimelineBatch batch: The timeline batch for the room that will
- be sent to the user.
- :param sync_config
- :param str since_token: Token of the end of the previous batch. May be None.
- :param str now_token: Token of the end of the current batch.
- :param bool full_state: Whether to force returning the full state.
+ Args:
+ room_id(str):
+ batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+ the room that will be sent to the user.
+ sync_config(synapse.handlers.sync.SyncConfig):
+ since_token(str|None): Token of the end of the previous batch. May
+ be None.
+ now_token(str): Token of the end of the current batch.
+ full_state(bool): Whether to force returning the full state.
- :returns A new event dictionary
+ Returns:
+ A deferred new event dictionary
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
@@ -953,11 +956,13 @@ class SyncHandler(BaseHandler):
Check if the user has just joined the given room (so should
be given the full state)
- :param sync_config:
- :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the
- difference in state since the last sync
+ Args:
+ sync_config(synapse.handlers.sync.SyncConfig):
+ state_delta(dict[(str,str), synapse.events.FrozenEvent]): the
+ difference in state since the last sync
- :returns A deferred Tuple (state_delta, limited)
+ Returns:
+ A deferred Tuple (state_delta, limited)
"""
join_event = state_delta.get((
EventTypes.Member, sync_config.user.to_string()), None)
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 1c8bd8666f..e41afeab8e 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -26,14 +26,19 @@ logger = logging.getLogger(__name__)
def parse_integer(request, name, default=None, required=False):
"""Parse an integer parameter from the request string
- :param request: the twisted HTTP request.
- :param name (str): the name of the query parameter.
- :param default: value to use if the parameter is absent, defaults to None.
- :param required (bool): whether to raise a 400 SynapseError if the
- parameter is absent, defaults to False.
- :return: An int value or the default.
- :raises
- SynapseError if the parameter is absent and required, or if the
+ Args:
+ request: the twisted HTTP request.
+ name (str): the name of the query parameter.
+ default (int|None): value to use if the parameter is absent, defaults
+ to None.
+ required (bool): whether to raise a 400 SynapseError if the
+ parameter is absent, defaults to False.
+
+ Returns:
+ int|None: An int value or the default.
+
+ Raises:
+ SynapseError: if the parameter is absent and required, or if the
parameter is present and not an integer.
"""
if name in request.args:
@@ -53,14 +58,19 @@ def parse_integer(request, name, default=None, required=False):
def parse_boolean(request, name, default=None, required=False):
"""Parse a boolean parameter from the request query string
- :param request: the twisted HTTP request.
- :param name (str): the name of the query parameter.
- :param default: value to use if the parameter is absent, defaults to None.
- :param required (bool): whether to raise a 400 SynapseError if the
- parameter is absent, defaults to False.
- :return: A bool value or the default.
- :raises
- SynapseError if the parameter is absent and required, or if the
+ Args:
+ request: the twisted HTTP request.
+ name (str): the name of the query parameter.
+ default (bool|None): value to use if the parameter is absent, defaults
+ to None.
+ required (bool): whether to raise a 400 SynapseError if the
+ parameter is absent, defaults to False.
+
+ Returns:
+ bool|None: A bool value or the default.
+
+ Raises:
+ SynapseError: if the parameter is absent and required, or if the
parameter is present and not one of "true" or "false".
"""
@@ -88,15 +98,20 @@ def parse_string(request, name, default=None, required=False,
allowed_values=None, param_type="string"):
"""Parse a string parameter from the request query string.
- :param request: the twisted HTTP request.
- :param name (str): the name of the query parameter.
- :param default: value to use if the parameter is absent, defaults to None.
- :param required (bool): whether to raise a 400 SynapseError if the
- parameter is absent, defaults to False.
- :param allowed_values (list): List of allowed values for the string,
- or None if any value is allowed, defaults to None
- :return: A string value or the default.
- :raises
+ Args:
+ request: the twisted HTTP request.
+ name (str): the name of the query parameter.
+ default (str|None): value to use if the parameter is absent, defaults
+ to None.
+ required (bool): whether to raise a 400 SynapseError if the
+ parameter is absent, defaults to False.
+ allowed_values (list[str]): List of allowed values for the string,
+ or None if any value is allowed, defaults to None
+
+ Returns:
+ str|None: A string value or the default.
+
+ Raises:
SynapseError if the parameter is absent and required, or if the
parameter is present, must be one of a list of allowed values and
is not one of those allowed values.
@@ -122,9 +137,13 @@ def parse_string(request, name, default=None, required=False,
def parse_json_value_from_request(request):
"""Parse a JSON value from the body of a twisted HTTP request.
- :param request: the twisted HTTP request.
- :returns: The JSON value.
- :raises
+ Args:
+ request: the twisted HTTP request.
+
+ Returns:
+ The JSON value.
+
+ Raises:
SynapseError if the request body couldn't be decoded as JSON.
"""
try:
@@ -143,8 +162,10 @@ def parse_json_value_from_request(request):
def parse_json_object_from_request(request):
"""Parse a JSON object from the body of a twisted HTTP request.
- :param request: the twisted HTTP request.
- :raises
+ Args:
+ request: the twisted HTTP request.
+
+ Raises:
SynapseError if the request body couldn't be decoded as JSON or
if it wasn't a JSON object.
"""
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f00cd8c588..6af7a8f424 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -503,13 +503,14 @@ class Notifier(object):
def wait_for_replication(self, callback, timeout):
"""Wait for an event to happen.
- :param callback:
- Gets called whenever an event happens. If this returns a truthy
- value then ``wait_for_replication`` returns, otherwise it waits
- for another event.
- :param int timeout:
- How many milliseconds to wait for callback return a truthy value.
- :returns:
+ Args:
+ callback: Gets called whenever an event happens. If this returns a
+ truthy value then ``wait_for_replication`` returns, otherwise
+ it waits for another event.
+ timeout: How many milliseconds to wait for callback return a truthy
+ value.
+
+ Returns:
A deferred that resolves with the value returned by the callback.
"""
listener = _NotificationListener(None)
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 792af70eb7..6add94beeb 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -19,9 +19,11 @@ import copy
def list_with_base_rules(rawrules):
"""Combine the list of rules set by the user with the default push rules
- :param list rawrules: The rules the user has modified or set.
- :returns: A new list with the rules set by the user combined with the
- defaults.
+ Args:
+ rawrules(list): The rules the user has modified or set.
+
+ Returns:
+ A new list with the rules set by the user combined with the defaults.
"""
ruleslist = []
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 51f73a5b78..c3c2877629 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -133,8 +133,9 @@ class PushRuleEvaluator:
enabled = self.enabled_map.get(r['rule_id'], None)
if enabled is not None and not enabled:
continue
-
- if not r.get("enabled", True):
+ elif enabled is None and not r.get("enabled", True):
+ # if no override, check enabled on the rule itself
+ # (may have come from a base rule)
continue
conditions = r['conditions']
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index d12ef15043..86b8331760 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -37,6 +37,7 @@ REQUIREMENTS = {
"pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"lxml>=3.6.0": ["lxml"],
+ "pyjwt": ["jwt"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 37a1d3960c..c51a6fa103 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -38,6 +38,7 @@ STREAM_NAMES = (
("backfill",),
("push_rules",),
("pushers",),
+ ("state",),
)
@@ -123,6 +124,7 @@ class ReplicationResource(Resource):
backfill_token = yield self.store.get_current_backfill_token()
push_rules_token, room_stream_token = self.store.get_push_rules_stream_token()
pushers_token = self.store.get_pushers_stream_token()
+ state_token = self.store.get_state_stream_token()
defer.returnValue(_ReplicationToken(
room_stream_token,
@@ -133,6 +135,7 @@ class ReplicationResource(Resource):
backfill_token,
push_rules_token,
pushers_token,
+ state_token,
))
@request_handler
@@ -156,6 +159,7 @@ class ReplicationResource(Resource):
yield self.receipts(writer, current_token, limit)
yield self.push_rules(writer, current_token, limit)
yield self.pushers(writer, current_token, limit)
+ yield self.state(writer, current_token, limit)
self.streams(writer, current_token)
logger.info("Replicated %d rows", writer.total)
@@ -200,16 +204,27 @@ class ReplicationResource(Resource):
request_events = current_token.events
if request_backfill is None:
request_backfill = current_token.backfill
- events_rows, backfill_rows = yield self.store.get_all_new_events(
+ res = yield self.store.get_all_new_events(
request_backfill, request_events,
current_token.backfill, current_token.events,
limit
)
+ writer.write_header_and_rows("events", res.new_forward_events, (
+ "position", "internal", "json", "state_group"
+ ))
+ writer.write_header_and_rows("backfill", res.new_backfill_events, (
+ "position", "internal", "json", "state_group"
+ ))
writer.write_header_and_rows(
- "events", events_rows, ("position", "internal", "json")
+ "forward_ex_outliers", res.forward_ex_outliers,
+ ("position", "event_id", "state_group")
)
writer.write_header_and_rows(
- "backfill", backfill_rows, ("position", "internal", "json")
+ "backward_ex_outliers", res.backward_ex_outliers,
+ ("position", "event_id", "state_group")
+ )
+ writer.write_header_and_rows(
+ "state_resets", res.state_resets, ("position",)
)
@defer.inlineCallbacks
@@ -320,6 +335,24 @@ class ReplicationResource(Resource):
"position", "user_id", "app_id", "pushkey"
))
+ @defer.inlineCallbacks
+ def state(self, writer, current_token, limit):
+ current_position = current_token.state
+
+ state = parse_integer(writer.request, "state")
+ if state is not None:
+ state_groups, state_group_state = (
+ yield self.store.get_all_new_state_groups(
+ state, current_position, limit
+ )
+ )
+ writer.write_header_and_rows("state_groups", state_groups, (
+ "position", "room_id", "event_id"
+ ))
+ writer.write_header_and_rows("state_group_state", state_group_state, (
+ "position", "type", "state_key", "event_id"
+ ))
+
class _Writer(object):
"""Writes the streams as a JSON object as the response to the request"""
@@ -350,7 +383,7 @@ class _Writer(object):
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
"events", "presence", "typing", "receipts", "account_data", "backfill",
- "push_rules", "pushers"
+ "push_rules", "pushers", "state"
))):
__slots__ = []
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index fe593d07ce..d14ce3efa2 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -33,6 +33,9 @@ from saml2.client import Saml2Client
import xml.etree.ElementTree as ET
+import jwt
+from jwt.exceptions import InvalidTokenError
+
logger = logging.getLogger(__name__)
@@ -43,12 +46,16 @@ class LoginRestServlet(ClientV1RestServlet):
SAML2_TYPE = "m.login.saml2"
CAS_TYPE = "m.login.cas"
TOKEN_TYPE = "m.login.token"
+ JWT_TYPE = "m.login.jwt"
def __init__(self, hs):
super(LoginRestServlet, self).__init__(hs)
self.idp_redirect_url = hs.config.saml2_idp_redirect_url
self.password_enabled = hs.config.password_enabled
self.saml2_enabled = hs.config.saml2_enabled
+ self.jwt_enabled = hs.config.jwt_enabled
+ self.jwt_secret = hs.config.jwt_secret
+ self.jwt_algorithm = hs.config.jwt_algorithm
self.cas_enabled = hs.config.cas_enabled
self.cas_server_url = hs.config.cas_server_url
self.cas_required_attributes = hs.config.cas_required_attributes
@@ -57,6 +64,8 @@ class LoginRestServlet(ClientV1RestServlet):
def on_GET(self, request):
flows = []
+ if self.jwt_enabled:
+ flows.append({"type": LoginRestServlet.JWT_TYPE})
if self.saml2_enabled:
flows.append({"type": LoginRestServlet.SAML2_TYPE})
if self.cas_enabled:
@@ -98,6 +107,10 @@ class LoginRestServlet(ClientV1RestServlet):
"uri": "%s%s" % (self.idp_redirect_url, relay_state)
}
defer.returnValue((200, result))
+ elif self.jwt_enabled and (login_submission["type"] ==
+ LoginRestServlet.JWT_TYPE):
+ result = yield self.do_jwt_login(login_submission)
+ defer.returnValue(result)
# TODO Delete this after all CAS clients switch to token login instead
elif self.cas_enabled and (login_submission["type"] ==
LoginRestServlet.CAS_TYPE):
@@ -209,6 +222,46 @@ class LoginRestServlet(ClientV1RestServlet):
defer.returnValue((200, result))
+ @defer.inlineCallbacks
+ def do_jwt_login(self, login_submission):
+ token = login_submission['token']
+ if token is None:
+ raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED)
+
+ try:
+ payload = jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
+ except InvalidTokenError:
+ raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
+
+ user = payload['user']
+ if user is None:
+ raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
+
+ user_id = UserID.create(user, self.hs.hostname).to_string()
+ auth_handler = self.handlers.auth_handler
+ user_exists = yield auth_handler.does_user_exist(user_id)
+ if user_exists:
+ user_id, access_token, refresh_token = (
+ yield auth_handler.get_login_tuple_for_user_id(user_id)
+ )
+ result = {
+ "user_id": user_id, # may have changed
+ "access_token": access_token,
+ "refresh_token": refresh_token,
+ "home_server": self.hs.hostname,
+ }
+ else:
+ user_id, access_token = (
+ yield self.handlers.registration_handler.register(localpart=user)
+ )
+ result = {
+ "user_id": user_id, # may have changed
+ "access_token": access_token,
+ "home_server": self.hs.hostname,
+ }
+
+ defer.returnValue((200, result))
+
# TODO Delete this after all CAS clients switch to token login instead
def parse_cas_response(self, cas_response_body):
root = ET.fromstring(cas_response_body)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index a1fa7daf79..b223fb7e5f 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -405,6 +405,42 @@ class RoomEventContext(ClientV1RestServlet):
defer.returnValue((200, results))
+class RoomForgetRestServlet(ClientV1RestServlet):
+ def register(self, http_server):
+ PATTERNS = ("/rooms/(?P<room_id>[^/]*)/forget")
+ register_txn_path(self, PATTERNS, http_server)
+
+ @defer.inlineCallbacks
+ def on_POST(self, request, room_id, txn_id=None):
+ requester = yield self.auth.get_user_by_req(
+ request,
+ allow_guest=False,
+ )
+
+ yield self.handlers.room_member_handler.forget(
+ user=requester.user,
+ room_id=room_id,
+ )
+
+ defer.returnValue((200, {}))
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, room_id, txn_id):
+ try:
+ defer.returnValue(
+ self.txns.get_client_transaction(request, txn_id)
+ )
+ except KeyError:
+ pass
+
+ response = yield self.on_POST(
+ request, room_id, txn_id
+ )
+
+ self.txns.store_client_transaction(request, txn_id, response)
+ defer.returnValue(response)
+
+
# TODO: Needs unit testing
class RoomMembershipRestServlet(ClientV1RestServlet):
@@ -624,6 +660,7 @@ def register_servlets(hs, http_server):
RoomMemberListRestServlet(hs).register(http_server)
RoomMessageListRestServlet(hs).register(http_server)
JoinRoomAliasServlet(hs).register(http_server)
+ RoomForgetRestServlet(hs).register(http_server)
RoomMembershipRestServlet(hs).register(http_server)
RoomSendEventRestServlet(hs).register(http_server)
PublicRoomListRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index c5785d7074..60d3dc4030 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -199,15 +199,17 @@ class SyncRestServlet(RestServlet):
"""
Encode the joined rooms in a sync result
- :param list[synapse.handlers.sync.JoinedSyncResult] rooms: list of sync
- results for rooms this user is joined to
- :param int time_now: current time - used as a baseline for age
- calculations
- :param int token_id: ID of the user's auth token - used for namespacing
- of transaction IDs
-
- :return: the joined rooms list, in our response format
- :rtype: dict[str, dict[str, object]]
+ Args:
+ rooms(list[synapse.handlers.sync.JoinedSyncResult]): list of sync
+ results for rooms this user is joined to
+ time_now(int): current time - used as a baseline for age
+ calculations
+ token_id(int): ID of the user's auth token - used for namespacing
+ of transaction IDs
+
+ Returns:
+ dict[str, dict[str, object]]: the joined rooms list, in our
+ response format
"""
joined = {}
for room in rooms:
@@ -221,15 +223,17 @@ class SyncRestServlet(RestServlet):
"""
Encode the invited rooms in a sync result
- :param list[synapse.handlers.sync.InvitedSyncResult] rooms: list of
- sync results for rooms this user is joined to
- :param int time_now: current time - used as a baseline for age
- calculations
- :param int token_id: ID of the user's auth token - used for namespacing
+ Args:
+ rooms(list[synapse.handlers.sync.InvitedSyncResult]): list of
+ sync results for rooms this user is joined to
+ time_now(int): current time - used as a baseline for age
+ calculations
+ token_id(int): ID of the user's auth token - used for namespacing
of transaction IDs
- :return: the invited rooms list, in our response format
- :rtype: dict[str, dict[str, object]]
+ Returns:
+ dict[str, dict[str, object]]: the invited rooms list, in our
+ response format
"""
invited = {}
for room in rooms:
@@ -251,15 +255,17 @@ class SyncRestServlet(RestServlet):
"""
Encode the archived rooms in a sync result
- :param list[synapse.handlers.sync.ArchivedSyncResult] rooms: list of
- sync results for rooms this user is joined to
- :param int time_now: current time - used as a baseline for age
- calculations
- :param int token_id: ID of the user's auth token - used for namespacing
- of transaction IDs
-
- :return: the invited rooms list, in our response format
- :rtype: dict[str, dict[str, object]]
+ Args:
+ rooms (list[synapse.handlers.sync.ArchivedSyncResult]): list of
+ sync results for rooms this user is joined to
+ time_now(int): current time - used as a baseline for age
+ calculations
+ token_id(int): ID of the user's auth token - used for namespacing
+ of transaction IDs
+
+ Returns:
+ dict[str, dict[str, object]]: The invited rooms list, in our
+ response format
"""
joined = {}
for room in rooms:
@@ -272,17 +278,18 @@ class SyncRestServlet(RestServlet):
@staticmethod
def encode_room(room, time_now, token_id, joined=True):
"""
- :param JoinedSyncResult|ArchivedSyncResult room: sync result for a
- single room
- :param int time_now: current time - used as a baseline for age
- calculations
- :param int token_id: ID of the user's auth token - used for namespacing
- of transaction IDs
- :param joined: True if the user is joined to this room - will mean
- we handle ephemeral events
-
- :return: the room, encoded in our response format
- :rtype: dict[str, object]
+ Args:
+ room (JoinedSyncResult|ArchivedSyncResult): sync result for a
+ single room
+ time_now (int): current time - used as a baseline for age
+ calculations
+ token_id (int): ID of the user's auth token - used for namespacing
+ of transaction IDs
+ joined (bool): True if the user is joined to this room - will mean
+ we handle ephemeral events
+
+ Returns:
+ dict[str, object]: the room, encoded in our response format
"""
def serialize(event):
# TODO(mjark): Respect formatting requirements in the filter.
diff --git a/synapse/state.py b/synapse/state.py
index 41d32e664a..1bca0f8f78 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -86,7 +86,8 @@ class StateHandler(object):
If `event_type` is specified, then the method returns only the one
event (or None) with that `event_type` and `state_key`.
- :returns map from (type, state_key) to event
+ Returns:
+ map from (type, state_key) to event
"""
event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
@@ -100,7 +101,7 @@ class StateHandler(object):
defer.returnValue(state)
@defer.inlineCallbacks
- def compute_event_context(self, event, old_state=None, outlier=False):
+ def compute_event_context(self, event, old_state=None):
""" Fills out the context with the `current state` of the graph. The
`current state` here is defined to be the state of the event graph
just before the event - i.e. it never includes `event`
@@ -115,7 +116,7 @@ class StateHandler(object):
"""
context = EventContext()
- if outlier:
+ if event.internal_metadata.is_outlier():
# If this is an outlier, then we know it shouldn't have any current
# state. Certainly store.get_current_state won't return any, and
# persisting the event won't store the state group.
@@ -176,10 +177,11 @@ class StateHandler(object):
""" Given a list of event_ids this method fetches the state at each
event, resolves conflicts between them and returns them.
- :returns a Deferred tuple of (`state_group`, `state`, `prev_state`).
- `state_group` is the name of a state group if one and only one is
- involved. `state` is a map from (type, state_key) to event, and
- `prev_state` is a list of event ids.
+ Returns:
+ a Deferred tuple of (`state_group`, `state`, `prev_state`).
+ `state_group` is the name of a state group if one and only one is
+ involved. `state` is a map from (type, state_key) to event, and
+ `prev_state` is a list of event ids.
"""
logger.debug("resolve_state_groups event_ids %s", event_ids)
@@ -251,9 +253,10 @@ class StateHandler(object):
def _resolve_events(self, state_sets, event_type=None, state_key=""):
"""
- :returns a tuple (new_state, prev_states). new_state is a map
- from (type, state_key) to event. prev_states is a list of event_ids.
- :rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str])
+ Returns
+ (dict[(str, str), synapse.events.FrozenEvent], list[str]): a tuple
+ (new_state, prev_states). new_state is a map from (type, state_key)
+ to event. prev_states is a list of event_ids.
"""
with Measure(self.clock, "state._resolve_events"):
state = {}
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 250ba536ea..57863bba4d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -88,15 +88,6 @@ class DataStore(RoomMemberStore, RoomStore,
self.hs = hs
self.database_engine = hs.database_engine
- cur = db_conn.cursor()
- try:
- cur.execute("SELECT MIN(stream_ordering) FROM events",)
- rows = cur.fetchall()
- self.min_stream_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1
- self.min_stream_token = min(self.min_stream_token, -1)
- finally:
- cur.close()
-
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
@@ -105,6 +96,9 @@ class DataStore(RoomMemberStore, RoomStore,
self._stream_id_gen = StreamIdGenerator(
db_conn, "events", "stream_ordering"
)
+ self._backfill_id_gen = StreamIdGenerator(
+ db_conn, "events", "stream_ordering", step=-1
+ )
self._receipts_id_gen = StreamIdGenerator(
db_conn, "receipts_linearized", "stream_id"
)
@@ -116,7 +110,7 @@ class DataStore(RoomMemberStore, RoomStore,
)
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
- self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
+ self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
@@ -129,7 +123,7 @@ class DataStore(RoomMemberStore, RoomStore,
extra_tables=[("deleted_pushers", "stream_id")],
)
- events_max = self._stream_id_gen.get_max_token()
+ events_max = self._stream_id_gen.get_current_token()
event_cache_prefill, min_event_val = self._get_cache_dict(
db_conn, "events",
entity_column="room_id",
@@ -145,7 +139,7 @@ class DataStore(RoomMemberStore, RoomStore,
"MembershipStreamChangeCache", events_max,
)
- account_max = self._account_data_id_gen.get_max_token()
+ account_max = self._account_data_id_gen.get_current_token()
self._account_data_stream_cache = StreamChangeCache(
"AccountDataAndTagsChangeCache", account_max,
)
@@ -156,7 +150,7 @@ class DataStore(RoomMemberStore, RoomStore,
db_conn, "presence_stream",
entity_column="user_id",
stream_column="stream_id",
- max_value=self._presence_id_gen.get_max_token(),
+ max_value=self._presence_id_gen.get_current_token(),
)
self.presence_stream_cache = StreamChangeCache(
"PresenceStreamChangeCache", min_presence_val,
@@ -167,7 +161,7 @@ class DataStore(RoomMemberStore, RoomStore,
db_conn, "push_rules_stream",
entity_column="user_id",
stream_column="stream_id",
- max_value=self._push_rules_stream_id_gen.get_max_token()[0],
+ max_value=self._push_rules_stream_id_gen.get_current_token()[0],
)
self.push_rules_stream_cache = StreamChangeCache(
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index faddefe219..7a7fbf1e52 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -200,7 +200,7 @@ class AccountDataStore(SQLBaseStore):
"add_room_account_data", add_account_data_txn, next_id
)
- result = self._account_data_id_gen.get_max_token()
+ result = self._account_data_id_gen.get_current_token()
defer.returnValue(result)
@defer.inlineCallbacks
@@ -239,7 +239,7 @@ class AccountDataStore(SQLBaseStore):
"add_user_account_data", add_account_data_txn, next_id
)
- result = self._account_data_id_gen.get_max_token()
+ result = self._account_data_id_gen.get_current_token()
defer.returnValue(result)
def _update_max_stream_id(self, txn, next_id):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index dc5830450a..3933b6e2c5 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -26,8 +26,9 @@ logger = logging.getLogger(__name__)
class EventPushActionsStore(SQLBaseStore):
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
- :param event: the event set actions for
- :param tuples: list of tuples of (user_id, actions)
+ Args:
+ event: the event set actions for
+ tuples: list of tuples of (user_id, actions)
"""
values = []
for uid, actions in tuples:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5233430028..c4dc3b3d51 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -24,7 +24,7 @@ from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from canonicaljson import encode_canonical_json
-from contextlib import contextmanager
+from collections import namedtuple
import logging
import math
@@ -60,64 +60,71 @@ class EventsStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def persist_events(self, events_and_contexts, backfilled=False,
- is_new_state=True):
+ def persist_events(self, events_and_contexts, backfilled=False):
if not events_and_contexts:
return
if backfilled:
- start = self.min_stream_token - 1
- self.min_stream_token -= len(events_and_contexts) + 1
- stream_orderings = range(start, self.min_stream_token, -1)
-
- @contextmanager
- def stream_ordering_manager():
- yield stream_orderings
- stream_ordering_manager = stream_ordering_manager()
+ stream_ordering_manager = self._backfill_id_gen.get_next_mult(
+ len(events_and_contexts)
+ )
else:
stream_ordering_manager = self._stream_id_gen.get_next_mult(
len(events_and_contexts)
)
+ state_group_id_manager = self._state_groups_id_gen.get_next_mult(
+ len(events_and_contexts)
+ )
with stream_ordering_manager as stream_orderings:
- for (event, _), stream in zip(events_and_contexts, stream_orderings):
- event.internal_metadata.stream_ordering = stream
-
- chunks = [
- events_and_contexts[x:x + 100]
- for x in xrange(0, len(events_and_contexts), 100)
- ]
+ with state_group_id_manager as state_group_ids:
+ for (event, context), stream, state_group_id in zip(
+ events_and_contexts, stream_orderings, state_group_ids
+ ):
+ event.internal_metadata.stream_ordering = stream
+ # Assign a state group_id in case a new id is needed for
+ # this context. In theory we only need to assign this
+ # for contexts that have current_state and aren't outliers
+ # but that make the code more complicated. Assigning an ID
+ # per event only causes the state_group_ids to grow as fast
+ # as the stream_ordering so in practise shouldn't be a problem.
+ context.new_state_group_id = state_group_id
+
+ chunks = [
+ events_and_contexts[x:x + 100]
+ for x in xrange(0, len(events_and_contexts), 100)
+ ]
- for chunk in chunks:
- # We can't easily parallelize these since different chunks
- # might contain the same event. :(
- yield self.runInteraction(
- "persist_events",
- self._persist_events_txn,
- events_and_contexts=chunk,
- backfilled=backfilled,
- is_new_state=is_new_state,
- )
+ for chunk in chunks:
+ # We can't easily parallelize these since different chunks
+ # might contain the same event. :(
+ yield self.runInteraction(
+ "persist_events",
+ self._persist_events_txn,
+ events_and_contexts=chunk,
+ backfilled=backfilled,
+ )
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, context,
- is_new_state=True, current_state=None):
+ def persist_event(self, event, context, current_state=None):
+
try:
with self._stream_id_gen.get_next() as stream_ordering:
- event.internal_metadata.stream_ordering = stream_ordering
- yield self.runInteraction(
- "persist_event",
- self._persist_event_txn,
- event=event,
- context=context,
- is_new_state=is_new_state,
- current_state=current_state,
- )
+ with self._state_groups_id_gen.get_next() as state_group_id:
+ event.internal_metadata.stream_ordering = stream_ordering
+ context.new_state_group_id = state_group_id
+ yield self.runInteraction(
+ "persist_event",
+ self._persist_event_txn,
+ event=event,
+ context=context,
+ current_state=current_state,
+ )
except _RollbackButIsFineException:
pass
- max_persisted_id = yield self._stream_id_gen.get_max_token()
+ max_persisted_id = yield self._stream_id_gen.get_current_token()
defer.returnValue((stream_ordering, max_persisted_id))
@defer.inlineCallbacks
@@ -177,8 +184,7 @@ class EventsStore(SQLBaseStore):
defer.returnValue({e.event_id: e for e in events})
@log_function
- def _persist_event_txn(self, txn, event, context,
- is_new_state=True, current_state=None):
+ def _persist_event_txn(self, txn, event, context, current_state):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
@@ -186,7 +192,16 @@ class EventsStore(SQLBaseStore):
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
- txn.call_after(self.get_room_name_and_aliases, event.room_id)
+ txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
+
+ # Add an entry to the current_state_resets table to record the point
+ # where we clobbered the current state
+ stream_order = event.internal_metadata.stream_ordering
+ self._simple_insert_txn(
+ txn,
+ table="current_state_resets",
+ values={"event_stream_ordering": stream_order}
+ )
self._simple_delete_txn(
txn,
@@ -210,12 +225,10 @@ class EventsStore(SQLBaseStore):
txn,
[(event, context)],
backfilled=False,
- is_new_state=is_new_state,
)
@log_function
- def _persist_events_txn(self, txn, events_and_contexts, backfilled,
- is_new_state=True):
+ def _persist_events_txn(self, txn, events_and_contexts, backfilled):
depth_updates = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
@@ -282,9 +295,7 @@ class EventsStore(SQLBaseStore):
outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
- self._store_state_groups_txn(
- txn, event, context,
- )
+ self._store_mult_state_groups_txn(txn, ((event, context),))
metadata_json = encode_json(
event.internal_metadata.get_dict()
@@ -299,6 +310,18 @@ class EventsStore(SQLBaseStore):
(metadata_json, event.event_id,)
)
+ stream_order = event.internal_metadata.stream_ordering
+ state_group_id = context.state_group or context.new_state_group_id
+ self._simple_insert_txn(
+ txn,
+ table="ex_outlier_stream",
+ values={
+ "event_stream_ordering": stream_order,
+ "event_id": event.event_id,
+ "state_group": state_group_id,
+ }
+ )
+
sql = (
"UPDATE events SET outlier = ?"
" WHERE event_id = ?"
@@ -310,19 +333,14 @@ class EventsStore(SQLBaseStore):
self._update_extremeties(txn, [event])
- events_and_contexts = filter(
- lambda ec: ec[0] not in to_remove,
- events_and_contexts
- )
+ events_and_contexts = [
+ ec for ec in events_and_contexts if ec[0] not in to_remove
+ ]
if not events_and_contexts:
return
- self._store_mult_state_groups_txn(txn, [
- (event, context)
- for event, context in events_and_contexts
- if not event.internal_metadata.is_outlier()
- ])
+ self._store_mult_state_groups_txn(txn, events_and_contexts)
self._handle_mult_prev_events(
txn,
@@ -421,10 +439,9 @@ class EventsStore(SQLBaseStore):
txn, [event for event, _ in events_and_contexts]
)
- state_events_and_contexts = filter(
- lambda i: i[0].is_state(),
- events_and_contexts,
- )
+ state_events_and_contexts = [
+ ec for ec in events_and_contexts if ec[0].is_state()
+ ]
state_values = []
for event, context in state_events_and_contexts:
@@ -462,32 +479,50 @@ class EventsStore(SQLBaseStore):
],
)
- if is_new_state:
- for event, _ in state_events_and_contexts:
- if not context.rejected:
- txn.call_after(
- self._get_current_state_for_key.invalidate,
- (event.room_id, event.type, event.state_key,)
- )
+ if backfilled:
+ # Backfilled events come before the current state so we don't need
+ # to update the current state table
+ return
- if event.type in [EventTypes.Name, EventTypes.Aliases]:
- txn.call_after(
- self.get_room_name_and_aliases.invalidate,
- (event.room_id,)
- )
-
- self._simple_upsert_txn(
- txn,
- "current_state_events",
- keyvalues={
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- values={
- "event_id": event.event_id,
- }
- )
+ for event, _ in state_events_and_contexts:
+ if (not event.internal_metadata.is_invite_from_remote()
+ and event.internal_metadata.is_outlier()):
+ # Outlier events generally shouldn't clobber the current state.
+ # However invites from remote severs for rooms we aren't in
+ # are a bit special: they don't come with any associated
+ # state so are technically an outlier, however all the
+ # client-facing code assumes that they are in the current
+ # state table so we insert the event anyway.
+ continue
+
+ if context.rejected:
+ # If the event failed it's auth checks then it shouldn't
+ # clobbler the current state.
+ continue
+
+ txn.call_after(
+ self._get_current_state_for_key.invalidate,
+ (event.room_id, event.type, event.state_key,)
+ )
+
+ if event.type in [EventTypes.Name, EventTypes.Aliases]:
+ txn.call_after(
+ self.get_room_name_and_aliases.invalidate,
+ (event.room_id,)
+ )
+
+ self._simple_upsert_txn(
+ txn,
+ "current_state_events",
+ keyvalues={
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ values={
+ "event_id": event.event_id,
+ }
+ )
return
@@ -1076,10 +1111,7 @@ class EventsStore(SQLBaseStore):
def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
-
- # TODO: Fix race with the persit_event txn by using one of the
- # stream id managers
- return -self.min_stream_token
+ return -self._backfill_id_gen.get_current_token()
def get_all_new_events(self, last_backfill_id, last_forward_id,
current_backfill_id, current_forward_id, limit):
@@ -1087,10 +1119,12 @@ class EventsStore(SQLBaseStore):
new events or as backfilled events"""
def get_all_new_events_txn(txn):
sql = (
- "SELECT e.stream_ordering, ej.internal_metadata, ej.json"
+ "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group"
" FROM events as e"
" JOIN event_json as ej"
" ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+ " LEFT JOIN event_to_state_groups as eg"
+ " ON e.event_id = eg.event_id"
" WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
" ORDER BY e.stream_ordering ASC"
" LIMIT ?"
@@ -1098,14 +1132,43 @@ class EventsStore(SQLBaseStore):
if last_forward_id != current_forward_id:
txn.execute(sql, (last_forward_id, current_forward_id, limit))
new_forward_events = txn.fetchall()
+
+ if len(new_forward_events) == limit:
+ upper_bound = new_forward_events[-1][0]
+ else:
+ upper_bound = current_forward_id
+
+ sql = (
+ "SELECT -event_stream_ordering FROM current_state_resets"
+ " WHERE ? < event_stream_ordering"
+ " AND event_stream_ordering <= ?"
+ " ORDER BY event_stream_ordering ASC"
+ )
+ txn.execute(sql, (last_forward_id, upper_bound))
+ state_resets = txn.fetchall()
+
+ sql = (
+ "SELECT -event_stream_ordering, event_id, state_group"
+ " FROM ex_outlier_stream"
+ " WHERE ? > event_stream_ordering"
+ " AND event_stream_ordering >= ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+ txn.execute(sql, (last_forward_id, upper_bound))
+ forward_ex_outliers = txn.fetchall()
else:
new_forward_events = []
+ state_resets = []
+ forward_ex_outliers = []
sql = (
- "SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
+ "SELECT -e.stream_ordering, ej.internal_metadata, ej.json,"
+ " eg.state_group"
" FROM events as e"
" JOIN event_json as ej"
" ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+ " LEFT JOIN event_to_state_groups as eg"
+ " ON e.event_id = eg.event_id"
" WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
" ORDER BY e.stream_ordering DESC"
" LIMIT ?"
@@ -1113,8 +1176,35 @@ class EventsStore(SQLBaseStore):
if last_backfill_id != current_backfill_id:
txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
new_backfill_events = txn.fetchall()
+
+ if len(new_backfill_events) == limit:
+ upper_bound = new_backfill_events[-1][0]
+ else:
+ upper_bound = current_backfill_id
+
+ sql = (
+ "SELECT -event_stream_ordering, event_id, state_group"
+ " FROM ex_outlier_stream"
+ " WHERE ? > event_stream_ordering"
+ " AND event_stream_ordering >= ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+ txn.execute(sql, (-last_backfill_id, -upper_bound))
+ backward_ex_outliers = txn.fetchall()
else:
new_backfill_events = []
+ backward_ex_outliers = []
- return (new_forward_events, new_backfill_events)
+ return AllNewEventsResult(
+ new_forward_events, new_backfill_events,
+ forward_ex_outliers, backward_ex_outliers,
+ state_resets,
+ )
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
+
+
+AllNewEventsResult = namedtuple("AllNewEventsResult", [
+ "new_forward_events", "new_backfill_events",
+ "forward_ex_outliers", "backward_ex_outliers",
+ "state_resets"
+])
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 4cec31e316..59b4ef5ce6 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -68,7 +68,9 @@ class PresenceStore(SQLBaseStore):
self._update_presence_txn, stream_orderings, presence_states,
)
- defer.returnValue((stream_orderings[-1], self._presence_id_gen.get_max_token()))
+ defer.returnValue((
+ stream_orderings[-1], self._presence_id_gen.get_current_token()
+ ))
def _update_presence_txn(self, txn, stream_orderings, presence_states):
for stream_id, state in zip(stream_orderings, presence_states):
@@ -155,7 +157,7 @@ class PresenceStore(SQLBaseStore):
defer.returnValue([UserPresenceState(**row) for row in rows])
def get_current_presence_token(self):
- return self._presence_id_gen.get_max_token()
+ return self._presence_id_gen.get_current_token()
def allow_presence_visible(self, observed_localpart, observer_userid):
return self._simple_insert(
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 9dbad2fd5f..d2bf7f2aec 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -392,7 +392,7 @@ class PushRuleStore(SQLBaseStore):
"""Get the position of the push rules stream.
Returns a pair of a stream id for the push_rules stream and the
room stream ordering it corresponds to."""
- return self._push_rules_stream_id_gen.get_max_token()
+ return self._push_rules_stream_id_gen.get_current_token()
def have_push_rules_changed_for_user(self, user_id, last_id):
if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 87b2ac5773..d1669c778a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -78,7 +78,7 @@ class PusherStore(SQLBaseStore):
defer.returnValue(rows)
def get_pushers_stream_token(self):
- return self._pushers_id_gen.get_max_token()
+ return self._pushers_id_gen.get_current_token()
def get_all_updated_pushers(self, last_id, current_id, limit):
def get_all_updated_pushers_txn(txn):
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 6b9d848eaa..4befebc8e2 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -31,7 +31,7 @@ class ReceiptsStore(SQLBaseStore):
super(ReceiptsStore, self).__init__(hs)
self._receipts_stream_cache = StreamChangeCache(
- "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token()
+ "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
)
@cached(num_args=2)
@@ -221,7 +221,7 @@ class ReceiptsStore(SQLBaseStore):
defer.returnValue(results)
def get_max_receipt_stream_id(self):
- return self._receipts_id_gen.get_max_token()
+ return self._receipts_id_gen.get_current_token()
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, data, stream_id):
@@ -346,7 +346,7 @@ class ReceiptsStore(SQLBaseStore):
room_id, receipt_type, user_id, event_ids, data
)
- max_persisted_id = self._stream_id_gen.get_max_token()
+ max_persisted_id = self._stream_id_gen.get_current_token()
defer.returnValue((stream_id, max_persisted_id))
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index bd4eb88a92..d46a963bb8 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -458,12 +458,15 @@ class RegistrationStore(SQLBaseStore):
"""
Gets the 3pid's guest access token if exists, else saves access_token.
- :param medium (str): Medium of the 3pid. Must be "email".
- :param address (str): 3pid address.
- :param access_token (str): The access token to persist if none is
- already persisted.
- :param inviter_user_id (str): User ID of the inviter.
- :return (deferred str): Whichever access token is persisted at the end
+ Args:
+ medium (str): Medium of the 3pid. Must be "email".
+ address (str): 3pid address.
+ access_token (str): The access token to persist if none is
+ already persisted.
+ inviter_user_id (str): User ID of the inviter.
+
+ Returns:
+ deferred str: Whichever access token is persisted at the end
of this function call.
"""
def insert(txn):
diff --git a/synapse/storage/schema/delta/30/state_stream.sql b/synapse/storage/schema/delta/30/state_stream.sql
new file mode 100644
index 0000000000..706fe1dcf4
--- /dev/null
+++ b/synapse/storage/schema/delta/30/state_stream.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * The positions in the event stream_ordering when the current_state was
+ * replaced by the state at the event.
+ */
+
+CREATE TABLE IF NOT EXISTS current_state_resets(
+ event_stream_ordering BIGINT PRIMARY KEY NOT NULL
+);
+
+/* The outlier events that have aquired a state group typically through
+ * backfill. This is tracked separately to the events table, as assigning a
+ * state group change the position of the existing event in the stream
+ * ordering.
+ * However since a stream_ordering is assigned in persist_event for the
+ * (event, state) pair, we can use that stream_ordering to identify when
+ * the new state was assigned for the event.
+ */
+CREATE TABLE IF NOT EXISTS ex_outlier_stream(
+ event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
+ event_id TEXT NOT NULL,
+ state_group BIGINT NOT NULL
+);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 02cefdff26..e9f9406014 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -64,12 +64,12 @@ class StateStore(SQLBaseStore):
for group, state_map in group_to_state.items()
})
- def _store_state_groups_txn(self, txn, event, context):
- return self._store_mult_state_groups_txn(txn, [(event, context)])
-
def _store_mult_state_groups_txn(self, txn, events_and_contexts):
state_groups = {}
for event, context in events_and_contexts:
+ if event.internal_metadata.is_outlier():
+ continue
+
if context.current_state is None:
continue
@@ -82,7 +82,8 @@ class StateStore(SQLBaseStore):
if event.is_state():
state_events[(event.type, event.state_key)] = event
- state_group = self._state_groups_id_gen.get_next()
+ state_group = context.new_state_group_id
+
self._simple_insert_txn(
txn,
table="state_groups",
@@ -114,11 +115,10 @@ class StateStore(SQLBaseStore):
table="event_to_state_groups",
values=[
{
- "state_group": state_groups[event.event_id],
- "event_id": event.event_id,
+ "state_group": state_group_id,
+ "event_id": event_id,
}
- for event, context in events_and_contexts
- if context.current_state is not None
+ for event_id, state_group_id in state_groups.items()
],
)
@@ -249,11 +249,14 @@ class StateStore(SQLBaseStore):
"""
Get the state dict corresponding to a particular event
- :param str event_id: event whose state should be returned
- :param list[(str, str)]|None types: List of (type, state_key) tuples
- which are used to filter the state fetched. May be None, which
- matches any key
- :return: a deferred dict from (type, state_key) -> state_event
+ Args:
+ event_id(str): event whose state should be returned
+ types(list[(str, str)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. May be None, which
+ matches any key
+
+ Returns:
+ A deferred dict from (type, state_key) -> state_event
"""
state_map = yield self.get_state_for_events([event_id], types)
defer.returnValue(state_map[event_id])
@@ -429,3 +432,33 @@ class StateStore(SQLBaseStore):
}
defer.returnValue(results)
+
+ def get_all_new_state_groups(self, last_id, current_id, limit):
+ def get_all_new_state_groups_txn(txn):
+ sql = (
+ "SELECT id, room_id, event_id FROM state_groups"
+ " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
+ )
+ txn.execute(sql, (last_id, current_id, limit))
+ groups = txn.fetchall()
+
+ if not groups:
+ return ([], [])
+
+ lower_bound = groups[0][0]
+ upper_bound = groups[-1][0]
+ sql = (
+ "SELECT state_group, type, state_key, event_id"
+ " FROM state_groups_state"
+ " WHERE ? <= state_group AND state_group <= ?"
+ )
+
+ txn.execute(sql, (lower_bound, upper_bound))
+ state_group_state = txn.fetchall()
+ return (groups, state_group_state)
+ return self.runInteraction(
+ "get_all_new_state_groups", get_all_new_state_groups_txn
+ )
+
+ def get_state_stream_token(self):
+ return self._state_groups_id_gen.get_current_token()
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index cf84938be5..76bcd9cd00 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -539,7 +539,7 @@ class StreamStore(SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_max_id(self, direction='f'):
- token = yield self._stream_id_gen.get_max_token()
+ token = yield self._stream_id_gen.get_current_token()
if direction != 'b':
defer.returnValue("s%d" % (token,))
else:
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index a0e6b42b30..9da23f34cb 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -30,7 +30,7 @@ class TagsStore(SQLBaseStore):
Returns:
A deferred int.
"""
- return self._account_data_id_gen.get_max_token()
+ return self._account_data_id_gen.get_current_token()
@cached()
def get_tags_for_user(self, user_id):
@@ -200,7 +200,7 @@ class TagsStore(SQLBaseStore):
self.get_tags_for_user.invalidate((user_id,))
- result = self._account_data_id_gen.get_max_token()
+ result = self._account_data_id_gen.get_current_token()
defer.returnValue(result)
@defer.inlineCallbacks
@@ -222,7 +222,7 @@ class TagsStore(SQLBaseStore):
self.get_tags_for_user.invalidate((user_id,))
- result = self._account_data_id_gen.get_max_token()
+ result = self._account_data_id_gen.get_current_token()
defer.returnValue(result)
def _update_revision_txn(self, txn, user_id, room_id, next_id):
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index a02dfc7d58..f69f1cdad4 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -21,7 +21,7 @@ import threading
class IdGenerator(object):
def __init__(self, db_conn, table, column):
self._lock = threading.Lock()
- self._next_id = _load_max_id(db_conn, table, column)
+ self._next_id = _load_current_id(db_conn, table, column)
def get_next(self):
with self._lock:
@@ -29,12 +29,16 @@ class IdGenerator(object):
return self._next_id
-def _load_max_id(db_conn, table, column):
+def _load_current_id(db_conn, table, column, step=1):
cur = db_conn.cursor()
- cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
+ if step == 1:
+ cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
+ else:
+ cur.execute("SELECT MIN(%s) FROM %s" % (column, table,))
val, = cur.fetchone()
cur.close()
- return int(val) if val else 1
+ current_id = int(val) if val else step
+ return (max if step > 0 else min)(current_id, step)
class StreamIdGenerator(object):
@@ -45,17 +49,32 @@ class StreamIdGenerator(object):
all ids less than or equal to it have completed. This handles the fact that
persistence of events can complete out of order.
+ Args:
+ db_conn(connection): A database connection to use to fetch the
+ initial value of the generator from.
+ table(str): A database table to read the initial value of the id
+ generator from.
+ column(str): The column of the database table to read the initial
+ value from the id generator from.
+ extra_tables(list): List of pairs of database tables and columns to
+ use to source the initial value of the generator from. The value
+ with the largest magnitude is used.
+ step(int): which direction the stream ids grow in. +1 to grow
+ upwards, -1 to grow downwards.
+
Usage:
with stream_id_gen.get_next() as stream_id:
# ... persist event ...
"""
- def __init__(self, db_conn, table, column, extra_tables=[]):
+ def __init__(self, db_conn, table, column, extra_tables=[], step=1):
+ assert step != 0
self._lock = threading.Lock()
- self._current_max = _load_max_id(db_conn, table, column)
+ self._step = step
+ self._current = _load_current_id(db_conn, table, column, step)
for table, column in extra_tables:
- self._current_max = max(
- self._current_max,
- _load_max_id(db_conn, table, column)
+ self._current = (max if step > 0 else min)(
+ self._current,
+ _load_current_id(db_conn, table, column, step)
)
self._unfinished_ids = deque()
@@ -66,8 +85,8 @@ class StreamIdGenerator(object):
# ... persist event ...
"""
with self._lock:
- self._current_max += 1
- next_id = self._current_max
+ self._current += self._step
+ next_id = self._current
self._unfinished_ids.append(next_id)
@@ -88,8 +107,12 @@ class StreamIdGenerator(object):
# ... persist events ...
"""
with self._lock:
- next_ids = range(self._current_max + 1, self._current_max + n + 1)
- self._current_max += n
+ next_ids = range(
+ self._current + self._step,
+ self._current + self._step * (n + 1),
+ self._step
+ )
+ self._current += n
for next_id in next_ids:
self._unfinished_ids.append(next_id)
@@ -105,15 +128,15 @@ class StreamIdGenerator(object):
return manager()
- def get_max_token(self):
+ def get_current_token(self):
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
"""
with self._lock:
if self._unfinished_ids:
- return self._unfinished_ids[0] - 1
+ return self._unfinished_ids[0] - self._step
- return self._current_max
+ return self._current
class ChainedIdGenerator(object):
@@ -125,7 +148,7 @@ class ChainedIdGenerator(object):
def __init__(self, chained_generator, db_conn, table, column):
self.chained_generator = chained_generator
self._lock = threading.Lock()
- self._current_max = _load_max_id(db_conn, table, column)
+ self._current_max = _load_current_id(db_conn, table, column)
self._unfinished_ids = deque()
def get_next(self):
@@ -137,7 +160,7 @@ class ChainedIdGenerator(object):
with self._lock:
self._current_max += 1
next_id = self._current_max
- chained_id = self.chained_generator.get_max_token()
+ chained_id = self.chained_generator.get_current_token()
self._unfinished_ids.append((next_id, chained_id))
@@ -151,7 +174,7 @@ class ChainedIdGenerator(object):
return manager()
- def get_max_token(self):
+ def get_current_token(self):
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
"""
@@ -160,4 +183,4 @@ class ChainedIdGenerator(object):
stream_id, chained_id = self._unfinished_ids[0]
return (stream_id - 1, chained_id)
- return (self._current_max, self.chained_generator.get_max_token())
+ return (self._current_max, self.chained_generator.get_current_token())
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 640fae3890..cd4d90f3cf 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -16,7 +16,8 @@
from twisted.internet import defer, reactor
-from .logcontext import PreserveLoggingContext
+from .logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util import unwrapFirstError
@defer.inlineCallbacks
@@ -107,3 +108,32 @@ class ObservableDeferred(object):
return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
id(self), self._result, self._deferred,
)
+
+
+def concurrently_execute(func, args, limit):
+ """Executes the function with each argument conncurrently while limiting
+ the number of concurrent executions.
+
+ Args:
+ func (func): Function to execute, should return a deferred.
+ args (list): List of arguments to pass to func, each invocation of func
+ gets a signle argument.
+ limit (int): Maximum number of conccurent executions.
+
+ Returns:
+ deferred: Resolved when all function invocations have finished.
+ """
+ it = iter(args)
+
+ @defer.inlineCallbacks
+ def _concurrently_execute_inner():
+ try:
+ while True:
+ yield func(it.next())
+ except StopIteration:
+ pass
+
+ return defer.gatherResults([
+ preserve_fn(_concurrently_execute_inner)()
+ for _ in xrange(limit)
+ ], consumeErrors=True).addErrback(unwrapFirstError)
|