diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7ea8ce9f94..4a81bd2ba9 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -520,7 +520,7 @@ class AuthHandler(BaseHandler):
"""
logger.info("Logging in user %s on device %s", user_id, device_id)
access_token = yield self.issue_access_token(user_id, device_id)
- yield self.auth.check_auth_blocking()
+ yield self.auth.check_auth_blocking(user_id)
# the device *should* have been registered before we got here; however,
# it's possible we raced against a DELETE operation. The thing we
@@ -734,7 +734,6 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def validate_short_term_login_token_and_get_user_id(self, login_token):
- yield self.auth.check_auth_blocking()
auth_api = self.hs.get_auth()
user_id = None
try:
@@ -743,6 +742,7 @@ class AuthHandler(BaseHandler):
auth_api.validate_macaroon(macaroon, "login", True, user_id)
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+ yield self.auth.check_auth_blocking(user_id)
defer.returnValue(user_id)
@defer.inlineCallbacks
@@ -828,12 +828,26 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def delete_threepid(self, user_id, medium, address):
+ """Attempts to unbind the 3pid on the identity servers and deletes it
+ from the local database.
+
+ Args:
+ user_id (str)
+ medium (str)
+ address (str)
+
+ Returns:
+ Deferred[bool]: True if the 3pid was successfully unbound on the
+ identity server, False if the identity server doesn't support the
+ unbind API.
+ """
+
# 'Canonicalise' email addresses as per above
if medium == 'email':
address = address.lower()
identity_handler = self.hs.get_handlers().identity_handler
- yield identity_handler.unbind_threepid(
+ result = yield identity_handler.try_unbind_threepid(
user_id,
{
'medium': medium,
@@ -841,10 +855,10 @@ class AuthHandler(BaseHandler):
},
)
- ret = yield self.store.user_delete_threepid(
+ yield self.store.user_delete_threepid(
user_id, medium, address,
)
- defer.returnValue(ret)
+ defer.returnValue(result)
def _save_session(self, session):
# TODO: Persistent storage
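Moving the blocking check below the macaroon validation is deliberate: `check_auth_blocking` now takes a `user_id`, which is only known once the token has been parsed. A minimal standalone sketch of that control flow (plain Python, illustrative names rather than Synapse's actual API):

```python
# Sketch only: parse_token and check_auth_blocking stand in for
# validate_macaroon and Auth.check_auth_blocking respectively.
def validate_login_token(token, parse_token, check_auth_blocking):
    try:
        user_id = parse_token(token)   # may raise on a bad macaroon
    except Exception:
        raise PermissionError("Invalid token")
    check_auth_blocking(user_id)       # per-user check, after validation
    return user_id
```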
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index b3c5a9ee64..b078df4a76 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -51,7 +51,8 @@ class DeactivateAccountHandler(BaseHandler):
erase_data (bool): whether to GDPR-erase the user's data
Returns:
- Deferred
+ Deferred[bool]: True if the identity server supports removing
+ threepids, otherwise False.
"""
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.
@@ -60,16 +61,22 @@ class DeactivateAccountHandler(BaseHandler):
# leave the user still active so they can try again.
# Ideally we would prevent password resets and then do this in the
# background thread.
+
+ # This will be set to false if the identity server doesn't support
+ # unbinding
+ identity_server_supports_unbinding = True
+
threepids = yield self.store.user_get_threepids(user_id)
for threepid in threepids:
try:
- yield self._identity_handler.unbind_threepid(
+ result = yield self._identity_handler.try_unbind_threepid(
user_id,
{
'medium': threepid['medium'],
'address': threepid['address'],
},
)
+ identity_server_supports_unbinding &= result
except Exception:
# Do we want this to be a fatal error or should we carry on?
logger.exception("Failed to remove threepid from ID server")
@@ -103,6 +110,8 @@ class DeactivateAccountHandler(BaseHandler):
# parts users from rooms (if it isn't already running)
self._start_user_parting()
+ defer.returnValue(identity_server_supports_unbinding)
+
def _start_user_parting(self):
"""
Start the process that goes through the table of users
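The `&=` fold means a single identity server that lacks the unbind API flips the overall result to False, even if every other unbind succeeds. A minimal sketch of the accumulation:

```python
# The overall result is True only if *every* unbind attempt reported
# that the identity server supports unbinding.
def all_unbinds_supported(results):
    supported = True
    for result in results:
        supported &= result
    return supported

assert all_unbinds_supported([True, True]) is True
assert all_unbinds_supported([True, False]) is False
```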
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 2d44f15da3..9e017116a9 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import FederationDeniedError
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 14a10c1229..0ebf0fd188 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -49,10 +49,15 @@ from synapse.crypto.event_signing import (
compute_event_signature,
)
from synapse.events.validator import EventValidator
+from synapse.replication.http.federation import (
+ ReplicationCleanRoomRestServlet,
+ ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
@@ -91,6 +96,18 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
+ self.config = hs.config
+ self.http_client = hs.get_simple_http_client()
+
+ self._send_events_to_master = (
+ ReplicationFederationSendEventsRestServlet.make_client(hs)
+ )
+ self._notify_user_membership_change = (
+ ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+ )
+ self._clean_room_for_join_client = (
+ ReplicationCleanRoomRestServlet.make_client(hs)
+ )
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -1159,7 +1176,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
- yield self._persist_events([(event, context)])
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1190,7 +1207,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
- yield self._persist_events([(event, context)])
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1433,7 +1450,7 @@ class FederationHandler(BaseHandler):
event, context
)
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[(event, context)],
backfilled=backfilled,
)
@@ -1471,7 +1488,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
@@ -1559,7 +1576,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@@ -1570,7 +1587,7 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[(event, new_event_context)],
)
@@ -2301,7 +2318,7 @@ class FederationHandler(BaseHandler):
for revocation.
"""
try:
- response = yield self.hs.get_simple_http_client().get_json(
+ response = yield self.http_client.get_json(
url,
{"public_key": public_key}
)
@@ -2314,7 +2331,7 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Third party certificate was invalid")
@defer.inlineCallbacks
- def _persist_events(self, event_and_contexts, backfilled=False):
+ def persist_events_and_notify(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.
@@ -2326,14 +2343,21 @@ class FederationHandler(BaseHandler):
Returns:
Deferred
"""
- max_stream_id = yield self.store.persist_events(
- event_and_contexts,
- backfilled=backfilled,
- )
+ if self.config.worker_app:
+ yield self._send_events_to_master(
+ store=self.store,
+ event_and_contexts=event_and_contexts,
+ backfilled=backfilled
+ )
+ else:
+ max_stream_id = yield self.store.persist_events(
+ event_and_contexts,
+ backfilled=backfilled,
+ )
- if not backfilled: # Never notify for backfilled events
- for event, _ in event_and_contexts:
- self._notify_persisted_event(event, max_stream_id)
+ if not backfilled: # Never notify for backfilled events
+ for event, _ in event_and_contexts:
+ self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@@ -2366,15 +2390,30 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
- logcontext.run_in_background(
- self.pusher_pool.on_new_notifications,
+ self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
def _clean_room_for_join(self, room_id):
- return self.store.clean_room_for_join(room_id)
+ """Called to clean up any data in DB for a given room, ready for the
+ server to join the room.
+
+ Args:
+ room_id (str)
+ """
+ if self.config.worker_app:
+ return self._clean_room_for_join_client(room_id)
+ else:
+ return self.store.clean_room_for_join(room_id)
def user_joined_room(self, user, room_id):
"""Called when a new user has joined the room
"""
- return user_joined_room(self.distributor, user, room_id)
+ if self.config.worker_app:
+ return self._notify_user_membership_change(
+ room_id=room_id,
+ user_id=user.to_string(),
+ change="joined",
+ )
+ else:
+ return user_joined_room(self.distributor, user, room_id)
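The recurring `if self.config.worker_app:` branches all implement one pattern: on a worker, forward the write to the master over a replication HTTP client; on the master, do it locally. A minimal standalone sketch of that dispatch shape (names are illustrative, not Synapse's replication API):

```python
# Illustrative dispatcher: worker_app is truthy on workers, in which
# case the operation is proxied to the master; otherwise it runs
# against the local store directly.
class Dispatcher:
    def __init__(self, worker_app, replication_client, local_impl):
        self.worker_app = worker_app
        self.replication_client = replication_client
        self.local_impl = local_impl

    def persist(self, events, backfilled=False):
        if self.worker_app:
            return self.replication_client(
                events=events, backfilled=backfilled,
            )
        return self.local_impl(events, backfilled=backfilled)
```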
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 1d36d967c3..5feb3f22a6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -137,15 +137,19 @@ class IdentityHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
- def unbind_threepid(self, mxid, threepid):
- """
- Removes a binding from an identity server
+ def try_unbind_threepid(self, mxid, threepid):
+ """Removes a binding from an identity server
+
Args:
mxid (str): Matrix user ID of binding to be removed
threepid (dict): Dict with medium & address of binding to be removed
+ Raises:
+ SynapseError: If we failed to contact the identity server
+
Returns:
- Deferred[bool]: True on success, otherwise False
+ Deferred[bool]: True on success, False if the identity server
+ doesn't support unbinding
"""
logger.debug("unbinding threepid %r from %s", threepid, mxid)
if not self.trusted_id_servers:
@@ -175,11 +179,21 @@ class IdentityHandler(BaseHandler):
content=content,
destination_is=id_server,
)
- yield self.http_client.post_json_get_json(
- url,
- content,
- headers,
- )
+ try:
+ yield self.http_client.post_json_get_json(
+ url,
+ content,
+ headers,
+ )
+ except HttpResponseException as e:
+ if e.code in (400, 404, 501,):
+ # The remote server probably doesn't support unbinding (yet)
+ logger.warn("Received %d response while unbinding threepid", e.code)
+ defer.returnValue(False)
+ else:
+ logger.error("Failed to unbind threepid on identity server: %s", e)
+ raise SynapseError(502, "Failed to contact identity server")
+
defer.returnValue(True)
@defer.inlineCallbacks
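The new error handling treats a 400, 404 or 501 from the identity server as "unbind not supported" rather than a hard failure, and converts anything else into a 502. A standalone sketch of that mapping (plain Python, not the actual handler):

```python
# Status codes the remote is likely to return if it simply doesn't
# implement the unbind API (mirroring the tuple in the diff above).
UNBIND_NOT_SUPPORTED_CODES = (400, 404, 501)

def interpret_unbind_response(status_code):
    """Return True/False for success/unsupported; raise otherwise."""
    if status_code < 300:
        return True                    # unbind succeeded
    if status_code in UNBIND_NOT_SUPPORTED_CODES:
        return False                   # remote probably lacks the API
    raise RuntimeError(
        "Failed to contact identity server (HTTP %d)" % status_code
    )
```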
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 40e7580a61..e009395207 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken, UserID
from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client
@@ -372,6 +372,10 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
+ # If presence is disabled, return an empty list
+ if not self.hs.config.use_presence:
+ defer.returnValue([])
+
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index bcb093ba3e..e484061cc0 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -25,17 +25,24 @@ from twisted.internet import defer
from twisted.internet.defer import succeed
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ Codes,
+ ConsentNotGivenError,
+ NotFoundError,
+ SynapseError,
+)
from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.types import RoomAlias, UserID
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
+from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -82,28 +89,85 @@ class MessageHandler(object):
defer.returnValue(data)
@defer.inlineCallbacks
- def get_state_events(self, user_id, room_id, is_guest=False):
+ def get_state_events(
+ self, user_id, room_id, types=None, filtered_types=None,
+ at_token=None, is_guest=False,
+ ):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
- left the room return the state events from when they left.
+ left the room return the state events from when they left. If an explicit
+ 'at' parameter is passed, return the state events as of that event, if
+ visible.
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ used to filter the state fetched. If `state_key` is None, all
+ events of the given type are returned. `types` itself may be None,
+ in which case no filtering is applied.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
+ at_token(StreamToken|None): the stream token of the point at which
+ we are requesting the state. If the user is not allowed to view
+ the state at that stream token, we raise a 403 SynapseError. If
+ None, returns the current state based on the current_state_events
+ table.
+ is_guest(bool): whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
+ Raises:
+ NotFoundError (404) if the at token does not yield an event
+
+ AuthError (403) if the user doesn't have permission to view
+ members of this room.
"""
- membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
- room_id, user_id
- )
+ if at_token:
+ # FIXME this claims to get the state at a stream position, but
+ # get_recent_events_for_room operates by topo ordering. This therefore
+ # does not reliably give you the state at the given stream position.
+ # (https://github.com/matrix-org/synapse/issues/3305)
+ last_events, _ = yield self.store.get_recent_events_for_room(
+ room_id, end_token=at_token.room_key, limit=1,
+ )
- if membership == Membership.JOIN:
- room_state = yield self.state.get_current_state(room_id)
- elif membership == Membership.LEAVE:
- room_state = yield self.store.get_state_for_events(
- [membership_event_id], None
+ if not last_events:
+ raise NotFoundError("Can't find event for token %s" % (at_token, ))
+
+ visible_events = yield filter_events_for_client(
+ self.store, user_id, last_events,
)
- room_state = room_state[membership_event_id]
+
+ event = last_events[0]
+ if visible_events:
+ room_state = yield self.store.get_state_for_events(
+ [event.event_id], types, filtered_types=filtered_types,
+ )
+ room_state = room_state[event.event_id]
+ else:
+ raise AuthError(
+ 403,
+ "User %s not allowed to view events in room %s at token %s" % (
+ user_id, room_id, at_token,
+ )
+ )
+ else:
+ membership, membership_event_id = (
+ yield self.auth.check_in_room_or_world_readable(
+ room_id, user_id,
+ )
+ )
+
+ if membership == Membership.JOIN:
+ state_ids = yield self.store.get_filtered_current_state_ids(
+ room_id, types, filtered_types=filtered_types,
+ )
+ room_state = yield self.store.get_events(state_ids.values())
+ elif membership == Membership.LEAVE:
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], types, filtered_types=filtered_types,
+ )
+ room_state = room_state[membership_event_id]
now = self.clock.time_msec()
defer.returnValue(
@@ -212,10 +276,14 @@ class EventCreationHandler(object):
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
-
+ Raises:
+ ResourceLimitError if the server is blocked because some resource
+ limit has been exceeded
Returns:
Tuple of created event (FrozenEvent), Context
"""
+ yield self.auth.check_auth_blocking(requester.user.to_string())
+
builder = self.event_builder_factory.new(event_dict)
self.validator.validate_new(builder)
@@ -710,11 +778,8 @@ class EventCreationHandler(object):
event, context=context
)
- # this intentionally does not yield: we don't care about the result
- # and don't need to wait for it.
- run_in_background(
- self.pusher_pool.on_new_notifications,
- event_stream_id, max_stream_id
+ self.pusher_pool.on_new_notifications(
+ event_stream_id, max_stream_id,
)
def _notify():
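Going by the docstring semantics above, a caller can combine `types` and `filtered_types` to lazy-load membership: filter `m.room.member` down to a single state key while leaving every other event type unfiltered. A hedged usage sketch (assumes a handler instance is in scope; the wrapper function is invented for illustration):

```python
from twisted.internet import defer

@defer.inlineCallbacks
def get_own_membership_plus_state(message_handler, user_id, room_id):
    # Ask for the full room state, but filter m.room.member events
    # down to the requesting user's own membership via filtered_types.
    state = yield message_handler.get_state_events(
        user_id, room_id,
        types=[("m.room.member", user_id)],
        filtered_types=["m.room.member"],
    )
    defer.returnValue(state)
```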
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index b2849783ed..5170d093e3 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -18,11 +18,11 @@ import logging
from twisted.internet import defer
from twisted.python.failure import Failure
-from synapse.api.constants import Membership
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.types import RoomStreamToken
-from synapse.util.async import ReadWriteLock
+from synapse.util.async_helpers import ReadWriteLock
from synapse.util.logcontext import run_in_background
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@@ -251,6 +251,33 @@ class PaginationHandler(object):
is_peeking=(member_event_id is None),
)
+ state = None
+ if event_filter and event_filter.lazy_load_members():
+ # TODO: remove redundant members
+
+ types = [
+ (EventTypes.Member, state_key)
+ for state_key in set(
+ event.sender # FIXME: we also care about invite targets etc.
+ for event in events
+ )
+ ]
+
+ state_ids = yield self.store.get_state_ids_for_event(
+ events[0].event_id, types=types,
+ )
+
+ if state_ids:
+ state = yield self.store.get_events(list(state_ids.values()))
+
+ if state:
+ state = yield filter_events_for_client(
+ self.store,
+ user_id,
+ state.values(),
+ is_peeking=(member_event_id is None),
+ )
+
time_now = self.clock.time_msec()
chunk = {
@@ -262,4 +289,10 @@ class PaginationHandler(object):
"end": next_token.to_string(),
}
+ if state:
+ chunk["state"] = [
+ serialize_event(e, time_now, as_client_event)
+ for e in state
+ ]
+
defer.returnValue(chunk)
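With a lazy-loading filter in play, the paginated `/messages` chunk grows a `state` key carrying the membership events for the senders in the timeline. An illustrative response shape (tokens and events are invented for the example, not real output):

```python
# Example /messages chunk with lazy-loaded member state attached.
chunk = {
    "chunk": [
        {"type": "m.room.message", "sender": "@alice:example.org",
         "content": {"msgtype": "m.text", "body": "hi"}},
    ],
    "start": "t27-30_0_0_0_0_0_0_0_0",
    "end": "t17-20_0_0_0_0_0_0_0_0",
    "state": [
        {"type": "m.room.member", "state_key": "@alice:example.org",
         "content": {"membership": "join"}},
    ],
}
```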
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 3732830194..ba3856674d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -36,7 +36,7 @@ from synapse.api.errors import SynapseError
from synapse.metrics import LaterGauge
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.logcontext import run_in_background
from synapse.util.logutils import log_function
@@ -95,6 +95,7 @@ class PresenceHandler(object):
Args:
hs (synapse.server.HomeServer):
"""
+ self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
@@ -230,6 +231,10 @@ class PresenceHandler(object):
earlier than they should when synapse is restarted. This affect of this
is some spurious presence changes that will self-correct.
"""
+ # If the DB pool has already terminated, don't try updating
+ if not self.hs.get_db_pool().running:
+ return
+
logger.info(
"Performing _on_shutdown. Persisting %d unpersisted changes",
len(self.user_to_current_state)
@@ -390,6 +395,10 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
+ # If presence is disabled, no-op
+ if not self.hs.config.use_presence:
+ return
+
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -419,6 +428,11 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
+ # If presence is disabled, override affect_presence so that syncing
+ # never affects the user's presence.
+ if not self.hs.config.use_presence:
+ affect_presence = False
+
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -464,13 +478,16 @@ class PresenceHandler(object):
Returns:
set(str): A set of user_id strings.
"""
- syncing_user_ids = {
- user_id for user_id, count in self.user_to_num_current_syncs.items()
- if count
- }
- for user_ids in self.external_process_to_current_syncs.values():
- syncing_user_ids.update(user_ids)
- return syncing_user_ids
+ if self.hs.config.use_presence:
+ syncing_user_ids = {
+ user_id for user_id, count in self.user_to_num_current_syncs.items()
+ if count
+ }
+ for user_ids in self.external_process_to_current_syncs.values():
+ syncing_user_ids.update(user_ids)
+ return syncing_user_ids
+ else:
+ return set()
@defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
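Each presence entry point now checks the `use_presence` config flag first and degrades to a no-op. A minimal standalone sketch of the gating pattern (an illustrative class, not the real handler):

```python
# Sketch: presence queries return empty results when the feature is
# disabled, instead of consulting the per-user sync counters.
class PresenceSketch:
    def __init__(self, use_presence):
        self.use_presence = use_presence
        self.user_to_num_current_syncs = {}

    def get_currently_syncing_users(self):
        if not self.use_presence:
            return set()   # presence disabled: report nobody syncing
        return {
            user_id
            for user_id, count in self.user_to_num_current_syncs.items()
            if count
        }
```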
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 995460f82a..32108568c6 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
from ._base import BaseHandler
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index cb905a3903..a6f3181f09 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -18,7 +18,6 @@ from twisted.internet import defer
from synapse.types import get_domain_from_id
from synapse.util import logcontext
-from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
@@ -116,16 +115,15 @@ class ReceiptsHandler(BaseHandler):
affected_room_ids = list(set([r["room_id"] for r in receipts]))
- with PreserveLoggingContext():
- self.notifier.on_new_event(
- "receipt_key", max_batch_id, rooms=affected_room_ids
- )
- # Note that the min here shouldn't be relied upon to be accurate.
- self.hs.get_pusherpool().on_new_receipts(
- min_batch_id, max_batch_id, affected_room_ids
- )
+ self.notifier.on_new_event(
+ "receipt_key", max_batch_id, rooms=affected_room_ids
+ )
+ # Note that the min here shouldn't be relied upon to be accurate.
+ self.hs.get_pusherpool().on_new_receipts(
+ min_batch_id, max_batch_id, affected_room_ids,
+ )
- defer.returnValue(True)
+ defer.returnValue(True)
@logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 0e16bbe0ee..f03ee1476b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -28,7 +28,7 @@ from synapse.api.errors import (
)
from synapse.http.client import CaptchaServerHttpClient
from synapse.types import RoomAlias, RoomID, UserID, create_requester
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@@ -144,7 +144,8 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
- yield self._check_mau_limits()
+
+ yield self.auth.check_auth_blocking()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@@ -289,7 +290,7 @@ class RegistrationHandler(BaseHandler):
400,
"User ID can only contain characters a-z, 0-9, or '=_-./'",
)
- yield self._check_mau_limits()
+ yield self.auth.check_auth_blocking()
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
@@ -439,7 +440,7 @@ class RegistrationHandler(BaseHandler):
"""
if localpart is None:
raise SynapseError(400, "Request must include user id")
- yield self._check_mau_limits()
+ yield self.auth.check_auth_blocking()
need_register = True
try:
@@ -533,14 +534,3 @@ class RegistrationHandler(BaseHandler):
remote_room_hosts=remote_room_hosts,
action="join",
)
-
- @defer.inlineCallbacks
- def _check_mau_limits(self):
- """
- Do not accept registrations if monthly active user limits exceeded
- and limiting is enabled
- """
- try:
- yield self.auth.check_auth_blocking()
- except AuthError as e:
- raise RegistrationError(e.code, str(e), e.errcode)
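Dropping `_check_mau_limits` is a behaviour change as well as a cleanup: the error raised by `check_auth_blocking` now propagates as-is instead of being re-wrapped as a `RegistrationError`. A standalone sketch of what callers now observe (stub types; the errcode value is an assumption based on Matrix's resource-limit error code):

```python
# Stub of Synapse's AuthError, just enough to show propagation.
class AuthError(Exception):
    def __init__(self, code, msg, errcode="M_FORBIDDEN"):
        super(AuthError, self).__init__(msg)
        self.code, self.errcode = code, errcode

def check_auth_blocking_stub():
    raise AuthError(403, "Monthly Active User limit exceeded",
                    errcode="M_RESOURCE_LIMIT_EXCEEDED")

try:
    check_auth_blocking_stub()
except AuthError as e:
    assert e.errcode == "M_RESOURCE_LIMIT_EXCEEDED"  # not re-wrapped
```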
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6a17c42238..c3f820b975 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -98,9 +98,13 @@ class RoomCreationHandler(BaseHandler):
Raises:
SynapseError if the room ID couldn't be stored, or something went
horribly wrong.
+ ResourceLimitError if the server is blocked because some resource
+ limit has been exceeded
"""
user_id = requester.user.to_string()
+ yield self.auth.check_auth_blocking(user_id)
+
if not self.spam_checker.user_may_create_room(user_id):
raise SynapseError(403, "You are not permitted to create rooms")
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 828229f5c3..37e41afd61 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
from synapse.types import ThirdPartyInstanceID
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index b8e1af580b..f643619047 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -30,7 +30,7 @@ import synapse.types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import RoomID, UserID
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index dff1f67dcb..648debc8aa 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -25,7 +25,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
@@ -75,6 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"ephemeral",
"account_data",
"unread_notifications",
+ "summary",
])):
__slots__ = []
@@ -184,6 +185,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
class SyncHandler(object):
def __init__(self, hs):
+ self.hs_config = hs.config
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
@@ -191,6 +193,7 @@ class SyncHandler(object):
self.clock = hs.get_clock()
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
+ self.auth = hs.get_auth()
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
@@ -198,19 +201,27 @@ class SyncHandler(object):
max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
+ @defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Returns:
- A Deferred SyncResult.
+ Deferred[SyncResult]
"""
- return self.response_cache.wrap(
+ # If the user is not part of the mau group, then check that limits have
+ # not been exceeded (if they are not in the group by this point, auth
+ # blocking is almost certain to occur)
+ user_id = sync_config.user.to_string()
+ yield self.auth.check_auth_blocking(user_id)
+
+ res = yield self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config, since_token, timeout, full_state,
)
+ defer.returnValue(res)
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
@@ -495,9 +506,141 @@ class SyncHandler(object):
defer.returnValue(state)
@defer.inlineCallbacks
+ def compute_summary(self, room_id, sync_config, batch, state, now_token):
+ """ Works out a room summary block for this room, summarising the number
+ of joined members in the room, and providing the 'hero' members if the
+ room has no name so clients can consistently name rooms. Also adds
+ state events to 'state' if needed to describe the heroes.
+
+ Args:
+ room_id(str):
+ sync_config(synapse.handlers.sync.SyncConfig):
+ batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+ the room that will be sent to the user.
+ state(dict): dict of (type, state_key) -> Event as returned by
+ compute_state_delta
+ now_token(StreamToken): Token of the end of the current batch.
+
+ Returns:
+ A deferred dict describing the room summary
+ """
+
+ # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
+ last_events, _ = yield self.store.get_recent_event_ids_for_room(
+ room_id, end_token=now_token.room_key, limit=1,
+ )
+
+ if not last_events:
+ defer.returnValue(None)
+ return
+
+ last_event = last_events[-1]
+ state_ids = yield self.store.get_state_ids_for_event(
+ last_event.event_id, [
+ (EventTypes.Member, None),
+ (EventTypes.Name, ''),
+ (EventTypes.CanonicalAlias, ''),
+ ]
+ )
+
+ member_ids = {
+ state_key: event_id
+ for (t, state_key), event_id in state_ids.iteritems()
+ if t == EventTypes.Member
+ }
+ name_id = state_ids.get((EventTypes.Name, ''))
+ canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
+
+ summary = {}
+
+ # FIXME: it feels very heavy to load up every single membership event
+ # just to calculate the counts.
+ member_events = yield self.store.get_events(member_ids.values())
+
+ joined_user_ids = []
+ invited_user_ids = []
+
+ for ev in member_events.values():
+ if ev.content.get("membership") == Membership.JOIN:
+ joined_user_ids.append(ev.state_key)
+ elif ev.content.get("membership") == Membership.INVITE:
+ invited_user_ids.append(ev.state_key)
+
+ # TODO: only send these when they change.
+ summary["m.joined_member_count"] = len(joined_user_ids)
+ summary["m.invited_member_count"] = len(invited_user_ids)
+
+ if name_id or canonical_alias_id:
+ defer.returnValue(summary)
+
+ # FIXME: order by stream ordering, not alphabetic
+
+ me = sync_config.user.to_string()
+ if (joined_user_ids or invited_user_ids):
+ summary['m.heroes'] = sorted(
+ [
+ user_id
+ for user_id in (joined_user_ids + invited_user_ids)
+ if user_id != me
+ ]
+ )[0:5]
+ else:
+ summary['m.heroes'] = sorted(
+ [user_id for user_id in member_ids.keys() if user_id != me]
+ )[0:5]
+
+ if not sync_config.filter_collection.lazy_load_members():
+ defer.returnValue(summary)
+
+ # ensure we send membership events for heroes if needed
+ cache_key = (sync_config.user.to_string(), sync_config.device_id)
+ cache = self.get_lazy_loaded_members_cache(cache_key)
+
+ # track which members the client should already know about via LL:
+ # Ones which are already in state...
+ existing_members = set(
+ user_id for (typ, user_id) in state.keys()
+ if typ == EventTypes.Member
+ )
+
+ # ...or ones which are in the timeline...
+ for ev in batch.events:
+ if ev.type == EventTypes.Member:
+ existing_members.add(ev.state_key)
+
+ # ...and then ensure any missing ones get included in state.
+ missing_hero_event_ids = [
+ member_ids[hero_id]
+ for hero_id in summary['m.heroes']
+ if (
+ cache.get(hero_id) != member_ids[hero_id] and
+ hero_id not in existing_members
+ )
+ ]
+
+ missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
+ missing_hero_state = missing_hero_state.values()
+
+ for s in missing_hero_state:
+ cache.set(s.state_key, s.event_id)
+ state[(EventTypes.Member, s.state_key)] = s
+
+ defer.returnValue(summary)
+
+ def get_lazy_loaded_members_cache(self, cache_key):
+ cache = self.lazy_loaded_members_cache.get(cache_key)
+ if cache is None:
+ logger.debug("creating LruCache for %r", cache_key)
+ cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+ self.lazy_loaded_members_cache[cache_key] = cache
+ else:
+ logger.debug("found LruCache for %r", cache_key)
+ return cache
+
+ @defer.inlineCallbacks
def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
full_state):
- """ Works out the differnce in state between the start of the timeline
+ """ Works out the difference in state between the start of the timeline
and the previous sync.
Args:
@@ -511,7 +654,7 @@ class SyncHandler(object):
full_state(bool): Whether to force returning the full state.
Returns:
- A deferred new event dictionary
+ A deferred dict of (type, state_key) -> Event
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
@@ -609,13 +752,7 @@ class SyncHandler(object):
if lazy_load_members and not include_redundant_members:
cache_key = (sync_config.user.to_string(), sync_config.device_id)
- cache = self.lazy_loaded_members_cache.get(cache_key)
- if cache is None:
- logger.debug("creating LruCache for %r", cache_key)
- cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
- self.lazy_loaded_members_cache[cache_key] = cache
- else:
- logger.debug("found LruCache for %r", cache_key)
+ cache = self.get_lazy_loaded_members_cache(cache_key)
# if it's a new sync sequence, then assume the client has had
# amnesia and doesn't want any recent lazy-loaded members
@@ -724,7 +861,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
- if not block_all_presence_data:
+ if self.hs_config.use_presence and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)
@@ -1416,7 +1553,6 @@ class SyncHandler(object):
if events == [] and tags is None:
return
- since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
@@ -1459,6 +1595,18 @@ class SyncHandler(object):
full_state=full_state
)
+ summary = {}
+ if (
+ sync_config.filter_collection.lazy_load_members() and
+ (
+ any(ev.type == EventTypes.Member for ev in batch.events) or
+ since_token is None
+ )
+ ):
+ summary = yield self.compute_summary(
+ room_id, sync_config, batch, state, now_token
+ )
+
if room_builder.rtype == "joined":
unread_notifications = {}
room_sync = JoinedSyncResult(
@@ -1468,6 +1616,7 @@ class SyncHandler(object):
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
+ summary=summary,
)
if room_sync or always_include:
|