diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 9e912fdfbe..44e38b777a 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership, JoinRules
from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError
-from synapse.types import Requester, RoomID, UserID, EventID
+from synapse.types import Requester, UserID, get_domain_from_id
from synapse.util.logutils import log_function
from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.metrics import Measure
@@ -91,8 +91,8 @@ class Auth(object):
"Room %r does not exist" % (event.room_id,)
)
- creating_domain = RoomID.from_string(event.room_id).domain
- originating_domain = UserID.from_string(event.sender).domain
+ creating_domain = get_domain_from_id(event.room_id)
+ originating_domain = get_domain_from_id(event.sender)
if creating_domain != originating_domain:
if not self.can_federate(event, auth_events):
raise AuthError(
@@ -219,7 +219,7 @@ class Auth(object):
for event in curr_state.values():
if event.type == EventTypes.Member:
try:
- if UserID.from_string(event.state_key).domain != host:
+ if get_domain_from_id(event.state_key) != host:
continue
except:
logger.warn("state_key not user_id: %s", event.state_key)
@@ -266,8 +266,8 @@ class Auth(object):
target_user_id = event.state_key
- creating_domain = RoomID.from_string(event.room_id).domain
- target_domain = UserID.from_string(target_user_id).domain
+ creating_domain = get_domain_from_id(event.room_id)
+ target_domain = get_domain_from_id(target_user_id)
if creating_domain != target_domain:
if not self.can_federate(event, auth_events):
raise AuthError(
@@ -612,7 +612,8 @@ class Auth(object):
def get_user_from_macaroon(self, macaroon_str):
try:
macaroon = pymacaroons.Macaroon.deserialize(macaroon_str)
- self.validate_macaroon(macaroon, "access", False)
+
+ self.validate_macaroon(macaroon, "access", self.hs.config.expire_access_token)
user_prefix = "user_id = "
user = None
@@ -889,8 +890,8 @@ class Auth(object):
if user_level >= redact_level:
return False
- redacter_domain = EventID.from_string(event.event_id).domain
- redactee_domain = EventID.from_string(event.redacts).domain
+ redacter_domain = get_domain_from_id(event.event_id)
+ redactee_domain = get_domain_from_id(event.redacts)
if redacter_domain == redactee_domain:
return True
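For reference, a minimal sketch of the helper these hunks adopt. The real definition lives in synapse/types.py, which is outside this diff, so the body below only mirrors what the call sites above imply: it splits any sigil-prefixed Matrix ID (user, room, or event) on the first colon, instead of round-tripping through a full UserID/RoomID/EventID parse.

from synapse.api.errors import SynapseError

def get_domain_from_id(string):
    """Return the server name of an ID such as @user:example.com."""
    idx = string.find(":")
    if idx == -1:
        raise SynapseError(400, "Invalid ID: %r" % (string,))
    return string[idx + 1:]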
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index cd699ef27f..4f5a4281fa 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -15,6 +15,8 @@
from synapse.api.errors import SynapseError
from synapse.types import UserID, RoomID
+from twisted.internet import defer
+
import ujson as json
@@ -24,10 +26,10 @@ class Filtering(object):
super(Filtering, self).__init__()
self.store = hs.get_datastore()
+ @defer.inlineCallbacks
def get_user_filter(self, user_localpart, filter_id):
- result = self.store.get_user_filter(user_localpart, filter_id)
- result.addCallback(FilterCollection)
- return result
+ result = yield self.store.get_user_filter(user_localpart, filter_id)
+ defer.returnValue(FilterCollection(result))
def add_user_filter(self, user_localpart, user_filter):
self.check_valid_filter(user_filter)
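The same Deferred-to-generator rewrite in miniature: a self-contained sketch (illustrative names, not Synapse code) of the pattern this hunk applies to get_user_filter.

from twisted.internet import defer

class ThingWrapper(object):
    def __init__(self, raw):
        self.raw = raw

def get_thing():
    # stands in for a storage method that returns a Deferred
    return defer.succeed({"id": 1})

def fetch_with_callback():
    # before: hand the Deferred back, wrapping the result via addCallback
    d = get_thing()
    d.addCallback(ThingWrapper)
    return d

@defer.inlineCallbacks
def fetch_with_yield():
    # after: linear style; yield the Deferred, wrap, and return
    result = yield get_thing()
    defer.returnValue(ThingWrapper(result))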
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index b5339f030d..135dd58c15 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -20,11 +20,14 @@ from synapse.server import HomeServer
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
+from synapse.config.emailconfig import EmailConfig
from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.storage.roommember import RoomMemberStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.storage.engines import create_engine
from synapse.storage import DataStore
from synapse.util.async import sleep
@@ -58,6 +61,7 @@ class SlaveConfig(DatabaseConfig):
self.soft_file_limit = config.get("soft_file_limit")
self.daemonize = config.get("daemonize")
self.pid_file = self.abspath(config.get("pid_file"))
+ self.public_baseurl = config["public_baseurl"]
def default_config(self, server_name, **kwargs):
pid_file = self.abspath("pusher.pid")
@@ -91,12 +95,13 @@ class SlaveConfig(DatabaseConfig):
""" % locals()
-class PusherSlaveConfig(SlaveConfig, LoggingConfig):
+class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
pass
class PusherSlaveStore(
- SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore
+ SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
+ SlavedAccountDataStore
):
update_pusher_last_stream_ordering_and_success = (
DataStore.update_pusher_last_stream_ordering_and_success.__func__
@@ -110,6 +115,31 @@ class PusherSlaveStore(
DataStore.update_pusher_last_stream_ordering.__func__
)
+ get_throttle_params_by_room = (
+ DataStore.get_throttle_params_by_room.__func__
+ )
+
+ set_throttle_params = (
+ DataStore.set_throttle_params.__func__
+ )
+
+ get_time_of_last_push_action_before = (
+ DataStore.get_time_of_last_push_action_before.__func__
+ )
+
+ get_profile_displayname = (
+ DataStore.get_profile_displayname.__func__
+ )
+
+ # XXX: This is a bit broken because we don't persist forgotten rooms
+ # in a way that they can be streamed. This means that we don't have a
+ # way to invalidate the forgotten rooms cache correctly.
+ # For now we expire the cache every hour.
+ BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
+ who_forgot_in_room = (
+ RoomMemberStore.__dict__["who_forgot_in_room"]
+ )
+
class PusherServer(HomeServer):
@@ -189,6 +219,7 @@ class PusherServer(HomeServer):
store = self.get_datastore()
replication_url = self.config.replication_url
pusher_pool = self.get_pusherpool()
+ clock = self.get_clock()
def stop_pusher(user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
@@ -240,11 +271,21 @@ class PusherServer(HomeServer):
min_stream_id, max_stream_id, affected_room_ids
)
+ def expire_broken_caches():
+ store.who_forgot_in_room.invalidate_all()
+
+ next_expire_broken_caches_ms = 0
while True:
try:
args = store.stream_positions()
args["timeout"] = 30000
result = yield http_client.get_json(replication_url, args=args)
+ now_ms = clock.time_msec()
+ if now_ms > next_expire_broken_caches_ms:
+ expire_broken_caches()
+ next_expire_broken_caches_ms = (
+ now_ms + store.BROKEN_CACHE_EXPIRY_MS
+ )
yield store.process_replication(result)
poke_pushers(result)
except:
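The blocks of DataStore.<method>.__func__ assignments above lean on a Python 2 idiom: looking a method up on a class yields an unbound method, and its __func__ attribute is the plain function, which can then be re-bound as an attribute of an unrelated class with a compatible interface. A standalone illustration with invented names:

class FullStore(object):
    def get_value(self):
        return self._value

class SlaveStore(object):
    _value = 42
    # borrow FullStore's implementation without inheriting from it
    get_value = FullStore.get_value.__func__

print(SlaveStore().get_value())  # 42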
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
new file mode 100644
index 0000000000..90bdd08f00
--- /dev/null
+++ b/synapse/config/emailconfig.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file can't be called email.py because if it is, we cannot:
+import email.utils
+
+from ._base import Config
+
+
+class EmailConfig(Config):
+ def read_config(self, config):
+ self.email_enable_notifs = False
+
+ email_config = config.get("email", {})
+ self.email_enable_notifs = email_config.get("enable_notifs", False)
+
+ if self.email_enable_notifs:
+ # make sure we can import the required deps
+ import jinja2
+ import bleach
+ # prevent unused warnings
+ jinja2
+ bleach
+
+ required = [
+ "smtp_host",
+ "smtp_port",
+ "notif_from",
+ "template_dir",
+ "notif_template_html",
+ "notif_template_text",
+ ]
+
+ missing = []
+ for k in required:
+ if k not in email_config:
+ missing.append(k)
+
+ if missing:
+ raise RuntimeError(
+ "email.enable_notifs is True but required keys are missing: %s" %
+ (", ".join(["email." + k for k in missing]),)
+ )
+
+ if config.get("public_baseurl") is None:
+ raise RuntimeError(
+ "email.enable_notifs is True but no public_baseurl is set"
+ )
+
+ self.email_smtp_host = email_config["smtp_host"]
+ self.email_smtp_port = email_config["smtp_port"]
+ self.email_notif_from = email_config["notif_from"]
+ self.email_template_dir = email_config["template_dir"]
+ self.email_notif_template_html = email_config["notif_template_html"]
+ self.email_notif_template_text = email_config["notif_template_text"]
+ self.email_notif_for_new_users = email_config.get(
+ "notif_for_new_users", True
+ )
+ if "app_name" in email_config:
+ self.email_app_name = email_config["app_name"]
+ else:
+ self.email_app_name = "Matrix"
+
+ # make sure it's valid
+ parsed = email.utils.parseaddr(self.email_notif_from)
+ if parsed[1] == '':
+ raise RuntimeError("Invalid notif_from address")
+ else:
+ self.email_enable_notifs = False
+ # Not much point setting defaults for the rest: it would be an
+ # error for them to be used.
+
+ def default_config(self, config_dir_path, server_name, **kwargs):
+ return """
+ # Enable sending emails for notification events
+ #email:
+ # enable_notifs: false
+ # smtp_host: "localhost"
+ # smtp_port: 25
+ # notif_from: Your Friendly Matrix Home Server <noreply@example.com>
+ # app_name: Matrix
+ # template_dir: res/templates
+ # notif_template_html: notif_mail.html
+ # notif_template_text: notif_mail.txt
+ # notif_for_new_users: True
+ """
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 9a80ac39ec..fc2445484c 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -31,13 +31,14 @@ from .cas import CasConfig
from .password import PasswordConfig
from .jwt import JWTConfig
from .ldap import LDAPConfig
+from .emailconfig import EmailConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
- JWTConfig, LDAPConfig, PasswordConfig,):
+ JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,):
pass
diff --git a/synapse/config/key.py b/synapse/config/key.py
index a072aec714..6ee643793e 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -57,6 +57,8 @@ class KeyConfig(Config):
seed = self.signing_key[0].seed
self.macaroon_secret_key = hashlib.sha256(seed)
+ self.expire_access_token = config.get("expire_access_token", False)
+
def default_config(self, config_dir_path, server_name, is_generating_file=False,
**kwargs):
base_key_name = os.path.join(config_dir_path, server_name)
@@ -69,6 +71,9 @@ class KeyConfig(Config):
return """\
macaroon_secret_key: "%(macaroon_secret_key)s"
+ # Used to enable access token expiration.
+ expire_access_token: False
+
## Signing Keys ##
# Path to the signing key to sign messages with
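How expire_access_token feeds through to token checking: validate_macaroon (not shown in this diff) receives the flag as its verify-expiry argument. A hedged sketch of the underlying pymacaroons mechanics with illustrative names and caveats; Synapse's actual caveat set and secret handling differ.

import time
import pymacaroons

def make_token(key):
    m = pymacaroons.Macaroon(location="hs", identifier="key1", key=key)
    m.add_first_party_caveat("type = access")
    m.add_first_party_caveat("time < %d" % (int(time.time() * 1000) + 60000,))
    return m.serialize()

def verify(serialized, key, verify_expiry):
    m = pymacaroons.Macaroon.deserialize(serialized)
    v = pymacaroons.Verifier()
    v.satisfy_exact("type = access")
    if verify_expiry:
        # only accept the time caveat if it has not yet expired
        v.satisfy_general(
            lambda c: c.startswith("time < ")
            and int(time.time() * 1000) < int(c[len("time < "):])
        )
    else:
        # accept any well-formed time caveat without enforcing it
        v.satisfy_general(lambda c: c.startswith("time < "))
    return v.verify(m, key)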
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 87e500c97a..cc3f879857 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -32,6 +32,7 @@ class RegistrationConfig(Config):
)
self.registration_shared_secret = config.get("registration_shared_secret")
+ self.user_creation_max_duration = int(config["user_creation_max_duration"])
self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
self.trusted_third_party_id_servers = config["trusted_third_party_id_servers"]
@@ -54,6 +55,11 @@ class RegistrationConfig(Config):
# secret, even if registration is otherwise disabled.
registration_shared_secret: "%(registration_shared_secret)s"
+ # Sets the expiry for short-term user creation, in
+ # milliseconds. For instance, the duration below is two weeks
+ # expressed in milliseconds.
+ user_creation_max_duration: 1209600000
+
# Set the number of bcrypt rounds used to generate password hash.
# Larger numbers increase the work factor needed to generate the hash.
# The default number of rounds is 12.
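Sanity check on the documented default, two weeks expressed in milliseconds:

assert 14 * 24 * 60 * 60 * 1000 == 1209600000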
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 46c633548a..0b5f462e44 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -28,6 +28,11 @@ class ServerConfig(Config):
self.print_pidfile = config.get("print_pidfile")
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", True)
+ self.public_baseurl = config.get("public_baseurl")
+
+ if self.public_baseurl is not None:
+ if self.public_baseurl[-1] != '/':
+ self.public_baseurl += '/'
self.start_pushers = config.get("start_pushers", True)
self.listeners = config.get("listeners", [])
@@ -143,6 +148,9 @@ class ServerConfig(Config):
# Whether to serve a web client from the HTTP/HTTPS root resource.
web_client: True
+ # The public-facing base URL for the client API (not including _matrix/...)
+ # public_baseurl: https://example.com:8448/
+
# Set the soft limit on the number of file descriptors synapse can use
# Zero is used to indicate synapse should set the soft limit to the
# hard limit.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 429ab6ddec..f1d231b9d8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -388,6 +388,11 @@ class FederationServer(FederationBase):
})
@log_function
+ def on_openid_userinfo(self, token):
+ ts_now_ms = self._clock.time_msec()
+ return self.store.get_user_id_for_open_id_token(token, ts_now_ms)
+
+ @log_function
def _get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 1928da03b3..5787f854d4 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -20,6 +20,7 @@ from .persistence import TransactionActions
from .units import Transaction
from synapse.api.errors import HttpResponseException
+from synapse.util.async import run_on_reactor
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.retryutils import (
@@ -199,6 +200,8 @@ class TransactionQueue(object):
@defer.inlineCallbacks
@log_function
def _attempt_new_transaction(self, destination):
+ yield run_on_reactor()
+
# list of (pending_pdu, deferred, order)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index d65a7893d8..5b6c7d11dd 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.api.errors import Codes, SynapseError
from synapse.http.server import JsonResource
-from synapse.http.servlet import parse_json_object_from_request
+from synapse.http.servlet import parse_json_object_from_request, parse_string
from synapse.util.ratelimitutils import FederationRateLimiter
import functools
@@ -323,7 +323,7 @@ class FederationSendLeaveServlet(BaseFederationServlet):
class FederationEventAuthServlet(BaseFederationServlet):
- PATH = "/event_auth(?P<context>[^/]*)/(?P<event_id>[^/]*)"
+ PATH = "/event_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
def on_GET(self, origin, content, query, context, event_id):
return self.handler.on_event_auth(origin, context, event_id)
@@ -448,6 +448,50 @@ class On3pidBindServlet(BaseFederationServlet):
return code
+class OpenIdUserInfo(BaseFederationServlet):
+ """
+ Exchange a bearer token for information about a user.
+
+ The response format should be compatible with:
+ http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse
+
+ GET /openid/userinfo?access_token=ABDEFGH HTTP/1.1
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+
+ {
+ "sub": "@userpart:example.org",
+ }
+ """
+
+ PATH = "/openid/userinfo"
+
+ @defer.inlineCallbacks
+ def on_GET(self, request):
+ token = parse_string(request, "access_token")
+ if token is None:
+ defer.returnValue((401, {
+ "errcode": "M_MISSING_TOKEN", "error": "Access Token required"
+ }))
+ return
+
+ user_id = yield self.handler.on_openid_userinfo(token)
+
+ if user_id is None:
+ defer.returnValue((401, {
+ "errcode": "M_UNKNOWN_TOKEN",
+ "error": "Access Token unknown or expired"
+ }))
+
+ defer.returnValue((200, {"sub": user_id}))
+
+ # Avoid doing remote HS authorization checks which are done by default by
+ # BaseFederationServlet.
+ def _wrap(self, code):
+ return code
+
+
SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
@@ -468,6 +512,7 @@ SERVLET_CLASSES = (
FederationClientKeysClaimServlet,
FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet,
+ OpenIdUserInfo,
)
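End to end, the new servlet sits under the federation prefix, so the exchange shown in its docstring corresponds to a URL like the one below. Hostname and token are illustrative, and the Python 2 stdlib client is just for the sketch; note the servlet's _wrap override means no federation auth headers are needed.

import json
import urllib
import urllib2

token = "ABDEFGH"
url = "https://example.org:8448/_matrix/federation/v1/openid/userinfo?%s" % (
    urllib.urlencode({"access_token": token}),
)
print(json.load(urllib2.urlopen(url))["sub"])  # e.g. "@userpart:example.org"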
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 13a675b208..c904c6c500 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -15,13 +15,10 @@
from twisted.internet import defer
-from synapse.api.errors import LimitExceededError, SynapseError, AuthError
-from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.api.errors import LimitExceededError
from synapse.api.constants import Membership, EventTypes
-from synapse.types import UserID, RoomAlias, Requester
-from synapse.push.action_generator import ActionGenerator
+from synapse.types import UserID, Requester
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
import logging
@@ -29,23 +26,6 @@ import logging
logger = logging.getLogger(__name__)
-VISIBILITY_PRIORITY = (
- "world_readable",
- "shared",
- "invited",
- "joined",
-)
-
-
-MEMBERSHIP_PRIORITY = (
- Membership.JOIN,
- Membership.INVITE,
- Membership.KNOCK,
- Membership.LEAVE,
- Membership.BAN,
-)
-
-
class BaseHandler(object):
"""
Common base class for the event handlers.
@@ -65,161 +45,10 @@ class BaseHandler(object):
self.clock = hs.get_clock()
self.hs = hs
- self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
self.event_builder_factory = hs.get_event_builder_factory()
- @defer.inlineCallbacks
- def filter_events_for_clients(self, user_tuples, events, event_id_to_state):
- """ Returns dict of user_id -> list of events that user is allowed to
- see.
-
- Args:
- user_tuples (str, bool): (user id, is_peeking) for each user to be
- checked. is_peeking should be true if:
- * the user is not currently a member of the room, and:
- * the user has not been a member of the room since the
- given events
- events ([synapse.events.EventBase]): list of events to filter
- """
- forgotten = yield defer.gatherResults([
- self.store.who_forgot_in_room(
- room_id,
- )
- for room_id in frozenset(e.room_id for e in events)
- ], consumeErrors=True)
-
- # Set of membership event_ids that have been forgotten
- event_id_forgotten = frozenset(
- row["event_id"] for rows in forgotten for row in rows
- )
-
- def allowed(event, user_id, is_peeking):
- """
- Args:
- event (synapse.events.EventBase): event to check
- user_id (str)
- is_peeking (bool)
- """
- state = event_id_to_state[event.event_id]
-
- # get the room_visibility at the time of the event.
- visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
- if visibility_event:
- visibility = visibility_event.content.get("history_visibility", "shared")
- else:
- visibility = "shared"
-
- if visibility not in VISIBILITY_PRIORITY:
- visibility = "shared"
-
- # if it was world_readable, it's easy: everyone can read it
- if visibility == "world_readable":
- return True
-
- # Always allow history visibility events on boundaries. This is done
- # by setting the effective visibility to the least restrictive
- # of the old vs new.
- if event.type == EventTypes.RoomHistoryVisibility:
- prev_content = event.unsigned.get("prev_content", {})
- prev_visibility = prev_content.get("history_visibility", None)
-
- if prev_visibility not in VISIBILITY_PRIORITY:
- prev_visibility = "shared"
-
- new_priority = VISIBILITY_PRIORITY.index(visibility)
- old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
- if old_priority < new_priority:
- visibility = prev_visibility
-
- # likewise, if the event is the user's own membership event, use
- # the 'most joined' membership
- membership = None
- if event.type == EventTypes.Member and event.state_key == user_id:
- membership = event.content.get("membership", None)
- if membership not in MEMBERSHIP_PRIORITY:
- membership = "leave"
-
- prev_content = event.unsigned.get("prev_content", {})
- prev_membership = prev_content.get("membership", None)
- if prev_membership not in MEMBERSHIP_PRIORITY:
- prev_membership = "leave"
-
- new_priority = MEMBERSHIP_PRIORITY.index(membership)
- old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
- if old_priority < new_priority:
- membership = prev_membership
-
- # otherwise, get the user's membership at the time of the event.
- if membership is None:
- membership_event = state.get((EventTypes.Member, user_id), None)
- if membership_event:
- if membership_event.event_id not in event_id_forgotten:
- membership = membership_event.membership
-
- # if the user was a member of the room at the time of the event,
- # they can see it.
- if membership == Membership.JOIN:
- return True
-
- if visibility == "joined":
- # we weren't a member at the time of the event, so we can't
- # see this event.
- return False
-
- elif visibility == "invited":
- # user can also see the event if they were *invited* at the time
- # of the event.
- return membership == Membership.INVITE
-
- else:
- # visibility is shared: user can also see the event if they have
- # become a member since the event
- #
- # XXX: if the user has subsequently joined and then left again,
- # ideally we would share history up to the point they left. But
- # we don't know when they left.
- return not is_peeking
-
- defer.returnValue({
- user_id: [
- event
- for event in events
- if allowed(event, user_id, is_peeking)
- ]
- for user_id, is_peeking in user_tuples
- })
-
- @defer.inlineCallbacks
- def _filter_events_for_client(self, user_id, events, is_peeking=False):
- """
- Check which events a user is allowed to see
-
- Args:
- user_id(str): user id to be checked
- events([synapse.events.EventBase]): list of events to be checked
- is_peeking(bool): should be True if:
- * the user is not currently a member of the room, and:
- * the user has not been a member of the room since the given
- events
-
- Returns:
- [synapse.events.EventBase]
- """
- types = (
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, user_id),
- )
- event_id_to_state = yield self.store.get_state_for_events(
- frozenset(e.event_id for e in events),
- types=types
- )
- res = yield self.filter_events_for_clients(
- [(user_id, is_peeking)], events, event_id_to_state
- )
- defer.returnValue(res.get(user_id, []))
-
def ratelimit(self, requester):
time_now = self.clock.time()
allowed, time_allowed = self.ratelimiter.send_message(
@@ -232,56 +61,6 @@ class BaseHandler(object):
retry_after_ms=int(1000 * (time_allowed - time_now)),
)
- @defer.inlineCallbacks
- def _create_new_client_event(self, builder, prev_event_ids=None):
- if prev_event_ids:
- prev_events = yield self.store.add_event_hashes(prev_event_ids)
- prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
- depth = prev_max_depth + 1
- else:
- latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
- builder.room_id,
- )
-
- if latest_ret:
- depth = max([d for _, _, d in latest_ret]) + 1
- else:
- depth = 1
-
- prev_events = [
- (event_id, prev_hashes)
- for event_id, prev_hashes, _ in latest_ret
- ]
-
- builder.prev_events = prev_events
- builder.depth = depth
-
- state_handler = self.state_handler
-
- context = yield state_handler.compute_event_context(builder)
-
- if builder.is_state():
- builder.prev_state = yield self.store.add_event_hashes(
- context.prev_state_events
- )
-
- yield self.auth.add_auth_events(builder, context)
-
- add_hashes_and_signatures(
- builder, self.server_name, self.signing_key
- )
-
- event = builder.build()
-
- logger.debug(
- "Created event %s with current state: %s",
- event.event_id, context.current_state,
- )
-
- defer.returnValue(
- (event, context,)
- )
-
def is_host_in_room(self, current_state):
room_members = [
(state_key, event.membership)
@@ -296,154 +75,13 @@ class BaseHandler(object):
return True
for (state_key, membership) in room_members:
if (
- UserID.from_string(state_key).domain == self.hs.hostname
+ self.hs.is_mine_id(state_key)
and membership == Membership.JOIN
):
return True
return False
@defer.inlineCallbacks
- def handle_new_client_event(
- self,
- requester,
- event,
- context,
- ratelimit=True,
- extra_users=[]
- ):
- # We now need to go and hit out to wherever we need to hit out to.
-
- if ratelimit:
- self.ratelimit(requester)
-
- try:
- self.auth.check(event, auth_events=context.current_state)
- except AuthError as err:
- logger.warn("Denying new event %r because %s", event, err)
- raise err
-
- yield self.maybe_kick_guest_users(event, context.current_state.values())
-
- if event.type == EventTypes.CanonicalAlias:
- # Check the alias is acually valid (at this time at least)
- room_alias_str = event.content.get("alias", None)
- if room_alias_str:
- room_alias = RoomAlias.from_string(room_alias_str)
- directory_handler = self.hs.get_handlers().directory_handler
- mapping = yield directory_handler.get_association(room_alias)
-
- if mapping["room_id"] != event.room_id:
- raise SynapseError(
- 400,
- "Room alias %s does not point to the room" % (
- room_alias_str,
- )
- )
-
- federation_handler = self.hs.get_handlers().federation_handler
-
- if event.type == EventTypes.Member:
- if event.content["membership"] == Membership.INVITE:
- def is_inviter_member_event(e):
- return (
- e.type == EventTypes.Member and
- e.sender == event.sender
- )
-
- event.unsigned["invite_room_state"] = [
- {
- "type": e.type,
- "state_key": e.state_key,
- "content": e.content,
- "sender": e.sender,
- }
- for k, e in context.current_state.items()
- if e.type in self.hs.config.room_invite_state_types
- or is_inviter_member_event(e)
- ]
-
- invitee = UserID.from_string(event.state_key)
- if not self.hs.is_mine(invitee):
- # TODO: Can we add signature from remote server in a nicer
- # way? If we have been invited by a remote server, we need
- # to get them to sign the event.
-
- returned_invite = yield federation_handler.send_invite(
- invitee.domain,
- event,
- )
-
- event.unsigned.pop("room_state", None)
-
- # TODO: Make sure the signatures actually are correct.
- event.signatures.update(
- returned_invite.signatures
- )
-
- if event.type == EventTypes.Redaction:
- if self.auth.check_redaction(event, auth_events=context.current_state):
- original_event = yield self.store.get_event(
- event.redacts,
- check_redacted=False,
- get_prev_content=False,
- allow_rejected=False,
- allow_none=False
- )
- if event.user_id != original_event.user_id:
- raise AuthError(
- 403,
- "You don't have permission to redact events"
- )
-
- if event.type == EventTypes.Create and context.current_state:
- raise AuthError(
- 403,
- "Changing the room create event is forbidden",
- )
-
- action_generator = ActionGenerator(self.hs)
- yield action_generator.handle_push_actions_for_event(
- event, context, self
- )
-
- (event_stream_id, max_stream_id) = yield self.store.persist_event(
- event, context=context
- )
-
- # this intentionally does not yield: we don't care about the result
- # and don't need to wait for it.
- preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
- event_stream_id, max_stream_id
- )
-
- destinations = set()
- for k, s in context.current_state.items():
- try:
- if k[0] == EventTypes.Member:
- if s.content["membership"] == Membership.JOIN:
- destinations.add(
- UserID.from_string(s.state_key).domain
- )
- except SynapseError:
- logger.warn(
- "Failed to get destination from event %s", s.event_id
- )
-
- with PreserveLoggingContext():
- # Don't block waiting on waking up all the listeners.
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
-
- # If invite, remove room_state from unsigned before sending.
- event.unsigned.pop("invite_room_state", None)
-
- federation_handler.handle_new_event(
- event, destinations=destinations,
- )
-
- @defer.inlineCallbacks
def maybe_kick_guest_users(self, event, current_state):
# Technically this function invalidates current_state by changing it.
# Hopefully this isn't that important to the caller.
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 61fe56032a..68d0d78fc6 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -521,11 +521,11 @@ class AuthHandler(BaseHandler):
))
return m.serialize()
- def generate_short_term_login_token(self, user_id):
+ def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
macaroon = self._generate_base_macaroon(user_id)
macaroon.add_first_party_caveat("type = login")
now = self.hs.get_clock().time_msec()
- expiry = now + (2 * 60 * 1000)
+ expiry = now + duration_in_ms
macaroon.add_first_party_caveat("time < %d" % (expiry,))
return macaroon.serialize()
@@ -615,4 +615,7 @@ class AuthHandler(BaseHandler):
Returns:
Whether self.hash(password) == stored_hash (bool).
"""
- return bcrypt.hashpw(password, stored_hash) == stored_hash
+ if stored_hash:
+ return bcrypt.hashpw(password, stored_hash) == stored_hash
+ else:
+ return False
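Why the equality comparison works, and what the new guard protects: bcrypt embeds the salt in the stored hash, so hashpw(password, stored_hash) re-derives a comparable hash, while an empty or None stored_hash (e.g. a user row with no password set) previously blew up inside bcrypt and now fails closed. A brief sketch:

import bcrypt

stored = bcrypt.hashpw(b"s3cret", bcrypt.gensalt(12))
print(bcrypt.hashpw(b"s3cret", stored) == stored)  # True
print(bcrypt.hashpw(b"wrong", stored) == stored)   # False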
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d95e0b23b1..c21d9d4d83 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -33,7 +33,7 @@ from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
)
-from synapse.types import UserID
+from synapse.types import UserID, get_domain_from_id
from synapse.events.utils import prune_event
@@ -453,7 +453,7 @@ class FederationHandler(BaseHandler):
joined_domains = {}
for u, d in joined_users:
try:
- dom = UserID.from_string(u).domain
+ dom = get_domain_from_id(u)
old_d = joined_domains.get(dom)
if old_d:
joined_domains[dom] = min(d, old_d)
@@ -682,7 +682,8 @@ class FederationHandler(BaseHandler):
})
try:
- event, context = yield self._create_new_client_event(
+ message_handler = self.hs.get_handlers().message_handler
+ event, context = yield message_handler._create_new_client_event(
builder=builder,
)
except AuthError as e:
@@ -743,9 +744,7 @@ class FederationHandler(BaseHandler):
try:
if k[0] == EventTypes.Member:
if s.content["membership"] == Membership.JOIN:
- destinations.add(
- UserID.from_string(s.state_key).domain
- )
+ destinations.add(get_domain_from_id(s.state_key))
except:
logger.warn(
"Failed to get destination from event %s", s.event_id
@@ -915,7 +914,8 @@ class FederationHandler(BaseHandler):
"state_key": user_id,
})
- event, context = yield self._create_new_client_event(
+ message_handler = self.hs.get_handlers().message_handler
+ event, context = yield message_handler._create_new_client_event(
builder=builder,
)
@@ -970,9 +970,7 @@ class FederationHandler(BaseHandler):
try:
if k[0] == EventTypes.Member:
if s.content["membership"] == Membership.LEAVE:
- destinations.add(
- UserID.from_string(s.state_key).domain
- )
+ destinations.add(get_domain_from_id(s.state_key))
except:
logger.warn(
"Failed to get destination from event %s", s.event_id
@@ -1115,7 +1113,7 @@ class FederationHandler(BaseHandler):
if not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
- event, context, self
+ event, context
)
event_stream_id, max_stream_id = yield self.store.persist_event(
@@ -1692,7 +1690,10 @@ class FederationHandler(BaseHandler):
if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- event, context = yield self._create_new_client_event(builder=builder)
+ message_handler = self.hs.get_handlers().message_handler
+ event, context = yield message_handler._create_new_client_event(
+ builder=builder
+ )
event, context = yield self.add_display_name_to_third_party_invite(
event_dict, event, context
@@ -1720,7 +1721,8 @@ class FederationHandler(BaseHandler):
def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
builder = self.event_builder_factory.new(event_dict)
- event, context = yield self._create_new_client_event(
+ message_handler = self.hs.get_handlers().message_handler
+ event, context = yield message_handler._create_new_client_event(
builder=builder,
)
@@ -1759,7 +1761,8 @@ class FederationHandler(BaseHandler):
event_dict["content"]["third_party_invite"]["display_name"] = display_name
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- event, context = yield self._create_new_client_event(builder=builder)
+ message_handler = self.hs.get_handlers().message_handler
+ event, context = yield message_handler._create_new_client_event(builder=builder)
defer.returnValue((event, context))
@defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f51feda2f4..13154edb78 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,13 +17,19 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.streams.config import PaginationConfig
+from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
+from synapse.push.action_generator import ActionGenerator
+from synapse.streams.config import PaginationConfig
+from synapse.types import (
+ UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
+)
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.types import UserID, RoomStreamToken, StreamToken
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -123,7 +129,8 @@ class MessageHandler(BaseHandler):
"end": next_token.to_string(),
})
- events = yield self._filter_events_for_client(
+ events = yield filter_events_for_client(
+ self.store,
user_id,
events,
is_peeking=(member_event_id is None),
@@ -483,8 +490,8 @@ class MessageHandler(BaseHandler):
]
).addErrback(unwrapFirstError)
- messages = yield self._filter_events_for_client(
- user_id, messages
+ messages = yield filter_events_for_client(
+ self.store, user_id, messages
)
start_token = now_token.copy_and_replace("room_key", token[0])
@@ -619,8 +626,8 @@ class MessageHandler(BaseHandler):
end_token=stream_token
)
- messages = yield self._filter_events_for_client(
- user_id, messages, is_peeking=is_peeking
+ messages = yield filter_events_for_client(
+ self.store, user_id, messages, is_peeking=is_peeking
)
start_token = StreamToken.START.copy_and_replace("room_key", token[0])
@@ -700,8 +707,8 @@ class MessageHandler(BaseHandler):
consumeErrors=True,
).addErrback(unwrapFirstError)
- messages = yield self._filter_events_for_client(
- user_id, messages, is_peeking=is_peeking,
+ messages = yield filter_events_for_client(
+ self.store, user_id, messages, is_peeking=is_peeking,
)
start_token = now_token.copy_and_replace("room_key", token[0])
@@ -724,3 +731,193 @@ class MessageHandler(BaseHandler):
ret["membership"] = membership
defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def _create_new_client_event(self, builder, prev_event_ids=None):
+ if prev_event_ids:
+ prev_events = yield self.store.add_event_hashes(prev_event_ids)
+ prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
+ depth = prev_max_depth + 1
+ else:
+ latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
+ builder.room_id,
+ )
+
+ if latest_ret:
+ depth = max([d for _, _, d in latest_ret]) + 1
+ else:
+ depth = 1
+
+ prev_events = [
+ (event_id, prev_hashes)
+ for event_id, prev_hashes, _ in latest_ret
+ ]
+
+ builder.prev_events = prev_events
+ builder.depth = depth
+
+ state_handler = self.state_handler
+
+ context = yield state_handler.compute_event_context(builder)
+
+ if builder.is_state():
+ builder.prev_state = yield self.store.add_event_hashes(
+ context.prev_state_events
+ )
+
+ yield self.auth.add_auth_events(builder, context)
+
+ signing_key = self.hs.config.signing_key[0]
+ add_hashes_and_signatures(
+ builder, self.server_name, signing_key
+ )
+
+ event = builder.build()
+
+ logger.debug(
+ "Created event %s with current state: %s",
+ event.event_id, context.current_state,
+ )
+
+ defer.returnValue(
+ (event, context,)
+ )
+
+ @defer.inlineCallbacks
+ def handle_new_client_event(
+ self,
+ requester,
+ event,
+ context,
+ ratelimit=True,
+ extra_users=[]
+ ):
+ # We now need to go and hit out to wherever we need to hit out to.
+
+ if ratelimit:
+ self.ratelimit(requester)
+
+ try:
+ self.auth.check(event, auth_events=context.current_state)
+ except AuthError as err:
+ logger.warn("Denying new event %r because %s", event, err)
+ raise err
+
+ yield self.maybe_kick_guest_users(event, context.current_state.values())
+
+ if event.type == EventTypes.CanonicalAlias:
+ # Check the alias is actually valid (at this time at least)
+ room_alias_str = event.content.get("alias", None)
+ if room_alias_str:
+ room_alias = RoomAlias.from_string(room_alias_str)
+ directory_handler = self.hs.get_handlers().directory_handler
+ mapping = yield directory_handler.get_association(room_alias)
+
+ if mapping["room_id"] != event.room_id:
+ raise SynapseError(
+ 400,
+ "Room alias %s does not point to the room" % (
+ room_alias_str,
+ )
+ )
+
+ federation_handler = self.hs.get_handlers().federation_handler
+
+ if event.type == EventTypes.Member:
+ if event.content["membership"] == Membership.INVITE:
+ def is_inviter_member_event(e):
+ return (
+ e.type == EventTypes.Member and
+ e.sender == event.sender
+ )
+
+ event.unsigned["invite_room_state"] = [
+ {
+ "type": e.type,
+ "state_key": e.state_key,
+ "content": e.content,
+ "sender": e.sender,
+ }
+ for k, e in context.current_state.items()
+ if e.type in self.hs.config.room_invite_state_types
+ or is_inviter_member_event(e)
+ ]
+
+ invitee = UserID.from_string(event.state_key)
+ if not self.hs.is_mine(invitee):
+ # TODO: Can we add signature from remote server in a nicer
+ # way? If we have been invited by a remote server, we need
+ # to get them to sign the event.
+
+ returned_invite = yield federation_handler.send_invite(
+ invitee.domain,
+ event,
+ )
+
+ event.unsigned.pop("room_state", None)
+
+ # TODO: Make sure the signatures actually are correct.
+ event.signatures.update(
+ returned_invite.signatures
+ )
+
+ if event.type == EventTypes.Redaction:
+ if self.auth.check_redaction(event, auth_events=context.current_state):
+ original_event = yield self.store.get_event(
+ event.redacts,
+ check_redacted=False,
+ get_prev_content=False,
+ allow_rejected=False,
+ allow_none=False
+ )
+ if event.user_id != original_event.user_id:
+ raise AuthError(
+ 403,
+ "You don't have permission to redact events"
+ )
+
+ if event.type == EventTypes.Create and context.current_state:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
+
+ action_generator = ActionGenerator(self.hs)
+ yield action_generator.handle_push_actions_for_event(
+ event, context
+ )
+
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
+ )
+
+ # this intentionally does not yield: we don't care about the result
+ # and don't need to wait for it.
+ preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+ event_stream_id, max_stream_id
+ )
+
+ destinations = set()
+ for k, s in context.current_state.items():
+ try:
+ if k[0] == EventTypes.Member:
+ if s.content["membership"] == Membership.JOIN:
+ destinations.add(get_domain_from_id(s.state_key))
+ except SynapseError:
+ logger.warn(
+ "Failed to get destination from event %s", s.event_id
+ )
+
+ with PreserveLoggingContext():
+ # Don't block waiting on waking up all the listeners.
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
+
+ # If invite, remove room_state from unsigned before sending.
+ event.unsigned.pop("invite_room_state", None)
+
+ federation_handler.handle_new_event(
+ event, destinations=destinations,
+ )
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d0c8f1328b..a8529cce42 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,7 +33,7 @@ from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID
+from synapse.types import UserID, get_domain_from_id
import synapse.metrics
from ._base import BaseHandler
@@ -168,7 +168,7 @@ class PresenceHandler(BaseHandler):
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
self.clock.call_later(
- 0 * 1000,
+ 30 * 1000,
self.clock.looping_call,
self._handle_timeouts,
5000,
@@ -440,7 +440,7 @@ class PresenceHandler(BaseHandler):
if not local_states:
continue
- host = UserID.from_string(user_id).domain
+ host = get_domain_from_id(user_id)
hosts_to_states.setdefault(host, []).extend(local_states)
# TODO: de-dup hosts_to_states, as a single host might have multiple
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index b0862067e1..5883b9111e 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -358,6 +358,59 @@ class RegistrationHandler(BaseHandler):
)
defer.returnValue(data)
+ @defer.inlineCallbacks
+ def get_or_create_user(self, localpart, displayname, duration_seconds):
+ """Creates a new user or returns an access token for an existing one
+
+ Args:
+ localpart : The local part of the user ID to register. Must not
+ be None; a 400 error is raised if it is missing.
+ displayname : The displayname to give the user, if any.
+ duration_seconds : Validity period, in seconds, of the access
+ token issued for the user.
+ Returns:
+ A tuple of (user_id, access_token).
+ Raises:
+ RegistrationError if there was a problem registering.
+ """
+ yield run_on_reactor()
+
+ if localpart is None:
+ raise SynapseError(400, "Request must include user id")
+
+ need_register = True
+
+ try:
+ yield self.check_username(localpart)
+ except SynapseError as e:
+ if e.errcode == Codes.USER_IN_USE:
+ need_register = False
+ else:
+ raise
+
+ user = UserID(localpart, self.hs.hostname)
+ user_id = user.to_string()
+ auth_handler = self.hs.get_handlers().auth_handler
+ # generate_short_term_login_token expects a duration in milliseconds
+ token = auth_handler.generate_short_term_login_token(
+ user_id, duration_seconds * 1000
+ )
+
+ if need_register:
+ yield self.store.register(
+ user_id=user_id,
+ token=token,
+ password_hash=None
+ )
+
+ yield registered_user(self.distributor, user)
+ else:
+ yield self.store.flush_user(user_id=user_id)
+ yield self.store.add_access_token_to_user(user_id=user_id, token=token)
+
+ if displayname is not None:
+ logger.info("setting user display name: %s -> %s", user_id, displayname)
+ profile_handler = self.hs.get_handlers().profile_handler
+ yield profile_handler.set_displayname(
+ user, user, displayname
+ )
+
+ defer.returnValue((user_id, token))
+
def auth_handler(self):
return self.hs.get_handlers().auth_handler
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index dd9c18df84..3d63b3c513 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -26,6 +26,7 @@ from synapse.api.errors import AuthError, StoreError, SynapseError
from synapse.util import stringutils
from synapse.util.async import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
+from synapse.visibility import filter_events_for_client
from collections import OrderedDict
@@ -449,10 +450,12 @@ class RoomContextHandler(BaseHandler):
now_token = yield self.hs.get_event_sources().get_current_token()
def filter_evts(events):
- return self._filter_events_for_client(
+ return filter_events_for_client(
+ self.store,
user.to_string(),
events,
- is_peeking=is_guest)
+ is_peeking=is_guest
+ )
event = yield self.store.get_event(event_id, get_prev_content=True,
allow_none=True)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed2cda837f..b44e52a515 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -113,7 +113,7 @@ class RoomMemberHandler(BaseHandler):
prev_event_ids=prev_event_ids,
)
- yield self.handle_new_client_event(
+ yield msg_handler.handle_new_client_event(
requester,
event,
context,
@@ -357,7 +357,7 @@ class RoomMemberHandler(BaseHandler):
# so don't really fit into the general auth process.
raise AuthError(403, "Guest access not allowed")
- yield self.handle_new_client_event(
+ yield message_handler.handle_new_client_event(
requester,
event,
context,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9937d8dd7f..df75d70fac 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.api.filtering import Filter
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
+from synapse.visibility import filter_events_for_client
from unpaddedbase64 import decode_base64, encode_base64
@@ -172,8 +173,8 @@ class SearchHandler(BaseHandler):
filtered_events = search_filter.filter([r["event"] for r in results])
- events = yield self._filter_events_for_client(
- user.to_string(), filtered_events
+ events = yield filter_events_for_client(
+ self.store, user.to_string(), filtered_events
)
events.sort(key=lambda e: -rank_map[e.event_id])
@@ -223,8 +224,8 @@ class SearchHandler(BaseHandler):
r["event"] for r in results
])
- events = yield self._filter_events_for_client(
- user.to_string(), filtered_events
+ events = yield filter_events_for_client(
+ self.store, user.to_string(), filtered_events
)
room_events.extend(events)
@@ -281,12 +282,12 @@ class SearchHandler(BaseHandler):
event.room_id, event.event_id, before_limit, after_limit
)
- res["events_before"] = yield self._filter_events_for_client(
- user.to_string(), res["events_before"]
+ res["events_before"] = yield filter_events_for_client(
+ self.store, user.to_string(), res["events_before"]
)
- res["events_after"] = yield self._filter_events_for_client(
- user.to_string(), res["events_after"]
+ res["events_after"] = yield filter_events_for_client(
+ self.store, user.to_string(), res["events_after"]
)
res["start"] = now_token.copy_and_replace(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 231140b655..921215469f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -22,6 +22,7 @@ from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
+from synapse.visibility import filter_events_for_client
from twisted.internet import defer
@@ -247,6 +248,10 @@ class SyncHandler(BaseHandler):
sync_config.user.to_string()
)
+ ignored_users = account_data.get(
+ "m.ignored_user_list", {}
+ ).get("ignored_users", {}).keys()
+
joined = []
invited = []
archived = []
@@ -267,6 +272,8 @@ class SyncHandler(BaseHandler):
)
joined.append(room_result)
elif event.membership == Membership.INVITE:
+ if event.sender in ignored_users:
+ return
invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult(
room_id=event.room_id,
@@ -515,6 +522,15 @@ class SyncHandler(BaseHandler):
sync_config.user
)
+ ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
+ "m.ignored_user_list", user_id=user_id,
+ )
+
+ if ignored_account_data:
+ ignored_users = ignored_account_data.get("ignored_users", {}).keys()
+ else:
+ ignored_users = frozenset()
+
# Get a list of membership change events that have happened.
rooms_changed = yield self.store.get_membership_changes_for_user(
user_id, since_token.room_key, now_token.room_key
@@ -549,9 +565,10 @@ class SyncHandler(BaseHandler):
# Only bother if we're still currently invited
should_invite = non_joins[-1].membership == Membership.INVITE
if should_invite:
- room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
- if room_sync:
- invited.append(room_sync)
+ if non_joins[-1].sender not in ignored_users:
+ room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+ if room_sync:
+ invited.append(room_sync)
# Always include leave/ban events. Just take the last one.
# TODO: How do we handle ban -> leave in same batch?
@@ -681,7 +698,8 @@ class SyncHandler(BaseHandler):
if recents is not None:
recents = sync_config.filter_collection.filter_room_timeline(recents)
- recents = yield self._filter_events_for_client(
+ recents = yield filter_events_for_client(
+ self.store,
sync_config.user.to_string(),
recents,
)
@@ -702,7 +720,8 @@ class SyncHandler(BaseHandler):
loaded_recents = sync_config.filter_collection.filter_room_timeline(
events
)
- loaded_recents = yield self._filter_events_for_client(
+ loaded_recents = yield filter_events_for_client(
+ self.store,
sync_config.user.to_string(),
loaded_recents,
)
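The shape of the account data consulted in both hunks, per the m.ignored_user_list event type: ignored user IDs are the keys of "ignored_users", with empty objects as values (reserved for future use).

content = {
    "ignored_users": {
        "@spammer:example.org": {},
    }
}
ignored_users = content.get("ignored_users", {}).keys()
print("@spammer:example.org" in ignored_users)  # True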
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6af7a8f424..33b79c0ec7 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -21,6 +21,7 @@ from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import StreamToken
+from synapse.visibility import filter_events_for_client
import synapse.metrics
from collections import namedtuple
@@ -398,8 +399,8 @@ class Notifier(object):
)
if name == "room":
- room_member_handler = self.hs.get_handlers().room_member_handler
- new_events = yield room_member_handler._filter_events_for_client(
+ new_events = yield filter_events_for_client(
+ self.store,
user.to_string(),
new_events,
is_peeking=is_peeking,
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index a0160994b7..9b208668b6 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -37,14 +37,14 @@ class ActionGenerator:
# tag (ie. we just need all the users).
@defer.inlineCallbacks
- def handle_push_actions_for_event(self, event, context, handler):
+ def handle_push_actions_for_event(self, event, context):
with Measure(self.clock, "handle_push_actions_for_event"):
bulk_evaluator = yield evaluator_for_event(
event, self.hs, self.store
)
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
- event, handler, context.current_state
+ event, context.current_state
)
context.push_actions = [
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index f97df36d80..25e13b3423 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -22,6 +22,7 @@ from .baserules import list_with_base_rules
from .push_rule_evaluator import PushRuleEvaluatorForEvent
from synapse.api.constants import EventTypes
+from synapse.visibility import filter_events_for_clients
logger = logging.getLogger(__name__)
@@ -126,7 +127,7 @@ class BulkPushRuleEvaluator:
self.store = store
@defer.inlineCallbacks
- def action_for_event_by_user(self, event, handler, current_state):
+ def action_for_event_by_user(self, event, current_state):
actions_by_user = {}
# None of these users can be peeking since this list of users comes
@@ -136,8 +137,8 @@ class BulkPushRuleEvaluator:
(u, False) for u in self.rules_by_user.keys()
]
- filtered_by_user = yield handler.filter_events_for_clients(
- user_tuples, [event], {event.event_id: current_state}
+ filtered_by_user = yield filter_events_for_clients(
+ self.store, user_tuples, [event], {event.event_id: current_state}
)
room_members = yield self.store.get_users_in_room(self.room_id)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
new file mode 100644
index 0000000000..3a13c7485a
--- /dev/null
+++ b/synapse/push/emailpusher.py
@@ -0,0 +1,251 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer, reactor
+
+import logging
+
+from synapse.util.metrics import Measure
+from synapse.util.logcontext import LoggingContext
+
+from mailer import Mailer
+
+logger = logging.getLogger(__name__)
+
+# The amount of time we always wait before ever emailing about a notification
+# (to give the user a chance to respond to other push or notice the window)
+DELAY_BEFORE_MAIL_MS = 2 * 60 * 1000
+
+THROTTLE_START_MS = 2 * 60 * 1000
+THROTTLE_MAX_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days
+THROTTLE_MULTIPLIER = 2
+
+# If no event triggers a notification for this long after the previous,
+# the throttle is released.
+THROTTLE_RESET_AFTER_MS = (2 * 60 * 1000) * (2 ** 11) # ~3 days
+
+
+class EmailPusher(object):
+ """
+ A pusher that sends email notifications about events (approximately)
+ when they happen.
+ This shares quite a bit of code with httpusher: it would be good to
+ factor out the common parts
+ """
+ def __init__(self, hs, pusherdict):
+ self.hs = hs
+ self.store = self.hs.get_datastore()
+ self.clock = self.hs.get_clock()
+ self.pusher_id = pusherdict['id']
+ self.user_id = pusherdict['user_name']
+ self.app_id = pusherdict['app_id']
+ self.email = pusherdict['pushkey']
+ self.last_stream_ordering = pusherdict['last_stream_ordering']
+ self.timed_call = None
+ self.throttle_params = None
+
+ # See httppusher
+ self.max_stream_ordering = None
+
+ self.processing = False
+
+ if self.hs.config.email_enable_notifs:
+ self.mailer = Mailer(self.hs)
+ else:
+ self.mailer = None
+
+ @defer.inlineCallbacks
+ def on_started(self):
+ if self.mailer is not None:
+ self.throttle_params = yield self.store.get_throttle_params_by_room(
+ self.pusher_id
+ )
+ yield self._process()
+
+ def on_stop(self):
+ if self.timed_call:
+ self.timed_call.cancel()
+
+ @defer.inlineCallbacks
+ def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+ self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+ yield self._process()
+
+ def on_new_receipts(self, min_stream_id, max_stream_id):
+ # We could wake up and cancel the timer but there tend to be quite a
+ # lot of read receipts so it's probably less work to just let the
+ # timer fire
+ return defer.succeed(None)
+
+ @defer.inlineCallbacks
+ def on_timer(self):
+ self.timed_call = None
+ yield self._process()
+
+ @defer.inlineCallbacks
+ def _process(self):
+ if self.processing:
+ return
+
+ with LoggingContext("emailpush._process"):
+ with Measure(self.clock, "emailpush._process"):
+ try:
+ self.processing = True
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
+ try:
+ yield self._unsafe_process()
+ except:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self.processing = False
+
+ @defer.inlineCallbacks
+ def _unsafe_process(self):
+ """
+ Main logic of the push loop without the wrapper function that sets
+ up logging, measures and guards against multiple instances of it
+ being run.
+ """
+ unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
+ self.user_id, self.last_stream_ordering, self.max_stream_ordering
+ )
+
+ soonest_due_at = None
+
+ for push_action in unprocessed:
+ received_at = push_action['received_ts']
+ if received_at is None:
+ received_at = 0
+ notif_ready_at = received_at + DELAY_BEFORE_MAIL_MS
+
+ room_ready_at = self.room_ready_to_notify_at(
+ push_action['room_id']
+ )
+
+ should_notify_at = max(notif_ready_at, room_ready_at)
+
+ if should_notify_at < self.clock.time_msec():
+ # one of our notifications is ready for sending, so we send
+ # *one* email updating the user on their notifications,
+ # we then consider all previously outstanding notifications
+ # to be delivered.
+ yield self.send_notification(unprocessed)
+
+ yield self.save_last_stream_ordering_and_success(max([
+ ea['stream_ordering'] for ea in unprocessed
+ ]))
+ yield self.sent_notif_update_throttle(
+ push_action['room_id'], push_action
+ )
+ break
+ else:
+ if soonest_due_at is None or should_notify_at < soonest_due_at:
+ soonest_due_at = should_notify_at
+
+ if self.timed_call is not None:
+ self.timed_call.cancel()
+ self.timed_call = None
+
+ if soonest_due_at is not None:
+ self.timed_call = reactor.callLater(
+ self.seconds_until(soonest_due_at), self.on_timer
+ )
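+ # For illustration: a notif received at time T becomes ready at
+ # T + DELAY_BEFORE_MAIL_MS, though its room's throttle may push the
+ # effective deadline later still; we email as soon as any deadline has
+ # passed, otherwise schedule on_timer for the soonest one.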
+
+ @defer.inlineCallbacks
+ def save_last_stream_ordering_and_success(self, last_stream_ordering):
+ self.last_stream_ordering = last_stream_ordering
+ yield self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id, self.email, self.user_id,
+ last_stream_ordering, self.clock.time_msec()
+ )
+
+ def seconds_until(self, ts_msec):
+ return (ts_msec - self.clock.time_msec()) / 1000
+
+ def get_room_throttle_ms(self, room_id):
+ if room_id in self.throttle_params:
+ return self.throttle_params[room_id]["throttle_ms"]
+ else:
+ return 0
+
+ def get_room_last_sent_ts(self, room_id):
+ if room_id in self.throttle_params:
+ return self.throttle_params[room_id]["last_sent_ts"]
+ else:
+ return 0
+
+ def room_ready_to_notify_at(self, room_id):
+ """
+ Determines the earliest time at which room throttling will allow us to
+ send an email for the given room
+ Returns: the timestamp in ms at which we may next send a notification
+ """
+ last_sent_ts = self.get_room_last_sent_ts(room_id)
+ throttle_ms = self.get_room_throttle_ms(room_id)
+
+ may_send_at = last_sent_ts + throttle_ms
+ return may_send_at
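+ # For illustration: with last_sent_ts at noon and a throttle_ms of ten
+ # minutes, may_send_at is 12:10, so the caller holds off (or schedules
+ # a timer) until then.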
+
+ @defer.inlineCallbacks
+ def sent_notif_update_throttle(self, room_id, notified_push_action):
+ # We have sent a notification, so update the throttle accordingly.
+ # If the event that triggered the notif happened more than
+ # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
+ # notif, we release the throttle. Otherwise, the throttle is increased.
+ time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
+ notified_push_action['stream_ordering']
+ )
+
+ time_of_this_notifs = notified_push_action['received_ts']
+
+ if time_of_previous_notifs is not None and time_of_this_notifs is not None:
+ gap = time_of_this_notifs - time_of_previous_notifs
+ else:
+ # if we don't know the arrival time of one of the notifs (it was not
+ # stored before the email notification code existed) then assume a
+ # gap of zero, which will simply not reset the throttle
+ gap = 0
+
+ current_throttle_ms = self.get_room_throttle_ms(room_id)
+
+ if gap > THROTTLE_RESET_AFTER_MS:
+ new_throttle_ms = THROTTLE_START_MS
+ else:
+ if current_throttle_ms == 0:
+ new_throttle_ms = THROTTLE_START_MS
+ else:
+ new_throttle_ms = min(
+ current_throttle_ms * THROTTLE_MULTIPLIER,
+ THROTTLE_MAX_MS
+ )
+ self.throttle_params[room_id] = {
+ "last_sent_ts": self.clock.time_msec(),
+ "throttle_ms": new_throttle_ms
+ }
+ yield self.store.set_throttle_params(
+ self.pusher_id, room_id, self.throttle_params[room_id]
+ )
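+ # For illustration: notifs arriving an hour apart keep doubling the
+ # room's throttle (capped at THROTTLE_MAX_MS), while a gap longer than
+ # THROTTLE_RESET_AFTER_MS drops it back to THROTTLE_START_MS.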
+
+ @defer.inlineCallbacks
+ def send_notification(self, push_actions):
+ logger.info("Sending notif email for user %r", self.user_id)
+ yield self.mailer.send_notification_mail(
+ self.user_id, self.email, push_actions
+ )
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
new file mode 100644
index 0000000000..2fd38a036a
--- /dev/null
+++ b/synapse/push/mailer.py
@@ -0,0 +1,456 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from twisted.mail.smtp import sendmail
+
+import email.utils
+import email.mime.multipart
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+
+from synapse.util.async import concurrently_execute
+from synapse.util.presentable_names import (
+ calculate_room_name, name_from_member_event, descriptor_from_member_events
+)
+from synapse.types import UserID
+from synapse.api.errors import StoreError
+from synapse.api.constants import EventTypes
+from synapse.visibility import filter_events_for_client
+
+import jinja2
+import bleach
+
+import time
+import urllib
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+MESSAGE_FROM_PERSON_IN_ROOM = "You have a message on %(app)s from %(person)s " \
+ "in the %s room..."
+MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
+MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
+MESSAGES_IN_ROOM = "There are some messages on %(app)s for you in the %(room)s room..."
+MESSAGES_IN_ROOMS = "Here are some messages on %(app)s you may have missed..."
+INVITE_FROM_PERSON_TO_ROOM = "%(person)s has invited you to join the " \
+ "%(room)s room on %(app)s..."
+INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."
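+# For illustration, these templates are filled in with a dict of named
+# values, e.g.:
+#   MESSAGE_FROM_PERSON % {"app": "Matrix", "person": "Alice"}
+#   => "You have a message on Matrix from Alice..."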
+
+CONTEXT_BEFORE = 1
+CONTEXT_AFTER = 1
+
+# From https://github.com/matrix-org/matrix-react-sdk/blob/master/src/HtmlUtils.js
+ALLOWED_TAGS = [
+ 'font', # custom to matrix for IRC-style font coloring
+ 'del', # for markdown
+ # deliberately no h1/h2 to stop people shouting.
+ 'h3', 'h4', 'h5', 'h6', 'blockquote', 'p', 'a', 'ul', 'ol',
+ 'nl', 'li', 'b', 'i', 'u', 'strong', 'em', 'strike', 'code', 'hr', 'br', 'div',
+ 'table', 'thead', 'caption', 'tbody', 'tr', 'th', 'td', 'pre'
+]
+ALLOWED_ATTRS = {
+ # custom ones first:
+ "font": ["color"], # custom to matrix
+ "a": ["href", "name", "target"], # remote target: custom to matrix
+ # We don't currently allow img itself by default, but this
+ # would make sense if we did
+ "img": ["src"],
+}
+# When bleach releases a version with this option, we can specify the allowed
+# URL schemes:
+# ALLOWED_SCHEMES = ["http", "https", "ftp", "mailto"]
+
+
+class Mailer(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = self.hs.get_datastore()
+ self.state_handler = self.hs.get_state_handler()
+ loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
+ self.app_name = self.hs.config.email_app_name
+ env = jinja2.Environment(loader=loader)
+ env.filters["format_ts"] = format_ts_filter
+ env.filters["mxc_to_http"] = self.mxc_to_http_filter
+ self.notif_template_html = env.get_template(
+ self.hs.config.email_notif_template_html
+ )
+ self.notif_template_text = env.get_template(
+ self.hs.config.email_notif_template_text
+ )
+
+ @defer.inlineCallbacks
+ def send_notification_mail(self, user_id, email_address, push_actions):
+ raw_from = email.utils.parseaddr(self.hs.config.email_notif_from)[1]
+ raw_to = email.utils.parseaddr(email_address)[1]
+
+ if raw_to == '':
+ raise RuntimeError("Invalid 'to' address")
+
+ rooms_in_order = deduped_ordered_list(
+ [pa['room_id'] for pa in push_actions]
+ )
+
+ notif_events = yield self.store.get_events(
+ [pa['event_id'] for pa in push_actions]
+ )
+
+ notifs_by_room = {}
+ for pa in push_actions:
+ notifs_by_room.setdefault(pa["room_id"], []).append(pa)
+
+ # collect the current state for all the rooms in which we have
+ # notifications
+ state_by_room = {}
+
+ try:
+ user_display_name = yield self.store.get_profile_displayname(
+ UserID.from_string(user_id).localpart
+ )
+ except StoreError:
+ user_display_name = user_id
+
+ @defer.inlineCallbacks
+ def _fetch_room_state(room_id):
+ room_state = yield self.state_handler.get_current_state(room_id)
+ state_by_room[room_id] = room_state
+
+ # Run at most 3 of these at once: sync does 10 at a time but email
+ # notifs are much less realtime than sync, so we can afford to wait a bit.
+ yield concurrently_execute(_fetch_room_state, rooms_in_order, 3)
+
+ rooms = []
+
+ for r in rooms_in_order:
+ roomvars = yield self.get_room_vars(
+ r, user_id, notifs_by_room[r], notif_events, state_by_room[r]
+ )
+ rooms.append(roomvars)
+
+ summary_text = self.make_summary_text(
+ notifs_by_room, state_by_room, notif_events, user_id
+ )
+
+ template_vars = {
+ "user_display_name": user_display_name,
+ "unsubscribe_link": self.make_unsubscribe_link(),
+ "summary_text": summary_text,
+ "app_name": self.app_name,
+ "rooms": rooms,
+ }
+
+ html_text = self.notif_template_html.render(**template_vars)
+ html_part = MIMEText(html_text, "html", "utf8")
+
+ plain_text = self.notif_template_text.render(**template_vars)
+ text_part = MIMEText(plain_text, "plain", "utf8")
+
+ multipart_msg = MIMEMultipart('alternative')
+ multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text)
+ multipart_msg['From'] = self.hs.config.email_notif_from
+ multipart_msg['To'] = email_address
+ multipart_msg['Date'] = email.utils.formatdate()
+ multipart_msg['Message-ID'] = email.utils.make_msgid()
+ multipart_msg.attach(text_part)
+ multipart_msg.attach(html_part)
+
+ logger.info("Sending email push notification to %s" % email_address)
+ # logger.debug(html_text)
+
+ yield sendmail(
+ self.hs.config.email_smtp_host,
+ raw_from, raw_to, multipart_msg.as_string(),
+ port=self.hs.config.email_smtp_port
+ )
+
+ @defer.inlineCallbacks
+ def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state):
+ my_member_event = room_state[("m.room.member", user_id)]
+ is_invite = my_member_event.content["membership"] == "invite"
+
+ room_vars = {
+ "title": calculate_room_name(room_state, user_id),
+ "hash": string_ordinal_total(room_id), # See sender avatar hash
+ "notifs": [],
+ "invite": is_invite,
+ "link": self.make_room_link(room_id),
+ }
+
+ if not is_invite:
+ for n in notifs:
+ notifvars = yield self.get_notif_vars(
+ n, user_id, notif_events[n['event_id']], room_state
+ )
+
+ # merge overlapping notifs together.
+ # relies on the notifs being in chronological order.
+ merge = False
+ if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]:
+ prev_messages = room_vars['notifs'][-1]['messages']
+ for message in notifvars['messages']:
+ pm = filter(lambda pm: pm['id'] == message['id'], prev_messages)
+ if pm:
+ if not message["is_historical"]:
+ pm[0]["is_historical"] = False
+ merge = True
+ elif merge:
+ # we're merging, so append any remaining messages
+ # in this notif to the previous one
+ prev_messages.append(message)
+
+ if not merge:
+ room_vars['notifs'].append(notifvars)
+
+ defer.returnValue(room_vars)
+
+ @defer.inlineCallbacks
+ def get_notif_vars(self, notif, user_id, notif_event, room_state):
+ results = yield self.store.get_events_around(
+ notif['room_id'], notif['event_id'],
+ before_limit=CONTEXT_BEFORE, after_limit=CONTEXT_AFTER
+ )
+
+ ret = {
+ "link": self.make_notif_link(notif),
+ "ts": notif['received_ts'],
+ "messages": [],
+ }
+
+ the_events = yield filter_events_for_client(
+ self.store, user_id, results["events_before"]
+ )
+ the_events.append(notif_event)
+
+ for event in the_events:
+ messagevars = self.get_message_vars(notif, event, room_state)
+ if messagevars is not None:
+ ret['messages'].append(messagevars)
+
+ defer.returnValue(ret)
+
+ def get_message_vars(self, notif, event, room_state):
+ if event.type != EventTypes.Message:
+ return None
+
+ sender_state_event = room_state[("m.room.member", event.sender)]
+ sender_name = name_from_member_event(sender_state_event)
+ sender_avatar_url = sender_state_event.content["avatar_url"]
+
+ # 'hash' for deterministically picking default images: use
+ # sender_hash % the number of default images to choose from
+ sender_hash = string_ordinal_total(event.sender)
+
+ ret = {
+ "msgtype": event.content["msgtype"],
+ "is_historical": event.event_id != notif['event_id'],
+ "id": event.event_id,
+ "ts": event.origin_server_ts,
+ "sender_name": sender_name,
+ "sender_avatar_url": sender_avatar_url,
+ "sender_hash": sender_hash,
+ }
+
+ if event.content["msgtype"] == "m.text":
+ self.add_text_message_vars(ret, event)
+ elif event.content["msgtype"] == "m.image":
+ self.add_image_message_vars(ret, event)
+
+ if "body" in event.content:
+ ret["body_text_plain"] = event.content["body"]
+
+ return ret
+
+ def add_text_message_vars(self, messagevars, event):
+ if "format" in event.content:
+ msgformat = event.content["format"]
+ else:
+ msgformat = None
+ messagevars["format"] = msgformat
+
+ if msgformat == "org.matrix.custom.html":
+ messagevars["body_text_html"] = safe_markup(event.content["formatted_body"])
+ else:
+ messagevars["body_text_html"] = safe_text(event.content["body"])
+
+ return messagevars
+
+ def add_image_message_vars(self, messagevars, event):
+ messagevars["image_url"] = event.content["url"]
+
+ return messagevars
+
+ def make_summary_text(self, notifs_by_room, state_by_room, notif_events, user_id):
+ if len(notifs_by_room) == 1:
+ # Only one room has new stuff
+ room_id = notifs_by_room.keys()[0]
+
+ # If the room has some kind of name, use it, but we don't
+ # want the generated-from-names one here otherwise we'll
+ # end up with, "new message from Bob in the Bob room"
+ room_name = calculate_room_name(
+ state_by_room[room_id], user_id, fallback_to_members=False
+ )
+
+ my_member_event = state_by_room[room_id][("m.room.member", user_id)]
+ if my_member_event.content["membership"] == "invite":
+ inviter_member_event = state_by_room[room_id][
+ ("m.room.member", my_member_event.sender)
+ ]
+ inviter_name = name_from_member_event(inviter_member_event)
+
+ if room_name is None:
+ return INVITE_FROM_PERSON % {
+ "person": inviter_name,
+ "app": self.app_name
+ }
+ else:
+ return INVITE_FROM_PERSON_TO_ROOM % {
+ "person": inviter_name,
+ "room": room_name,
+ "app": self.app_name,
+ }
+
+ sender_name = None
+ if len(notifs_by_room[room_id]) == 1:
+ # There is just the one notification, so give some detail
+ event = notif_events[notifs_by_room[room_id][0]["event_id"]]
+ if ("m.room.member", event.sender) in state_by_room[room_id]:
+ state_event = state_by_room[room_id][("m.room.member", event.sender)]
+ sender_name = name_from_member_event(state_event)
+
+ if sender_name is not None and room_name is not None:
+ return MESSAGE_FROM_PERSON_IN_ROOM % {
+ "person": sender_name,
+ "room": room_name,
+ "app": self.app_name,
+ }
+ elif sender_name is not None:
+ return MESSAGE_FROM_PERSON % {
+ "person": sender_name,
+ "app": self.app_name,
+ }
+ else:
+ # There's more than one notification for this room, so just
+ # say there are several
+ if room_name is not None:
+ return MESSAGES_IN_ROOM % {
+ "room": room_name,
+ "app": self.app_name,
+ }
+ else:
+ # If the room doesn't have a name, say who the messages
+ # are from explicitly to avoid, "messages in the Bob room"
+ sender_ids = list(set([
+ notif_events[n['event_id']].sender
+ for n in notifs_by_room[room_id]
+ ]))
+
+ return MESSAGES_FROM_PERSON % {
+ "person": descriptor_from_member_events([
+ state_by_room[room_id][("m.room.member", s)]
+ for s in sender_ids
+ ]),
+ "app": self.app_name,
+ }
+ else:
+ # Stuff's happened in multiple different rooms
+ return MESSAGES_IN_ROOMS % {
+ "app": self.app_name,
+ }
+
+ def make_room_link(self, room_id):
+ # need /beta for Universal Links to work on iOS
+ if self.app_name == "Vector":
+ return "https://vector.im/beta/#/room/%s" % (room_id,)
+ else:
+ return "https://matrix.to/#/%s" % (room_id,)
+
+ def make_notif_link(self, notif):
+ # need /beta for Universal Links to work on iOS
+ if self.app_name == "Vector":
+ return "https://vector.im/beta/#/room/%s/%s" % (
+ notif['room_id'], notif['event_id']
+ )
+ else:
+ return "https://matrix.to/#/%s/%s" % (
+ notif['room_id'], notif['event_id']
+ )
+
+ def make_unsubscribe_link(self):
+ # XXX: matrix.to
+ return "https://vector.im/#/settings"
+
+ def mxc_to_http_filter(self, value, width, height, resize_method="crop"):
+ if value[0:6] != "mxc://":
+ return ""
+
+ serverAndMediaId = value[6:]
+ fragment = None
+ if '#' in serverAndMediaId:
+ (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1)
+ fragment = "#" + fragment
+
+ params = {
+ "width": width,
+ "height": height,
+ "method": resize_method,
+ }
+ return "%s_matrix/media/v1/thumbnail/%s?%s%s" % (
+ self.hs.config.public_baseurl,
+ serverAndMediaId,
+ urllib.urlencode(params),
+ fragment or "",
+ )
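+ # For illustration (assuming a public_baseurl of "https://hs.example/"):
+ # mxc_to_http_filter("mxc://hs.example/abc", 32, 32) returns
+ # "https://hs.example/_matrix/media/v1/thumbnail/hs.example/abc?..."
+ # with width=32&height=32&method=crop in the query (parameter order may
+ # vary).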
+
+
+def safe_markup(raw_html):
+ return jinja2.Markup(bleach.linkify(bleach.clean(
+ raw_html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRS,
+ # bleach master has this, but it isn't released yet
+ # protocols=ALLOWED_SCHEMES,
+ strip=True
+ )))
+
+
+def safe_text(raw_text):
+ """
+ Process text: treat it as HTML but escape any tags (i.e. just escape the
+ HTML), then linkify it.
+ """
+ return jinja2.Markup(bleach.linkify(bleach.clean(
+ raw_text, tags=[], attributes={},
+ strip=False
+ )))
+
+
+def deduped_ordered_list(l):
+ seen = set()
+ ret = []
+ for item in l:
+ if item not in seen:
+ seen.add(item)
+ ret.append(item)
+ return ret
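+# For illustration: deduped_ordered_list(["a", "b", "a"]) == ["a", "b"],
+# preserving first-seen order.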
+
+
+def string_ordinal_total(s):
+ tot = 0
+ for c in s:
+ tot += ord(c)
+ return tot
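+# For illustration: string_ordinal_total("ab") == 97 + 98 == 195. This is
+# used as a cheap deterministic hash for picking default avatar images.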
+
+
+def format_ts_filter(value, format):
+ return time.strftime(format, time.localtime(value / 1000))
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 4960837504..e6c0806415 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -1,10 +1,37 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
from httppusher import HttpPusher
-PUSHER_TYPES = {
- 'http': HttpPusher
-}
+import logging
+logger = logging.getLogger(__name__)
def create_pusher(hs, pusherdict):
+ logger.info("trying to create_pusher for %r", pusherdict)
+
+ PUSHER_TYPES = {
+ "http": HttpPusher,
+ }
+
+ logger.info("email enable notifs: %r", hs.config.email_enable_notifs)
+ if hs.config.email_enable_notifs:
+ from synapse.push.emailpusher import EmailPusher
+ PUSHER_TYPES["email"] = EmailPusher
+ logger.info("defined email pusher type")
+
if pusherdict['kind'] in PUSHER_TYPES:
+ logger.info("found pusher")
return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 6ef48d63f7..5853ec36a9 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -17,7 +17,6 @@
from twisted.internet import defer
import pusher
-from synapse.push import PusherConfigException
from synapse.util.logcontext import preserve_fn
from synapse.util.async import run_on_reactor
@@ -50,6 +49,7 @@ class PusherPool:
# recreated, added and started: this means we have only one
# code path adding pushers.
pusher.create_pusher(self.hs, {
+ "id": None,
"user_name": user_id,
"kind": kind,
"app_id": app_id,
@@ -185,8 +185,8 @@ class PusherPool:
for pusherdict in pushers:
try:
p = pusher.create_pusher(self.hs, pusherdict)
- except PusherConfigException:
- logger.exception("Couldn't start a pusher: caught PusherConfigException")
+ except:
+ logger.exception("Couldn't start a pusher: caught Exception")
continue
if p:
appid_pushkey = "%s:%s" % (
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 0eb3d6c1de..e0a7a19777 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -44,6 +44,10 @@ CONDITIONAL_REQUIREMENTS = {
"preview_url": {
"netaddr>=0.7.18": ["netaddr"],
},
+ "email.enable_notifs": {
+ "Jinja2>=2.8": ["Jinja2>=2.8"],
+ "bleach>=1.4.2": ["bleach>=1.4.2"],
+ },
}
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index ff78c60f13..0e983ae7fa 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -159,6 +159,15 @@ class ReplicationResource(Resource):
result = yield self.notifier.wait_for_replication(replicate, timeout)
+ for stream_name, stream_content in result.items():
+ logger.info(
+ "Replicating %d rows of %s from %s -> %s",
+ len(stream_content["rows"]),
+ stream_name,
+ request_streams.get(stream_name),
+ stream_content["position"],
+ )
+
request.write(json.dumps(result, ensure_ascii=False))
finish_request(request)
diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
new file mode 100644
index 0000000000..f59b0eabbc
--- /dev/null
+++ b/synapse/replication/slave/storage/account_data.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+from synapse.storage.account_data import AccountDataStore
+
+
+class SlavedAccountDataStore(BaseSlavedStore):
+
+ def __init__(self, db_conn, hs):
+ super(SlavedAccountDataStore, self).__init__(db_conn, hs)
+ self._account_data_id_gen = SlavedIdTracker(
+ db_conn, "account_data_max_stream_id", "stream_id",
+ )
+
+ get_global_account_data_by_type_for_users = (
+ AccountDataStore.__dict__["get_global_account_data_by_type_for_users"]
+ )
+
+ get_global_account_data_by_type_for_user = (
+ AccountDataStore.__dict__["get_global_account_data_by_type_for_user"]
+ )
+
+ def stream_positions(self):
+ result = super(SlavedAccountDataStore, self).stream_positions()
+ position = self._account_data_id_gen.get_current_token()
+ result["user_account_data"] = position
+ result["room_account_data"] = position
+ result["tag_account_data"] = position
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("user_account_data")
+ if stream:
+ self._account_data_id_gen.advance(int(stream["position"]))
+ for row in stream["rows"]:
+ user_id, data_type = row[1:3]
+ self.get_global_account_data_by_type_for_user.invalidate(
+ (data_type, user_id,)
+ )
+
+ stream = result.get("room_account_data")
+ if stream:
+ self._account_data_id_gen.advance(int(stream["position"]))
+
+ stream = result.get("tag_account_data")
+ if stream:
+ self._account_data_id_gen.advance(int(stream["position"]))
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 86f00b6ff5..c0d741452d 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -75,6 +75,18 @@ class SlavedEventStore(BaseSlavedStore):
get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
)
+ _get_state_group_for_events = (
+ StateStore.__dict__["_get_state_group_for_events"]
+ )
+ _get_state_group_for_event = (
+ StateStore.__dict__["_get_state_group_for_event"]
+ )
+ _get_state_groups_from_groups = (
+ StateStore.__dict__["_get_state_groups_from_groups"]
+ )
+ _get_state_group_from_group = (
+ StateStore.__dict__["_get_state_group_from_group"]
+ )
get_unread_push_actions_for_user_in_range = (
DataStore.get_unread_push_actions_for_user_in_range.__func__
@@ -83,6 +95,7 @@ class SlavedEventStore(BaseSlavedStore):
DataStore.get_push_action_users_in_range.__func__
)
get_event = DataStore.get_event.__func__
+ get_events = DataStore.get_events.__func__
get_current_state = DataStore.get_current_state.__func__
get_current_state_for_key = DataStore.get_current_state_for_key.__func__
get_rooms_for_user_where_membership_is = (
@@ -95,6 +108,9 @@ class SlavedEventStore(BaseSlavedStore):
get_room_events_stream_for_room = (
DataStore.get_room_events_stream_for_room.__func__
)
+ get_events_around = DataStore.get_events_around.__func__
+ get_state_for_events = DataStore.get_state_for_events.__func__
+ get_state_groups = DataStore.get_state_groups.__func__
_set_before_and_after = DataStore._set_before_and_after
@@ -104,6 +120,7 @@ class SlavedEventStore(BaseSlavedStore):
_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
_parse_events_txn = DataStore._parse_events_txn.__func__
_get_events_txn = DataStore._get_events_txn.__func__
+ _get_event_txn = DataStore._get_event_txn.__func__
_enqueue_events = DataStore._enqueue_events.__func__
_do_fetch = DataStore._do_fetch.__func__
_fetch_events_txn = DataStore._fetch_events_txn.__func__
@@ -114,6 +131,10 @@ class SlavedEventStore(BaseSlavedStore):
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
)
_get_members_rows_txn = DataStore._get_members_rows_txn.__func__
+ _get_state_for_groups = DataStore._get_state_for_groups.__func__
+ _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
+ _get_events_around_txn = DataStore._get_events_around_txn.__func__
+ _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
@@ -128,7 +149,7 @@ class SlavedEventStore(BaseSlavedStore):
stream = result.get("events")
if stream:
- self._stream_id_gen.advance(stream["position"])
+ self._stream_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
self._process_replication_row(
row, backfilled=False, state_resets=state_resets
@@ -136,7 +157,7 @@ class SlavedEventStore(BaseSlavedStore):
stream = result.get("backfill")
if stream:
- self._backfill_id_gen.advance(-stream["position"])
+ self._backfill_id_gen.advance(-int(stream["position"]))
for row in stream["rows"]:
self._process_replication_row(
row, backfilled=True, state_resets=state_resets
@@ -144,12 +165,14 @@ class SlavedEventStore(BaseSlavedStore):
stream = result.get("forward_ex_outliers")
if stream:
+ self._stream_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
event_id = row[1]
self._invalidate_get_event_cache(event_id)
stream = result.get("backward_ex_outliers")
if stream:
+ self._backfill_id_gen.advance(-int(stream["position"]))
for row in stream["rows"]:
event_id = row[1]
self._invalidate_get_event_cache(event_id)
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index 8faddb2595..d88206b3bb 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -43,10 +43,10 @@ class SlavedPusherStore(BaseSlavedStore):
def process_replication(self, result):
stream = result.get("pushers")
if stream:
- self._pushers_id_gen.advance(stream["position"])
+ self._pushers_id_gen.advance(int(stream["position"]))
stream = result.get("deleted_pushers")
if stream:
- self._pushers_id_gen.advance(stream["position"])
+ self._pushers_id_gen.advance(int(stream["position"]))
return super(SlavedPusherStore, self).process_replication(result)
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index b55d5dfd08..ec007516d0 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -50,7 +50,7 @@ class SlavedReceiptsStore(BaseSlavedStore):
def process_replication(self, result):
stream = result.get("receipts")
if stream:
- self._receipts_id_gen.advance(stream["position"])
+ self._receipts_id_gen.advance(int(stream["position"]))
for row in stream["rows"]:
room_id, receipt_type, user_id = row[1:4]
self.invalidate_caches_for_receipt(room_id, receipt_type, user_id)
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 6688fa8fa0..8b223e032b 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -44,6 +44,8 @@ from synapse.rest.client.v2_alpha import (
tokenrefresh,
tags,
account_data,
+ report_event,
+ openid,
)
from synapse.http.server import JsonResource
@@ -86,3 +88,5 @@ class ClientRestResource(JsonResource):
tokenrefresh.register_servlets(hs, client_resource)
tags.register_servlets(hs, client_resource)
account_data.register_servlets(hs, client_resource)
+ report_event.register_servlets(hs, client_resource)
+ openid.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index c6a2ef2ccc..e3f4fbb0bb 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -355,5 +355,76 @@ class RegisterRestServlet(ClientV1RestServlet):
)
+class CreateUserRestServlet(ClientV1RestServlet):
+ """Handles user creation via a server-to-server interface
+ """
+
+ PATTERNS = client_path_patterns("/createUser$", releases=())
+
+ def __init__(self, hs):
+ super(CreateUserRestServlet, self).__init__(hs)
+ self.store = hs.get_datastore()
+ self.direct_user_creation_max_duration = hs.config.user_creation_max_duration
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ user_json = parse_json_object_from_request(request)
+
+ if "access_token" not in request.args:
+ raise SynapseError(400, "Expected application service token.")
+
+ app_service = yield self.store.get_app_service_by_token(
+ request.args["access_token"][0]
+ )
+ if not app_service:
+ raise SynapseError(403, "Invalid application service token.")
+
+ logger.debug("creating user: %s", user_json)
+
+ response = yield self._do_create(user_json)
+
+ defer.returnValue((200, response))
+
+ def on_OPTIONS(self, request):
+ return 403, {}
+
+ @defer.inlineCallbacks
+ def _do_create(self, user_json):
+ yield run_on_reactor()
+
+ if "localpart" not in user_json:
+ raise SynapseError(400, "Expected 'localpart' key.")
+
+ if "displayname" not in user_json:
+ raise SynapseError(400, "Expected 'displayname' key.")
+
+ if "duration_seconds" not in user_json:
+ raise SynapseError(400, "Expected 'duration_seconds' key.")
+
+ localpart = user_json["localpart"].encode("utf-8")
+ displayname = user_json["displayname"].encode("utf-8")
+ duration_seconds = 0
+ try:
+ duration_seconds = int(user_json["duration_seconds"])
+ except ValueError:
+ raise SynapseError(400, "Failed to parse 'duration_seconds'")
+ if duration_seconds > self.direct_user_creation_max_duration:
+ duration_seconds = self.direct_user_creation_max_duration
+
+ handler = self.handlers.registration_handler
+ user_id, token = yield handler.get_or_create_user(
+ localpart=localpart,
+ displayname=displayname,
+ duration_seconds=duration_seconds
+ )
+
+ defer.returnValue({
+ "user_id": user_id,
+ "access_token": token,
+ "home_server": self.hs.hostname,
+ })
+
+
def register_servlets(hs, http_server):
RegisterRestServlet(hs).register(http_server)
+ CreateUserRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/openid.py b/synapse/rest/client/v2_alpha/openid.py
new file mode 100644
index 0000000000..aa1cae8e1e
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/openid.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ._base import client_v2_patterns
+
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.api.errors import AuthError
+from synapse.util.stringutils import random_string
+
+from twisted.internet import defer
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class IdTokenServlet(RestServlet):
+ """
+ Get a bearer token that may be passed to a third party to confirm ownership
+ of a matrix user id.
+
+ The format of the response could be made compatible with the format given
+ in http://openid.net/specs/openid-connect-core-1_0.html#TokenResponse
+
+ But instead of returning a signed "id_token" the response contains the
+ name of the issuing matrix homeserver. This means that for now the third
+ party will need to check the validity of the "id_token" against the
+ federation /openid/userinfo endpoint of the homeserver.
+
+ Request:
+
+ POST /user/{user_id}/openid/request_token?access_token=... HTTP/1.1
+
+ {}
+
+ Response:
+
+ HTTP/1.1 200 OK
+ {
+ "access_token": "ABDEFGH",
+ "token_type": "Bearer",
+ "matrix_server_name": "example.com",
+ "expires_in": 3600,
+ }
+ """
+ PATTERNS = client_v2_patterns(
+ "/user/(?P<user_id>[^/]*)/openid/request_token"
+ )
+
+ EXPIRES_MS = 3600 * 1000
+
+ def __init__(self, hs):
+ super(IdTokenServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+ self.server_name = hs.config.server_name
+
+ @defer.inlineCallbacks
+ def on_POST(self, request, user_id):
+ requester = yield self.auth.get_user_by_req(request)
+ if user_id != requester.user.to_string():
+ raise AuthError(403, "Cannot request tokens for other users.")
+
+ # Parse the request body to make sure it's JSON, but ignore the contents
+ # for now.
+ parse_json_object_from_request(request)
+
+ token = random_string(24)
+ ts_valid_until_ms = self.clock.time_msec() + self.EXPIRES_MS
+
+ yield self.store.insert_open_id_token(token, ts_valid_until_ms, user_id)
+
+ defer.returnValue((200, {
+ "access_token": token,
+ "token_type": "Bearer",
+ "matrix_server_name": self.server_name,
+ "expires_in": self.EXPIRES_MS / 1000,
+ }))
+
+
+def register_servlets(hs, http_server):
+ IdTokenServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index ff8f69ddbf..1ecc02d94d 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -48,6 +48,7 @@ class RegisterRestServlet(RestServlet):
super(RegisterRestServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
self.auth_handler = hs.get_handlers().auth_handler
self.registration_handler = hs.get_handlers().registration_handler
self.identity_handler = hs.get_handlers().identity_handler
@@ -214,6 +215,34 @@ class RegisterRestServlet(RestServlet):
threepid['validated_at'],
)
+ # And we add an email pusher for them by default, but only
+ # if email notifications are enabled (so people don't start
+ # getting mail spam where they weren't before if email
+ # notifs are set up on a home server)
+ if (
+ self.hs.config.email_enable_notifs and
+ self.hs.config.email_notif_for_new_users
+ ):
+ # Pull the ID of the access token back out of the db
+ # It would really make more sense for this to be passed
+ # up when the access token is saved, but that's quite an
+ # invasive change I'd rather do separately.
+ user_tuple = yield self.store.get_user_by_access_token(
+ token
+ )
+
+ yield self.hs.get_pusherpool().add_pusher(
+ user_id=user_id,
+ access_token=user_tuple["token_id"],
+ kind="email",
+ app_id="m.email",
+ app_display_name="Email Notifications",
+ device_display_name=threepid["address"],
+ pushkey=threepid["address"],
+ lang=None, # We don't know a user's language here
+ data={},
+ )
+
if 'bind_email' in params and params['bind_email']:
logger.info("bind_email specified: binding")
diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py
new file mode 100644
index 0000000000..8903e12405
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/report_event.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from ._base import client_v2_patterns
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class ReportEventRestServlet(RestServlet):
+ PATTERNS = client_v2_patterns(
+ "/rooms/(?P<room_id>[^/]*)/report/(?P<event_id>[^/]*)$"
+ )
+
+ def __init__(self, hs):
+ super(ReportEventRestServlet, self).__init__()
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.clock = hs.get_clock()
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def on_POST(self, request, room_id, event_id):
+ requester = yield self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ body = parse_json_object_from_request(request)
+
+ yield self.store.add_event_report(
+ room_id=room_id,
+ event_id=event_id,
+ user_id=user_id,
+ reason=body.get("reason"),
+ content=body,
+ received_ts=self.clock.time_msec(),
+ )
+
+ defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+ ReportEventRestServlet(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 045ae6c03f..d970fde9e8 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -44,6 +44,7 @@ from .receipts import ReceiptsStore
from .search import SearchStore
from .tags import TagsStore
from .account_data import AccountDataStore
+from .openid import OpenIdStore
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
@@ -81,7 +82,8 @@ class DataStore(RoomMemberStore, RoomStore,
SearchStore,
TagsStore,
AccountDataStore,
- EventPushActionsStore
+ EventPushActionsStore,
+ OpenIdStore,
):
def __init__(self, db_conn, hs):
@@ -114,6 +116,7 @@ class DataStore(RoomMemberStore, RoomStore,
self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
+ self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
self._push_rules_stream_id_gen = ChainedIdGenerator(
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1e27c2c0ce..e0d7098692 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -453,7 +453,9 @@ class SQLBaseStore(object):
keyvalues (dict): The unique key tables and their new values
values (dict): The nonunique columns and their new values
insertion_values (dict): key/values to use when inserting
- Returns: A deferred
+ Returns:
+ Deferred(bool): True if a new entry was created, False if an
+ existing one was updated.
"""
return self.runInteraction(
desc,
@@ -498,6 +500,10 @@ class SQLBaseStore(object):
)
txn.execute(sql, allvalues.values())
+ return True
+ else:
+ return False
+
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 7a7fbf1e52..ec7e8d40d2 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -16,6 +16,8 @@
from ._base import SQLBaseStore
from twisted.internet import defer
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+
import ujson as json
import logging
@@ -24,6 +26,7 @@ logger = logging.getLogger(__name__)
class AccountDataStore(SQLBaseStore):
+ @cached()
def get_account_data_for_user(self, user_id):
"""Get all the client account_data for a user.
@@ -60,6 +63,47 @@ class AccountDataStore(SQLBaseStore):
"get_account_data_for_user", get_account_data_for_user_txn
)
+ @cachedInlineCallbacks(num_args=2)
+ def get_global_account_data_by_type_for_user(self, data_type, user_id):
+ """
+ Returns:
+ Deferred: A dict
+ """
+ result = yield self._simple_select_one_onecol(
+ table="account_data",
+ keyvalues={
+ "user_id": user_id,
+ "account_data_type": data_type,
+ },
+ retcol="content",
+ desc="get_global_account_data_by_type_for_user",
+ allow_none=True,
+ )
+
+ if result:
+ defer.returnValue(json.loads(result))
+ else:
+ defer.returnValue(None)
+
+ @cachedList(cached_method_name="get_global_account_data_by_type_for_user",
+ num_args=2, list_name="user_ids", inlineCallbacks=True)
+ def get_global_account_data_by_type_for_users(self, data_type, user_ids):
+ rows = yield self._simple_select_many_batch(
+ table="account_data",
+ column="user_id",
+ iterable=user_ids,
+ keyvalues={
+ "account_data_type": data_type,
+ },
+ retcols=("user_id", "content",),
+ desc="get_global_account_data_by_type_for_users",
+ )
+
+ defer.returnValue({
+ row["user_id"]: json.loads(row["content"]) if row["content"] else None
+ for row in rows
+ })
+
def get_account_data_for_room(self, user_id, room_id):
"""Get all the client account_data for a user for a room.
@@ -193,6 +237,7 @@ class AccountDataStore(SQLBaseStore):
self._account_data_stream_cache.entity_has_changed,
user_id, next_id,
)
+ txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
self._update_max_stream_id(txn, next_id)
with self._account_data_id_gen.get_next() as next_id:
@@ -232,6 +277,11 @@ class AccountDataStore(SQLBaseStore):
self._account_data_stream_cache.entity_has_changed,
user_id, next_id,
)
+ txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
+ txn.call_after(
+ self.get_global_account_data_by_type_for_user.invalidate,
+ (account_data_type, user_id,)
+ )
self._update_max_stream_id(txn, next_id)
with self._account_data_id_gen.get_next() as next_id:
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 86a98b6f11..9705db5c47 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -118,16 +118,19 @@ class EventPushActionsStore(SQLBaseStore):
max_stream_ordering=None):
def get_after_receipt(txn):
sql = (
- "SELECT ep.event_id, ep.stream_ordering, ep.actions "
- "FROM event_push_actions AS ep, ("
- " SELECT room_id, user_id,"
- " max(topological_ordering) as topological_ordering,"
- " max(stream_ordering) as stream_ordering"
+ "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
+ "e.received_ts "
+ "FROM ("
+ " SELECT room_id, user_id, "
+ " max(topological_ordering) as topological_ordering, "
+ " max(stream_ordering) as stream_ordering "
" FROM events"
" NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
" GROUP BY room_id, user_id"
- ") AS rl "
- "WHERE"
+ ") AS rl,"
+ " event_push_actions AS ep"
+ " INNER JOIN events AS e USING (room_id, event_id)"
+ " WHERE"
" ep.room_id = rl.room_id"
" AND ("
" ep.topological_ordering > rl.topological_ordering"
@@ -153,11 +156,13 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn):
sql = (
- "SELECT ep.event_id, ep.stream_ordering, ep.actions "
- "FROM event_push_actions AS ep "
- "WHERE ep.room_id not in ("
+ "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+ " e.received_ts"
+ " FROM event_push_actions AS ep"
+ " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+ " WHERE ep.room_id not in ("
" SELECT room_id FROM events NATURAL JOIN receipts_linearized"
- " WHERE receipt_type = 'm.read' AND user_id = ? "
+ " WHERE receipt_type = 'm.read' AND user_id = ?"
" GROUP BY room_id"
") AND ep.user_id = ? AND ep.stream_ordering > ?"
)
@@ -175,12 +180,30 @@ class EventPushActionsStore(SQLBaseStore):
defer.returnValue([
{
"event_id": row[0],
- "stream_ordering": row[1],
- "actions": json.loads(row[2]),
+ "room_id": row[1],
+ "stream_ordering": row[2],
+ "actions": json.loads(row[3]),
+ "received_ts": row[4],
} for row in after_read_receipt + no_read_receipt
])
@defer.inlineCallbacks
+ def get_time_of_last_push_action_before(self, stream_ordering):
+ def f(txn):
+ sql = (
+ "SELECT e.received_ts"
+ " FROM event_push_actions AS ep"
+ " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+ " WHERE ep.stream_ordering > ?"
+ " ORDER BY ep.stream_ordering ASC"
+ " LIMIT 1"
+ )
+ txn.execute(sql, (stream_ordering,))
+ return txn.fetchone()
+ result = yield self.runInteraction("get_time_of_last_push_action_before", f)
+ defer.returnValue(result[0] if result else None)
+
+ @defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self):
def f(txn):
txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
@@ -201,6 +224,18 @@ class EventPushActionsStore(SQLBaseStore):
(room_id, event_id)
)
+ def _remove_push_actions_before_txn(self, txn, room_id, user_id,
+ topological_ordering):
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (room_id, user_id, )
+ )
+ txn.execute(
+ "DELETE FROM event_push_actions"
+ " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
+ (room_id, user_id, topological_ordering,)
+ )
+
def _action_has_highlight(actions):
for action in actions:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0307b2af3c..4655669ba0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,12 +19,14 @@ from twisted.internet import defer, reactor
from synapse.events import FrozenEvent, USE_FROZEN_DICTS
from synapse.events.utils import prune_event
+from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from canonicaljson import encode_canonical_json
-from collections import namedtuple
+from collections import deque, namedtuple
+
import logging
import math
@@ -50,28 +52,169 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events
EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
+class _EventPersistenceQueue(object):
+ """Queues up events so that they can be persisted in bulk with only one
+ concurrent transaction per room.
+ """
+
+ _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", (
+ "events_and_contexts", "current_state", "backfilled", "deferred",
+ ))
+
+ def __init__(self):
+ self._event_persist_queues = {}
+ self._currently_persisting_rooms = set()
+
+ def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state):
+ """Add events to the queue, with the given persist_event options.
+ """
+ queue = self._event_persist_queues.setdefault(room_id, deque())
+ if queue:
+ end_item = queue[-1]
+ if end_item.current_state or current_state:
+ # We persist events with current_state set to True one at a time,
+ # so don't batch them up with others
+ pass
+ elif end_item.backfilled == backfilled:
+ end_item.events_and_contexts.extend(events_and_contexts)
+ return end_item.deferred.observe()
+
+ deferred = ObservableDeferred(defer.Deferred())
+
+ queue.append(self._EventPersistQueueItem(
+ events_and_contexts=events_and_contexts,
+ backfilled=backfilled,
+ current_state=current_state,
+ deferred=deferred,
+ ))
+
+ return deferred.observe()
+
+ def handle_queue(self, room_id, per_item_callback):
+ """Attempts to handle the queue for a room if not already being handled.
+
+ The given callback will be invoked for each item in the queue, of
+ type _EventPersistQueueItem. The per_item_callback will continuously
+ be called with new items until the queue becomes empty. The return
+ value of the function will be given to the deferreds waiting on the
+ item, and exceptions will be passed to the deferreds as well.
+
+ This function should therefore be called whenever anything is added
+ to the queue.
+
+ If another callback is currently handling the queue then it will not be
+ invoked.
+ """
+
+ if room_id in self._currently_persisting_rooms:
+ return
+
+ self._currently_persisting_rooms.add(room_id)
+
+ @defer.inlineCallbacks
+ def handle_queue_loop():
+ try:
+ queue = self._get_draining_queue(room_id)
+ for item in queue:
+ try:
+ ret = yield per_item_callback(item)
+ item.deferred.callback(ret)
+ except Exception as e:
+ item.deferred.errback(e)
+ finally:
+ queue = self._event_persist_queues.pop(room_id, None)
+ if queue:
+ self._event_persist_queues[room_id] = queue
+ self._currently_persisting_rooms.discard(room_id)
+
+ preserve_fn(handle_queue_loop)()
+
+ def _get_draining_queue(self, room_id):
+ queue = self._event_persist_queues.setdefault(room_id, deque())
+
+ try:
+ while True:
+ yield queue.popleft()
+ except IndexError:
+ # Queue has been drained.
+ pass
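+# A sketch of how the queue is driven (see persist_events below): callers
+# add_to_queue() and keep the returned observer, then call handle_queue()
+# with a callback that persists one _EventPersistQueueItem at a time; each
+# observer fires once the callback has handled its batch.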
+
+
class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
def __init__(self, hs):
super(EventsStore, self).__init__(hs)
+ self._clock = hs.get_clock()
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
- @defer.inlineCallbacks
+ self._event_persist_queue = _EventPersistenceQueue()
+
def persist_events(self, events_and_contexts, backfilled=False):
"""
Write events to the database
Args:
events_and_contexts: list of tuples of (event, context)
backfilled: ?
+ """
+ partitioned = {}
+ for event, ctx in events_and_contexts:
+ partitioned.setdefault(event.room_id, []).append((event, ctx))
+
+ deferreds = []
+ for room_id, evs_ctxs in partitioned.items():
+ d = self._event_persist_queue.add_to_queue(
+ room_id, evs_ctxs,
+ backfilled=backfilled,
+ current_state=None,
+ )
+ deferreds.append(d)
- Returns: Tuple of stream_orderings where the first is the minimum and
- last is the maximum stream ordering assigned to the events when
- persisting.
+ for room_id in partitioned.keys():
+ self._maybe_start_persisting(room_id)
- """
+ return defer.gatherResults(deferreds, consumeErrors=True)
+
+ @defer.inlineCallbacks
+ @log_function
+ def persist_event(self, event, context, current_state=None, backfilled=False):
+ deferred = self._event_persist_queue.add_to_queue(
+ event.room_id, [(event, context)],
+ backfilled=backfilled,
+ current_state=current_state,
+ )
+
+ self._maybe_start_persisting(event.room_id)
+
+ yield deferred
+
+ max_persisted_id = yield self._stream_id_gen.get_current_token()
+ defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
+
+ def _maybe_start_persisting(self, room_id):
+ @defer.inlineCallbacks
+ def persisting_queue(item):
+ if item.current_state:
+ for event, context in item.events_and_contexts:
+ # There should only ever be one item in
+ # events_and_contexts when current_state is
+ # not None
+ yield self._persist_event(
+ event, context,
+ current_state=item.current_state,
+ backfilled=item.backfilled,
+ )
+ else:
+ yield self._persist_events(
+ item.events_and_contexts,
+ backfilled=item.backfilled,
+ )
+
+ self._event_persist_queue.handle_queue(room_id, persisting_queue)
+
+ @defer.inlineCallbacks
+ def _persist_events(self, events_and_contexts, backfilled=False):
if not events_and_contexts:
return
@@ -118,8 +261,7 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, context, current_state=None, backfilled=False):
-
+ def _persist_event(self, event, context, current_state=None, backfilled=False):
try:
with self._stream_id_gen.get_next() as stream_ordering:
with self._state_groups_id_gen.get_next() as state_group_id:
@@ -136,9 +278,6 @@ class EventsStore(SQLBaseStore):
except _RollbackButIsFineException:
pass
- max_persisted_id = yield self._stream_id_gen.get_current_token()
- defer.returnValue((stream_ordering, max_persisted_id))
-
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
@@ -427,6 +566,7 @@ class EventsStore(SQLBaseStore):
"outlier": event.internal_metadata.is_outlier(),
"content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
+ "received_ts": self._clock.time_msec(),
}
for event, _ in events_and_contexts
],
diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py
new file mode 100644
index 0000000000..5dabb607bd
--- /dev/null
+++ b/synapse/storage/openid.py
@@ -0,0 +1,32 @@
+from ._base import SQLBaseStore
+
+
+class OpenIdStore(SQLBaseStore):
+ def insert_open_id_token(self, token, ts_valid_until_ms, user_id):
+ return self._simple_insert(
+ table="open_id_tokens",
+ values={
+ "token": token,
+ "ts_valid_until_ms": ts_valid_until_ms,
+ "user_id": user_id,
+ },
+ desc="insert_open_id_token"
+ )
+
+ def get_user_id_for_open_id_token(self, token, ts_now_ms):
+ def get_user_id_for_token_txn(txn):
+ sql = (
+ "SELECT user_id FROM open_id_tokens"
+ " WHERE token = ? AND ? <= ts_valid_until_ms"
+ )
+
+ txn.execute(sql, (token, ts_now_ms))
+
+ rows = txn.fetchall()
+ if not rows:
+ return None
+ else:
+ return rows[0][0]
+ return self.runInteraction(
+ "get_user_id_for_token", get_user_id_for_token_txn
+ )
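+# For illustration: get_user_id_for_open_id_token(token, clock.time_msec())
+# returns a deferred resolving to the owning user_id, or None if the token
+# is unknown or has expired.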
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 57f14fd12b..c8487c8838 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 31
+SCHEMA_VERSION = 32
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 11feb3eb11..9e8e2e2964 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -156,8 +156,7 @@ class PusherStore(SQLBaseStore):
profile_tag=""):
with self._pushers_id_gen.get_next() as stream_id:
def f(txn):
- txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
- return self._simple_upsert_txn(
+ newly_inserted = self._simple_upsert_txn(
txn,
"pushers",
{
@@ -178,11 +177,18 @@ class PusherStore(SQLBaseStore):
"id": stream_id,
},
)
- defer.returnValue((yield self.runInteraction("add_pusher", f)))
+ if newly_inserted:
+ # get_users_with_pushers_in_room only cares if the user has
+ # at least *one* pusher.
+ txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+
+ yield self.runInteraction("add_pusher", f)
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
def delete_pusher_txn(txn, stream_id):
+ txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+
self._simple_delete_one_txn(
txn,
"pushers",
@@ -194,6 +200,7 @@ class PusherStore(SQLBaseStore):
{"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
{"stream_id": stream_id},
)
+
with self._pushers_id_gen.get_next() as stream_id:
yield self.runInteraction(
"delete_pusher", delete_pusher_txn, stream_id
@@ -233,3 +240,30 @@ class PusherStore(SQLBaseStore):
{'failing_since': failing_since},
desc="update_pusher_failing_since",
)
+
+ @defer.inlineCallbacks
+ def get_throttle_params_by_room(self, pusher_id):
+ res = yield self._simple_select_list(
+ "pusher_throttle",
+ {"pusher": pusher_id},
+ ["room_id", "last_sent_ts", "throttle_ms"],
+ desc="get_throttle_params_by_room"
+ )
+
+ params_by_room = {}
+ for row in res:
+ params_by_room[row["room_id"]] = {
+ "last_sent_ts": row["last_sent_ts"],
+ "throttle_ms": row["throttle_ms"]
+ }
+
+ defer.returnValue(params_by_room)
+
+ @defer.inlineCallbacks
+ def set_throttle_params(self, pusher_id, room_id, params):
+ yield self._simple_upsert(
+ "pusher_throttle",
+ {"pusher": pusher_id, "room_id": room_id},
+ params,
+ desc="set_throttle_params"
+ )
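
The throttle policy itself lives in the pusher, not in this diff. As a rough sketch of how a consumer might combine these two methods, assuming a simple "at most one send per throttle_ms window" rule (the rule is an assumption; only the shapes returned by get_throttle_params_by_room() come from the diff):

    import time

    def may_send(now_ms, room_params):
        # room_params is one value from get_throttle_params_by_room(),
        # or None if the room has no pusher_throttle row yet.
        if room_params is None:
            return True  # nothing recorded: not throttled
        last_sent_ts = room_params.get("last_sent_ts") or 0
        throttle_ms = room_params.get("throttle_ms") or 0
        return now_ms - last_sent_ts >= throttle_ms

    now_ms = int(time.time() * 1000)
    params_by_room = {"!room:example.com": {"last_sent_ts": 0, "throttle_ms": 60000}}
    print(may_send(now_ms, params_by_room.get("!room:example.com")))         # True
    print(may_send(now_ms, {"last_sent_ts": now_ms, "throttle_ms": 60000}))  # False
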
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 935fc503d9..fdcf28f3e1 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -100,7 +100,7 @@ class ReceiptsStore(SQLBaseStore):
defer.returnValue([ev for res in results.values() for ev in res])
- @cachedInlineCallbacks(num_args=3, max_entries=5000)
+ @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True)
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients.
@@ -232,7 +232,7 @@ class ReceiptsStore(SQLBaseStore):
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+ txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(
self._receipts_stream_cache.entity_has_changed,
@@ -244,6 +244,17 @@ class ReceiptsStore(SQLBaseStore):
(user_id, room_id, receipt_type)
)
+ res = self._simple_select_one_txn(
+ txn,
+ table="events",
+ retcols=["topological_ordering", "stream_ordering"],
+ keyvalues={"event_id": event_id},
+ allow_none=True
+ )
+
+ topological_ordering = int(res["topological_ordering"]) if res else None
+ stream_ordering = int(res["stream_ordering"]) if res else None
+
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
sql = (
@@ -255,16 +266,7 @@ class ReceiptsStore(SQLBaseStore):
txn.execute(sql, (room_id, receipt_type, user_id))
results = txn.fetchall()
- if results:
- res = self._simple_select_one_txn(
- txn,
- table="events",
- retcols=["topological_ordering", "stream_ordering"],
- keyvalues={"event_id": event_id},
- )
- topological_ordering = int(res["topological_ordering"])
- stream_ordering = int(res["stream_ordering"])
-
+ if results and topological_ordering is not None:
for to, so, _ in results:
if int(to) > topological_ordering:
return False
@@ -294,6 +296,14 @@ class ReceiptsStore(SQLBaseStore):
}
)
+ if receipt_type == "m.read" and topological_ordering is not None:
+ self._remove_push_actions_before_txn(
+ txn,
+ room_id=room_id,
+ user_id=user_id,
+ topological_ordering=topological_ordering,
+ )
+
return True
@defer.inlineCallbacks
@@ -367,7 +377,7 @@ class ReceiptsStore(SQLBaseStore):
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+ txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
self._simple_delete_txn(
txn,
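
Restated outside the transaction machinery, the reordered guard above amounts to: fetch the new event's orderings first, then drop the receipt if any existing receipt already points at a topologically later event. A standalone mirror of that comparison (the tuple shape matches the txn.fetchall() rows above):

    def receipt_is_latest(new_topological_ordering, existing_rows):
        # existing_rows: (topological_ordering, stream_ordering, event_id)
        # tuples for the user's current receipts of this type in the room.
        for to, _so, _event_id in existing_rows:
            if int(to) > new_topological_ordering:
                return False  # an existing receipt is for a newer event
        return True

    print(receipt_is_latest(5, [(3, 100, "$a"), (5, 90, "$b")]))  # True
    print(receipt_is_latest(5, [(6, 10, "$c")]))                  # False
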
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 7af0cae6a5..bda84a744a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore):
make_guest,
appservice_id
)
+ self.get_user_by_id.invalidate((user_id,))
self.is_guest.invalidate((user_id,))
def _register(
@@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore):
(next_id, user_id, token,)
)
+ @cached()
def get_user_by_id(self, user_id):
return self._simple_select_one(
table="users",
@@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore):
}, {
'password_hash': password_hash
})
+ self.get_user_by_id.invalidate((user_id,))
@defer.inlineCallbacks
def user_delete_access_tokens(self, user_id, except_token_ids=[]):
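
The two invalidate() calls added above follow the standard contract for @cached(): every write path for a row must drop the per-key cache entry, or readers keep seeing the stale value. A toy standalone illustration of that contract (ToyCache is illustrative, not Synapse's cache implementation):

    class ToyCache(object):
        def __init__(self, fetch):
            self._fetch = fetch
            self._entries = {}

        def get(self, key):
            if key not in self._entries:
                self._entries[key] = self._fetch(key)
            return self._entries[key]

        def invalidate(self, key):
            self._entries.pop(key, None)

    db = {"@alice:example.com": {"password_hash": "old"}}
    cache = ToyCache(lambda user_id: dict(db[user_id]))

    print(cache.get("@alice:example.com")["password_hash"])  # old
    db["@alice:example.com"]["password_hash"] = "new"
    cache.invalidate("@alice:example.com")  # as user_set_password now does
    print(cache.get("@alice:example.com")["password_hash"])  # new
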
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 70aa64fb31..26933e593a 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -23,6 +23,7 @@ from .engines import PostgresEngine, Sqlite3Engine
import collections
import logging
+import ujson as json
logger = logging.getLogger(__name__)
@@ -221,3 +222,20 @@ class RoomStore(SQLBaseStore):
aliases.extend(e.content['aliases'])
defer.returnValue((name, aliases))
+
+ def add_event_report(self, room_id, event_id, user_id, reason, content,
+ received_ts):
+ next_id = self._event_reports_id_gen.get_next()
+ return self._simple_insert(
+ table="event_reports",
+ values={
+ "id": next_id,
+ "received_ts": received_ts,
+ "room_id": room_id,
+ "event_id": event_id,
+ "user_id": user_id,
+ "reason": reason,
+ "content": json.dumps(content),
+ },
+ desc="add_event_report"
+ )
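
A standalone sketch of what add_event_report() persists, with a list standing in for the event_reports table and itertools.count() for _event_reports_id_gen (both stand-ins are illustrative):

    import itertools
    import json  # the diff uses ujson; stdlib json behaves the same here
    import time

    _next_id = itertools.count(1)
    event_reports = []

    def add_event_report(room_id, event_id, user_id, reason, content, received_ts):
        event_reports.append({
            "id": next(_next_id),
            "received_ts": received_ts,
            "room_id": room_id,
            "event_id": event_id,
            "user_id": user_id,
            "reason": reason,
            # content is free-form client JSON, so it is serialised into
            # the TEXT column rather than given columns of its own
            "content": json.dumps(content),
        })

    add_event_report("!abc:example.com", "$1:example.com", "@u:example.com",
                     "spam", {"score": -100}, int(time.time() * 1000))
    print(event_reports[0]["content"])  # {"score": -100}
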
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 08a54cbdd1..9d6bfd5245 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -21,7 +21,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.api.constants import Membership
-from synapse.types import UserID
+from synapse.types import get_domian_from_id
import logging
@@ -273,10 +273,7 @@ class RoomMemberStore(SQLBaseStore):
room_id, membership=Membership.JOIN
)
- joined_domains = set(
- UserID.from_string(r["user_id"]).domain
- for r in rows
- )
+ joined_domains = set(get_domian_from_id(r["user_id"]) for r in rows)
return joined_domains
diff --git a/synapse/storage/schema/delta/32/events.sql b/synapse/storage/schema/delta/32/events.sql
new file mode 100644
index 0000000000..1dd0f9e170
--- /dev/null
+++ b/synapse/storage/schema/delta/32/events.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN received_ts BIGINT;
diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql
new file mode 100644
index 0000000000..36f37b11c8
--- /dev/null
+++ b/synapse/storage/schema/delta/32/openid.sql
@@ -0,0 +1,9 @@
+
+CREATE TABLE open_id_tokens (
+ token TEXT NOT NULL PRIMARY KEY,
+ ts_valid_until_ms bigint NOT NULL,
+ user_id TEXT NOT NULL,
+ UNIQUE (token)
+);
+
+CREATE INDEX open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms);
diff --git a/synapse/storage/schema/delta/32/pusher_throttle.sql b/synapse/storage/schema/delta/32/pusher_throttle.sql
new file mode 100644
index 0000000000..d86d30c13c
--- /dev/null
+++ b/synapse/storage/schema/delta/32/pusher_throttle.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE pusher_throttle(
+ pusher BIGINT NOT NULL,
+ room_id TEXT NOT NULL,
+ last_sent_ts BIGINT,
+ throttle_ms BIGINT,
+ PRIMARY KEY (pusher, room_id)
+);
diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql
new file mode 100644
index 0000000000..f859be46a6
--- /dev/null
+++ b/synapse/storage/schema/delta/32/remove_indices.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- The following indices are redundant, other indices are equivalent or
+-- supersets
+DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
+DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT
+
+DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT
+
+-- The following indices were unused
+DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id;
+DROP INDEX IF EXISTS evauth_edges_auth_id;
+DROP INDEX IF EXISTS presence_stream_state;
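
To sanity-check this migration on a SQLite deployment, one can list any of the dropped indices that still exist; a rough sketch (the database path is an assumption, and on PostgreSQL the equivalent query would go against pg_indexes):

    import sqlite3

    # Names copied from the DROP statements above.
    DROPPED = [
        "events_room_id", "events_order", "events_topological_ordering",
        "events_stream_ordering", "state_groups_id", "event_to_state_groups_id",
        "event_push_actions_room_id_event_id_user_id_profile_tag",
        "event_destinations_id", "st_extrem_id", "event_content_hashes_id",
        "event_signatures_id", "event_edge_hashes_id", "redactions_event_id",
        "room_hosts_room_id", "remote_media_cache_thumbnails_media_id",
        "evauth_edges_auth_id", "presence_stream_state",
    ]

    conn = sqlite3.connect("homeserver.db")
    existing = {row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'index'")}
    print(sorted(existing.intersection(DROPPED)))  # expect [] after the delta runs
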
diff --git a/synapse/storage/schema/delta/32/reports.sql b/synapse/storage/schema/delta/32/reports.sql
new file mode 100644
index 0000000000..d13609776f
--- /dev/null
+++ b/synapse/storage/schema/delta/32/reports.sql
@@ -0,0 +1,25 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE event_reports(
+ id BIGINT NOT NULL PRIMARY KEY,
+ received_ts BIGINT NOT NULL,
+ room_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ reason TEXT,
+ content TEXT
+);
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index d338dfcf0a..6c7481a728 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -16,16 +16,56 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
+from twisted.internet import defer, reactor
+
from canonicaljson import encode_canonical_json
+
+from collections import namedtuple
+
+import itertools
import logging
logger = logging.getLogger(__name__)
+_TransactionRow = namedtuple(
+ "_TransactionRow", (
+ "id", "transaction_id", "destination", "ts", "response_code",
+ "response_json",
+ )
+)
+
+_UpdateTransactionRow = namedtuple(
+ "_TransactionRow", (
+ "response_code", "response_json",
+ )
+)
+
+
class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
"""
+ def __init__(self, hs):
+ super(TransactionStore, self).__init__(hs)
+
+ # New transactions that are currently in flight
+ self.inflight_transactions = {}
+
+ # Newly delivered transactions that *weren't* persisted while in flight
+ self.new_delivered_transactions = {}
+
+ # Newly delivered transactions that *were* persisted while in flight
+ self.update_delivered_transactions = {}
+
+ self.last_transaction = {}
+
+ reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns)
+ hs.get_clock().looping_call(
+ self._persist_in_mem_txns,
+ 1000,
+ )
+
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@@ -108,17 +148,30 @@ class TransactionStore(SQLBaseStore):
list: A list of previous transaction ids.
"""
- return self.runInteraction(
- "prep_send_transaction",
- self._prep_send_transaction,
- transaction_id, destination, origin_server_ts
+ auto_id = self._transaction_id_gen.get_next()
+
+ txn_row = _TransactionRow(
+ id=auto_id,
+ transaction_id=transaction_id,
+ destination=destination,
+ ts=origin_server_ts,
+ response_code=0,
+ response_json=None,
)
- def _prep_send_transaction(self, txn, transaction_id, destination,
- origin_server_ts):
+ self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row
- next_id = self._transaction_id_gen.get_next()
+ prev_txn = self.last_transaction.get(destination)
+ if prev_txn:
+ return defer.succeed([prev_txn])
+ else:
+ return self.runInteraction(
+ "_get_prevs_txn",
+ self._get_prevs_txn,
+ destination,
+ )
+ def _get_prevs_txn(self, txn, destination):
# First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
@@ -133,23 +186,6 @@ class TransactionStore(SQLBaseStore):
prev_txns = [r["transaction_id"] for r in results]
- # Actually add the new transaction to the sent_transactions table.
-
- self._simple_insert_txn(
- txn,
- table="sent_transactions",
- values={
- "id": next_id,
- "transaction_id": transaction_id,
- "destination": destination,
- "ts": origin_server_ts,
- "response_code": 0,
- "response_json": None,
- }
- )
-
- # TODO Update the tx id -> pdu id mapping
-
return prev_txns
def delivered_txn(self, transaction_id, destination, code, response_dict):
@@ -161,27 +197,23 @@ class TransactionStore(SQLBaseStore):
code (int)
response_json (str)
"""
- return self.runInteraction(
- "delivered_txn",
- self._delivered_txn,
- transaction_id, destination, code,
- buffer(encode_canonical_json(response_dict)),
- )
- def _delivered_txn(self, txn, transaction_id, destination,
- code, response_json):
- self._simple_update_one_txn(
- txn,
- table="sent_transactions",
- keyvalues={
- "transaction_id": transaction_id,
- "destination": destination,
- },
- updatevalues={
- "response_code": code,
- "response_json": None, # For now, don't persist response_json
- }
- )
+ txn_row = self.inflight_transactions.get(
+ destination, {}
+ ).pop(transaction_id, None)
+
+ self.last_transaction[destination] = transaction_id
+
+ if txn_row:
+ d = self.new_delivered_transactions.setdefault(destination, {})
+ d[transaction_id] = txn_row._replace(
+ response_code=code,
+ response_json=None, # For now, don't persist response
+ )
+ else:
+ d = self.update_delivered_transactions.setdefault(destination, {})
+ # For now, don't persist response
+ d[transaction_id] = _UpdateTransactionRow(code, None)
def get_transactions_after(self, transaction_id, destination):
"""Get all transactions after a given local transaction_id.
@@ -305,3 +337,48 @@ class TransactionStore(SQLBaseStore):
txn.execute(query, (self._clock.time_msec(),))
return self.cursor_to_dict(txn)
+
+ @defer.inlineCallbacks
+ def _persist_in_mem_txns(self):
+ try:
+ inflight = self.inflight_transactions
+ new_delivered = self.new_delivered_transactions
+ update_delivered = self.update_delivered_transactions
+
+ self.inflight_transactions = {}
+ self.new_delivered_transactions = {}
+ self.update_delivered_transactions = {}
+
+ full_rows = [
+ row._asdict()
+ for txn_map in itertools.chain(inflight.values(), new_delivered.values())
+ for row in txn_map.values()
+ ]
+
+ def f(txn):
+ if full_rows:
+ self._simple_insert_many_txn(
+ txn=txn,
+ table="sent_transactions",
+ values=full_rows
+ )
+
+ for dest, txn_map in update_delivered.items():
+ for txn_id, update_row in txn_map.items():
+ self._simple_update_one_txn(
+ txn,
+ table="sent_transactions",
+ keyvalues={
+ "transaction_id": txn_id,
+ "destination": dest,
+ },
+ updatevalues={
+ "response_code": update_row.response_code,
+ "response_json": None, # For now, don't persist response
+ }
+ )
+
+ if full_rows or update_delivered:
+ yield self.runInteraction("_persist_in_mem_txns", f)
+ except Exception:
+ logger.exception("Failed to persist transactions!")
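
The net effect is a small write-behind buffer: prep_send_transaction() and delivered_txn() only touch dicts, and _persist_in_mem_txns() (run every second via looping_call and once more at reactor shutdown) batches the accumulated rows into a single interaction. A standalone sketch of the pattern, minus Twisted (names and the fake table are illustrative):

    class WriteBehindBuffer(object):
        def __init__(self):
            self.pending = {}    # rows recorded but not yet persisted
            self.persisted = []  # stand-in for the sent_transactions table

        def record(self, txn_id, row):
            self.pending[txn_id] = row  # cheap: no database round-trip

        def flush(self):
            # Swap the buffer out before persisting, as _persist_in_mem_txns
            # does, so rows recorded mid-flush land in the next batch rather
            # than being lost or written twice.
            batch, self.pending = self.pending, {}
            self.persisted.extend(batch.values())

    buf = WriteBehindBuffer()
    buf.record("txn1", {"destination": "example.com", "response_code": 200})
    buf.flush()
    print(len(buf.persisted))  # 1
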
diff --git a/synapse/types.py b/synapse/types.py
index 5b166835bd..42fd9c7204 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -21,6 +21,10 @@ from collections import namedtuple
Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
+def get_domian_from_id(string):
+ return string.split(":", 1)[1]
+
+
class DomainSpecificString(
namedtuple("DomainSpecificString", ("localpart", "domain"))
):
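
get_domian_from_id() replaces the typed parsers with a plain split on the first colon, so it works for any sigil-prefixed identifier. A quick standalone illustration of its behaviour, and of the edge cases it does not check:

    def get_domian_from_id(string):
        return string.split(":", 1)[1]

    print(get_domian_from_id("@alice:example.com"))       # example.com
    print(get_domian_from_id("!room:example.com"))        # example.com
    print(get_domian_from_id("$event:example.com:8448"))  # example.com:8448
    # Unlike UserID.from_string(), it validates nothing: a string with no
    # colon raises IndexError, and the sigil is never inspected.
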
diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py
new file mode 100644
index 0000000000..3efa8a8206
--- /dev/null
+++ b/synapse/util/presentable_names.py
@@ -0,0 +1,159 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+
+# intentionally looser than what aliases we allow to be registered since
+# other HSes may allow aliases that we would not
+ALIAS_RE = re.compile(r"^#.*:.+$")
+
+ALL_ALONE = "Empty Room"
+
+
+def calculate_room_name(room_state, user_id, fallback_to_members=True):
+ """
+ Works out a user-facing name for the given room as per Matrix
+ spec recommendations.
+ Does not yet support internationalisation.
+ Args:
+ room_state: Dictionary of the room's state
+ user_id: The ID of the user to whom the room name is being presented
+ fallback_to_members: If False, return None instead of generating a name
+ based on the room's members if the room has no
+ title or aliases.
+
+ Returns:
+ (string or None) A human readable name for the room.
+ """
+ # does it have a name?
+ if ("m.room.name", "") in room_state:
+ m_room_name = room_state[("m.room.name", "")]
+ if m_room_name.content and m_room_name.content["name"]:
+ return m_room_name.content["name"]
+
+ # does it have a canonical alias?
+ if ("m.room.canonical_alias", "") in room_state:
+ canon_alias = room_state[("m.room.canonical_alias", "")]
+ if (
+ canon_alias.content and canon_alias.content["alias"] and
+ _looks_like_an_alias(canon_alias.content["alias"])
+ ):
+ return canon_alias.content["alias"]
+
+ # at this point we're going to need to search the state by all state keys
+ # for an event type, so rearrange the data structure
+ room_state_bytype = _state_as_two_level_dict(room_state)
+
+ # right then, any aliases at all?
+ if "m.room.aliases" in room_state_bytype:
+ m_room_aliases = room_state_bytype["m.room.aliases"]
+ if len(m_room_aliases.values()) > 0:
+ first_alias_event = m_room_aliases.values()[0]
+ if first_alias_event.content and first_alias_event.content["aliases"]:
+ the_aliases = first_alias_event.content["aliases"]
+ if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]):
+ return the_aliases[0]
+
+ if not fallback_to_members:
+ return None
+
+ my_member_event = None
+ if ("m.room.member", user_id) in room_state:
+ my_member_event = room_state[("m.room.member", user_id)]
+
+ if (
+ my_member_event is not None and
+ my_member_event.content['membership'] == "invite"
+ ):
+ if ("m.room.member", my_member_event.sender) in room_state:
+ inviter_member_event = room_state[("m.room.member", my_member_event.sender)]
+ return "Invite from %s" % (name_from_member_event(inviter_member_event),)
+ else:
+ return "Room Invite"
+
+ # we're going to have to generate a name based on who's in the room,
+ # so find out who is in the room that isn't the user.
+ if "m.room.member" in room_state_bytype:
+ all_members = [
+ ev for ev in room_state_bytype["m.room.member"].values()
+ if ev.content['membership'] == "join" or ev.content['membership'] == "invite"
+ ]
+ # Sort the member events oldest-first so that we name people in the
+ # order they joined (it should at least be deterministic rather than
+ # dictionary iteration order)
+ all_members.sort(key=lambda e: e.origin_server_ts)
+ other_members = [m for m in all_members if m.state_key != user_id]
+ else:
+ other_members = []
+ all_members = []
+
+ if len(other_members) == 0:
+ if len(all_members) == 1:
+ # self-chat, peeked room with 1 participant,
+ # or inbound invite, or outbound 3PID invite.
+ if all_members[0].sender == user_id:
+ if "m.room.third_party_invite" in room_state_bytype:
+ third_party_invites = room_state_bytype["m.room.third_party_invite"]
+ if len(third_party_invites) > 0:
+ # technically third party invite events are not member
+ # events, but they are close enough
+ return "Inviting %s" (
+ descriptor_from_member_events(third_party_invites)
+ )
+ else:
+ return ALL_ALONE
+ else:
+ return name_from_member_event(all_members[0])
+ else:
+ return ALL_ALONE
+ else:
+ return descriptor_from_member_events(other_members)
+
+
+def descriptor_from_member_events(member_events):
+ if len(member_events) == 0:
+ return "nobody"
+ elif len(member_events) == 1:
+ return name_from_member_event(member_events[0])
+ elif len(member_events) == 2:
+ return "%s and %s" % (
+ name_from_member_event(member_events[0]),
+ name_from_member_event(member_events[1]),
+ )
+ else:
+ return "%s and %d others" % (
+ name_from_member_event(member_events[0]),
+ len(member_events) - 1,
+ )
+
+
+def name_from_member_event(member_event):
+ if (
+ member_event.content and "displayname" in member_event.content and
+ member_event.content["displayname"]
+ ):
+ return member_event.content["displayname"]
+ return member_event.state_key
+
+
+def _state_as_two_level_dict(state):
+ ret = {}
+ for k, v in state.items():
+ ret.setdefault(k[0], {})[k[1]] = v
+ return ret
+
+
+def _looks_like_an_alias(string):
+ return ALIAS_RE.match(string) is not None
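
Assuming the module above is on the import path, a quick illustration of the member-based fallbacks with stub events (StubMember models only the two fields these helpers read):

    from collections import namedtuple

    from synapse.util.presentable_names import descriptor_from_member_events

    StubMember = namedtuple("StubMember", ["state_key", "content"])

    alice = StubMember("@alice:example.com", {"displayname": "Alice"})
    bob = StubMember("@bob:example.com", {})  # no displayname: MXID is used
    carol = StubMember("@carol:example.com", {"displayname": "Carol"})

    print(descriptor_from_member_events([alice]))              # Alice
    print(descriptor_from_member_events([alice, bob]))         # Alice and @bob:example.com
    print(descriptor_from_member_events([alice, bob, carol]))  # Alice and 2 others
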
diff --git a/synapse/visibility.py b/synapse/visibility.py
new file mode 100644
index 0000000000..948ad51772
--- /dev/null
+++ b/synapse/visibility.py
@@ -0,0 +1,210 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.constants import Membership, EventTypes
+
+from synapse.util.logcontext import preserve_fn
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+VISIBILITY_PRIORITY = (
+ "world_readable",
+ "shared",
+ "invited",
+ "joined",
+)
+
+
+MEMBERSHIP_PRIORITY = (
+ Membership.JOIN,
+ Membership.INVITE,
+ Membership.KNOCK,
+ Membership.LEAVE,
+ Membership.BAN,
+)
+
+
+@defer.inlineCallbacks
+def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
+ """ Returns dict of user_id -> list of events that user is allowed to
+ see.
+
+ Args:
+ user_tuples ([(str, bool)]): (user id, is_peeking) for each user to be
+ checked. is_peeking should be true if:
+ * the user is not currently a member of the room, and:
+ * the user has not been a member of the room since the
+ given events
+ events ([synapse.events.EventBase]): list of events to filter
+ """
+ forgotten = yield defer.gatherResults([
+ preserve_fn(store.who_forgot_in_room)(
+ room_id,
+ )
+ for room_id in frozenset(e.room_id for e in events)
+ ], consumeErrors=True)
+
+ # Set of membership event_ids that have been forgotten
+ event_id_forgotten = frozenset(
+ row["event_id"] for rows in forgotten for row in rows
+ )
+
+ ignore_dict_content = yield store.get_global_account_data_by_type_for_users(
+ "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples]
+ )
+
+ # FIXME: This will explode if people upload something incorrect.
+ ignore_dict = {
+ user_id: frozenset(
+ content.get("ignored_users", {}).keys() if content else []
+ )
+ for user_id, content in ignore_dict_content.items()
+ }
+
+ def allowed(event, user_id, is_peeking, ignore_list):
+ """
+ Args:
+ event (synapse.events.EventBase): event to check
+ user_id (str)
+ is_peeking (bool)
+ ignore_list (list): list of users to ignore
+ """
+ if not event.is_state() and event.sender in ignore_list:
+ return False
+
+ state = event_id_to_state[event.event_id]
+
+ # get the room_visibility at the time of the event.
+ visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+ if visibility_event:
+ visibility = visibility_event.content.get("history_visibility", "shared")
+ else:
+ visibility = "shared"
+
+ if visibility not in VISIBILITY_PRIORITY:
+ visibility = "shared"
+
+ # if it was world_readable, it's easy: everyone can read it
+ if visibility == "world_readable":
+ return True
+
+ # Always allow history visibility events on boundaries. This is done
+ # by setting the effective visibility to the least restrictive
+ # of the old vs new.
+ if event.type == EventTypes.RoomHistoryVisibility:
+ prev_content = event.unsigned.get("prev_content", {})
+ prev_visibility = prev_content.get("history_visibility", None)
+
+ if prev_visibility not in VISIBILITY_PRIORITY:
+ prev_visibility = "shared"
+
+ new_priority = VISIBILITY_PRIORITY.index(visibility)
+ old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
+ if old_priority < new_priority:
+ visibility = prev_visibility
+
+ # likewise, if the event is the user's own membership event, use
+ # the 'most joined' membership
+ membership = None
+ if event.type == EventTypes.Member and event.state_key == user_id:
+ membership = event.content.get("membership", None)
+ if membership not in MEMBERSHIP_PRIORITY:
+ membership = "leave"
+
+ prev_content = event.unsigned.get("prev_content", {})
+ prev_membership = prev_content.get("membership", None)
+ if prev_membership not in MEMBERSHIP_PRIORITY:
+ prev_membership = "leave"
+
+ new_priority = MEMBERSHIP_PRIORITY.index(membership)
+ old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
+ if old_priority < new_priority:
+ membership = prev_membership
+
+ # otherwise, get the user's membership at the time of the event.
+ if membership is None:
+ membership_event = state.get((EventTypes.Member, user_id), None)
+ if membership_event:
+ if membership_event.event_id not in event_id_forgotten:
+ membership = membership_event.membership
+
+ # if the user was a member of the room at the time of the event,
+ # they can see it.
+ if membership == Membership.JOIN:
+ return True
+
+ if visibility == "joined":
+ # we weren't a member at the time of the event, so we can't
+ # see this event.
+ return False
+
+ elif visibility == "invited":
+ # user can also see the event if they were *invited* at the time
+ # of the event.
+ return membership == Membership.INVITE
+
+ else:
+ # visibility is shared: user can also see the event if they have
+ # become a member since the event
+ #
+ # XXX: if the user has subsequently joined and then left again,
+ # ideally we would share history up to the point they left. But
+ # we don't know when they left.
+ return not is_peeking
+
+ defer.returnValue({
+ user_id: [
+ event
+ for event in events
+ if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, []))
+ ]
+ for user_id, is_peeking in user_tuples
+ })
+
+
+@defer.inlineCallbacks
+def filter_events_for_client(store, user_id, events, is_peeking=False):
+ """
+ Check which events a user is allowed to see
+
+ Args:
+ user_id(str): user id to be checked
+ events([synapse.events.EventBase]): list of events to be checked
+ is_peeking(bool): should be True if:
+ * the user is not currently a member of the room, and:
+ * the user has not been a member of the room since the given
+ events
+
+ Returns:
+ [synapse.events.EventBase]
+ """
+ types = (
+ (EventTypes.RoomHistoryVisibility, ""),
+ (EventTypes.Member, user_id),
+ )
+ event_id_to_state = yield store.get_state_for_events(
+ frozenset(e.event_id for e in events),
+ types=types
+ )
+ res = yield filter_events_for_clients(
+ store, [(user_id, is_peeking)], events, event_id_to_state
+ )
+ defer.returnValue(res.get(user_id, []))
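
The history-visibility boundary rule above (use the less restrictive of the old and new settings for the visibility event itself) in isolation, as a standalone mirror:

    VISIBILITY_PRIORITY = ("world_readable", "shared", "invited", "joined")

    def effective_visibility(new, prev):
        # Unknown values degrade to "shared", as in allowed() above.
        if new not in VISIBILITY_PRIORITY:
            new = "shared"
        if prev not in VISIBILITY_PRIORITY:
            prev = "shared"
        # Lower index = less restrictive; keep the less restrictive setting.
        if VISIBILITY_PRIORITY.index(prev) < VISIBILITY_PRIORITY.index(new):
            return prev
        return new

    print(effective_visibility("joined", "world_readable"))  # world_readable
    print(effective_visibility("shared", "joined"))          # shared
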