diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index ac09d03ba9..dca337ec61 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -90,8 +90,8 @@ class BaseHandler(object):
messages_per_second = override.messages_per_second
burst_count = override.burst_count
else:
- messages_per_second = self.hs.config.rc_messages_per_second
- burst_count = self.hs.config.rc_message_burst_count
+ messages_per_second = self.hs.config.rc_message.per_second
+ burst_count = self.hs.config.rc_message.burst_count
allowed, time_allowed = self.ratelimiter.can_do_action(
user_id, time_now,
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 27bd06df5d..a12f9508d8 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -19,7 +19,7 @@ import string
from twisted.internet import defer
-from synapse.api.constants import EventTypes
+from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
from synapse.api.errors import (
AuthError,
CodeMessageException,
@@ -43,8 +43,10 @@ class DirectoryHandler(BaseHandler):
self.state = hs.get_state_handler()
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
+ self.store = hs.get_datastore()
self.config = hs.config
self.enable_room_list_search = hs.config.enable_room_list_search
+ self.require_membership = hs.config.require_membership_for_aliases
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -83,7 +85,7 @@ class DirectoryHandler(BaseHandler):
@defer.inlineCallbacks
def create_association(self, requester, room_alias, room_id, servers=None,
- send_event=True):
+ send_event=True, check_membership=True):
"""Attempt to create a new alias
Args:
@@ -93,6 +95,8 @@ class DirectoryHandler(BaseHandler):
servers (list[str]|None): List of servers that others servers
should try and join via
send_event (bool): Whether to send an updated m.room.aliases event
+ check_membership (bool): Whether to check if the user is in the room
+ before the alias can be set (if the server's config requires it).
Returns:
Deferred
@@ -100,6 +104,13 @@ class DirectoryHandler(BaseHandler):
user_id = requester.user.to_string()
+ if len(room_alias.to_string()) > MAX_ALIAS_LENGTH:
+ raise SynapseError(
+ 400,
+ "Can't create aliases longer than %s characters" % MAX_ALIAS_LENGTH,
+ Codes.INVALID_PARAM,
+ )
+
service = requester.app_service
if service:
if not service.is_interested_in_alias(room_alias.to_string()):
@@ -108,6 +119,14 @@ class DirectoryHandler(BaseHandler):
" this kind of alias.", errcode=Codes.EXCLUSIVE
)
else:
+ if self.require_membership and check_membership:
+ rooms_for_user = yield self.store.get_rooms_for_user(user_id)
+ if room_id not in rooms_for_user:
+ raise AuthError(
+ 403,
+ "You must be in the room to create an alias for it",
+ )
+
if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
raise AuthError(
403, "This user is not permitted to create this alias",
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 1b4d8c74ae..eb525070cf 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -21,7 +21,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
-from synapse.events.utils import serialize_event
from synapse.types import UserID
from synapse.util.logutils import log_function
from synapse.visibility import filter_events_for_client
@@ -50,6 +49,7 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier()
self.state = hs.get_state_handler()
self._server_notices_sender = hs.get_server_notices_sender()
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
@log_function
@@ -120,9 +120,12 @@ class EventStreamHandler(BaseHandler):
time_now = self.clock.time_msec()
- chunks = [
- serialize_event(e, time_now, as_client_event) for e in events
- ]
+ chunks = yield self._event_serializer.serialize_events(
+ events, time_now, as_client_event=as_client_event,
+ # We don't bundle "live" events, as otherwise clients
+ # will end up double counting annotations.
+ bundle_aggregations=False,
+ )
chunk = {
"chunk": chunks,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0684778882..cf4fad7de0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1916,6 +1916,11 @@ class FederationHandler(BaseHandler):
event.room_id, latest_event_ids=extrem_ids,
)
+ logger.debug(
+ "Doing soft-fail check for %s: state %s",
+ event.event_id, current_state_ids,
+ )
+
# Now check if event pass auth against said current state
auth_types = auth_types_for_event(event)
current_state_ids = [
@@ -1932,7 +1937,7 @@ class FederationHandler(BaseHandler):
self.auth.check(room_version, event, auth_events=current_auth_events)
except AuthError as e:
logger.warn(
- "Failed current state auth resolution for %r because %s",
+ "Soft-failing %r because %s",
event, e,
)
event.internal_metadata.soft_failed = True
@@ -2008,15 +2013,44 @@ class FederationHandler(BaseHandler):
Args:
origin (str):
- event (synapse.events.FrozenEvent):
+ event (synapse.events.EventBase):
context (synapse.events.snapshot.EventContext):
- auth_events (dict[(str, str)->str]):
+ auth_events (dict[(str, str)->synapse.events.EventBase]):
+ Map from (event_type, state_key) to event
+
+ What we expect the event's auth_events to be, based on the event's
+ position in the dag. I think? maybe??
+
+ Also NB that this function adds entries to it.
+ Returns:
+ defer.Deferred[None]
+ """
+ room_version = yield self.store.get_room_version(event.room_id)
+
+ yield self._update_auth_events_and_context_for_auth(
+ origin, event, context, auth_events
+ )
+ try:
+ self.auth.check(room_version, event, auth_events=auth_events)
+ except AuthError as e:
+ logger.warn("Failed auth resolution for %r because %s", event, e)
+ raise e
+
+ @defer.inlineCallbacks
+ def _update_auth_events_and_context_for_auth(
+ self, origin, event, context, auth_events
+ ):
+ """Helper for do_auth. See there for docs.
+
+ Args:
+ origin (str):
+ event (synapse.events.EventBase):
+ context (synapse.events.snapshot.EventContext):
+ auth_events (dict[(str, str)->synapse.events.EventBase]):
Returns:
defer.Deferred[None]
"""
- # Check if we have all the auth events.
- current_state = set(e.event_id for e in auth_events.values())
event_auth_events = set(event.auth_event_ids())
if event.is_state():
@@ -2024,11 +2058,21 @@ class FederationHandler(BaseHandler):
else:
event_key = None
- if event_auth_events - current_state:
+ # if the event's auth_events refers to events which are not in our
+ # calculated auth_events, we need to fetch those events from somewhere.
+ #
+ # we start by fetching them from the store, and then try calling /event_auth/.
+ missing_auth = event_auth_events.difference(
+ e.event_id for e in auth_events.values()
+ )
+
+ if missing_auth:
# TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_seen_events_with_rejections(
- event_auth_events - current_state
+ missing_auth
)
+ logger.debug("Got events %s from store", have_events)
+ missing_auth.difference_update(have_events.keys())
else:
have_events = {}
@@ -2037,13 +2081,12 @@ class FederationHandler(BaseHandler):
for e in auth_events.values()
})
- seen_events = set(have_events.keys())
-
- missing_auth = event_auth_events - seen_events - current_state
-
if missing_auth:
- logger.info("Missing auth: %s", missing_auth)
# If we don't have all the auth events, we need to get them.
+ logger.info(
+ "auth_events contains unknown events: %s",
+ missing_auth,
+ )
try:
remote_auth_chain = yield self.federation_client.get_event_auth(
origin, event.room_id, event.event_id
@@ -2084,145 +2127,168 @@ class FederationHandler(BaseHandler):
have_events = yield self.store.get_seen_events_with_rejections(
event.auth_event_ids()
)
- seen_events = set(have_events.keys())
except Exception:
# FIXME:
logger.exception("Failed to get auth chain")
+ if event.internal_metadata.is_outlier():
+ logger.info("Skipping auth_event fetch for outlier")
+ return
+
# FIXME: Assumes we have and stored all the state for all the
# prev_events
- current_state = set(e.event_id for e in auth_events.values())
- different_auth = event_auth_events - current_state
+ different_auth = event_auth_events.difference(
+ e.event_id for e in auth_events.values()
+ )
- room_version = yield self.store.get_room_version(event.room_id)
+ if not different_auth:
+ return
- if different_auth and not event.internal_metadata.is_outlier():
- # Do auth conflict res.
- logger.info("Different auth: %s", different_auth)
-
- different_events = yield logcontext.make_deferred_yieldable(
- defer.gatherResults([
- logcontext.run_in_background(
- self.store.get_event,
- d,
- allow_none=True,
- allow_rejected=False,
- )
- for d in different_auth
- if d in have_events and not have_events[d]
- ], consumeErrors=True)
- ).addErrback(unwrapFirstError)
-
- if different_events:
- local_view = dict(auth_events)
- remote_view = dict(auth_events)
- remote_view.update({
- (d.type, d.state_key): d for d in different_events if d
- })
+ logger.info(
+ "auth_events refers to events which are not in our calculated auth "
+ "chain: %s",
+ different_auth,
+ )
- new_state = yield self.state_handler.resolve_events(
- room_version,
- [list(local_view.values()), list(remote_view.values())],
- event
+ room_version = yield self.store.get_room_version(event.room_id)
+
+ different_events = yield logcontext.make_deferred_yieldable(
+ defer.gatherResults([
+ logcontext.run_in_background(
+ self.store.get_event,
+ d,
+ allow_none=True,
+ allow_rejected=False,
)
+ for d in different_auth
+ if d in have_events and not have_events[d]
+ ], consumeErrors=True)
+ ).addErrback(unwrapFirstError)
+
+ if different_events:
+ local_view = dict(auth_events)
+ remote_view = dict(auth_events)
+ remote_view.update({
+ (d.type, d.state_key): d for d in different_events if d
+ })
- auth_events.update(new_state)
+ new_state = yield self.state_handler.resolve_events(
+ room_version,
+ [list(local_view.values()), list(remote_view.values())],
+ event
+ )
- current_state = set(e.event_id for e in auth_events.values())
- different_auth = event_auth_events - current_state
+ logger.info(
+ "After state res: updating auth_events with new state %s",
+ {
+ (d.type, d.state_key): d.event_id for d in new_state.values()
+ if auth_events.get((d.type, d.state_key)) != d
+ },
+ )
- yield self._update_context_for_auth_events(
- event, context, auth_events, event_key,
- )
+ auth_events.update(new_state)
- if different_auth and not event.internal_metadata.is_outlier():
- logger.info("Different auth after resolution: %s", different_auth)
+ different_auth = event_auth_events.difference(
+ e.event_id for e in auth_events.values()
+ )
- # Only do auth resolution if we have something new to say.
- # We can't rove an auth failure.
- do_resolution = False
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
+ )
- provable = [
- RejectedReason.NOT_ANCESTOR, RejectedReason.NOT_ANCESTOR,
- ]
+ if not different_auth:
+ # we're done
+ return
- for e_id in different_auth:
- if e_id in have_events:
- if have_events[e_id] in provable:
- do_resolution = True
- break
+ logger.info(
+ "auth_events still refers to events which are not in the calculated auth "
+ "chain after state resolution: %s",
+ different_auth,
+ )
- if do_resolution:
- prev_state_ids = yield context.get_prev_state_ids(self.store)
- # 1. Get what we think is the auth chain.
- auth_ids = yield self.auth.compute_auth_events(
- event, prev_state_ids
- )
- local_auth_chain = yield self.store.get_auth_chain(
- auth_ids, include_given=True
- )
+ # Only do auth resolution if we have something new to say.
+ # We can't prove an auth failure.
+ do_resolution = False
- try:
- # 2. Get remote difference.
- result = yield self.federation_client.query_auth(
- origin,
- event.room_id,
- event.event_id,
- local_auth_chain,
- )
+ for e_id in different_auth:
+ if e_id in have_events:
+ if have_events[e_id] == RejectedReason.NOT_ANCESTOR:
+ do_resolution = True
+ break
- seen_remotes = yield self.store.have_seen_events(
- [e.event_id for e in result["auth_chain"]]
- )
+ if not do_resolution:
+ logger.info(
+ "Skipping auth resolution due to lack of provable rejection reasons"
+ )
+ return
- # 3. Process any remote auth chain events we haven't seen.
- for ev in result["auth_chain"]:
- if ev.event_id in seen_remotes:
- continue
+ logger.info("Doing auth resolution")
- if ev.event_id == event.event_id:
- continue
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
- try:
- auth_ids = ev.auth_event_ids()
- auth = {
- (e.type, e.state_key): e
- for e in result["auth_chain"]
- if e.event_id in auth_ids
- or event.type == EventTypes.Create
- }
- ev.internal_metadata.outlier = True
+ # 1. Get what we think is the auth chain.
+ auth_ids = yield self.auth.compute_auth_events(
+ event, prev_state_ids
+ )
+ local_auth_chain = yield self.store.get_auth_chain(
+ auth_ids, include_given=True
+ )
- logger.debug(
- "do_auth %s different_auth: %s",
- event.event_id, e.event_id
- )
+ try:
+ # 2. Get remote difference.
+ result = yield self.federation_client.query_auth(
+ origin,
+ event.room_id,
+ event.event_id,
+ local_auth_chain,
+ )
- yield self._handle_new_event(
- origin, ev, auth_events=auth
- )
+ seen_remotes = yield self.store.have_seen_events(
+ [e.event_id for e in result["auth_chain"]]
+ )
- if ev.event_id in event_auth_events:
- auth_events[(ev.type, ev.state_key)] = ev
- except AuthError:
- pass
+ # 3. Process any remote auth chain events we haven't seen.
+ for ev in result["auth_chain"]:
+ if ev.event_id in seen_remotes:
+ continue
- except Exception:
- # FIXME:
- logger.exception("Failed to query auth chain")
+ if ev.event_id == event.event_id:
+ continue
- # 4. Look at rejects and their proofs.
- # TODO.
+ try:
+ auth_ids = ev.auth_event_ids()
+ auth = {
+ (e.type, e.state_key): e
+ for e in result["auth_chain"]
+ if e.event_id in auth_ids
+ or event.type == EventTypes.Create
+ }
+ ev.internal_metadata.outlier = True
+
+ logger.debug(
+ "do_auth %s different_auth: %s",
+ event.event_id, e.event_id
+ )
- yield self._update_context_for_auth_events(
- event, context, auth_events, event_key,
- )
+ yield self._handle_new_event(
+ origin, ev, auth_events=auth
+ )
- try:
- self.auth.check(room_version, event, auth_events=auth_events)
- except AuthError as e:
- logger.warn("Failed auth resolution for %r because %s", event, e)
- raise e
+ if ev.event_id in event_auth_events:
+ auth_events[(ev.type, ev.state_key)] = ev
+ except AuthError:
+ pass
+
+ except Exception:
+ # FIXME:
+ logger.exception("Failed to query auth chain")
+
+ # 4. Look at rejects and their proofs.
+ # TODO.
+
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
+ )
@defer.inlineCallbacks
def _update_context_for_auth_events(self, event, context, auth_events,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 7dfae78db0..aaee5db0b7 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -19,7 +19,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
@@ -43,6 +42,7 @@ class InitialSyncHandler(BaseHandler):
self.clock = hs.get_clock()
self.validator = EventValidator()
self.snapshot_cache = SnapshotCache()
+ self._event_serializer = hs.get_event_client_serializer()
def snapshot_all_rooms(self, user_id=None, pagin_config=None,
as_client_event=True, include_archived=False):
@@ -138,7 +138,9 @@ class InitialSyncHandler(BaseHandler):
d["inviter"] = event.sender
invite_event = yield self.store.get_event(event.event_id)
- d["invite"] = serialize_event(invite_event, time_now, as_client_event)
+ d["invite"] = yield self._event_serializer.serialize_event(
+ invite_event, time_now, as_client_event,
+ )
rooms_ret.append(d)
@@ -185,18 +187,21 @@ class InitialSyncHandler(BaseHandler):
time_now = self.clock.time_msec()
d["messages"] = {
- "chunk": [
- serialize_event(m, time_now, as_client_event)
- for m in messages
- ],
+ "chunk": (
+ yield self._event_serializer.serialize_events(
+ messages, time_now=time_now,
+ as_client_event=as_client_event,
+ )
+ ),
"start": start_token.to_string(),
"end": end_token.to_string(),
}
- d["state"] = [
- serialize_event(c, time_now, as_client_event)
- for c in current_state.values()
- ]
+ d["state"] = yield self._event_serializer.serialize_events(
+ current_state.values(),
+ time_now=time_now,
+ as_client_event=as_client_event
+ )
account_data_events = []
tags = tags_by_room.get(event.room_id)
@@ -337,11 +342,15 @@ class InitialSyncHandler(BaseHandler):
"membership": membership,
"room_id": room_id,
"messages": {
- "chunk": [serialize_event(m, time_now) for m in messages],
+ "chunk": (yield self._event_serializer.serialize_events(
+ messages, time_now,
+ )),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
- "state": [serialize_event(s, time_now) for s in room_state.values()],
+ "state": (yield self._event_serializer.serialize_events(
+ room_state.values(), time_now,
+ )),
"presence": [],
"receipts": [],
})
@@ -355,10 +364,9 @@ class InitialSyncHandler(BaseHandler):
# TODO: These concurrently
time_now = self.clock.time_msec()
- state = [
- serialize_event(x, time_now)
- for x in current_state.values()
- ]
+ state = yield self._event_serializer.serialize_events(
+ current_state.values(), time_now,
+ )
now_token = yield self.hs.get_event_sources().get_current_token()
@@ -425,7 +433,9 @@ class InitialSyncHandler(BaseHandler):
ret = {
"room_id": room_id,
"messages": {
- "chunk": [serialize_event(m, time_now) for m in messages],
+ "chunk": (yield self._event_serializer.serialize_events(
+ messages, time_now,
+ )),
"start": start_token.to_string(),
"end": end_token.to_string(),
},
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 224d34ef3a..0b02469ceb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership, RelationTypes
from synapse.api.errors import (
AuthError,
Codes,
@@ -32,7 +32,6 @@ from synapse.api.errors import (
)
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
-from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter
@@ -57,6 +56,7 @@ class MessageHandler(object):
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
self.store = hs.get_datastore()
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
@@ -164,9 +164,13 @@ class MessageHandler(object):
room_state = room_state[membership_event_id]
now = self.clock.time_msec()
- defer.returnValue(
- [serialize_event(c, now) for c in room_state.values()]
+ events = yield self._event_serializer.serialize_events(
+ room_state.values(), now,
+ # We don't bother bundling aggregations in when asked for state
+ # events, as clients won't use them.
+ bundle_aggregations=False,
)
+ defer.returnValue(events)
@defer.inlineCallbacks
def get_joined_members(self, requester, room_id):
@@ -228,6 +232,7 @@ class EventCreationHandler(object):
self.ratelimiter = hs.get_ratelimiter()
self.notifier = hs.get_notifier()
self.config = hs.config
+ self.require_membership_for_aliases = hs.config.require_membership_for_aliases
self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
@@ -336,6 +341,35 @@ class EventCreationHandler(object):
prev_events_and_hashes=prev_events_and_hashes,
)
+ # In an ideal world we wouldn't need the second part of this condition. However,
+ # this behaviour isn't spec'd yet, meaning we should be able to deactivate this
+ # behaviour. Another reason is that this code is also evaluated each time a new
+ # m.room.aliases event is created, which includes hitting a /directory route.
+ # Therefore not including this condition here would render the similar one in
+ # synapse.handlers.directory pointless.
+ if builder.type == EventTypes.Aliases and self.require_membership_for_aliases:
+ # Ideally we'd do the membership check in event_auth.check(), which
+ # describes a spec'd algorithm for authenticating events received over
+ # federation as well as those created locally. As of room v3, aliases events
+ # can be created by users that are not in the room, therefore we have to
+ # tolerate them in event_auth.check().
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
+ prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+ if not prev_event or prev_event.membership != Membership.JOIN:
+ logger.warning(
+ ("Attempt to send `m.room.aliases` in room %s by user %s but"
+ " membership is %s"),
+ event.room_id,
+ event.sender,
+ prev_event.membership if prev_event else None,
+ )
+
+ raise AuthError(
+ 403,
+ "You must be in the room to create an alias for it",
+ )
+
self.validator.validate_new(event)
defer.returnValue((event, context))
@@ -570,6 +604,20 @@ class EventCreationHandler(object):
self.validator.validate_new(event)
+ # If this event is an annotation then we check that that the sender
+ # can't annotate the same way twice (e.g. stops users from liking an
+ # event multiple times).
+ relation = event.content.get("m.relates_to", {})
+ if relation.get("rel_type") == RelationTypes.ANNOTATION:
+ relates_to = relation["event_id"]
+ aggregation_key = relation["key"]
+
+ already_exists = yield self.store.has_user_annotated_event(
+ relates_to, event.type, aggregation_key, event.sender,
+ )
+ if already_exists:
+ raise SynapseError(400, "Can't send same reaction twice")
+
logger.debug(
"Created event %s",
event.event_id,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index e4fdae9266..8f811e24fe 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -20,7 +20,6 @@ from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
-from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
@@ -78,6 +77,7 @@ class PaginationHandler(object):
self._purges_in_progress_by_room = set()
# map from purge id to PurgeStatus
self._purges_by_id = {}
+ self._event_serializer = hs.get_event_client_serializer()
def start_purge_history(self, room_id, token,
delete_local_events=False):
@@ -278,18 +278,22 @@ class PaginationHandler(object):
time_now = self.clock.time_msec()
chunk = {
- "chunk": [
- serialize_event(e, time_now, as_client_event)
- for e in events
- ],
+ "chunk": (
+ yield self._event_serializer.serialize_events(
+ events, time_now,
+ as_client_event=as_client_event,
+ )
+ ),
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}
if state:
- chunk["state"] = [
- serialize_event(e, time_now, as_client_event)
- for e in state
- ]
+ chunk["state"] = (
+ yield self._event_serializer.serialize_events(
+ state, time_now,
+ as_client_event=as_client_event,
+ )
+ )
defer.returnValue(chunk)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index bd1285b15c..6209858bbb 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -182,17 +182,27 @@ class PresenceHandler(object):
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
+ def run_timeout_handler():
+ return run_as_background_process(
+ "handle_presence_timeouts", self._handle_timeouts
+ )
+
self.clock.call_later(
30,
self.clock.looping_call,
- self._handle_timeouts,
+ run_timeout_handler,
5000,
)
+ def run_persister():
+ return run_as_background_process(
+ "persist_presence_changes", self._persist_unpersisted_changes
+ )
+
self.clock.call_later(
60,
self.clock.looping_call,
- self._persist_unpersisted_changes,
+ run_persister,
60 * 1000,
)
@@ -229,6 +239,7 @@ class PresenceHandler(object):
)
if self.unpersisted_users_changes:
+
yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in self.unpersisted_users_changes
@@ -240,30 +251,18 @@ class PresenceHandler(object):
"""We periodically persist the unpersisted changes, as otherwise they
may stack up and slow down shutdown times.
"""
- logger.info(
- "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
- len(self.unpersisted_users_changes)
- )
-
unpersisted = self.unpersisted_users_changes
self.unpersisted_users_changes = set()
if unpersisted:
+ logger.info(
+ "Persisting %d upersisted presence updates", len(unpersisted)
+ )
yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in unpersisted
])
- logger.info("Finished _persist_unpersisted_changes")
-
- @defer.inlineCallbacks
- def _update_states_and_catch_exception(self, new_states):
- try:
- res = yield self._update_states(new_states)
- defer.returnValue(res)
- except Exception:
- logger.exception("Error updating presence")
-
@defer.inlineCallbacks
def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes
@@ -338,45 +337,41 @@ class PresenceHandler(object):
logger.info("Handling presence timeouts")
now = self.clock.time_msec()
- try:
- with Measure(self.clock, "presence_handle_timeouts"):
- # Fetch the list of users that *may* have timed out. Things may have
- # changed since the timeout was set, so we won't necessarily have to
- # take any action.
- users_to_check = set(self.wheel_timer.fetch(now))
-
- # Check whether the lists of syncing processes from an external
- # process have expired.
- expired_process_ids = [
- process_id for process_id, last_update
- in self.external_process_last_updated_ms.items()
- if now - last_update > EXTERNAL_PROCESS_EXPIRY
- ]
- for process_id in expired_process_ids:
- users_to_check.update(
- self.external_process_last_updated_ms.pop(process_id, ())
- )
- self.external_process_last_update.pop(process_id)
+ # Fetch the list of users that *may* have timed out. Things may have
+ # changed since the timeout was set, so we won't necessarily have to
+ # take any action.
+ users_to_check = set(self.wheel_timer.fetch(now))
+
+ # Check whether the lists of syncing processes from an external
+ # process have expired.
+ expired_process_ids = [
+ process_id for process_id, last_update
+ in self.external_process_last_updated_ms.items()
+ if now - last_update > EXTERNAL_PROCESS_EXPIRY
+ ]
+ for process_id in expired_process_ids:
+ users_to_check.update(
+ self.external_process_last_updated_ms.pop(process_id, ())
+ )
+ self.external_process_last_update.pop(process_id)
- states = [
- self.user_to_current_state.get(
- user_id, UserPresenceState.default(user_id)
- )
- for user_id in users_to_check
- ]
+ states = [
+ self.user_to_current_state.get(
+ user_id, UserPresenceState.default(user_id)
+ )
+ for user_id in users_to_check
+ ]
- timers_fired_counter.inc(len(states))
+ timers_fired_counter.inc(len(states))
- changes = handle_timeouts(
- states,
- is_mine_fn=self.is_mine_id,
- syncing_user_ids=self.get_currently_syncing_users(),
- now=now,
- )
+ changes = handle_timeouts(
+ states,
+ is_mine_fn=self.is_mine_id,
+ syncing_user_ids=self.get_currently_syncing_users(),
+ now=now,
+ )
- run_in_background(self._update_states_and_catch_exception, changes)
- except Exception:
- logger.exception("Exception in _handle_timeouts loop")
+ return self._update_states(changes)
@defer.inlineCallbacks
def bump_presence_active_time(self, user):
@@ -828,6 +823,11 @@ class PresenceHandler(object):
if typ != EventTypes.Member:
continue
+ if event_id is None:
+ # state has been deleted, so this is not a join. We only care about
+ # joins.
+ continue
+
event = yield self.store.get_event(event_id)
if event.content.get("membership") != Membership.JOIN:
# We only care about joins
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index a65c98ff5c..a5fc6c5dbf 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -31,6 +31,9 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
+MAX_DISPLAYNAME_LEN = 100
+MAX_AVATAR_URL_LEN = 1000
+
class BaseProfileHandler(BaseHandler):
"""Handles fetching and updating user profile information.
@@ -53,6 +56,7 @@ class BaseProfileHandler(BaseHandler):
@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
+
if self.hs.is_mine(target_user):
try:
displayname = yield self.store.get_profile_displayname(
@@ -161,6 +165,11 @@ class BaseProfileHandler(BaseHandler):
if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's displayname")
+ if len(new_displayname) > MAX_DISPLAYNAME_LEN:
+ raise SynapseError(
+ 400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN, ),
+ )
+
if new_displayname == '':
new_displayname = None
@@ -216,6 +225,11 @@ class BaseProfileHandler(BaseHandler):
if not by_admin and target_user != requester.user:
raise AuthError(400, "Cannot set another user's avatar_url")
+ if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
+ raise SynapseError(
+ 400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN, ),
+ )
+
yield self.store.set_profile_avatar_url(
target_user.localpart, new_avatar_url
)
@@ -283,6 +297,48 @@ class BaseProfileHandler(BaseHandler):
room_id, str(e)
)
+ @defer.inlineCallbacks
+ def check_profile_query_allowed(self, target_user, requester=None):
+ """Checks whether a profile query is allowed. If the
+ 'require_auth_for_profile_requests' config flag is set to True and a
+ 'requester' is provided, the query is only allowed if the two users
+ share a room.
+
+ Args:
+ target_user (UserID): The owner of the queried profile.
+ requester (None|UserID): The user querying for the profile.
+
+ Raises:
+ SynapseError(403): The two users share no room, or ne user couldn't
+ be found to be in any room the server is in, and therefore the query
+ is denied.
+ """
+ # Implementation of MSC1301: don't allow looking up profiles if the
+ # requester isn't in the same room as the target. We expect requester to
+ # be None when this function is called outside of a profile query, e.g.
+ # when building a membership event. In this case, we must allow the
+ # lookup.
+ if not self.hs.config.require_auth_for_profile_requests or not requester:
+ return
+
+ try:
+ requester_rooms = yield self.store.get_rooms_for_user(
+ requester.to_string()
+ )
+ target_user_rooms = yield self.store.get_rooms_for_user(
+ target_user.to_string(),
+ )
+
+ # Check if the room lists have no elements in common.
+ if requester_rooms.isdisjoint(target_user_rooms):
+ raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
+ except StoreError as e:
+ if e.code == 404:
+ # This likely means that one of the users doesn't exist,
+ # so we act as if we couldn't find the profile.
+ raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
+ raise
+
class MasterProfileHandler(BaseProfileHandler):
PROFILE_UPDATE_MS = 60 * 1000
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index a51d11a257..9a388ea013 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -19,7 +19,7 @@ import logging
from twisted.internet import defer
from synapse import types
-from synapse.api.constants import LoginType
+from synapse.api.constants import MAX_USERID_LENGTH, LoginType
from synapse.api.errors import (
AuthError,
Codes,
@@ -123,6 +123,15 @@ class RegistrationHandler(BaseHandler):
self.check_user_id_not_appservice_exclusive(user_id)
+ if len(user_id) > MAX_USERID_LENGTH:
+ raise SynapseError(
+ 400,
+ "User ID may not be longer than %s characters" % (
+ MAX_USERID_LENGTH,
+ ),
+ Codes.INVALID_USERNAME
+ )
+
users = yield self.store.get_users_by_id_case_insensitive(user_id)
if users:
if not guest_access_token:
@@ -522,6 +531,8 @@ class RegistrationHandler(BaseHandler):
A tuple of (user_id, access_token).
Raises:
RegistrationError if there was a problem registering.
+
+ NB this is only used in tests. TODO: move it to the test package!
"""
if localpart is None:
raise SynapseError(400, "Request must include user id")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 17628e2684..4a17911a87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -27,7 +27,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
-from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
@@ -70,6 +70,7 @@ class RoomCreationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
+ self.config = hs.config
# linearizer to stop two upgrades happening at once
self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
@@ -402,7 +403,7 @@ class RoomCreationHandler(BaseHandler):
yield directory_handler.create_association(
requester, RoomAlias.from_string(alias),
new_room_id, servers=(self.hs.hostname, ),
- send_event=False,
+ send_event=False, check_membership=False,
)
logger.info("Moved alias %s to new room", alias)
except SynapseError as e:
@@ -475,7 +476,11 @@ class RoomCreationHandler(BaseHandler):
if ratelimit:
yield self.ratelimit(requester)
- room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
+ room_version = config.get(
+ "room_version",
+ self.config.default_room_version.identifier,
+ )
+
if not isinstance(room_version, string_types):
raise SynapseError(
400,
@@ -538,6 +543,7 @@ class RoomCreationHandler(BaseHandler):
room_alias=room_alias,
servers=[self.hs.hostname],
send_event=False,
+ check_membership=False,
)
preset_config = config.get(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 024d6db27a..93ac986c86 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -33,6 +34,8 @@ from synapse.types import RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
+from ._base import BaseHandler
+
logger = logging.getLogger(__name__)
id_server_scheme = "https://"
@@ -71,6 +74,12 @@ class RoomMemberHandler(object):
self.spam_checker = hs.get_spam_checker()
self._server_notices_mxid = self.config.server_notices_mxid
self._enable_lookup = hs.config.enable_3pid_lookup
+ self.allow_per_room_profiles = self.config.allow_per_room_profiles
+
+ # This is only used to get at ratelimit function, and
+ # maybe_kick_guest_users. It's fine there are multiple of these as
+ # it doesn't store state.
+ self.base_handler = BaseHandler(hs)
@abc.abstractmethod
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -350,6 +359,13 @@ class RoomMemberHandler(object):
# later on.
content = dict(content)
+ if not self.allow_per_room_profiles:
+ # Strip profile data, knowing that new profile data will be added to the
+ # event's content in event_creation_handler.create_event() using the target's
+ # global profile.
+ content.pop("displayname", None)
+ content.pop("avatar_url", None)
+
effective_membership_state = action
if action in ["kick", "unban"]:
effective_membership_state = "leave"
@@ -703,6 +719,10 @@ class RoomMemberHandler(object):
Codes.FORBIDDEN,
)
+ # We need to rate limit *before* we send out any 3PID invites, so we
+ # can't just rely on the standard ratelimiting of events.
+ yield self.base_handler.ratelimit(requester)
+
invitee = yield self._lookup_3pid(
id_server, medium, address
)
@@ -924,7 +944,7 @@ class RoomMemberHandler(object):
}
if self.config.invite_3pid_guest:
- guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest(
+ guest_user_id, guest_access_token = yield self.get_or_register_3pid_guest(
requester=requester,
medium=medium,
address=address,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 49c439313e..9bba74d6c9 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -23,7 +23,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
-from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter
from synapse.visibility import filter_events_for_client
@@ -36,6 +35,7 @@ class SearchHandler(BaseHandler):
def __init__(self, hs):
super(SearchHandler, self).__init__(hs)
+ self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks
def get_old_rooms_from_upgraded_room(self, room_id):
@@ -401,14 +401,16 @@ class SearchHandler(BaseHandler):
time_now = self.clock.time_msec()
for context in contexts.values():
- context["events_before"] = [
- serialize_event(e, time_now)
- for e in context["events_before"]
- ]
- context["events_after"] = [
- serialize_event(e, time_now)
- for e in context["events_after"]
- ]
+ context["events_before"] = (
+ yield self._event_serializer.serialize_events(
+ context["events_before"], time_now,
+ )
+ )
+ context["events_after"] = (
+ yield self._event_serializer.serialize_events(
+ context["events_after"], time_now,
+ )
+ )
state_results = {}
if include_state:
@@ -422,14 +424,13 @@ class SearchHandler(BaseHandler):
# We're now about to serialize the events. We should not make any
# blocking calls after this. Otherwise the 'age' will be wrong
- results = [
- {
+ results = []
+ for e in allowed_events:
+ results.append({
"rank": rank_map[e.event_id],
- "result": serialize_event(e, time_now),
+ "result": (yield self._event_serializer.serialize_event(e, time_now)),
"context": contexts.get(e.event_id, {}),
- }
- for e in allowed_events
- ]
+ })
rooms_cat_res = {
"results": results,
@@ -438,10 +439,13 @@ class SearchHandler(BaseHandler):
}
if state_results:
- rooms_cat_res["state"] = {
- room_id: [serialize_event(e, time_now) for e in state]
- for room_id, state in state_results.items()
- }
+ s = {}
+ for room_id, state in state_results.items():
+ s[room_id] = yield self._event_serializer.serialize_events(
+ state, time_now,
+ )
+
+ rooms_cat_res["state"] = s
if room_groups and "room_id" in group_keys:
rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
new file mode 100644
index 0000000000..0e92b405ba
--- /dev/null
+++ b/synapse/handlers/stats.py
@@ -0,0 +1,325 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.handlers.state_deltas import StateDeltasHandler
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import UserID
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class StatsHandler(StateDeltasHandler):
+ """Handles keeping the *_stats tables updated with a simple time-series of
+ information about the users, rooms and media on the server, such that admins
+ have some idea of who is consuming their resources.
+
+ Heavily derived from UserDirectoryHandler
+ """
+
+ def __init__(self, hs):
+ super(StatsHandler, self).__init__(hs)
+ self.hs = hs
+ self.store = hs.get_datastore()
+ self.state = hs.get_state_handler()
+ self.server_name = hs.hostname
+ self.clock = hs.get_clock()
+ self.notifier = hs.get_notifier()
+ self.is_mine_id = hs.is_mine_id
+ self.stats_bucket_size = hs.config.stats_bucket_size
+
+ # The current position in the current_state_delta stream
+ self.pos = None
+
+ # Guard to ensure we only process deltas one at a time
+ self._is_processing = False
+
+ if hs.config.stats_enabled:
+ self.notifier.add_replication_callback(self.notify_new_event)
+
+ # We kick this off so that we don't have to wait for a change before
+ # we start populating stats
+ self.clock.call_later(0, self.notify_new_event)
+
+ def notify_new_event(self):
+ """Called when there may be more deltas to process
+ """
+ if not self.hs.config.stats_enabled:
+ return
+
+ if self._is_processing:
+ return
+
+ @defer.inlineCallbacks
+ def process():
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._is_processing = False
+
+ self._is_processing = True
+ run_as_background_process("stats.notify_new_event", process)
+
+ @defer.inlineCallbacks
+ def _unsafe_process(self):
+ # If self.pos is None then means we haven't fetched it from DB
+ if self.pos is None:
+ self.pos = yield self.store.get_stats_stream_pos()
+
+ # If still None then the initial background update hasn't happened yet
+ if self.pos is None:
+ defer.returnValue(None)
+
+ # Loop round handling deltas until we're up to date
+ while True:
+ with Measure(self.clock, "stats_delta"):
+ deltas = yield self.store.get_current_state_deltas(self.pos)
+ if not deltas:
+ return
+
+ logger.info("Handling %d state deltas", len(deltas))
+ yield self._handle_deltas(deltas)
+
+ self.pos = deltas[-1]["stream_id"]
+ yield self.store.update_stats_stream_pos(self.pos)
+
+ event_processing_positions.labels("stats").set(self.pos)
+
+ @defer.inlineCallbacks
+ def _handle_deltas(self, deltas):
+ """
+ Called with the state deltas to process
+ """
+ for delta in deltas:
+ typ = delta["type"]
+ state_key = delta["state_key"]
+ room_id = delta["room_id"]
+ event_id = delta["event_id"]
+ stream_id = delta["stream_id"]
+ prev_event_id = delta["prev_event_id"]
+
+ logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+ token = yield self.store.get_earliest_token_for_room_stats(room_id)
+
+ # If the earliest token to begin from is larger than our current
+ # stream ID, skip processing this delta.
+ if token is not None and token >= stream_id:
+ logger.debug(
+ "Ignoring: %s as earlier than this room's initial ingestion event",
+ event_id,
+ )
+ continue
+
+ if event_id is None and prev_event_id is None:
+ # Errr...
+ continue
+
+ event_content = {}
+
+ if event_id is not None:
+ event_content = (yield self.store.get_event(event_id)).content or {}
+
+ # quantise time to the nearest bucket
+ now = yield self.store.get_received_ts(event_id)
+ now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+
+ if typ == EventTypes.Member:
+ # we could use _get_key_change here but it's a bit inefficient
+ # given we're not testing for a specific result; might as well
+ # just grab the prev_membership and membership strings and
+ # compare them.
+ prev_event_content = {}
+ if prev_event_id is not None:
+ prev_event_content = (
+ yield self.store.get_event(prev_event_id)
+ ).content
+
+ membership = event_content.get("membership", Membership.LEAVE)
+ prev_membership = prev_event_content.get("membership", Membership.LEAVE)
+
+ if prev_membership == membership:
+ continue
+
+ if prev_membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "joined_members", -1
+ )
+ elif prev_membership == Membership.INVITE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "invited_members", -1
+ )
+ elif prev_membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "left_members", -1
+ )
+ elif prev_membership == Membership.BAN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "banned_members", -1
+ )
+ else:
+ err = "%s is not a valid prev_membership" % (repr(prev_membership),)
+ logger.error(err)
+ raise ValueError(err)
+
+ if membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "joined_members", +1
+ )
+ elif membership == Membership.INVITE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "invited_members", +1
+ )
+ elif membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "left_members", +1
+ )
+ elif membership == Membership.BAN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "banned_members", +1
+ )
+ else:
+ err = "%s is not a valid membership" % (repr(membership),)
+ logger.error(err)
+ raise ValueError(err)
+
+ user_id = state_key
+ if self.is_mine_id(user_id):
+ # update user_stats as it's one of our users
+ public = yield self._is_public_room(room_id)
+
+ if membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now,
+ "user",
+ user_id,
+ "public_rooms" if public else "private_rooms",
+ -1,
+ )
+ elif membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now,
+ "user",
+ user_id,
+ "public_rooms" if public else "private_rooms",
+ +1,
+ )
+
+ elif typ == EventTypes.Create:
+ # Newly created room. Add it with all blank portions.
+ yield self.store.update_room_state(
+ room_id,
+ {
+ "join_rules": None,
+ "history_visibility": None,
+ "encryption": None,
+ "name": None,
+ "topic": None,
+ "avatar": None,
+ "canonical_alias": None,
+ },
+ )
+
+ elif typ == EventTypes.JoinRules:
+ yield self.store.update_room_state(
+ room_id, {"join_rules": event_content.get("join_rule")}
+ )
+
+ is_public = yield self._get_key_change(
+ prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+ )
+ if is_public is not None:
+ yield self.update_public_room_stats(now, room_id, is_public)
+
+ elif typ == EventTypes.RoomHistoryVisibility:
+ yield self.store.update_room_state(
+ room_id,
+ {"history_visibility": event_content.get("history_visibility")},
+ )
+
+ is_public = yield self._get_key_change(
+ prev_event_id, event_id, "history_visibility", "world_readable"
+ )
+ if is_public is not None:
+ yield self.update_public_room_stats(now, room_id, is_public)
+
+ elif typ == EventTypes.Encryption:
+ yield self.store.update_room_state(
+ room_id, {"encryption": event_content.get("algorithm")}
+ )
+ elif typ == EventTypes.Name:
+ yield self.store.update_room_state(
+ room_id, {"name": event_content.get("name")}
+ )
+ elif typ == EventTypes.Topic:
+ yield self.store.update_room_state(
+ room_id, {"topic": event_content.get("topic")}
+ )
+ elif typ == EventTypes.RoomAvatar:
+ yield self.store.update_room_state(
+ room_id, {"avatar": event_content.get("url")}
+ )
+ elif typ == EventTypes.CanonicalAlias:
+ yield self.store.update_room_state(
+ room_id, {"canonical_alias": event_content.get("alias")}
+ )
+
+ @defer.inlineCallbacks
+ def update_public_room_stats(self, ts, room_id, is_public):
+ """
+ Increment/decrement a user's number of public rooms when a room they are
+ in changes to/from public visibility.
+
+ Args:
+ ts (int): Timestamp in seconds
+ room_id (str)
+ is_public (bool)
+ """
+ # For now, blindly iterate over all local users in the room so that
+ # we can handle the whole problem of copying buckets over as needed
+ user_ids = yield self.store.get_users_in_room(room_id)
+
+ for user_id in user_ids:
+ if self.hs.is_mine(UserID.from_string(user_id)):
+ yield self.store.update_stats_delta(
+ ts, "user", user_id, "public_rooms", +1 if is_public else -1
+ )
+ yield self.store.update_stats_delta(
+ ts, "user", user_id, "private_rooms", -1 if is_public else +1
+ )
+
+ @defer.inlineCallbacks
+ def _is_public_room(self, room_id):
+ join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
+ history_visibility = yield self.state.get_current_state(
+ room_id, EventTypes.RoomHistoryVisibility
+ )
+
+ if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
+ (
+ history_visibility
+ and history_visibility.content.get("history_visibility")
+ == "world_readable"
+ )
+ ):
+ defer.returnValue(True)
+ else:
+ defer.returnValue(False)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7cf757f65a..72997d6d04 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -934,7 +934,7 @@ class SyncHandler(object):
res = yield self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room
)
- newly_joined_rooms, newly_joined_users, _, _ = res
+ newly_joined_rooms, newly_joined_or_invited_users, _, _ = res
_, _, newly_left_rooms, newly_left_users = res
block_all_presence_data = (
@@ -943,7 +943,7 @@ class SyncHandler(object):
)
if self.hs_config.use_presence and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
- sync_result_builder, newly_joined_rooms, newly_joined_users
+ sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
)
yield self._generate_sync_entry_for_to_device(sync_result_builder)
@@ -951,7 +951,7 @@ class SyncHandler(object):
device_lists = yield self._generate_sync_entry_for_device_list(
sync_result_builder,
newly_joined_rooms=newly_joined_rooms,
- newly_joined_users=newly_joined_users,
+ newly_joined_or_invited_users=newly_joined_or_invited_users,
newly_left_rooms=newly_left_rooms,
newly_left_users=newly_left_users,
)
@@ -1036,7 +1036,8 @@ class SyncHandler(object):
@measure_func("_generate_sync_entry_for_device_list")
@defer.inlineCallbacks
def _generate_sync_entry_for_device_list(self, sync_result_builder,
- newly_joined_rooms, newly_joined_users,
+ newly_joined_rooms,
+ newly_joined_or_invited_users,
newly_left_rooms, newly_left_users):
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
@@ -1050,7 +1051,7 @@ class SyncHandler(object):
# share a room with?
for room_id in newly_joined_rooms:
joined_users = yield self.state.get_current_users_in_room(room_id)
- newly_joined_users.update(joined_users)
+ newly_joined_or_invited_users.update(joined_users)
for room_id in newly_left_rooms:
left_users = yield self.state.get_current_users_in_room(room_id)
@@ -1058,7 +1059,7 @@ class SyncHandler(object):
# TODO: Check that these users are actually new, i.e. either they
# weren't in the previous sync *or* they left and rejoined.
- changed.update(newly_joined_users)
+ changed.update(newly_joined_or_invited_users)
if not changed and not newly_left_users:
defer.returnValue(DeviceLists(
@@ -1176,7 +1177,7 @@ class SyncHandler(object):
@defer.inlineCallbacks
def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
- newly_joined_users):
+ newly_joined_or_invited_users):
"""Generates the presence portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -1184,8 +1185,9 @@ class SyncHandler(object):
sync_result_builder(SyncResultBuilder)
newly_joined_rooms(list): List of rooms that the user has joined
since the last sync (or empty if an initial sync)
- newly_joined_users(list): List of users that have joined rooms
- since the last sync (or empty if an initial sync)
+ newly_joined_or_invited_users(list): List of users that have joined
+ or been invited to rooms since the last sync (or empty if an initial
+ sync)
"""
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
@@ -1211,7 +1213,7 @@ class SyncHandler(object):
"presence_key", presence_key
)
- extra_users_ids = set(newly_joined_users)
+ extra_users_ids = set(newly_joined_or_invited_users)
for room_id in newly_joined_rooms:
users = yield self.state.get_current_users_in_room(room_id)
extra_users_ids.update(users)
@@ -1243,7 +1245,8 @@ class SyncHandler(object):
Returns:
Deferred(tuple): Returns a 4-tuple of
- `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
+ `(newly_joined_rooms, newly_joined_or_invited_users,
+ newly_left_rooms, newly_left_users)`
"""
user_id = sync_result_builder.sync_config.user.to_string()
block_all_room_ephemeral = (
@@ -1314,8 +1317,8 @@ class SyncHandler(object):
sync_result_builder.invited.extend(invited)
- # Now we want to get any newly joined users
- newly_joined_users = set()
+ # Now we want to get any newly joined or invited users
+ newly_joined_or_invited_users = set()
newly_left_users = set()
if since_token:
for joined_sync in sync_result_builder.joined:
@@ -1324,19 +1327,22 @@ class SyncHandler(object):
)
for event in it:
if event.type == EventTypes.Member:
- if event.membership == Membership.JOIN:
- newly_joined_users.add(event.state_key)
+ if (
+ event.membership == Membership.JOIN or
+ event.membership == Membership.INVITE
+ ):
+ newly_joined_or_invited_users.add(event.state_key)
else:
prev_content = event.unsigned.get("prev_content", {})
prev_membership = prev_content.get("membership", None)
if prev_membership == Membership.JOIN:
newly_left_users.add(event.state_key)
- newly_left_users -= newly_joined_users
+ newly_left_users -= newly_joined_or_invited_users
defer.returnValue((
newly_joined_rooms,
- newly_joined_users,
+ newly_joined_or_invited_users,
newly_left_rooms,
newly_left_users,
))
@@ -1381,7 +1387,7 @@ class SyncHandler(object):
where:
room_entries is a list [RoomSyncResultBuilder]
invited_rooms is a list [InvitedSyncResult]
- newly_joined rooms is a list[str] of room ids
+ newly_joined_rooms is a list[str] of room ids
newly_left_rooms is a list[str] of room ids
"""
user_id = sync_result_builder.sync_config.user.to_string()
@@ -1422,7 +1428,7 @@ class SyncHandler(object):
if room_id in sync_result_builder.joined_room_ids and non_joins:
# Always include if the user (re)joined the room, especially
# important so that device list changes are calculated correctly.
- # If there are non join member events, but we are still in the room,
+ # If there are non-join member events, but we are still in the room,
# then the user must have left and joined
newly_joined_rooms.append(room_id)
|