diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 53213cdccf..8f8fd82eb0 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,7 +17,6 @@ from .register import RegistrationHandler
from .room import (
RoomCreationHandler, RoomContextHandler,
)
-from .room_member import RoomMemberHandler
from .message import MessageHandler
from .federation import FederationHandler
from .directory import DirectoryHandler
@@ -49,7 +48,6 @@ class Handlers(object):
self.registration_handler = RegistrationHandler(hs)
self.message_handler = MessageHandler(hs)
self.room_creation_handler = RoomCreationHandler(hs)
- self.room_member_handler = RoomMemberHandler(hs)
self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index faa5609c0c..e089e66fde 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -158,7 +158,7 @@ class BaseHandler(object):
# homeserver.
requester = synapse.types.create_requester(
target_user, is_guest=True)
- handler = self.hs.get_handlers().room_member_handler
+ handler = self.hs.get_room_member_handler()
yield handler.update_membership(
requester,
target_user,
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 258cc345dc..a5365c4fe4 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -863,8 +863,10 @@ class AuthHandler(BaseHandler):
"""
def _do_validate_hash():
- return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
- stored_hash.encode('utf8')) == stored_hash
+ return bcrypt.checkpw(
+ password.encode('utf8') + self.hs.config.password_pepper,
+ stored_hash.encode('utf8')
+ )
if stored_hash:
return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8832ba58bc..520612683e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2153,7 +2153,7 @@ class FederationHandler(BaseHandler):
raise e
yield self._check_signature(event, context)
- member_handler = self.hs.get_handlers().room_member_handler
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
else:
destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -2197,7 +2197,7 @@ class FederationHandler(BaseHandler):
# TODO: Make sure the signatures actually are correct.
event.signatures.update(returned_invite.signatures)
- member_handler = self.hs.get_handlers().room_member_handler
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
@defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7d28c2745c..dd00d8a86c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -52,16 +52,12 @@ class MessageHandler(BaseHandler):
self.pagination_lock = ReadWriteLock()
@defer.inlineCallbacks
- def purge_history(self, room_id, event_id, delete_local_events=False):
- event = yield self.store.get_event(event_id)
-
- if event.room_id != room_id:
- raise SynapseError(400, "Event is for wrong room.")
-
- depth = event.depth
-
+ def purge_history(self, room_id, topological_ordering,
+ delete_local_events=False):
with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(room_id, depth, delete_local_events)
+ yield self.store.purge_history(
+ room_id, topological_ordering, delete_local_events,
+ )
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 9800e24453..c9c2879038 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -233,7 +233,7 @@ class ProfileHandler(BaseHandler):
)
for room_id in room_ids:
- handler = self.hs.get_handlers().room_member_handler
+ handler = self.hs.get_room_member_handler()
try:
# Assume the target_user isn't a guest,
# because we don't let guests set profile or avatar data.
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6ab020bf41..8df8fcbbad 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -165,7 +165,7 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {})
- room_member_handler = self.hs.get_handlers().room_member_handler
+ room_member_handler = self.hs.get_room_member_handler()
yield self._send_events_for_new_room(
requester,
@@ -224,7 +224,7 @@ class RoomCreationHandler(BaseHandler):
id_server = invite_3pid["id_server"]
address = invite_3pid["address"]
medium = invite_3pid["medium"]
- yield self.hs.get_handlers().room_member_handler.do_3pid_invite(
+ yield self.hs.get_room_member_handler().do_3pid_invite(
room_id,
requester.user,
medium,
@@ -475,12 +475,9 @@ class RoomEventSource(object):
user.to_string()
)
if app_service:
- events, end_key = yield self.store.get_appservice_room_stream(
- service=app_service,
- from_key=from_key,
- to_key=to_key,
- limit=limit,
- )
+ # We no longer support AS users using /sync directly.
+ # See https://github.com/matrix-org/matrix-doc/issues/1144
+ raise NotImplementedError()
else:
room_events = yield self.store.get_membership_changes_for_user(
user.to_string(), from_key, to_key
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 37dc5e99ab..ed3b97730d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -30,24 +30,32 @@ from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.types import UserID, RoomID
from synapse.util.async import Linearizer
from synapse.util.distributor import user_left_room, user_joined_room
-from ._base import BaseHandler
logger = logging.getLogger(__name__)
id_server_scheme = "https://"
-class RoomMemberHandler(BaseHandler):
+class RoomMemberHandler(object):
# TODO(paul): This handler currently contains a messy conflation of
# low-level API that works on UserID objects and so on, and REST-level
# API that takes ID strings and returns pagination chunks. These concerns
# ought to be separated out a lot better.
def __init__(self, hs):
- super(RoomMemberHandler, self).__init__(hs)
-
+ self.hs = hs
+ self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
+ self.state_handler = hs.get_state_handler()
+ self.config = hs.config
+ self.simple_http_client = hs.get_simple_http_client()
+
+ self.federation_handler = hs.get_handlers().federation_handler
+ self.directory_handler = hs.get_handlers().directory_handler
+ self.registration_handler = hs.get_handlers().registration_handler
self.profile_handler = hs.get_profile_handler()
self.event_creation_hander = hs.get_event_creation_handler()
+ self.replication_layer = hs.get_replication_layer()
self.member_linearizer = Linearizer(name="member")
@@ -138,7 +146,7 @@ class RoomMemberHandler(BaseHandler):
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
- yield self.hs.get_handlers().federation_handler.do_invite_join(
+ yield self.federation_handler.do_invite_join(
remote_room_hosts,
room_id,
user.to_string(),
@@ -204,8 +212,7 @@ class RoomMemberHandler(BaseHandler):
# if this is a join with a 3pid signature, we may need to turn a 3pid
# invite into a normal invite before we can handle the join.
if third_party_signed is not None:
- replication = self.hs.get_replication_layer()
- yield replication.exchange_third_party_invite(
+ yield self.replication_layer.exchange_third_party_invite(
third_party_signed["sender"],
target.to_string(),
room_id,
@@ -226,7 +233,7 @@ class RoomMemberHandler(BaseHandler):
requester.user,
)
if not is_requester_admin:
- if self.hs.config.block_non_admin_invites:
+ if self.config.block_non_admin_invites:
logger.info(
"Blocking invite: user is not admin and non-admin "
"invites disabled"
@@ -321,7 +328,7 @@ class RoomMemberHandler(BaseHandler):
else:
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
- fed_handler = self.hs.get_handlers().federation_handler
+ fed_handler = self.federation_handler
try:
ret = yield fed_handler.do_remotely_reject_invite(
remote_room_hosts,
@@ -477,7 +484,7 @@ class RoomMemberHandler(BaseHandler):
Raises:
SynapseError if room alias could not be found.
"""
- directory_handler = self.hs.get_handlers().directory_handler
+ directory_handler = self.directory_handler
mapping = yield directory_handler.get_association(room_alias)
if not mapping:
@@ -508,7 +515,7 @@ class RoomMemberHandler(BaseHandler):
requester,
txn_id
):
- if self.hs.config.block_non_admin_invites:
+ if self.config.block_non_admin_invites:
is_requester_admin = yield self.auth.is_server_admin(
requester.user,
)
@@ -555,7 +562,7 @@ class RoomMemberHandler(BaseHandler):
str: the matrix ID of the 3pid, or None if it is not recognized.
"""
try:
- data = yield self.hs.get_simple_http_client().get_json(
+ data = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
{
"medium": medium,
@@ -566,7 +573,7 @@ class RoomMemberHandler(BaseHandler):
if "mxid" in data:
if "signatures" not in data:
raise AuthError(401, "No signatures on 3pid binding")
- self.verify_any_signature(data, id_server)
+ yield self.verify_any_signature(data, id_server)
defer.returnValue(data["mxid"])
except IOError as e:
@@ -578,7 +585,7 @@ class RoomMemberHandler(BaseHandler):
if server_hostname not in data["signatures"]:
raise AuthError(401, "No signature from server %s" % (server_hostname,))
for key_name, signature in data["signatures"][server_hostname].items():
- key_data = yield self.hs.get_simple_http_client().get_json(
+ key_data = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/api/v1/pubkey/%s" %
(id_server_scheme, server_hostname, key_name,),
)
@@ -603,7 +610,7 @@ class RoomMemberHandler(BaseHandler):
user,
txn_id
):
- room_state = yield self.hs.get_state_handler().get_current_state(room_id)
+ room_state = yield self.state_handler.get_current_state(room_id)
inviter_display_name = ""
inviter_avatar_url = ""
@@ -727,15 +734,15 @@ class RoomMemberHandler(BaseHandler):
"sender_avatar_url": inviter_avatar_url,
}
- if self.hs.config.invite_3pid_guest:
- registration_handler = self.hs.get_handlers().registration_handler
+ if self.config.invite_3pid_guest:
+ registration_handler = self.registration_handler
guest_access_token = yield registration_handler.guest_access_token_for(
medium=medium,
address=address,
inviter_user_id=inviter_user_id,
)
- guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
+ guest_user_info = yield self.auth.get_user_by_access_token(
guest_access_token
)
@@ -744,7 +751,7 @@ class RoomMemberHandler(BaseHandler):
"guest_user_id": guest_user_info["user"].to_string(),
})
- data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+ data = yield self.simple_http_client.post_urlencoded_get_json(
is_url,
invite_config
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b12988f3c9..0f713ce038 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -235,10 +235,10 @@ class SyncHandler(object):
defer.returnValue(rules)
@defer.inlineCallbacks
- def ephemeral_by_room(self, sync_config, now_token, since_token=None):
+ def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
"""Get the ephemeral events for each room the user is in
Args:
- sync_config (SyncConfig): The flags, filters and user for the sync.
+ sync_result_builder(SyncResultBuilder)
now_token (StreamToken): Where the server is currently up to.
since_token (StreamToken): Where the server was when the client
last synced.
@@ -248,10 +248,12 @@ class SyncHandler(object):
typing events for that room.
"""
+ sync_config = sync_result_builder.sync_config
+
with Measure(self.clock, "ephemeral_by_room"):
typing_key = since_token.typing_key if since_token else "0"
- room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = sync_result_builder.joined_room_ids
typing_source = self.event_sources.sources["typing"]
typing, typing_key = yield typing_source.get_new_events(
@@ -565,10 +567,22 @@ class SyncHandler(object):
# Always use the `now_token` in `SyncResultBuilder`
now_token = yield self.event_sources.get_current_token()
+ user_id = sync_config.user.to_string()
+ app_service = self.store.get_app_service_by_user_id(user_id)
+ if app_service:
+ # We no longer support AS users using /sync directly.
+ # See https://github.com/matrix-org/matrix-doc/issues/1144
+ raise NotImplementedError()
+ else:
+ joined_room_ids = yield self.get_rooms_for_user_at(
+ user_id, now_token.room_stream_id,
+ )
+
sync_result_builder = SyncResultBuilder(
sync_config, full_state,
since_token=since_token,
now_token=now_token,
+ joined_room_ids=joined_room_ids,
)
account_data_by_room = yield self._generate_sync_entry_for_account_data(
@@ -603,7 +617,6 @@ class SyncHandler(object):
device_id = sync_config.device_id
one_time_key_counts = {}
if device_id:
- user_id = sync_config.user.to_string()
one_time_key_counts = yield self.store.count_e2e_one_time_keys(
user_id, device_id
)
@@ -891,7 +904,7 @@ class SyncHandler(object):
ephemeral_by_room = {}
else:
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_result_builder.sync_config,
+ sync_result_builder,
now_token=sync_result_builder.now_token,
since_token=sync_result_builder.since_token,
)
@@ -996,15 +1009,8 @@ class SyncHandler(object):
if rooms_changed:
defer.returnValue(True)
- app_service = self.store.get_app_service_by_user_id(user_id)
- if app_service:
- rooms = yield self.store.get_app_service_rooms(app_service)
- joined_room_ids = set(r.room_id for r in rooms)
- else:
- joined_room_ids = yield self.store.get_rooms_for_user(user_id)
-
stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
- for room_id in joined_room_ids:
+ for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
defer.returnValue(True)
defer.returnValue(False)
@@ -1028,13 +1034,6 @@ class SyncHandler(object):
assert since_token
- app_service = self.store.get_app_service_by_user_id(user_id)
- if app_service:
- rooms = yield self.store.get_app_service_rooms(app_service)
- joined_room_ids = set(r.room_id for r in rooms)
- else:
- joined_room_ids = yield self.store.get_rooms_for_user(user_id)
-
# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
@@ -1057,7 +1056,7 @@ class SyncHandler(object):
# we do send down the room, and with full state, where necessary
old_state_ids = None
- if room_id in joined_room_ids and non_joins:
+ if room_id in sync_result_builder.joined_room_ids and non_joins:
# Always include if the user (re)joined the room, especially
# important so that device list changes are calculated correctly.
# If there are non join member events, but we are still in the room,
@@ -1067,7 +1066,7 @@ class SyncHandler(object):
# User is in the room so we don't need to do the invite/leave checks
continue
- if room_id in joined_room_ids or has_join:
+ if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = yield self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
old_mem_ev = None
@@ -1079,7 +1078,7 @@ class SyncHandler(object):
newly_joined_rooms.append(room_id)
# If user is in the room then we don't need to do the invite/leave checks
- if room_id in joined_room_ids:
+ if room_id in sync_result_builder.joined_room_ids:
continue
if not non_joins:
@@ -1146,7 +1145,7 @@ class SyncHandler(object):
# Get all events for rooms we're currently joined to.
room_to_events = yield self.store.get_room_events_stream_for_rooms(
- room_ids=joined_room_ids,
+ room_ids=sync_result_builder.joined_room_ids,
from_key=since_token.room_key,
to_key=now_token.room_key,
limit=timeline_limit + 1,
@@ -1154,7 +1153,7 @@ class SyncHandler(object):
# We loop through all room ids, even if there are no new events, in case
# there are non room events taht we need to notify about.
- for room_id in joined_room_ids:
+ for room_id in sync_result_builder.joined_room_ids:
room_entry = room_to_events.get(room_id, None)
if room_entry:
@@ -1362,6 +1361,54 @@ class SyncHandler(object):
else:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+ @defer.inlineCallbacks
+ def get_rooms_for_user_at(self, user_id, stream_ordering):
+ """Get set of joined rooms for a user at the given stream ordering.
+
+ The stream ordering *must* be recent, otherwise this may throw an
+ exception if older than a month. (This function is called with the
+ current token, which should be perfectly fine).
+
+ Args:
+ user_id (str)
+ stream_ordering (int)
+
+ ReturnValue:
+ Deferred[frozenset[str]]: Set of room_ids the user is in at given
+ stream_ordering.
+ """
+ joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(
+ user_id,
+ )
+
+ joined_room_ids = set()
+
+ # We need to check that the stream ordering of the join for each room
+ # is before the stream_ordering asked for. This might not be the case
+ # if the user joins a room between us getting the current token and
+ # calling `get_rooms_for_user_with_stream_ordering`.
+ # If the membership's stream ordering is after the given stream
+ # ordering, we need to go and work out if the user was in the room
+ # before.
+ for room_id, membership_stream_ordering in joined_rooms:
+ if membership_stream_ordering <= stream_ordering:
+ joined_room_ids.add(room_id)
+ continue
+
+ logger.info("User joined room after current token: %s", room_id)
+
+ extrems = yield self.store.get_forward_extremeties_for_room(
+ room_id, stream_ordering,
+ )
+ users_in_room = yield self.state.get_current_user_in_room(
+ room_id, extrems,
+ )
+ if user_id in users_in_room:
+ joined_room_ids.add(room_id)
+
+ joined_room_ids = frozenset(joined_room_ids)
+ defer.returnValue(joined_room_ids)
+
def _action_has_highlight(actions):
for action in actions:
@@ -1411,7 +1458,8 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
class SyncResultBuilder(object):
"Used to help build up a new SyncResult for a user"
- def __init__(self, sync_config, full_state, since_token, now_token):
+ def __init__(self, sync_config, full_state, since_token, now_token,
+ joined_room_ids):
"""
Args:
sync_config(SyncConfig)
@@ -1423,6 +1471,7 @@ class SyncResultBuilder(object):
self.full_state = full_state
self.since_token = since_token
self.now_token = now_token
+ self.joined_room_ids = joined_room_ids
self.presence = []
self.account_data = []
|