summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py4
-rw-r--r--synapse/api/constants.py13
-rw-r--r--synapse/config/server.py15
-rw-r--r--synapse/crypto/context_factory.py15
-rw-r--r--synapse/events/__init__.py12
-rw-r--r--synapse/federation/federation_client.py24
-rw-r--r--synapse/handlers/federation.py32
-rw-r--r--synapse/handlers/room_list.py14
-rw-r--r--synapse/http/client.py5
-rw-r--r--synapse/http/federation/matrix_federation_agent.py3
-rw-r--r--synapse/rest/client/v2_alpha/register.py7
-rw-r--r--synapse/storage/_base.py171
-rw-r--r--synapse/storage/client_ips.py5
-rw-r--r--synapse/storage/engines/__init__.py2
-rw-r--r--synapse/storage/engines/postgres.py14
-rw-r--r--synapse/storage/engines/sqlite.py (renamed from synapse/storage/engines/sqlite3.py)9
-rw-r--r--synapse/storage/events.py1
-rw-r--r--synapse/storage/events_worker.py19
-rw-r--r--synapse/storage/pusher.py9
-rw-r--r--synapse/storage/roommember.py8
-rw-r--r--synapse/storage/schema/delta/53/event_format_version.sql16
-rw-r--r--synapse/storage/user_directory.py55
22 files changed, 363 insertions, 90 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index ba1019b9b2..e37b807c94 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -819,7 +819,9 @@ class Auth(object):
             elif threepid:
                 # If the user does not exist yet, but is signing up with a
                 # reserved threepid then pass auth check
-                if is_threepid_reserved(self.hs.config, threepid):
+                if is_threepid_reserved(
+                    self.hs.config.mau_limits_reserved_threepids, threepid
+                ):
                     return
             # Else if there is no room in the MAU bucket, bail
             current_mau = yield self.store.get_monthly_active_count()
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 46c4b4b9dc..51ee078bc3 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -120,6 +120,19 @@ KNOWN_ROOM_VERSIONS = {
     RoomVersions.STATE_V2_TEST,
 }
 
+
+class EventFormatVersions(object):
+    """This is an internal enum for tracking the version of the event format,
+    independently from the room version.
+    """
+    V1 = 1
+
+
+KNOWN_EVENT_FORMAT_VERSIONS = {
+    EventFormatVersions.V1,
+}
+
+
 ServerNoticeMsgType = "m.server_notice"
 ServerNoticeLimitReached = "m.server_notice.usage_limit_reached"
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index fb57791098..22dcc87d8a 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -256,7 +256,11 @@ class ServerConfig(Config):
         #
         # web_client_location: "/path/to/web/root"
 
-        # The public-facing base URL for the client API (not including _matrix/...)
+        # The public-facing base URL that clients use to access this HS
+        # (not including _matrix/...). This is the same URL a user would
+        # enter into the 'custom HS URL' field on their client. If you
+        # use synapse with a reverse proxy, this should be the URL to reach
+        # synapse via the proxy.
         # public_baseurl: https://example.com:8448/
 
         # Set the soft limit on the number of file descriptors synapse can use
@@ -420,19 +424,18 @@ class ServerConfig(Config):
                                   " service on the given port.")
 
 
-def is_threepid_reserved(config, threepid):
+def is_threepid_reserved(reserved_threepids, threepid):
     """Check the threepid against the reserved threepid config
     Args:
-        config(ServerConfig) - to access server config attributes
+        reserved_threepids([dict]) - list of reserved threepids
         threepid(dict) - The threepid to test for
 
     Returns:
         boolean Is the threepid undertest reserved_user
     """
 
-    for tp in config.mau_limits_reserved_threepids:
-        if (threepid['medium'] == tp['medium']
-                and threepid['address'] == tp['address']):
+    for tp in reserved_threepids:
+        if (threepid['medium'] == tp['medium'] and threepid['address'] == tp['address']):
             return True
     return False
 
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index 6ba3eca7b2..286ad80100 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -17,6 +17,7 @@ from zope.interface import implementer
 
 from OpenSSL import SSL, crypto
 from twisted.internet._sslverify import _defaultCurveName
+from twisted.internet.abstract import isIPAddress, isIPv6Address
 from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
 from twisted.internet.ssl import CertificateOptions, ContextFactory
 from twisted.python.failure import Failure
@@ -98,8 +99,14 @@ class ClientTLSOptions(object):
 
     def __init__(self, hostname, ctx):
         self._ctx = ctx
-        self._hostname = hostname
-        self._hostnameBytes = _idnaBytes(hostname)
+
+        if isIPAddress(hostname) or isIPv6Address(hostname):
+            self._hostnameBytes = hostname.encode('ascii')
+            self._sendSNI = False
+        else:
+            self._hostnameBytes = _idnaBytes(hostname)
+            self._sendSNI = True
+
         ctx.set_info_callback(
             _tolerateErrors(self._identityVerifyingInfoCallback)
         )
@@ -111,7 +118,9 @@ class ClientTLSOptions(object):
         return connection
 
     def _identityVerifyingInfoCallback(self, connection, where, ret):
-        if where & SSL.SSL_CB_HANDSHAKE_START:
+        # Literal IPv4 and IPv6 addresses are not permitted
+        # as host names according to the RFCs
+        if where & SSL.SSL_CB_HANDSHAKE_START and self._sendSNI:
             connection.set_tlsext_host_name(self._hostnameBytes)
 
 
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 84c75495d5..888296933b 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -18,6 +18,7 @@ from distutils.util import strtobool
 
 import six
 
+from synapse.api.constants import EventFormatVersions
 from synapse.util.caches import intern_dict
 from synapse.util.frozenutils import freeze
 
@@ -41,8 +42,13 @@ class _EventInternalMetadata(object):
     def is_outlier(self):
         return getattr(self, "outlier", False)
 
-    def is_invite_from_remote(self):
-        return getattr(self, "invite_from_remote", False)
+    def is_out_of_band_membership(self):
+        """Whether this is an out of band membership, like an invite or an invite
+        rejection. This is needed as those events are marked as outliers, but
+        they still need to be processed as if they're new events (e.g. updating
+        invite state in the database, relaying to clients, etc).
+        """
+        return getattr(self, "out_of_band_membership", False)
 
     def get_send_on_behalf_of(self):
         """Whether this server should send the event on behalf of another server.
@@ -179,6 +185,8 @@ class EventBase(object):
 
 
 class FrozenEvent(EventBase):
+    format_version = EventFormatVersions.V1  # All events of this type are V1
+
     def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):
         event_dict = dict(event_dict)
 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7fb5736142..777deabdf7 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -37,7 +37,7 @@ from synapse.api.errors import (
     HttpResponseException,
     SynapseError,
 )
-from synapse.events import builder
+from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
 from synapse.util import logcontext, unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -71,6 +71,8 @@ class FederationClient(FederationBase):
         self.state = hs.get_state_handler()
         self.transport_layer = hs.get_federation_transport_client()
 
+        self.event_builder_factory = hs.get_event_builder_factory()
+
         self._get_pdu_cache = ExpiringCache(
             cache_name="get_pdu_cache",
             clock=self._clock,
@@ -539,6 +541,8 @@ class FederationClient(FederationBase):
         Does so by asking one of the already participating servers to create an
         event with proper context.
 
+        Returns a fully signed and hashed event.
+
         Note that this does not append any events to any graphs.
 
         Args:
@@ -553,8 +557,9 @@ class FederationClient(FederationBase):
             params (dict[str, str|Iterable[str]]): Query parameters to include in the
                 request.
         Return:
-            Deferred: resolves to a tuple of (origin (str), event (object))
-            where origin is the remote homeserver which generated the event.
+            Deferred[tuple[str, FrozenEvent]]: resolves to a tuple of `origin`
+            and event where origin is the remote homeserver which generated
+            the event.
 
             Fails with a ``SynapseError`` if the chosen remote server
             returns a 300/400 code.
@@ -588,7 +593,18 @@ class FederationClient(FederationBase):
             if "prev_state" not in pdu_dict:
                 pdu_dict["prev_state"] = []
 
-            ev = builder.EventBuilder(pdu_dict)
+            # Strip off the fields that we want to clobber.
+            pdu_dict.pop("origin", None)
+            pdu_dict.pop("origin_server_ts", None)
+            pdu_dict.pop("unsigned", None)
+
+            builder = self.event_builder_factory.new(pdu_dict)
+            add_hashes_and_signatures(
+                builder,
+                self.hs.hostname,
+                self.hs.config.signing_key[0]
+            )
+            ev = builder.build()
 
             defer.returnValue(
                 (destination, ev)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 5280d88a50..453d393ce1 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -44,10 +44,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
-from synapse.crypto.event_signing import (
-    add_hashes_and_signatures,
-    compute_event_signature,
-)
+from synapse.crypto.event_signing import compute_event_signature
 from synapse.events.validator import EventValidator
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
@@ -59,7 +56,6 @@ from synapse.types import UserID, get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room
-from synapse.util.frozenutils import unfreeze
 from synapse.util.logutils import log_function
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.visibility import filter_events_for_server
@@ -1088,7 +1084,6 @@ class FederationHandler(BaseHandler):
         handled_events = set()
 
         try:
-            event = self._sign_event(event)
             # Try the host we successfully got a response to /make_join/
             # request first.
             try:
@@ -1292,7 +1287,7 @@ class FederationHandler(BaseHandler):
             )
 
         event.internal_metadata.outlier = True
-        event.internal_metadata.invite_from_remote = True
+        event.internal_metadata.out_of_band_membership = True
 
         event.signatures.update(
             compute_event_signature(
@@ -1318,7 +1313,7 @@ class FederationHandler(BaseHandler):
         # Mark as outlier as we don't have any state for this event; we're not
         # even in the room.
         event.internal_metadata.outlier = True
-        event = self._sign_event(event)
+        event.internal_metadata.out_of_band_membership = True
 
         # Try the host that we succesfully called /make_leave/ on first for
         # the /send_leave/ request.
@@ -1362,27 +1357,6 @@ class FederationHandler(BaseHandler):
         assert(event.room_id == room_id)
         defer.returnValue((origin, event))
 
-    def _sign_event(self, event):
-        event.internal_metadata.outlier = False
-
-        builder = self.event_builder_factory.new(
-            unfreeze(event.get_pdu_json())
-        )
-
-        builder.event_id = self.event_builder_factory.create_event_id()
-        builder.origin = self.hs.hostname
-
-        if not hasattr(event, "signatures"):
-            builder.signatures = {}
-
-        add_hashes_and_signatures(
-            builder,
-            self.hs.hostname,
-            self.hs.config.signing_key[0],
-        )
-
-        return builder.build()
-
     @defer.inlineCallbacks
     @log_function
     def on_make_leave_request(self, room_id, user_id):
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index dc88620885..13e212d669 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -73,8 +73,14 @@ class RoomListHandler(BaseHandler):
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
             logger.info("Bypassing cache as search request.")
+
+            # XXX: Quick hack to stop room directory queries taking too long.
+            # Timeout request after 60s. Probably want a more fundamental
+            # solution at some point
+            timeout = self.clock.time() + 60
             return self._get_public_room_list(
-                limit, since_token, search_filter, network_tuple=network_tuple,
+                limit, since_token, search_filter,
+                network_tuple=network_tuple, timeout=timeout,
             )
 
         key = (limit, since_token, network_tuple)
@@ -87,7 +93,8 @@ class RoomListHandler(BaseHandler):
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
                               search_filter=None,
-                              network_tuple=EMPTY_THIRD_PARTY_ID,):
+                              network_tuple=EMPTY_THIRD_PARTY_ID,
+                              timeout=None,):
         if since_token and since_token != "END":
             since_token = RoomListNextBatch.from_token(since_token)
         else:
@@ -202,6 +209,9 @@ class RoomListHandler(BaseHandler):
 
         chunk = []
         for i in range(0, len(rooms_to_scan), step):
+            if timeout and self.clock.time() > timeout:
+                raise Exception("Timed out searching room directory")
+
             batch = rooms_to_scan[i:i + step]
             logger.info("Processing %i rooms for result", len(batch))
             yield concurrently_execute(
diff --git a/synapse/http/client.py b/synapse/http/client.py
index afcf698b29..47a1f82ff0 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -333,9 +333,10 @@ class SimpleHttpClient(object):
             "POST", uri, headers=Headers(actual_headers), data=query_bytes
         )
 
+        body = yield make_deferred_yieldable(readBody(response))
+
         if 200 <= response.code < 300:
-            body = yield make_deferred_yieldable(treq.json_content(response))
-            defer.returnValue(body)
+            defer.returnValue(json.loads(body))
         else:
             raise HttpResponseException(response.code, response.phrase, body)
 
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 64c780a341..0ec28c6696 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -101,7 +101,8 @@ class MatrixFederationAgent(object):
         if port is not None:
             target = (host, port)
         else:
-            server_list = yield self._srv_resolver.resolve_service(server_name_bytes)
+            service_name = b"_matrix._tcp.%s" % (server_name_bytes, )
+            server_list = yield self._srv_resolver.resolve_service(service_name)
             if not server_list:
                 target = (host, 8448)
                 logger.debug("No SRV record for %s, using %s", host, target)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 14025cd219..7f812b8209 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -416,8 +416,11 @@ class RegisterRestServlet(RestServlet):
             )
             # Necessary due to auth checks prior to the threepid being
             # written to the db
-            if is_threepid_reserved(self.hs.config, threepid):
-                yield self.store.upsert_monthly_active_user(registered_user_id)
+            if threepid:
+                if is_threepid_reserved(
+                    self.hs.config.mau_limits_reserved_threepids, threepid
+                ):
+                    yield self.store.upsert_monthly_active_user(registered_user_id)
 
             # remember that we've now registered that user account, and with
             #  what user ID (since the user may not have specified)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 865b5e915a..f62f70b9f1 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -26,6 +26,7 @@ from prometheus_client import Histogram
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.engines import PostgresEngine
 from synapse.util.caches.descriptors import Cache
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
@@ -192,6 +193,51 @@ class SQLBaseStore(object):
 
         self.database_engine = hs.database_engine
 
+        # A set of tables that are not safe to use native upserts in.
+        self._unsafe_to_upsert_tables = {"user_ips"}
+
+        if self.database_engine.can_native_upsert:
+            # Check ASAP (and then later, every 1s) to see if we have finished
+            # background updates of tables that aren't safe to update.
+            self._clock.call_later(
+                0.0,
+                run_as_background_process,
+                "upsert_safety_check",
+                self._check_safe_to_upsert
+            )
+
+    @defer.inlineCallbacks
+    def _check_safe_to_upsert(self):
+        """
+        Is it safe to use native UPSERT?
+
+        If there are background updates, we will need to wait, as they may be
+        the addition of indexes that set the UNIQUE constraint that we require.
+
+        If the background updates have not completed, wait 15 sec and check again.
+        """
+        updates = yield self._simple_select_list(
+            "background_updates",
+            keyvalues=None,
+            retcols=["update_name"],
+            desc="check_background_updates",
+        )
+        updates = [x["update_name"] for x in updates]
+
+        # The User IPs table in schema #53 was missing a unique index, which we
+        # run as a background update.
+        if "user_ips_device_unique_index" not in updates:
+            self._unsafe_to_upsert_tables.discard("user_ips")
+
+        # If there's any tables left to check, reschedule to run.
+        if self._unsafe_to_upsert_tables:
+            self._clock.call_later(
+                15.0,
+                run_as_background_process,
+                "upsert_safety_check",
+                self._check_safe_to_upsert
+            )
+
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
 
@@ -494,8 +540,15 @@ class SQLBaseStore(object):
         txn.executemany(sql, vals)
 
     @defer.inlineCallbacks
-    def _simple_upsert(self, table, keyvalues, values,
-                       insertion_values={}, desc="_simple_upsert", lock=True):
+    def _simple_upsert(
+        self,
+        table,
+        keyvalues,
+        values,
+        insertion_values={},
+        desc="_simple_upsert",
+        lock=True
+    ):
         """
 
         `lock` should generally be set to True (the default), but can be set
@@ -516,16 +569,21 @@ class SQLBaseStore(object):
                 inserting
             lock (bool): True to lock the table when doing the upsert.
         Returns:
-            Deferred(bool): True if a new entry was created, False if an
-                existing one was updated.
+            Deferred(None or bool): Native upserts always return None. Emulated
+            upserts return True if a new entry was created, False if an existing
+            one was updated.
         """
         attempts = 0
         while True:
             try:
                 result = yield self.runInteraction(
                     desc,
-                    self._simple_upsert_txn, table, keyvalues, values, insertion_values,
-                    lock=lock
+                    self._simple_upsert_txn,
+                    table,
+                    keyvalues,
+                    values,
+                    insertion_values,
+                    lock=lock,
                 )
                 defer.returnValue(result)
             except self.database_engine.module.IntegrityError as e:
@@ -537,12 +595,71 @@ class SQLBaseStore(object):
 
                 # presumably we raced with another transaction: let's retry.
                 logger.warn(
-                    "IntegrityError when upserting into %s; retrying: %s",
-                    table, e
+                    "%s when upserting into %s; retrying: %s", e.__name__, table, e
                 )
 
-    def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
-                           lock=True):
+    def _simple_upsert_txn(
+        self,
+        txn,
+        table,
+        keyvalues,
+        values,
+        insertion_values={},
+        lock=True,
+    ):
+        """
+        Pick the UPSERT method which works best on the platform. Either the
+        native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
+
+        Args:
+            txn: The transaction to use.
+            table (str): The table to upsert into
+            keyvalues (dict): The unique key tables and their new values
+            values (dict): The nonunique columns and their new values
+            insertion_values (dict): additional key/values to use only when
+                inserting
+            lock (bool): True to lock the table when doing the upsert.
+        Returns:
+            None or bool: Native upserts always return None. Emulated
+            upserts return True if a new entry was created, False if an existing
+            one was updated.
+        """
+        if (
+            self.database_engine.can_native_upsert
+            and table not in self._unsafe_to_upsert_tables
+        ):
+            return self._simple_upsert_txn_native_upsert(
+                txn,
+                table,
+                keyvalues,
+                values,
+                insertion_values=insertion_values,
+            )
+        else:
+            return self._simple_upsert_txn_emulated(
+                txn,
+                table,
+                keyvalues,
+                values,
+                insertion_values=insertion_values,
+                lock=lock,
+            )
+
+    def _simple_upsert_txn_emulated(
+        self, txn, table, keyvalues, values, insertion_values={}, lock=True
+    ):
+        """
+        Args:
+            table (str): The table to upsert into
+            keyvalues (dict): The unique key tables and their new values
+            values (dict): The nonunique columns and their new values
+            insertion_values (dict): additional key/values to use only when
+                inserting
+            lock (bool): True to lock the table when doing the upsert.
+        Returns:
+            bool: Return True if a new entry was created, False if an existing
+            one was updated.
+        """
         # We need to lock the table :(, unless we're *really* careful
         if lock:
             self.database_engine.lock_table(txn, table)
@@ -577,12 +694,44 @@ class SQLBaseStore(object):
         sql = "INSERT INTO %s (%s) VALUES (%s)" % (
             table,
             ", ".join(k for k in allvalues),
-            ", ".join("?" for _ in allvalues)
+            ", ".join("?" for _ in allvalues),
         )
         txn.execute(sql, list(allvalues.values()))
         # successfully inserted
         return True
 
+    def _simple_upsert_txn_native_upsert(
+        self, txn, table, keyvalues, values, insertion_values={}
+    ):
+        """
+        Use the native UPSERT functionality in recent PostgreSQL versions.
+
+        Args:
+            table (str): The table to upsert into
+            keyvalues (dict): The unique key tables and their new values
+            values (dict): The nonunique columns and their new values
+            insertion_values (dict): additional key/values to use only when
+                inserting
+        Returns:
+            None
+        """
+        allvalues = {}
+        allvalues.update(keyvalues)
+        allvalues.update(values)
+        allvalues.update(insertion_values)
+
+        sql = (
+            "INSERT INTO %s (%s) VALUES (%s) "
+            "ON CONFLICT (%s) DO UPDATE SET %s"
+        ) % (
+            table,
+            ", ".join(k for k in allvalues),
+            ", ".join("?" for _ in allvalues),
+            ", ".join(k for k in keyvalues),
+            ", ".join(k + "=EXCLUDED." + k for k in values),
+        )
+        txn.execute(sql, list(allvalues.values()))
+
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False, desc="_simple_select_one"):
         """Executes a SELECT query on the named table, which is expected to
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index b228a20ac2..091d7116c5 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -257,7 +257,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         )
 
     def _update_client_ips_batch_txn(self, txn, to_update):
-        self.database_engine.lock_table(txn, "user_ips")
+        if "user_ips" in self._unsafe_to_upsert_tables or (
+            not self.database_engine.can_native_upsert
+        ):
+            self.database_engine.lock_table(txn, "user_ips")
 
         for entry in iteritems(to_update):
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index e2f9de8451..ff5ef97ca8 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -18,7 +18,7 @@ import platform
 
 from ._base import IncorrectDatabaseSetup
 from .postgres import PostgresEngine
-from .sqlite3 import Sqlite3Engine
+from .sqlite import Sqlite3Engine
 
 SUPPORTED_MODULE = {
     "sqlite3": Sqlite3Engine,
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 42225f8a2a..4004427c7b 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -38,6 +38,13 @@ class PostgresEngine(object):
         return sql.replace("?", "%s")
 
     def on_new_connection(self, db_conn):
+
+        # Get the version of PostgreSQL that we're using. As per the psycopg2
+        # docs: The number is formed by converting the major, minor, and
+        # revision numbers into two-decimal-digit numbers and appending them
+        # together. For example, version 8.1.5 will be returned as 80105
+        self._version = db_conn.server_version
+
         db_conn.set_isolation_level(
             self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
         )
@@ -54,6 +61,13 @@ class PostgresEngine(object):
 
         cursor.close()
 
+    @property
+    def can_native_upsert(self):
+        """
+        Can we use native UPSERTs? This requires PostgreSQL 9.5+.
+        """
+        return self._version >= 90500
+
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
             # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite.py
index 19949fc474..c64d73ff21 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite.py
@@ -15,6 +15,7 @@
 
 import struct
 import threading
+from sqlite3 import sqlite_version_info
 
 from synapse.storage.prepare_database import prepare_database
 
@@ -30,6 +31,14 @@ class Sqlite3Engine(object):
         self._current_state_group_id = None
         self._current_state_group_id_lock = threading.Lock()
 
+    @property
+    def can_native_upsert(self):
+        """
+        Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
+        more work we haven't done yet to tell what was inserted vs updated.
+        """
+        return sqlite_version_info >= (3, 24, 0)
+
     def check_database(self, txn):
         pass
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 79e0276de6..3e1915fb87 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1268,6 +1268,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
                         event.internal_metadata.get_dict()
                     ),
                     "json": encode_json(event_dict(event)),
+                    "format_version": event.format_version,
                 }
                 for event, _ in events_and_contexts
             ],
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index a8326f5296..599f892858 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -21,10 +21,10 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.api.constants import EventFormatVersions
 from synapse.api.errors import NotFoundError
-# these are only included to make the type annotations work
-from synapse.events import EventBase  # noqa: F401
 from synapse.events import FrozenEvent
+# these are only included to make the type annotations work
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -353,6 +353,7 @@ class EventsWorkerStore(SQLBaseStore):
                     self._get_event_from_row,
                     row["internal_metadata"], row["json"], row["redacts"],
                     rejected_reason=row["rejects"],
+                    format_version=row["format_version"],
                 )
                 for row in rows
             ],
@@ -377,6 +378,7 @@ class EventsWorkerStore(SQLBaseStore):
                 " e.event_id as event_id, "
                 " e.internal_metadata,"
                 " e.json,"
+                " e.format_version, "
                 " r.redacts as redacts,"
                 " rej.event_id as rejects "
                 " FROM event_json as e"
@@ -392,7 +394,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def _get_event_from_row(self, internal_metadata, js, redacted,
-                            rejected_reason=None):
+                            format_version, rejected_reason=None):
         with Measure(self._clock, "_get_event_from_row"):
             d = json.loads(js)
             internal_metadata = json.loads(internal_metadata)
@@ -405,8 +407,17 @@ class EventsWorkerStore(SQLBaseStore):
                     desc="_get_event_from_row_rejected_reason",
                 )
 
+            if format_version is None:
+                # This means that we stored the event before we had the concept
+                # of a event format version, so it must be a V1 event.
+                format_version = EventFormatVersions.V1
+
+            # TODO: When we implement new event formats we'll need to use a
+            # different event python type
+            assert format_version == EventFormatVersions.V1
+
             original_ev = FrozenEvent(
-                d,
+                event_dict=d,
                 internal_metadata_dict=internal_metadata,
                 rejected_reason=rejected_reason,
             )
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 2743b52bad..134297e284 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -215,7 +215,7 @@ class PusherStore(PusherWorkerStore):
         with self._pushers_id_gen.get_next() as stream_id:
             # no need to lock because `pushers` has a unique key on
             # (app_id, pushkey, user_name) so _simple_upsert will retry
-            newly_inserted = yield self._simple_upsert(
+            yield self._simple_upsert(
                 table="pushers",
                 keyvalues={
                     "app_id": app_id,
@@ -238,7 +238,12 @@ class PusherStore(PusherWorkerStore):
                 lock=False,
             )
 
-            if newly_inserted:
+            user_has_pusher = self.get_if_user_has_pusher.cache.get(
+                (user_id,), None, update_metrics=False
+            )
+
+            if user_has_pusher is not True:
+                # invalidate, since we the user might not have had a pusher before
                 yield self.runInteraction(
                     "add_pusher",
                     self._invalidate_cache_and_stream,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0707f9a86a..592c1bcd33 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -588,12 +588,12 @@ class RoomMemberStore(RoomMemberWorkerStore):
             )
 
             # We update the local_invites table only if the event is "current",
-            # i.e., its something that has just happened.
-            # The only current event that can also be an outlier is if its an
-            # invite that has come in across federation.
+            # i.e., its something that has just happened. If the event is an
+            # outlier it is only current if its an "out of band membership",
+            # like a remote invite or a rejection of a remote invite.
             is_new_state = not backfilled and (
                 not event.internal_metadata.is_outlier()
-                or event.internal_metadata.is_invite_from_remote()
+                or event.internal_metadata.is_out_of_band_membership()
             )
             is_mine = self.hs.is_mine_id(event.state_key)
             if is_new_state and is_mine:
diff --git a/synapse/storage/schema/delta/53/event_format_version.sql b/synapse/storage/schema/delta/53/event_format_version.sql
new file mode 100644
index 0000000000..1d977c2834
--- /dev/null
+++ b/synapse/storage/schema/delta/53/event_format_version.sql
@@ -0,0 +1,16 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE event_json ADD COLUMN format_version INTEGER;
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index a8781b0e5d..ce48212265 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -168,14 +168,14 @@ class UserDirectoryStore(SQLBaseStore):
             if isinstance(self.database_engine, PostgresEngine):
                 # We weight the localpart most highly, then display name and finally
                 # server name
-                if new_entry:
+                if self.database_engine.can_native_upsert:
                     sql = """
                         INSERT INTO user_directory_search(user_id, vector)
                         VALUES (?,
                             setweight(to_tsvector('english', ?), 'A')
                             || setweight(to_tsvector('english', ?), 'D')
                             || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
-                        )
+                        ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
                     """
                     txn.execute(
                         sql,
@@ -185,20 +185,45 @@ class UserDirectoryStore(SQLBaseStore):
                         )
                     )
                 else:
-                    sql = """
-                        UPDATE user_directory_search
-                        SET vector = setweight(to_tsvector('english', ?), 'A')
-                            || setweight(to_tsvector('english', ?), 'D')
-                            || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
-                        WHERE user_id = ?
-                    """
-                    txn.execute(
-                        sql,
-                        (
-                            get_localpart_from_id(user_id), get_domain_from_id(user_id),
-                            display_name, user_id,
+                    # TODO: Remove this code after we've bumped the minimum version
+                    # of postgres to always support upserts, so we can get rid of
+                    # `new_entry` usage
+                    if new_entry is True:
+                        sql = """
+                            INSERT INTO user_directory_search(user_id, vector)
+                            VALUES (?,
+                                setweight(to_tsvector('english', ?), 'A')
+                                || setweight(to_tsvector('english', ?), 'D')
+                                || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                            )
+                        """
+                        txn.execute(
+                            sql,
+                            (
+                                user_id, get_localpart_from_id(user_id),
+                                get_domain_from_id(user_id), display_name,
+                            )
+                        )
+                    elif new_entry is False:
+                        sql = """
+                            UPDATE user_directory_search
+                            SET vector = setweight(to_tsvector('english', ?), 'A')
+                                || setweight(to_tsvector('english', ?), 'D')
+                                || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                            WHERE user_id = ?
+                        """
+                        txn.execute(
+                            sql,
+                            (
+                                get_localpart_from_id(user_id),
+                                get_domain_from_id(user_id),
+                                display_name, user_id,
+                            )
+                        )
+                    else:
+                        raise RuntimeError(
+                            "upsert returned None when 'can_native_upsert' is False"
                         )
-                    )
             elif isinstance(self.database_engine, Sqlite3Engine):
                 value = "%s %s" % (user_id, display_name,) if display_name else user_id
                 self._simple_upsert_txn(