diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 744a9ee507..064e8723c8 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -53,25 +53,10 @@ class BaseHandler(object):
self.event_builder_factory = hs.get_event_builder_factory()
@defer.inlineCallbacks
- def _filter_events_for_clients(self, user_tuples, events):
+ def _filter_events_for_clients(self, user_tuples, events, event_id_to_state):
""" Returns dict of user_id -> list of events that user is allowed to
see.
"""
- # If there is only one user, just get the state for that one user,
- # otherwise just get all the state.
- if len(user_tuples) == 1:
- types = (
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, user_tuples[0][0]),
- )
- else:
- types = None
-
- event_id_to_state = yield self.store.get_state_for_events(
- frozenset(e.event_id for e in events),
- types=types
- )
-
forgotten = yield defer.gatherResults([
self.store.who_forgot_in_room(
room_id,
@@ -135,7 +120,17 @@ class BaseHandler(object):
@defer.inlineCallbacks
def _filter_events_for_client(self, user_id, events, is_peeking=False):
# Assumes that user has at some point joined the room if not is_guest.
- res = yield self._filter_events_for_clients([(user_id, is_peeking)], events)
+ types = (
+ (EventTypes.RoomHistoryVisibility, ""),
+ (EventTypes.Member, user_id),
+ )
+ event_id_to_state = yield self.store.get_state_for_events(
+ frozenset(e.event_id for e in events),
+ types=types
+ )
+ res = yield self._filter_events_for_clients(
+ [(user_id, is_peeking)], events, event_id_to_state
+ )
defer.returnValue(res.get(user_id, []))
def ratelimit(self, user_id):
@@ -147,7 +142,7 @@ class BaseHandler(object):
)
if not allowed:
raise LimitExceededError(
- retry_after_ms=int(1000*(time_allowed - time_now)),
+ retry_after_ms=int(1000 * (time_allowed - time_now)),
)
@defer.inlineCallbacks
@@ -269,13 +264,13 @@ class BaseHandler(object):
"You don't have permission to redact events"
)
- (event_stream_id, max_stream_id) = yield self.store.persist_event(
- event, context=context
- )
-
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
- event, self
+ event, context, self
+ )
+
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
)
destinations = set()
@@ -293,19 +288,11 @@ class BaseHandler(object):
with PreserveLoggingContext():
# Don't block waiting on waking up all the listeners.
- notify_d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- notify_d.addErrback(log_failure)
-
# If invite, remove room_state from unsigned before sending.
event.unsigned.pop("invite_room_state", None)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 691564c651..4efecb1ffd 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -175,8 +175,8 @@ class DirectoryHandler(BaseHandler):
# If this server is in the list of servers, return it first.
if self.server_name in servers:
servers = (
- [self.server_name]
- + [s for s in servers if s != self.server_name]
+ [self.server_name] +
+ [s for s in servers if s != self.server_name]
)
else:
servers = list(servers)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 254b483da6..4933c31c19 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.types import UserID
from synapse.events.utils import serialize_event
+from synapse.util.logcontext import preserve_context_over_fn
from ._base import BaseHandler
@@ -29,11 +30,17 @@ logger = logging.getLogger(__name__)
def started_user_eventstream(distributor, user):
- return distributor.fire("started_user_eventstream", user)
+ return preserve_context_over_fn(
+ distributor.fire,
+ "started_user_eventstream", user
+ )
def stopped_user_eventstream(distributor, user):
- return distributor.fire("stopped_user_eventstream", user)
+ return preserve_context_over_fn(
+ distributor.fire,
+ "stopped_user_eventstream", user
+ )
class EventStreamHandler(BaseHandler):
@@ -130,7 +137,7 @@ class EventStreamHandler(BaseHandler):
# Add some randomness to this value to try and mitigate against
# thundering herds on restart.
- timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
+ timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
events, tokens = yield self.notifier.get_events_for(
auth_user, pagin_config, timeout,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2ce1e9d6c7..da55d43541 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -221,19 +221,11 @@ class FederationHandler(BaseHandler):
extra_users.append(target_user)
with PreserveLoggingContext():
- d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- d.addErrback(log_failure)
-
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
prev_state = context.current_state.get((event.type, event.state_key))
@@ -244,12 +236,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- if not backfilled and not event.internal_metadata.is_outlier():
- action_generator = ActionGenerator(self.hs)
- yield action_generator.handle_push_actions_for_event(
- event, self
- )
-
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events(
@@ -643,19 +629,11 @@ class FederationHandler(BaseHandler):
)
with PreserveLoggingContext():
- d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[joinee]
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- d.addErrback(log_failure)
-
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
@@ -730,18 +708,10 @@ class FederationHandler(BaseHandler):
extra_users.append(target_user)
with PreserveLoggingContext():
- d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- d.addErrback(log_failure)
-
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
@@ -811,19 +781,11 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(event.state_key)
with PreserveLoggingContext():
- d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[target_user],
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- d.addErrback(log_failure)
-
defer.returnValue(event)
@defer.inlineCallbacks
@@ -948,18 +910,10 @@ class FederationHandler(BaseHandler):
extra_users.append(target_user)
with PreserveLoggingContext():
- d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- d.addErrback(log_failure)
-
new_pdu = event
destinations = set()
@@ -1113,6 +1067,12 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
+ if not backfilled and not event.internal_metadata.is_outlier():
+ action_generator = ActionGenerator(self.hs)
+ yield action_generator.handle_push_actions_for_event(
+ event, context, self
+ )
+
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 819ec57c4f..656ce124f9 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -36,14 +36,15 @@ class IdentityHandler(BaseHandler):
self.http_client = hs.get_simple_http_client()
+ self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
+ self.trust_any_id_server_just_for_testing_do_not_use = (
+ hs.config.use_insecure_ssl_client_just_for_testing_do_not_use
+ )
+
@defer.inlineCallbacks
def threepid_from_creds(self, creds):
yield run_on_reactor()
- # XXX: make this configurable!
- # trustedIdServers = ['matrix.org', 'localhost:8090']
- trustedIdServers = ['matrix.org', 'vector.im']
-
if 'id_server' in creds:
id_server = creds['id_server']
elif 'idServer' in creds:
@@ -58,10 +59,19 @@ class IdentityHandler(BaseHandler):
else:
raise SynapseError(400, "No client_secret in creds")
- if id_server not in trustedIdServers:
- logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
- 'credentials', id_server)
- defer.returnValue(None)
+ if id_server not in self.trusted_id_servers:
+ if self.trust_any_id_server_just_for_testing_do_not_use:
+ logger.warn(
+ "Trusting untrustworthy ID server %r even though it isn't"
+ " in the trusted id list for testing because"
+ " 'use_insecure_ssl_client_just_for_testing_do_not_use'"
+ " is set in the config",
+ id_server,
+ )
+ else:
+ logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
+ 'credentials', id_server)
+ defer.returnValue(None)
data = {}
try:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d36eb3b8d7..b61394f2b5 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -34,7 +34,7 @@ metrics = synapse.metrics.get_metrics_for(__name__)
# Don't bother bumping "last active" time if it differs by less than 60 seconds
-LAST_ACTIVE_GRANULARITY = 60*1000
+LAST_ACTIVE_GRANULARITY = 60 * 1000
# Keep no more than this number of offline serial revisions
MAX_OFFLINE_SERIALS = 1000
@@ -378,9 +378,9 @@ class PresenceHandler(BaseHandler):
was_polling = target_user in self._user_cachemap
if now_online and not was_polling:
- self.start_polling_presence(target_user, state=state)
+ yield self.start_polling_presence(target_user, state=state)
elif not now_online and was_polling:
- self.stop_polling_presence(target_user)
+ yield self.stop_polling_presence(target_user)
# TODO(paul): perform a presence push as part of start/stop poll so
# we don't have to do this all the time
@@ -394,7 +394,8 @@ class PresenceHandler(BaseHandler):
if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY:
return
- self.changed_presencelike_data(user, {"last_active": now})
+ with PreserveLoggingContext():
+ self.changed_presencelike_data(user, {"last_active": now})
def get_joined_rooms_for_user(self, user):
"""Get the list of rooms a user is joined to.
@@ -466,11 +467,12 @@ class PresenceHandler(BaseHandler):
local_user, room_ids=[room_id], add_to_cache=False
)
- self.push_update_to_local_and_remote(
- observed_user=local_user,
- users_to_push=[user],
- statuscache=statuscache,
- )
+ with PreserveLoggingContext():
+ self.push_update_to_local_and_remote(
+ observed_user=local_user,
+ users_to_push=[user],
+ statuscache=statuscache,
+ )
@defer.inlineCallbacks
def send_presence_invite(self, observer_user, observed_user):
@@ -556,7 +558,7 @@ class PresenceHandler(BaseHandler):
observer_user.localpart, observed_user.to_string()
)
- self.start_polling_presence(
+ yield self.start_polling_presence(
observer_user, target_user=observed_user
)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c11b98d0b7..24c850ae9b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -21,7 +21,6 @@ from synapse.api.errors import (
AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
)
from ._base import BaseHandler
-import synapse.util.stringutils as stringutils
from synapse.util.async import run_on_reactor
from synapse.http.client import CaptchaServerHttpClient
@@ -45,6 +44,8 @@ class RegistrationHandler(BaseHandler):
self.distributor.declare("registered_user")
self.captcha_client = CaptchaServerHttpClient(hs)
+ self._next_generated_user_id = None
+
@defer.inlineCallbacks
def check_username(self, localpart, guest_access_token=None):
yield run_on_reactor()
@@ -91,7 +92,7 @@ class RegistrationHandler(BaseHandler):
Args:
localpart : The local part of the user ID to register. If None,
- one will be randomly generated.
+ one will be generated.
password (str) : The password to assign to this user so they can
login again. This can be None which means they cannot login again
via a password (e.g. the user is an application service user).
@@ -108,6 +109,18 @@ class RegistrationHandler(BaseHandler):
if localpart:
yield self.check_username(localpart, guest_access_token=guest_access_token)
+ was_guest = guest_access_token is not None
+
+ if not was_guest:
+ try:
+ int(localpart)
+ raise RegistrationError(
+ 400,
+ "Numeric user IDs are reserved for guest users."
+ )
+ except ValueError:
+ pass
+
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
@@ -118,38 +131,36 @@ class RegistrationHandler(BaseHandler):
user_id=user_id,
token=token,
password_hash=password_hash,
- was_guest=guest_access_token is not None,
+ was_guest=was_guest,
make_guest=make_guest,
)
yield registered_user(self.distributor, user)
else:
- # autogen a random user ID
+ # autogen a sequential user ID
attempts = 0
- user_id = None
token = None
- while not user_id:
+ user = None
+ while not user:
+ localpart = yield self._generate_user_id(attempts > 0)
+ user = UserID(localpart, self.hs.hostname)
+ user_id = user.to_string()
+ yield self.check_user_id_is_valid(user_id)
+ if generate_token:
+ token = self.auth_handler().generate_access_token(user_id)
try:
- localpart = self._generate_user_id()
- user = UserID(localpart, self.hs.hostname)
- user_id = user.to_string()
- yield self.check_user_id_is_valid(user_id)
- if generate_token:
- token = self.auth_handler().generate_access_token(user_id)
yield self.store.register(
user_id=user_id,
token=token,
- password_hash=password_hash)
-
- yield registered_user(self.distributor, user)
+ password_hash=password_hash,
+ make_guest=make_guest
+ )
except SynapseError:
# if user id is taken, just generate another
user_id = None
token = None
attempts += 1
- if attempts > 5:
- raise RegistrationError(
- 500, "Cannot generate user ID.")
+ yield registered_user(self.distributor, user)
# We used to generate default identicons here, but nowadays
# we want clients to generate their own as part of their branding
@@ -175,7 +186,7 @@ class RegistrationHandler(BaseHandler):
token=token,
password_hash=""
)
- registered_user(self.distributor, user)
+ yield registered_user(self.distributor, user)
defer.returnValue((user_id, token))
@defer.inlineCallbacks
@@ -211,7 +222,7 @@ class RegistrationHandler(BaseHandler):
400,
"User ID must only contain characters which do not"
" require URL encoding."
- )
+ )
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
@@ -281,8 +292,16 @@ class RegistrationHandler(BaseHandler):
errcode=Codes.EXCLUSIVE
)
- def _generate_user_id(self):
- return "-" + stringutils.random_string(18)
+ @defer.inlineCallbacks
+ def _generate_user_id(self, reseed=False):
+ if reseed or self._next_generated_user_id is None:
+ self._next_generated_user_id = (
+ yield self.store.find_next_generated_user_id_localpart()
+ )
+
+ id = self._next_generated_user_id
+ self._next_generated_user_id += 1
+ defer.returnValue(str(id))
@defer.inlineCallbacks
def _validate_captcha(self, ip_addr, private_key, challenge, response):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 58e2d25f97..a8e3a9029c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -18,13 +18,14 @@ from twisted.internet import defer
from ._base import BaseHandler
-from synapse.types import UserID, RoomAlias, RoomID
+from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
from synapse.api.constants import (
EventTypes, Membership, JoinRules, RoomCreationPreset,
)
from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
from synapse.util import stringutils, unwrapFirstError
from synapse.util.async import run_on_reactor
+from synapse.util.logcontext import preserve_context_over_fn
from signedjson.sign import verify_signed_json
from signedjson.key import decode_verify_key_bytes
@@ -46,11 +47,17 @@ def collect_presencelike_data(distributor, user, content):
def user_left_room(distributor, user, room_id):
- return distributor.fire("user_left_room", user=user, room_id=room_id)
+ return preserve_context_over_fn(
+ distributor.fire,
+ "user_left_room", user=user, room_id=room_id
+ )
def user_joined_room(distributor, user, room_id):
- return distributor.fire("user_joined_room", user=user, room_id=room_id)
+ return preserve_context_over_fn(
+ distributor.fire,
+ "user_joined_room", user=user, room_id=room_id
+ )
class RoomCreationHandler(BaseHandler):
@@ -876,39 +883,71 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def get_public_room_list(self):
- chunk = yield self.store.get_rooms(is_public=True)
-
- room_members = yield defer.gatherResults(
- [
- self.store.get_users_in_room(room["room_id"])
- for room in chunk
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
-
- avatar_urls = yield defer.gatherResults(
- [
- self.get_room_avatar_url(room["room_id"])
- for room in chunk
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
-
- for i, room in enumerate(chunk):
- room["num_joined_members"] = len(room_members[i])
- if avatar_urls[i]:
- room["avatar_url"] = avatar_urls[i]
+ room_ids = yield self.store.get_public_room_ids()
+
+ @defer.inlineCallbacks
+ def handle_room(room_id):
+ aliases = yield self.store.get_aliases_for_room(room_id)
+ if not aliases:
+ defer.returnValue(None)
+
+ state = yield self.state_handler.get_current_state(room_id)
+
+ result = {"aliases": aliases, "room_id": room_id}
+
+ name_event = state.get((EventTypes.Name, ""), None)
+ if name_event:
+ name = name_event.content.get("name", None)
+ if name:
+ result["name"] = name
+
+ topic_event = state.get((EventTypes.Topic, ""), None)
+ if topic_event:
+ topic = topic_event.content.get("topic", None)
+ if topic:
+ result["topic"] = topic
+
+ canonical_event = state.get((EventTypes.CanonicalAlias, ""), None)
+ if canonical_event:
+ canonical_alias = canonical_event.content.get("alias", None)
+ if canonical_alias:
+ result["canonical_alias"] = canonical_alias
+
+ visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+ visibility = None
+ if visibility_event:
+ visibility = visibility_event.content.get("history_visibility", None)
+ result["world_readable"] = visibility == "world_readable"
+
+ guest_event = state.get((EventTypes.GuestAccess, ""), None)
+ guest = None
+ if guest_event:
+ guest = guest_event.content.get("guest_access", None)
+ result["guest_can_join"] = guest == "can_join"
+
+ avatar_event = state.get(("m.room.avatar", ""), None)
+ if avatar_event:
+ avatar_url = avatar_event.content.get("url", None)
+ if avatar_url:
+ result["avatar_url"] = avatar_url
+
+ result["num_joined_members"] = sum(
+ 1 for (event_type, _), ev in state.items()
+ if event_type == EventTypes.Member and ev.membership == Membership.JOIN
+ )
- # FIXME (erikj): START is no longer a valid value
- defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
+ defer.returnValue(result)
- @defer.inlineCallbacks
- def get_room_avatar_url(self, room_id):
- event = yield self.hs.get_state_handler().get_current_state(
- room_id, "m.room.avatar"
- )
- if event and "url" in event.content:
- defer.returnValue(event.content["url"])
+ result = []
+ for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)):
+ chunk_result = yield defer.gatherResults([
+ handle_room(room_id)
+ for room_id in chunk
+ ], consumeErrors=True).addErrback(unwrapFirstError)
+ result.extend(v for v in chunk_result if v)
+
+ # FIXME (erikj): START is no longer a valid value
+ defer.returnValue({"start": "START", "end": "END", "chunk": result})
class RoomContextHandler(BaseHandler):
@@ -927,7 +966,7 @@ class RoomContextHandler(BaseHandler):
Returns:
dict, or None if the event isn't found
"""
- before_limit = math.floor(limit/2.)
+ before_limit = math.floor(limit / 2.)
after_limit = limit - before_limit
now_token = yield self.hs.get_event_sources().get_current_token()
@@ -997,6 +1036,11 @@ class RoomEventSource(object):
to_key = yield self.get_current_key()
+ from_token = RoomStreamToken.parse(from_key)
+ if from_token.topological:
+ logger.warn("Stream has topological part!!!! %r", from_key)
+ from_key = "s%s" % (from_token.stream,)
+
app_service = yield self.store.get_app_service_by_user_id(
user.to_string()
)
@@ -1008,15 +1052,30 @@ class RoomEventSource(object):
limit=limit,
)
else:
- events, end_key = yield self.store.get_room_events_stream(
- user_id=user.to_string(),
+ room_events = yield self.store.get_membership_changes_for_user(
+ user.to_string(), from_key, to_key
+ )
+
+ room_to_events = yield self.store.get_room_events_stream_for_rooms(
+ room_ids=room_ids,
from_key=from_key,
to_key=to_key,
- limit=limit,
- room_ids=room_ids,
- is_guest=is_guest,
+ limit=limit or 10,
)
+ events = list(room_events)
+ events.extend(e for evs, _ in room_to_events.values() for e in evs)
+
+ events.sort(key=lambda e: e.internal_metadata.order)
+
+ if limit:
+ events[:] = events[:limit]
+
+ if events:
+ end_key = events[-1].internal_metadata.after
+ else:
+ end_key = to_key
+
defer.returnValue((events, end_key))
def get_current_key(self, direction='f'):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 075566417f..1d0f0058a2 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -18,11 +18,14 @@ from ._base import BaseHandler
from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.metrics import Measure
from twisted.internet import defer
import collections
import logging
+import itertools
logger = logging.getLogger(__name__)
@@ -139,6 +142,15 @@ class SyncHandler(BaseHandler):
A Deferred SyncResult.
"""
+ context = LoggingContext.current_context()
+ if context:
+ if since_token is None:
+ context.tag = "initial_sync"
+ elif full_state:
+ context.tag = "full_state_sync"
+ else:
+ context.tag = "incremental_sync"
+
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
@@ -167,18 +179,6 @@ class SyncHandler(BaseHandler):
else:
return self.incremental_sync_with_gap(sync_config, since_token)
- def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
- if room_id not in ephemeral_by_room:
- return None
- for e in ephemeral_by_room[room_id]:
- if e['type'] != 'm.receipt':
- continue
- for receipt_event_id, val in e['content'].items():
- if 'm.read' in val:
- if user_id in val['m.read']:
- return receipt_event_id
- return None
-
@defer.inlineCallbacks
def full_state_sync(self, sync_config, timeline_since_token):
"""Get a sync for a client which is starting without any state.
@@ -228,44 +228,51 @@ class SyncHandler(BaseHandler):
invited = []
archived = []
deferreds = []
- for event in room_list:
- if event.membership == Membership.JOIN:
- room_sync_deferred = self.full_state_sync_for_joined_room(
- room_id=event.room_id,
- sync_config=sync_config,
- now_token=now_token,
- timeline_since_token=timeline_since_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- room_sync_deferred.addCallback(joined.append)
- deferreds.append(room_sync_deferred)
- elif event.membership == Membership.INVITE:
- invite = yield self.store.get_event(event.event_id)
- invited.append(InvitedSyncResult(
- room_id=event.room_id,
- invite=invite,
- ))
- elif event.membership in (Membership.LEAVE, Membership.BAN):
- leave_token = now_token.copy_and_replace(
- "room_key", "s%d" % (event.stream_ordering,)
- )
- room_sync_deferred = self.full_state_sync_for_archived_room(
- sync_config=sync_config,
- room_id=event.room_id,
- leave_event_id=event.event_id,
- leave_token=leave_token,
- timeline_since_token=timeline_since_token,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- room_sync_deferred.addCallback(archived.append)
- deferreds.append(room_sync_deferred)
- yield defer.gatherResults(
- deferreds, consumeErrors=True
- ).addErrback(unwrapFirstError)
+ room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
+ for room_list_chunk in room_list_chunks:
+ for event in room_list_chunk:
+ if event.membership == Membership.JOIN:
+ room_sync_deferred = preserve_fn(
+ self.full_state_sync_for_joined_room
+ )(
+ room_id=event.room_id,
+ sync_config=sync_config,
+ now_token=now_token,
+ timeline_since_token=timeline_since_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ )
+ room_sync_deferred.addCallback(joined.append)
+ deferreds.append(room_sync_deferred)
+ elif event.membership == Membership.INVITE:
+ invite = yield self.store.get_event(event.event_id)
+ invited.append(InvitedSyncResult(
+ room_id=event.room_id,
+ invite=invite,
+ ))
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ leave_token = now_token.copy_and_replace(
+ "room_key", "s%d" % (event.stream_ordering,)
+ )
+ room_sync_deferred = preserve_fn(
+ self.full_state_sync_for_archived_room
+ )(
+ sync_config=sync_config,
+ room_id=event.room_id,
+ leave_event_id=event.event_id,
+ leave_token=leave_token,
+ timeline_since_token=timeline_since_token,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ )
+ room_sync_deferred.addCallback(archived.append)
+ deferreds.append(room_sync_deferred)
+
+ yield defer.gatherResults(
+ deferreds, consumeErrors=True
+ ).addErrback(unwrapFirstError)
account_data_for_user = sync_config.filter_collection.filter_account_data(
self.account_data_for_user(account_data)
@@ -305,7 +312,6 @@ class SyncHandler(BaseHandler):
ephemeral_by_room=ephemeral_by_room,
tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room,
- all_ephemeral_by_room=ephemeral_by_room,
batch=batch,
full_state=True,
)
@@ -355,50 +361,51 @@ class SyncHandler(BaseHandler):
typing events for that room.
"""
- typing_key = since_token.typing_key if since_token else "0"
-
- rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
- room_ids = [room.room_id for room in rooms]
-
- typing_source = self.event_sources.sources["typing"]
- typing, typing_key = yield typing_source.get_new_events(
- user=sync_config.user,
- from_key=typing_key,
- limit=sync_config.filter_collection.ephemeral_limit(),
- room_ids=room_ids,
- is_guest=sync_config.is_guest,
- )
- now_token = now_token.copy_and_replace("typing_key", typing_key)
-
- ephemeral_by_room = {}
-
- for event in typing:
- # we want to exclude the room_id from the event, but modifying the
- # result returned by the event source is poor form (it might cache
- # the object)
- room_id = event["room_id"]
- event_copy = {k: v for (k, v) in event.iteritems()
- if k != "room_id"}
- ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+ with Measure(self.clock, "ephemeral_by_room"):
+ typing_key = since_token.typing_key if since_token else "0"
- receipt_key = since_token.receipt_key if since_token else "0"
+ rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = [room.room_id for room in rooms]
- receipt_source = self.event_sources.sources["receipt"]
- receipts, receipt_key = yield receipt_source.get_new_events(
- user=sync_config.user,
- from_key=receipt_key,
- limit=sync_config.filter_collection.ephemeral_limit(),
- room_ids=room_ids,
- is_guest=sync_config.is_guest,
- )
- now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+ typing_source = self.event_sources.sources["typing"]
+ typing, typing_key = yield typing_source.get_new_events(
+ user=sync_config.user,
+ from_key=typing_key,
+ limit=sync_config.filter_collection.ephemeral_limit(),
+ room_ids=room_ids,
+ is_guest=sync_config.is_guest,
+ )
+ now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+ ephemeral_by_room = {}
+
+ for event in typing:
+ # we want to exclude the room_id from the event, but modifying the
+ # result returned by the event source is poor form (it might cache
+ # the object)
+ room_id = event["room_id"]
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+ receipt_key = since_token.receipt_key if since_token else "0"
+
+ receipt_source = self.event_sources.sources["receipt"]
+ receipts, receipt_key = yield receipt_source.get_new_events(
+ user=sync_config.user,
+ from_key=receipt_key,
+ limit=sync_config.filter_collection.ephemeral_limit(),
+ room_ids=room_ids,
+ is_guest=sync_config.is_guest,
+ )
+ now_token = now_token.copy_and_replace("receipt_key", receipt_key)
- for event in receipts:
- room_id = event["room_id"]
- # exclude room id, as above
- event_copy = {k: v for (k, v) in event.iteritems()
- if k != "room_id"}
- ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+ for event in receipts:
+ room_id = event["room_id"]
+ # exclude room id, as above
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
defer.returnValue((now_token, ephemeral_by_room))
@@ -438,13 +445,6 @@ class SyncHandler(BaseHandler):
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
- # We now fetch all ephemeral events for this room in order to get
- # this users current read receipt. This could almost certainly be
- # optimised.
- _, all_ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token
- )
-
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token, since_token
)
@@ -478,7 +478,7 @@ class SyncHandler(BaseHandler):
)
# Get a list of membership change events that have happened.
- rooms_changed = yield self.store.get_room_changes_for_user(
+ rooms_changed = yield self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
)
@@ -576,7 +576,6 @@ class SyncHandler(BaseHandler):
ephemeral_by_room=ephemeral_by_room,
tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room,
- all_ephemeral_by_room=all_ephemeral_by_room,
batch=batch,
full_state=full_state,
)
@@ -606,58 +605,64 @@ class SyncHandler(BaseHandler):
"""
:returns a Deferred TimelineBatch
"""
- filtering_factor = 2
- timeline_limit = sync_config.filter_collection.timeline_limit()
- load_limit = max(timeline_limit * filtering_factor, 10)
- max_repeat = 5 # Only try a few times per room, otherwise
- room_key = now_token.room_key
- end_key = room_key
-
- limited = recents is None or newly_joined_room or timeline_limit < len(recents)
-
- if recents is not None:
- recents = sync_config.filter_collection.filter_room_timeline(recents)
- recents = yield self._filter_events_for_client(
- sync_config.user.to_string(),
- recents,
- is_peeking=sync_config.is_guest,
- )
- else:
- recents = []
-
- since_key = None
- if since_token and not newly_joined_room:
- since_key = since_token.room_key
-
- while limited and len(recents) < timeline_limit and max_repeat:
- events, end_key = yield self.store.get_room_events_stream_for_room(
- room_id,
- limit=load_limit + 1,
- from_key=since_key,
- to_key=end_key,
- )
- loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
- loaded_recents = yield self._filter_events_for_client(
- sync_config.user.to_string(),
- loaded_recents,
- is_peeking=sync_config.is_guest,
- )
- loaded_recents.extend(recents)
- recents = loaded_recents
-
- if len(events) <= load_limit:
+ with Measure(self.clock, "load_filtered_recents"):
+ filtering_factor = 2
+ timeline_limit = sync_config.filter_collection.timeline_limit()
+ load_limit = max(timeline_limit * filtering_factor, 10)
+ max_repeat = 5 # Only try a few times per room, otherwise
+ room_key = now_token.room_key
+ end_key = room_key
+
+ if recents is None or newly_joined_room or timeline_limit < len(recents):
+ limited = True
+ else:
limited = False
- break
- max_repeat -= 1
- if len(recents) > timeline_limit:
- limited = True
- recents = recents[-timeline_limit:]
- room_key = recents[0].internal_metadata.before
+ if recents is not None:
+ recents = sync_config.filter_collection.filter_room_timeline(recents)
+ recents = yield self._filter_events_for_client(
+ sync_config.user.to_string(),
+ recents,
+ is_peeking=sync_config.is_guest,
+ )
+ else:
+ recents = []
+
+ since_key = None
+ if since_token and not newly_joined_room:
+ since_key = since_token.room_key
+
+ while limited and len(recents) < timeline_limit and max_repeat:
+ events, end_key = yield self.store.get_room_events_stream_for_room(
+ room_id,
+ limit=load_limit + 1,
+ from_key=since_key,
+ to_key=end_key,
+ )
+ loaded_recents = sync_config.filter_collection.filter_room_timeline(
+ events
+ )
+ loaded_recents = yield self._filter_events_for_client(
+ sync_config.user.to_string(),
+ loaded_recents,
+ is_peeking=sync_config.is_guest,
+ )
+ loaded_recents.extend(recents)
+ recents = loaded_recents
- prev_batch_token = now_token.copy_and_replace(
- "room_key", room_key
- )
+ if len(events) <= load_limit:
+ limited = False
+ break
+ max_repeat -= 1
+
+ if len(recents) > timeline_limit:
+ limited = True
+ recents = recents[-timeline_limit:]
+ room_key = recents[0].internal_metadata.before
+
+ prev_batch_token = now_token.copy_and_replace(
+ "room_key", room_key
+ )
defer.returnValue(TimelineBatch(
events=recents,
@@ -670,37 +675,11 @@ class SyncHandler(BaseHandler):
since_token, now_token,
ephemeral_by_room, tags_by_room,
account_data_by_room,
- all_ephemeral_by_room,
batch, full_state=False):
- if full_state:
- state = yield self.get_state_at(room_id, now_token)
-
- elif batch.limited:
- current_state = yield self.get_state_at(room_id, now_token)
-
- state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
- )
-
- state = yield self.compute_state_delta(
- since_token=since_token,
- previous_state=state_at_previous_sync,
- current_state=current_state,
- )
- else:
- state = {
- (event.type, event.state_key): event
- for event in batch.events if event.is_state()
- }
-
- just_joined = yield self.check_joined_room(sync_config, state)
- if just_joined:
- state = yield self.get_state_at(room_id, now_token)
-
- state = {
- (e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(state.values())
- }
+ state = yield self.compute_state_delta(
+ room_id, batch, sync_config, since_token, now_token,
+ full_state=full_state
+ )
account_data = self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
@@ -726,14 +705,12 @@ class SyncHandler(BaseHandler):
if room_sync:
notifs = yield self.unread_notifs_for_room_id(
- room_id, sync_config, all_ephemeral_by_room
+ room_id, sync_config
)
if notifs is not None:
- unread_notifications["notification_count"] = len(notifs)
- unread_notifications["highlight_count"] = len([
- 1 for notif in notifs if _action_has_highlight(notif["actions"])
- ])
+ unread_notifications["notification_count"] = notifs["notify_count"]
+ unread_notifications["highlight_count"] = notifs["highlight_count"]
logger.debug("Room sync: %r", room_sync)
@@ -766,30 +743,11 @@ class SyncHandler(BaseHandler):
logger.debug("Recents %r", batch)
- state_events_at_leave = yield self.store.get_state_for_event(
- leave_event_id
+ state_events_delta = yield self.compute_state_delta(
+ room_id, batch, sync_config, since_token, leave_token,
+ full_state=full_state
)
- if not full_state:
- state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
- )
-
- state_events_delta = yield self.compute_state_delta(
- since_token=since_token,
- previous_state=state_at_previous_sync,
- current_state=state_events_at_leave,
- )
- else:
- state_events_delta = state_events_at_leave
-
- state_events_delta = {
- (e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(
- state_events_delta.values()
- )
- }
-
account_data = self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
)
@@ -843,15 +801,19 @@ class SyncHandler(BaseHandler):
state = {}
defer.returnValue(state)
- def compute_state_delta(self, since_token, previous_state, current_state):
- """ Works out the differnce in state between the current state and the
- state the client got when it last performed a sync.
-
- :param str since_token: the point we are comparing against
- :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the
- state to compare to
- :param dict[(str,str), synapse.events.FrozenEvent] current_state: the
- new state
+ @defer.inlineCallbacks
+ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
+ full_state):
+ """ Works out the differnce in state between the start of the timeline
+ and the previous sync.
+
+ :param str room_id
+ :param TimelineBatch batch: The timeline batch for the room that will
+ be sent to the user.
+ :param sync_config
+ :param str since_token: Token of the end of the previous batch. May be None.
+ :param str now_token: Token of the end of the current batch.
+ :param bool full_state: Whether to force returning the full state.
:returns A new event dictionary
"""
@@ -860,12 +822,53 @@ class SyncHandler(BaseHandler):
# updates even if they occured logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.
- state_delta = {}
- for key, event in current_state.iteritems():
- if (key not in previous_state or
- previous_state[key].event_id != event.event_id):
- state_delta[key] = event
- return state_delta
+ with Measure(self.clock, "compute_state_delta"):
+ if full_state:
+ if batch:
+ state = yield self.store.get_state_for_event(
+ batch.events[0].event_id
+ )
+ else:
+ state = yield self.get_state_at(
+ room_id, stream_position=now_token
+ )
+
+ timeline_state = {
+ (event.type, event.state_key): event
+ for event in batch.events if event.is_state()
+ }
+
+ state = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state,
+ previous={},
+ )
+ elif batch.limited:
+ state_at_previous_sync = yield self.get_state_at(
+ room_id, stream_position=since_token
+ )
+
+ state_at_timeline_start = yield self.store.get_state_for_event(
+ batch.events[0].event_id
+ )
+
+ timeline_state = {
+ (event.type, event.state_key): event
+ for event in batch.events if event.is_state()
+ }
+
+ state = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state_at_timeline_start,
+ previous=state_at_previous_sync,
+ )
+ else:
+ state = {}
+
+ defer.returnValue({
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(state.values())
+ })
def check_joined_room(self, sync_config, state_delta):
"""
@@ -886,21 +889,24 @@ class SyncHandler(BaseHandler):
return False
@defer.inlineCallbacks
- def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
- last_unread_event_id = self.last_read_event_id_for_room_and_user(
- room_id, sync_config.user.to_string(), ephemeral_by_room
- )
-
- notifs = []
- if last_unread_event_id:
- notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
- room_id, sync_config.user.to_string(), last_unread_event_id
+ def unread_notifs_for_room_id(self, room_id, sync_config):
+ with Measure(self.clock, "unread_notifs_for_room_id"):
+ last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
+ user_id=sync_config.user.to_string(),
+ room_id=room_id,
+ receipt_type="m.read"
)
- defer.returnValue(notifs)
- # There is no new information in this period, so your notification
- # count is whatever it was last time.
- defer.returnValue(None)
+ notifs = []
+ if last_unread_event_id:
+ notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
+ room_id, sync_config.user.to_string(), last_unread_event_id
+ )
+ defer.returnValue(notifs)
+
+ # There is no new information in this period, so your notification
+ # count is whatever it was last time.
+ defer.returnValue(None)
def _action_has_highlight(actions):
@@ -912,3 +918,37 @@ def _action_has_highlight(actions):
pass
return False
+
+
+def _calculate_state(timeline_contains, timeline_start, previous):
+ """Works out what state to include in a sync response.
+
+ Args:
+ timeline_contains (dict): state in the timeline
+ timeline_start (dict): state at the start of the timeline
+ previous (dict): state at the end of the previous sync (or empty dict
+ if this is an initial sync)
+
+ Returns:
+ dict
+ """
+ event_id_to_state = {
+ e.event_id: e
+ for e in itertools.chain(
+ timeline_contains.values(),
+ previous.values(),
+ timeline_start.values(),
+ )
+ }
+
+ tc_ids = set(e.event_id for e in timeline_contains.values())
+ p_ids = set(e.event_id for e in previous.values())
+ ts_ids = set(e.event_id for e in timeline_start.values())
+
+ state_ids = (ts_ids - p_ids) - tc_ids
+
+ evs = (event_id_to_state[e] for e in state_ids)
+ return {
+ (e.type, e.state_key): e
+ for e in evs
+ }
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 43bf600913..b16d0017df 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -19,6 +19,7 @@ from ._base import BaseHandler
from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.metrics import Measure
from synapse.types import UserID
import logging
@@ -222,6 +223,7 @@ class TypingNotificationHandler(BaseHandler):
class TypingNotificationEventSource(object):
def __init__(self, hs):
self.hs = hs
+ self.clock = hs.get_clock()
self._handler = None
self._room_member_handler = None
@@ -247,19 +249,20 @@ class TypingNotificationEventSource(object):
}
def get_new_events(self, from_key, room_ids, **kwargs):
- from_key = int(from_key)
- handler = self.handler()
+ with Measure(self.clock, "typing.get_new_events"):
+ from_key = int(from_key)
+ handler = self.handler()
- events = []
- for room_id in room_ids:
- if room_id not in handler._room_serials:
- continue
- if handler._room_serials[room_id] <= from_key:
- continue
+ events = []
+ for room_id in room_ids:
+ if room_id not in handler._room_serials:
+ continue
+ if handler._room_serials[room_id] <= from_key:
+ continue
- events.append(self._make_event_for(room_id))
+ events.append(self._make_event_for(room_id))
- return events, handler._latest_room_serial
+ return events, handler._latest_room_serial
def get_current_key(self):
return self.handler()._latest_room_serial
|