diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2a5eab124f..329e3c7d71 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -22,7 +22,7 @@ import bcrypt
import pymacaroons
from canonicaljson import json
-from twisted.internet import defer, threads
+from twisted.internet import defer
from twisted.web.client import PartialDownloadError
import synapse.util.stringutils as stringutils
@@ -37,8 +37,8 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
+from synapse.util import logcontext
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable
from ._base import BaseHandler
@@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
bcrypt.gensalt(self.bcrypt_rounds),
).decode('ascii')
- return make_deferred_yieldable(
- threads.deferToThreadPool(
- self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
- ),
- )
+ return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
if not isinstance(stored_hash, bytes):
stored_hash = stored_hash.encode('ascii')
- return make_deferred_yieldable(
- threads.deferToThreadPool(
- self.hs.get_reactor(),
- self.hs.get_reactor().getThreadPool(),
- _do_validate_hash,
- ),
- )
+ return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
else:
return defer.succeed(False)
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b078df4a76..75fe50c42c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,8 +17,8 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, create_requester
-from synapse.util.logcontext import run_in_background
from ._base import BaseHandler
@@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
None
"""
if not self._user_parter_running:
- run_in_background(self._user_parter_loop)
+ run_as_background_process("user_parter_loop", self._user_parter_loop)
@defer.inlineCallbacks
def _user_parter_loop(self):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 18741c5fac..7d67bf803a 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -43,6 +43,7 @@ class DirectoryHandler(BaseHandler):
self.state = hs.get_state_handler()
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
+ self.config = hs.config
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -80,42 +81,68 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
- def create_association(self, user_id, room_alias, room_id, servers=None):
- # association creation for human users
- # TODO(erikj): Do user auth.
+ def create_association(self, requester, room_alias, room_id, servers=None,
+ send_event=True):
+ """Attempt to create a new alias
- if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
- raise SynapseError(
- 403, "This user is not permitted to create this alias",
- )
+ Args:
+ requester (Requester)
+ room_alias (RoomAlias)
+ room_id (str)
+ servers (list[str]|None): List of servers that others servers
+ should try and join via
+ send_event (bool): Whether to send an updated m.room.aliases event
- can_create = yield self.can_modify_alias(
- room_alias,
- user_id=user_id
- )
- if not can_create:
- raise SynapseError(
- 400, "This alias is reserved by an application service.",
- errcode=Codes.EXCLUSIVE
- )
- yield self._create_association(room_alias, room_id, servers, creator=user_id)
+ Returns:
+ Deferred
+ """
- @defer.inlineCallbacks
- def create_appservice_association(self, service, room_alias, room_id,
- servers=None):
- if not service.is_interested_in_alias(room_alias.to_string()):
- raise SynapseError(
- 400, "This application service has not reserved"
- " this kind of alias.", errcode=Codes.EXCLUSIVE
+ user_id = requester.user.to_string()
+
+ service = requester.app_service
+ if service:
+ if not service.is_interested_in_alias(room_alias.to_string()):
+ raise SynapseError(
+ 400, "This application service has not reserved"
+ " this kind of alias.", errcode=Codes.EXCLUSIVE
+ )
+ else:
+ if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
+ raise AuthError(
+ 403, "This user is not permitted to create this alias",
+ )
+
+ if not self.config.is_alias_creation_allowed(user_id, room_alias.to_string()):
+ # Lets just return a generic message, as there may be all sorts of
+ # reasons why we said no. TODO: Allow configurable error messages
+ # per alias creation rule?
+ raise SynapseError(
+ 403, "Not allowed to create alias",
+ )
+
+ can_create = yield self.can_modify_alias(
+ room_alias,
+ user_id=user_id
)
+ if not can_create:
+ raise AuthError(
+ 400, "This alias is reserved by an application service.",
+ errcode=Codes.EXCLUSIVE
+ )
- # association creation for app services
- yield self._create_association(room_alias, room_id, servers)
+ yield self._create_association(room_alias, room_id, servers, creator=user_id)
+ if send_event:
+ yield self.send_room_alias_update_event(
+ requester,
+ room_id
+ )
@defer.inlineCallbacks
- def delete_association(self, requester, user_id, room_alias):
+ def delete_association(self, requester, room_alias):
# association deletion for human users
+ user_id = requester.user.to_string()
+
try:
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
except StoreError as e:
@@ -143,7 +170,6 @@ class DirectoryHandler(BaseHandler):
try:
yield self.send_room_alias_update_event(
requester,
- requester.user.to_string(),
room_id
)
@@ -261,7 +287,7 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
- def send_room_alias_update_event(self, requester, user_id, room_id):
+ def send_room_alias_update_event(self, requester, room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
@@ -270,7 +296,7 @@ class DirectoryHandler(BaseHandler):
"type": EventTypes.Aliases,
"state_key": self.hs.hostname,
"room_id": room_id,
- "sender": user_id,
+ "sender": requester.user.to_string(),
"content": {"aliases": aliases},
},
ratelimit=False
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index cab57a8849..cd5b9bbb19 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -53,7 +53,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
-from synapse.state import resolve_events_with_factory
+from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
@@ -384,24 +384,24 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
- # Resolve any conflicting state
- @defer.inlineCallbacks
- def fetch(ev_ids):
- fetched = yield self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
- # add any events we fetch here to the `event_map` so that we
- # can use them to build the state event list below.
- event_map.update(fetched)
- defer.returnValue(fetched)
-
room_version = yield self.store.get_room_version(room_id)
- state_map = yield resolve_events_with_factory(
- room_version, state_maps, event_map, fetch,
+ state_map = yield resolve_events_with_store(
+ room_version, state_maps, event_map,
+ state_res_store=StateResolutionStore(self.store),
)
- # we need to give _process_received_pdu the actual state events
+ # We need to give _process_received_pdu the actual state events
# rather than event ids, so generate that now.
+
+ # First though we need to fetch all the events that are in
+ # state_map, so we can build up the state below.
+ evs = yield self.store.get_events(
+ list(state_map.values()),
+ get_prev_content=False,
+ check_redacted=False,
+ )
+ event_map.update(evs)
+
state = [
event_map[e] for e in six.itervalues(state_map)
]
@@ -2520,7 +2520,7 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
- self._notify_persisted_event(event, max_stream_id)
+ yield self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@@ -2553,7 +2553,7 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
- self.pusher_pool.on_new_notifications(
+ return self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 53e5e2648b..173315af6c 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -20,7 +20,7 @@ from six import iteritems
from twisted.internet import defer
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
@@ -37,9 +37,23 @@ def _create_rerouter(func_name):
)
else:
destination = get_domain_from_id(group_id)
- return getattr(self.transport_client, func_name)(
+ d = getattr(self.transport_client, func_name)(
destination, group_id, *args, **kwargs
)
+
+ # Capture errors returned by the remote homeserver and
+ # re-throw specific errors as SynapseErrors. This is so
+ # when the remote end responds with things like 403 Not
+ # In Group, we can communicate that to the client instead
+ # of a 500.
+ def h(failure):
+ failure.trap(HttpResponseException)
+ e = failure.value
+ if e.code == 403:
+ raise e.to_synapse_error()
+ return failure
+ d.addErrback(h)
+ return d
return f
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index e009395207..563bb3cea3 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -156,7 +156,7 @@ class InitialSyncHandler(BaseHandler):
room_end_token = "s%d" % (event.stream_ordering,)
deferred_room_state = run_in_background(
self.store.get_state_for_events,
- [event.event_id], None,
+ [event.event_id],
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
@@ -301,7 +301,7 @@ class InitialSyncHandler(BaseHandler):
def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
membership, member_event_id, is_peeking):
room_state = yield self.store.get_state_for_events(
- [member_event_id], None
+ [member_event_id],
)
room_state = room_state[member_event_id]
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4954b23a0d..969e588e73 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -35,6 +35,7 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
@@ -80,7 +81,7 @@ class MessageHandler(object):
elif membership == Membership.LEAVE:
key = (event_type, state_key)
room_state = yield self.store.get_state_for_events(
- [membership_event_id], [key]
+ [membership_event_id], StateFilter.from_types([key])
)
data = room_state[membership_event_id].get(key)
@@ -88,7 +89,7 @@ class MessageHandler(object):
@defer.inlineCallbacks
def get_state_events(
- self, user_id, room_id, types=None, filtered_types=None,
+ self, user_id, room_id, state_filter=StateFilter.all(),
at_token=None, is_guest=False,
):
"""Retrieve all state events for a given room. If the user is
@@ -100,13 +101,8 @@ class MessageHandler(object):
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
- types(list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- May be None, which matches any key.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
at_token(StreamToken|None): the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
@@ -139,7 +135,7 @@ class MessageHandler(object):
event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
- [event.event_id], types, filtered_types=filtered_types,
+ [event.event_id], state_filter=state_filter,
)
room_state = room_state[event.event_id]
else:
@@ -158,12 +154,12 @@ class MessageHandler(object):
if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
- room_id, types, filtered_types=filtered_types,
+ room_id, state_filter=state_filter,
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
- [membership_event_id], types, filtered_types=filtered_types,
+ [membership_event_id], state_filter=state_filter,
)
room_state = room_state[membership_event_id]
@@ -779,7 +775,7 @@ class EventCreationHandler(object):
event, context=context
)
- self.pusher_pool.on_new_notifications(
+ yield self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index a155b6e938..43f81bd607 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -21,6 +21,7 @@ from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
+from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.logcontext import run_in_background
@@ -255,16 +256,14 @@ class PaginationHandler(object):
if event_filter and event_filter.lazy_load_members():
# TODO: remove redundant members
- types = [
- (EventTypes.Member, state_key)
- for state_key in set(
- event.sender # FIXME: we also care about invite targets etc.
- for event in events
- )
- ]
+ # FIXME: we also care about invite targets etc.
+ state_filter = StateFilter.from_types(
+ (EventTypes.Member, event.sender)
+ for event in events
+ )
state_ids = yield self.store.get_state_ids_for_event(
- events[0].event_id, types=types,
+ events[0].event_id, state_filter=state_filter,
)
if state_ids:
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a6f3181f09..4c2690ba26 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
- self.hs.get_pusherpool().on_new_receipts(
+ yield self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index da914c46ff..e9d7b25a36 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -50,6 +50,7 @@ class RegistrationHandler(BaseHandler):
self._auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
self.user_directory_handler = hs.get_user_directory_handler()
+ self.room_creation_handler = self.hs.get_room_creation_handler()
self.captcha_client = CaptchaServerHttpClient(hs)
self._next_generated_user_id = None
@@ -220,9 +221,36 @@ class RegistrationHandler(BaseHandler):
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = create_requester(user_id)
+
+ # try to create the room if we're the first user on the server
+ should_auto_create_rooms = False
+ if self.hs.config.autocreate_auto_join_rooms:
+ count = yield self.store.count_all_users()
+ should_auto_create_rooms = count == 1
+
for r in self.hs.config.auto_join_rooms:
try:
- yield self._join_user_to_room(fake_requester, r)
+ if should_auto_create_rooms:
+ room_alias = RoomAlias.from_string(r)
+ if self.hs.hostname != room_alias.domain:
+ logger.warning(
+ 'Cannot create room alias %s, '
+ 'it does not match server domain',
+ r,
+ )
+ else:
+ # create room expects the localpart of the room alias
+ room_alias_localpart = room_alias.localpart
+ yield self.room_creation_handler.create_room(
+ fake_requester,
+ config={
+ "preset": "public_chat",
+ "room_alias_name": room_alias_localpart
+ },
+ ratelimit=False,
+ )
+ else:
+ yield self._join_user_to_room(fake_requester, r)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c3f820b975..3ba92bdb4c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -33,6 +33,7 @@ from synapse.api.constants import (
RoomCreationPreset,
)
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
+from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
from synapse.visibility import filter_events_for_client
@@ -190,10 +191,11 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
- user_id=user_id,
+ requester=requester,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
+ send_event=False,
)
preset_config = config.get(
@@ -289,7 +291,7 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
result["room_alias"] = room_alias.to_string()
yield directory_handler.send_room_alias_update_event(
- requester, user_id, room_id
+ requester, room_id
)
defer.returnValue(result)
@@ -488,23 +490,24 @@ class RoomContextHandler(object):
else:
last_event_id = event_id
- types = None
- filtered_types = None
if event_filter and event_filter.lazy_load_members():
- members = set(ev.sender for ev in itertools.chain(
- results["events_before"],
- (results["event"],),
- results["events_after"],
- ))
- filtered_types = [EventTypes.Member]
- types = [(EventTypes.Member, member) for member in members]
+ state_filter = StateFilter.from_lazy_load_member_list(
+ ev.sender
+ for ev in itertools.chain(
+ results["events_before"],
+ (results["event"],),
+ results["events_after"],
+ )
+ )
+ else:
+ state_filter = StateFilter.all()
# XXX: why do we return the state as of the last event rather than the
# first? Shouldn't we be consistent with /sync?
# https://github.com/matrix-org/matrix-doc/issues/687
state = yield self.store.get_state_for_events(
- [last_event_id], types, filtered_types=filtered_types,
+ [last_event_id], state_filter=state_filter,
)
results["state"] = list(state[last_event_id].values())
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 38e1737ec9..dc88620885 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -16,7 +16,7 @@
import logging
from collections import namedtuple
-from six import iteritems
+from six import PY3, iteritems
from six.moves import range
import msgpack
@@ -444,9 +444,16 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", (
@classmethod
def from_token(cls, token):
+ if PY3:
+ # The argument raw=False is only available on new versions of
+ # msgpack, and only really needed on Python 3. Gate it behind
+ # a PY3 check to avoid causing issues on Debian-packaged versions.
+ decoded = msgpack.loads(decode_base64(token), raw=False)
+ else:
+ decoded = msgpack.loads(decode_base64(token))
return RoomListNextBatch(**{
cls.REVERSE_KEY_DICT[key]: val
- for key, val in msgpack.loads(decode_base64(token)).items()
+ for key, val in decoded.items()
})
def to_token(self):
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 0c1d52fd11..80e7b15de8 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -24,6 +24,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import serialize_event
+from synapse.storage.state import StateFilter
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -324,9 +325,12 @@ class SearchHandler(BaseHandler):
else:
last_event_id = event.event_id
+ state_filter = StateFilter.from_types(
+ [(EventTypes.Member, sender) for sender in senders]
+ )
+
state = yield self.store.get_state_for_event(
- last_event_id,
- types=[(EventTypes.Member, sender) for sender in senders]
+ last_event_id, state_filter
)
res["profile_info"] = {
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 351892a94f..09739f2862 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -27,6 +27,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
+from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@@ -469,25 +470,20 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
- def get_state_after_event(self, event, types=None, filtered_types=None):
+ def get_state_after_event(self, event, state_filter=StateFilter.all()):
"""
Get the room state after the given event
Args:
event(synapse.events.EventBase): event of interest
- types(list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- May be None, which matches any key.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
A Deferred map from ((type, state_key)->Event)
"""
state_ids = yield self.store.get_state_ids_for_event(
- event.event_id, types, filtered_types=filtered_types,
+ event.event_id, state_filter=state_filter,
)
if event.is_state():
state_ids = state_ids.copy()
@@ -495,18 +491,14 @@ class SyncHandler(object):
defer.returnValue(state_ids)
@defer.inlineCallbacks
- def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
+ def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()):
""" Get the room state at a particular stream position
Args:
room_id(str): room for which to get state
stream_position(StreamToken): point at which to get state
- types(list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
A Deferred map from ((type, state_key)->Event)
@@ -522,7 +514,7 @@ class SyncHandler(object):
if last_events:
last_event = last_events[-1]
state = yield self.get_state_after_event(
- last_event, types, filtered_types=filtered_types,
+ last_event, state_filter=state_filter,
)
else:
@@ -563,10 +555,11 @@ class SyncHandler(object):
last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event(
- last_event.event_id, [
+ last_event.event_id,
+ state_filter=StateFilter.from_types([
(EventTypes.Name, ''),
(EventTypes.CanonicalAlias, ''),
- ]
+ ]),
)
# this is heavily cached, thus: fast.
@@ -717,8 +710,7 @@ class SyncHandler(object):
with Measure(self.clock, "compute_state_delta"):
- types = None
- filtered_types = None
+ members_to_fetch = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
@@ -729,16 +721,21 @@ class SyncHandler(object):
# We only request state for the members needed to display the
# timeline:
- types = [
- (EventTypes.Member, state_key)
- for state_key in set(
- event.sender # FIXME: we also care about invite targets etc.
- for event in batch.events
- )
- ]
+ members_to_fetch = set(
+ event.sender # FIXME: we also care about invite targets etc.
+ for event in batch.events
+ )
+
+ if full_state:
+ # always make sure we LL ourselves so we know we're in the room
+ # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
+ # We only need apply this on full state syncs given we disabled
+ # LL for incr syncs in #3840.
+ members_to_fetch.add(sync_config.user.to_string())
- # only apply the filtering to room members
- filtered_types = [EventTypes.Member]
+ state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+ else:
+ state_filter = StateFilter.all()
timeline_state = {
(event.type, event.state_key): event.event_id
@@ -746,28 +743,19 @@ class SyncHandler(object):
}
if full_state:
- if lazy_load_members:
- # always make sure we LL ourselves so we know we're in the room
- # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
- # We only need apply this on full state syncs given we disabled
- # LL for incr syncs in #3840.
- types.append((EventTypes.Member, sync_config.user.to_string()))
-
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id, types=types,
- filtered_types=filtered_types,
+ batch.events[-1].event_id, state_filter=state_filter,
)
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id, types=types,
- filtered_types=filtered_types,
+ batch.events[0].event_id, state_filter=state_filter,
)
else:
current_state_ids = yield self.get_state_at(
- room_id, stream_position=now_token, types=types,
- filtered_types=filtered_types,
+ room_id, stream_position=now_token,
+ state_filter=state_filter,
)
state_ids = current_state_ids
@@ -781,8 +769,7 @@ class SyncHandler(object):
)
elif batch.limited:
state_at_timeline_start = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id, types=types,
- filtered_types=filtered_types,
+ batch.events[0].event_id, state_filter=state_filter,
)
# for now, we disable LL for gappy syncs - see
@@ -797,17 +784,15 @@ class SyncHandler(object):
# members to just be ones which were timeline senders, which then ensures
# all of the rest get included in the state block (if we need to know
# about them).
- types = None
- filtered_types = None
+ state_filter = StateFilter.all()
state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token, types=types,
- filtered_types=filtered_types,
+ room_id, stream_position=since_token,
+ state_filter=state_filter,
)
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id, types=types,
- filtered_types=filtered_types,
+ batch.events[-1].event_id, state_filter=state_filter,
)
state_ids = _calculate_state(
@@ -821,7 +806,7 @@ class SyncHandler(object):
else:
state_ids = {}
if lazy_load_members:
- if types and batch.events:
+ if members_to_fetch and batch.events:
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
@@ -831,8 +816,12 @@ class SyncHandler(object):
# timeline here, and then dedupe any redundant ones below.
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id, types=types,
- filtered_types=None, # we only want members!
+ batch.events[0].event_id,
+ # we only want members!
+ state_filter=StateFilter.from_types(
+ (EventTypes.Member, member)
+ for member in members_to_fetch
+ ),
)
if lazy_load_members and not include_redundant_members:
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d8413d6aa7..f11b430126 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,6 +20,7 @@ from six import iteritems
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
from synapse.util.metrics import Measure
@@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
"""
return self.store.search_user_dir(user_id, search_term, limit)
- @defer.inlineCallbacks
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
@@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
if self._is_processing:
return
+ @defer.inlineCallbacks
+ def process():
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._is_processing = False
+
self._is_processing = True
- try:
- yield self._unsafe_process()
- finally:
- self._is_processing = False
+ run_as_background_process("user_directory.notify_new_event", process)
@defer.inlineCallbacks
def handle_local_profile_change(self, user_id, profile):
|