diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 6519f183df..5fd20285d2 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -92,7 +92,15 @@ class BaseHandler(object):
membership_event = state.get((EventTypes.Member, user_id), None)
if membership_event:
- membership = membership_event.membership
+ was_forgotten_at_event = yield self.store.was_forgotten_at(
+ membership_event.state_key,
+ membership_event.room_id,
+ membership_event.event_id
+ )
+ if was_forgotten_at_event:
+ membership = None
+ else:
+ membership = membership_event.membership
else:
membership = None
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 1d35d3b7dc..fe773bee9b 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -29,9 +29,10 @@ class AccountDataEventSource(object):
last_stream_id = from_key
current_stream_id = yield self.store.get_max_account_data_stream_id()
- tags = yield self.store.get_updated_tags(user_id, last_stream_id)
results = []
+ tags = yield self.store.get_updated_tags(user_id, last_stream_id)
+
for room_id, room_tags in tags.items():
results.append({
"type": "m.tag",
@@ -39,6 +40,24 @@ class AccountDataEventSource(object):
"room_id": room_id,
})
+ account_data, room_account_data = (
+ yield self.store.get_updated_account_data_for_user(user_id, last_stream_id)
+ )
+
+ for account_data_type, content in account_data.items():
+ results.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ for room_id, account_data in room_account_data.items():
+ for account_data_type, content in account_data.items():
+ results.append({
+ "type": account_data_type,
+ "content": content,
+ "room_id": room_id,
+ })
+
defer.returnValue((results, current_stream_id))
@defer.inlineCallbacks
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index d852a18555..04fa58df65 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -30,34 +30,27 @@ class AdminHandler(BaseHandler):
@defer.inlineCallbacks
def get_whois(self, user):
- res = yield self.store.get_user_ip_and_agents(user)
-
- d = {}
- for r in res:
- # Note that device_id is always None
- device = d.setdefault(r["device_id"], {})
- session = device.setdefault(r["access_token"], [])
- session.append({
- "ip": r["ip"],
- "user_agent": r["user_agent"],
- "last_seen": r["last_seen"],
+ connections = []
+
+ sessions = yield self.store.get_user_ip_and_agents(user)
+ for session in sessions:
+ connections.append({
+ "ip": session["ip"],
+ "last_seen": session["last_seen"],
+ "user_agent": session["user_agent"],
})
ret = {
"user_id": user.to_string(),
- "devices": [
- {
- "device_id": k,
+ "devices": {
+ "": {
"sessions": [
{
- # "access_token": x, TODO (erikj)
- "connections": y,
+ "connections": connections,
}
- for x, y in v.items()
]
- }
- for k, v in d.items()
- ],
+ },
+ },
}
defer.returnValue(ret)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 0e4c0d4d06..576d77e0e7 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -28,6 +28,18 @@ import random
logger = logging.getLogger(__name__)
+def started_user_eventstream(distributor, user):
+ return distributor.fire("started_user_eventstream", user)
+
+
+def stopped_user_eventstream(distributor, user):
+ return distributor.fire("stopped_user_eventstream", user)
+
+
+def user_joined_room(distributor, user, room_id):
+ return distributor.fire("user_joined_room", user, room_id)
+
+
class EventStreamHandler(BaseHandler):
def __init__(self, hs):
@@ -57,7 +69,12 @@ class EventStreamHandler(BaseHandler):
A deferred that completes once their presence has been updated.
"""
if user not in self._streams_per_user:
- self._streams_per_user[user] = 0
+ # Make sure we set the streams per user to 1 here rather than
+ # setting it to zero and incrementing the value below.
+ # Otherwise this may race with stopped_stream causing the
+ # user to be erased from the map before we have a chance
+ # to increment it.
+ self._streams_per_user[user] = 1
if user in self._stop_timer_per_user:
try:
self.clock.cancel_call_later(
@@ -66,9 +83,9 @@ class EventStreamHandler(BaseHandler):
except:
logger.exception("Failed to cancel event timer")
else:
- yield self.distributor.fire("started_user_eventstream", user)
-
- self._streams_per_user[user] += 1
+ yield started_user_eventstream(self.distributor, user)
+ else:
+ self._streams_per_user[user] += 1
def stopped_stream(self, user):
"""If there are no streams for a user this starts a timer that will
@@ -89,7 +106,7 @@ class EventStreamHandler(BaseHandler):
self._stop_timer_per_user.pop(user, None)
- return self.distributor.fire("stopped_user_eventstream", user)
+ return stopped_user_eventstream(self.distributor, user)
logger.debug("Scheduling _later: for %s", user)
self._stop_timer_per_user[user] = (
@@ -120,9 +137,7 @@ class EventStreamHandler(BaseHandler):
timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
if is_guest:
- yield self.distributor.fire(
- "user_joined_room", user=auth_user, room_id=room_id
- )
+ yield user_joined_room(self.distributor, auth_user, room_id)
events, tokens = yield self.notifier.get_events_for(
auth_user, pagin_config, timeout,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c1bce07e31..28f2ff68d6 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -44,6 +44,10 @@ import logging
logger = logging.getLogger(__name__)
+def user_joined_room(distributor, user, room_id):
+ return distributor.fire("user_joined_room", user, room_id)
+
+
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
@@ -60,10 +64,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
- self.distributor.observe(
- "user_joined_room",
- self._on_user_joined
- )
+ self.distributor.observe("user_joined_room", self.user_joined_room)
self.waiting_for_join_list = {}
@@ -176,7 +177,7 @@ class FederationHandler(BaseHandler):
)
try:
- _, event_stream_id, max_stream_id = yield self._handle_new_event(
+ context, event_stream_id, max_stream_id = yield self._handle_new_event(
origin,
event,
state=state,
@@ -233,10 +234,13 @@ class FederationHandler(BaseHandler):
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
- user = UserID.from_string(event.state_key)
- yield self.distributor.fire(
- "user_joined_room", user=user, room_id=event.room_id
- )
+ prev_state = context.current_state.get((event.type, event.state_key))
+ if not prev_state or prev_state.membership != Membership.JOIN:
+ # Only fire user_joined_room if the user has acutally
+ # joined the room. Don't bother if the user is just
+ # changing their profile info.
+ user = UserID.from_string(event.state_key)
+ yield user_joined_room(self.distributor, user, event.room_id)
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
@@ -592,7 +596,7 @@ class FederationHandler(BaseHandler):
handled_events = set()
try:
- new_event = self._sign_event(event)
+ event = self._sign_event(event)
# Try the host we successfully got a response to /make_join/
# request first.
try:
@@ -600,7 +604,7 @@ class FederationHandler(BaseHandler):
target_hosts.insert(0, origin)
except ValueError:
pass
- ret = yield self.replication_layer.send_join(target_hosts, new_event)
+ ret = yield self.replication_layer.send_join(target_hosts, event)
origin = ret["origin"]
state = ret["state"]
@@ -609,12 +613,12 @@ class FederationHandler(BaseHandler):
handled_events.update([s.event_id for s in state])
handled_events.update([a.event_id for a in auth_chain])
- handled_events.add(new_event.event_id)
+ handled_events.add(event.event_id)
logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)
- logger.debug("do_invite_join event: %s", new_event)
+ logger.debug("do_invite_join event: %s", event)
try:
yield self.store.store_room(
@@ -632,14 +636,14 @@ class FederationHandler(BaseHandler):
with PreserveLoggingContext():
d = self.notifier.on_new_room_event(
- new_event, event_stream_id, max_stream_id,
+ event, event_stream_id, max_stream_id,
extra_users=[joinee]
)
def log_failure(f):
logger.warn(
"Failed to notify about %s: %s",
- new_event.event_id, f.value
+ event.event_id, f.value
)
d.addErrback(log_failure)
@@ -733,9 +737,7 @@ class FederationHandler(BaseHandler):
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
- yield self.distributor.fire(
- "user_joined_room", user=user, room_id=event.room_id
- )
+ yield user_joined_room(self.distributor, user, event.room_id)
new_pdu = event
@@ -1082,7 +1084,7 @@ class FederationHandler(BaseHandler):
return self.store.get_min_depth(context)
@log_function
- def _on_user_joined(self, user, room_id):
+ def user_joined_room(self, user, room_id):
waiters = self.waiting_for_join_list.get(
(user.to_string(), room_id),
[]
@@ -1648,11 +1650,22 @@ class FederationHandler(BaseHandler):
sender = invite["sender"]
room_id = invite["room_id"]
+ if "signed" not in invite or "token" not in invite["signed"]:
+ logger.info(
+ "Discarding received notification of third party invite "
+ "without signed: %s" % (invite,)
+ )
+ return
+
+ third_party_invite = {
+ "signed": invite["signed"],
+ }
+
event_dict = {
"type": EventTypes.Member,
"content": {
"membership": Membership.INVITE,
- "third_party_invite": invite,
+ "third_party_invite": third_party_invite,
},
"room_id": room_id,
"sender": sender,
@@ -1663,6 +1676,11 @@ class FederationHandler(BaseHandler):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
event, context = yield self._create_new_client_event(builder=builder)
+
+ event, context = yield self.add_display_name_to_third_party_invite(
+ event_dict, event, context
+ )
+
self.auth.check(event, context.current_state)
yield self._validate_keyserver(event, auth_events=context.current_state)
member_handler = self.hs.get_handlers().room_member_handler
@@ -1684,6 +1702,10 @@ class FederationHandler(BaseHandler):
builder=builder,
)
+ event, context = yield self.add_display_name_to_third_party_invite(
+ event_dict, event, context
+ )
+
self.auth.check(event, auth_events=context.current_state)
yield self._validate_keyserver(event, auth_events=context.current_state)
@@ -1694,6 +1716,27 @@ class FederationHandler(BaseHandler):
yield member_handler.change_membership(event, context)
@defer.inlineCallbacks
+ def add_display_name_to_third_party_invite(self, event_dict, event, context):
+ key = (
+ EventTypes.ThirdPartyInvite,
+ event.content["third_party_invite"]["signed"]["token"]
+ )
+ original_invite = context.current_state.get(key)
+ if not original_invite:
+ logger.info(
+ "Could not find invite event for third_party_invite - "
+ "discarding: %s" % (event_dict,)
+ )
+ return
+
+ display_name = original_invite.content["display_name"]
+ event_dict["content"]["third_party_invite"]["display_name"] = display_name
+ builder = self.event_builder_factory.new(event_dict)
+ EventValidator().validate_new(builder)
+ event, context = yield self._create_new_client_event(builder=builder)
+ defer.returnValue((event, context))
+
+ @defer.inlineCallbacks
def _validate_keyserver(self, event, auth_events):
token = event.content["third_party_invite"]["signed"]["token"]
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 2a99921d5f..f1fa562fff 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -20,7 +20,6 @@ from synapse.api.errors import (
CodeMessageException
)
from ._base import BaseHandler
-from synapse.http.client import SimpleHttpClient
from synapse.util.async import run_on_reactor
from synapse.api.errors import SynapseError
@@ -35,13 +34,12 @@ class IdentityHandler(BaseHandler):
def __init__(self, hs):
super(IdentityHandler, self).__init__(hs)
+ self.http_client = hs.get_simple_http_client()
+
@defer.inlineCallbacks
def threepid_from_creds(self, creds):
yield run_on_reactor()
- # TODO: get this from the homeserver rather than creating a new one for
- # each request
- http_client = SimpleHttpClient(self.hs)
# XXX: make this configurable!
# trustedIdServers = ['matrix.org', 'localhost:8090']
trustedIdServers = ['matrix.org', 'vector.im']
@@ -67,7 +65,7 @@ class IdentityHandler(BaseHandler):
data = {}
try:
- data = yield http_client.get_json(
+ data = yield self.http_client.get_json(
"https://%s%s" % (
id_server,
"/_matrix/identity/api/v1/3pid/getValidated3pid"
@@ -85,7 +83,6 @@ class IdentityHandler(BaseHandler):
def bind_threepid(self, creds, mxid):
yield run_on_reactor()
logger.debug("binding threepid %r to %s", creds, mxid)
- http_client = SimpleHttpClient(self.hs)
data = None
if 'id_server' in creds:
@@ -103,7 +100,7 @@ class IdentityHandler(BaseHandler):
raise SynapseError(400, "No client_secret in creds")
try:
- data = yield http_client.post_urlencoded_get_json(
+ data = yield self.http_client.post_urlencoded_get_json(
"https://%s%s" % (
id_server, "/_matrix/identity/api/v1/3pid/bind"
),
@@ -121,7 +118,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
yield run_on_reactor()
- http_client = SimpleHttpClient(self.hs)
params = {
'email': email,
@@ -131,7 +127,7 @@ class IdentityHandler(BaseHandler):
params.update(kwargs)
try:
- data = yield http_client.post_urlencoded_get_json(
+ data = yield self.http_client.post_urlencoded_get_json(
"https://%s%s" % (
id_server,
"/_matrix/identity/api/v1/validate/email/requestToken"
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2e7d0d7f82..a1bed9b0dc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,15 +22,22 @@ from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.types import UserID, RoomStreamToken, StreamToken
from ._base import BaseHandler
+from canonicaljson import encode_canonical_json
+
import logging
logger = logging.getLogger(__name__)
+def collect_presencelike_data(distributor, user, content):
+ return distributor.fire("collect_presencelike_data", user, content)
+
+
class MessageHandler(BaseHandler):
def __init__(self, hs):
@@ -39,6 +46,7 @@ class MessageHandler(BaseHandler):
self.state = hs.get_state_handler()
self.clock = hs.get_clock()
self.validator = EventValidator()
+ self.snapshot_cache = SnapshotCache()
@defer.inlineCallbacks
def get_message(self, msg_id=None, room_id=None, sender_id=None,
@@ -195,10 +203,8 @@ class MessageHandler(BaseHandler):
if membership == Membership.JOIN:
joinee = UserID.from_string(builder.state_key)
# If event doesn't include a display name, add one.
- yield self.distributor.fire(
- "collect_presencelike_data",
- joinee,
- builder.content
+ yield collect_presencelike_data(
+ self.distributor, joinee, builder.content
)
if token_id is not None:
@@ -211,6 +217,16 @@ class MessageHandler(BaseHandler):
builder=builder,
)
+ if event.is_state():
+ prev_state = context.current_state.get((event.type, event.state_key))
+ if prev_state and event.user_id == prev_state.user_id:
+ prev_content = encode_canonical_json(prev_state.content)
+ next_content = encode_canonical_json(event.content)
+ if prev_content == next_content:
+ # Duplicate suppression for state updates with same sender
+ # and content.
+ defer.returnValue(prev_state)
+
if event.type == EventTypes.Member:
member_handler = self.hs.get_handlers().room_member_handler
yield member_handler.change_membership(event, context, is_guest=is_guest)
@@ -312,7 +328,6 @@ class MessageHandler(BaseHandler):
[serialize_event(c, now) for c in room_state.values()]
)
- @defer.inlineCallbacks
def snapshot_all_rooms(self, user_id=None, pagin_config=None,
as_client_event=True, include_archived=False):
"""Retrieve a snapshot of all rooms the user is invited or has joined.
@@ -332,6 +347,28 @@ class MessageHandler(BaseHandler):
is joined on, may return a "messages" key with messages, depending
on the specified PaginationConfig.
"""
+ key = (
+ user_id,
+ pagin_config.from_token,
+ pagin_config.to_token,
+ pagin_config.direction,
+ pagin_config.limit,
+ as_client_event,
+ include_archived,
+ )
+ now_ms = self.clock.time_msec()
+ result = self.snapshot_cache.get(now_ms, key)
+ if result is not None:
+ return result
+
+ return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms(
+ user_id, pagin_config, as_client_event, include_archived
+ ))
+
+ @defer.inlineCallbacks
+ def _snapshot_all_rooms(self, user_id=None, pagin_config=None,
+ as_client_event=True, include_archived=False):
+
memberships = [Membership.INVITE, Membership.JOIN]
if include_archived:
memberships.append(Membership.LEAVE)
@@ -359,6 +396,10 @@ class MessageHandler(BaseHandler):
tags_by_room = yield self.store.get_tags_for_user(user_id)
+ account_data, account_data_by_room = (
+ yield self.store.get_account_data_for_user(user_id)
+ )
+
public_room_ids = yield self.store.get_public_room_ids()
limit = pagin_config.limit
@@ -436,14 +477,22 @@ class MessageHandler(BaseHandler):
for c in current_state.values()
]
- account_data = []
+ account_data_events = []
tags = tags_by_room.get(event.room_id)
if tags:
- account_data.append({
+ account_data_events.append({
"type": "m.tag",
"content": {"tags": tags},
})
- d["account_data"] = account_data
+
+ account_data = account_data_by_room.get(event.room_id, {})
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ d["account_data"] = account_data_events
except:
logger.exception("Failed to get snapshot")
@@ -456,9 +505,17 @@ class MessageHandler(BaseHandler):
consumeErrors=True
).addErrback(unwrapFirstError)
+ account_data_events = []
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
ret = {
"rooms": rooms_ret,
"presence": presence,
+ "account_data": account_data_events,
"receipts": receipt,
"end": now_token.to_string(),
}
@@ -498,14 +555,22 @@ class MessageHandler(BaseHandler):
user_id, room_id, pagin_config, membership, member_event_id, is_guest
)
- account_data = []
+ account_data_events = []
tags = yield self.store.get_tags_for_room(user_id, room_id)
if tags:
- account_data.append({
+ account_data_events.append({
"type": "m.tag",
"content": {"tags": tags},
})
- result["account_data"] = account_data
+
+ account_data = yield self.store.get_account_data_for_room(user_id, room_id)
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ result["account_data"] = account_data_events
defer.returnValue(result)
@@ -588,23 +653,28 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
- states = {}
- if not is_guest:
- states = yield presence_handler.get_states(
- target_users=[UserID.from_string(m.user_id) for m in room_members],
- auth_user=auth_user,
- as_event=True,
- check_auth=False,
- )
+ states = yield presence_handler.get_states(
+ target_users=[UserID.from_string(m.user_id) for m in room_members],
+ auth_user=auth_user,
+ as_event=True,
+ check_auth=False,
+ )
defer.returnValue(states.values())
- receipts_handler = self.hs.get_handlers().receipts_handler
+ @defer.inlineCallbacks
+ def get_receipts():
+ receipts_handler = self.hs.get_handlers().receipts_handler
+ receipts = yield receipts_handler.get_receipts_for_room(
+ room_id,
+ now_token.receipt_key
+ )
+ defer.returnValue(receipts)
presence, receipts, (messages, token) = yield defer.gatherResults(
[
get_presence(),
- receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key),
+ get_receipts(),
self.store.get_recent_events_for_room(
room_id,
limit=limit,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index aca65096fc..63d6f30a7b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -62,6 +62,14 @@ def partitionbool(l, func):
return ret.get(True, []), ret.get(False, [])
+def user_presence_changed(distributor, user, statuscache):
+ return distributor.fire("user_presence_changed", user, statuscache)
+
+
+def collect_presencelike_data(distributor, user, content):
+ return distributor.fire("collect_presencelike_data", user, content)
+
+
class PresenceHandler(BaseHandler):
STATE_LEVELS = {
@@ -361,9 +369,7 @@ class PresenceHandler(BaseHandler):
yield self.store.set_presence_state(
target_user.localpart, state_to_store
)
- yield self.distributor.fire(
- "collect_presencelike_data", target_user, state
- )
+ yield collect_presencelike_data(self.distributor, target_user, state)
if now_level > was_level:
state["last_active"] = self.clock.time_msec()
@@ -467,7 +473,7 @@ class PresenceHandler(BaseHandler):
)
@defer.inlineCallbacks
- def send_invite(self, observer_user, observed_user):
+ def send_presence_invite(self, observer_user, observed_user):
"""Request the presence of a local or remote user for a local user"""
if not self.hs.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
@@ -878,7 +884,7 @@ class PresenceHandler(BaseHandler):
room_ids=room_ids,
statuscache=statuscache,
)
- yield self.distributor.fire("user_presence_changed", user, statuscache)
+ yield user_presence_changed(self.distributor, user, statuscache)
@defer.inlineCallbacks
def incoming_presence(self, origin, content):
@@ -1116,9 +1122,7 @@ class PresenceHandler(BaseHandler):
self._user_cachemap[user].get_state()["last_active"]
)
- yield self.distributor.fire(
- "collect_presencelike_data", user, state
- )
+ yield collect_presencelike_data(self.distributor, user, state)
if "last_active" in state:
state = dict(state)
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 799faffe53..576c6f09b4 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -28,6 +28,14 @@ import logging
logger = logging.getLogger(__name__)
+def changed_presencelike_data(distributor, user, state):
+ return distributor.fire("changed_presencelike_data", user, state)
+
+
+def collect_presencelike_data(distributor, user, content):
+ return distributor.fire("collect_presencelike_data", user, content)
+
+
class ProfileHandler(BaseHandler):
def __init__(self, hs):
@@ -95,11 +103,9 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_displayname
)
- yield self.distributor.fire(
- "changed_presencelike_data", target_user, {
- "displayname": new_displayname,
- }
- )
+ yield changed_presencelike_data(self.distributor, target_user, {
+ "displayname": new_displayname,
+ })
yield self._update_join_states(target_user)
@@ -144,11 +150,9 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_avatar_url
)
- yield self.distributor.fire(
- "changed_presencelike_data", target_user, {
- "avatar_url": new_avatar_url,
- }
- )
+ yield changed_presencelike_data(self.distributor, target_user, {
+ "avatar_url": new_avatar_url,
+ })
yield self._update_join_states(target_user)
@@ -208,9 +212,7 @@ class ProfileHandler(BaseHandler):
"membership": Membership.JOIN,
}
- yield self.distributor.fire(
- "collect_presencelike_data", user, content
- )
+ yield collect_presencelike_data(self.distributor, user, content)
msg_handler = self.hs.get_handlers().message_handler
try:
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 493a087031..baf7c14e40 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -31,6 +31,10 @@ import urllib
logger = logging.getLogger(__name__)
+def registered_user(distributor, user):
+ return distributor.fire("registered_user", user)
+
+
class RegistrationHandler(BaseHandler):
def __init__(self, hs):
@@ -38,6 +42,7 @@ class RegistrationHandler(BaseHandler):
self.distributor = hs.get_distributor()
self.distributor.declare("registered_user")
+ self.captcha_client = CaptchaServerHttpClient(hs)
@defer.inlineCallbacks
def check_username(self, localpart):
@@ -98,7 +103,7 @@ class RegistrationHandler(BaseHandler):
password_hash=password_hash
)
- yield self.distributor.fire("registered_user", user)
+ yield registered_user(self.distributor, user)
else:
# autogen a random user ID
attempts = 0
@@ -117,7 +122,7 @@ class RegistrationHandler(BaseHandler):
token=token,
password_hash=password_hash)
- self.distributor.fire("registered_user", user)
+ yield registered_user(self.distributor, user)
except SynapseError:
# if user id is taken, just generate another
user_id = None
@@ -127,25 +132,9 @@ class RegistrationHandler(BaseHandler):
raise RegistrationError(
500, "Cannot generate user ID.")
- # create a default avatar for the user
- # XXX: ideally clients would explicitly specify one, but given they don't
- # and we want consistent and pretty identicons for random users, we'll
- # do it here.
- try:
- auth_user = UserID.from_string(user_id)
- media_repository = self.hs.get_resource_for_media_repository()
- identicon_resource = media_repository.getChildWithDefault("identicon", None)
- upload_resource = media_repository.getChildWithDefault("upload", None)
- identicon_bytes = identicon_resource.generate_identicon(user_id, 320, 320)
- content_uri = yield upload_resource.create_content(
- "image/png", None, identicon_bytes, len(identicon_bytes), auth_user
- )
- profile_handler = self.hs.get_handlers().profile_handler
- profile_handler.set_avatar_url(
- auth_user, auth_user, ("%s#auto" % (content_uri,))
- )
- except NotImplementedError:
- pass # make tests pass without messing around creating default avatars
+ # We used to generate default identicons here, but nowadays
+ # we want clients to generate their own as part of their branding
+ # rather than there being consistent matrix-wide ones, so we don't.
defer.returnValue((user_id, token))
@@ -167,7 +156,7 @@ class RegistrationHandler(BaseHandler):
token=token,
password_hash=""
)
- self.distributor.fire("registered_user", user)
+ registered_user(self.distributor, user)
defer.returnValue((user_id, token))
@defer.inlineCallbacks
@@ -215,7 +204,7 @@ class RegistrationHandler(BaseHandler):
token=token,
password_hash=None
)
- yield self.distributor.fire("registered_user", user)
+ yield registered_user(self.distributor, user)
except Exception, e:
yield self.store.add_access_token_to_user(user_id, token)
# Ignore Registration errors
@@ -302,10 +291,7 @@ class RegistrationHandler(BaseHandler):
"""
Used only by c/s api v1
"""
- # TODO: get this from the homeserver rather than creating a new one for
- # each request
- client = CaptchaServerHttpClient(self.hs)
- data = yield client.post_urlencoded_get_raw(
+ data = yield self.captcha_client.post_urlencoded_get_raw(
"http://www.google.com:80/recaptcha/api/verify",
args={
'privatekey': private_key,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3f04752581..13f66e0df0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -41,6 +41,18 @@ logger = logging.getLogger(__name__)
id_server_scheme = "https://"
+def collect_presencelike_data(distributor, user, content):
+ return distributor.fire("collect_presencelike_data", user, content)
+
+
+def user_left_room(distributor, user, room_id):
+ return distributor.fire("user_left_room", user=user, room_id=room_id)
+
+
+def user_joined_room(distributor, user, room_id):
+ return distributor.fire("user_joined_room", user=user, room_id=room_id)
+
+
class RoomCreationHandler(BaseHandler):
PRESETS_DICT = {
@@ -438,9 +450,7 @@ class RoomMemberHandler(BaseHandler):
if prev_state and prev_state.membership == Membership.JOIN:
user = UserID.from_string(event.user_id)
- self.distributor.fire(
- "user_left_room", user=user, room_id=event.room_id
- )
+ user_left_room(self.distributor, user, event.room_id)
defer.returnValue({"room_id": room_id})
@@ -458,9 +468,7 @@ class RoomMemberHandler(BaseHandler):
raise SynapseError(404, "No known servers")
# If event doesn't include a display name, add one.
- yield self.distributor.fire(
- "collect_presencelike_data", joinee, content
- )
+ yield collect_presencelike_data(self.distributor, joinee, content)
content.update({"membership": Membership.JOIN})
builder = self.event_builder_factory.new({
@@ -517,10 +525,13 @@ class RoomMemberHandler(BaseHandler):
do_auth=do_auth,
)
- user = UserID.from_string(event.user_id)
- yield self.distributor.fire(
- "user_joined_room", user=user, room_id=room_id
- )
+ prev_state = context.current_state.get((event.type, event.state_key))
+ if not prev_state or prev_state.membership != Membership.JOIN:
+ # Only fire user_joined_room if the user has acutally joined the
+ # room. Don't bother if the user is just changing their profile
+ # info.
+ user = UserID.from_string(event.user_id)
+ yield user_joined_room(self.distributor, user, room_id)
@defer.inlineCallbacks
def get_inviter(self, event):
@@ -693,13 +704,48 @@ class RoomMemberHandler(BaseHandler):
token_id,
txn_id
):
+ room_state = yield self.hs.get_state_handler().get_current_state(room_id)
+
+ inviter_display_name = ""
+ inviter_avatar_url = ""
+ member_event = room_state.get((EventTypes.Member, user.to_string()))
+ if member_event:
+ inviter_display_name = member_event.content.get("displayname", "")
+ inviter_avatar_url = member_event.content.get("avatar_url", "")
+
+ canonical_room_alias = ""
+ canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
+ if canonical_alias_event:
+ canonical_room_alias = canonical_alias_event.content.get("alias", "")
+
+ room_name = ""
+ room_name_event = room_state.get((EventTypes.Name, ""))
+ if room_name_event:
+ room_name = room_name_event.content.get("name", "")
+
+ room_join_rules = ""
+ join_rules_event = room_state.get((EventTypes.JoinRules, ""))
+ if join_rules_event:
+ room_join_rules = join_rules_event.content.get("join_rule", "")
+
+ room_avatar_url = ""
+ room_avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
+ if room_avatar_event:
+ room_avatar_url = room_avatar_event.content.get("url", "")
+
token, public_key, key_validity_url, display_name = (
yield self._ask_id_server_for_third_party_invite(
- id_server,
- medium,
- address,
- room_id,
- user.to_string()
+ id_server=id_server,
+ medium=medium,
+ address=address,
+ room_id=room_id,
+ inviter_user_id=user.to_string(),
+ room_alias=canonical_room_alias,
+ room_avatar_url=room_avatar_url,
+ room_join_rules=room_join_rules,
+ room_name=room_name,
+ inviter_display_name=inviter_display_name,
+ inviter_avatar_url=inviter_avatar_url
)
)
msg_handler = self.hs.get_handlers().message_handler
@@ -721,7 +767,19 @@ class RoomMemberHandler(BaseHandler):
@defer.inlineCallbacks
def _ask_id_server_for_third_party_invite(
- self, id_server, medium, address, room_id, sender):
+ self,
+ id_server,
+ medium,
+ address,
+ room_id,
+ inviter_user_id,
+ room_alias,
+ room_avatar_url,
+ room_join_rules,
+ room_name,
+ inviter_display_name,
+ inviter_avatar_url
+ ):
is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
id_server_scheme, id_server,
)
@@ -731,7 +789,13 @@ class RoomMemberHandler(BaseHandler):
"medium": medium,
"address": address,
"room_id": room_id,
- "sender": sender,
+ "room_alias": room_alias,
+ "room_avatar_url": room_avatar_url,
+ "room_join_rules": room_join_rules,
+ "room_name": room_name,
+ "sender": inviter_user_id,
+ "sender_display_name": inviter_display_name,
+ "sender_avatar_url": inviter_avatar_url,
}
)
# TODO: Check for success
@@ -743,13 +807,17 @@ class RoomMemberHandler(BaseHandler):
)
defer.returnValue((token, public_key, key_validity_url, display_name))
+ def forget(self, user, room_id):
+ return self.store.forget(user.to_string(), room_id)
+
class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def get_public_room_list(self):
chunk = yield self.store.get_rooms(is_public=True)
- results = yield defer.gatherResults(
+
+ room_members = yield defer.gatherResults(
[
self.store.get_users_in_room(room["room_id"])
for room in chunk
@@ -757,12 +825,30 @@ class RoomListHandler(BaseHandler):
consumeErrors=True,
).addErrback(unwrapFirstError)
+ avatar_urls = yield defer.gatherResults(
+ [
+ self.get_room_avatar_url(room["room_id"])
+ for room in chunk
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
+
for i, room in enumerate(chunk):
- room["num_joined_members"] = len(results[i])
+ room["num_joined_members"] = len(room_members[i])
+ if avatar_urls[i]:
+ room["avatar_url"] = avatar_urls[i]
# FIXME (erikj): START is no longer a valid value
defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
+ @defer.inlineCallbacks
+ def get_room_avatar_url(self, room_id):
+ event = yield self.hs.get_state_handler().get_current_state(
+ room_id, "m.room.avatar"
+ )
+ if event and "url" in event.content:
+ defer.returnValue(event.content["url"])
+
class RoomContextHandler(BaseHandler):
@defer.inlineCallbacks
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 50688e51a8..99ef56871c 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -131,6 +131,17 @@ class SearchHandler(BaseHandler):
if batch_group == "room_id":
room_ids.intersection_update({batch_group_key})
+ if not room_ids:
+ defer.returnValue({
+ "search_categories": {
+ "room_events": {
+ "results": [],
+ "count": 0,
+ "highlights": [],
+ }
+ }
+ })
+
rank_map = {} # event_id -> rank of event
allowed_events = []
room_groups = {} # Holds result of grouping by room, if applicable
@@ -139,11 +150,22 @@ class SearchHandler(BaseHandler):
# Holds the next_batch for the entire result set if one of those exists
global_next_batch = None
+ highlights = set()
+
+ count = None
+
if order_by == "rank":
- results = yield self.store.search_msgs(
+ search_result = yield self.store.search_msgs(
room_ids, search_term, keys
)
+ count = search_result["count"]
+
+ if search_result["highlights"]:
+ highlights.update(search_result["highlights"])
+
+ results = search_result["results"]
+
results_map = {r["event"].event_id: r for r in results}
rank_map.update({r["event"].event_id: r["rank"] for r in results})
@@ -171,80 +193,78 @@ class SearchHandler(BaseHandler):
s["results"].append(e.event_id)
elif order_by == "recent":
- # In this case we specifically loop through each room as the given
- # limit applies to each room, rather than a global list.
- # This is not necessarilly a good idea.
- for room_id in room_ids:
- room_events = []
- if batch_group == "room_id" and batch_group_key == room_id:
- pagination_token = batch_token
- else:
- pagination_token = None
- i = 0
-
- # We keep looping and we keep filtering until we reach the limit
- # or we run out of things.
- # But only go around 5 times since otherwise synapse will be sad.
- while len(room_events) < search_filter.limit() and i < 5:
- i += 1
- results = yield self.store.search_room(
- room_id, search_term, keys, search_filter.limit() * 2,
- pagination_token=pagination_token,
- )
+ room_events = []
+ i = 0
+
+ pagination_token = batch_token
+
+ # We keep looping and we keep filtering until we reach the limit
+ # or we run out of things.
+ # But only go around 5 times since otherwise synapse will be sad.
+ while len(room_events) < search_filter.limit() and i < 5:
+ i += 1
+ search_result = yield self.store.search_rooms(
+ room_ids, search_term, keys, search_filter.limit() * 2,
+ pagination_token=pagination_token,
+ )
- results_map = {r["event"].event_id: r for r in results}
+ if search_result["highlights"]:
+ highlights.update(search_result["highlights"])
- rank_map.update({r["event"].event_id: r["rank"] for r in results})
+ count = search_result["count"]
- filtered_events = search_filter.filter([
- r["event"] for r in results
- ])
+ results = search_result["results"]
- events = yield self._filter_events_for_client(
- user.to_string(), filtered_events
- )
+ results_map = {r["event"].event_id: r for r in results}
- room_events.extend(events)
- room_events = room_events[:search_filter.limit()]
+ rank_map.update({r["event"].event_id: r["rank"] for r in results})
- if len(results) < search_filter.limit() * 2:
- pagination_token = None
- break
- else:
- pagination_token = results[-1]["pagination_token"]
-
- if room_events:
- res = results_map[room_events[-1].event_id]
- pagination_token = res["pagination_token"]
-
- group = room_groups.setdefault(room_id, {})
- if pagination_token:
- next_batch = encode_base64("%s\n%s\n%s" % (
- "room_id", room_id, pagination_token
- ))
- group["next_batch"] = next_batch
-
- if batch_token:
- global_next_batch = next_batch
-
- group["results"] = [e.event_id for e in room_events]
- group["order"] = max(
- e.origin_server_ts/1000 for e in room_events
- if hasattr(e, "origin_server_ts")
- )
+ filtered_events = search_filter.filter([
+ r["event"] for r in results
+ ])
+
+ events = yield self._filter_events_for_client(
+ user.to_string(), filtered_events
+ )
- allowed_events.extend(room_events)
+ room_events.extend(events)
+ room_events = room_events[:search_filter.limit()]
- # Normalize the group orders
- if room_groups:
- if len(room_groups) > 1:
- mx = max(g["order"] for g in room_groups.values())
- mn = min(g["order"] for g in room_groups.values())
+ if len(results) < search_filter.limit() * 2:
+ pagination_token = None
+ break
+ else:
+ pagination_token = results[-1]["pagination_token"]
- for g in room_groups.values():
- g["order"] = (g["order"] - mn) * 1.0 / (mx - mn)
+ for event in room_events:
+ group = room_groups.setdefault(event.room_id, {
+ "results": [],
+ })
+ group["results"].append(event.event_id)
+
+ if room_events and len(room_events) >= search_filter.limit():
+ last_event_id = room_events[-1].event_id
+ pagination_token = results_map[last_event_id]["pagination_token"]
+
+ # We want to respect the given batch group and group keys so
+ # that if people blindly use the top level `next_batch` token
+ # it returns more from the same group (if applicable) rather
+ # than reverting to searching all results again.
+ if batch_group and batch_group_key:
+ global_next_batch = encode_base64("%s\n%s\n%s" % (
+ batch_group, batch_group_key, pagination_token
+ ))
else:
- room_groups.values()[0]["order"] = 1
+ global_next_batch = encode_base64("%s\n%s\n%s" % (
+ "all", "", pagination_token
+ ))
+
+ for room_id, group in room_groups.items():
+ group["next_batch"] = encode_base64("%s\n%s\n%s" % (
+ "room_id", room_id, pagination_token
+ ))
+
+ allowed_events.extend(room_events)
else:
# We should never get here due to the guard earlier.
@@ -334,20 +354,19 @@ class SearchHandler(BaseHandler):
# We're now about to serialize the events. We should not make any
# blocking calls after this. Otherwise the 'age' will be wrong
- results = {
- e.event_id: {
+ results = [
+ {
"rank": rank_map[e.event_id],
"result": serialize_event(e, time_now),
"context": contexts.get(e.event_id, {}),
}
for e in allowed_events
- }
-
- logger.info("Found %d results", len(results))
+ ]
rooms_cat_res = {
"results": results,
- "count": len(results)
+ "count": count,
+ "highlights": list(highlights),
}
if state_results:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 877328b29e..feea407ea2 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,8 +15,9 @@
from ._base import BaseHandler
-from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
+from synapse.api.errors import GuestAccessError
+from synapse.util import unwrapFirstError
from twisted.internet import defer
@@ -28,6 +29,7 @@ logger = logging.getLogger(__name__)
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
+ "is_guest",
"filter",
])
@@ -100,6 +102,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"presence", # List of presence events for the user.
+ "account_data", # List of account_data events for the user.
"joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
@@ -115,6 +118,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
self.presence or self.joined or self.invited
)
+GuestRoom = collections.namedtuple("GuestRoom", ("room_id", "membership"))
+
class SyncHandler(BaseHandler):
@@ -133,6 +138,18 @@ class SyncHandler(BaseHandler):
A Deferred SyncResult.
"""
+ if sync_config.is_guest:
+ bad_rooms = []
+ for room_id in sync_config.filter.list_rooms():
+ world_readable = yield self._is_world_readable(room_id)
+ if not world_readable:
+ bad_rooms.append(room_id)
+
+ if bad_rooms:
+ raise GuestAccessError(
+ bad_rooms, 403, "Guest access not allowed"
+ )
+
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
@@ -149,6 +166,17 @@ class SyncHandler(BaseHandler):
)
defer.returnValue(result)
+ @defer.inlineCallbacks
+ def _is_world_readable(self, room_id):
+ state = yield self.hs.get_state_handler().get_current_state(
+ room_id,
+ EventTypes.RoomHistoryVisibility
+ )
+ if state and "history_visibility" in state.content:
+ defer.returnValue(state.content["history_visibility"] == "world_readable")
+ else:
+ defer.returnValue(False)
+
def current_sync_for_user(self, sync_config, since_token=None,
full_state=False):
"""Get the sync for client needed to match what the server has now.
@@ -172,47 +200,71 @@ class SyncHandler(BaseHandler):
"""
now_token = yield self.event_sources.get_current_token()
- now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token
- )
+ if sync_config.is_guest:
+ room_list = [
+ GuestRoom(room_id, Membership.JOIN)
+ for room_id in sync_config.filter.list_rooms()
+ ]
+
+ account_data = {}
+ account_data_by_room = {}
+ tags_by_room = {}
+
+ else:
+ membership_list = (Membership.INVITE, Membership.JOIN)
+ if sync_config.filter.include_leave:
+ membership_list += (Membership.LEAVE, Membership.BAN)
+
+ room_list = yield self.store.get_rooms_for_user_where_membership_is(
+ user_id=sync_config.user.to_string(),
+ membership_list=membership_list
+ )
+
+ account_data, account_data_by_room = (
+ yield self.store.get_account_data_for_user(
+ sync_config.user.to_string()
+ )
+ )
+
+ tags_by_room = yield self.store.get_tags_for_user(
+ sync_config.user.to_string()
+ )
presence_stream = self.event_sources.sources["presence"]
- # TODO (mjark): This looks wrong, shouldn't we be getting the presence
- # UP to the present rather than after the present?
- pagination_config = PaginationConfig(from_token=now_token)
- presence, _ = yield presence_stream.get_pagination_rows(
+
+ joined_room_ids = [
+ room.room_id for room in room_list
+ if room.membership == Membership.JOIN
+ ]
+
+ presence, _ = yield presence_stream.get_new_events(
+ from_key=0,
user=sync_config.user,
- pagination_config=pagination_config.get_source_config("presence"),
- key=None
- )
- room_list = yield self.store.get_rooms_for_user_where_membership_is(
- user_id=sync_config.user.to_string(),
- membership_list=(
- Membership.INVITE,
- Membership.JOIN,
- Membership.LEAVE,
- Membership.BAN
- )
+ room_ids=joined_room_ids,
+ is_guest=sync_config.is_guest,
)
- tags_by_room = yield self.store.get_tags_for_user(
- sync_config.user.to_string()
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_config, now_token, joined_room_ids
)
joined = []
invited = []
archived = []
+ deferreds = []
for event in room_list:
if event.membership == Membership.JOIN:
- room_sync = yield self.full_state_sync_for_joined_room(
+ room_sync_deferred = self.full_state_sync_for_joined_room(
room_id=event.room_id,
sync_config=sync_config,
now_token=now_token,
timeline_since_token=timeline_since_token,
ephemeral_by_room=ephemeral_by_room,
tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
)
- joined.append(room_sync)
+ room_sync_deferred.addCallback(joined.append)
+ deferreds.append(room_sync_deferred)
elif event.membership == Membership.INVITE:
invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult(
@@ -223,18 +275,25 @@ class SyncHandler(BaseHandler):
leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
)
- room_sync = yield self.full_state_sync_for_archived_room(
+ room_sync_deferred = self.full_state_sync_for_archived_room(
sync_config=sync_config,
room_id=event.room_id,
leave_event_id=event.event_id,
leave_token=leave_token,
timeline_since_token=timeline_since_token,
tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
)
- archived.append(room_sync)
+ room_sync_deferred.addCallback(archived.append)
+ deferreds.append(room_sync_deferred)
+
+ yield defer.gatherResults(
+ deferreds, consumeErrors=True
+ ).addErrback(unwrapFirstError)
defer.returnValue(SyncResult(
presence=presence,
+ account_data=self.account_data_for_user(account_data),
joined=joined,
invited=invited,
archived=archived,
@@ -244,7 +303,8 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def full_state_sync_for_joined_room(self, room_id, sync_config,
now_token, timeline_since_token,
- ephemeral_by_room, tags_by_room):
+ ephemeral_by_room, tags_by_room,
+ account_data_by_room):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
@@ -262,26 +322,47 @@ class SyncHandler(BaseHandler):
state=current_state,
ephemeral=ephemeral_by_room.get(room_id, []),
account_data=self.account_data_for_room(
- room_id, tags_by_room
+ room_id, tags_by_room, account_data_by_room
),
))
- def account_data_for_room(self, room_id, tags_by_room):
- account_data = []
+ def account_data_for_user(self, account_data):
+ account_data_events = []
+
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ return account_data_events
+
+ def account_data_for_room(self, room_id, tags_by_room, account_data_by_room):
+ account_data_events = []
tags = tags_by_room.get(room_id)
if tags is not None:
- account_data.append({
+ account_data_events.append({
"type": "m.tag",
"content": {"tags": tags},
})
- return account_data
+
+ account_data = account_data_by_room.get(room_id, {})
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ return account_data_events
@defer.inlineCallbacks
- def ephemeral_by_room(self, sync_config, now_token, since_token=None):
+ def ephemeral_by_room(self, sync_config, now_token, room_ids,
+ since_token=None):
"""Get the ephemeral events for each room the user is in
Args:
sync_config (SyncConfig): The flags, filters and user for the sync.
now_token (StreamToken): Where the server is currently up to.
+ room_ids (list): List of room id strings to get data for.
since_token (StreamToken): Where the server was when the client
last synced.
Returns:
@@ -292,9 +373,6 @@ class SyncHandler(BaseHandler):
typing_key = since_token.typing_key if since_token else "0"
- rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
- room_ids = [room.room_id for room in rooms]
-
typing_source = self.event_sources.sources["typing"]
typing, typing_key = yield typing_source.get_new_events(
user=sync_config.user,
@@ -341,7 +419,8 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def full_state_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token,
- timeline_since_token, tags_by_room):
+ timeline_since_token, tags_by_room,
+ account_data_by_room):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
@@ -358,7 +437,7 @@ class SyncHandler(BaseHandler):
timeline=batch,
state=leave_state,
account_data=self.account_data_for_room(
- room_id, tags_by_room
+ room_id, tags_by_room, account_data_by_room
),
))
@@ -371,8 +450,38 @@ class SyncHandler(BaseHandler):
"""
now_token = yield self.event_sources.get_current_token()
- rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
- room_ids = [room.room_id for room in rooms]
+ if sync_config.is_guest:
+ room_ids = sync_config.filter.list_rooms()
+
+ tags_by_room = {}
+ account_data = {}
+ account_data_by_room = {}
+
+ else:
+ rooms = yield self.store.get_rooms_for_user(
+ sync_config.user.to_string()
+ )
+ room_ids = [room.room_id for room in rooms]
+
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_config, now_token, since_token
+ )
+
+ tags_by_room = yield self.store.get_updated_tags(
+ sync_config.user.to_string(),
+ since_token.account_data_key,
+ )
+
+ account_data, account_data_by_room = (
+ yield self.store.get_updated_account_data_for_user(
+ sync_config.user.to_string(),
+ since_token.account_data_key,
+ )
+ )
+
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_config, now_token, room_ids, since_token
+ )
presence_source = self.event_sources.sources["presence"]
presence, presence_key = yield presence_source.get_new_events(
@@ -380,15 +489,10 @@ class SyncHandler(BaseHandler):
from_key=since_token.presence_key,
limit=sync_config.filter.presence_limit(),
room_ids=room_ids,
- # /sync doesn't support guest access, they can't get to this point in code
- is_guest=False,
+ is_guest=sync_config.is_guest,
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
- now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token, since_token
- )
-
rm_handler = self.hs.get_handlers().room_member_handler
app_service = yield self.store.get_app_service_by_user_id(
sync_config.user.to_string()
@@ -408,11 +512,8 @@ class SyncHandler(BaseHandler):
from_key=since_token.room_key,
to_key=now_token.room_key,
limit=timeline_limit + 1,
- )
-
- tags_by_room = yield self.store.get_updated_tags(
- sync_config.user.to_string(),
- since_token.account_data_key,
+ room_ids=room_ids if sync_config.is_guest else (),
+ is_guest=sync_config.is_guest,
)
joined = []
@@ -469,7 +570,7 @@ class SyncHandler(BaseHandler):
state=state,
ephemeral=ephemeral_by_room.get(room_id, []),
account_data=self.account_data_for_room(
- room_id, tags_by_room
+ room_id, tags_by_room, account_data_by_room
),
)
logger.debug("Result for room %s: %r", room_id, room_sync)
@@ -492,14 +593,15 @@ class SyncHandler(BaseHandler):
for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token,
- ephemeral_by_room, tags_by_room
+ ephemeral_by_room, tags_by_room, account_data_by_room
)
if room_sync:
joined.append(room_sync)
for leave_event in leave_events:
room_sync = yield self.incremental_sync_for_archived_room(
- sync_config, leave_event, since_token, tags_by_room
+ sync_config, leave_event, since_token, tags_by_room,
+ account_data_by_room
)
archived.append(room_sync)
@@ -510,6 +612,7 @@ class SyncHandler(BaseHandler):
defer.returnValue(SyncResult(
presence=presence,
+ account_data=self.account_data_for_user(account_data),
joined=joined,
invited=invited,
archived=archived,
@@ -542,7 +645,10 @@ class SyncHandler(BaseHandler):
end_key = "s" + room_key.split('-')[-1]
loaded_recents = sync_config.filter.filter_room_timeline(events)
loaded_recents = yield self._filter_events_for_client(
- sync_config.user.to_string(), loaded_recents,
+ sync_config.user.to_string(),
+ loaded_recents,
+ is_guest=sync_config.is_guest,
+ require_all_visible_for_guests=False
)
loaded_recents.extend(recents)
recents = loaded_recents
@@ -566,7 +672,8 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token,
- ephemeral_by_room, tags_by_room):
+ ephemeral_by_room, tags_by_room,
+ account_data_by_room):
""" Get the incremental delta needed to bring the client up to date for
the room. Gives the client the most recent events and the changes to
state.
@@ -606,7 +713,7 @@ class SyncHandler(BaseHandler):
state=state,
ephemeral=ephemeral_by_room.get(room_id, []),
account_data=self.account_data_for_room(
- room_id, tags_by_room
+ room_id, tags_by_room, account_data_by_room
),
)
@@ -616,7 +723,8 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def incremental_sync_for_archived_room(self, sync_config, leave_event,
- since_token, tags_by_room):
+ since_token, tags_by_room,
+ account_data_by_room):
""" Get the incremental delta needed to bring the client up to date for
the archived room.
Returns:
@@ -654,7 +762,7 @@ class SyncHandler(BaseHandler):
timeline=batch,
state=state_events_delta,
account_data=self.account_data_for_room(
- leave_event.room_id, tags_by_room
+ leave_event.room_id, tags_by_room, account_data_by_room
),
)
|