summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/homeserver.py2
-rw-r--r--synapse/config/third_party_event_rules.py42
-rw-r--r--synapse/events/third_party_rules.py62
-rw-r--r--synapse/groups/attestations.py11
-rw-r--r--synapse/handlers/federation.py74
-rw-r--r--synapse/handlers/groups_local.py4
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/profile.py29
-rw-r--r--synapse/metrics/__init__.py1
-rw-r--r--synapse/push/emailpusher.py19
-rw-r--r--synapse/push/presentable_names.py11
-rw-r--r--synapse/push/pusherpool.py30
-rw-r--r--synapse/replication/http/_base.py10
-rw-r--r--synapse/server.py7
-rw-r--r--synapse/storage/engines/postgres.py8
-rw-r--r--synapse/storage/events.py3
-rw-r--r--synapse/storage/registration.py6
-rw-r--r--synapse/storage/search.py22
18 files changed, 288 insertions, 67 deletions
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 5c4fc8ff21..acadef4fd3 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -38,6 +38,7 @@ from .server import ServerConfig
 from .server_notices_config import ServerNoticesConfig
 from .spam_checker import SpamCheckerConfig
 from .stats import StatsConfig
+from .third_party_event_rules import ThirdPartyRulesConfig
 from .tls import TlsConfig
 from .user_directory import UserDirectoryConfig
 from .voip import VoipConfig
@@ -73,5 +74,6 @@ class HomeServerConfig(
     StatsConfig,
     ServerNoticesConfig,
     RoomDirectoryConfig,
+    ThirdPartyRulesConfig,
 ):
     pass
diff --git a/synapse/config/third_party_event_rules.py b/synapse/config/third_party_event_rules.py
new file mode 100644
index 0000000000..a89dd5f98a
--- /dev/null
+++ b/synapse/config/third_party_event_rules.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.util.module_loader import load_module
+
+from ._base import Config
+
+
+class ThirdPartyRulesConfig(Config):
+    def read_config(self, config):
+        self.third_party_event_rules = None
+
+        provider = config.get("third_party_event_rules", None)
+        if provider is not None:
+            self.third_party_event_rules = load_module(provider)
+
+    def default_config(self, **kwargs):
+        return """\
+        # Server admins can define a Python module that implements extra rules for
+        # allowing or denying incoming events. In order to work, this module needs to
+        # override the methods defined in synapse/events/third_party_rules.py.
+        #
+        # This feature is designed to be used in closed federations only, where each
+        # participating server enforces the same rules.
+        #
+        #third_party_event_rules:
+        #  module: "my_custom_project.SuperRulesSet"
+        #  config:
+        #    example_option: 'things'
+        """
diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py
new file mode 100644
index 0000000000..9f98d51523
--- /dev/null
+++ b/synapse/events/third_party_rules.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+
+class ThirdPartyEventRules(object):
+    """Allows server admins to provide a Python module implementing an extra set of rules
+    to apply when processing events.
+
+    This is designed to help admins of closed federations with enforcing custom
+    behaviours.
+    """
+
+    def __init__(self, hs):
+        self.third_party_rules = None
+
+        self.store = hs.get_datastore()
+
+        module = None
+        config = None
+        if hs.config.third_party_event_rules:
+            module, config = hs.config.third_party_event_rules
+
+        if module is not None:
+            self.third_party_rules = module(config=config)
+
+    @defer.inlineCallbacks
+    def check_event_allowed(self, event, context):
+        """Check if a provided event should be allowed in the given context.
+
+        Args:
+            event (synapse.events.EventBase): The event to be checked.
+            context (synapse.events.snapshot.EventContext): The context of the event.
+
+        Returns:
+            defer.Deferred(bool), True if the event should be allowed, False if not.
+        """
+        if self.third_party_rules is None:
+            defer.returnValue(True)
+
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+        # Retrieve the state events from the database.
+        state_events = {}
+        for key, event_id in prev_state_ids.items():
+            state_events[key] = yield self.store.get_event(event_id, allow_none=True)
+
+        ret = yield self.third_party_rules.check_event_allowed(event, state_events)
+        defer.returnValue(ret)
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index e5dda1975f..3ba74001d8 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -42,7 +42,7 @@ from signedjson.sign import sign_json
 
 from twisted.internet import defer
 
-from synapse.api.errors import RequestSendFailed, SynapseError
+from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
 from synapse.util.logcontext import run_in_background
@@ -132,9 +132,10 @@ class GroupAttestionRenewer(object):
         self.is_mine_id = hs.is_mine_id
         self.attestations = hs.get_groups_attestation_signing()
 
-        self._renew_attestations_loop = self.clock.looping_call(
-            self._start_renew_attestations, 30 * 60 * 1000,
-        )
+        if not hs.config.worker_app:
+            self._renew_attestations_loop = self.clock.looping_call(
+                self._start_renew_attestations, 30 * 60 * 1000,
+            )
 
     @defer.inlineCallbacks
     def on_renew_attestation(self, group_id, user_id, content):
@@ -194,7 +195,7 @@ class GroupAttestionRenewer(object):
                 yield self.store.update_attestation_renewal(
                     group_id, user_id, attestation
                 )
-            except RequestSendFailed as e:
+            except (RequestSendFailed, HttpResponseException) as e:
                 logger.warning(
                     "Failed to renew attestation of %r in %r: %s",
                     user_id, group_id, e,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ac5ca79143..93e064cda3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -33,6 +34,7 @@ from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
+    Codes,
     FederationDeniedError,
     FederationError,
     RequestSendFailed,
@@ -127,6 +129,8 @@ class FederationHandler(BaseHandler):
         self.room_queues = {}
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
 
+        self.third_party_event_rules = hs.get_third_party_event_rules()
+
     @defer.inlineCallbacks
     def on_receive_pdu(
             self, origin, pdu, sent_to_us_directly=False,
@@ -1258,6 +1262,15 @@ class FederationHandler(BaseHandler):
             logger.warn("Failed to create join %r because %s", event, e)
             raise e
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            logger.info("Creation of join %s forbidden by third-party rules", event)
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         # The remote hasn't signed it yet, obviously. We'll do the full checks
         # when we get the event back in `on_send_join_request`
         yield self.auth.check_from_context(
@@ -1300,6 +1313,15 @@ class FederationHandler(BaseHandler):
             origin, event
         )
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            logger.info("Sending of join %s forbidden by third-party rules", event)
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         logger.debug(
             "on_send_join_request: After _handle_new_event: %s, sigs: %s",
             event.event_id,
@@ -1458,6 +1480,15 @@ class FederationHandler(BaseHandler):
             builder=builder,
         )
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            logger.warning("Creation of leave %s forbidden by third-party rules", event)
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         try:
             # The remote hasn't signed it yet, obviously. We'll do the full checks
             # when we get the event back in `on_send_leave_request`
@@ -1484,10 +1515,19 @@ class FederationHandler(BaseHandler):
 
         event.internal_metadata.outlier = False
 
-        yield self._handle_new_event(
+        context = yield self._handle_new_event(
             origin, event
         )
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            logger.info("Sending of leave %s forbidden by third-party rules", event)
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         logger.debug(
             "on_send_leave_request: After _handle_new_event: %s, sigs: %s",
             event.event_id,
@@ -2550,6 +2590,18 @@ class FederationHandler(BaseHandler):
                 builder=builder
             )
 
+            event_allowed = yield self.third_party_event_rules.check_event_allowed(
+                event, context,
+            )
+            if not event_allowed:
+                logger.info(
+                    "Creation of threepid invite %s forbidden by third-party rules",
+                    event,
+                )
+                raise SynapseError(
+                    403, "This event is not allowed in this context", Codes.FORBIDDEN,
+                )
+
             event, context = yield self.add_display_name_to_third_party_invite(
                 room_version, event_dict, event, context
             )
@@ -2598,6 +2650,18 @@ class FederationHandler(BaseHandler):
             builder=builder,
         )
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            logger.warning(
+                "Exchange of threepid invite %s forbidden by third-party rules",
+                event,
+            )
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         event, context = yield self.add_display_name_to_third_party_invite(
             room_version, event_dict, event, context
         )
@@ -2613,12 +2677,6 @@ class FederationHandler(BaseHandler):
         # though the sender isn't a local user.
         event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
 
-        # XXX we send the invite here, but send_membership_event also sends it,
-        # so we end up making two requests. I think this is redundant.
-        returned_invite = yield self.send_invite(origin, event)
-        # TODO: Make sure the signatures actually are correct.
-        event.signatures.update(returned_invite.signatures)
-
         member_handler = self.hs.get_room_member_handler()
         yield member_handler.send_membership_event(None, event, context)
 
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 02c508acec..f60ace02e8 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -49,9 +49,7 @@ def _create_rerouter(func_name):
             def http_response_errback(failure):
                 failure.trap(HttpResponseException)
                 e = failure.value
-                if e.code == 403:
-                    raise e.to_synapse_error()
-                return failure
+                raise e.to_synapse_error()
 
             def request_failed_errback(failure):
                 failure.trap(RequestSendFailed)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0b02469ceb..11650dc80c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2017 - 2018 New Vector Ltd
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -248,6 +249,7 @@ class EventCreationHandler(object):
         self.action_generator = hs.get_action_generator()
 
         self.spam_checker = hs.get_spam_checker()
+        self.third_party_event_rules = hs.get_third_party_event_rules()
 
         self._block_events_without_consent_error = (
             self.config.block_events_without_consent_error
@@ -658,6 +660,14 @@ class EventCreationHandler(object):
         else:
             room_version = yield self.store.get_room_version(event.room_id)
 
+        event_allowed = yield self.third_party_event_rules.check_event_allowed(
+            event, context,
+        )
+        if not event_allowed:
+            raise SynapseError(
+                403, "This event is not allowed in this context", Codes.FORBIDDEN,
+            )
+
         try:
             yield self.auth.check_from_context(room_version, event, context)
         except AuthError as err:
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index a5fc6c5dbf..3e04233394 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,12 +15,15 @@
 
 import logging
 
+from six import raise_from
+
 from twisted.internet import defer
 
 from synapse.api.errors import (
     AuthError,
-    CodeMessageException,
     Codes,
+    HttpResponseException,
+    RequestSendFailed,
     StoreError,
     SynapseError,
 )
@@ -85,10 +88,10 @@ class BaseProfileHandler(BaseHandler):
                     ignore_backoff=True,
                 )
                 defer.returnValue(result)
-            except CodeMessageException as e:
-                if e.code != 404:
-                    logger.exception("Failed to get displayname")
-                raise
+            except RequestSendFailed as e:
+                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+            except HttpResponseException as e:
+                raise e.to_synapse_error()
 
     @defer.inlineCallbacks
     def get_profile_from_cache(self, user_id):
@@ -142,10 +145,10 @@ class BaseProfileHandler(BaseHandler):
                     },
                     ignore_backoff=True,
                 )
-            except CodeMessageException as e:
-                if e.code != 404:
-                    logger.exception("Failed to get displayname")
-                raise
+            except RequestSendFailed as e:
+                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+            except HttpResponseException as e:
+                raise e.to_synapse_error()
 
             defer.returnValue(result["displayname"])
 
@@ -208,10 +211,10 @@ class BaseProfileHandler(BaseHandler):
                     },
                     ignore_backoff=True,
                 )
-            except CodeMessageException as e:
-                if e.code != 404:
-                    logger.exception("Failed to get avatar_url")
-                raise
+            except RequestSendFailed as e:
+                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+            except HttpResponseException as e:
+                raise e.to_synapse_error()
 
             defer.returnValue(result["avatar_url"])
 
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 0d3ae1a43d..8aee14a8a8 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -224,7 +224,6 @@ class BucketCollector(object):
             for i, bound in enumerate(self.buckets):
                 if x <= bound:
                     buckets[bound] = buckets.get(bound, 0) + data[x]
-                    break
 
         for i in self.buckets:
             res.append([str(i), buckets.get(i, 0)])
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index e8ee67401f..c89a8438a9 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -114,6 +114,21 @@ class EmailPusher(object):
 
         run_as_background_process("emailpush.process", self._process)
 
+    def _pause_processing(self):
+        """Used by tests to temporarily pause processing of events.
+
+        Asserts that its not currently processing.
+        """
+        assert not self._is_processing
+        self._is_processing = True
+
+    def _resume_processing(self):
+        """Used by tests to resume processing of events after pausing.
+        """
+        assert self._is_processing
+        self._is_processing = False
+        self._start_processing()
+
     @defer.inlineCallbacks
     def _process(self):
         # we should never get here if we are already processing
@@ -215,6 +230,10 @@ class EmailPusher(object):
 
     @defer.inlineCallbacks
     def save_last_stream_ordering_and_success(self, last_stream_ordering):
+        if last_stream_ordering is None:
+            # This happens if we haven't yet processed anything
+            return
+
         self.last_stream_ordering = last_stream_ordering
         yield self.store.update_pusher_last_stream_ordering_and_success(
             self.app_id, self.email, self.user_id,
diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py
index eef6e18c2e..0c66702325 100644
--- a/synapse/push/presentable_names.py
+++ b/synapse/push/presentable_names.py
@@ -162,6 +162,17 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
 
 
 def descriptor_from_member_events(member_events):
+    """Get a description of the room based on the member events.
+
+    Args:
+        member_events (Iterable[FrozenEvent])
+
+    Returns:
+        str
+    """
+
+    member_events = list(member_events)
+
     if len(member_events) == 0:
         return "nobody"
     elif len(member_events) == 1:
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 40a7709c09..63c583565f 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -60,6 +60,11 @@ class PusherPool:
     def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name, pushkey, lang, data,
                    profile_tag=""):
+        """Creates a new pusher and adds it to the pool
+
+        Returns:
+            Deferred[EmailPusher|HttpPusher]
+        """
         time_now_msec = self.clock.time_msec()
 
         # we try to create the pusher just to validate the config: it
@@ -103,7 +108,9 @@ class PusherPool:
             last_stream_ordering=last_stream_ordering,
             profile_tag=profile_tag,
         )
-        yield self.start_pusher_by_id(app_id, pushkey, user_id)
+        pusher = yield self.start_pusher_by_id(app_id, pushkey, user_id)
+
+        defer.returnValue(pusher)
 
     @defer.inlineCallbacks
     def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
@@ -184,7 +191,11 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def start_pusher_by_id(self, app_id, pushkey, user_id):
-        """Look up the details for the given pusher, and start it"""
+        """Look up the details for the given pusher, and start it
+
+        Returns:
+            Deferred[EmailPusher|HttpPusher|None]: The pusher started, if any
+        """
         if not self._should_start_pushers:
             return
 
@@ -192,13 +203,16 @@ class PusherPool:
             app_id, pushkey
         )
 
-        p = None
+        pusher_dict = None
         for r in resultlist:
             if r['user_name'] == user_id:
-                p = r
+                pusher_dict = r
 
-        if p:
-            yield self._start_pusher(p)
+        pusher = None
+        if pusher_dict:
+            pusher = yield self._start_pusher(pusher_dict)
+
+        defer.returnValue(pusher)
 
     @defer.inlineCallbacks
     def _start_pushers(self):
@@ -224,7 +238,7 @@ class PusherPool:
             pusherdict (dict):
 
         Returns:
-            None
+            Deferred[EmailPusher|HttpPusher]
         """
         try:
             p = self.pusher_factory.create_pusher(pusherdict)
@@ -270,6 +284,8 @@ class PusherPool:
 
         p.on_started(have_notifs)
 
+        defer.returnValue(p)
+
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):
         appid_pushkey = "%s:%s" % (app_id, pushkey)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index e81456ab2b..0a432a16fa 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -17,11 +17,17 @@ import abc
 import logging
 import re
 
+from six import raise_from
 from six.moves import urllib
 
 from twisted.internet import defer
 
-from synapse.api.errors import CodeMessageException, HttpResponseException
+from synapse.api.errors import (
+    CodeMessageException,
+    HttpResponseException,
+    RequestSendFailed,
+    SynapseError,
+)
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.stringutils import random_string
 
@@ -175,6 +181,8 @@ class ReplicationEndpoint(object):
                 # on the master process that we should send to the client. (And
                 # importantly, not stack traces everywhere)
                 raise e.to_synapse_error()
+            except RequestSendFailed as e:
+                raise_from(SynapseError(502, "Failed to talk to master"), e)
 
             defer.returnValue(result)
 
diff --git a/synapse/server.py b/synapse/server.py
index 9229a68a8d..a54e023cc9 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -1,5 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -35,6 +37,7 @@ from synapse.crypto import context_factory
 from synapse.crypto.keyring import Keyring
 from synapse.events.builder import EventBuilderFactory
 from synapse.events.spamcheck import SpamChecker
+from synapse.events.third_party_rules import ThirdPartyEventRules
 from synapse.events.utils import EventClientSerializer
 from synapse.federation.federation_client import FederationClient
 from synapse.federation.federation_server import (
@@ -178,6 +181,7 @@ class HomeServer(object):
         'groups_attestation_renewer',
         'secrets',
         'spam_checker',
+        'third_party_event_rules',
         'room_member_handler',
         'federation_registry',
         'server_notices_manager',
@@ -483,6 +487,9 @@ class HomeServer(object):
     def build_spam_checker(self):
         return SpamChecker(self)
 
+    def build_third_party_event_rules(self):
+        return ThirdPartyEventRules(self)
+
     def build_room_member_handler(self):
         if self.config.worker_app:
             return RoomMemberWorkerHandler(self)
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 1b97ee74e3..289b6bc281 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -45,6 +45,10 @@ class PostgresEngine(object):
         # together. For example, version 8.1.5 will be returned as 80105
         self._version = db_conn.server_version
 
+        # Are we on a supported PostgreSQL version?
+        if self._version < 90500:
+            raise RuntimeError("Synapse requires PostgreSQL 9.5+ or above.")
+
         db_conn.set_isolation_level(
             self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
         )
@@ -64,9 +68,9 @@ class PostgresEngine(object):
     @property
     def can_native_upsert(self):
         """
-        Can we use native UPSERTs? This requires PostgreSQL 9.5+.
+        Can we use native UPSERTs?
         """
-        return self._version >= 90500
+        return True
 
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1578403f79..f631fb1733 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -228,7 +228,8 @@ class EventsStore(
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
         # Collect metrics on the number of forward extremities that exist.
-        self._current_forward_extremities_amount = {}
+        # Counter of number of extremities to count
+        self._current_forward_extremities_amount = c_counter()
 
         BucketCollector(
             "synapse_forward_extremities",
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9f910eac9c..d36917e4d6 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -642,7 +642,9 @@ class RegistrationStore(
                 FROM users
                     LEFT JOIN access_tokens ON (access_tokens.user_id = users.name)
                     LEFT JOIN user_threepids ON (user_threepids.user_id = users.name)
-                WHERE password_hash IS NULL OR password_hash = ''
+                WHERE (users.password_hash IS NULL OR users.password_hash = '')
+                AND (users.appservice_id IS NULL OR users.appservice_id = '')
+                AND users.is_guest = 0
                 AND users.name > ?
                 GROUP BY users.name
                 ORDER BY users.name ASC
@@ -666,7 +668,7 @@ class RegistrationStore(
             logger.info("Marked %d rows as deactivated", rows_processed_nb)
 
             self._background_update_progress_txn(
-                txn, "users_set_deactivated_flag", {"user_id": rows[-1]["user_id"]}
+                txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
             )
 
             if batch_size > len(rows):
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index ff49eaae02..10a27c207a 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -341,29 +341,7 @@ class SearchStore(BackgroundUpdateStore):
                 for entry in entries
             )
 
-            # inserts to a GIN index are normally batched up into a pending
-            # list, and then all committed together once the list gets to a
-            # certain size. The trouble with that is that postgres (pre-9.5)
-            # uses work_mem to determine the length of the list, and work_mem
-            # is typically very large.
-            #
-            # We therefore reduce work_mem while we do the insert.
-            #
-            # (postgres 9.5 uses the separate gin_pending_list_limit setting,
-            # so doesn't suffer the same problem, but changing work_mem will
-            # be harmless)
-            #
-            # Note that we don't need to worry about restoring it on
-            # exception, because exceptions will cause the transaction to be
-            # rolled back, including the effects of the SET command.
-            #
-            # Also: we use SET rather than SET LOCAL because there's lots of
-            # other stuff going on in this transaction, which want to have the
-            # normal work_mem setting.
-
-            txn.execute("SET work_mem='256kB'")
             txn.executemany(sql, args)
-            txn.execute("RESET work_mem")
 
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = (