diff --git a/CHANGES.rst b/CHANGES.rst
index 6291fedb9a..f1529e79bd 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,17 @@
+Changes in synapse v0.23.1 (2017-10-02)
+=======================================
+
+Changes:
+
+* Make 'affinity' package optional, as it is not supported on some platforms
+
+
+Changes in synapse v0.23.0 (2017-10-02)
+=======================================
+
+No changes since v0.23.0-rc2
+
+
Changes in synapse v0.23.0-rc2 (2017-09-26)
===========================================
diff --git a/contrib/systemd/synapse.service b/contrib/systemd/synapse.service
index 92d94b9d58..3f037055b9 100644
--- a/contrib/systemd/synapse.service
+++ b/contrib/systemd/synapse.service
@@ -9,9 +9,10 @@ Description=Synapse Matrix homeserver
Type=simple
User=synapse
Group=synapse
-EnvironmentFile=-/etc/sysconfig/synapse
WorkingDirectory=/var/lib/synapse
-ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml --log-config=/etc/synapse/log_config.yaml
+ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml
+ExecStop=/usr/bin/synctl stop /etc/synapse/homeserver.yaml
[Install]
WantedBy=multi-user.target
+
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index bc167b59af..dc7fe940e8 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -376,10 +376,13 @@ class Porter(object):
" VALUES (?,?,?,?,to_tsvector('english', ?),?,?)"
)
- rows_dict = [
- dict(zip(headers, row))
- for row in rows
- ]
+ rows_dict = []
+ for row in rows:
+ d = dict(zip(headers, row))
+ if "\0" in d['value']:
+ logger.warn('dropping search row %s', d)
+ else:
+ rows_dict.append(d)
txn.executemany(sql, [
(
diff --git a/synapse/__init__.py b/synapse/__init__.py
index ec83e6adb7..bee4aba625 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.23.0-rc2"
+__version__ = "0.23.1"
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index cd0e815919..cf4730730d 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -12,10 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import gc
import logging
+import sys
+
+try:
+ import affinity
+except:
+ affinity = None
-import affinity
from daemonize import Daemonize
from synapse.util import PreserveLoggingContext
from synapse.util.rlimit import change_resource_limit
@@ -78,6 +84,13 @@ def start_reactor(
with PreserveLoggingContext():
logger.info("Running")
if cpu_affinity is not None:
+ if not affinity:
+ quit_with_error(
+ "Missing package 'affinity' required for cpu_affinity\n"
+ "option\n\n"
+ "Install by running:\n\n"
+ " pip install affinity\n\n"
+ )
logger.info("Setting CPU affinity to %s" % cpu_affinity)
affinity.set_process_affinity_mask(0, cpu_affinity)
change_resource_limit(soft_file_limit)
@@ -97,3 +110,13 @@ def start_reactor(
daemon.start()
else:
run()
+
+
+def quit_with_error(error_string):
+ message_lines = error_string.split("\n")
+ line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
+ sys.stderr.write("*" * line_length + '\n')
+ for line in message_lines:
+ sys.stderr.write(" %s\n" % (line.rstrip(),))
+ sys.stderr.write("*" * line_length + '\n')
+ sys.exit(1)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 84ad8f04a0..3adf72e141 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -25,6 +25,7 @@ from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
STATIC_PREFIX, WEB_CLIENT_PREFIX
from synapse.app import _base
+from synapse.app._base import quit_with_error
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
@@ -249,16 +250,6 @@ class SynapseHomeServer(HomeServer):
return db_conn
-def quit_with_error(error_string):
- message_lines = error_string.split("\n")
- line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
- sys.stderr.write("*" * line_length + '\n')
- for line in message_lines:
- sys.stderr.write(" %s\n" % (line.rstrip(),))
- sys.stderr.write("*" * line_length + '\n')
- sys.exit(1)
-
-
def setup(config_options):
"""
Args:
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
index e739f105b2..dccc579eac 100644
--- a/synapse/events/spamcheck.py
+++ b/synapse/events/spamcheck.py
@@ -45,3 +45,69 @@ class SpamChecker(object):
return False
return self.spam_checker.check_event_for_spam(event)
+
+ def user_may_invite(self, inviter_userid, invitee_userid, room_id):
+ """Checks if a given user may send an invite
+
+ If this method returns false, the invite will be rejected.
+
+ Args:
+ userid (string): The sender's user ID
+
+ Returns:
+ bool: True if the user may send an invite, otherwise False
+ """
+ if self.spam_checker is None:
+ return True
+
+ return self.spam_checker.user_may_invite(inviter_userid, invitee_userid, room_id)
+
+ def user_may_create_room(self, userid):
+ """Checks if a given user may create a room
+
+ If this method returns false, the creation request will be rejected.
+
+ Args:
+ userid (string): The sender's user ID
+
+ Returns:
+ bool: True if the user may create a room, otherwise False
+ """
+ if self.spam_checker is None:
+ return True
+
+ return self.spam_checker.user_may_create_room(userid)
+
+ def user_may_create_room_alias(self, userid, room_alias):
+ """Checks if a given user may create a room alias
+
+ If this method returns false, the association request will be rejected.
+
+ Args:
+ userid (string): The sender's user ID
+ room_alias (string): The alias to be created
+
+ Returns:
+ bool: True if the user may create a room alias, otherwise False
+ """
+ if self.spam_checker is None:
+ return True
+
+ return self.spam_checker.user_may_create_room_alias(userid, room_alias)
+
+ def user_may_publish_room(self, userid, room_id):
+ """Checks if a given user may publish a room to the directory
+
+ If this method returns false, the publish request will be rejected.
+
+ Args:
+ userid (string): The sender's user ID
+ room_id (string): The ID of the room that would be published
+
+ Returns:
+ bool: True if the user may publish the room, otherwise False
+ """
+ if self.spam_checker is None:
+ return True
+
+ return self.spam_checker.user_may_publish_room(userid, room_id)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 51e3fdea06..f00d59e701 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -12,14 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
from twisted.internet import defer
from .federation_base import FederationBase
from .units import Transaction, Edu
-from synapse.util.async import Linearizer
+from synapse.util import async
from synapse.util.logutils import log_function
from synapse.util.caches.response_cache import ResponseCache
from synapse.events import FrozenEvent
@@ -33,6 +31,9 @@ from synapse.crypto.event_signing import compute_event_signature
import simplejson as json
import logging
+# when processing incoming transactions, we try to handle multiple rooms in
+# parallel, up to this limit.
+TRANSACTION_CONCURRENCY_LIMIT = 10
logger = logging.getLogger(__name__)
@@ -52,7 +53,8 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
- self._server_linearizer = Linearizer("fed_server")
+ self._server_linearizer = async.Linearizer("fed_server")
+ self._transaction_linearizer = async.Linearizer("fed_txn_handler")
# We cache responses to state queries, as they take a while and often
# come in waves.
@@ -109,25 +111,41 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_incoming_transaction(self, transaction_data):
+ # keep this as early as possible to make the calculated origin ts as
+ # accurate as possible.
+ request_time = self._clock.time_msec()
+
transaction = Transaction(**transaction_data)
- received_pdus_counter.inc_by(len(transaction.pdus))
+ if not transaction.transaction_id:
+ raise Exception("Transaction missing transaction_id")
+ if not transaction.origin:
+ raise Exception("Transaction missing origin")
- for p in transaction.pdus:
- if "unsigned" in p:
- unsigned = p["unsigned"]
- if "age" in unsigned:
- p["age"] = unsigned["age"]
- if "age" in p:
- p["age_ts"] = int(self._clock.time_msec()) - int(p["age"])
- del p["age"]
+ logger.debug("[%s] Got transaction", transaction.transaction_id)
- pdu_list = [
- self.event_from_pdu_json(p) for p in transaction.pdus
- ]
+ # use a linearizer to ensure that we don't process the same transaction
+ # multiple times in parallel.
+ with (yield self._transaction_linearizer.queue(
+ (transaction.origin, transaction.transaction_id),
+ )):
+ result = yield self._handle_incoming_transaction(
+ transaction, request_time,
+ )
- logger.debug("[%s] Got transaction", transaction.transaction_id)
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _handle_incoming_transaction(self, transaction, request_time):
+ """ Process an incoming transaction and return the HTTP response
+
+ Args:
+ transaction (Transaction): incoming transaction
+ request_time (int): timestamp that the HTTP request arrived at
+ Returns:
+ Deferred[(int, object)]: http response code and body
+ """
response = yield self.transaction_actions.have_responded(transaction)
if response:
@@ -140,42 +158,50 @@ class FederationServer(FederationBase):
logger.debug("[%s] Transaction is new", transaction.transaction_id)
- results = []
-
- for pdu in pdu_list:
- # check that it's actually being sent from a valid destination to
- # workaround bug #1753 in 0.18.5 and 0.18.6
- if transaction.origin != get_domain_from_id(pdu.event_id):
- # We continue to accept join events from any server; this is
- # necessary for the federation join dance to work correctly.
- # (When we join over federation, the "helper" server is
- # responsible for sending out the join event, rather than the
- # origin. See bug #1893).
- if not (
- pdu.type == 'm.room.member' and
- pdu.content and
- pdu.content.get("membership", None) == 'join'
- ):
- logger.info(
- "Discarding PDU %s from invalid origin %s",
- pdu.event_id, transaction.origin
- )
- continue
- else:
- logger.info(
- "Accepting join PDU %s from %s",
- pdu.event_id, transaction.origin
- )
+ received_pdus_counter.inc_by(len(transaction.pdus))
- try:
- yield self._handle_received_pdu(transaction.origin, pdu)
- results.append({})
- except FederationError as e:
- self.send_failure(e, transaction.origin)
- results.append({"error": str(e)})
- except Exception as e:
- results.append({"error": str(e)})
- logger.exception("Failed to handle PDU")
+ pdus_by_room = {}
+
+ for p in transaction.pdus:
+ if "unsigned" in p:
+ unsigned = p["unsigned"]
+ if "age" in unsigned:
+ p["age"] = unsigned["age"]
+ if "age" in p:
+ p["age_ts"] = request_time - int(p["age"])
+ del p["age"]
+
+ event = self.event_from_pdu_json(p)
+ room_id = event.room_id
+ pdus_by_room.setdefault(room_id, []).append(event)
+
+ pdu_results = {}
+
+ # we can process different rooms in parallel (which is useful if they
+ # require callouts to other servers to fetch missing events), but
+ # impose a limit to avoid going too crazy with ram/cpu.
+ @defer.inlineCallbacks
+ def process_pdus_for_room(room_id):
+ logger.debug("Processing PDUs for %s", room_id)
+ for pdu in pdus_by_room[room_id]:
+ event_id = pdu.event_id
+ try:
+ yield self._handle_received_pdu(
+ transaction.origin, pdu
+ )
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warn("Error handling PDU %s: %s", event_id, e)
+ self.send_failure(e, transaction.origin)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ pdu_results[event_id] = {"error": str(e)}
+ logger.exception("Failed to handle PDU %s", event_id)
+
+ yield async.concurrently_execute(
+ process_pdus_for_room, pdus_by_room.keys(),
+ TRANSACTION_CONCURRENCY_LIMIT,
+ )
if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus):
@@ -188,14 +214,12 @@ class FederationServer(FederationBase):
for failure in getattr(transaction, "pdu_failures", []):
logger.info("Got failure %r", failure)
- logger.debug("Returning: %s", str(results))
-
response = {
- "pdus": dict(zip(
- (p.event_id for p in pdu_list), results
- )),
+ "pdus": pdu_results,
}
+ logger.debug("Returning: %s", str(response))
+
yield self.transaction_actions.set_response(
transaction,
200, response
@@ -520,6 +544,30 @@ class FederationServer(FederationBase):
Returns (Deferred): completes with None
Raises: FederationError if the signatures / hash do not match
"""
+ # check that it's actually being sent from a valid destination to
+ # workaround bug #1753 in 0.18.5 and 0.18.6
+ if origin != get_domain_from_id(pdu.event_id):
+ # We continue to accept join events from any server; this is
+ # necessary for the federation join dance to work correctly.
+ # (When we join over federation, the "helper" server is
+ # responsible for sending out the join event, rather than the
+ # origin. See bug #1893).
+ if not (
+ pdu.type == 'm.room.member' and
+ pdu.content and
+ pdu.content.get("membership", None) == 'join'
+ ):
+ logger.info(
+ "Discarding PDU %s from invalid origin %s",
+ pdu.event_id, origin
+ )
+ return
+ else:
+ logger.info(
+ "Accepting join PDU %s from %s",
+ pdu.event_id, origin
+ )
+
# Check signature.
try:
pdu = yield self._check_sigs_and_hash(pdu)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 003eaba893..7a3c9cbb70 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -20,8 +20,8 @@ from .persistence import TransactionActions
from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException
+from synapse.util import logcontext
from synapse.util.async import run_on_reactor
-from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
@@ -231,11 +231,9 @@ class TransactionQueue(object):
(pdu, order)
)
- preserve_context_over_fn(
- self._attempt_new_transaction, destination
- )
+ self._attempt_new_transaction(destination)
- @preserve_fn # the caller should not yield on this
+ @logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
"""Send the new presence states to the appropriate destinations.
@@ -299,7 +297,7 @@ class TransactionQueue(object):
state.user_id: state for state in states
})
- preserve_fn(self._attempt_new_transaction)(destination)
+ self._attempt_new_transaction(destination)
def send_edu(self, destination, edu_type, content, key=None):
edu = Edu(
@@ -321,9 +319,7 @@ class TransactionQueue(object):
else:
self.pending_edus_by_dest.setdefault(destination, []).append(edu)
- preserve_context_over_fn(
- self._attempt_new_transaction, destination
- )
+ self._attempt_new_transaction(destination)
def send_failure(self, failure, destination):
if destination == self.server_name or destination == "localhost":
@@ -336,9 +332,7 @@ class TransactionQueue(object):
destination, []
).append(failure)
- preserve_context_over_fn(
- self._attempt_new_transaction, destination
- )
+ self._attempt_new_transaction(destination)
def send_device_messages(self, destination):
if destination == self.server_name or destination == "localhost":
@@ -347,15 +341,24 @@ class TransactionQueue(object):
if not self.can_send_to(destination):
return
- preserve_context_over_fn(
- self._attempt_new_transaction, destination
- )
+ self._attempt_new_transaction(destination)
def get_current_token(self):
return 0
- @defer.inlineCallbacks
def _attempt_new_transaction(self, destination):
+ """Try to start a new transaction to this destination
+
+ If there is already a transaction in progress to this destination,
+ returns immediately. Otherwise kicks off the process of sending a
+ transaction in the background.
+
+ Args:
+ destination (str):
+
+ Returns:
+ None
+ """
# list of (pending_pdu, deferred, order)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
@@ -368,6 +371,19 @@ class TransactionQueue(object):
)
return
+ logger.debug("TX [%s] Starting transaction loop", destination)
+
+ # Drop the logcontext before starting the transaction. It doesn't
+ # really make sense to log all the outbound transactions against
+ # whatever path led us to this point: that's pretty arbitrary really.
+ #
+ # (this also means we can fire off _perform_transaction without
+ # yielding)
+ with logcontext.PreserveLoggingContext():
+ self._transaction_transmission_loop(destination)
+
+ @defer.inlineCallbacks
+ def _transaction_transmission_loop(self, destination):
pending_pdus = []
try:
self.pending_transactions[destination] = 1
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 943554ce98..a0464ae5c0 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -40,6 +40,8 @@ class DirectoryHandler(BaseHandler):
"directory", self.on_directory_query
)
+ self.spam_checker = hs.get_spam_checker()
+
@defer.inlineCallbacks
def _create_association(self, room_alias, room_id, servers=None, creator=None):
# general association creation for both human users and app services
@@ -73,6 +75,11 @@ class DirectoryHandler(BaseHandler):
# association creation for human users
# TODO(erikj): Do user auth.
+ if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
+ raise SynapseError(
+ 403, "This user is not permitted to create this alias",
+ )
+
can_create = yield self.can_modify_alias(
room_alias,
user_id=user_id
@@ -327,6 +334,14 @@ class DirectoryHandler(BaseHandler):
room_id (str)
visibility (str): "public" or "private"
"""
+ if not self.spam_checker.user_may_publish_room(
+ requester.user.to_string(), room_id
+ ):
+ raise AuthError(
+ 403,
+ "This user is not permitted to publish rooms to the room list"
+ )
+
if requester.is_guest:
raise AuthError(403, "Guests cannot edit the published room list")
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 18f87cad67..7711cded01 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,7 +14,6 @@
# limitations under the License.
"""Contains handlers for federation events."""
-import synapse.util.logcontext
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
@@ -26,10 +25,7 @@ from synapse.api.errors import (
)
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import (
- preserve_fn, preserve_context_over_deferred
-)
+from synapse.util import unwrapFirstError, logcontext
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor, Linearizer
@@ -77,6 +73,7 @@ class FederationHandler(BaseHandler):
self.action_generator = hs.get_action_generator()
self.is_mine_id = hs.is_mine_id
self.pusher_pool = hs.get_pusherpool()
+ self.spam_checker = hs.get_spam_checker()
self.replication_layer.set_handler(self)
@@ -125,6 +122,28 @@ class FederationHandler(BaseHandler):
self.room_queues[pdu.room_id].append((pdu, origin))
return
+ # If we're no longer in the room just ditch the event entirely. This
+ # is probably an old server that has come back and thinks we're still
+ # in the room (or we've been rejoined to the room by a state reset).
+ #
+ # If we were never in the room then maybe our database got vaped and
+ # we should check if we *are* in fact in the room. If we are then we
+ # can magically rejoin the room.
+ is_in_room = yield self.auth.check_host_in_room(
+ pdu.room_id,
+ self.server_name
+ )
+ if not is_in_room:
+ was_in_room = yield self.store.was_host_joined(
+ pdu.room_id, self.server_name,
+ )
+ if was_in_room:
+ logger.info(
+ "Ignoring PDU %s for room %s from %s as we've left the room!",
+ pdu.event_id, pdu.room_id, origin,
+ )
+ return
+
state = None
auth_chain = []
@@ -591,9 +610,9 @@ class FederationHandler(BaseHandler):
missing_auth - failed_to_fetch
)
- results = yield preserve_context_over_deferred(defer.gatherResults(
+ results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- preserve_fn(self.replication_layer.get_pdu)(
+ logcontext.preserve_fn(self.replication_layer.get_pdu)(
[dest],
event_id,
outlier=True,
@@ -785,10 +804,14 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
- states = yield preserve_context_over_deferred(defer.gatherResults([
- preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
- for e in event_ids
- ]))
+ states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [
+ logcontext.preserve_fn(self.state_handler.resolve_state_groups)(
+ room_id, [e]
+ )
+ for e in event_ids
+ ], consumeErrors=True,
+ ))
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
@@ -941,9 +964,7 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.
- synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
- room_queue
- )
+ logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
defer.returnValue(True)
@@ -1070,6 +1091,9 @@ class FederationHandler(BaseHandler):
"""
event = pdu
+ if event.state_key is None:
+ raise SynapseError(400, "The invite event did not have a state key")
+
is_blocked = yield self.store.is_room_blocked(event.room_id)
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
@@ -1077,6 +1101,13 @@ class FederationHandler(BaseHandler):
if self.hs.config.block_non_admin_invites:
raise SynapseError(403, "This server does not accept room invites")
+ if not self.spam_checker.user_may_invite(
+ event.sender, event.state_key, event.room_id,
+ ):
+ raise SynapseError(
+ 403, "This user is not permitted to send invites to this server/user"
+ )
+
membership = event.content.get("membership")
if event.type != EventTypes.Member or membership != Membership.INVITE:
raise SynapseError(400, "The event was not an m.room.member invite event")
@@ -1085,9 +1116,6 @@ class FederationHandler(BaseHandler):
if sender_domain != origin:
raise SynapseError(400, "The invite event was not from the server sending it")
- if event.state_key is None:
- raise SynapseError(400, "The invite event did not have a state key")
-
if not self.is_mine_id(event.state_key):
raise SynapseError(400, "The invite event must be for this server")
@@ -1430,7 +1458,7 @@ class FederationHandler(BaseHandler):
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
- preserve_fn(self.pusher_pool.on_new_notifications)(
+ logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
event_stream_id, max_stream_id
)
@@ -1443,16 +1471,16 @@ class FederationHandler(BaseHandler):
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
"""
- contexts = yield preserve_context_over_deferred(defer.gatherResults(
+ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- preserve_fn(self._prep_event)(
+ logcontext.preserve_fn(self._prep_event)(
origin,
ev_info["event"],
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
for ev_info in event_infos
- ]
+ ], consumeErrors=True,
))
yield self.store.persist_events(
@@ -1760,18 +1788,17 @@ class FederationHandler(BaseHandler):
# Do auth conflict res.
logger.info("Different auth: %s", different_auth)
- different_events = yield preserve_context_over_deferred(defer.gatherResults(
- [
- preserve_fn(self.store.get_event)(
+ different_events = yield logcontext.make_deferred_yieldable(
+ defer.gatherResults([
+ logcontext.preserve_fn(self.store.get_event)(
d,
allow_none=True,
allow_rejected=False,
)
for d in different_auth
if d in have_events and not have_events[d]
- ],
- consumeErrors=True
- )).addErrback(unwrapFirstError)
+ ], consumeErrors=True)
+ ).addErrback(unwrapFirstError)
if different_events:
local_view = dict(auth_events)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e22d4803b9..fbf88b46ef 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -25,6 +25,7 @@ from synapse.types import (
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import measure_func
+from synapse.util.frozenutils import unfreeze
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -556,7 +557,7 @@ class MessageHandler(BaseHandler):
# Ensure that we can round trip before trying to persist in db
try:
- dump = ujson.dumps(event.content)
+ dump = ujson.dumps(unfreeze(event.content))
ujson.loads(dump)
except:
logger.exception("Failed to encode content: %r", event.content)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5698d28088..535ba9517c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -60,6 +60,11 @@ class RoomCreationHandler(BaseHandler):
},
}
+ def __init__(self, hs):
+ super(RoomCreationHandler, self).__init__(hs)
+
+ self.spam_checker = hs.get_spam_checker()
+
@defer.inlineCallbacks
def create_room(self, requester, config, ratelimit=True):
""" Creates a new room.
@@ -75,6 +80,9 @@ class RoomCreationHandler(BaseHandler):
"""
user_id = requester.user.to_string()
+ if not self.spam_checker.user_may_create_room(user_id):
+ raise SynapseError(403, "You are not permitted to create rooms")
+
if ratelimit:
yield self.ratelimit(requester)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index d6ad57171c..970fec0666 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -50,6 +50,7 @@ class RoomMemberHandler(BaseHandler):
self.member_linearizer = Linearizer(name="member")
self.clock = hs.get_clock()
+ self.spam_checker = hs.get_spam_checker()
self.distributor = hs.get_distributor()
self.distributor.declare("user_joined_room")
@@ -212,12 +213,26 @@ class RoomMemberHandler(BaseHandler):
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
- if (effective_membership_state == "invite" and
- self.hs.config.block_non_admin_invites):
+ if effective_membership_state == "invite":
+ block_invite = False
is_requester_admin = yield self.auth.is_server_admin(
requester.user,
)
if not is_requester_admin:
+ if self.hs.config.block_non_admin_invites:
+ logger.info(
+ "Blocking invite: user is not admin and non-admin "
+ "invites disabled"
+ )
+ block_invite = True
+
+ if not self.spam_checker.user_may_invite(
+ requester.user.to_string(), target.to_string(), room_id,
+ ):
+ logger.info("Blocking invite due to spam checker")
+ block_invite = True
+
+ if block_invite:
raise SynapseError(
403, "Invites have been disabled on this server",
)
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 85effdfa46..9dce99ebec 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -1,4 +1,5 @@
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -238,6 +239,28 @@ BASE_APPEND_OVERRIDE_RULES = [
}
]
},
+ {
+ 'rule_id': 'global/override/.m.rule.roomnotif',
+ 'conditions': [
+ {
+ 'kind': 'event_match',
+ 'key': 'content.body',
+ 'pattern': '*@room*',
+ '_id': '_roomnotif_content',
+ },
+ {
+ 'kind': 'sender_notification_permission',
+ 'key': 'room',
+ '_id': '_roomnotif_pl',
+ },
+ ],
+ 'actions': [
+ 'notify', {
+ 'set_tweak': 'highlight',
+ 'value': True,
+ }
+ ]
+ }
]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index b0d64aa6c4..425a017bdf 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,11 +20,13 @@ from twisted.internet import defer
from .push_rule_evaluator import PushRuleEvaluatorForEvent
+from synapse.event_auth import get_user_power_level
from synapse.api.constants import EventTypes, Membership
from synapse.metrics import get_metrics_for
from synapse.util.caches import metrics as cache_metrics
from synapse.util.caches.descriptors import cached
from synapse.util.async import Linearizer
+from synapse.state import POWER_KEY
from collections import namedtuple
@@ -59,6 +62,7 @@ class BulkPushRuleEvaluator(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
self.room_push_rule_cache_metrics = cache_metrics.register_cache(
"cache",
@@ -109,6 +113,29 @@ class BulkPushRuleEvaluator(object):
)
@defer.inlineCallbacks
+ def _get_power_levels_and_sender_level(self, event, context):
+ pl_event_id = context.prev_state_ids.get(POWER_KEY)
+ if pl_event_id:
+ # fastpath: if there's a power level event, that's all we need, and
+ # not having a power level event is an extreme edge case
+ pl_event = yield self.store.get_event(pl_event_id)
+ auth_events = {POWER_KEY: pl_event}
+ else:
+ auth_events_ids = yield self.auth.compute_auth_events(
+ event, context.prev_state_ids, for_verification=False,
+ )
+ auth_events = yield self.store.get_events(auth_events_ids)
+ auth_events = {
+ (e.type, e.state_key): e for e in auth_events.itervalues()
+ }
+
+ sender_level = get_user_power_level(event.sender, auth_events)
+
+ pl_event = auth_events.get(POWER_KEY)
+
+ defer.returnValue((pl_event.content if pl_event else {}, sender_level))
+
+ @defer.inlineCallbacks
def action_for_event_by_user(self, event, context):
"""Given an event and context, evaluate the push rules and return
the results
@@ -123,7 +150,13 @@ class BulkPushRuleEvaluator(object):
event, context
)
- evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
+ (power_levels, sender_power_level) = (
+ yield self._get_power_levels_and_sender_level(event, context)
+ )
+
+ evaluator = PushRuleEvaluatorForEvent(
+ event, len(room_members), sender_power_level, power_levels,
+ )
condition_cache = {}
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 172c27c137..3601f2d365 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,6 +30,21 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
def _room_member_count(ev, condition, room_member_count):
+ return _test_ineq_condition(condition, room_member_count)
+
+
+def _sender_notification_permission(ev, condition, sender_power_level, power_levels):
+ notif_level_key = condition.get('key')
+ if notif_level_key is None:
+ return False
+
+ notif_levels = power_levels.get('notifications', {})
+ room_notif_level = notif_levels.get(notif_level_key, 50)
+
+ return sender_power_level >= room_notif_level
+
+
+def _test_ineq_condition(condition, number):
if 'is' not in condition:
return False
m = INEQUALITY_EXPR.match(condition['is'])
@@ -41,15 +57,15 @@ def _room_member_count(ev, condition, room_member_count):
rhs = int(rhs)
if ineq == '' or ineq == '==':
- return room_member_count == rhs
+ return number == rhs
elif ineq == '<':
- return room_member_count < rhs
+ return number < rhs
elif ineq == '>':
- return room_member_count > rhs
+ return number > rhs
elif ineq == '>=':
- return room_member_count >= rhs
+ return number >= rhs
elif ineq == '<=':
- return room_member_count <= rhs
+ return number <= rhs
else:
return False
@@ -65,9 +81,11 @@ def tweaks_for_actions(actions):
class PushRuleEvaluatorForEvent(object):
- def __init__(self, event, room_member_count):
+ def __init__(self, event, room_member_count, sender_power_level, power_levels):
self._event = event
self._room_member_count = room_member_count
+ self._sender_power_level = sender_power_level
+ self._power_levels = power_levels
# Maps strings of e.g. 'content.body' -> event["content"]["body"]
self._value_cache = _flatten_dict(event)
@@ -81,6 +99,10 @@ class PushRuleEvaluatorForEvent(object):
return _room_member_count(
self._event, condition, self._room_member_count
)
+ elif condition['kind'] == 'sender_notification_permission':
+ return _sender_notification_permission(
+ self._event, condition, self._sender_power_level, self._power_levels,
+ )
else:
return True
@@ -183,7 +205,7 @@ def _glob_to_re(glob, word_boundary):
r,
)
if word_boundary:
- r = r"\b%s\b" % (r,)
+ r = _re_word_boundary(r)
return re.compile(r, flags=re.IGNORECASE)
else:
@@ -192,7 +214,7 @@ def _glob_to_re(glob, word_boundary):
return re.compile(r, flags=re.IGNORECASE)
elif word_boundary:
r = re.escape(glob)
- r = r"\b%s\b" % (r,)
+ r = _re_word_boundary(r)
return re.compile(r, flags=re.IGNORECASE)
else:
@@ -200,6 +222,18 @@ def _glob_to_re(glob, word_boundary):
return re.compile(r, flags=re.IGNORECASE)
+def _re_word_boundary(r):
+ """
+ Adds word boundary characters to the start and end of an
+ expression to require that the match occur as a whole word,
+ but do so respecting the fact that strings starting or ending
+ with non-word characters will change word boundaries.
+ """
+ # we can't use \b as it chokes on unicode. however \W seems to be okay
+ # as shorthand for [^0-9A-Za-z_].
+ return r"(^|\W)%s(\W|$)" % (r,)
+
+
def _flatten_dict(d, prefix=[], result=None):
if result is None:
result = {}
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 630e92c90e..7052333c19 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -40,7 +40,6 @@ REQUIREMENTS = {
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
- "affinity": ["affinity"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
@@ -59,6 +58,9 @@ CONDITIONAL_REQUIREMENTS = {
"psutil": {
"psutil>=2.0.0": ["psutil>=2.0.0"],
},
+ "affinity": {
+ "affinity": ["affinity"],
+ },
}
diff --git a/synapse/state.py b/synapse/state.py
index 390799fbd5..dcdcdef65e 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -288,6 +288,9 @@ class StateHandler(object):
"""
logger.debug("resolve_state_groups event_ids %s", event_ids)
+ # map from state group id to the state in that state group (where
+ # 'state' is a map from state key to event id)
+ # dict[int, dict[(str, str), str]]
state_groups_ids = yield self.store.get_state_groups_ids(
room_id, event_ids
)
@@ -320,11 +323,15 @@ class StateHandler(object):
"Resolving state for %s with %d groups", room_id, len(state_groups_ids)
)
+ # build a map from state key to the event_ids which set that state.
+ # dict[(str, str), set[str])
state = {}
for st in state_groups_ids.values():
for key, e_id in st.items():
state.setdefault(key, set()).add(e_id)
+ # build a map from state key to the event_ids which set that state,
+ # including only those where there are state keys in conflict.
conflicted_state = {
k: list(v)
for k, v in state.items()
@@ -494,8 +501,14 @@ def _resolve_with_state_fac(unconflicted_state, conflicted_state,
logger.info("Asking for %d conflicted events", len(needed_events))
+ # dict[str, FrozenEvent]: a map from state event id to event. Only includes
+ # the state events which are in conflict.
state_map = yield state_map_factory(needed_events)
+ # get the ids of the auth events which allow us to authenticate the
+ # conflicted state, picking only from the unconflicting state.
+ #
+ # dict[(str, str), str]: a map from state key to event id
auth_events = _create_auth_events_from_maps(
unconflicted_state, conflicted_state, state_map
)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7002b3752e..4f0b43c36d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -784,6 +784,9 @@ class EventsStore(SQLBaseStore):
self._invalidate_cache_and_stream(
txn, self.is_host_joined, (room_id, host)
)
+ self._invalidate_cache_and_stream(
+ txn, self.was_host_joined, (room_id, host)
+ )
self._invalidate_cache_and_stream(
txn, self.get_users_in_room, (room_id,)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 457ca288d0..a0fc9a6867 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -533,6 +533,46 @@ class RoomMemberStore(SQLBaseStore):
defer.returnValue(True)
+ @cachedInlineCallbacks()
+ def was_host_joined(self, room_id, host):
+ """Check whether the server is or ever was in the room.
+
+ Args:
+ room_id (str)
+ host (str)
+
+ Returns:
+ Deferred: Resolves to True if the host is/was in the room, otherwise
+ False.
+ """
+ if '%' in host or '_' in host:
+ raise Exception("Invalid host name")
+
+ sql = """
+ SELECT user_id FROM room_memberships
+ WHERE room_id = ?
+ AND user_id LIKE ?
+ AND membership = 'join'
+ LIMIT 1
+ """
+
+ # We do need to be careful to ensure that host doesn't have any wild cards
+ # in it, but we checked above for known ones and we'll check below that
+ # the returned user actually has the correct domain.
+ like_clause = "%:" + host
+
+ rows = yield self._execute("was_host_joined", None, sql, room_id, like_clause)
+
+ if not rows:
+ defer.returnValue(False)
+
+ user_id = rows[0][0]
+ if get_domain_from_id(user_id) != host:
+ # This can only happen if the host name has something funky in it
+ raise Exception("Invalid host name")
+
+ defer.returnValue(True)
+
def get_joined_hosts(self, room_id, state_entry):
state_group = state_entry.state_group
if not state_group:
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 1453faf0ef..bb252f75d7 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -19,7 +19,7 @@ from twisted.internet import defer, reactor
from .logcontext import (
PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
)
-from synapse.util import unwrapFirstError
+from synapse.util import logcontext, unwrapFirstError
from contextlib import contextmanager
@@ -155,7 +155,7 @@ def concurrently_execute(func, args, limit):
except StopIteration:
pass
- return preserve_context_over_deferred(defer.gatherResults([
+ return logcontext.make_deferred_yieldable(defer.gatherResults([
preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit)
], consumeErrors=True)).addErrback(unwrapFirstError)
|