diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 6519f183df..5fd20285d2 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -92,7 +92,15 @@ class BaseHandler(object):
membership_event = state.get((EventTypes.Member, user_id), None)
if membership_event:
- membership = membership_event.membership
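+ # A membership in a room the user has since forgotten is treated as if
+ # the user had no membership there at all for this check.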
+ was_forgotten_at_event = yield self.store.was_forgotten_at(
+ membership_event.state_key,
+ membership_event.room_id,
+ membership_event.event_id
+ )
+ if was_forgotten_at_event:
+ membership = None
+ else:
+ membership = membership_event.membership
else:
membership = None
diff --git a/synapse/handlers/private_user_data.py b/synapse/handlers/account_data.py
index 1abe45ed7b..fe773bee9b 100644
--- a/synapse/handlers/private_user_data.py
+++ b/synapse/handlers/account_data.py
@@ -16,22 +16,23 @@
from twisted.internet import defer
-class PrivateUserDataEventSource(object):
+class AccountDataEventSource(object):
def __init__(self, hs):
self.store = hs.get_datastore()
def get_current_key(self, direction='f'):
- return self.store.get_max_private_user_data_stream_id()
+ return self.store.get_max_account_data_stream_id()
@defer.inlineCallbacks
def get_new_events(self, user, from_key, **kwargs):
user_id = user.to_string()
last_stream_id = from_key
- current_stream_id = yield self.store.get_max_private_user_data_stream_id()
- tags = yield self.store.get_updated_tags(user_id, last_stream_id)
+ current_stream_id = yield self.store.get_max_account_data_stream_id()
results = []
+ tags = yield self.store.get_updated_tags(user_id, last_stream_id)
+
for room_id, room_tags in tags.items():
results.append({
"type": "m.tag",
@@ -39,6 +40,24 @@ class PrivateUserDataEventSource(object):
"room_id": room_id,
})
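+ # Also include any global and per-room account_data that has changed
+ # since the client's last stream position.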
+ account_data, room_account_data = (
+ yield self.store.get_updated_account_data_for_user(user_id, last_stream_id)
+ )
+
+ for account_data_type, content in account_data.items():
+ results.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ for room_id, account_data in room_account_data.items():
+ for account_data_type, content in account_data.items():
+ results.append({
+ "type": account_data_type,
+ "content": content,
+ "room_id": room_id,
+ })
+
defer.returnValue((results, current_stream_id))
@defer.inlineCallbacks
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index d852a18555..04fa58df65 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -30,34 +30,27 @@ class AdminHandler(BaseHandler):
@defer.inlineCallbacks
def get_whois(self, user):
- res = yield self.store.get_user_ip_and_agents(user)
-
- d = {}
- for r in res:
- # Note that device_id is always None
- device = d.setdefault(r["device_id"], {})
- session = device.setdefault(r["access_token"], [])
- session.append({
- "ip": r["ip"],
- "user_agent": r["user_agent"],
- "last_seen": r["last_seen"],
+ connections = []
+
+ sessions = yield self.store.get_user_ip_and_agents(user)
+ for session in sessions:
+ connections.append({
+ "ip": session["ip"],
+ "last_seen": session["last_seen"],
+ "user_agent": session["user_agent"],
})
ret = {
"user_id": user.to_string(),
- "devices": [
- {
- "device_id": k,
+ "devices": {
+ "": {
"sessions": [
{
- # "access_token": x, TODO (erikj)
- "connections": y,
+ "connections": connections,
}
- for x, y in v.items()
]
- }
- for k, v in d.items()
- ],
+ },
+ },
}
defer.returnValue(ret)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 1b11dbdffd..e64b67cdfd 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.constants import LoginType
from synapse.types import UserID
-from synapse.api.errors import LoginError, Codes
+from synapse.api.errors import AuthError, LoginError, Codes
from synapse.util.async import run_on_reactor
from twisted.web.client import PartialDownloadError
@@ -46,6 +46,7 @@ class AuthHandler(BaseHandler):
}
self.bcrypt_rounds = hs.config.bcrypt_rounds
self.sessions = {}
+ self.INVALID_TOKEN_HTTP_STATUS = 401
@defer.inlineCallbacks
def check_auth(self, flows, clientdict, clientip):
@@ -297,10 +298,11 @@ class AuthHandler(BaseHandler):
defer.returnValue((user_id, access_token, refresh_token))
@defer.inlineCallbacks
- def login_with_cas_user_id(self, user_id):
+ def get_login_tuple_for_user_id(self, user_id):
"""
- Authenticates the user with the given user ID,
- intended to have been captured from a CAS response
+ Gets a login tuple for the user with the given user ID.
+ The user is assumed to have been authenticated by some other
+ mechanism (e.g. CAS).
Args:
user_id (str): User ID
@@ -393,6 +395,23 @@ class AuthHandler(BaseHandler):
))
return m.serialize()
+ def generate_short_term_login_token(self, user_id):
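+ # Issues a macaroon carrying a "type = login" caveat and a two-minute
+ # expiry, for users authenticated by another mechanism (e.g. CAS).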
+ macaroon = self._generate_base_macaroon(user_id)
+ macaroon.add_first_party_caveat("type = login")
+ now = self.hs.get_clock().time_msec()
+ expiry = now + (2 * 60 * 1000)
+ macaroon.add_first_party_caveat("time < %d" % (expiry,))
+ return macaroon.serialize()
+
+ def validate_short_term_login_token_and_get_user_id(self, login_token):
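+ # Deserialises the login token, validates its caveats via the auth API
+ # and returns the embedded user_id; any failure becomes a 401 AuthError.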
+ try:
+ macaroon = pymacaroons.Macaroon.deserialize(login_token)
+ auth_api = self.hs.get_auth()
+ auth_api.validate_macaroon(macaroon, "login", True)
+ return self._get_user_from_macaroon(macaroon)
+ except (pymacaroons.exceptions.MacaroonException, TypeError, ValueError):
+ raise AuthError(401, "Invalid token", errcode=Codes.UNKNOWN_TOKEN)
+
def _generate_base_macaroon(self, user_id):
macaroon = pymacaroons.Macaroon(
location=self.hs.config.server_name,
@@ -402,6 +421,16 @@ class AuthHandler(BaseHandler):
macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
return macaroon
+ def _get_user_from_macaroon(self, macaroon):
+ user_prefix = "user_id = "
+ for caveat in macaroon.caveats:
+ if caveat.caveat_id.startswith(user_prefix):
+ return caveat.caveat_id[len(user_prefix):]
+ raise AuthError(
+ self.INVALID_TOKEN_HTTP_STATUS, "No user_id found in token",
+ errcode=Codes.UNKNOWN_TOKEN
+ )
+
@defer.inlineCallbacks
def set_password(self, user_id, newpassword):
password_hash = self.hash(newpassword)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 0e4c0d4d06..fe300433e6 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -28,6 +28,18 @@ import random
logger = logging.getLogger(__name__)
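+# Module-level helpers that fire the corresponding distributor signals.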
+def started_user_eventstream(distributor, user):
+ return distributor.fire("started_user_eventstream", user)
+
+
+def stopped_user_eventstream(distributor, user):
+ return distributor.fire("stopped_user_eventstream", user)
+
+
+def user_joined_room(distributor, user, room_id):
+ return distributor.fire("user_joined_room", user, room_id)
+
+
class EventStreamHandler(BaseHandler):
def __init__(self, hs):
@@ -66,7 +78,7 @@ class EventStreamHandler(BaseHandler):
except:
logger.exception("Failed to cancel event timer")
else:
- yield self.distributor.fire("started_user_eventstream", user)
+ yield started_user_eventstream(self.distributor, user)
self._streams_per_user[user] += 1
@@ -89,7 +101,7 @@ class EventStreamHandler(BaseHandler):
self._stop_timer_per_user.pop(user, None)
- return self.distributor.fire("stopped_user_eventstream", user)
+ return stopped_user_eventstream(self.distributor, user)
logger.debug("Scheduling _later: for %s", user)
self._stop_timer_per_user[user] = (
@@ -120,9 +132,7 @@ class EventStreamHandler(BaseHandler):
timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
if is_guest:
- yield self.distributor.fire(
- "user_joined_room", user=auth_user, room_id=room_id
- )
+ yield user_joined_room(self.distributor, auth_user, room_id)
events, tokens = yield self.notifier.get_events_for(
auth_user, pagin_config, timeout,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c1bce07e31..2855f2d7c3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -44,6 +44,10 @@ import logging
logger = logging.getLogger(__name__)
+def user_joined_room(distributor, user, room_id):
+ return distributor.fire("user_joined_room", user, room_id)
+
+
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
@@ -60,10 +64,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
- self.distributor.observe(
- "user_joined_room",
- self._on_user_joined
- )
+ self.distributor.observe("user_joined_room", self.user_joined_room)
self.waiting_for_join_list = {}
@@ -176,7 +177,7 @@ class FederationHandler(BaseHandler):
)
try:
- _, event_stream_id, max_stream_id = yield self._handle_new_event(
+ context, event_stream_id, max_stream_id = yield self._handle_new_event(
origin,
event,
state=state,
@@ -233,10 +234,13 @@ class FederationHandler(BaseHandler):
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
- user = UserID.from_string(event.state_key)
- yield self.distributor.fire(
- "user_joined_room", user=user, room_id=event.room_id
- )
+ prev_state = context.current_state.get((event.type, event.state_key))
+ if not prev_state or prev_state.membership != Membership.JOIN:
+ # Only fire user_joined_room if the user has actually
+ # joined the room. Don't bother if the user is just
+ # changing their profile info.
+ user = UserID.from_string(event.state_key)
+ yield user_joined_room(self.distributor, user, event.room_id)
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
@@ -733,9 +737,7 @@ class FederationHandler(BaseHandler):
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
- yield self.distributor.fire(
- "user_joined_room", user=user, room_id=event.room_id
- )
+ yield user_joined_room(self.distributor, user, event.room_id)
new_pdu = event
@@ -1082,7 +1084,7 @@ class FederationHandler(BaseHandler):
return self.store.get_min_depth(context)
@log_function
- def _on_user_joined(self, user, room_id):
+ def user_joined_room(self, user, room_id):
waiters = self.waiting_for_join_list.get(
(user.to_string(), room_id),
[]
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 2a99921d5f..f1fa562fff 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -20,7 +20,6 @@ from synapse.api.errors import (
CodeMessageException
)
from ._base import BaseHandler
-from synapse.http.client import SimpleHttpClient
from synapse.util.async import run_on_reactor
from synapse.api.errors import SynapseError
@@ -35,13 +34,12 @@ class IdentityHandler(BaseHandler):
def __init__(self, hs):
super(IdentityHandler, self).__init__(hs)
+ self.http_client = hs.get_simple_http_client()
+
@defer.inlineCallbacks
def threepid_from_creds(self, creds):
yield run_on_reactor()
- # TODO: get this from the homeserver rather than creating a new one for
- # each request
- http_client = SimpleHttpClient(self.hs)
# XXX: make this configurable!
# trustedIdServers = ['matrix.org', 'localhost:8090']
trustedIdServers = ['matrix.org', 'vector.im']
@@ -67,7 +65,7 @@ class IdentityHandler(BaseHandler):
data = {}
try:
- data = yield http_client.get_json(
+ data = yield self.http_client.get_json(
"https://%s%s" % (
id_server,
"/_matrix/identity/api/v1/3pid/getValidated3pid"
@@ -85,7 +83,6 @@ class IdentityHandler(BaseHandler):
def bind_threepid(self, creds, mxid):
yield run_on_reactor()
logger.debug("binding threepid %r to %s", creds, mxid)
- http_client = SimpleHttpClient(self.hs)
data = None
if 'id_server' in creds:
@@ -103,7 +100,7 @@ class IdentityHandler(BaseHandler):
raise SynapseError(400, "No client_secret in creds")
try:
- data = yield http_client.post_urlencoded_get_json(
+ data = yield self.http_client.post_urlencoded_get_json(
"https://%s%s" % (
id_server, "/_matrix/identity/api/v1/3pid/bind"
),
@@ -121,7 +118,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
yield run_on_reactor()
- http_client = SimpleHttpClient(self.hs)
params = {
'email': email,
@@ -131,7 +127,7 @@ class IdentityHandler(BaseHandler):
params.update(kwargs)
try:
- data = yield http_client.post_urlencoded_get_json(
+ data = yield self.http_client.post_urlencoded_get_json(
"https://%s%s" % (
id_server,
"/_matrix/identity/api/v1/validate/email/requestToken"
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 14051aee99..ccdd3d8473 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -26,11 +26,17 @@ from synapse.types import UserID, RoomStreamToken, StreamToken
from ._base import BaseHandler
+from canonicaljson import encode_canonical_json
+
import logging
logger = logging.getLogger(__name__)
+def collect_presencelike_data(distributor, user, content):
+ return distributor.fire("collect_presencelike_data", user, content)
+
+
class MessageHandler(BaseHandler):
def __init__(self, hs):
@@ -195,10 +201,8 @@ class MessageHandler(BaseHandler):
if membership == Membership.JOIN:
joinee = UserID.from_string(builder.state_key)
# If event doesn't include a display name, add one.
- yield self.distributor.fire(
- "collect_presencelike_data",
- joinee,
- builder.content
+ yield collect_presencelike_data(
+ self.distributor, joinee, builder.content
)
if token_id is not None:
@@ -211,6 +215,16 @@ class MessageHandler(BaseHandler):
builder=builder,
)
+ if event.is_state():
+ prev_state = context.current_state.get((event.type, event.state_key))
+ if prev_state and event.user_id == prev_state.user_id:
+ prev_content = encode_canonical_json(prev_state.content)
+ next_content = encode_canonical_json(event.content)
+ if prev_content == next_content:
+ # Duplicate suppression for state updates with same sender
+ # and content.
+ defer.returnValue(prev_state)
+
if event.type == EventTypes.Member:
member_handler = self.hs.get_handlers().room_member_handler
yield member_handler.change_membership(event, context, is_guest=is_guest)
@@ -359,6 +373,10 @@ class MessageHandler(BaseHandler):
tags_by_room = yield self.store.get_tags_for_user(user_id)
+ account_data, account_data_by_room = (
+ yield self.store.get_account_data_for_user(user_id)
+ )
+
public_room_ids = yield self.store.get_public_room_ids()
limit = pagin_config.limit
@@ -436,14 +454,22 @@ class MessageHandler(BaseHandler):
for c in current_state.values()
]
- private_user_data = []
+ account_data_events = []
tags = tags_by_room.get(event.room_id)
if tags:
- private_user_data.append({
+ account_data_events.append({
"type": "m.tag",
"content": {"tags": tags},
})
- d["private_user_data"] = private_user_data
+
+ account_data = account_data_by_room.get(event.room_id, {})
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ d["account_data"] = account_data_events
except:
logger.exception("Failed to get snapshot")
@@ -456,9 +482,17 @@ class MessageHandler(BaseHandler):
consumeErrors=True
).addErrback(unwrapFirstError)
+ account_data_events = []
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
ret = {
"rooms": rooms_ret,
"presence": presence,
+ "account_data": account_data_events,
"receipts": receipt,
"end": now_token.to_string(),
}
@@ -498,14 +532,22 @@ class MessageHandler(BaseHandler):
user_id, room_id, pagin_config, membership, member_event_id, is_guest
)
- private_user_data = []
+ account_data_events = []
tags = yield self.store.get_tags_for_room(user_id, room_id)
if tags:
- private_user_data.append({
+ account_data_events.append({
"type": "m.tag",
"content": {"tags": tags},
})
- result["private_user_data"] = private_user_data
+
+ account_data = yield self.store.get_account_data_for_room(user_id, room_id)
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ result["account_data"] = account_data_events
defer.returnValue(result)
@@ -588,23 +630,28 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
- states = {}
- if not is_guest:
- states = yield presence_handler.get_states(
- target_users=[UserID.from_string(m.user_id) for m in room_members],
- auth_user=auth_user,
- as_event=True,
- check_auth=False,
- )
+ states = yield presence_handler.get_states(
+ target_users=[UserID.from_string(m.user_id) for m in room_members],
+ auth_user=auth_user,
+ as_event=True,
+ check_auth=False,
+ )
defer.returnValue(states.values())
- receipts_handler = self.hs.get_handlers().receipts_handler
+ @defer.inlineCallbacks
+ def get_receipts():
+ receipts_handler = self.hs.get_handlers().receipts_handler
+ receipts = yield receipts_handler.get_receipts_for_room(
+ room_id,
+ now_token.receipt_key
+ )
+ defer.returnValue(receipts)
presence, receipts, (messages, token) = yield defer.gatherResults(
[
get_presence(),
- receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key),
+ get_receipts(),
self.store.get_recent_events_for_room(
room_id,
limit=limit,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index aca65096fc..63d6f30a7b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -62,6 +62,14 @@ def partitionbool(l, func):
return ret.get(True, []), ret.get(False, [])
+def user_presence_changed(distributor, user, statuscache):
+ return distributor.fire("user_presence_changed", user, statuscache)
+
+
+def collect_presencelike_data(distributor, user, content):
+ return distributor.fire("collect_presencelike_data", user, content)
+
+
class PresenceHandler(BaseHandler):
STATE_LEVELS = {
@@ -361,9 +369,7 @@ class PresenceHandler(BaseHandler):
yield self.store.set_presence_state(
target_user.localpart, state_to_store
)
- yield self.distributor.fire(
- "collect_presencelike_data", target_user, state
- )
+ yield collect_presencelike_data(self.distributor, target_user, state)
if now_level > was_level:
state["last_active"] = self.clock.time_msec()
@@ -467,7 +473,7 @@ class PresenceHandler(BaseHandler):
)
@defer.inlineCallbacks
- def send_invite(self, observer_user, observed_user):
+ def send_presence_invite(self, observer_user, observed_user):
"""Request the presence of a local or remote user for a local user"""
if not self.hs.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
@@ -878,7 +884,7 @@ class PresenceHandler(BaseHandler):
room_ids=room_ids,
statuscache=statuscache,
)
- yield self.distributor.fire("user_presence_changed", user, statuscache)
+ yield user_presence_changed(self.distributor, user, statuscache)
@defer.inlineCallbacks
def incoming_presence(self, origin, content):
@@ -1116,9 +1122,7 @@ class PresenceHandler(BaseHandler):
self._user_cachemap[user].get_state()["last_active"]
)
- yield self.distributor.fire(
- "collect_presencelike_data", user, state
- )
+ yield collect_presencelike_data(self.distributor, user, state)
if "last_active" in state:
state = dict(state)
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 799faffe53..576c6f09b4 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -28,6 +28,14 @@ import logging
logger = logging.getLogger(__name__)
+def changed_presencelike_data(distributor, user, state):
+ return distributor.fire("changed_presencelike_data", user, state)
+
+
+def collect_presencelike_data(distributor, user, content):
+ return distributor.fire("collect_presencelike_data", user, content)
+
+
class ProfileHandler(BaseHandler):
def __init__(self, hs):
@@ -95,11 +103,9 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_displayname
)
- yield self.distributor.fire(
- "changed_presencelike_data", target_user, {
- "displayname": new_displayname,
- }
- )
+ yield changed_presencelike_data(self.distributor, target_user, {
+ "displayname": new_displayname,
+ })
yield self._update_join_states(target_user)
@@ -144,11 +150,9 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_avatar_url
)
- yield self.distributor.fire(
- "changed_presencelike_data", target_user, {
- "avatar_url": new_avatar_url,
- }
- )
+ yield changed_presencelike_data(self.distributor, target_user, {
+ "avatar_url": new_avatar_url,
+ })
yield self._update_join_states(target_user)
@@ -208,9 +212,7 @@ class ProfileHandler(BaseHandler):
"membership": Membership.JOIN,
}
- yield self.distributor.fire(
- "collect_presencelike_data", user, content
- )
+ yield collect_presencelike_data(self.distributor, user, content)
msg_handler = self.hs.get_handlers().message_handler
try:
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 493a087031..a037da0f70 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -31,6 +31,10 @@ import urllib
logger = logging.getLogger(__name__)
+def registered_user(distributor, user):
+ return distributor.fire("registered_user", user)
+
+
class RegistrationHandler(BaseHandler):
def __init__(self, hs):
@@ -38,6 +42,7 @@ class RegistrationHandler(BaseHandler):
self.distributor = hs.get_distributor()
self.distributor.declare("registered_user")
+ self.captcha_client = CaptchaServerHttpClient(hs)
@defer.inlineCallbacks
def check_username(self, localpart):
@@ -98,7 +103,7 @@ class RegistrationHandler(BaseHandler):
password_hash=password_hash
)
- yield self.distributor.fire("registered_user", user)
+ yield registered_user(self.distributor, user)
else:
# autogen a random user ID
attempts = 0
@@ -117,7 +122,7 @@ class RegistrationHandler(BaseHandler):
token=token,
password_hash=password_hash)
- self.distributor.fire("registered_user", user)
+ yield registered_user(self.distributor, user)
except SynapseError:
# if user id is taken, just generate another
user_id = None
@@ -167,7 +172,7 @@ class RegistrationHandler(BaseHandler):
token=token,
password_hash=""
)
- self.distributor.fire("registered_user", user)
+ registered_user(self.distributor, user)
defer.returnValue((user_id, token))
@defer.inlineCallbacks
@@ -215,7 +220,7 @@ class RegistrationHandler(BaseHandler):
token=token,
password_hash=None
)
- yield self.distributor.fire("registered_user", user)
+ yield registered_user(self.distributor, user)
except Exception, e:
yield self.store.add_access_token_to_user(user_id, token)
# Ignore Registration errors
@@ -302,10 +307,7 @@ class RegistrationHandler(BaseHandler):
"""
Used only by c/s api v1
"""
- # TODO: get this from the homeserver rather than creating a new one for
- # each request
- client = CaptchaServerHttpClient(self.hs)
- data = yield client.post_urlencoded_get_raw(
+ data = yield self.captcha_client.post_urlencoded_get_raw(
"http://www.google.com:80/recaptcha/api/verify",
args={
'privatekey': private_key,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3f04752581..116a998c42 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -41,6 +41,18 @@ logger = logging.getLogger(__name__)
id_server_scheme = "https://"
+def collect_presencelike_data(distributor, user, content):
+ return distributor.fire("collect_presencelike_data", user, content)
+
+
+def user_left_room(distributor, user, room_id):
+ return distributor.fire("user_left_room", user=user, room_id=room_id)
+
+
+def user_joined_room(distributor, user, room_id):
+ return distributor.fire("user_joined_room", user=user, room_id=room_id)
+
+
class RoomCreationHandler(BaseHandler):
PRESETS_DICT = {
@@ -438,9 +450,7 @@ class RoomMemberHandler(BaseHandler):
if prev_state and prev_state.membership == Membership.JOIN:
user = UserID.from_string(event.user_id)
- self.distributor.fire(
- "user_left_room", user=user, room_id=event.room_id
- )
+ user_left_room(self.distributor, user, event.room_id)
defer.returnValue({"room_id": room_id})
@@ -458,9 +468,7 @@ class RoomMemberHandler(BaseHandler):
raise SynapseError(404, "No known servers")
# If event doesn't include a display name, add one.
- yield self.distributor.fire(
- "collect_presencelike_data", joinee, content
- )
+ yield collect_presencelike_data(self.distributor, joinee, content)
content.update({"membership": Membership.JOIN})
builder = self.event_builder_factory.new({
@@ -517,10 +525,13 @@ class RoomMemberHandler(BaseHandler):
do_auth=do_auth,
)
- user = UserID.from_string(event.user_id)
- yield self.distributor.fire(
- "user_joined_room", user=user, room_id=room_id
- )
+ prev_state = context.current_state.get((event.type, event.state_key))
+ if not prev_state or prev_state.membership != Membership.JOIN:
+ # Only fire user_joined_room if the user has actually joined the
+ # room. Don't bother if the user is just changing their profile
+ # info.
+ user = UserID.from_string(event.user_id)
+ yield user_joined_room(self.distributor, user, room_id)
@defer.inlineCallbacks
def get_inviter(self, event):
@@ -743,6 +754,9 @@ class RoomMemberHandler(BaseHandler):
)
defer.returnValue((token, public_key, key_validity_url, display_name))
+ def forget(self, user, room_id):
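+ # Records in the store that this user has forgotten the room.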
+ self.store.forget(user.to_string(), room_id)
+
class RoomListHandler(BaseHandler):
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index b7545c111f..bc79564287 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -17,13 +17,14 @@ from twisted.internet import defer
from ._base import BaseHandler
-from synapse.api.constants import Membership
+from synapse.api.constants import Membership, EventTypes
from synapse.api.filtering import Filter
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from unpaddedbase64 import decode_base64, encode_base64
+import itertools
import logging
@@ -79,6 +80,9 @@ class SearchHandler(BaseHandler):
# What to order results by (impacts whether pagination can be doen)
order_by = room_cat.get("order_by", "rank")
+ # Return the current state of the rooms?
+ include_state = room_cat.get("include_state", False)
+
# Include context around each event?
event_context = room_cat.get(
"event_context", None
@@ -96,6 +100,10 @@ class SearchHandler(BaseHandler):
after_limit = int(event_context.get(
"after_limit", 5
))
+
+ # Return the historic display name and avatar for the senders
+ # of the events?
+ include_profile = bool(event_context.get("include_profile", False))
except KeyError:
raise SynapseError(400, "Invalid search query")
@@ -123,6 +131,17 @@ class SearchHandler(BaseHandler):
if batch_group == "room_id":
room_ids.intersection_update({batch_group_key})
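+ # Nothing left to search once the filters are applied, so short-circuit
+ # with an empty result set.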
+ if not room_ids:
+ defer.returnValue({
+ "search_categories": {
+ "room_events": {
+ "results": [],
+ "count": 0,
+ "highlights": [],
+ }
+ }
+ })
+
rank_map = {} # event_id -> rank of event
allowed_events = []
room_groups = {} # Holds result of grouping by room, if applicable
@@ -131,11 +150,18 @@ class SearchHandler(BaseHandler):
# Holds the next_batch for the entire result set if one of those exists
global_next_batch = None
+ highlights = set()
+
if order_by == "rank":
- results = yield self.store.search_msgs(
+ search_result = yield self.store.search_msgs(
room_ids, search_term, keys
)
+ if search_result["highlights"]:
+ highlights.update(search_result["highlights"])
+
+ results = search_result["results"]
+
results_map = {r["event"].event_id: r for r in results}
rank_map.update({r["event"].event_id: r["rank"] for r in results})
@@ -163,80 +189,76 @@ class SearchHandler(BaseHandler):
s["results"].append(e.event_id)
elif order_by == "recent":
- # In this case we specifically loop through each room as the given
- # limit applies to each room, rather than a global list.
- # This is not necessarilly a good idea.
- for room_id in room_ids:
- room_events = []
- if batch_group == "room_id" and batch_group_key == room_id:
- pagination_token = batch_token
- else:
- pagination_token = None
- i = 0
-
- # We keep looping and we keep filtering until we reach the limit
- # or we run out of things.
- # But only go around 5 times since otherwise synapse will be sad.
- while len(room_events) < search_filter.limit() and i < 5:
- i += 1
- results = yield self.store.search_room(
- room_id, search_term, keys, search_filter.limit() * 2,
- pagination_token=pagination_token,
- )
+ room_events = []
+ i = 0
+
+ pagination_token = batch_token
+
+ # We keep looping and we keep filtering until we reach the limit
+ # or we run out of things.
+ # But only go around 5 times since otherwise synapse will be sad.
+ while len(room_events) < search_filter.limit() and i < 5:
+ i += 1
+ search_result = yield self.store.search_rooms(
+ room_ids, search_term, keys, search_filter.limit() * 2,
+ pagination_token=pagination_token,
+ )
- results_map = {r["event"].event_id: r for r in results}
+ if search_result["highlights"]:
+ highlights.update(search_result["highlights"])
- rank_map.update({r["event"].event_id: r["rank"] for r in results})
+ results = search_result["results"]
- filtered_events = search_filter.filter([
- r["event"] for r in results
- ])
+ results_map = {r["event"].event_id: r for r in results}
- events = yield self._filter_events_for_client(
- user.to_string(), filtered_events
- )
+ rank_map.update({r["event"].event_id: r["rank"] for r in results})
- room_events.extend(events)
- room_events = room_events[:search_filter.limit()]
+ filtered_events = search_filter.filter([
+ r["event"] for r in results
+ ])
- if len(results) < search_filter.limit() * 2:
- pagination_token = None
- break
- else:
- pagination_token = results[-1]["pagination_token"]
-
- if room_events:
- res = results_map[room_events[-1].event_id]
- pagination_token = res["pagination_token"]
-
- group = room_groups.setdefault(room_id, {})
- if pagination_token:
- next_batch = encode_base64("%s\n%s\n%s" % (
- "room_id", room_id, pagination_token
- ))
- group["next_batch"] = next_batch
-
- if batch_token:
- global_next_batch = next_batch
-
- group["results"] = [e.event_id for e in room_events]
- group["order"] = max(
- e.origin_server_ts/1000 for e in room_events
- if hasattr(e, "origin_server_ts")
- )
+ events = yield self._filter_events_for_client(
+ user.to_string(), filtered_events
+ )
- allowed_events.extend(room_events)
+ room_events.extend(events)
+ room_events = room_events[:search_filter.limit()]
- # Normalize the group orders
- if room_groups:
- if len(room_groups) > 1:
- mx = max(g["order"] for g in room_groups.values())
- mn = min(g["order"] for g in room_groups.values())
+ if len(results) < search_filter.limit() * 2:
+ pagination_token = None
+ break
+ else:
+ pagination_token = results[-1]["pagination_token"]
- for g in room_groups.values():
- g["order"] = (g["order"] - mn) * 1.0 / (mx - mn)
+ for event in room_events:
+ group = room_groups.setdefault(event.room_id, {
+ "results": [],
+ })
+ group["results"].append(event.event_id)
+
+ if room_events and len(room_events) >= search_filter.limit():
+ last_event_id = room_events[-1].event_id
+ pagination_token = results_map[last_event_id]["pagination_token"]
+
+ # We want to respect the given batch group and group keys so
+ # that if people blindly use the top level `next_batch` token
+ # it returns more from the same group (if applicable) rather
+ # than reverting to searching all results again.
+ if batch_group and batch_group_key:
+ global_next_batch = encode_base64("%s\n%s\n%s" % (
+ batch_group, batch_group_key, pagination_token
+ ))
else:
- room_groups.values()[0]["order"] = 1
+ global_next_batch = encode_base64("%s\n%s\n%s" % (
+ "all", "", pagination_token
+ ))
+
+ for room_id, group in room_groups.items():
+ group["next_batch"] = encode_base64("%s\n%s\n%s" % (
+ "room_id", room_id, pagination_token
+ ))
+
+ allowed_events.extend(room_events)
else:
# We should never get here due to the guard earlier.
@@ -269,6 +291,33 @@ class SearchHandler(BaseHandler):
"room_key", res["end"]
).to_string()
+ if include_profile:
+ senders = set(
+ ev.sender
+ for ev in itertools.chain(
+ res["events_before"], [event], res["events_after"]
+ )
+ )
+
+ if res["events_after"]:
+ last_event_id = res["events_after"][-1].event_id
+ else:
+ last_event_id = event.event_id
+
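+ # Look up each sender's membership event as of the last event in the
+ # context window to recover their displayname/avatar at that point.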
+ state = yield self.store.get_state_for_event(
+ last_event_id,
+ types=[(EventTypes.Member, sender) for sender in senders]
+ )
+
+ res["profile_info"] = {
+ s.state_key: {
+ "displayname": s.content.get("displayname", None),
+ "avatar_url": s.content.get("avatar_url", None),
+ }
+ for s in state.values()
+ if s.type == EventTypes.Member and s.state_key in senders
+ }
+
contexts[event.event_id] = res
else:
contexts = {}
@@ -287,22 +336,39 @@ class SearchHandler(BaseHandler):
for e in context["events_after"]
]
- results = {
- e.event_id: {
+ state_results = {}
+ if include_state:
+ rooms = set(e.room_id for e in allowed_events)
+ for room_id in rooms:
+ state = yield self.state_handler.get_current_state(room_id)
+ state_results[room_id] = state.values()
+
+
+ # We're now about to serialize the events. We should not make any
+ # blocking calls after this. Otherwise the 'age' will be wrong
+
+ results = [
+ {
"rank": rank_map[e.event_id],
"result": serialize_event(e, time_now),
"context": contexts.get(e.event_id, {}),
}
for e in allowed_events
- }
-
- logger.info("Found %d results", len(results))
+ ]
rooms_cat_res = {
"results": results,
- "count": len(results)
+ "count": len(results),
+ "highlights": list(highlights),
}
+ if state_results:
+ rooms_cat_res["state"] = {
+ room_id: [serialize_event(e, time_now) for e in state]
+ for room_id, state in state_results.items()
+ }
+
if room_groups and "room_id" in group_keys:
rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6dc9d0fb92..24c2b2fad6 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -51,7 +51,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"timeline", # TimelineBatch
"state", # dict[(str, str), FrozenEvent]
"ephemeral",
- "private_user_data",
+ "account_data",
])):
__slots__ = []
@@ -63,7 +63,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
self.timeline
or self.state
or self.ephemeral
- or self.private_user_data
+ or self.account_data
)
@@ -71,7 +71,7 @@ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id", # str
"timeline", # TimelineBatch
"state", # dict[(str, str), FrozenEvent]
- "private_user_data",
+ "account_data",
])):
__slots__ = []
@@ -82,7 +82,7 @@ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
return bool(
self.timeline
or self.state
- or self.private_user_data
+ or self.account_data
)
@@ -100,6 +100,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"presence", # List of presence events for the user.
+ "account_data", # List of account_data events for the user.
"joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
@@ -185,13 +186,19 @@ class SyncHandler(BaseHandler):
pagination_config=pagination_config.get_source_config("presence"),
key=None
)
+
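+ # Only fetch rooms the user has left or been banned from when the
+ # client's filter asks for leave events to be included.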
+ membership_list = (Membership.INVITE, Membership.JOIN)
+ if sync_config.filter.include_leave:
+ membership_list += (Membership.LEAVE, Membership.BAN)
+
room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=sync_config.user.to_string(),
- membership_list=(
- Membership.INVITE,
- Membership.JOIN,
- Membership.LEAVE,
- Membership.BAN
+ membership_list=membership_list
+ )
+
+ account_data, account_data_by_room = (
+ yield self.store.get_account_data_for_user(
+ sync_config.user.to_string()
)
)
@@ -211,6 +218,7 @@ class SyncHandler(BaseHandler):
timeline_since_token=timeline_since_token,
ephemeral_by_room=ephemeral_by_room,
tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
)
joined.append(room_sync)
elif event.membership == Membership.INVITE:
@@ -230,11 +238,13 @@ class SyncHandler(BaseHandler):
leave_token=leave_token,
timeline_since_token=timeline_since_token,
tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
)
archived.append(room_sync)
defer.returnValue(SyncResult(
presence=presence,
+ account_data=self.account_data_for_user(account_data),
joined=joined,
invited=invited,
archived=archived,
@@ -244,7 +254,8 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def full_state_sync_for_joined_room(self, room_id, sync_config,
now_token, timeline_since_token,
- ephemeral_by_room, tags_by_room):
+ ephemeral_by_room, tags_by_room,
+ account_data_by_room):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
@@ -261,20 +272,39 @@ class SyncHandler(BaseHandler):
timeline=batch,
state=current_state,
ephemeral=ephemeral_by_room.get(room_id, []),
- private_user_data=self.private_user_data_for_room(
- room_id, tags_by_room
+ account_data=self.account_data_for_room(
+ room_id, tags_by_room, account_data_by_room
),
))
- def private_user_data_for_room(self, room_id, tags_by_room):
- private_user_data = []
+ def account_data_for_user(self, account_data):
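+ # Flattens the user's global account_data map into a list of
+ # {"type", "content"} events for the sync response.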
+ account_data_events = []
+
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ return account_data_events
+
+ def account_data_for_room(self, room_id, tags_by_room, account_data_by_room):
+ account_data_events = []
tags = tags_by_room.get(room_id)
if tags is not None:
- private_user_data.append({
+ account_data_events.append({
"type": "m.tag",
"content": {"tags": tags},
})
- return private_user_data
+
+ account_data = account_data_by_room.get(room_id, {})
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ return account_data_events
@defer.inlineCallbacks
def ephemeral_by_room(self, sync_config, now_token, since_token=None):
@@ -341,7 +371,8 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def full_state_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token,
- timeline_since_token, tags_by_room):
+ timeline_since_token, tags_by_room,
+ account_data_by_room):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
@@ -357,8 +388,8 @@ class SyncHandler(BaseHandler):
room_id=room_id,
timeline=batch,
state=leave_state,
- private_user_data=self.private_user_data_for_room(
- room_id, tags_by_room
+ account_data=self.account_data_for_room(
+ room_id, tags_by_room, account_data_by_room
),
))
@@ -412,7 +443,14 @@ class SyncHandler(BaseHandler):
tags_by_room = yield self.store.get_updated_tags(
sync_config.user.to_string(),
- since_token.private_user_data_key,
+ since_token.account_data_key,
+ )
+
+ account_data, account_data_by_room = (
+ yield self.store.get_updated_account_data_for_user(
+ sync_config.user.to_string(),
+ since_token.account_data_key,
+ )
)
joined = []
@@ -468,8 +506,8 @@ class SyncHandler(BaseHandler):
),
state=state,
ephemeral=ephemeral_by_room.get(room_id, []),
- private_user_data=self.private_user_data_for_room(
- room_id, tags_by_room
+ account_data=self.account_data_for_room(
+ room_id, tags_by_room, account_data_by_room
),
)
logger.debug("Result for room %s: %r", room_id, room_sync)
@@ -492,14 +530,15 @@ class SyncHandler(BaseHandler):
for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token,
- ephemeral_by_room, tags_by_room
+ ephemeral_by_room, tags_by_room, account_data_by_room
)
if room_sync:
joined.append(room_sync)
for leave_event in leave_events:
room_sync = yield self.incremental_sync_for_archived_room(
- sync_config, leave_event, since_token, tags_by_room
+ sync_config, leave_event, since_token, tags_by_room,
+ account_data_by_room
)
archived.append(room_sync)
@@ -510,6 +549,7 @@ class SyncHandler(BaseHandler):
defer.returnValue(SyncResult(
presence=presence,
+ account_data=self.account_data_for_user(account_data),
joined=joined,
invited=invited,
archived=archived,
@@ -566,7 +606,8 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token,
- ephemeral_by_room, tags_by_room):
+ ephemeral_by_room, tags_by_room,
+ account_data_by_room):
""" Get the incremental delta needed to bring the client up to date for
the room. Gives the client the most recent events and the changes to
state.
@@ -605,8 +646,8 @@ class SyncHandler(BaseHandler):
timeline=batch,
state=state,
ephemeral=ephemeral_by_room.get(room_id, []),
- private_user_data=self.private_user_data_for_room(
- room_id, tags_by_room
+ account_data=self.account_data_for_room(
+ room_id, tags_by_room, account_data_by_room
),
)
@@ -616,7 +657,8 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def incremental_sync_for_archived_room(self, sync_config, leave_event,
- since_token, tags_by_room):
+ since_token, tags_by_room,
+ account_data_by_room):
""" Get the incremental delta needed to bring the client up to date for
the archived room.
Returns:
@@ -653,8 +695,8 @@ class SyncHandler(BaseHandler):
room_id=leave_event.room_id,
timeline=batch,
state=state_events_delta,
- private_user_data=self.private_user_data_for_room(
- leave_event.room_id, tags_by_room
+ account_data=self.account_data_for_room(
+ leave_event.room_id, tags_by_room, account_data_by_room
),
)
|