diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index d358842b3e..413425fed1 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,13 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from .register import RegistrationHandler
-from .room import RoomContextHandler
-from .message import MessageHandler
-from .federation import FederationHandler
-from .directory import DirectoryHandler
from .admin import AdminHandler
+from .directory import DirectoryHandler
+from .federation import FederationHandler
from .identity import IdentityHandler
+from .register import RegistrationHandler
from .search import SearchHandler
@@ -44,10 +42,8 @@ class Handlers(object):
def __init__(self, hs):
self.registration_handler = RegistrationHandler(hs)
- self.message_handler = MessageHandler(hs)
self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
- self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 2d1db0c245..704181d2d3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -18,11 +18,10 @@ import logging
from twisted.internet import defer
import synapse.types
-from synapse.api.constants import Membership, EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import LimitExceededError
from synapse.types import UserID
-
logger = logging.getLogger(__name__)
@@ -113,8 +112,9 @@ class BaseHandler(object):
guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join":
if context:
+ current_state_ids = yield context.get_current_state_ids(self.store)
current_state = yield self.store.get_events(
- list(context.current_state_ids.values())
+ list(current_state_ids.values())
)
else:
current_state = yield self.state_handler.get_current_state(
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index f36b358b45..5d629126fc 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
from twisted.internet import defer
from ._base import BaseHandler
-import logging
-
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 1c29c43a83..f0f89af7dc 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -13,19 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+import logging
from six import itervalues
+from prometheus_client import Counter
+
+from twisted.internet import defer
+
import synapse
from synapse.api.constants import EventTypes
-from synapse.util.metrics import Measure
-from synapse.util.logcontext import (
- make_deferred_yieldable, run_in_background,
+from synapse.metrics import (
+ event_processing_loop_counter,
+ event_processing_loop_room_count,
)
-from prometheus_client import Counter
-
-import logging
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -107,7 +111,9 @@ class ApplicationServicesHandler(object):
yield self._check_user_exists(event.state_key)
if not self.started_scheduler:
- self.scheduler.start().addErrback(log_failure)
+ def start_scheduler():
+ return self.scheduler.start().addErrback(log_failure)
+ run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True
# Fork off pushes to these services
@@ -134,6 +140,12 @@ class ApplicationServicesHandler(object):
events_processed_counter.inc(len(events))
+ event_processing_loop_room_count.labels(
+ "appservice_sender"
+ ).inc(len(events_by_room))
+
+ event_processing_loop_counter.labels("appservice_sender").inc()
+
synapse.metrics.event_processing_lag.labels(
"appservice_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3c0051586d..4a81bd2ba9 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,29 +13,34 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+import logging
+import unicodedata
+
+import attr
+import bcrypt
+import pymacaroons
+from canonicaljson import json
+
from twisted.internet import defer, threads
+from twisted.web.client import PartialDownloadError
-from ._base import BaseHandler
+import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType
from synapse.api.errors import (
- AuthError, Codes, InteractiveAuthIncompleteError, LoginError, StoreError,
+ AuthError,
+ Codes,
+ InteractiveAuthIncompleteError,
+ LoginError,
+ StoreError,
SynapseError,
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
-from synapse.util.async import run_on_reactor
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable
-from twisted.web.client import PartialDownloadError
-
-import logging
-import bcrypt
-import pymacaroons
-import simplejson
-
-import synapse.util.stringutils as stringutils
-
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -402,7 +407,7 @@ class AuthHandler(BaseHandler):
except PartialDownloadError as pde:
# Twisted is silly
data = pde.response
- resp_body = simplejson.loads(data)
+ resp_body = json.loads(data)
if 'success' in resp_body:
# Note that we do NOT check the hostname here: we explicitly
@@ -423,15 +428,11 @@ class AuthHandler(BaseHandler):
def _check_msisdn(self, authdict, _):
return self._check_threepid('msisdn', authdict)
- @defer.inlineCallbacks
def _check_dummy_auth(self, authdict, _):
- yield run_on_reactor()
- defer.returnValue(True)
+ return defer.succeed(True)
@defer.inlineCallbacks
def _check_threepid(self, medium, authdict):
- yield run_on_reactor()
-
if 'threepid_creds' not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
@@ -519,6 +520,7 @@ class AuthHandler(BaseHandler):
"""
logger.info("Logging in user %s on device %s", user_id, device_id)
access_token = yield self.issue_access_token(user_id, device_id)
+ yield self.auth.check_auth_blocking(user_id)
# the device *should* have been registered before we got here; however,
# it's possible we raced against a DELETE operation. The thing we
@@ -626,6 +628,7 @@ class AuthHandler(BaseHandler):
# special case to check for "password" for the check_password interface
# for the auth providers
password = login_submission.get("password")
+
if login_type == LoginType.PASSWORD:
if not self._password_enabled:
raise SynapseError(400, "Password login has been disabled.")
@@ -707,9 +710,10 @@ class AuthHandler(BaseHandler):
multiple inexact matches.
Args:
- user_id (str): complete @user:id
+ user_id (unicode): complete @user:id
+ password (unicode): the provided password
Returns:
- (str) the canonical_user_id, or None if unknown user / bad password
+ (unicode) the canonical_user_id, or None if unknown user / bad password
"""
lookupres = yield self._find_user_id_and_pwd_hash(user_id)
if not lookupres:
@@ -728,15 +732,18 @@ class AuthHandler(BaseHandler):
device_id)
defer.returnValue(access_token)
+ @defer.inlineCallbacks
def validate_short_term_login_token_and_get_user_id(self, login_token):
auth_api = self.hs.get_auth()
+ user_id = None
try:
macaroon = pymacaroons.Macaroon.deserialize(login_token)
user_id = auth_api.get_user_id_from_macaroon(macaroon)
auth_api.validate_macaroon(macaroon, "login", True, user_id)
- return user_id
except Exception:
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+ yield self.auth.check_auth_blocking(user_id)
+ defer.returnValue(user_id)
@defer.inlineCallbacks
def delete_access_token(self, access_token):
@@ -821,14 +828,37 @@ class AuthHandler(BaseHandler):
@defer.inlineCallbacks
def delete_threepid(self, user_id, medium, address):
+ """Attempts to unbind the 3pid on the identity servers and deletes it
+ from the local database.
+
+ Args:
+ user_id (str)
+ medium (str)
+ address (str)
+
+ Returns:
+ Deferred[bool]: Returns True if successfully unbound the 3pid on
+ the identity server, False if identity server doesn't support the
+ unbind API.
+ """
+
# 'Canonicalise' email addresses as per above
if medium == 'email':
address = address.lower()
- ret = yield self.store.user_delete_threepid(
+ identity_handler = self.hs.get_handlers().identity_handler
+ result = yield identity_handler.try_unbind_threepid(
+ user_id,
+ {
+ 'medium': medium,
+ 'address': address,
+ },
+ )
+
+ yield self.store.user_delete_threepid(
user_id, medium, address,
)
- defer.returnValue(ret)
+ defer.returnValue(result)
def _save_session(self, session):
# TODO: Persistent storage
@@ -840,45 +870,62 @@ class AuthHandler(BaseHandler):
"""Computes a secure hash of password.
Args:
- password (str): Password to hash.
+ password (unicode): Password to hash.
Returns:
- Deferred(str): Hashed password.
+ Deferred(unicode): Hashed password.
"""
def _do_hash():
- return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
- bcrypt.gensalt(self.bcrypt_rounds))
-
- return make_deferred_yieldable(threads.deferToThread(_do_hash))
+ # Normalise the Unicode in the password
+ pw = unicodedata.normalize("NFKC", password)
+
+ return bcrypt.hashpw(
+ pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
+ bcrypt.gensalt(self.bcrypt_rounds),
+ ).decode('ascii')
+
+ return make_deferred_yieldable(
+ threads.deferToThreadPool(
+ self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
+ ),
+ )
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
Args:
- password (str): Password to hash.
- stored_hash (str): Expected hash value.
+ password (unicode): Password to hash.
+ stored_hash (unicode): Expected hash value.
Returns:
Deferred(bool): Whether self.hash(password) == stored_hash.
"""
def _do_validate_hash():
+ # Normalise the Unicode in the password
+ pw = unicodedata.normalize("NFKC", password)
+
return bcrypt.checkpw(
- password.encode('utf8') + self.hs.config.password_pepper,
+ pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
stored_hash.encode('utf8')
)
if stored_hash:
- return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
+ return make_deferred_yieldable(
+ threads.deferToThreadPool(
+ self.hs.get_reactor(),
+ self.hs.get_reactor().getThreadPool(),
+ _do_validate_hash,
+ ),
+ )
else:
return defer.succeed(False)
-class MacaroonGeneartor(object):
- def __init__(self, hs):
- self.clock = hs.get_clock()
- self.server_name = hs.config.server_name
- self.macaroon_secret_key = hs.config.macaroon_secret_key
+@attr.s
+class MacaroonGenerator(object):
+
+ hs = attr.ib()
def generate_access_token(self, user_id, extra_caveats=None):
extra_caveats = extra_caveats or []
@@ -896,7 +943,7 @@ class MacaroonGeneartor(object):
def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
macaroon = self._generate_base_macaroon(user_id)
macaroon.add_first_party_caveat("type = login")
- now = self.clock.time_msec()
+ now = self.hs.get_clock().time_msec()
expiry = now + duration_in_ms
macaroon.add_first_party_caveat("time < %d" % (expiry,))
return macaroon.serialize()
@@ -908,9 +955,9 @@ class MacaroonGeneartor(object):
def _generate_base_macaroon(self, user_id):
macaroon = pymacaroons.Macaroon(
- location=self.server_name,
+ location=self.hs.config.server_name,
identifier="key",
- key=self.macaroon_secret_key)
+ key=self.hs.config.macaroon_secret_key)
macaroon.add_first_party_caveat("gen = 1")
macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
return macaroon
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index c5e92f6214..b078df4a76 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -12,13 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer, reactor
+import logging
-from ._base import BaseHandler
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
from synapse.types import UserID, create_requester
from synapse.util.logcontext import run_in_background
-import logging
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -30,6 +32,7 @@ class DeactivateAccountHandler(BaseHandler):
self._auth_handler = hs.get_auth_handler()
self._device_handler = hs.get_device_handler()
self._room_member_handler = hs.get_room_member_handler()
+ self._identity_handler = hs.get_handlers().identity_handler
self.user_directory_handler = hs.get_user_directory_handler()
# Flag that indicates whether the process to part users from rooms is running
@@ -37,29 +40,58 @@ class DeactivateAccountHandler(BaseHandler):
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
- reactor.callWhenRunning(self._start_user_parting)
+ hs.get_reactor().callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
- def deactivate_account(self, user_id):
+ def deactivate_account(self, user_id, erase_data):
"""Deactivate a user's account
Args:
user_id (str): ID of user to be deactivated
+ erase_data (bool): whether to GDPR-erase the user's data
Returns:
- Deferred
+ Deferred[bool]: True if identity server supports removing
+ threepids, otherwise False.
"""
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.
- # first delete any devices belonging to the user, which will also
+ # delete threepids first. We remove these from the IS so if this fails,
+ # leave the user still active so they can try again.
+ # Ideally we would prevent password resets and then do this in the
+ # background thread.
+
+ # This will be set to false if the identity server doesn't support
+ # unbinding
+ identity_server_supports_unbinding = True
+
+ threepids = yield self.store.user_get_threepids(user_id)
+ for threepid in threepids:
+ try:
+ result = yield self._identity_handler.try_unbind_threepid(
+ user_id,
+ {
+ 'medium': threepid['medium'],
+ 'address': threepid['address'],
+ },
+ )
+ identity_server_supports_unbinding &= result
+ except Exception:
+ # Do we want this to be a fatal error or should we carry on?
+ logger.exception("Failed to remove threepid from ID server")
+ raise SynapseError(400, "Failed to remove threepid from ID server")
+ yield self.store.user_delete_threepid(
+ user_id, threepid['medium'], threepid['address'],
+ )
+
+ # delete any devices belonging to the user, which will also
# delete corresponding access tokens.
yield self._device_handler.delete_all_devices_for_user(user_id)
# then delete any remaining access tokens which weren't associated with
# a device.
yield self._auth_handler.delete_access_tokens_for_user(user_id)
- yield self.store.user_delete_threepids(user_id)
yield self.store.user_set_password_hash(user_id, None)
# Add the user to a table of users pending deactivation (ie.
@@ -69,10 +101,17 @@ class DeactivateAccountHandler(BaseHandler):
# delete from user directory
yield self.user_directory_handler.handle_user_deactivated(user_id)
+ # Mark the user as erased, if they asked for that
+ if erase_data:
+ logger.info("Marking %s as erased", user_id)
+ yield self.store.mark_user_erased(user_id)
+
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
self._start_user_parting()
+ defer.returnValue(identity_server_supports_unbinding)
+
def _start_user_parting(self):
"""
Start the process that goes through the table of users
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 11c6fb3657..9e017116a9 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -12,21 +12,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
+from six import iteritems, itervalues
+
+from twisted.internet import defer
+
from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.api.errors import FederationDeniedError
+from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.retryutils import NotRetryingDestination
from synapse.util.metrics import measure_func
-from synapse.types import get_domain_from_id, RoomStreamToken
-from twisted.internet import defer
-from ._base import BaseHandler
-
-import logging
+from synapse.util.retryutils import NotRetryingDestination
-from six import itervalues, iteritems
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -537,7 +539,7 @@ class DeviceListEduUpdater(object):
yield self.device_handler.notify_device_update(user_id, device_ids)
else:
# Simply update the single device, since we know that is the only
- # change (becuase of the single prev_id matching the current cache)
+ # change (because of the single prev_id matching the current cache)
for device_id, stream_id, prev_ids, content in pending_updates:
yield self.store.update_remote_device_list_cache_entry(
user_id, device_id, content, stream_id,
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index f147a20b73..2e2e5261de 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -18,10 +18,9 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
-from synapse.types import get_domain_from_id, UserID
+from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string
-
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index c5b6e75e03..ef866da1b6 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -14,15 +14,16 @@
# limitations under the License.
+import logging
+import string
+
from twisted.internet import defer
-from ._base import BaseHandler
-from synapse.api.errors import SynapseError, Codes, CodeMessageException, AuthError
from synapse.api.constants import EventTypes
+from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
from synapse.types import RoomAlias, UserID, get_domain_from_id
-import logging
-import string
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 8a2d177539..5816bf8b4f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -14,17 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import simplejson as json
import logging
-from canonicaljson import encode_canonical_json
-from twisted.internet import defer
from six import iteritems
-from synapse.api.errors import (
- SynapseError, CodeMessageException, FederationDeniedError,
-)
-from synapse.types import get_domain_from_id, UserID
+from canonicaljson import encode_canonical_json, json
+
+from twisted.internet import defer
+
+from synapse.api.errors import CodeMessageException, FederationDeniedError, SynapseError
+from synapse.types import UserID, get_domain_from_id
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.retryutils import NotRetryingDestination
@@ -80,7 +79,7 @@ class E2eKeysHandler(object):
else:
remote_queries[user_id] = device_ids
- # Firt get local devices.
+ # First get local devices.
failures = {}
results = {}
if local_query:
@@ -357,7 +356,7 @@ def _exception_to_failure(e):
# include ConnectionRefused and other errors
#
# Note that some Exceptions (notably twisted's ResponseFailed etc) don't
- # give a string for e.message, which simplejson then fails to serialize.
+ # give a string for e.message, which json then fails to serialize.
return {
"status": 503, "message": str(e.message),
}
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 8bc642675f..f772e62c28 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -13,20 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import random
+
from twisted.internet import defer
-from synapse.util.logutils import log_function
-from synapse.types import UserID
-from synapse.events.utils import serialize_event
-from synapse.api.constants import Membership, EventTypes
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import AuthError
from synapse.events import EventBase
+from synapse.events.utils import serialize_event
+from synapse.types import UserID
+from synapse.util.logutils import log_function
+from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
-import logging
-import random
-
-
logger = logging.getLogger(__name__)
@@ -130,11 +131,13 @@ class EventStreamHandler(BaseHandler):
class EventHandler(BaseHandler):
@defer.inlineCallbacks
- def get_event(self, user, event_id):
+ def get_event(self, user, room_id, event_id):
"""Retrieve a single specified event.
Args:
user (synapse.types.UserID): The user requesting the event
+ room_id (str|None): The expected room id. We'll return None if the
+ event's room does not match.
event_id (str): The event ID to obtain.
Returns:
dict: An event, or None if there is no event matching this ID.
@@ -143,13 +146,26 @@ class EventHandler(BaseHandler):
AuthError if the user does not have the rights to inspect this
event.
"""
- event = yield self.store.get_event(event_id)
+ event = yield self.store.get_event(event_id, check_room_id=room_id)
if not event:
defer.returnValue(None)
return
- if hasattr(event, "room_id"):
- yield self.auth.check_joined_room(event.room_id, user.to_string())
+ users = yield self.store.get_users_in_room(event.room_id)
+ is_peeking = user.to_string() not in users
+
+ filtered = yield filter_events_for_client(
+ self.store,
+ user.to_string(),
+ [event],
+ is_peeking=is_peeking
+ )
+
+ if not filtered:
+ raise AuthError(
+ 403,
+ "You don't have permission to access that event."
+ )
defer.returnValue(event)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 495ac4c648..0ebf0fd188 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -20,37 +20,51 @@ import itertools
import logging
import sys
+import six
+from six import iteritems, itervalues
+from six.moves import http_client, zip
+
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
-import six
-from six.moves import http_client
-from six import iteritems
-from twisted.internet import defer
from unpaddedbase64 import decode_base64
-from ._base import BaseHandler
+from twisted.internet import defer
+from synapse.api.constants import (
+ KNOWN_ROOM_VERSIONS,
+ EventTypes,
+ Membership,
+ RejectedReason,
+)
from synapse.api.errors import (
- AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+ AuthError,
+ CodeMessageException,
FederationDeniedError,
+ FederationError,
+ StoreError,
+ SynapseError,
)
-from synapse.api.constants import EventTypes, Membership, RejectedReason
-from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError, logcontext
-from synapse.util.metrics import measure_func
-from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
-from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
- compute_event_signature, add_hashes_and_signatures,
+ add_hashes_and_signatures,
+ compute_event_signature,
)
+from synapse.events.validator import EventValidator
+from synapse.replication.http.federation import (
+ ReplicationCleanRoomRestServlet,
+ ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
+from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
-
-from synapse.events.utils import prune_event
-
+from synapse.util import logcontext, unwrapFirstError
+from synapse.util.async_helpers import Linearizer
+from synapse.util.distributor import user_joined_room
+from synapse.util.frozenutils import unfreeze
+from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
+from synapse.visibility import filter_events_for_server
-from synapse.util.distributor import user_joined_room
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -72,7 +86,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
- self.replication_layer = hs.get_federation_client()
+ self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
@@ -82,6 +96,18 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
+ self.config = hs.config
+ self.http_client = hs.get_simple_http_client()
+
+ self._send_events_to_master = (
+ ReplicationFederationSendEventsRestServlet.make_client(hs)
+ )
+ self._notify_user_membership_change = (
+ ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+ )
+ self._clean_room_for_join_client = (
+ ReplicationCleanRoomRestServlet.make_client(hs)
+ )
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -89,7 +115,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def on_receive_pdu(self, origin, pdu, get_missing=True):
+ def on_receive_pdu(
+ self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ ):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -103,8 +131,10 @@ class FederationHandler(BaseHandler):
"""
# We reprocess pdus when we have seen them only as outliers
- existing = yield self.get_persisted_pdu(
- origin, pdu.event_id, do_auth=False
+ existing = yield self.store.get_event(
+ pdu.event_id,
+ allow_none=True,
+ allow_rejected=True,
)
# FIXME: Currently we fetch an event again when we already have it
@@ -161,14 +191,11 @@ class FederationHandler(BaseHandler):
"Ignoring PDU %s for room %s from %s as we've left the room!",
pdu.event_id, pdu.room_id, origin,
)
- return
+ defer.returnValue(None)
state = None
-
auth_chain = []
- fetch_state = False
-
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
@@ -223,26 +250,61 @@ class FederationHandler(BaseHandler):
list(prevs - seen)[:5],
)
- if prevs - seen:
- logger.info(
- "Still missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ if sent_to_us_directly and prevs - seen:
+ # If they have sent it to us directly, and the server
+ # isn't telling us about the auth events that it's
+ # made a message referencing, we explode
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
)
- fetch_state = True
+ elif prevs - seen:
+ # Calculate the state of the previous events, and
+ # de-conflict them to find the current state.
+ state_groups = []
+ auth_chains = set()
+ try:
+ # Get the state of the events we know about
+ ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+ state_groups.append(ours)
+
+ # Ask the remote server for the states we don't
+ # know about
+ for p in prevs - seen:
+ state, got_auth_chain = (
+ yield self.federation_client.get_state_for_room(
+ origin, pdu.room_id, p
+ )
+ )
+ auth_chains.update(got_auth_chain)
+ state_group = {(x.type, x.state_key): x.event_id for x in state}
+ state_groups.append(state_group)
+
+ # Resolve any conflicting state
+ def fetch(ev_ids):
+ return self.store.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False
+ )
- if fetch_state:
- # We need to get the state at this event, since we haven't
- # processed all the prev events.
- logger.debug(
- "_handle_new_pdu getting state for %s",
- pdu.room_id
- )
- try:
- state, auth_chain = yield self.replication_layer.get_state_for_room(
- origin, pdu.room_id, pdu.event_id,
- )
- except Exception:
- logger.exception("Failed to get state for event: %s", pdu.event_id)
+ room_version = yield self.store.get_room_version(pdu.room_id)
+ state_map = yield resolve_events_with_factory(
+ room_version, state_groups, {pdu.event_id: pdu}, fetch
+ )
+
+ state = (yield self.store.get_events(state_map.values())).values()
+ auth_chain = list(auth_chains)
+ except Exception:
+ raise FederationError(
+ "ERROR",
+ 403,
+ "We can't get valid state history.",
+ affected=pdu.event_id,
+ )
yield self._process_received_pdu(
origin,
@@ -299,7 +361,7 @@ class FederationHandler(BaseHandler):
#
# see https://github.com/matrix-org/synapse/pull/1744
- missing_events = yield self.replication_layer.get_missing_events(
+ missing_events = yield self.federation_client.get_missing_events(
origin,
pdu.room_id,
earliest_events_ids=list(latest),
@@ -320,11 +382,17 @@ class FederationHandler(BaseHandler):
for e in missing_events:
logger.info("Handling found event %s", e.event_id)
- yield self.on_receive_pdu(
- origin,
- e,
- get_missing=False
- )
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ e,
+ get_missing=False
+ )
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn("Event %s failed history check.")
+ else:
+ raise
@log_function
@defer.inlineCallbacks
@@ -355,7 +423,7 @@ class FederationHandler(BaseHandler):
)
try:
- event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ yield self._persist_auth_tree(
origin, auth_chain, state, event
)
except AuthError as e:
@@ -399,7 +467,7 @@ class FederationHandler(BaseHandler):
yield self._handle_new_events(origin, event_infos)
try:
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ context = yield self._handle_new_event(
origin,
event,
state=state,
@@ -424,24 +492,16 @@ class FederationHandler(BaseHandler):
except StoreError:
logger.exception("Failed to store room.")
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
-
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
# Only fire user_joined_room if the user has acutally
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True
- prev_state_id = context.prev_state_ids.get(
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_state_id = prev_state_ids.get(
(event.type, event.state_key)
)
if prev_state_id:
@@ -453,84 +513,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- yield user_joined_room(self.distributor, user, event.room_id)
-
- @measure_func("_filter_events_for_server")
- @defer.inlineCallbacks
- def _filter_events_for_server(self, server_name, room_id, events):
- event_to_state_ids = yield self.store.get_state_ids_for_events(
- frozenset(e.event_id for e in events),
- types=(
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, None),
- )
- )
-
- # We only want to pull out member events that correspond to the
- # server's domain.
-
- def check_match(id):
- try:
- return server_name == get_domain_from_id(id)
- except Exception:
- return False
-
- # Parses mapping `event_id -> (type, state_key) -> state event_id`
- # to get all state ids that we're interested in.
- event_map = yield self.store.get_events([
- e_id
- for key_to_eid in list(event_to_state_ids.values())
- for key, e_id in key_to_eid.items()
- if key[0] != EventTypes.Member or check_match(key[1])
- ])
-
- event_to_state = {
- e_id: {
- key: event_map[inner_e_id]
- for key, inner_e_id in key_to_eid.iteritems()
- if inner_e_id in event_map
- }
- for e_id, key_to_eid in event_to_state_ids.iteritems()
- }
-
- def redact_disallowed(event, state):
- if not state:
- return event
-
- history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
- if history:
- visibility = history.content.get("history_visibility", "shared")
- if visibility in ["invited", "joined"]:
- # We now loop through all state events looking for
- # membership states for the requesting server to determine
- # if the server is either in the room or has been invited
- # into the room.
- for ev in state.itervalues():
- if ev.type != EventTypes.Member:
- continue
- try:
- domain = get_domain_from_id(ev.state_key)
- except Exception:
- continue
-
- if domain != server_name:
- continue
-
- memtype = ev.membership
- if memtype == Membership.JOIN:
- return event
- elif memtype == Membership.INVITE:
- if visibility == "invited":
- return event
- else:
- return prune_event(event)
-
- return event
-
- defer.returnValue([
- redact_disallowed(e, event_to_state[e.event_id])
- for e in events
- ])
+ yield self.user_joined_room(user, event.room_id)
@log_function
@defer.inlineCallbacks
@@ -551,7 +534,7 @@ class FederationHandler(BaseHandler):
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
- events = yield self.replication_layer.backfill(
+ events = yield self.federation_client.backfill(
dest,
room_id,
limit=limit,
@@ -599,7 +582,7 @@ class FederationHandler(BaseHandler):
state_events = {}
events_to_state = {}
for e_id in edges:
- state, auth = yield self.replication_layer.get_state_for_room(
+ state, auth = yield self.federation_client.get_state_for_room(
destination=dest,
room_id=room_id,
event_id=e_id
@@ -641,7 +624,7 @@ class FederationHandler(BaseHandler):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(
- self.replication_layer.get_pdu,
+ self.federation_client.get_pdu,
[dest],
event_id,
outlier=True,
@@ -763,7 +746,7 @@ class FederationHandler(BaseHandler):
"""
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in state.iteritems()
+ for (e_type, state_key), event in iteritems(state)
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@@ -780,7 +763,7 @@ class FederationHandler(BaseHandler):
except Exception:
pass
- return sorted(joined_domains.iteritems(), key=lambda d: d[1])
+ return sorted(joined_domains.items(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
@@ -843,7 +826,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
- event_ids = list(extremities.iterkeys())
+ event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@@ -859,15 +842,15 @@ class FederationHandler(BaseHandler):
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
- [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
+ [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in state_dict.iteritems()
+ for k, e_id in iteritems(state_dict)
if e_id in state_map
- } for key, state_dict in states.iteritems()
+ } for key, state_dict in iteritems(states)
}
for e_id, _ in sorted_extremeties_tuple:
@@ -922,7 +905,7 @@ class FederationHandler(BaseHandler):
Invites must be signed by the invitee's server before distribution.
"""
- pdu = yield self.replication_layer.send_invite(
+ pdu = yield self.federation_client.send_invite(
destination=target_host,
room_id=event.room_id,
event_id=event.event_id,
@@ -938,16 +921,6 @@ class FederationHandler(BaseHandler):
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)
-
- for event in auth:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue([e for e in auth])
@log_function
@@ -972,6 +945,9 @@ class FederationHandler(BaseHandler):
joinee,
"join",
content,
+ params={
+ "ver": KNOWN_ROOM_VERSIONS,
+ },
)
# This shouldn't happen, because the RoomMemberHandler has a
@@ -981,7 +957,7 @@ class FederationHandler(BaseHandler):
self.room_queues[room_id] = []
- yield self.store.clean_room_for_join(room_id)
+ yield self._clean_room_for_join(room_id)
handled_events = set()
@@ -994,7 +970,7 @@ class FederationHandler(BaseHandler):
target_hosts.insert(0, origin)
except ValueError:
pass
- ret = yield self.replication_layer.send_join(target_hosts, event)
+ ret = yield self.federation_client.send_join(target_hosts, event)
origin = ret["origin"]
state = ret["state"]
@@ -1020,15 +996,10 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ yield self._persist_auth_tree(
origin, auth_chain, state, event
)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[joinee]
- )
-
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
@@ -1123,7 +1094,7 @@ class FederationHandler(BaseHandler):
# would introduce the danger of backwards-compatibility problems.
event.internal_metadata.send_on_behalf_of = origin
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ context = yield self._handle_new_event(
origin, event
)
@@ -1133,25 +1104,17 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
-
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
- yield user_joined_room(self.distributor, user, event.room_id)
+ yield self.user_joined_room(user, event.room_id)
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
- state_ids = list(context.prev_state_ids.values())
+ state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
- state = yield self.store.get_events(list(context.prev_state_ids.values()))
+ state = yield self.store.get_events(list(prev_state_ids.values()))
defer.returnValue({
"state": list(state.values()),
@@ -1213,17 +1176,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
-
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- )
-
- target_user = UserID.from_string(event.state_key)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1248,35 +1201,26 @@ class FederationHandler(BaseHandler):
except ValueError:
pass
- yield self.replication_layer.send_leave(
+ yield self.federation_client.send_leave(
target_hosts,
event
)
context = yield self.state_handler.compute_event_context(event)
-
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- )
-
- target_user = UserID.from_string(event.state_key)
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
- content={},):
- origin, pdu = yield self.replication_layer.make_membership_event(
+ content={}, params=None):
+ origin, pdu = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
membership,
content,
+ params=params,
)
logger.debug("Got response to make_%s: %s", membership, pdu)
@@ -1316,7 +1260,7 @@ class FederationHandler(BaseHandler):
@log_function
def on_make_leave_request(self, room_id, user_id):
""" We've received a /make_leave/ request, so we create a partial
- join event for the room and return that. We do *not* persist or
+ leave event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
"""
builder = self.event_builder_factory.new({
@@ -1355,7 +1299,7 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False
- context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ yield self._handle_new_event(
origin, event
)
@@ -1365,23 +1309,16 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
-
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
-
defer.returnValue(None)
@defer.inlineCallbacks
def get_state_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
- yield run_on_reactor()
+
+ event = yield self.store.get_event(
+ event_id, allow_none=False, check_room_id=room_id,
+ )
state_groups = yield self.store.get_state_groups(
room_id, [event_id]
@@ -1393,8 +1330,7 @@ class FederationHandler(BaseHandler):
(e.type, e.state_key): e for e in state
}
- event = yield self.store.get_event(event_id)
- if event and event.is_state():
+ if event.is_state():
# Get previous state
if "replaces_state" in event.unsigned:
prev_id = event.unsigned["replaces_state"]
@@ -1405,18 +1341,6 @@ class FederationHandler(BaseHandler):
del results[(event.type, event.state_key)]
res = list(results.values())
- for event in res:
- # We sign these again because there was a bug where we
- # incorrectly signed things the first time round
- if self.is_mine_id(event.event_id):
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue(res)
else:
defer.returnValue([])
@@ -1425,7 +1349,9 @@ class FederationHandler(BaseHandler):
def get_state_ids_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
- yield run_on_reactor()
+ event = yield self.store.get_event(
+ event_id, allow_none=False, check_room_id=room_id,
+ )
state_groups = yield self.store.get_state_groups_ids(
room_id, [event_id]
@@ -1435,8 +1361,7 @@ class FederationHandler(BaseHandler):
_, state = state_groups.items().pop()
results = state
- event = yield self.store.get_event(event_id)
- if event and event.is_state():
+ if event.is_state():
# Get previous state
if "replaces_state" in event.unsigned:
prev_id = event.unsigned["replaces_state"]
@@ -1462,17 +1387,26 @@ class FederationHandler(BaseHandler):
limit
)
- events = yield self._filter_events_for_server(origin, room_id, events)
+ events = yield filter_events_for_server(self.store, origin, events)
defer.returnValue(events)
@defer.inlineCallbacks
@log_function
- def get_persisted_pdu(self, origin, event_id, do_auth=True):
- """ Get a PDU from the database with given origin and id.
+ def get_persisted_pdu(self, origin, event_id):
+ """Get an event from the database for the given server.
+
+ Args:
+ origin [str]: hostname of server which is requesting the event; we
+ will check that the server is allowed to see it.
+ event_id [str]: id of the event being requested
Returns:
- Deferred: Results in a `Pdu`.
+ Deferred[EventBase|None]: None if we know nothing about the event;
+ otherwise the (possibly-redacted) event.
+
+ Raises:
+ AuthError if the server is not currently in the room
"""
event = yield self.store.get_event(
event_id,
@@ -1481,32 +1415,17 @@ class FederationHandler(BaseHandler):
)
if event:
- if self.is_mine_id(event.event_id):
- # FIXME: This is a temporary work around where we occasionally
- # return events slightly differently than when they were
- # originally signed
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
- if do_auth:
- in_room = yield self.auth.check_host_in_room(
- event.room_id,
- origin
- )
- if not in_room:
- raise AuthError(403, "Host not in room.")
-
- events = yield self._filter_events_for_server(
- origin, event.room_id, [event]
- )
-
- event = events[0]
+ in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+ events = yield filter_events_for_server(
+ self.store, origin, [event],
+ )
+ event = events[0]
defer.returnValue(event)
else:
defer.returnValue(None)
@@ -1531,9 +1450,8 @@ class FederationHandler(BaseHandler):
event, context
)
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
+ yield self.persist_events_and_notify(
+ [(event, context)],
backfilled=backfilled,
)
except: # noqa: E722, as we reraise the exception this is fine.
@@ -1546,15 +1464,7 @@ class FederationHandler(BaseHandler):
six.reraise(tp, value, tb)
- if not backfilled:
- # this intentionally does not yield: we don't care about the result
- # and don't need to wait for it.
- logcontext.run_in_background(
- self.pusher_pool.on_new_notifications,
- event_stream_id, max_stream_id,
- )
-
- defer.returnValue((context, event_stream_id, max_stream_id))
+ defer.returnValue(context)
@defer.inlineCallbacks
def _handle_new_events(self, origin, event_infos, backfilled=False):
@@ -1562,6 +1472,8 @@ class FederationHandler(BaseHandler):
should not depend on one another, e.g. this should be used to persist
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
+
+ Notifies about the events where appropriate.
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
@@ -1576,10 +1488,10 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
- yield self.store.persist_events(
+ yield self.persist_events_and_notify(
[
(ev_info["event"], context)
- for ev_info, context in itertools.izip(event_infos, contexts)
+ for ev_info, context in zip(event_infos, contexts)
],
backfilled=backfilled,
)
@@ -1588,7 +1500,8 @@ class FederationHandler(BaseHandler):
def _persist_auth_tree(self, origin, auth_events, state, event):
"""Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically.
- Persists the event seperately.
+ Persists the event separately. Notifies about the persisted events
+ where appropriate.
Will attempt to fetch missing auth events.
@@ -1599,8 +1512,7 @@ class FederationHandler(BaseHandler):
event (Event)
Returns:
- 2-tuple of (event_stream_id, max_stream_id) from the persist_event
- call for `event`
+ Deferred
"""
events_to_context = {}
for e in itertools.chain(auth_events, state):
@@ -1626,7 +1538,7 @@ class FederationHandler(BaseHandler):
missing_auth_events.add(e_id)
for e_id in missing_auth_events:
- m_ev = yield self.replication_layer.get_pdu(
+ m_ev = yield self.federation_client.get_pdu(
[origin],
e_id,
outlier=True,
@@ -1664,7 +1576,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
- yield self.store.persist_events(
+ yield self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@@ -1675,12 +1587,10 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event, new_event_context,
+ yield self.persist_events_and_notify(
+ [(event, new_event_context)],
)
- defer.returnValue((event_stream_id, max_stream_id))
-
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
"""
@@ -1699,8 +1609,9 @@ class FederationHandler(BaseHandler):
)
if not auth_events:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -1736,8 +1647,19 @@ class FederationHandler(BaseHandler):
defer.returnValue(context)
@defer.inlineCallbacks
- def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
+ def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
missing):
+ in_room = yield self.auth.check_host_in_room(
+ room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+
+ event = yield self.store.get_event(
+ event_id, allow_none=False, check_room_id=room_id
+ )
+
# Just go through and process each event in `remote_auth_chain`. We
# don't want to fall into the trap of `missing` being wrong.
for e in remote_auth_chain:
@@ -1747,7 +1669,6 @@ class FederationHandler(BaseHandler):
pass
# Now get the current auth_chain for the event.
- event = yield self.store.get_event(event_id)
local_auth_chain = yield self.store.get_auth_chain(
[auth_id for auth_id, _ in event.auth_events],
include_given=True
@@ -1760,15 +1681,6 @@ class FederationHandler(BaseHandler):
local_auth_chain, remote_auth_chain
)
- for event in ret["auth_chain"]:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
@@ -1794,8 +1706,8 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
- missing_events = yield self._filter_events_for_server(
- origin, room_id, missing_events,
+ missing_events = yield filter_events_for_server(
+ self.store, origin, missing_events,
)
defer.returnValue(missing_events)
@@ -1844,7 +1756,7 @@ class FederationHandler(BaseHandler):
logger.info("Missing auth: %s", missing_auth)
# If we don't have all the auth events, we need to get them.
try:
- remote_auth_chain = yield self.replication_layer.get_event_auth(
+ remote_auth_chain = yield self.federation_client.get_event_auth(
origin, event.room_id, event.event_id
)
@@ -1917,7 +1829,10 @@ class FederationHandler(BaseHandler):
(d.type, d.state_key): d for d in different_events if d
})
+ room_version = yield self.store.get_room_version(event.room_id)
+
new_state = self.state_handler.resolve_events(
+ room_version,
[list(local_view.values()), list(remote_view.values())],
event
)
@@ -1949,9 +1864,10 @@ class FederationHandler(BaseHandler):
break
if do_resolution:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
# 1. Get what we think is the auth chain.
auth_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids
+ event, prev_state_ids
)
local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True
@@ -1959,7 +1875,7 @@ class FederationHandler(BaseHandler):
try:
# 2. Get remote difference.
- result = yield self.replication_layer.query_auth(
+ result = yield self.federation_client.query_auth(
origin,
event.room_id,
event.event_id,
@@ -2041,21 +1957,34 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events)
if k != event_key
}
- context.current_state_ids = dict(context.current_state_ids)
- context.current_state_ids.update(state_updates)
- if context.delta_ids is not None:
- context.delta_ids = dict(context.delta_ids)
- context.delta_ids.update(state_updates)
- context.prev_state_ids = dict(context.prev_state_ids)
- context.prev_state_ids.update({
+ current_state_ids = yield context.get_current_state_ids(self.store)
+ current_state_ids = dict(current_state_ids)
+
+ current_state_ids.update(state_updates)
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_state_ids = dict(prev_state_ids)
+
+ prev_state_ids.update({
k: a.event_id for k, a in iteritems(auth_events)
})
- context.state_group = yield self.store.store_state_group(
+
+ # create a new state group as a delta from the existing one.
+ prev_group = context.state_group
+ state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
- prev_group=context.prev_group,
- delta_ids=context.delta_ids,
- current_state_ids=context.current_state_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
+ current_state_ids=current_state_ids,
+ )
+
+ yield context.update_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
)
@defer.inlineCallbacks
@@ -2245,7 +2174,7 @@ class FederationHandler(BaseHandler):
yield member_handler.send_membership_event(None, event, context)
else:
destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
- yield self.replication_layer.forward_third_party_invite(
+ yield self.federation_client.forward_third_party_invite(
destinations,
room_id,
event_dict,
@@ -2295,7 +2224,8 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"]
)
original_invite = None
- original_invite_id = context.prev_state_ids.get(key)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ original_invite_id = prev_state_ids.get(key)
if original_invite_id:
original_invite = yield self.store.get_event(
original_invite_id, allow_none=True
@@ -2337,7 +2267,8 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"]
token = signed["token"]
- invite_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ invite_event_id = prev_state_ids.get(
(EventTypes.ThirdPartyInvite, token,)
)
@@ -2387,7 +2318,7 @@ class FederationHandler(BaseHandler):
for revocation.
"""
try:
- response = yield self.hs.get_simple_http_client().get_json(
+ response = yield self.http_client.get_json(
url,
{"public_key": public_key}
)
@@ -2398,3 +2329,91 @@ class FederationHandler(BaseHandler):
)
if "valid" not in response or not response["valid"]:
raise AuthError(403, "Third party certificate was invalid")
+
+ @defer.inlineCallbacks
+ def persist_events_and_notify(self, event_and_contexts, backfilled=False):
+ """Persists events and tells the notifier/pushers about them, if
+ necessary.
+
+ Args:
+ event_and_contexts(list[tuple[FrozenEvent, EventContext]])
+ backfilled (bool): Whether these events are a result of
+ backfilling or not
+
+ Returns:
+ Deferred
+ """
+ if self.config.worker_app:
+ yield self._send_events_to_master(
+ store=self.store,
+ event_and_contexts=event_and_contexts,
+ backfilled=backfilled
+ )
+ else:
+ max_stream_id = yield self.store.persist_events(
+ event_and_contexts,
+ backfilled=backfilled,
+ )
+
+ if not backfilled: # Never notify for backfilled events
+ for event, _ in event_and_contexts:
+ self._notify_persisted_event(event, max_stream_id)
+
+ def _notify_persisted_event(self, event, max_stream_id):
+ """Checks to see if notifier/pushers should be notified about the
+ event or not.
+
+ Args:
+ event (FrozenEvent)
+ max_stream_id (int): The max_stream_id returned by persist_events
+ """
+
+ extra_users = []
+ if event.type == EventTypes.Member:
+ target_user_id = event.state_key
+
+ # We notify for memberships if its an invite for one of our
+ # users
+ if event.internal_metadata.is_outlier():
+ if event.membership != Membership.INVITE:
+ if not self.is_mine_id(target_user_id):
+ return
+
+ target_user = UserID.from_string(target_user_id)
+ extra_users.append(target_user)
+ elif event.internal_metadata.is_outlier():
+ return
+
+ event_stream_id = event.internal_metadata.stream_ordering
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
+
+ self.pusher_pool.on_new_notifications(
+ event_stream_id, max_stream_id,
+ )
+
+ def _clean_room_for_join(self, room_id):
+ """Called to clean up any data in DB for a given room, ready for the
+ server to join the room.
+
+ Args:
+ room_id (str)
+ """
+ if self.config.worker_app:
+ return self._clean_room_for_join_client(room_id)
+ else:
+ return self.store.clean_room_for_join(room_id)
+
+ def user_joined_room(self, user, room_id):
+ """Called when a new user has joined the room
+ """
+ if self.config.worker_app:
+ return self._notify_user_membership_change(
+ room_id=room_id,
+ user_id=user.to_string(),
+ change="joined",
+ )
+ else:
+ return user_joined_room(self.distributor, user, room_id)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index dcae083734..53e5e2648b 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -14,14 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+import logging
+
from six import iteritems
+from twisted.internet import defer
+
from synapse.api.errors import SynapseError
from synapse.types import get_domain_from_id
-import logging
-
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 91a0898860..5feb3f22a6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,16 +19,18 @@
import logging
-import simplejson as json
+from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import (
- MatrixCodeMessageException, CodeMessageException
+ CodeMessageException,
+ Codes,
+ HttpResponseException,
+ SynapseError,
)
+
from ._base import BaseHandler
-from synapse.util.async import run_on_reactor
-from synapse.api.errors import SynapseError, Codes
logger = logging.getLogger(__name__)
@@ -38,6 +41,7 @@ class IdentityHandler(BaseHandler):
super(IdentityHandler, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
+ self.federation_http_client = hs.get_http_client()
self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
self.trust_any_id_server_just_for_testing_do_not_use = (
@@ -60,8 +64,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def threepid_from_creds(self, creds):
- yield run_on_reactor()
-
if 'id_server' in creds:
id_server = creds['id_server']
elif 'idServer' in creds:
@@ -83,7 +85,6 @@ class IdentityHandler(BaseHandler):
)
defer.returnValue(None)
- data = {}
try:
data = yield self.http_client.get_json(
"https://%s%s" % (
@@ -92,11 +93,9 @@ class IdentityHandler(BaseHandler):
),
{'sid': creds['sid'], 'client_secret': client_secret}
)
- except MatrixCodeMessageException as e:
+ except HttpResponseException as e:
logger.info("getValidated3pid failed with Matrix error: %r", e)
- raise SynapseError(e.code, e.msg, e.errcode)
- except CodeMessageException as e:
- data = json.loads(e.msg)
+ raise e.to_synapse_error()
if 'medium' in data:
defer.returnValue(data)
@@ -104,7 +103,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def bind_threepid(self, creds, mxid):
- yield run_on_reactor()
logger.debug("binding threepid %r to %s", creds, mxid)
data = None
@@ -135,13 +133,71 @@ class IdentityHandler(BaseHandler):
)
logger.debug("bound threepid %r to %s", creds, mxid)
except CodeMessageException as e:
- data = json.loads(e.msg)
+ data = json.loads(e.msg) # XXX WAT?
defer.returnValue(data)
@defer.inlineCallbacks
- def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
- yield run_on_reactor()
+ def try_unbind_threepid(self, mxid, threepid):
+ """Removes a binding from an identity server
+
+ Args:
+ mxid (str): Matrix user ID of binding to be removed
+ threepid (dict): Dict with medium & address of binding to be removed
+
+ Raises:
+ SynapseError: If we failed to contact the identity server
+
+ Returns:
+ Deferred[bool]: True on success, otherwise False if the identity
+ server doesn't support unbinding
+ """
+ logger.debug("unbinding threepid %r from %s", threepid, mxid)
+ if not self.trusted_id_servers:
+ logger.warn("Can't unbind threepid: no trusted ID servers set in config")
+ defer.returnValue(False)
+
+ # We don't track what ID server we added 3pids on (perhaps we ought to)
+ # but we assume that any of the servers in the trusted list are in the
+ # same ID server federation, so we can pick any one of them to send the
+ # deletion request to.
+ id_server = next(iter(self.trusted_id_servers))
+
+ url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+ content = {
+ "mxid": mxid,
+ "threepid": threepid,
+ }
+ headers = {}
+ # we abuse the federation http client to sign the request, but we have to send it
+ # using the normal http client since we don't want the SRV lookup and want normal
+ # 'browser-like' HTTPS.
+ self.federation_http_client.sign_request(
+ destination=None,
+ method='POST',
+ url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
+ headers_dict=headers,
+ content=content,
+ destination_is=id_server,
+ )
+ try:
+ yield self.http_client.post_json_get_json(
+ url,
+ content,
+ headers,
+ )
+ except HttpResponseException as e:
+ if e.code in (400, 404, 501,):
+ # The remote server probably doesn't support unbinding (yet)
+ logger.warn("Received %d response while unbinding threepid", e.code)
+ defer.returnValue(False)
+ else:
+ logger.error("Failed to unbind threepid on identity server: %s", e)
+ raise SynapseError(502, "Failed to contact identity server")
+
+ defer.returnValue(True)
+ @defer.inlineCallbacks
+ def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,
@@ -164,20 +220,15 @@ class IdentityHandler(BaseHandler):
params
)
defer.returnValue(data)
- except MatrixCodeMessageException as e:
- logger.info("Proxied requestToken failed with Matrix error: %r", e)
- raise SynapseError(e.code, e.msg, e.errcode)
- except CodeMessageException as e:
+ except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
- raise e
+ raise e.to_synapse_error()
@defer.inlineCallbacks
def requestMsisdnToken(
self, id_server, country, phone_number,
client_secret, send_attempt, **kwargs
):
- yield run_on_reactor()
-
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,
@@ -201,9 +252,6 @@ class IdentityHandler(BaseHandler):
params
)
defer.returnValue(data)
- except MatrixCodeMessageException as e:
- logger.info("Proxied requestToken failed with Matrix error: %r", e)
- raise SynapseError(e.code, e.msg, e.errcode)
- except CodeMessageException as e:
+ except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
- raise e
+ raise e.to_synapse_error()
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 71af86fe21..e009395207 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
@@ -21,20 +23,15 @@ from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
-from synapse.types import (
- UserID, StreamToken,
-)
+from synapse.types import StreamToken, UserID
from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
-import logging
-
-
logger = logging.getLogger(__name__)
@@ -151,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
try:
if event.membership == Membership.JOIN:
room_end_token = now_token.room_key
- deferred_room_state = self.state_handler.get_current_state(
- event.room_id
+ deferred_room_state = run_in_background(
+ self.state_handler.get_current_state,
+ event.room_id,
)
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
- deferred_room_state = self.store.get_state_for_events(
- [event.event_id], None
+ deferred_room_state = run_in_background(
+ self.store.get_state_for_events,
+ [event.event_id], None,
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
@@ -373,6 +372,10 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
+ # If presence is disabled, return an empty list
+ if not self.hs.config.use_presence:
+ defer.returnValue([])
+
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,
@@ -390,19 +393,21 @@ class InitialSyncHandler(BaseHandler):
receipts = []
defer.returnValue(receipts)
- presence, receipts, (messages, token) = yield defer.gatherResults(
- [
- run_in_background(get_presence),
- run_in_background(get_receipts),
- run_in_background(
- self.store.get_recent_events_for_room,
- room_id,
- limit=limit,
- end_token=now_token.room_key,
- )
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ presence, receipts, (messages, token) = yield make_deferred_yieldable(
+ defer.gatherResults(
+ [
+ run_in_background(get_presence),
+ run_in_background(get_receipts),
+ run_in_background(
+ self.store.get_recent_events_for_room,
+ room_id,
+ limit=limit,
+ end_token=now_token.room_key,
+ )
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError),
+ )
messages = yield filter_events_for_client(
self.store, user_id, messages, is_peeking=is_peeking,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1cb81b6cf8..e484061cc0 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,269 +14,50 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import simplejson
import sys
-from canonicaljson import encode_canonical_json
import six
-from six import string_types, itervalues, iteritems
-from twisted.internet import defer, reactor
+from six import iteritems, itervalues, string_types
+
+from canonicaljson import encode_canonical_json, json
+
+from twisted.internet import defer
from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
-from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import (
- AuthError, Codes, SynapseError,
+ AuthError,
+ Codes,
ConsentNotGivenError,
+ NotFoundError,
+ SynapseError,
)
from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
-from synapse.types import (
- UserID, RoomAlias, RoomStreamToken,
-)
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.types import RoomAlias, UserID
+from synapse.util.async_helpers import Linearizer
+from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
-from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
-from synapse.replication.http.send_event import send_event_to_master
from ._base import BaseHandler
logger = logging.getLogger(__name__)
-class PurgeStatus(object):
- """Object tracking the status of a purge request
-
- This class contains information on the progress of a purge request, for
- return by get_purge_status.
-
- Attributes:
- status (int): Tracks whether this request has completed. One of
- STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+ """Contains some read only APIs to get state about a room
"""
- STATUS_ACTIVE = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
-
- STATUS_TEXT = {
- STATUS_ACTIVE: "active",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- }
-
- def __init__(self):
- self.status = PurgeStatus.STATUS_ACTIVE
-
- def asdict(self):
- return {
- "status": PurgeStatus.STATUS_TEXT[self.status]
- }
-
-
-class MessageHandler(BaseHandler):
-
def __init__(self, hs):
- super(MessageHandler, self).__init__(hs)
- self.hs = hs
- self.state = hs.get_state_handler()
+ self.auth = hs.get_auth()
self.clock = hs.get_clock()
-
- self.pagination_lock = ReadWriteLock()
- self._purges_in_progress_by_room = set()
- # map from purge id to PurgeStatus
- self._purges_by_id = {}
-
- def start_purge_history(self, room_id, token,
- delete_local_events=False):
- """Start off a history purge on a room.
-
- Args:
- room_id (str): The room to purge from
-
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- str: unique ID for this purge transaction.
- """
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400,
- "History purge already in progress for %s" % (room_id, ),
- )
-
- purge_id = random_string(16)
-
- # we log the purge_id here so that it can be tied back to the
- # request id in the log lines.
- logger.info("[purge] starting purge_id %s", purge_id)
-
- self._purges_by_id[purge_id] = PurgeStatus()
- run_in_background(
- self._purge_history,
- purge_id, room_id, token, delete_local_events,
- )
- return purge_id
-
- @defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, token,
- delete_local_events):
- """Carry out a history purge on a room.
-
- Args:
- purge_id (str): The id for this purge
- room_id (str): The room to purge from
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- Deferred
- """
- self._purges_in_progress_by_room.add(room_id)
- try:
- with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(
- room_id, token, delete_local_events,
- )
- logger.info("[purge] complete")
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
- except Exception:
- logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the purge from the list 24 hours after it completes
- def clear_purge():
- del self._purges_by_id[purge_id]
- reactor.callLater(24 * 3600, clear_purge)
-
- def get_purge_status(self, purge_id):
- """Get the current status of an active purge
-
- Args:
- purge_id (str): purge_id returned by start_purge_history
-
- Returns:
- PurgeStatus|None
- """
- return self._purges_by_id.get(purge_id)
-
- @defer.inlineCallbacks
- def get_messages(self, requester, room_id=None, pagin_config=None,
- as_client_event=True, event_filter=None):
- """Get messages in a room.
-
- Args:
- requester (Requester): The user requesting messages.
- room_id (str): The room they want messages from.
- pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- as_client_event (bool): True to get events in client-server format.
- event_filter (Filter): Filter to apply to results or None
- Returns:
- dict: Pagination API results
- """
- user_id = requester.user.to_string()
-
- if pagin_config.from_token:
- room_token = pagin_config.from_token.room_key
- else:
- pagin_config.from_token = (
- yield self.hs.get_event_sources().get_current_token_for_room(
- room_id=room_id
- )
- )
- room_token = pagin_config.from_token.room_key
-
- room_token = RoomStreamToken.parse(room_token)
-
- pagin_config.from_token = pagin_config.from_token.copy_and_replace(
- "room_key", str(room_token)
- )
-
- source_config = pagin_config.get_source_config("room")
-
- with (yield self.pagination_lock.read(room_id)):
- membership, member_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
-
- if source_config.direction == 'b':
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- max_topo = room_token.topological
- else:
- max_topo = yield self.store.get_max_topological_token(
- room_id, room_token.stream
- )
-
- if membership == Membership.LEAVE:
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
- leave_token = yield self.store.get_topological_token_for_event(
- member_event_id
- )
- leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < max_topo:
- source_config.from_key = str(leave_token)
-
- yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, max_topo
- )
-
- events, next_key = yield self.store.paginate_room_events(
- room_id=room_id,
- from_key=source_config.from_key,
- to_key=source_config.to_key,
- direction=source_config.direction,
- limit=source_config.limit,
- event_filter=event_filter,
- )
-
- next_token = pagin_config.from_token.copy_and_replace(
- "room_key", next_key
- )
-
- if not events:
- defer.returnValue({
- "chunk": [],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- })
-
- if event_filter:
- events = event_filter.filter(events)
-
- events = yield filter_events_for_client(
- self.store,
- user_id,
- events,
- is_peeking=(member_event_id is None),
- )
-
- time_now = self.clock.time_msec()
-
- chunk = {
- "chunk": [
- serialize_event(e, time_now, as_client_event)
- for e in events
- ],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- }
-
- defer.returnValue(chunk)
+ self.state = hs.get_state_handler()
+ self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
@@ -290,12 +71,12 @@ class MessageHandler(BaseHandler):
Raises:
SynapseError if something went wrong.
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
- data = yield self.state_handler.get_current_state(
+ data = yield self.state.get_current_state(
room_id, event_type, state_key
)
elif membership == Membership.LEAVE:
@@ -308,53 +89,85 @@ class MessageHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
- def _check_in_room_or_world_readable(self, room_id, user_id):
- try:
- # check_user_was_in_room will return the most recent membership
- # event for the user if:
- # * The user is a non-guest user, and was ever in the room
- # * The user is a guest user, and has joined the room
- # else it will throw.
- member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
- defer.returnValue((member_event.membership, member_event.event_id))
- return
- except AuthError:
- visibility = yield self.state_handler.get_current_state(
- room_id, EventTypes.RoomHistoryVisibility, ""
- )
- if (
- visibility and
- visibility.content["history_visibility"] == "world_readable"
- ):
- defer.returnValue((Membership.JOIN, None))
- return
- raise AuthError(
- 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
- )
-
- @defer.inlineCallbacks
- def get_state_events(self, user_id, room_id, is_guest=False):
+ def get_state_events(
+ self, user_id, room_id, types=None, filtered_types=None,
+ at_token=None, is_guest=False,
+ ):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
- left the room return the state events from when they left.
+ left the room return the state events from when they left. If an explicit
+ 'at' parameter is passed, return the state events as of that event, if
+ visible.
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
+ at_token(StreamToken|None): the stream token of the at which we are requesting
+ the stats. If the user is not allowed to view the state as of that
+ stream token, we raise a 403 SynapseError. If None, returns the current
+ state based on the current_state_events table.
+ is_guest(bool): whether this user is a guest
Returns:
A list of dicts representing state events. [{}, {}, {}]
+ Raises:
+ NotFoundError (404) if the at token does not yield an event
+
+ AuthError (403) if the user doesn't have permission to view
+ members of this room.
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
+ if at_token:
+ # FIXME this claims to get the state at a stream position, but
+ # get_recent_events_for_room operates by topo ordering. This therefore
+ # does not reliably give you the state at the given stream position.
+ # (https://github.com/matrix-org/synapse/issues/3305)
+ last_events, _ = yield self.store.get_recent_events_for_room(
+ room_id, end_token=at_token.room_key, limit=1,
+ )
- if membership == Membership.JOIN:
- room_state = yield self.state_handler.get_current_state(room_id)
- elif membership == Membership.LEAVE:
- room_state = yield self.store.get_state_for_events(
- [membership_event_id], None
+ if not last_events:
+ raise NotFoundError("Can't find event for token %s" % (at_token, ))
+
+ visible_events = yield filter_events_for_client(
+ self.store, user_id, last_events,
)
- room_state = room_state[membership_event_id]
+
+ event = last_events[0]
+ if visible_events:
+ room_state = yield self.store.get_state_for_events(
+ [event.event_id], types, filtered_types=filtered_types,
+ )
+ room_state = room_state[event.event_id]
+ else:
+ raise AuthError(
+ 403,
+ "User %s not allowed to view events in room %s at token %s" % (
+ user_id, room_id, at_token,
+ )
+ )
+ else:
+ membership, membership_event_id = (
+ yield self.auth.check_in_room_or_world_readable(
+ room_id, user_id,
+ )
+ )
+
+ if membership == Membership.JOIN:
+ state_ids = yield self.store.get_filtered_current_state_ids(
+ room_id, types, filtered_types=filtered_types,
+ )
+ room_state = yield self.store.get_events(state_ids.values())
+ elif membership == Membership.LEAVE:
+ room_state = yield self.store.get_state_for_events(
+ [membership_event_id], types, filtered_types=filtered_types,
+ )
+ room_state = room_state[membership_event_id]
now = self.clock.time_msec()
defer.returnValue(
@@ -377,7 +190,7 @@ class MessageHandler(BaseHandler):
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
- membership, _ = yield self._check_in_room_or_world_readable(
+ membership, _ = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership != Membership.JOIN:
@@ -388,7 +201,7 @@ class MessageHandler(BaseHandler):
users_with_profile = yield self.state.get_current_user_in_room(room_id)
# If this is an AS, double check that they are allowed to see the members.
- # This can either be because the AS user is in the room or becuase there
+ # This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
if requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
@@ -422,7 +235,7 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
- self.http_client = hs.get_simple_http_client()
+ self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
@@ -431,7 +244,7 @@ class EventCreationHandler(object):
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
- self.limiter = Limiter(max_count=5)
+ self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
self.action_generator = hs.get_action_generator()
@@ -463,10 +276,14 @@ class EventCreationHandler(object):
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
-
+ Raises:
+ ResourceLimitError if server is blocked to some resource being
+ exceeded
Returns:
Tuple of created event (FrozenEvent), Context
"""
+ yield self.auth.check_auth_blocking(requester.user.to_string())
+
builder = self.event_builder_factory.new(event_dict)
self.validator.validate_new(builder)
@@ -491,7 +308,7 @@ class EventCreationHandler(object):
target, e
)
- is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+ is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
if not is_exempt:
yield self.assert_accepted_privacy_policy(requester)
@@ -509,12 +326,13 @@ class EventCreationHandler(object):
defer.returnValue((event, context))
- def _is_exempt_from_privacy_policy(self, builder):
+ def _is_exempt_from_privacy_policy(self, builder, requester):
""""Determine if an event to be sent is exempt from having to consent
to the privacy policy
Args:
builder (synapse.events.builder.EventBuilder): event being created
+ requester (Requster): user requesting this event
Returns:
Deferred[bool]: true if the event can be sent without the user
@@ -525,6 +343,9 @@ class EventCreationHandler(object):
membership = builder.content.get("membership", None)
if membership == Membership.JOIN:
return self._is_server_notices_room(builder.room_id)
+ elif membership == Membership.LEAVE:
+ # the user is always allowed to leave (but not kick people)
+ return builder.state_key == requester.user.to_string()
return succeed(False)
@defer.inlineCallbacks
@@ -630,7 +451,8 @@ class EventCreationHandler(object):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
- prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_event_id = prev_state_ids.get((event.type, event.state_key))
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
return
@@ -752,8 +574,8 @@ class EventCreationHandler(object):
event = builder.build()
logger.debug(
- "Created event %s with state: %s",
- event.event_id, context.prev_state_ids,
+ "Created event %s",
+ event.event_id,
)
defer.returnValue(
@@ -793,7 +615,7 @@ class EventCreationHandler(object):
# Ensure that we can round trip before trying to persist in db
try:
dump = frozendict_json_encoder.encode(event.content)
- simplejson.loads(dump)
+ json.loads(dump)
except Exception:
logger.exception("Failed to encode content: %r", event.content)
raise
@@ -805,10 +627,9 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
- yield send_event_to_master(
- self.http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ yield self.send_event_to_master(
+ event_id=event.event_id,
+ store=self.store,
requester=requester,
event=event,
context=context,
@@ -883,9 +704,11 @@ class EventCreationHandler(object):
e.sender == event.sender
)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
state_to_include_ids = [
e_id
- for k, e_id in iteritems(context.current_state_ids)
+ for k, e_id in iteritems(current_state_ids)
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -921,8 +744,9 @@ class EventCreationHandler(object):
)
if event.type == EventTypes.Redaction:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -942,26 +766,23 @@ class EventCreationHandler(object):
"You don't have permission to redact events"
)
- if event.type == EventTypes.Create and context.prev_state_ids:
- raise AuthError(
- 403,
- "Changing the room create event is forbidden",
- )
+ if event.type == EventTypes.Create:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ if prev_state_ids:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
- # this intentionally does not yield: we don't care about the result
- # and don't need to wait for it.
- run_in_background(
- self.pusher_pool.on_new_notifications,
- event_stream_id, max_stream_id
+ self.pusher_pool.on_new_notifications(
+ event_stream_id, max_stream_id,
)
- @defer.inlineCallbacks
def _notify():
- yield run_on_reactor()
try:
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
new file mode 100644
index 0000000000..5170d093e3
--- /dev/null
+++ b/synapse/handlers/pagination.py
@@ -0,0 +1,298 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+from synapse.types import RoomStreamToken
+from synapse.util.async_helpers import ReadWriteLock
+from synapse.util.logcontext import run_in_background
+from synapse.util.stringutils import random_string
+from synapse.visibility import filter_events_for_client
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeStatus(object):
+ """Object tracking the status of a purge request
+
+ This class contains information on the progress of a purge request, for
+ return by get_purge_status.
+
+ Attributes:
+ status (int): Tracks whether this request has completed. One of
+ STATUS_{ACTIVE,COMPLETE,FAILED}
+ """
+
+ STATUS_ACTIVE = 0
+ STATUS_COMPLETE = 1
+ STATUS_FAILED = 2
+
+ STATUS_TEXT = {
+ STATUS_ACTIVE: "active",
+ STATUS_COMPLETE: "complete",
+ STATUS_FAILED: "failed",
+ }
+
+ def __init__(self):
+ self.status = PurgeStatus.STATUS_ACTIVE
+
+ def asdict(self):
+ return {
+ "status": PurgeStatus.STATUS_TEXT[self.status]
+ }
+
+
+class PaginationHandler(object):
+ """Handles pagination and purge history requests.
+
+ These are in the same handler due to the fact we need to block clients
+ paginating during a purge.
+ """
+
+ def __init__(self, hs):
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+
+ self.pagination_lock = ReadWriteLock()
+ self._purges_in_progress_by_room = set()
+ # map from purge id to PurgeStatus
+ self._purges_by_id = {}
+
+ def start_purge_history(self, room_id, token,
+ delete_local_events=False):
+ """Start off a history purge on a room.
+
+ Args:
+ room_id (str): The room to purge from
+
+ token (str): topological token to delete events before
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ str: unique ID for this purge transaction.
+ """
+ if room_id in self._purges_in_progress_by_room:
+ raise SynapseError(
+ 400,
+ "History purge already in progress for %s" % (room_id, ),
+ )
+
+ purge_id = random_string(16)
+
+ # we log the purge_id here so that it can be tied back to the
+ # request id in the log lines.
+ logger.info("[purge] starting purge_id %s", purge_id)
+
+ self._purges_by_id[purge_id] = PurgeStatus()
+ run_in_background(
+ self._purge_history,
+ purge_id, room_id, token, delete_local_events,
+ )
+ return purge_id
+
+ @defer.inlineCallbacks
+ def _purge_history(self, purge_id, room_id, token,
+ delete_local_events):
+ """Carry out a history purge on a room.
+
+ Args:
+ purge_id (str): The id for this purge
+ room_id (str): The room to purge from
+ token (str): topological token to delete events before
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ Deferred
+ """
+ self._purges_in_progress_by_room.add(room_id)
+ try:
+ with (yield self.pagination_lock.write(room_id)):
+ yield self.store.purge_history(
+ room_id, token, delete_local_events,
+ )
+ logger.info("[purge] complete")
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+ except Exception:
+ logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+ finally:
+ self._purges_in_progress_by_room.discard(room_id)
+
+ # remove the purge from the list 24 hours after it completes
+ def clear_purge():
+ del self._purges_by_id[purge_id]
+ self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+
+ def get_purge_status(self, purge_id):
+ """Get the current status of an active purge
+
+ Args:
+ purge_id (str): purge_id returned by start_purge_history
+
+ Returns:
+ PurgeStatus|None
+ """
+ return self._purges_by_id.get(purge_id)
+
+ @defer.inlineCallbacks
+ def get_messages(self, requester, room_id=None, pagin_config=None,
+ as_client_event=True, event_filter=None):
+ """Get messages in a room.
+
+ Args:
+ requester (Requester): The user requesting messages.
+ room_id (str): The room they want messages from.
+ pagin_config (synapse.api.streams.PaginationConfig): The pagination
+ config rules to apply, if any.
+ as_client_event (bool): True to get events in client-server format.
+ event_filter (Filter): Filter to apply to results or None
+ Returns:
+ dict: Pagination API results
+ """
+ user_id = requester.user.to_string()
+
+ if pagin_config.from_token:
+ room_token = pagin_config.from_token.room_key
+ else:
+ pagin_config.from_token = (
+ yield self.hs.get_event_sources().get_current_token_for_room(
+ room_id=room_id
+ )
+ )
+ room_token = pagin_config.from_token.room_key
+
+ room_token = RoomStreamToken.parse(room_token)
+
+ pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+ "room_key", str(room_token)
+ )
+
+ source_config = pagin_config.get_source_config("room")
+
+ with (yield self.pagination_lock.read(room_id)):
+ membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
+ room_id, user_id
+ )
+
+ if source_config.direction == 'b':
+ # if we're going backwards, we might need to backfill. This
+ # requires that we have a topo token.
+ if room_token.topological:
+ max_topo = room_token.topological
+ else:
+ max_topo = yield self.store.get_max_topological_token(
+ room_id, room_token.stream
+ )
+
+ if membership == Membership.LEAVE:
+ # If they have left the room then clamp the token to be before
+ # they left the room, to save the effort of loading from the
+ # database.
+ leave_token = yield self.store.get_topological_token_for_event(
+ member_event_id
+ )
+ leave_token = RoomStreamToken.parse(leave_token)
+ if leave_token.topological < max_topo:
+ source_config.from_key = str(leave_token)
+
+ yield self.hs.get_handlers().federation_handler.maybe_backfill(
+ room_id, max_topo
+ )
+
+ events, next_key = yield self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=source_config.from_key,
+ to_key=source_config.to_key,
+ direction=source_config.direction,
+ limit=source_config.limit,
+ event_filter=event_filter,
+ )
+
+ next_token = pagin_config.from_token.copy_and_replace(
+ "room_key", next_key
+ )
+
+ if not events:
+ defer.returnValue({
+ "chunk": [],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ })
+
+ if event_filter:
+ events = event_filter.filter(events)
+
+ events = yield filter_events_for_client(
+ self.store,
+ user_id,
+ events,
+ is_peeking=(member_event_id is None),
+ )
+
+ state = None
+ if event_filter and event_filter.lazy_load_members():
+ # TODO: remove redundant members
+
+ types = [
+ (EventTypes.Member, state_key)
+ for state_key in set(
+ event.sender # FIXME: we also care about invite targets etc.
+ for event in events
+ )
+ ]
+
+ state_ids = yield self.store.get_state_ids_for_event(
+ events[0].event_id, types=types,
+ )
+
+ if state_ids:
+ state = yield self.store.get_events(list(state_ids.values()))
+
+ if state:
+ state = yield filter_events_for_client(
+ self.store,
+ user_id,
+ state.values(),
+ is_peeking=(member_event_id is None),
+ )
+
+ time_now = self.clock.time_msec()
+
+ chunk = {
+ "chunk": [
+ serialize_event(e, time_now, as_client_event)
+ for e in events
+ ],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ }
+
+ if state:
+ chunk["state"] = [
+ serialize_event(e, time_now, as_client_event)
+ for e in state
+ ]
+
+ defer.returnValue(chunk)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7fe568132f..ba3856674d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -22,27 +22,26 @@ The methods that define policy are:
- should_notify
"""
-from twisted.internet import defer, reactor
+import logging
from contextlib import contextmanager
-from six import itervalues, iteritems
+from six import iteritems, itervalues
+
+from prometheus_client import Counter
+
+from twisted.internet import defer
-from synapse.api.errors import SynapseError
from synapse.api.constants import PresenceState
+from synapse.api.errors import SynapseError
+from synapse.metrics import LaterGauge
from synapse.storage.presence import UserPresenceState
-
+from synapse.types import UserID, get_domain_from_id
+from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import cachedInlineCallbacks
-from synapse.util.async import Linearizer
from synapse.util.logcontext import run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID, get_domain_from_id
-from synapse.metrics import LaterGauge
-
-import logging
-
-from prometheus_client import Counter
logger = logging.getLogger(__name__)
@@ -96,6 +95,7 @@ class PresenceHandler(object):
Args:
hs (synapse.server.HomeServer):
"""
+ self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
@@ -179,7 +179,7 @@ class PresenceHandler(object):
# have not yet been persisted
self.unpersisted_users_changes = set()
- reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+ hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
self.serial_to_user = {}
self._next_serial = 1
@@ -231,6 +231,10 @@ class PresenceHandler(object):
earlier than they should when synapse is restarted. This affect of this
is some spurious presence changes that will self-correct.
"""
+ # If the DB pool has already terminated, don't try updating
+ if not self.hs.get_db_pool().running:
+ return
+
logger.info(
"Performing _on_shutdown. Persisting %d unpersisted changes",
len(self.user_to_current_state)
@@ -391,6 +395,10 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
+ # If presence is disabled, no-op
+ if not self.hs.config.use_presence:
+ return
+
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -420,6 +428,11 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
+ # Override if it should affect the user's presence, if presence is
+ # disabled.
+ if not self.hs.config.use_presence:
+ affect_presence = False
+
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -465,13 +478,16 @@ class PresenceHandler(object):
Returns:
set(str): A set of user_id strings.
"""
- syncing_user_ids = {
- user_id for user_id, count in self.user_to_num_current_syncs.items()
- if count
- }
- for user_ids in self.external_process_to_current_syncs.values():
- syncing_user_ids.update(user_ids)
- return syncing_user_ids
+ if self.hs.config.use_presence:
+ syncing_user_ids = {
+ user_id for user_id, count in self.user_to_num_current_syncs.items()
+ if count
+ }
+ for user_ids in self.external_process_to_current_syncs.values():
+ syncing_user_ids.update(user_ids)
+ return syncing_user_ids
+ else:
+ return set()
@defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 3465a787ab..75b8b7ce6a 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,19 +17,31 @@ import logging
from twisted.internet import defer
-from synapse.api.errors import SynapseError, AuthError, CodeMessageException
+from synapse.api.errors import (
+ AuthError,
+ CodeMessageException,
+ Codes,
+ StoreError,
+ SynapseError,
+)
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, get_domain_from_id
+
from ._base import BaseHandler
logger = logging.getLogger(__name__)
-class ProfileHandler(BaseHandler):
- PROFILE_UPDATE_MS = 60 * 1000
- PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
+class BaseProfileHandler(BaseHandler):
+ """Handles fetching and updating user profile information.
+
+ BaseProfileHandler can be instantiated directly on workers and will
+ delegate to master when necessary. The master process should use the
+ subclass MasterProfileHandler
+ """
def __init__(self, hs):
- super(ProfileHandler, self).__init__(hs)
+ super(BaseProfileHandler, self).__init__(hs)
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@@ -38,21 +50,21 @@ class ProfileHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
- if hs.config.worker_app is None:
- self.clock.looping_call(
- self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
- )
-
@defer.inlineCallbacks
def get_profile(self, user_id):
target_user = UserID.from_string(user_id)
if self.hs.is_mine(target_user):
- displayname = yield self.store.get_profile_displayname(
- target_user.localpart
- )
- avatar_url = yield self.store.get_profile_avatar_url(
- target_user.localpart
- )
+ try:
+ displayname = yield self.store.get_profile_displayname(
+ target_user.localpart
+ )
+ avatar_url = yield self.store.get_profile_avatar_url(
+ target_user.localpart
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+ raise
defer.returnValue({
"displayname": displayname,
@@ -72,7 +84,6 @@ class ProfileHandler(BaseHandler):
except CodeMessageException as e:
if e.code != 404:
logger.exception("Failed to get displayname")
-
raise
@defer.inlineCallbacks
@@ -83,12 +94,17 @@ class ProfileHandler(BaseHandler):
"""
target_user = UserID.from_string(user_id)
if self.hs.is_mine(target_user):
- displayname = yield self.store.get_profile_displayname(
- target_user.localpart
- )
- avatar_url = yield self.store.get_profile_avatar_url(
- target_user.localpart
- )
+ try:
+ displayname = yield self.store.get_profile_displayname(
+ target_user.localpart
+ )
+ avatar_url = yield self.store.get_profile_avatar_url(
+ target_user.localpart
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+ raise
defer.returnValue({
"displayname": displayname,
@@ -101,9 +117,14 @@ class ProfileHandler(BaseHandler):
@defer.inlineCallbacks
def get_displayname(self, target_user):
if self.hs.is_mine(target_user):
- displayname = yield self.store.get_profile_displayname(
- target_user.localpart
- )
+ try:
+ displayname = yield self.store.get_profile_displayname(
+ target_user.localpart
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+ raise
defer.returnValue(displayname)
else:
@@ -120,7 +141,6 @@ class ProfileHandler(BaseHandler):
except CodeMessageException as e:
if e.code != 404:
logger.exception("Failed to get displayname")
-
raise
except Exception:
logger.exception("Failed to get displayname")
@@ -155,10 +175,14 @@ class ProfileHandler(BaseHandler):
@defer.inlineCallbacks
def get_avatar_url(self, target_user):
if self.hs.is_mine(target_user):
- avatar_url = yield self.store.get_profile_avatar_url(
- target_user.localpart
- )
-
+ try:
+ avatar_url = yield self.store.get_profile_avatar_url(
+ target_user.localpart
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+ raise
defer.returnValue(avatar_url)
else:
try:
@@ -211,16 +235,20 @@ class ProfileHandler(BaseHandler):
just_field = args.get("field", None)
response = {}
+ try:
+ if just_field is None or just_field == "displayname":
+ response["displayname"] = yield self.store.get_profile_displayname(
+ user.localpart
+ )
- if just_field is None or just_field == "displayname":
- response["displayname"] = yield self.store.get_profile_displayname(
- user.localpart
- )
-
- if just_field is None or just_field == "avatar_url":
- response["avatar_url"] = yield self.store.get_profile_avatar_url(
- user.localpart
- )
+ if just_field is None or just_field == "avatar_url":
+ response["avatar_url"] = yield self.store.get_profile_avatar_url(
+ user.localpart
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+ raise
defer.returnValue(response)
@@ -253,6 +281,26 @@ class ProfileHandler(BaseHandler):
room_id, str(e.message)
)
+
+class MasterProfileHandler(BaseProfileHandler):
+ PROFILE_UPDATE_MS = 60 * 1000
+ PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
+
+ def __init__(self, hs):
+ super(MasterProfileHandler, self).__init__(hs)
+
+ assert hs.config.worker_app is None
+
+ self.clock.looping_call(
+ self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+ )
+
+ def _start_update_remote_profile_cache(self):
+ return run_as_background_process(
+ "Update remote profile", self._update_remote_profile_cache,
+ )
+
+ @defer.inlineCallbacks
def _update_remote_profile_cache(self):
"""Called periodically to check profiles of remote users we haven't
checked in a while.
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 5142ae153d..32108568c6 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -13,13 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import BaseHandler
+import logging
from twisted.internet import defer
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
+
+from ._base import BaseHandler
-import logging
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 2e0672161c..a6f3181f09 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -12,17 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util import logcontext
-
-from ._base import BaseHandler
+import logging
from twisted.internet import defer
-from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import get_domain_from_id
+from synapse.util import logcontext
-import logging
-
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -118,16 +115,15 @@ class ReceiptsHandler(BaseHandler):
affected_room_ids = list(set([r["room_id"] for r in receipts]))
- with PreserveLoggingContext():
- self.notifier.on_new_event(
- "receipt_key", max_batch_id, rooms=affected_room_ids
- )
- # Note that the min here shouldn't be relied upon to be accurate.
- self.hs.get_pusherpool().on_new_receipts(
- min_batch_id, max_batch_id, affected_room_ids
- )
+ self.notifier.on_new_event(
+ "receipt_key", max_batch_id, rooms=affected_room_ids
+ )
+ # Note that the min here shouldn't be relied upon to be accurate.
+ self.hs.get_pusherpool().on_new_receipts(
+ min_batch_id, max_batch_id, affected_room_ids,
+ )
- defer.returnValue(True)
+ defer.returnValue(True)
@logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7e52adda3c..1e53f2c635 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -18,14 +18,19 @@ import logging
from twisted.internet import defer
+from synapse import types
from synapse.api.errors import (
- AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
+ AuthError,
+ Codes,
+ InvalidCaptchaError,
+ RegistrationError,
+ SynapseError,
)
from synapse.http.client import CaptchaServerHttpClient
-from synapse import types
-from synapse.types import UserID, create_requester, RoomID, RoomAlias
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.types import RoomAlias, RoomID, UserID, create_requester
+from synapse.util.async_helpers import Linearizer
from synapse.util.threepids import check_3pid_allowed
+
from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -40,7 +45,7 @@ class RegistrationHandler(BaseHandler):
hs (synapse.server.HomeServer):
"""
super(RegistrationHandler, self).__init__(hs)
-
+ self.hs = hs
self.auth = hs.get_auth()
self._auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
@@ -120,13 +125,14 @@ class RegistrationHandler(BaseHandler):
guest_access_token=None,
make_guest=False,
admin=False,
+ threepid=None,
):
"""Registers a new client on the server.
Args:
localpart : The local part of the user ID to register. If None,
one will be generated.
- password (str) : The password to assign to this user so they can
+ password (unicode) : The password to assign to this user so they can
login again. This can be None which means they cannot login again
via a password (e.g. the user is an application service user).
generate_token (bool): Whether a new access token should be
@@ -139,7 +145,8 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
- yield run_on_reactor()
+
+ yield self.auth.check_auth_blocking(threepid=threepid)
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@@ -284,6 +291,7 @@ class RegistrationHandler(BaseHandler):
400,
"User ID can only contain characters a-z, 0-9, or '=_-./'",
)
+ yield self.auth.check_auth_blocking()
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
@@ -431,11 +439,9 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
- yield run_on_reactor()
-
if localpart is None:
raise SynapseError(400, "Request must include user id")
-
+ yield self.auth.check_auth_blocking()
need_register = True
try:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2abd63ad05..c3f820b975 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -15,23 +15,29 @@
# limitations under the License.
"""Contains functions for performing events on rooms."""
-from twisted.internet import defer
+import itertools
+import logging
+import math
+import string
+from collections import OrderedDict
-from ._base import BaseHandler
+from six import string_types
+
+from twisted.internet import defer
-from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
from synapse.api.constants import (
- EventTypes, JoinRules, RoomCreationPreset
+ DEFAULT_ROOM_VERSION,
+ KNOWN_ROOM_VERSIONS,
+ EventTypes,
+ JoinRules,
+ RoomCreationPreset,
)
-from synapse.api.errors import AuthError, StoreError, SynapseError
+from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
+from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
from synapse.visibility import filter_events_for_client
-from collections import OrderedDict
-
-import logging
-import math
-import string
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -92,15 +98,34 @@ class RoomCreationHandler(BaseHandler):
Raises:
SynapseError if the room ID couldn't be stored, or something went
horribly wrong.
+ ResourceLimitError if server is blocked to some resource being
+ exceeded
"""
user_id = requester.user.to_string()
+ self.auth.check_auth_blocking(user_id)
+
if not self.spam_checker.user_may_create_room(user_id):
raise SynapseError(403, "You are not permitted to create rooms")
if ratelimit:
yield self.ratelimit(requester)
+ room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+ if not isinstance(room_version, string_types):
+ raise SynapseError(
+ 400,
+ "room_version must be a string",
+ Codes.BAD_JSON,
+ )
+
+ if room_version not in KNOWN_ROOM_VERSIONS:
+ raise SynapseError(
+ 400,
+ "Your homeserver does not support this room version",
+ Codes.UNSUPPORTED_ROOM_VERSION,
+ )
+
if "room_alias_name" in config:
for wchar in string.whitespace:
if wchar in config["room_alias_name"]:
@@ -115,7 +140,11 @@ class RoomCreationHandler(BaseHandler):
)
if mapping:
- raise SynapseError(400, "Room alias already taken")
+ raise SynapseError(
+ 400,
+ "Room alias already taken",
+ Codes.ROOM_IN_USE
+ )
else:
room_alias = None
@@ -182,6 +211,9 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {})
+ # override any attempt to set room versions via the creation_content
+ creation_content["room_version"] = room_version
+
room_member_handler = self.hs.get_room_member_handler()
yield self._send_events_for_new_room(
@@ -394,9 +426,13 @@ class RoomCreationHandler(BaseHandler):
)
-class RoomContextHandler(BaseHandler):
+class RoomContextHandler(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = hs.get_datastore()
+
@defer.inlineCallbacks
- def get_event_context(self, user, room_id, event_id, limit):
+ def get_event_context(self, user, room_id, event_id, limit, event_filter):
"""Retrieves events, pagination tokens and state around a given event
in a room.
@@ -406,6 +442,8 @@ class RoomContextHandler(BaseHandler):
event_id (str)
limit (int): The maximum number of events to return in total
(excluding state).
+ event_filter (Filter|None): the filter to apply to the events returned
+ (excluding the target event_id)
Returns:
dict, or None if the event isn't found
@@ -413,8 +451,6 @@ class RoomContextHandler(BaseHandler):
before_limit = math.floor(limit / 2.)
after_limit = limit - before_limit
- now_token = yield self.hs.get_event_sources().get_current_token()
-
users = yield self.store.get_users_in_room(room_id)
is_peeking = user.to_string() not in users
@@ -440,7 +476,7 @@ class RoomContextHandler(BaseHandler):
)
results = yield self.store.get_events_around(
- room_id, event_id, before_limit, after_limit
+ room_id, event_id, before_limit, after_limit, event_filter
)
results["events_before"] = yield filter_evts(results["events_before"])
@@ -452,16 +488,35 @@ class RoomContextHandler(BaseHandler):
else:
last_event_id = event_id
+ types = None
+ filtered_types = None
+ if event_filter and event_filter.lazy_load_members():
+ members = set(ev.sender for ev in itertools.chain(
+ results["events_before"],
+ (results["event"],),
+ results["events_after"],
+ ))
+ filtered_types = [EventTypes.Member]
+ types = [(EventTypes.Member, member) for member in members]
+
+ # XXX: why do we return the state as of the last event rather than the
+ # first? Shouldn't we be consistent with /sync?
+ # https://github.com/matrix-org/matrix-doc/issues/687
+
state = yield self.store.get_state_for_events(
- [last_event_id], None
+ [last_event_id], types, filtered_types=filtered_types,
)
results["state"] = list(state[last_event_id].values())
- results["start"] = now_token.copy_and_replace(
+ # We use a dummy token here as we only care about the room portion of
+ # the token, which we replace.
+ token = StreamToken.START
+
+ results["start"] = token.copy_and_replace(
"room_key", results["start"]
).to_string()
- results["end"] = now_token.copy_and_replace(
+ results["end"] = token.copy_and_replace(
"room_key", results["end"]
).to_string()
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index fc507cef36..37e41afd61 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -13,26 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+import logging
+from collections import namedtuple
from six import iteritems
from six.moves import range
-from ._base import BaseHandler
+import msgpack
+from unpaddedbase64 import decode_base64, encode_base64
+
+from twisted.internet import defer
-from synapse.api.constants import (
- EventTypes, JoinRules,
-)
-from synapse.util.async import concurrently_execute
+from synapse.api.constants import EventTypes, JoinRules
+from synapse.types import ThirdPartyInstanceID
+from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
-from synapse.types import ThirdPartyInstanceID
-
-from collections import namedtuple
-from unpaddedbase64 import encode_base64, decode_base64
-import logging
-import msgpack
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -40,7 +38,7 @@ REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
# This is used to indicate we should only return rooms published to the main list.
-EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
+EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
@@ -52,7 +50,7 @@ class RoomListHandler(BaseHandler):
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,
- network_tuple=EMTPY_THIRD_PARTY_ID,):
+ network_tuple=EMPTY_THIRD_PARTY_ID,):
"""Generate a local public room list.
There are multiple different lists: the main one plus one per third
@@ -89,7 +87,7 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None,
search_filter=None,
- network_tuple=EMTPY_THIRD_PARTY_ID,):
+ network_tuple=EMPTY_THIRD_PARTY_ID,):
if since_token and since_token != "END":
since_token = RoomListNextBatch.from_token(since_token)
else:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f930e939e8..f643619047 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -21,19 +21,17 @@ from six.moves import http_client
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
-from twisted.internet import defer
from unpaddedbase64 import decode_base64
+from twisted.internet import defer
+
import synapse.server
import synapse.types
-from synapse.api.constants import (
- EventTypes, Membership,
-)
-from synapse.api.errors import AuthError, SynapseError, Codes
-from synapse.types import UserID, RoomID
-from synapse.util.async import Linearizer
-from synapse.util.distributor import user_left_room, user_joined_room
-
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.types import RoomID, UserID
+from synapse.util.async_helpers import Linearizer
+from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__)
@@ -203,7 +201,9 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
- prev_member_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, target.to_string()),
None
)
@@ -344,6 +344,7 @@ class RoomMemberHandler(object):
latest_event_ids = (
event_id for (event_id, _, _) in prev_events_and_hashes
)
+
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
)
@@ -498,9 +499,10 @@ class RoomMemberHandler(object):
if prev_event is not None:
return
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
if event.membership == Membership.JOIN:
if requester.is_guest:
- guest_can_join = yield self._can_guest_join(context.prev_state_ids)
+ guest_can_join = yield self._can_guest_join(prev_state_ids)
if not guest_can_join:
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
@@ -519,7 +521,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
- prev_member_event_id = context.prev_state_ids.get(
+ prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, event.state_key),
None
)
@@ -707,6 +709,10 @@ class RoomMemberHandler(object):
inviter_display_name = member_event.content.get("displayname", "")
inviter_avatar_url = member_event.content.get("avatar_url", "")
+ # if user has no display name, default to their MXID
+ if not inviter_display_name:
+ inviter_display_name = user.to_string()
+
canonical_room_alias = ""
canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
if canonical_alias_event:
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 493aec1e48..acc6eb8099 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -20,15 +20,24 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler
from synapse.replication.http.membership import (
- remote_join, remote_reject_invite, get_or_register_3pid_guest,
- notify_user_membership_change,
+ ReplicationRegister3PIDGuestRestServlet as Repl3PID,
+ ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
+ ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
+ ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
)
-
logger = logging.getLogger(__name__)
class RoomMemberWorkerHandler(RoomMemberHandler):
+ def __init__(self, hs):
+ super(RoomMemberWorkerHandler, self).__init__(hs)
+
+ self._get_register_3pid_client = Repl3PID.make_client(hs)
+ self._remote_join_client = ReplRemoteJoin.make_client(hs)
+ self._remote_reject_client = ReplRejectInvite.make_client(hs)
+ self._notify_change_client = ReplJoinedLeft.make_client(hs)
+
@defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
@@ -36,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
- ret = yield remote_join(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ ret = yield self._remote_join_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
@@ -54,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite
"""
- return remote_reject_invite(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._remote_reject_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
@@ -67,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room
"""
- return notify_user_membership_change(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._notify_change_client(
user_id=target.to_string(),
room_id=room_id,
change="joined",
@@ -79,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room
"""
- return notify_user_membership_change(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._notify_change_client(
user_id=target.to_string(),
room_id=room_id,
change="left",
@@ -91,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
"""Implements RoomMemberHandler.get_or_register_3pid_guest
"""
- return get_or_register_3pid_guest(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._get_register_3pid_client(
requester=requester,
medium=medium,
address=address,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 1eca26aa1e..c464adbd0b 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -13,21 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+import itertools
+import logging
-from ._base import BaseHandler
+from unpaddedbase64 import decode_base64, encode_base64
-from synapse.api.constants import Membership, EventTypes
-from synapse.api.filtering import Filter
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
+from synapse.api.filtering import Filter
from synapse.events.utils import serialize_event
from synapse.visibility import filter_events_for_client
-from unpaddedbase64 import decode_base64, encode_base64
-
-import itertools
-import logging
-
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -64,6 +63,13 @@ class SearchHandler(BaseHandler):
except Exception:
raise SynapseError(400, "Invalid batch")
+ logger.info(
+ "Search batch properties: %r, %r, %r",
+ batch_group, batch_group_key, batch_token,
+ )
+
+ logger.info("Search content: %s", content)
+
try:
room_cat = content["search_categories"]["room_events"]
@@ -271,6 +277,8 @@ class SearchHandler(BaseHandler):
# We should never get here due to the guard earlier.
raise NotImplementedError()
+ logger.info("Found %d events to return", len(allowed_events))
+
# If client has asked for "context" for each event (i.e. some surrounding
# events and state), fetch that
if event_context is not None:
@@ -279,7 +287,12 @@ class SearchHandler(BaseHandler):
contexts = {}
for event in allowed_events:
res = yield self.store.get_events_around(
- event.room_id, event.event_id, before_limit, after_limit
+ event.room_id, event.event_id, before_limit, after_limit,
+ )
+
+ logger.info(
+ "Context for search returned %d and %d events",
+ len(res["events_before"]), len(res["events_after"]),
)
res["events_before"] = yield filter_events_for_client(
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index e057ae54c9..7ecdede4dc 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -17,6 +17,7 @@ import logging
from twisted.internet import defer
from synapse.api.errors import Codes, StoreError, SynapseError
+
from ._base import BaseHandler
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 51ec727df0..ef20c2296c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,24 +14,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.api.constants import Membership, EventTypes
-from synapse.util.async import concurrently_execute
+import collections
+import itertools
+import logging
+
+from six import iteritems, itervalues
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.push.clientformat import format_push_rules_for_user
+from synapse.types import RoomStreamToken
+from synapse.util.async_helpers import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
+from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
-from synapse.util.caches.response_cache import ResponseCache
-from synapse.push.clientformat import format_push_rules_for_user
from synapse.visibility import filter_events_for_client
-from synapse.types import RoomStreamToken
-from twisted.internet import defer
-
-import collections
-import logging
-import itertools
+logger = logging.getLogger(__name__)
-from six import itervalues, iteritems
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
-logger = logging.getLogger(__name__)
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
SyncConfig = collections.namedtuple("SyncConfig", [
@@ -64,6 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"ephemeral",
"account_data",
"unread_notifications",
+ "summary",
])):
__slots__ = []
@@ -145,7 +157,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
"to_device", # List of direct messages for the device.
- "device_lists", # List of user_ids whose devices have chanegd
+ "device_lists", # List of user_ids whose devices have changed
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
"groups",
@@ -173,6 +185,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
class SyncHandler(object):
def __init__(self, hs):
+ self.hs_config = hs.config
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
@@ -180,20 +193,35 @@ class SyncHandler(object):
self.clock = hs.get_clock()
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
+ self.auth = hs.get_auth()
+
+ # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+ self.lazy_loaded_members_cache = ExpiringCache(
+ "lazy_loaded_members_cache", self.clock,
+ max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+ )
+ @defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Returns:
- A Deferred SyncResult.
+ Deferred[SyncResult]
"""
- return self.response_cache.wrap(
+ # If the user is not part of the mau group, then check that limits have
+ # not been exceeded (if not part of the group by this point, almost certain
+ # auth_blocking will occur)
+ user_id = sync_config.user.to_string()
+ yield self.auth.check_auth_blocking(user_id)
+
+ res = yield self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config, since_token, timeout, full_state,
)
+ defer.returnValue(res)
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
@@ -416,29 +444,44 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
- def get_state_after_event(self, event):
+ def get_state_after_event(self, event, types=None, filtered_types=None):
"""
Get the room state after the given event
Args:
event(synapse.events.EventBase): event of interest
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A Deferred map from ((type, state_key)->Event)
"""
- state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+ state_ids = yield self.store.get_state_ids_for_event(
+ event.event_id, types, filtered_types=filtered_types,
+ )
if event.is_state():
state_ids = state_ids.copy()
state_ids[(event.type, event.state_key)] = event.event_id
defer.returnValue(state_ids)
@defer.inlineCallbacks
- def get_state_at(self, room_id, stream_position):
+ def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
""" Get the room state at a particular stream position
Args:
room_id(str): room for which to get state
stream_position(StreamToken): point at which to get state
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A Deferred map from ((type, state_key)->Event)
@@ -453,7 +496,9 @@ class SyncHandler(object):
if last_events:
last_event = last_events[-1]
- state = yield self.get_state_after_event(last_event)
+ state = yield self.get_state_after_event(
+ last_event, types, filtered_types=filtered_types,
+ )
else:
# no events in this room - so presumably no state
@@ -461,9 +506,141 @@ class SyncHandler(object):
defer.returnValue(state)
@defer.inlineCallbacks
+ def compute_summary(self, room_id, sync_config, batch, state, now_token):
+ """ Works out a room summary block for this room, summarising the number
+ of joined members in the room, and providing the 'hero' members if the
+ room has no name so clients can consistently name rooms. Also adds
+ state events to 'state' if needed to describe the heroes.
+
+ Args:
+ room_id(str):
+ sync_config(synapse.handlers.sync.SyncConfig):
+ batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+ the room that will be sent to the user.
+ state(dict): dict of (type, state_key) -> Event as returned by
+ compute_state_delta
+ now_token(str): Token of the end of the current batch.
+
+ Returns:
+ A deferred dict describing the room summary
+ """
+
+ # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
+ last_events, _ = yield self.store.get_recent_event_ids_for_room(
+ room_id, end_token=now_token.room_key, limit=1,
+ )
+
+ if not last_events:
+ defer.returnValue(None)
+ return
+
+ last_event = last_events[-1]
+ state_ids = yield self.store.get_state_ids_for_event(
+ last_event.event_id, [
+ (EventTypes.Member, None),
+ (EventTypes.Name, ''),
+ (EventTypes.CanonicalAlias, ''),
+ ]
+ )
+
+ member_ids = {
+ state_key: event_id
+ for (t, state_key), event_id in state_ids.iteritems()
+ if t == EventTypes.Member
+ }
+ name_id = state_ids.get((EventTypes.Name, ''))
+ canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
+
+ summary = {}
+
+ # FIXME: it feels very heavy to load up every single membership event
+ # just to calculate the counts.
+ member_events = yield self.store.get_events(member_ids.values())
+
+ joined_user_ids = []
+ invited_user_ids = []
+
+ for ev in member_events.values():
+ if ev.content.get("membership") == Membership.JOIN:
+ joined_user_ids.append(ev.state_key)
+ elif ev.content.get("membership") == Membership.INVITE:
+ invited_user_ids.append(ev.state_key)
+
+ # TODO: only send these when they change.
+ summary["m.joined_member_count"] = len(joined_user_ids)
+ summary["m.invited_member_count"] = len(invited_user_ids)
+
+ if name_id or canonical_alias_id:
+ defer.returnValue(summary)
+
+ # FIXME: order by stream ordering, not alphabetic
+
+ me = sync_config.user.to_string()
+ if (joined_user_ids or invited_user_ids):
+ summary['m.heroes'] = sorted(
+ [
+ user_id
+ for user_id in (joined_user_ids + invited_user_ids)
+ if user_id != me
+ ]
+ )[0:5]
+ else:
+ summary['m.heroes'] = sorted(
+ [user_id for user_id in member_ids.keys() if user_id != me]
+ )[0:5]
+
+ if not sync_config.filter_collection.lazy_load_members():
+ defer.returnValue(summary)
+
+ # ensure we send membership events for heroes if needed
+ cache_key = (sync_config.user.to_string(), sync_config.device_id)
+ cache = self.get_lazy_loaded_members_cache(cache_key)
+
+ # track which members the client should already know about via LL:
+ # Ones which are already in state...
+ existing_members = set(
+ user_id for (typ, user_id) in state.keys()
+ if typ == EventTypes.Member
+ )
+
+ # ...or ones which are in the timeline...
+ for ev in batch.events:
+ if ev.type == EventTypes.Member:
+ existing_members.add(ev.state_key)
+
+ # ...and then ensure any missing ones get included in state.
+ missing_hero_event_ids = [
+ member_ids[hero_id]
+ for hero_id in summary['m.heroes']
+ if (
+ cache.get(hero_id) != member_ids[hero_id] and
+ hero_id not in existing_members
+ )
+ ]
+
+ missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
+ missing_hero_state = missing_hero_state.values()
+
+ for s in missing_hero_state:
+ cache.set(s.state_key, s.event_id)
+ state[(EventTypes.Member, s.state_key)] = s
+
+ defer.returnValue(summary)
+
+ def get_lazy_loaded_members_cache(self, cache_key):
+ cache = self.lazy_loaded_members_cache.get(cache_key)
+ if cache is None:
+ logger.debug("creating LruCache for %r", cache_key)
+ cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+ self.lazy_loaded_members_cache[cache_key] = cache
+ else:
+ logger.debug("found LruCache for %r", cache_key)
+ return cache
+
+ @defer.inlineCallbacks
def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
full_state):
- """ Works out the differnce in state between the start of the timeline
+ """ Works out the difference in state between the start of the timeline
and the previous sync.
Args:
@@ -477,7 +654,7 @@ class SyncHandler(object):
full_state(bool): Whether to force returning the full state.
Returns:
- A deferred new event dictionary
+ A deferred dict of (type, state_key) -> Event
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
@@ -485,59 +662,130 @@ class SyncHandler(object):
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
+
+ types = None
+ filtered_types = None
+
+ lazy_load_members = sync_config.filter_collection.lazy_load_members()
+ include_redundant_members = (
+ sync_config.filter_collection.include_redundant_members()
+ )
+
+ if lazy_load_members:
+ # We only request state for the members needed to display the
+ # timeline:
+
+ types = [
+ (EventTypes.Member, state_key)
+ for state_key in set(
+ event.sender # FIXME: we also care about invite targets etc.
+ for event in batch.events
+ )
+ ]
+
+ # only apply the filtering to room members
+ filtered_types = [EventTypes.Member]
+
+ timeline_state = {
+ (event.type, event.state_key): event.event_id
+ for event in batch.events if event.is_state()
+ }
+
if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types,
+ filtered_types=filtered_types,
)
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
)
+
else:
current_state_ids = yield self.get_state_at(
- room_id, stream_position=now_token
+ room_id, stream_position=now_token, types=types,
+ filtered_types=filtered_types,
)
state_ids = current_state_ids
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_ids,
previous={},
current=current_state_ids,
+ lazy_load_members=lazy_load_members,
)
elif batch.limited:
state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
+ room_id, stream_position=since_token, types=types,
+ filtered_types=filtered_types,
)
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types,
+ filtered_types=filtered_types,
)
state_at_timeline_start = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
)
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
previous=state_at_previous_sync,
current=current_state_ids,
+ lazy_load_members=lazy_load_members,
)
else:
state_ids = {}
+ if lazy_load_members:
+ if types:
+ # We're returning an incremental sync, with no "gap" since
+ # the previous sync, so normally there would be no state to return
+ # But we're lazy-loading, so the client might need some more
+ # member events to understand the events in this timeline.
+ # So we fish out all the member events corresponding to the
+ # timeline here, and then dedupe any redundant ones below.
+
+ state_ids = yield self.store.get_state_ids_for_event(
+ batch.events[0].event_id, types=types,
+ filtered_types=None, # we only want members!
+ )
+
+ if lazy_load_members and not include_redundant_members:
+ cache_key = (sync_config.user.to_string(), sync_config.device_id)
+ cache = self.get_lazy_loaded_members_cache(cache_key)
+
+ # if it's a new sync sequence, then assume the client has had
+ # amnesia and doesn't want any recent lazy-loaded members
+ # de-duplicated.
+ if since_token is None:
+ logger.debug("clearing LruCache for %r", cache_key)
+ cache.clear()
+ else:
+ # only send members which aren't in our LruCache (either
+ # because they're new to this client or have been pushed out
+ # of the cache)
+ logger.debug("filtering state from %r...", state_ids)
+ state_ids = {
+ t: event_id
+ for t, event_id in state_ids.iteritems()
+ if cache.get(t[1]) != event_id
+ }
+ logger.debug("...to %r", state_ids)
+
+ # add any member IDs we are about to send into our LruCache
+ for t, event_id in itertools.chain(
+ state_ids.items(),
+ timeline_state.items(),
+ ):
+ if t[0] == EventTypes.Member:
+ cache.set(t[1], event_id)
state = {}
if state_ids:
@@ -620,7 +868,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
- if not block_all_presence_data:
+ if self.hs_config.use_presence and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)
@@ -1312,7 +1560,6 @@ class SyncHandler(object):
if events == [] and tags is None:
return
- since_token = sync_result_builder.since_token
now_token = sync_result_builder.now_token
sync_config = sync_result_builder.sync_config
@@ -1355,6 +1602,18 @@ class SyncHandler(object):
full_state=full_state
)
+ summary = {}
+ if (
+ sync_config.filter_collection.lazy_load_members() and
+ (
+ any(ev.type == EventTypes.Member for ev in batch.events) or
+ since_token is None
+ )
+ ):
+ summary = yield self.compute_summary(
+ room_id, sync_config, batch, state, now_token
+ )
+
if room_builder.rtype == "joined":
unread_notifications = {}
room_sync = JoinedSyncResult(
@@ -1364,6 +1623,7 @@ class SyncHandler(object):
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
+ summary=summary,
)
if room_sync or always_include:
@@ -1448,7 +1708,9 @@ def _action_has_highlight(actions):
return False
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(
+ timeline_contains, timeline_start, previous, current, lazy_load_members,
+):
"""Works out what state to include in a sync response.
Args:
@@ -1457,6 +1719,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
previous (dict): state at the end of the previous sync (or empty dict
if this is an initial sync)
current (dict): state at the end of the timeline
+ lazy_load_members (bool): whether to return members from timeline_start
+ or not. assumes that timeline_start has already been filtered to
+ include only the members the client needs to know about.
Returns:
dict
@@ -1472,9 +1737,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
}
c_ids = set(e for e in current.values())
- tc_ids = set(e for e in timeline_contains.values())
- p_ids = set(e for e in previous.values())
ts_ids = set(e for e in timeline_start.values())
+ p_ids = set(e for e in previous.values())
+ tc_ids = set(e for e in timeline_contains.values())
+
+ # If we are lazyloading room members, we explicitly add the membership events
+ # for the senders in the timeline into the state block returned by /sync,
+ # as we may not have sent them to the client before. We find these membership
+ # events by filtering them out of timeline_start, which has already been filtered
+ # to only include membership events for the senders in the timeline.
+ # In practice, we can do this by removing them from the p_ids list,
+ # which is the list of relevant state we know we have already sent to the client.
+ # see https://github.com/matrix-org/synapse/pull/2970
+ # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+
+ if lazy_load_members:
+ p_ids.difference_update(
+ e for t, e in timeline_start.iteritems()
+ if t[0] == EventTypes.Member
+ )
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 5d9736e88f..2d2d3d5a0d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -13,17 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+from collections import namedtuple
+
from twisted.internet import defer
-from synapse.api.errors import SynapseError, AuthError
+from synapse.api.errors import AuthError, SynapseError
+from synapse.types import UserID, get_domain_from_id
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID, get_domain_from_id
-
-import logging
-
-from collections import namedtuple
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a39f0f7343..d8413d6aa7 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,15 +14,15 @@
# limitations under the License.
import logging
+
+from six import iteritems
+
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo
-from synapse.util.metrics import Measure
-from synapse.util.async import sleep
from synapse.types import get_localpart_from_id
-
-from six import iteritems
+from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -119,6 +119,8 @@ class UserDirectoryHandler(object):
"""Called to update index of our local user profiles when they change
irrespective of any rooms the user may be in.
"""
+ # FIXME(#3714): We should probably do this in the same worker as all
+ # the other changes.
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url, None,
)
@@ -127,6 +129,8 @@ class UserDirectoryHandler(object):
def handle_user_deactivated(self, user_id):
"""Called when a user ID is deactivated
"""
+ # FIXME(#3714): We should probably do this in the same worker as all
+ # the other changes.
yield self.store.remove_from_user_dir(user_id)
yield self.store.remove_from_user_in_public_room(user_id)
@@ -174,7 +178,7 @@ class UserDirectoryHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
- yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
logger.info("Processed all rooms.")
@@ -188,7 +192,7 @@ class UserDirectoryHandler(object):
logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
yield self._handle_local_user(user_id)
num_processed_users += 1
- yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
logger.info("Processed all users")
@@ -236,7 +240,7 @@ class UserDirectoryHandler(object):
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
if not self.is_mine_id(user_id):
count += 1
@@ -251,7 +255,7 @@ class UserDirectoryHandler(object):
continue
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
- yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+ yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
count += 1
user_set = (user_id, other_user_id)
|