diff --git a/synapse/__init__.py b/synapse/__init__.py
index faa183a99e..3cde33c0d7 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.31.2"
+__version__ = "0.32.2"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 66639b0089..088b4e8b6d 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -19,6 +19,7 @@ from six import itervalues
import pymacaroons
from twisted.internet import defer
+from netaddr import IPAddress
import synapse.types
from synapse import event_auth
@@ -244,6 +245,11 @@ class Auth(object):
if app_service is None:
defer.returnValue((None, None))
+ if app_service.ip_range_whitelist:
+ ip_address = IPAddress(self.hs.get_ip_from_request(request))
+ if ip_address not in app_service.ip_range_whitelist:
+ defer.returnValue((None, None))
+
if "user_id" not in request.args:
defer.returnValue((app_service.sender, app_service))
@@ -488,7 +494,7 @@ class Auth(object):
def _look_up_user_by_access_token(self, token):
ret = yield self.store.get_user_by_access_token(token)
if not ret:
- logger.warn("Unrecognised access token - not in store: %s" % (token,))
+ logger.warn("Unrecognised access token - not in store.")
raise AuthError(
self.TOKEN_NOT_FOUND_HTTP_STATUS, "Unrecognised access token.",
errcode=Codes.UNKNOWN_TOKEN
@@ -511,7 +517,7 @@ class Auth(object):
)
service = self.store.get_app_service_by_token(token)
if not service:
- logger.warn("Unrecognised appservice access token: %s" % (token,))
+ logger.warn("Unrecognised appservice access token.")
raise AuthError(
self.TOKEN_NOT_FOUND_HTTP_STATUS,
"Unrecognised access token.",
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 5baba43966..4df930c8d1 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -76,6 +76,8 @@ class EventTypes(object):
Topic = "m.room.topic"
Name = "m.room.name"
+ ServerACL = "m.room.server_acl"
+
class RejectedReason(object):
AUTH_ERROR = "auth_error"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index e6ad3768f0..227a0713b2 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -17,7 +17,8 @@
import logging
-import simplejson as json
+from canonicaljson import json
+
from six import iteritems
from six.moves import http_client
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index dbc0e7e445..aae25e7a47 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -17,7 +17,8 @@ from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, RoomID
from twisted.internet import defer
-import simplejson as json
+from canonicaljson import json
+
import jsonschema
from jsonschema import FormatChecker
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index d1c598622a..328cbfa284 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -85,7 +85,8 @@ class ApplicationService(object):
NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None,
- sender=None, id=None, protocols=None, rate_limited=True):
+ sender=None, id=None, protocols=None, rate_limited=True,
+ ip_range_whitelist=None):
self.token = token
self.url = url
self.hs_token = hs_token
@@ -93,6 +94,7 @@ class ApplicationService(object):
self.server_name = hostname
self.namespaces = self._check_namespaces(namespaces)
self.id = id
+ self.ip_range_whitelist = ip_range_whitelist
if "|" in self.id:
raise Exception("application service ID cannot contain '|' character")
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index 277305e184..0c27bb2fa7 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -17,6 +17,8 @@ from ._base import Config, ConfigError
from synapse.appservice import ApplicationService
from synapse.types import UserID
+from netaddr import IPSet
+
import yaml
import logging
@@ -154,6 +156,13 @@ def _load_appservice(hostname, as_info, config_filename):
" will not receive events or queries.",
config_filename,
)
+
+ ip_range_whitelist = None
+ if as_info.get('ip_range_whitelist'):
+ ip_range_whitelist = IPSet(
+ as_info.get('ip_range_whitelist')
+ )
+
return ApplicationService(
token=as_info["as_token"],
hostname=hostname,
@@ -163,5 +172,6 @@ def _load_appservice(hostname, as_info, config_filename):
sender=user_id,
id=as_info["id"],
protocols=protocols,
- rate_limited=rate_limited
+ rate_limited=rate_limited,
+ ip_range_whitelist=ip_range_whitelist,
)
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 968ecd9ea0..71fd51e4bc 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -16,6 +16,7 @@
import logging
+from synapse.http.endpoint import parse_and_validate_server_name
from ._base import Config, ConfigError
logger = logging.Logger(__name__)
@@ -25,6 +26,12 @@ class ServerConfig(Config):
def read_config(self, config):
self.server_name = config["server_name"]
+
+ try:
+ parse_and_validate_server_name(self.server_name)
+ except ValueError as e:
+ raise ConfigError(str(e))
+
self.pid_file = self.abspath(config.get("pid_file"))
self.web_client = config["web_client"]
self.web_client_location = config.get("web_client_location", None)
@@ -162,8 +169,8 @@ class ServerConfig(Config):
})
def default_config(self, server_name, **kwargs):
- if ":" in server_name:
- bind_port = int(server_name.split(":")[1])
+ _, bind_port = parse_and_validate_server_name(server_name)
+ if bind_port is not None:
unsecure_port = bind_port - 400
else:
bind_port = 8448
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index 8e48f8a9cf..e13cab9dbf 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -18,7 +18,7 @@ from twisted.web.http import HTTPClient
from twisted.internet.protocol import Factory
from twisted.internet import defer, reactor
from synapse.http.endpoint import matrix_federation_endpoint
-import simplejson as json
+from canonicaljson import json
import logging
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index f512d88145..cdf99fd140 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -76,6 +76,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
return
if event.type == EventTypes.Create:
+ sender_domain = get_domain_from_id(event.sender)
room_id_domain = get_domain_from_id(event.room_id)
if room_id_domain != sender_domain:
raise AuthError(
@@ -524,7 +525,11 @@ def _check_power_levels(event, auth_events):
"to your own"
)
- if old_level > user_level or new_level > user_level:
+ # Check if the old and new levels are greater than the user level
+ # (if defined)
+ old_level_too_big = old_level is not None and old_level > user_level
+ new_level_too_big = new_level is not None and new_level > user_level
+ if old_level_too_big or new_level_too_big:
raise AuthError(
403,
"You don't have permission to add ops level greater "
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d4dd967c60..591d0026bf 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -14,10 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import re
-import simplejson as json
+from canonicaljson import json
+import six
from twisted.internet import defer
+from twisted.internet.abstract import isIPAddress
+from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import (
@@ -27,6 +31,7 @@ from synapse.federation.federation_base import (
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
+from synapse.http.endpoint import parse_server_name
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
@@ -74,6 +79,9 @@ class FederationServer(FederationBase):
@log_function
def on_backfill_request(self, origin, room_id, versions, limit):
with (yield self._server_linearizer.queue((origin, room_id))):
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, room_id)
+
pdus = yield self.handler.on_backfill_request(
origin, room_id, versions, limit
)
@@ -134,6 +142,8 @@ class FederationServer(FederationBase):
received_pdus_counter.inc(len(transaction.pdus))
+ origin_host, _ = parse_server_name(transaction.origin)
+
pdus_by_room = {}
for p in transaction.pdus:
@@ -154,9 +164,21 @@ class FederationServer(FederationBase):
# we can process different rooms in parallel (which is useful if they
# require callouts to other servers to fetch missing events), but
# impose a limit to avoid going too crazy with ram/cpu.
+
@defer.inlineCallbacks
def process_pdus_for_room(room_id):
logger.debug("Processing PDUs for %s", room_id)
+ try:
+ yield self.check_server_matches_acl(origin_host, room_id)
+ except AuthError as e:
+ logger.warn(
+ "Ignoring PDUs for room %s from banned server", room_id,
+ )
+ for pdu in pdus_by_room[room_id]:
+ event_id = pdu.event_id
+ pdu_results[event_id] = e.error_dict()
+ return
+
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
try:
@@ -211,6 +233,9 @@ class FederationServer(FederationBase):
if not event_id:
raise NotImplementedError("Specify an event")
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, room_id)
+
in_room = yield self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
@@ -234,6 +259,9 @@ class FederationServer(FederationBase):
if not event_id:
raise NotImplementedError("Specify an event")
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, room_id)
+
in_room = yield self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
@@ -298,7 +326,9 @@ class FederationServer(FederationBase):
defer.returnValue((200, resp))
@defer.inlineCallbacks
- def on_make_join_request(self, room_id, user_id):
+ def on_make_join_request(self, origin, room_id, user_id):
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, room_id)
pdu = yield self.handler.on_make_join_request(room_id, user_id)
time_now = self._clock.time_msec()
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
@@ -306,6 +336,8 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_invite_request(self, origin, content):
pdu = event_from_pdu_json(content)
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, pdu.room_id)
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
@@ -314,6 +346,10 @@ class FederationServer(FederationBase):
def on_send_join_request(self, origin, content):
logger.debug("on_send_join_request: content: %s", content)
pdu = event_from_pdu_json(content)
+
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, pdu.room_id)
+
logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
res_pdus = yield self.handler.on_send_join_request(origin, pdu)
time_now = self._clock.time_msec()
@@ -325,7 +361,9 @@ class FederationServer(FederationBase):
}))
@defer.inlineCallbacks
- def on_make_leave_request(self, room_id, user_id):
+ def on_make_leave_request(self, origin, room_id, user_id):
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, room_id)
pdu = yield self.handler.on_make_leave_request(room_id, user_id)
time_now = self._clock.time_msec()
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
@@ -334,6 +372,10 @@ class FederationServer(FederationBase):
def on_send_leave_request(self, origin, content):
logger.debug("on_send_leave_request: content: %s", content)
pdu = event_from_pdu_json(content)
+
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, pdu.room_id)
+
logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
yield self.handler.on_send_leave_request(origin, pdu)
defer.returnValue((200, {}))
@@ -341,6 +383,9 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_event_auth(self, origin, room_id, event_id):
with (yield self._server_linearizer.queue((origin, room_id))):
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, room_id)
+
time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id)
res = {
@@ -369,6 +414,9 @@ class FederationServer(FederationBase):
Deferred: Results in `dict` with the same format as `content`
"""
with (yield self._server_linearizer.queue((origin, room_id))):
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, room_id)
+
auth_chain = [
event_from_pdu_json(e)
for e in content["auth_chain"]
@@ -442,6 +490,9 @@ class FederationServer(FederationBase):
def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth):
with (yield self._server_linearizer.queue((origin, room_id))):
+ origin_host, _ = parse_server_name(origin)
+ yield self.check_server_matches_acl(origin_host, room_id)
+
logger.info(
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
" limit: %d, min_depth: %d",
@@ -549,7 +600,9 @@ class FederationServer(FederationBase):
affected=pdu.event_id,
)
- yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
+ yield self.handler.on_receive_pdu(
+ origin, pdu, get_missing=True, sent_to_us_directly=True,
+ )
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name
@@ -577,6 +630,101 @@ class FederationServer(FederationBase):
)
defer.returnValue(ret)
+ @defer.inlineCallbacks
+ def check_server_matches_acl(self, server_name, room_id):
+ """Check if the given server is allowed by the server ACLs in the room
+
+ Args:
+ server_name (str): name of server, *without any port part*
+ room_id (str): ID of the room to check
+
+ Raises:
+ AuthError if the server does not match the ACL
+ """
+ state_ids = yield self.store.get_current_state_ids(room_id)
+ acl_event_id = state_ids.get((EventTypes.ServerACL, ""))
+
+ if not acl_event_id:
+ return
+
+ acl_event = yield self.store.get_event(acl_event_id)
+ if server_matches_acl_event(server_name, acl_event):
+ return
+
+ raise AuthError(code=403, msg="Server is banned from room")
+
+
+def server_matches_acl_event(server_name, acl_event):
+ """Check if the given server is allowed by the ACL event
+
+ Args:
+ server_name (str): name of server, without any port part
+ acl_event (EventBase): m.room.server_acl event
+
+ Returns:
+ bool: True if this server is allowed by the ACLs
+ """
+ logger.debug("Checking %s against acl %s", server_name, acl_event.content)
+
+ # first of all, check if literal IPs are blocked, and if so, whether the
+ # server name is a literal IP
+ allow_ip_literals = acl_event.content.get("allow_ip_literals", True)
+ if not isinstance(allow_ip_literals, bool):
+ logger.warn("Ignorning non-bool allow_ip_literals flag")
+ allow_ip_literals = True
+ if not allow_ip_literals:
+ # check for ipv6 literals. These start with '['.
+ if server_name[0] == '[':
+ return False
+
+ # check for ipv4 literals. We can just lift the routine from twisted.
+ if isIPAddress(server_name):
+ return False
+
+ # next, check the deny list
+ deny = acl_event.content.get("deny", [])
+ if not isinstance(deny, (list, tuple)):
+ logger.warn("Ignorning non-list deny ACL %s", deny)
+ deny = []
+ for e in deny:
+ if _acl_entry_matches(server_name, e):
+ # logger.info("%s matched deny rule %s", server_name, e)
+ return False
+
+ # then the allow list.
+ allow = acl_event.content.get("allow", [])
+ if not isinstance(allow, (list, tuple)):
+ logger.warn("Ignorning non-list allow ACL %s", allow)
+ allow = []
+ for e in allow:
+ if _acl_entry_matches(server_name, e):
+ # logger.info("%s matched allow rule %s", server_name, e)
+ return True
+
+ # everything else should be rejected.
+ # logger.info("%s fell through", server_name)
+ return False
+
+
+def _acl_entry_matches(server_name, acl_entry):
+ if not isinstance(acl_entry, six.string_types):
+ logger.warn("Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry))
+ return False
+ regex = _glob_to_regex(acl_entry)
+ return regex.match(server_name)
+
+
+def _glob_to_regex(glob):
+ res = ''
+ for c in glob:
+ if c == '*':
+ res = res + '.*'
+ elif c == '?':
+ res = res + '.'
+ else:
+ res = res + re.escape(c)
+ return re.compile(res + "\\Z", re.IGNORECASE)
+
class FederationHandlerRegistry(object):
"""Allows classes to register themselves as handlers for a given EDU or
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 19d09f5422..c6d98d35cb 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.api.errors import Codes, SynapseError, FederationDeniedError
+from synapse.http.endpoint import parse_and_validate_server_name
from synapse.http.server import JsonResource
from synapse.http.servlet import (
parse_json_object_from_request, parse_integer_from_args, parse_string_from_args,
@@ -99,26 +100,6 @@ class Authenticator(object):
origin = None
- def parse_auth_header(header_str):
- try:
- params = auth.split(" ")[1].split(",")
- param_dict = dict(kv.split("=") for kv in params)
-
- def strip_quotes(value):
- if value.startswith("\""):
- return value[1:-1]
- else:
- return value
-
- origin = strip_quotes(param_dict["origin"])
- key = strip_quotes(param_dict["key"])
- sig = strip_quotes(param_dict["sig"])
- return (origin, key, sig)
- except Exception:
- raise AuthenticationError(
- 400, "Malformed Authorization header", Codes.UNAUTHORIZED
- )
-
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
if not auth_headers:
@@ -127,8 +108,8 @@ class Authenticator(object):
)
for auth in auth_headers:
- if auth.startswith("X-Matrix"):
- (origin, key, sig) = parse_auth_header(auth)
+ if auth.startswith(b"X-Matrix"):
+ (origin, key, sig) = _parse_auth_header(auth)
json_request["origin"] = origin
json_request["signatures"].setdefault(origin, {})[key] = sig
@@ -165,6 +146,48 @@ class Authenticator(object):
logger.exception("Error resetting retry timings on %s", origin)
+def _parse_auth_header(header_bytes):
+ """Parse an X-Matrix auth header
+
+ Args:
+ header_bytes (bytes): header value
+
+ Returns:
+ Tuple[str, str, str]: origin, key id, signature.
+
+ Raises:
+ AuthenticationError if the header could not be parsed
+ """
+ try:
+ header_str = header_bytes.decode('utf-8')
+ params = header_str.split(" ")[1].split(",")
+ param_dict = dict(kv.split("=") for kv in params)
+
+ def strip_quotes(value):
+ if value.startswith(b"\""):
+ return value[1:-1]
+ else:
+ return value
+
+ origin = strip_quotes(param_dict["origin"])
+
+ # ensure that the origin is a valid server name
+ parse_and_validate_server_name(origin)
+
+ key = strip_quotes(param_dict["key"])
+ sig = strip_quotes(param_dict["sig"])
+ return origin, key, sig
+ except Exception as e:
+ logger.warn(
+ "Error parsing auth header '%s': %s",
+ header_bytes.decode('ascii', 'replace'),
+ e,
+ )
+ raise AuthenticationError(
+ 400, "Malformed Authorization header", Codes.UNAUTHORIZED,
+ )
+
+
class BaseFederationServlet(object):
REQUIRE_AUTH = True
@@ -362,7 +385,9 @@ class FederationMakeJoinServlet(BaseFederationServlet):
@defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id):
- content = yield self.handler.on_make_join_request(context, user_id)
+ content = yield self.handler.on_make_join_request(
+ origin, context, user_id,
+ )
defer.returnValue((200, content))
@@ -371,7 +396,9 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
@defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id):
- content = yield self.handler.on_make_leave_request(context, user_id)
+ content = yield self.handler.on_make_leave_request(
+ origin, context, user_id,
+ )
defer.returnValue((200, content))
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index a131b7f73f..cbef1f2770 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -16,6 +16,8 @@
from twisted.internet import defer, threads
+from canonicaljson import json
+
from ._base import BaseHandler
from synapse.api.constants import LoginType
from synapse.api.errors import (
@@ -32,7 +34,6 @@ from twisted.web.client import PartialDownloadError
import logging
import bcrypt
import pymacaroons
-import simplejson
import attr
import synapse.util.stringutils as stringutils
@@ -403,7 +404,7 @@ class AuthHandler(BaseHandler):
except PartialDownloadError as pde:
# Twisted is silly
data = pde.response
- resp_body = simplejson.loads(data)
+ resp_body = json.loads(data)
if 'success' in resp_body:
# Note that we do NOT check the hostname here: we explicitly
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 8ec5ba2012..a84b7b8b80 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from ._base import BaseHandler
from synapse.types import UserID, create_requester
@@ -39,14 +39,15 @@ class DeactivateAccountHandler(BaseHandler):
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
- reactor.callWhenRunning(self._start_user_parting)
+ hs.get_reactor().callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
- def deactivate_account(self, user_id):
+ def deactivate_account(self, user_id, erase_data):
"""Deactivate a user's account
Args:
user_id (str): ID of user to be deactivated
+ erase_data (bool): whether to GDPR-erase the user's data
Returns:
Deferred
@@ -92,6 +93,11 @@ class DeactivateAccountHandler(BaseHandler):
# delete from user directory
yield self.user_directory_handler.handle_user_deactivated(user_id)
+ # Mark the user as erased, if they asked for that
+ if erase_data:
+ logger.info("Marking %s as erased", user_id)
+ yield self.store.mark_user_erased(user_id)
+
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
self._start_user_parting()
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 8a2d177539..62b4892a4e 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -14,10 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import simplejson as json
import logging
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from six import iteritems
@@ -80,7 +79,7 @@ class E2eKeysHandler(object):
else:
remote_queries[user_id] = device_ids
- # Firt get local devices.
+ # First get local devices.
failures = {}
results = {}
if local_query:
@@ -357,7 +356,7 @@ def _exception_to_failure(e):
# include ConnectionRefused and other errors
#
# Note that some Exceptions (notably twisted's ResponseFailed etc) don't
- # give a string for e.message, which simplejson then fails to serialize.
+ # give a string for e.message, which json then fails to serialize.
return {
"status": 503, "message": str(e.message),
}
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2571758284..13117d70fe 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -44,6 +44,7 @@ from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
)
+from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.events.utils import prune_event
@@ -89,7 +90,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def on_receive_pdu(self, origin, pdu, get_missing=True):
+ def on_receive_pdu(
+ self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ ):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -163,14 +166,11 @@ class FederationHandler(BaseHandler):
"Ignoring PDU %s for room %s from %s as we've left the room!",
pdu.event_id, pdu.room_id, origin,
)
- return
+ defer.returnValue(None)
state = None
-
auth_chain = []
- fetch_state = False
-
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
@@ -225,26 +225,60 @@ class FederationHandler(BaseHandler):
list(prevs - seen)[:5],
)
- if prevs - seen:
- logger.info(
- "Still missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ if sent_to_us_directly and prevs - seen:
+ # If they have sent it to us directly, and the server
+ # isn't telling us about the auth events that it's
+ # made a message referencing, we explode
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
)
- fetch_state = True
+ elif prevs - seen:
+ # Calculate the state of the previous events, and
+ # de-conflict them to find the current state.
+ state_groups = []
+ auth_chains = set()
+ try:
+ # Get the state of the events we know about
+ ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+ state_groups.append(ours)
+
+ # Ask the remote server for the states we don't
+ # know about
+ for p in prevs - seen:
+ state, got_auth_chain = (
+ yield self.replication_layer.get_state_for_room(
+ origin, pdu.room_id, p
+ )
+ )
+ auth_chains.update(got_auth_chain)
+ state_group = {(x.type, x.state_key): x.event_id for x in state}
+ state_groups.append(state_group)
+
+ # Resolve any conflicting state
+ def fetch(ev_ids):
+ return self.store.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False
+ )
- if fetch_state:
- # We need to get the state at this event, since we haven't
- # processed all the prev events.
- logger.debug(
- "_handle_new_pdu getting state for %s",
- pdu.room_id
- )
- try:
- state, auth_chain = yield self.replication_layer.get_state_for_room(
- origin, pdu.room_id, pdu.event_id,
- )
- except Exception:
- logger.exception("Failed to get state for event: %s", pdu.event_id)
+ state_map = yield resolve_events_with_factory(
+ state_groups, {pdu.event_id: pdu}, fetch
+ )
+
+ state = (yield self.store.get_events(state_map.values())).values()
+ auth_chain = list(auth_chains)
+ except Exception:
+ raise FederationError(
+ "ERROR",
+ 403,
+ "We can't get valid state history.",
+ affected=pdu.event_id,
+ )
yield self._process_received_pdu(
origin,
@@ -322,11 +356,17 @@ class FederationHandler(BaseHandler):
for e in missing_events:
logger.info("Handling found event %s", e.event_id)
- yield self.on_receive_pdu(
- origin,
- e,
- get_missing=False
- )
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ e,
+ get_missing=False
+ )
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn("Event %s failed history check.")
+ else:
+ raise
@log_function
@defer.inlineCallbacks
@@ -460,6 +500,47 @@ class FederationHandler(BaseHandler):
@measure_func("_filter_events_for_server")
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
+ """Filter the given events for the given server, redacting those the
+ server can't see.
+
+ Assumes the server is currently in the room.
+
+ Returns
+ list[FrozenEvent]
+ """
+ # First lets check to see if all the events have a history visibility
+ # of "shared" or "world_readable". If thats the case then we don't
+ # need to check membership (as we know the server is in the room).
+ event_to_state_ids = yield self.store.get_state_ids_for_events(
+ frozenset(e.event_id for e in events),
+ types=(
+ (EventTypes.RoomHistoryVisibility, ""),
+ )
+ )
+
+ visibility_ids = set()
+ for sids in event_to_state_ids.itervalues():
+ hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
+ if hist:
+ visibility_ids.add(hist)
+
+ # If we failed to find any history visibility events then the default
+ # is "shared" visiblity.
+ if not visibility_ids:
+ defer.returnValue(events)
+
+ event_map = yield self.store.get_events(visibility_ids)
+ all_open = all(
+ e.content.get("history_visibility") in (None, "shared", "world_readable")
+ for e in event_map.itervalues()
+ )
+
+ if all_open:
+ defer.returnValue(events)
+
+ # Ok, so we're dealing with events that have non-trivial visibility
+ # rules, so we need to also get the memberships of the room.
+
event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
@@ -495,7 +576,20 @@ class FederationHandler(BaseHandler):
for e_id, key_to_eid in event_to_state_ids.iteritems()
}
+ erased_senders = yield self.store.are_users_erased(
+ e.sender for e in events,
+ )
+
def redact_disallowed(event, state):
+ # if the sender has been gdpr17ed, always return a redacted
+ # copy of the event.
+ if erased_senders[event.sender]:
+ logger.info(
+ "Sender of %s has been erased, redacting",
+ event.event_id,
+ )
+ return prune_event(event)
+
if not state:
return event
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index f00dfe1d3e..277c2b7760 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -19,7 +19,7 @@
import logging
-import simplejson as json
+from canonicaljson import json
from twisted.internet import defer
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7b9946ab91..cbadf3c88e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,13 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import simplejson
import sys
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
import six
from six import string_types, itervalues, iteritems
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.defer import succeed
from twisted.python.failure import Failure
@@ -157,7 +156,7 @@ class MessageHandler(BaseHandler):
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
- reactor.callLater(24 * 3600, clear_purge)
+ self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
"""Get the current status of an active purge
@@ -491,7 +490,7 @@ class EventCreationHandler(object):
target, e
)
- is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+ is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
if not is_exempt:
yield self.assert_accepted_privacy_policy(requester)
@@ -509,12 +508,13 @@ class EventCreationHandler(object):
defer.returnValue((event, context))
- def _is_exempt_from_privacy_policy(self, builder):
+ def _is_exempt_from_privacy_policy(self, builder, requester):
""""Determine if an event to be sent is exempt from having to consent
to the privacy policy
Args:
builder (synapse.events.builder.EventBuilder): event being created
+ requester (Requster): user requesting this event
Returns:
Deferred[bool]: true if the event can be sent without the user
@@ -525,6 +525,9 @@ class EventCreationHandler(object):
membership = builder.content.get("membership", None)
if membership == Membership.JOIN:
return self._is_server_notices_room(builder.room_id)
+ elif membership == Membership.LEAVE:
+ # the user is always allowed to leave (but not kick people)
+ return builder.state_key == requester.user.to_string()
return succeed(False)
@defer.inlineCallbacks
@@ -793,7 +796,7 @@ class EventCreationHandler(object):
# Ensure that we can round trip before trying to persist in db
try:
dump = frozendict_json_encoder.encode(event.content)
- simplejson.loads(dump)
+ json.loads(dump)
except Exception:
logger.exception("Failed to encode content: %r", event.content)
raise
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7fe568132f..7db59fba00 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -22,7 +22,7 @@ The methods that define policy are:
- should_notify
"""
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from contextlib import contextmanager
from six import itervalues, iteritems
@@ -179,7 +179,7 @@ class PresenceHandler(object):
# have not yet been persisted
self.unpersisted_users_changes = set()
- reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+ hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
self.serial_to_user = {}
self._next_serial = 1
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 51ec727df0..7f486e48e5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -145,7 +145,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
"to_device", # List of direct messages for the device.
- "device_lists", # List of user_ids whose devices have chanegd
+ "device_lists", # List of user_ids whose devices have changed
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
"groups",
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 46ffb41de1..5bdc484c15 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -42,7 +42,7 @@ from twisted.web._newclient import ResponseDone
from six import StringIO
from prometheus_client import Counter
-import simplejson as json
+from canonicaljson import json
import logging
import urllib
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 39432da452..6056df6226 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -12,8 +12,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import re
+
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.error import ConnectError
from twisted.names import client, dns
from twisted.names.error import DNSNameError, DomainError
@@ -37,6 +39,71 @@ _Server = collections.namedtuple(
)
+def parse_server_name(server_name):
+ """Split a server name into host/port parts.
+
+ Args:
+ server_name (str): server name to parse
+
+ Returns:
+ Tuple[str, int|None]: host/port parts.
+
+ Raises:
+ ValueError if the server name could not be parsed.
+ """
+ try:
+ if server_name[-1] == ']':
+ # ipv6 literal, hopefully
+ return server_name, None
+
+ domain_port = server_name.rsplit(":", 1)
+ domain = domain_port[0]
+ port = int(domain_port[1]) if domain_port[1:] else None
+ return domain, port
+ except Exception:
+ raise ValueError("Invalid server name '%s'" % server_name)
+
+
+VALID_HOST_REGEX = re.compile(
+ "\\A[0-9a-zA-Z.-]+\\Z",
+)
+
+
+def parse_and_validate_server_name(server_name):
+ """Split a server name into host/port parts and do some basic validation.
+
+ Args:
+ server_name (str): server name to parse
+
+ Returns:
+ Tuple[str, int|None]: host/port parts.
+
+ Raises:
+ ValueError if the server name could not be parsed.
+ """
+ host, port = parse_server_name(server_name)
+
+ # these tests don't need to be bulletproof as we'll find out soon enough
+ # if somebody is giving us invalid data. What we *do* need is to be sure
+ # that nobody is sneaking IP literals in that look like hostnames, etc.
+
+ # look for ipv6 literals
+ if host[0] == '[':
+ if host[-1] != ']':
+ raise ValueError("Mismatched [...] in server name '%s'" % (
+ server_name,
+ ))
+ return host, port
+
+ # otherwise it should only be alphanumerics.
+ if not VALID_HOST_REGEX.match(host):
+ raise ValueError("Server name '%s' contains invalid characters" % (
+ server_name,
+ ))
+
+ return host, port
+
+
def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=None,
timeout=None):
"""Construct an endpoint for the given matrix destination.
@@ -50,9 +117,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
timeout (int): connection timeout in seconds
"""
- domain_port = destination.split(":")
- domain = domain_port[0]
- port = int(domain_port[1]) if domain_port[1:] else None
+ domain, port = parse_server_name(destination)
endpoint_kw_args = {}
@@ -74,21 +139,22 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
reactor, "matrix", domain, protocol="tcp",
default_port=default_port, endpoint=transport_endpoint,
endpoint_kw_args=endpoint_kw_args
- ))
+ ), reactor)
else:
return _WrappingEndpointFac(transport_endpoint(
reactor, domain, port, **endpoint_kw_args
- ))
+ ), reactor)
class _WrappingEndpointFac(object):
- def __init__(self, endpoint_fac):
+ def __init__(self, endpoint_fac, reactor):
self.endpoint_fac = endpoint_fac
+ self.reactor = reactor
@defer.inlineCallbacks
def connect(self, protocolFactory):
conn = yield self.endpoint_fac.connect(protocolFactory)
- conn = _WrappedConnection(conn)
+ conn = _WrappedConnection(conn, self.reactor)
defer.returnValue(conn)
@@ -98,9 +164,10 @@ class _WrappedConnection(object):
"""
__slots__ = ["conn", "last_request"]
- def __init__(self, conn):
+ def __init__(self, conn, reactor):
object.__setattr__(self, "conn", conn)
object.__setattr__(self, "last_request", time.time())
+ self._reactor = reactor
def __getattr__(self, name):
return getattr(self.conn, name)
@@ -131,14 +198,14 @@ class _WrappedConnection(object):
# Time this connection out if we haven't send a request in the last
# N minutes
# TODO: Cancel the previous callLater?
- reactor.callLater(3 * 60, self._time_things_out_maybe)
+ self._reactor.callLater(3 * 60, self._time_things_out_maybe)
d = self.conn.request(request)
def update_request_time(res):
self.last_request = time.time()
# TODO: Cancel the previous callLater?
- reactor.callLater(3 * 60, self._time_things_out_maybe)
+ self._reactor.callLater(3 * 60, self._time_things_out_maybe)
return res
d.addCallback(update_request_time)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b48d05fcd2..6a398c9645 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -27,7 +27,7 @@ from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
from synapse.api.errors import (
SynapseError, Codes, HttpResponseException, FederationDeniedError,
@@ -36,7 +36,6 @@ from synapse.api.errors import (
from signedjson.sign import sign_json
import cgi
-import simplejson as json
import logging
import random
import sys
diff --git a/synapse/http/server.py b/synapse/http/server.py
index bc09b8b2be..517aaf7b5a 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -29,7 +29,7 @@ import synapse.metrics
import synapse.events
from canonicaljson import (
- encode_canonical_json, encode_pretty_printed_json
+ encode_canonical_json, encode_pretty_printed_json, json
)
from twisted.internet import defer
@@ -41,7 +41,6 @@ from twisted.web.util import redirectTo
import collections
import logging
import urllib
-import simplejson
logger = logging.getLogger(__name__)
@@ -410,7 +409,7 @@ def respond_with_json(request, code, json_object, send_cors=False,
if canonical_json or synapse.events.USE_FROZEN_DICTS:
json_bytes = encode_canonical_json(json_object)
else:
- json_bytes = simplejson.dumps(json_object)
+ json_bytes = json.dumps(json_object)
return respond_with_json_bytes(
request, code, json_bytes,
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index ef8e62901b..ef3a01ddc7 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -18,7 +18,9 @@
from synapse.api.errors import SynapseError, Codes
import logging
-import simplejson
+
+from canonicaljson import json
+
logger = logging.getLogger(__name__)
@@ -171,7 +173,7 @@ def parse_json_value_from_request(request, allow_empty_body=False):
return None
try:
- content = simplejson.loads(content_bytes)
+ content = json.loads(content_bytes)
except Exception as e:
logger.warn("Unable to parse JSON: %s", e)
raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 74a752d6cf..fe93643b1e 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -107,13 +107,28 @@ class SynapseRequest(Request):
end_time = time.time()
+ # need to decode as it could be raw utf-8 bytes
+ # from a IDN servname in an auth header
+ authenticated_entity = self.authenticated_entity
+ if authenticated_entity is not None:
+ authenticated_entity = authenticated_entity.decode("utf-8", "replace")
+
+ # ...or could be raw utf-8 bytes in the User-Agent header.
+ # N.B. if you don't do this, the logger explodes cryptically
+ # with maximum recursion trying to log errors about
+ # the charset problem.
+ # c.f. https://github.com/matrix-org/synapse/issues/3471
+ user_agent = self.get_user_agent()
+ if user_agent is not None:
+ user_agent = user_agent.decode("utf-8", "replace")
+
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
self.getClientIP(),
self.site.site_tag,
- self.authenticated_entity,
+ authenticated_entity,
end_time - self.start_time,
ru_utime,
ru_stime,
@@ -125,7 +140,7 @@ class SynapseRequest(Request):
self.method,
self.get_redacted_uri(),
self.clientproto,
- self.get_user_agent(),
+ user_agent,
evt_db_fetch_count,
)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 7d6e0232ed..2d2397caae 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -147,7 +147,8 @@ class GCCounts(object):
yield cm
-REGISTRY.register(GCCounts())
+if not running_on_pypy:
+ REGISTRY.register(GCCounts())
#
# Twisted reactor metrics
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index ba7286cb72..52d4f087ee 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
import logging
@@ -199,7 +199,7 @@ class EmailPusher(object):
self.timed_call = None
if soonest_due_at is not None:
- self.timed_call = reactor.callLater(
+ self.timed_call = self.hs.get_reactor().callLater(
self.seconds_until(soonest_due_at), self.on_timer
)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bf7ff74a1a..7a481b5a1e 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -15,7 +15,7 @@
# limitations under the License.
import logging
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from . import push_rule_evaluator
@@ -220,7 +220,9 @@ class HttpPusher(object):
)
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
- self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+ self.timed_call = self.hs.get_reactor().callLater(
+ self.backoff_delay, self.on_timer
+ )
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index faf6dfdb8d..987eec3ef2 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -57,16 +57,14 @@ REQUIREMENTS = {
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
"prometheus_client": ["prometheus_client"],
- "attr": ["attr"],
+ "attrs": ["attr"],
+ "netaddr>=0.7.18": ["netaddr"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
"matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],
},
- "preview_url": {
- "netaddr>=0.7.18": ["netaddr"],
- },
"email.enable_notifs": {
"Jinja2>=2.8": ["Jinja2>=2.8"],
"bleach>=1.4.2": ["bleach>=1.4.2"],
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b1f64ef0d8..97d3196633 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
+from synapse.storage.user_erasure_store import UserErasureWorkerStore
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@@ -45,6 +46,7 @@ class SlavedEventStore(EventFederationWorkerStore,
EventsWorkerStore,
StateGroupWorkerStore,
SignatureWorkerStore,
+ UserErasureWorkerStore,
BaseSlavedStore):
def __init__(self, db_conn, hs):
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 6d2513c4e2..bb852b00af 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -15,7 +15,7 @@
"""A replication client for use by synapse workers.
"""
-from twisted.internet import reactor, defer
+from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory
from .commands import (
@@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
self.server_name = hs.config.server_name
self._clock = hs.get_clock() # As self.clock is defined in super class
- reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
+ hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
def startedConnecting(self, connector):
logger.info("Connecting to replication: %r", connector.getDestination())
@@ -95,7 +95,7 @@ class ReplicationClientHandler(object):
factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
- reactor.connectTCP(host, port, factory)
+ hs.get_reactor().connectTCP(host, port, factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 12aac3cc6b..f3908df642 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -19,13 +19,17 @@ allowed to be sent by which side.
"""
import logging
-import simplejson
+import platform
+if platform.python_implementation() == "PyPy":
+ import json
+ _json_encoder = json.JSONEncoder()
+else:
+ import simplejson as json
+ _json_encoder = json.JSONEncoder(namedtuple_as_object=False)
logger = logging.getLogger(__name__)
-_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
-
class Command(object):
"""The base command class.
@@ -102,7 +106,7 @@ class RdataCommand(Command):
return cls(
stream_name,
None if token == "batch" else int(token),
- simplejson.loads(row_json)
+ json.loads(row_json)
)
def to_line(self):
@@ -300,7 +304,7 @@ class InvalidateCacheCommand(Command):
def from_line(cls, line):
cache_func, keys_json = line.split(" ", 1)
- return cls(cache_func, simplejson.loads(keys_json))
+ return cls(cache_func, json.loads(keys_json))
def to_line(self):
return " ".join((
@@ -329,7 +333,7 @@ class UserIpCommand(Command):
def from_line(cls, line):
user_id, jsn = line.split(" ", 1)
- access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
+ access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
return cls(
user_id, access_token, ip, user_agent, device_id, last_seen
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 63bd6d2652..95ad8c1b4c 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -15,7 +15,7 @@
"""The server side of the replication stream.
"""
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.protocol import Factory
from .streams import STREAMS_MAP, FederationStream
@@ -109,7 +109,7 @@ class ReplicationStreamer(object):
self.is_looping = False
self.pending_updates = False
- reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
+ hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
def on_shutdown(self):
# close all connections on shutdown
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index b8665a45eb..8fb08dc526 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -16,6 +16,8 @@
from twisted.internet import defer
+from six.moves import http_client
+
from synapse.api.constants import Membership
from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError
from synapse.types import UserID, create_requester
@@ -247,6 +249,15 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
+ body = parse_json_object_from_request(request, allow_empty_body=True)
+ erase = body.get("erase", False)
+ if not isinstance(erase, bool):
+ raise SynapseError(
+ http_client.BAD_REQUEST,
+ "Param 'erase' must be a boolean, if given",
+ Codes.BAD_JSON,
+ )
+
UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
@@ -254,7 +265,9 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
- yield self._deactivate_account_handler.deactivate_account(target_user_id)
+ yield self._deactivate_account_handler.deactivate_account(
+ target_user_id, erase,
+ )
defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 34df5be4e9..88ca5184cd 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -23,7 +23,8 @@ from synapse.util.msisdn import phone_number_to_msisdn
from .base import ClientV1RestServlet, client_path_patterns
-import simplejson as json
+from canonicaljson import json
+
import urllib
from six.moves.urllib import parse as urlparse
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 0b984987ed..e6ae5db79b 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -31,7 +31,7 @@ from synapse.http.servlet import (
from six.moves.urllib import parse as urlparse
import logging
-import simplejson as json
+from canonicaljson import json
logger = logging.getLogger(__name__)
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index e1281cfbb6..80dbc3c92e 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,6 +16,7 @@
# limitations under the License.
import logging
+from six.moves import http_client
from twisted.internet import defer
from synapse.api.auth import has_access_token
@@ -186,13 +188,20 @@ class DeactivateAccountRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
body = parse_json_object_from_request(request)
+ erase = body.get("erase", False)
+ if not isinstance(erase, bool):
+ raise SynapseError(
+ http_client.BAD_REQUEST,
+ "Param 'erase' must be a boolean, if given",
+ Codes.BAD_JSON,
+ )
requester = yield self.auth.get_user_by_req(request)
# allow ASes to dectivate their own users
if requester.app_service:
yield self._deactivate_account_handler.deactivate_account(
- requester.user.to_string()
+ requester.user.to_string(), erase,
)
defer.returnValue((200, {}))
@@ -200,7 +209,7 @@ class DeactivateAccountRestServlet(RestServlet):
requester, body, self.hs.get_ip_from_request(request),
)
yield self._deactivate_account_handler.deactivate_account(
- requester.user.to_string(),
+ requester.user.to_string(), erase,
)
defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index a291cffbf1..d2aa47b326 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -33,7 +33,7 @@ from ._base import set_timeline_upper_limit
import itertools
import logging
-import simplejson as json
+from canonicaljson import json
logger = logging.getLogger(__name__)
diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py
index 956bd5da75..e44d4276d2 100644
--- a/synapse/rest/media/v0/content_repository.py
+++ b/synapse/rest/media/v0/content_repository.py
@@ -22,8 +22,9 @@ from synapse.api.errors import (
from twisted.protocols.basic import FileSender
from twisted.web import server, resource
+from canonicaljson import json
+
import base64
-import simplejson as json
import logging
import os
import re
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 565cef2b8d..adca490640 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -23,7 +23,8 @@ import re
import shutil
import sys
import traceback
-import simplejson as json
+
+from canonicaljson import json
from six.moves import urllib_parse as urlparse
from six import string_types
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 979fa22438..e843b702b9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,6 +20,7 @@ import time
import logging
from synapse.storage.devices import DeviceStore
+from synapse.storage.user_erasure_store import UserErasureStore
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
@@ -88,6 +89,7 @@ class DataStore(RoomMemberStore, RoomStore,
DeviceInboxStore,
UserDirectoryStore,
GroupServerStore,
+ UserErasureStore,
):
def __init__(self, db_conn, hs):
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 284ec3c970..7034a61399 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -22,8 +22,9 @@ from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from canonicaljson import json
+
import abc
-import simplejson as json
import logging
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 12ea8a158c..4d32d0bdf6 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -15,8 +15,8 @@
# limitations under the License.
import logging
import re
-import simplejson as json
from twisted.internet import defer
+from canonicaljson import json
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index b7e9c716c8..af18964510 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -18,7 +18,8 @@ from . import engines
from twisted.internet import defer
-import simplejson as json
+from canonicaljson import json
+
import logging
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index a879e5bfc1..38addbf9c0 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -14,7 +14,8 @@
# limitations under the License.
import logging
-import simplejson
+
+from canonicaljson import json
from twisted.internet import defer
@@ -85,7 +86,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
)
rows = []
for destination, edu in remote_messages_by_destination.items():
- edu_json = simplejson.dumps(edu)
+ edu_json = json.dumps(edu)
rows.append((destination, stream_id, now_ms, edu_json))
txn.executemany(sql, rows)
@@ -177,7 +178,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
" WHERE user_id = ?"
)
txn.execute(sql, (user_id,))
- message_json = simplejson.dumps(messages_by_device["*"])
+ message_json = json.dumps(messages_by_device["*"])
for row in txn:
# Add the message for all devices for this user on this
# server.
@@ -199,7 +200,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
# Only insert into the local inbox if the device exists on
# this server
device = row[0]
- message_json = simplejson.dumps(messages_by_device[device])
+ message_json = json.dumps(messages_by_device[device])
messages_json_for_user[device] = message_json
if messages_json_for_user:
@@ -253,7 +254,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
messages = []
for row in txn:
stream_pos = row[0]
- messages.append(simplejson.loads(row[1]))
+ messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
return (messages, stream_pos)
@@ -389,7 +390,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
messages = []
for row in txn:
stream_pos = row[0]
- messages.append(simplejson.loads(row[1]))
+ messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
return (messages, stream_pos)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d149d8392e..2ed9ada783 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import simplejson as json
from twisted.internet import defer
@@ -21,6 +20,8 @@ from synapse.api.errors import StoreError
from ._base import SQLBaseStore, Cache
from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+from canonicaljson import json
+
from six import itervalues, iteritems
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index b146487943..181047c8b7 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -16,8 +16,7 @@ from twisted.internet import defer
from synapse.util.caches.descriptors import cached
-from canonicaljson import encode_canonical_json
-import simplejson as json
+from canonicaljson import encode_canonical_json, json
from ._base import SQLBaseStore
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 8cb24b7d59..05cb3f61ce 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -19,7 +19,8 @@ from twisted.internet import defer
from synapse.util.caches.descriptors import cachedInlineCallbacks
import logging
-import simplejson as json
+
+from canonicaljson import json
from six import iteritems
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7d0e59538a..a54abb9edd 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,8 @@ from functools import wraps
import itertools
import logging
-import simplejson as json
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.storage.events_worker import EventsWorkerStore
@@ -800,7 +801,8 @@ class EventsStore(EventsWorkerStore):
]
)
- self._curr_state_delta_stream_cache.entity_has_changed(
+ txn.call_after(
+ self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order,
)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index f6a6e46b43..896225aab9 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -29,7 +29,8 @@ from synapse.api.errors import SynapseError
from collections import namedtuple
import logging
-import simplejson as json
+
+from canonicaljson import json
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 2e2763126d..eae6027cee 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -19,8 +19,7 @@ from ._base import SQLBaseStore
from synapse.api.errors import SynapseError, Codes
from synapse.util.caches.descriptors import cachedInlineCallbacks
-from canonicaljson import encode_canonical_json
-import simplejson as json
+from canonicaljson import encode_canonical_json, json
class FilteringStore(SQLBaseStore):
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index da05ccb027..b77402d295 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -20,7 +20,7 @@ from synapse.api.errors import SynapseError
from ._base import SQLBaseStore
-import simplejson as json
+from canonicaljson import json
# The category ID for the "default" category. We don't store as null in the
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 04a0b59a39..9e52e992b3 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -25,9 +25,10 @@ from synapse.push.baserules import list_with_base_rules
from synapse.api.constants import EventTypes
from twisted.internet import defer
+from canonicaljson import json
+
import abc
import logging
-import simplejson as json
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 307660b99a..c6def861cf 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -17,12 +17,11 @@
from ._base import SQLBaseStore
from twisted.internet import defer
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
import logging
-import simplejson as json
import types
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index c93c228f6e..f230a3bab7 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -21,9 +21,10 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
from twisted.internet import defer
+from canonicaljson import json
+
import abc
import logging
-import simplejson as json
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9c9cf46e7f..0d18f6d869 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -623,7 +623,9 @@ class RegistrationStore(RegistrationWorkerStore,
Removes the given user to the table of users who need to be parted from all the
rooms they're in, effectively marking that user as fully deactivated.
"""
- return self._simple_delete_one(
+ # XXX: This should be simple_delete_one but we failed to put a unique index on
+ # the table, so somehow duplicate entries have ended up in it.
+ return self._simple_delete(
"users_pending_deactivation",
keyvalues={
"user_id": user_id,
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index ea6a189185..ca0eb187e5 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -20,9 +20,10 @@ from synapse.storage._base import SQLBaseStore
from synapse.storage.search import SearchStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from canonicaljson import json
+
import collections
import logging
-import simplejson as json
import re
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 829cc4a207..8fc9549a75 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -28,7 +28,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.types import get_domain_from_id
import logging
-import simplejson as json
+from canonicaljson import json
from six import itervalues, iteritems
diff --git a/synapse/storage/schema/delta/50/erasure_store.sql b/synapse/storage/schema/delta/50/erasure_store.sql
new file mode 100644
index 0000000000..5d8641a9ab
--- /dev/null
+++ b/synapse/storage/schema/delta/50/erasure_store.sql
@@ -0,0 +1,21 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- a table of users who have requested that their details be erased
+CREATE TABLE erased_users (
+ user_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX erased_users_user ON erased_users(user_id);
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index f0fa5d7631..9b77c45318 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -16,7 +16,7 @@
from collections import namedtuple
import logging
import re
-import simplejson as json
+from canonicaljson import json
from six import string_types
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 6671d3cfca..04d123ed95 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -19,7 +19,8 @@ from synapse.storage.account_data import AccountDataWorkerStore
from synapse.util.caches.descriptors import cached
from twisted.internet import defer
-import simplejson as json
+from canonicaljson import json
+
import logging
from six.moves import range
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index e485d19b84..acbc03446e 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -19,12 +19,11 @@ from synapse.util.caches.descriptors import cached
from twisted.internet import defer
import six
-from canonicaljson import encode_canonical_json
+from canonicaljson import encode_canonical_json, json
from collections import namedtuple
import logging
-import simplejson as json
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py
new file mode 100644
index 0000000000..47bfc01e84
--- /dev/null
+++ b/synapse/storage/user_erasure_store.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import operator
+
+from twisted.internet import defer
+
+from synapse.storage._base import SQLBaseStore
+from synapse.util.caches.descriptors import cachedList, cached
+
+
+class UserErasureWorkerStore(SQLBaseStore):
+ @cached()
+ def is_user_erased(self, user_id):
+ """
+ Check if the given user id has requested erasure
+
+ Args:
+ user_id (str): full user id to check
+
+ Returns:
+ Deferred[bool]: True if the user has requested erasure
+ """
+ return self._simple_select_onecol(
+ table="erased_users",
+ keyvalues={"user_id": user_id},
+ retcol="1",
+ desc="is_user_erased",
+ ).addCallback(operator.truth)
+
+ @cachedList(
+ cached_method_name="is_user_erased",
+ list_name="user_ids",
+ inlineCallbacks=True,
+ )
+ def are_users_erased(self, user_ids):
+ """
+ Checks which users in a list have requested erasure
+
+ Args:
+ user_ids (iterable[str]): full user id to check
+
+ Returns:
+ Deferred[dict[str, bool]]:
+ for each user, whether the user has requested erasure.
+ """
+ # this serves the dual purpose of (a) making sure we can do len and
+ # iterate it multiple times, and (b) avoiding duplicates.
+ user_ids = tuple(set(user_ids))
+
+ def _get_erased_users(txn):
+ txn.execute(
+ "SELECT user_id FROM erased_users WHERE user_id IN (%s)" % (
+ ",".join("?" * len(user_ids))
+ ),
+ user_ids,
+ )
+ return set(r[0] for r in txn)
+
+ erased_users = yield self.runInteraction(
+ "are_users_erased", _get_erased_users,
+ )
+ res = dict((u, u in erased_users) for u in user_ids)
+ defer.returnValue(res)
+
+
+class UserErasureStore(UserErasureWorkerStore):
+ def mark_user_erased(self, user_id):
+ """Indicate that user_id wishes their message history to be erased.
+
+ Args:
+ user_id (str): full user_id to be erased
+ """
+ def f(txn):
+ # first check if they are already in the list
+ txn.execute(
+ "SELECT 1 FROM erased_users WHERE user_id = ?",
+ (user_id, )
+ )
+ if txn.fetchone():
+ return
+
+ # they are not already there: do the insert.
+ txn.execute(
+ "INSERT INTO erased_users (user_id) VALUES (?)",
+ (user_id, )
+ )
+
+ self._invalidate_cache_and_stream(
+ txn, self.is_user_erased, (user_id,)
+ )
+ return self.runInteraction("mark_user_erased", f)
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 2a3df7c71d..e9886ef299 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -34,6 +34,9 @@ def unwrapFirstError(failure):
class Clock(object):
"""
A Clock wraps a Twisted reactor and provides utilities on top of it.
+
+ Args:
+ reactor: The Twisted reactor to use.
"""
_reactor = attr.ib()
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 817118e30f..0fb8620001 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -78,7 +78,8 @@ class StreamChangeCache(object):
not_known_entities = set(entities) - set(self._entity_to_key)
result = (
- set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
+ {self._cache[k] for k in self._cache.islice(
+ start=self._cache.bisect_right(stream_pos))}
.intersection(entities)
.union(not_known_entities)
)
@@ -113,7 +114,8 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- return self._cache.values()[self._cache.bisect_right(stream_pos) :]
+ return [self._cache[k] for k in self._cache.islice(
+ start=self._cache.bisect_right(stream_pos))]
else:
return None
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 15f0a7ba9e..535e7d0e7a 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -14,7 +14,7 @@
# limitations under the License.
from frozendict import frozendict
-import simplejson as json
+from canonicaljson import json
from six import string_types
diff --git a/synapse/visibility.py b/synapse/visibility.py
index aef4953c1d..65d79cf0d0 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -12,15 +12,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import itertools
import logging
+import operator
from twisted.internet import defer
-from synapse.api.constants import Membership, EventTypes
-
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
-
+from synapse.api.constants import EventTypes, Membership
+from synapse.events.utils import prune_event
+from synapse.util.logcontext import (
+ make_deferred_yieldable, preserve_fn,
+)
logger = logging.getLogger(__name__)
@@ -95,16 +97,27 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if ignore_dict_content else []
)
+ erased_senders = yield store.are_users_erased((e.sender for e in events))
+
def allowed(event):
"""
Args:
event (synapse.events.EventBase): event to check
+
+ Returns:
+ None|EventBase:
+ None if the user cannot see this event at all
+
+ a redacted copy of the event if they can only see a redacted
+ version
+
+ the original event if they can see it as normal.
"""
if not event.is_state() and event.sender in ignore_list:
- return False
+ return None
if event.event_id in always_include_ids:
- return True
+ return event
state = event_id_to_state[event.event_id]
@@ -118,10 +131,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if visibility not in VISIBILITY_PRIORITY:
visibility = "shared"
- # if it was world_readable, it's easy: everyone can read it
- if visibility == "world_readable":
- return True
-
# Always allow history visibility events on boundaries. This is done
# by setting the effective visibility to the least restrictive
# of the old vs new.
@@ -155,7 +164,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if membership == "leave" and (
prev_membership == "join" or prev_membership == "invite"
):
- return True
+ return event
new_priority = MEMBERSHIP_PRIORITY.index(membership)
old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
@@ -166,31 +175,55 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if membership is None:
membership_event = state.get((EventTypes.Member, user_id), None)
if membership_event:
+ # XXX why do we do this?
+ # https://github.com/matrix-org/synapse/issues/3350
if membership_event.event_id not in event_id_forgotten:
membership = membership_event.membership
# if the user was a member of the room at the time of the event,
# they can see it.
if membership == Membership.JOIN:
- return True
+ return event
+
+ # otherwise, it depends on the room visibility.
if visibility == "joined":
# we weren't a member at the time of the event, so we can't
# see this event.
- return False
+ return None
elif visibility == "invited":
# user can also see the event if they were *invited* at the time
# of the event.
- return membership == Membership.INVITE
-
- else:
- # visibility is shared: user can also see the event if they have
- # become a member since the event
+ return (
+ event if membership == Membership.INVITE else None
+ )
+
+ elif visibility == "shared" and is_peeking:
+ # if the visibility is shared, users cannot see the event unless
+ # they have *subequently* joined the room (or were members at the
+ # time, of course)
#
# XXX: if the user has subsequently joined and then left again,
# ideally we would share history up to the point they left. But
- # we don't know when they left.
- return not is_peeking
+ # we don't know when they left. We just treat it as though they
+ # never joined, and restrict access.
+ return None
+
+ # the visibility is either shared or world_readable, and the user was
+ # not a member at the time. We allow it, provided the original sender
+ # has not requested their data to be erased, in which case, we return
+ # a redacted version.
+ if erased_senders[event.sender]:
+ return prune_event(event)
+
+ return event
+
+ # check each event: gives an iterable[None|EventBase]
+ filtered_events = itertools.imap(allowed, events)
+
+ # remove the None entries
+ filtered_events = filter(operator.truth, filtered_events)
- defer.returnValue(list(filter(allowed, events)))
+ # we turn it into a list before returning it.
+ defer.returnValue(list(filtered_events))
|