diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 865b5e915a..f62f70b9f1 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -26,6 +26,7 @@ from prometheus_client import Histogram
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine
from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
@@ -192,6 +193,51 @@ class SQLBaseStore(object):
self.database_engine = hs.database_engine
+ # A set of tables that are not safe to use native upserts in.
+ self._unsafe_to_upsert_tables = {"user_ips"}
+
+ if self.database_engine.can_native_upsert:
+ # Check ASAP (and then later, every 15s) to see if we have finished
+ # background updates of tables that aren't safe to update.
+ self._clock.call_later(
+ 0.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert
+ )
+
+ @defer.inlineCallbacks
+ def _check_safe_to_upsert(self):
+ """
+ Is it safe to use native UPSERT?
+
+ If there are background updates, we will need to wait, as they may be
+ adding the indexes that provide the UNIQUE constraints we require.
+
+ If the background updates have not completed, wait 15 sec and check again.
+ """
+ updates = yield self._simple_select_list(
+ "background_updates",
+ keyvalues=None,
+ retcols=["update_name"],
+ desc="check_background_updates",
+ )
+ updates = [x["update_name"] for x in updates]
+
+ # The User IPs table in schema #53 was missing a unique index, which we
+ # are creating via a background update.
+ if "user_ips_device_unique_index" not in updates:
+ self._unsafe_to_upsert_tables.discard("user_ips")
+
+ # If there are any tables left to check, reschedule another run.
+ if self._unsafe_to_upsert_tables:
+ self._clock.call_later(
+ 15.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert
+ )
+
def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()
@@ -494,8 +540,15 @@ class SQLBaseStore(object):
txn.executemany(sql, vals)
@defer.inlineCallbacks
- def _simple_upsert(self, table, keyvalues, values,
- insertion_values={}, desc="_simple_upsert", lock=True):
+ def _simple_upsert(
+ self,
+ table,
+ keyvalues,
+ values,
+ insertion_values={},
+ desc="_simple_upsert",
+ lock=True
+ ):
"""
`lock` should generally be set to True (the default), but can be set
@@ -516,16 +569,21 @@ class SQLBaseStore(object):
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
- Deferred(bool): True if a new entry was created, False if an
- existing one was updated.
+ Deferred(None or bool): Native upserts always return None. Emulated
+ upserts return True if a new entry was created, False if an existing
+ one was updated.
"""
attempts = 0
while True:
try:
result = yield self.runInteraction(
desc,
- self._simple_upsert_txn, table, keyvalues, values, insertion_values,
- lock=lock
+ self._simple_upsert_txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values,
+ lock=lock,
)
defer.returnValue(result)
except self.database_engine.module.IntegrityError as e:
@@ -537,12 +595,71 @@ class SQLBaseStore(object):
# presumably we raced with another transaction: let's retry.
logger.warn(
- "IntegrityError when upserting into %s; retrying: %s",
- table, e
+ "%s when upserting into %s; retrying: %s", e.__name__, table, e
)
- def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
- lock=True):
+ def _simple_upsert_txn(
+ self,
+ txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values={},
+ lock=True,
+ ):
+ """
+ Pick the UPSERT method which works best on the platform. Either the
+ native one (PostgreSQL 9.5+, SQLite 3.24+), or fall back to an emulated method.
+
+ Args:
+ txn: The transaction to use.
+ table (str): The table to upsert into
+ keyvalues (dict): The unique key columns and their new values
+ values (dict): The nonunique columns and their new values
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ lock (bool): True to lock the table when doing the upsert.
+ Returns:
+ None or bool: Native upserts always return None. Emulated
+ upserts return True if a new entry was created, False if an existing
+ one was updated.
+ """
+ if (
+ self.database_engine.can_native_upsert
+ and table not in self._unsafe_to_upsert_tables
+ ):
+ return self._simple_upsert_txn_native_upsert(
+ txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values=insertion_values,
+ )
+ else:
+ return self._simple_upsert_txn_emulated(
+ txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values=insertion_values,
+ lock=lock,
+ )
+
+ def _simple_upsert_txn_emulated(
+ self, txn, table, keyvalues, values, insertion_values={}, lock=True
+ ):
+ """
+ Args:
+ table (str): The table to upsert into
+ keyvalues (dict): The unique key columns and their new values
+ values (dict): The nonunique columns and their new values
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ lock (bool): True to lock the table when doing the upsert.
+ Returns:
+ bool: Return True if a new entry was created, False if an existing
+ one was updated.
+ """
# We need to lock the table :(, unless we're *really* careful
if lock:
self.database_engine.lock_table(txn, table)
@@ -577,12 +694,44 @@ class SQLBaseStore(object):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
- ", ".join("?" for _ in allvalues)
+ ", ".join("?" for _ in allvalues),
)
txn.execute(sql, list(allvalues.values()))
# successfully inserted
return True
+ def _simple_upsert_txn_native_upsert(
+ self, txn, table, keyvalues, values, insertion_values={}
+ ):
+ """
+ Use the native UPSERT functionality in recent PostgreSQL versions.
+
+ Args:
+ table (str): The table to upsert into
+ keyvalues (dict): The unique key columns and their new values
+ values (dict): The nonunique columns and their new values
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ Returns:
+ None
+ """
+ allvalues = {}
+ allvalues.update(keyvalues)
+ allvalues.update(values)
+ allvalues.update(insertion_values)
+
+ sql = (
+ "INSERT INTO %s (%s) VALUES (%s) "
+ "ON CONFLICT (%s) DO UPDATE SET %s"
+ ) % (
+ table,
+ ", ".join(k for k in allvalues),
+ ", ".join("?" for _ in allvalues),
+ ", ".join(k for k in keyvalues),
+ ", ".join(k + "=EXCLUDED." + k for k in values),
+ )
+ txn.execute(sql, list(allvalues.values()))
+
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 5d548f250a..091d7116c5 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -110,8 +110,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
@defer.inlineCallbacks
def _remove_user_ip_dupes(self, progress, batch_size):
+ # This function works by scanning the user_ips table in batches
+ # based on `last_seen`. For each row in a batch it searches the rest of
+ # the table to see if there are any duplicates; if there are, they are
+ # removed and replaced with a single suitable row.
- last_seen_progress = progress.get("last_seen", 0)
+ # Fetch the start of the batch
+ begin_last_seen = progress.get("last_seen", 0)
def get_last_seen(txn):
txn.execute(
@@ -122,29 +127,28 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
LIMIT 1
OFFSET ?
""",
- (last_seen_progress, batch_size)
+ (begin_last_seen, batch_size)
)
- results = txn.fetchone()
- return results
-
- # Get a last seen that's sufficiently far away enough from the last one
- last_seen = yield self.runInteraction(
+ row = txn.fetchone()
+ if row:
+ return row[0]
+ else:
+ return None
+
+ # Get a last_seen that is roughly `batch_size` rows after `begin_last_seen`
+ end_last_seen = yield self.runInteraction(
"user_ips_dups_get_last_seen", get_last_seen
)
- if not last_seen:
- # If we get a None then we're reaching the end and just need to
- # delete the last batch.
- last = True
+ # If it returns None, then we're processing the last batch
+ last = end_last_seen is None
- # We fake not having an upper bound by using a future date, by
- # just multiplying the current time by two....
- last_seen = int(self.clock.time_msec()) * 2
- else:
- last = False
- last_seen = last_seen[0]
+ logger.info(
+ "Scanning for duplicate 'user_ips' rows in range: %s <= last_seen < %s",
+ begin_last_seen, end_last_seen,
+ )
- def remove(txn, last_seen_progress, last_seen):
+ def remove(txn):
# This works by looking at all entries in the given time span, and
# then for each (user_id, access_token, ip) tuple in that range
# checking for any duplicates in the rest of the table (via a join).
@@ -153,6 +157,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
# all other duplicates.
# It is efficient due to the existence of (user_id, access_token,
# ip) and (last_seen) indices.
+
+ # Define the search space, which requires handling the last batch in
+ # a different way
+ if last:
+ clause = "? <= last_seen"
+ args = (begin_last_seen,)
+ else:
+ clause = "? <= last_seen AND last_seen < ?"
+ args = (begin_last_seen, end_last_seen)
+
txn.execute(
"""
SELECT user_id, access_token, ip,
@@ -160,13 +174,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
FROM (
SELECT user_id, access_token, ip
FROM user_ips
- WHERE ? <= last_seen AND last_seen < ?
- ORDER BY last_seen
+ WHERE {}
) c
INNER JOIN user_ips USING (user_id, access_token, ip)
GROUP BY user_id, access_token, ip
- HAVING count(*) > 1""",
- (last_seen_progress, last_seen)
+ HAVING count(*) > 1
+ """.format(clause),
+ args
)
res = txn.fetchall()
@@ -194,12 +208,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
)
self._background_update_progress_txn(
- txn, "user_ips_remove_dupes", {"last_seen": last_seen}
+ txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
)
- yield self.runInteraction(
- "user_ips_dups_remove", remove, last_seen_progress, last_seen
- )
+ yield self.runInteraction("user_ips_dups_remove", remove)
+
if last:
yield self._end_background_update("user_ips_remove_dupes")
@@ -244,7 +257,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
)
def _update_client_ips_batch_txn(self, txn, to_update):
- self.database_engine.lock_table(txn, "user_ips")
+ if "user_ips" in self._unsafe_to_upsert_tables or (
+ not self.database_engine.can_native_upsert
+ ):
+ self.database_engine.lock_table(txn, "user_ips")
for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
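As a toy illustration of the `last_seen` windowing described above (self-contained, using a cut-down hypothetical `user_ips` of ten rows rather than the real schema): the end of a batch is the `last_seen` value `batch_size` rows past the progress marker, and a missing row signals the final batch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_ips (last_seen INTEGER)")
conn.executemany("INSERT INTO user_ips VALUES (?)", [(t,) for t in range(10)])

def window_end(begin_last_seen, batch_size):
    # Mirrors get_last_seen: the last_seen value `batch_size` rows past the
    # start of the batch, or None if we've run off the end of the table.
    row = conn.execute(
        "SELECT last_seen FROM user_ips WHERE ? <= last_seen "
        "ORDER BY last_seen LIMIT 1 OFFSET ?",
        (begin_last_seen, batch_size),
    ).fetchone()
    return row[0] if row else None

print(window_end(0, 4))  # 4 -> next batch scans 0 <= last_seen < 4
print(window_end(8, 4))  # None -> final batch: everything with 8 <= last_seen
```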
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index e2f9de8451..ff5ef97ca8 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -18,7 +18,7 @@ import platform
from ._base import IncorrectDatabaseSetup
from .postgres import PostgresEngine
-from .sqlite3 import Sqlite3Engine
+from .sqlite import Sqlite3Engine
SUPPORTED_MODULE = {
"sqlite3": Sqlite3Engine,
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 42225f8a2a..4004427c7b 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -38,6 +38,13 @@ class PostgresEngine(object):
return sql.replace("?", "%s")
def on_new_connection(self, db_conn):
+
+ # Get the version of PostgreSQL that we're using. As per the psycopg2
+ # docs: The number is formed by converting the major, minor, and
+ # revision numbers into two-decimal-digit numbers and appending them
+ # together. For example, version 8.1.5 will be returned as 80105
+ self._version = db_conn.server_version
+
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
@@ -54,6 +61,13 @@ class PostgresEngine(object):
cursor.close()
+ @property
+ def can_native_upsert(self):
+ """
+ Can we use native UPSERTs? This requires PostgreSQL 9.5+.
+ """
+ return self._version >= 90500
+
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
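As a quick sanity check on the encoding quoted from the psycopg2 docs, the integer can be unpacked with plain arithmetic (a hypothetical helper, not part of the patch). Note that PostgreSQL 10+ switched to a different encoding (e.g. 100002 for 10.2), but such values still compare correctly against 90500:

```python
def decode_pg_version(v):
    """Unpack e.g. 80105 into (8, 1, 5), per the encoding described above."""
    return (v // 10000, (v // 100) % 100, v % 100)

print(decode_pg_version(90500))  # (9, 5, 0), the first release with UPSERT
print(90500 >= 90500)            # the actual check can_native_upsert performs
```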
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite.py
index 19949fc474..c64d73ff21 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite.py
@@ -15,6 +15,7 @@
import struct
import threading
+from sqlite3 import sqlite_version_info
from synapse.storage.prepare_database import prepare_database
@@ -30,6 +31,14 @@ class Sqlite3Engine(object):
self._current_state_group_id = None
self._current_state_group_id_lock = threading.Lock()
+ @property
+ def can_native_upsert(self):
+ """
+ Do we support native UPSERTs? This requires SQLite 3.24+, plus some
+ more work we haven't done yet to tell what was inserted vs updated.
+ """
+ return sqlite_version_info >= (3, 24, 0)
+
def check_database(self, txn):
pass
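The check above is a plain tuple comparison against the version of the linked SQLite library; for example:

```python
from sqlite3 import sqlite_version_info

# Version of the linked SQLite library (not of the Python sqlite3 module),
# e.g. (3, 24, 0). INSERT ... ON CONFLICT ... DO UPDATE landed in 3.24.0.
print(sqlite_version_info)
print(sqlite_version_info >= (3, 24, 0))
```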
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 79e0276de6..3e1915fb87 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1268,6 +1268,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
event.internal_metadata.get_dict()
),
"json": encode_json(event_dict(event)),
+ "format_version": event.format_version,
}
for event, _ in events_and_contexts
],
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index a8326f5296..599f892858 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -21,10 +21,10 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.api.constants import EventFormatVersions
from synapse.api.errors import NotFoundError
-# these are only included to make the type annotations work
-from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
+# these are only included to make the type annotations work
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -353,6 +353,7 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_from_row,
row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"],
+ format_version=row["format_version"],
)
for row in rows
],
@@ -377,6 +378,7 @@ class EventsWorkerStore(SQLBaseStore):
" e.event_id as event_id, "
" e.internal_metadata,"
" e.json,"
+ " e.format_version, "
" r.redacts as redacts,"
" rej.event_id as rejects "
" FROM event_json as e"
@@ -392,7 +394,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
- rejected_reason=None):
+ format_version, rejected_reason=None):
with Measure(self._clock, "_get_event_from_row"):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
@@ -405,8 +407,17 @@ class EventsWorkerStore(SQLBaseStore):
desc="_get_event_from_row_rejected_reason",
)
+ if format_version is None:
+ # This means that we stored the event before we had the concept
+ # of an event format version, so it must be a V1 event.
+ format_version = EventFormatVersions.V1
+
+ # TODO: When we implement new event formats we'll need to use a
+ # different event Python type
+ assert format_version == EventFormatVersions.V1
+
original_ev = FrozenEvent(
- d,
+ event_dict=d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
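The None-handling above amounts to a small defaulting rule: rows written before the `format_version` column existed come back as NULL and are treated as V1. A sketch, assuming `EventFormatVersions.V1` has the value 1:

```python
V1 = 1  # assumed value of EventFormatVersions.V1

def resolve_format_version(db_value):
    # Rows written before the format_version column existed come back as
    # None; they predate the concept, so they must be V1 events.
    return V1 if db_value is None else db_value

assert resolve_format_version(None) == V1
assert resolve_format_version(V1) == V1
```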
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 2743b52bad..134297e284 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -215,7 +215,7 @@ class PusherStore(PusherWorkerStore):
with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so _simple_upsert will retry
- newly_inserted = yield self._simple_upsert(
+ yield self._simple_upsert(
table="pushers",
keyvalues={
"app_id": app_id,
@@ -238,7 +238,12 @@ class PusherStore(PusherWorkerStore):
lock=False,
)
- if newly_inserted:
+ user_has_pusher = self.get_if_user_has_pusher.cache.get(
+ (user_id,), None, update_metrics=False
+ )
+
+ if user_has_pusher is not True:
+ # invalidate, since the user might not have had a pusher before
yield self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0707f9a86a..592c1bcd33 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -588,12 +588,12 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
# We update the local_invites table only if the event is "current",
- # i.e., its something that has just happened.
- # The only current event that can also be an outlier is if its an
- # invite that has come in across federation.
+ # i.e., it's something that has just happened. If the event is an
+ # outlier it is only current if it's an "out of band membership",
+ # like a remote invite or a rejection of a remote invite.
is_new_state = not backfilled and (
not event.internal_metadata.is_outlier()
- or event.internal_metadata.is_invite_from_remote()
+ or event.internal_metadata.is_out_of_band_membership()
)
is_mine = self.hs.is_mine_id(event.state_key)
if is_new_state and is_mine:
diff --git a/synapse/storage/schema/delta/53/event_format_version.sql b/synapse/storage/schema/delta/53/event_format_version.sql
new file mode 100644
index 0000000000..1d977c2834
--- /dev/null
+++ b/synapse/storage/schema/delta/53/event_format_version.sql
@@ -0,0 +1,16 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE event_json ADD COLUMN format_version INTEGER;
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a134e9b3e8..c3ab7db7ae 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -437,6 +437,30 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
create_event = yield self.get_event(create_id)
defer.returnValue(create_event.content.get("room_version", "1"))
+ @defer.inlineCallbacks
+ def get_room_predecessor(self, room_id):
+ """Get the predecessor room of an upgraded room if one exists.
+ Otherwise return None.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[dict|None]: predecessor room (a dict with "room_id" and "event_id" keys), or None
+ """
+ state_ids = yield self.get_current_state_ids(room_id)
+ create_id = state_ids.get((EventTypes.Create, ""))
+
+ # If we can't find the create event, assume we've hit a dead end
+ if not create_id:
+ defer.returnValue(None)
+
+ # Retrieve the room's create event
+ create_event = yield self.get_event(create_id)
+
+ # Return predecessor if present
+ defer.returnValue(create_event.content.get("predecessor", None))
+
@cached(max_entries=100000, iterable=True)
def get_current_state_ids(self, room_id):
"""Get the current state event ids for a room based on the
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index a8781b0e5d..ce48212265 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -168,14 +168,14 @@ class UserDirectoryStore(SQLBaseStore):
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
- if new_entry:
+ if self.database_engine.can_native_upsert:
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('english', ?), 'A')
|| setweight(to_tsvector('english', ?), 'D')
|| setweight(to_tsvector('english', COALESCE(?, '')), 'B')
- )
+ ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute(
sql,
@@ -185,20 +185,45 @@ class UserDirectoryStore(SQLBaseStore):
)
)
else:
- sql = """
- UPDATE user_directory_search
- SET vector = setweight(to_tsvector('english', ?), 'A')
- || setweight(to_tsvector('english', ?), 'D')
- || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
- WHERE user_id = ?
- """
- txn.execute(
- sql,
- (
- get_localpart_from_id(user_id), get_domain_from_id(user_id),
- display_name, user_id,
+ # TODO: Remove this code after we've bumped the minimum version
+ # of postgres to always support upserts, so we can get rid of
+ # `new_entry` usage
+ if new_entry is True:
+ sql = """
+ INSERT INTO user_directory_search(user_id, vector)
+ VALUES (?,
+ setweight(to_tsvector('english', ?), 'A')
+ || setweight(to_tsvector('english', ?), 'D')
+ || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+ )
+ """
+ txn.execute(
+ sql,
+ (
+ user_id, get_localpart_from_id(user_id),
+ get_domain_from_id(user_id), display_name,
+ )
+ )
+ elif new_entry is False:
+ sql = """
+ UPDATE user_directory_search
+ SET vector = setweight(to_tsvector('english', ?), 'A')
+ || setweight(to_tsvector('english', ?), 'D')
+ || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+ WHERE user_id = ?
+ """
+ txn.execute(
+ sql,
+ (
+ get_localpart_from_id(user_id),
+ get_domain_from_id(user_id),
+ display_name, user_id,
+ )
+ )
+ else:
+ raise RuntimeError(
+ "upsert returned None when 'can_native_upsert' is False"
)
- )
elif isinstance(self.database_engine, Sqlite3Engine):
value = "%s %s" % (user_id, display_name,) if display_name else user_id
self._simple_upsert_txn(
|