diff --git a/synapse/__init__.py b/synapse/__init__.py
index 56df3f5ac6..e908b90177 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -36,7 +36,7 @@ try:
except ImportError:
pass
-__version__ = "1.4.0"
+__version__ = "1.4.1rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 774326dff9..eb54f56853 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -605,13 +605,13 @@ def run(hs):
@defer.inlineCallbacks
def generate_monthly_active_users():
current_mau_count = 0
- reserved_count = 0
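+        # Fetch the actual reserved users rather than just a count; the
+        # list is also used when reaping monthly active users.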
+ reserved_users = ()
store = hs.get_datastore()
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
current_mau_count = yield store.get_monthly_active_count()
- reserved_count = yield store.get_registered_reserved_users_count()
+ reserved_users = yield store.get_registered_reserved_users()
current_mau_gauge.set(float(current_mau_count))
- registered_reserved_users_mau_gauge.set(float(reserved_count))
+ registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
max_mau_gauge.set(float(hs.config.max_mau_value))
def start_generate_monthly_active_users():
diff --git a/synapse/config/cas.py b/synapse/config/cas.py
index b916c3aa66..4526c1a67b 100644
--- a/synapse/config/cas.py
+++ b/synapse/config/cas.py
@@ -30,11 +30,13 @@ class CasConfig(Config):
self.cas_enabled = cas_config.get("enabled", True)
self.cas_server_url = cas_config["server_url"]
self.cas_service_url = cas_config["service_url"]
+ self.cas_displayname_attribute = cas_config.get("displayname_attribute")
self.cas_required_attributes = cas_config.get("required_attributes", {})
else:
self.cas_enabled = False
self.cas_server_url = None
self.cas_service_url = None
+ self.cas_displayname_attribute = None
self.cas_required_attributes = {}
def generate_config_section(self, config_dir_path, server_name, **kwargs):
@@ -45,6 +47,7 @@ class CasConfig(Config):
# enabled: true
# server_url: "https://cas-server.com"
# service_url: "https://homeserver.domain.com:8448"
+ # #displayname_attribute: name
# #required_attributes:
# # name: value
"""
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 056fb97acb..0a84d0e2b0 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -248,16 +248,10 @@ class E2eKeysHandler(object):
results = yield self.store.get_e2e_device_keys(local_query)
- # Build the result structure, un-jsonify the results, and add the
- # "unsigned" section
+ # Build the result structure
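+        # (un-jsonifying and adding the "unsigned" section now happen in
+        # the store layer; see storage/end_to_end_keys.py)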
for user_id, device_keys in results.items():
for device_id, device_info in device_keys.items():
- r = dict(device_info["keys"])
- r["unsigned"] = {}
- display_name = device_info["device_display_name"]
- if display_name is not None:
- r["unsigned"]["device_display_name"] = display_name
- result_dict[user_id][device_id] = r
+ result_dict[user_id][device_id] = device_info
log_kv(results)
return result_dict
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index a9d80f708c..0cea445f0d 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -352,8 +352,8 @@ class E2eRoomKeysHandler(object):
A deferred of an empty dict.
"""
if "version" not in version_info:
- raise SynapseError(400, "Missing version in body", Codes.MISSING_PARAM)
- if version_info["version"] != version:
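+            # A missing version in the body is no longer an error: default
+            # it to the version being updated.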
+ version_info["version"] = version
+ elif version_info["version"] != version:
raise SynapseError(
400, "Version in body does not match", Codes.INVALID_PARAM
)
diff --git a/synapse/http/server.py b/synapse/http/server.py
index cb9158fe1b..2ccb210fd6 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -388,7 +388,7 @@ class DirectServeResource(resource.Resource):
if not callback:
return super().render(request)
- resp = callback(request)
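+        # Wrap the callback in trace_servlet so that DirectServeResource
+        # endpoints (e.g. URL previews) get opentracing spans too.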
+ resp = trace_servlet(self.__class__.__name__)(callback)(request)
# If it's a coroutine, turn it into a Deferred
if isinstance(resp, types.CoroutineType):
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index cd1ff6a518..0638cec429 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -169,6 +169,7 @@ import contextlib
import inspect
import logging
import re
+import types
from functools import wraps
from typing import Dict
@@ -778,8 +779,7 @@ def trace_servlet(servlet_name, extract_context=False):
return func
@wraps(func)
- @defer.inlineCallbacks
- def _trace_servlet_inner(request, *args, **kwargs):
+ async def _trace_servlet_inner(request, *args, **kwargs):
request_tags = {
"request_id": request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
@@ -796,8 +796,14 @@ def trace_servlet(servlet_name, extract_context=False):
scope = start_active_span(servlet_name, tags=request_tags)
with scope:
- result = yield defer.maybeDeferred(func, request, *args, **kwargs)
- return result
+ result = func(request, *args, **kwargs)
+
+ if not isinstance(result, (types.CoroutineType, defer.Deferred)):
+ # Some servlets aren't async and just return results
+ # directly, so we handle that here.
+ return result
+
+ return await result
return _trace_servlet_inner
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 9cddbc752a..8414af08cb 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -377,6 +377,7 @@ class CasTicketServlet(RestServlet):
super(CasTicketServlet, self).__init__()
self.cas_server_url = hs.config.cas_server_url
self.cas_service_url = hs.config.cas_service_url
+ self.cas_displayname_attribute = hs.config.cas_displayname_attribute
self.cas_required_attributes = hs.config.cas_required_attributes
self._sso_auth_handler = SSOAuthHandler(hs)
self._http_client = hs.get_simple_http_client()
@@ -400,6 +401,7 @@ class CasTicketServlet(RestServlet):
def handle_cas_response(self, request, cas_response_body, client_redirect_url):
user, attributes = self.parse_cas_response(cas_response_body)
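+        # The displayname attribute is optional, so pop it (defaulting to
+        # None) before checking the required attributes.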
+ displayname = attributes.pop(self.cas_displayname_attribute, None)
for required_attribute, required_value in self.cas_required_attributes.items():
# If required attribute was not in CAS Response - Forbidden
@@ -414,7 +416,7 @@ class CasTicketServlet(RestServlet):
raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED)
return self._sso_auth_handler.on_successful_auth(
- user, request, client_redirect_url
+ user, request, client_redirect_url, displayname
)
def parse_cas_response(self, cas_response_body):
diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py
index df4f44cd36..d596786430 100644
--- a/synapse/rest/client/v2_alpha/room_keys.py
+++ b/synapse/rest/client/v2_alpha/room_keys.py
@@ -375,7 +375,7 @@ class RoomKeysVersionServlet(RestServlet):
"ed25519:something": "hijklmnop"
}
},
- "version": "42"
+ "version": "12345"
}
HTTP/1.1 200 OK
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 7a56cd4b6c..0c68c3aad5 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -270,7 +270,7 @@ class PreviewUrlResource(DirectServeResource):
logger.debug("Calculated OG for %s as %s" % (url, og))
- jsonog = json.dumps(og).encode("utf8")
+ jsonog = json.dumps(og)
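+        # Keep this as a str: it is stored in the DB cache below, and raw
+        # bytes are now rejected by the database engine. It is encoded
+        # only when returned as the HTTP response body.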
# store OG in history-aware DB cache
yield self.store.store_url_cache(
@@ -283,7 +283,7 @@ class PreviewUrlResource(DirectServeResource):
media_info["created_ts"],
)
- return jsonog
+ return jsonog.encode("utf8")
@defer.inlineCallbacks
def _download_url(self, url, user):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 06cc14fcd1..f5906fcd54 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -20,6 +20,7 @@ import random
import sys
import threading
import time
+from typing import Iterable, Tuple
from six import PY2, iteritems, iterkeys, itervalues
from six.moves import builtins, intern, range
@@ -1163,19 +1164,18 @@ class SQLBaseStore(object):
if not iterable:
return []
- sql = "SELECT %s FROM %s" % (", ".join(retcols), table)
-
- clauses = []
- values = []
- clauses.append("%s IN (%s)" % (column, ",".join("?" for _ in iterable)))
- values.extend(iterable)
+ clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
+ clauses = [clause]
for key, value in iteritems(keyvalues):
clauses.append("%s = ?" % (key,))
values.append(value)
- if clauses:
- sql = "%s WHERE %s" % (sql, " AND ".join(clauses))
+ sql = "SELECT %s FROM %s WHERE %s" % (
+ ", ".join(retcols),
+ table,
+ " AND ".join(clauses),
+ )
txn.execute(sql, values)
return cls.cursor_to_dict(txn)
@@ -1324,10 +1324,8 @@ class SQLBaseStore(object):
sql = "DELETE FROM %s" % table
- clauses = []
- values = []
- clauses.append("%s IN (%s)" % (column, ",".join("?" for _ in iterable)))
- values.extend(iterable)
+ clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
+ clauses = [clause]
for key, value in iteritems(keyvalues):
clauses.append("%s = ?" % (key,))
@@ -1694,3 +1692,30 @@ def db_to_json(db_content):
except Exception:
logging.warning("Tried to decode '%r' as JSON and failed", db_content)
raise
+
+
+def make_in_list_sql_clause(
+ database_engine, column: str, iterable: Iterable
+) -> Tuple[str, Iterable]:
+ """Returns an SQL clause that checks the given column is in the iterable.
+
+    On SQLite this expands to `column IN (?, ?, ...)`, whereas on Postgres
+    it expands to `column = ANY(?)`. While both DBs support the `IN` form,
+    using the `ANY` form on Postgres means that queries with different
+    length iterables are seen as the same query, which keeps the query
+    stats meaningful.
+
+ Args:
+ database_engine
+ column: Name of the column
+ iterable: The values to check the column against.
+
+ Returns:
+ A tuple of SQL query and the args
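+
+    Example (illustrative):
+
+        clause, args = make_in_list_sql_clause(engine, "user_id", ["@a:hs", "@b:hs"])
+        # SQLite:   ("user_id IN (?,?)", ["@a:hs", "@b:hs"])
+        # Postgres: ("user_id = ANY(?)", [["@a:hs", "@b:hs"]])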
+ """
+
+ if database_engine.supports_using_any_list:
+ # This should hopefully be faster, but also makes postgres query
+ # stats easier to understand.
+ return "%s = ANY(?)" % (column,), [list(iterable)]
+ else:
+ return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 70bc2bb2cc..f04aad0743 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -20,7 +20,7 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.expiringcache import ExpiringCache
@@ -378,15 +378,15 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
else:
if not devices:
continue
- sql = (
- "SELECT device_id FROM devices"
- " WHERE user_id = ? AND device_id IN ("
- + ",".join("?" * len(devices))
- + ")"
+
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "device_id", devices
)
+ sql = "SELECT device_id FROM devices WHERE user_id = ? AND " + clause
+
# TODO: Maybe this needs to be done in batches if there are
# too many local devices for a given user.
- txn.execute(sql, [user_id] + devices)
+ txn.execute(sql, [user_id] + list(args))
for row in txn:
# Only insert into the local inbox if the device exists on
# this server
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 111bfb3d64..ac5239e50a 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -28,7 +28,12 @@ from synapse.logging.opentracing import (
whitelisted_homeserver,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import Cache, SQLBaseStore, db_to_json
+from synapse.storage._base import (
+ Cache,
+ SQLBaseStore,
+ db_to_json,
+ make_in_list_sql_clause,
+)
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util import batch_iter
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
@@ -448,11 +453,14 @@ class DeviceWorkerStore(SQLBaseStore):
sql = """
SELECT DISTINCT user_id FROM device_lists_stream
WHERE stream_id > ?
- AND user_id IN (%s)
+ AND
"""
for chunk in batch_iter(to_check, 100):
- txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk)
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "user_id", chunk
+ )
+ txn.execute(sql + clause, (from_key,) + tuple(args))
changes.update(user_id for user_id, in txn)
return changes
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 33e3a84933..872bc75490 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -40,7 +40,8 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
This option only takes effect if include_all_devices is true.
Returns:
Dict mapping from user-id to dict mapping from device_id to
- dict containing "key_json", "device_display_name".
+ key data. The key data will be a dict in the same format as the
+ DeviceKeys type returned by POST /_matrix/client/r0/keys/query.
"""
set_tag("query_list", query_list)
if not query_list:
@@ -54,11 +55,20 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
include_deleted_devices,
)
+ # Build the result structure, un-jsonify the results, and add the
+ # "unsigned" section
+ rv = {}
for user_id, device_keys in iteritems(results):
+ rv[user_id] = {}
for device_id, device_info in iteritems(device_keys):
- device_info["keys"] = db_to_json(device_info.pop("key_json"))
-
- return results
+ r = db_to_json(device_info.pop("key_json"))
+ r["unsigned"] = {}
+ display_name = device_info["device_display_name"]
+ if display_name is not None:
+ r["unsigned"]["device_display_name"] = display_name
+ rv[user_id][device_id] = r
+
+ return rv
@trace
def _get_e2e_device_keys_txn(
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 601617b21e..b7c4eda338 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -22,6 +22,13 @@ class PostgresEngine(object):
def __init__(self, database_module, database_config):
self.module = database_module
self.module.extensions.register_type(self.module.extensions.UNICODE)
+
+        # Disables passing `bytes` to txn.execute, cf. #6186. If you do
+        # actually want to use bytes then wrap it in `bytearray`.
+ def _disable_bytes_adapter(_):
+ raise Exception("Passing bytes to DB is disabled.")
+
+ self.module.extensions.register_adapter(bytes, _disable_bytes_adapter)
self.synchronous_commit = database_config.get("synchronous_commit", True)
self._version = None # unknown as yet
@@ -79,6 +86,12 @@ class PostgresEngine(object):
"""
return True
+ @property
+ def supports_using_any_list(self):
+ """Do we support using `a = ANY(?)` and passing a list
+ """
+ return True
+
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index ac92109366..ddad17dc5a 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -46,6 +46,12 @@ class Sqlite3Engine(object):
"""
return self.module.sqlite_version_info >= (3, 15, 0)
+ @property
+ def supports_using_any_list(self):
+ """Do we support using `a = ANY(?)` and passing a list
+ """
+ return False
+
def check_database(self, txn):
pass
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index f5e8c39262..47cc10d32a 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -25,7 +25,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
from synapse.util.caches.descriptors import cached
@@ -68,7 +68,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
else:
results = set()
- base_sql = "SELECT auth_id FROM event_auth WHERE event_id IN (%s)"
+ base_sql = "SELECT auth_id FROM event_auth WHERE "
front = set(event_ids)
while front:
@@ -76,7 +76,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
front_list = list(front)
chunks = [front_list[x : x + 100] for x in range(0, len(front), 100)]
for chunk in chunks:
- txn.execute(base_sql % (",".join(["?"] * len(chunk)),), chunk)
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "event_id", chunk
+ )
+ txn.execute(base_sql + clause, list(args))
new_front.update([r[0] for r in txn])
new_front -= results
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index bb6ff0595a..ee49ef235d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -39,6 +39,7 @@ from synapse.logging.utils import log_function
from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
+from synapse.storage._base import make_in_list_sql_clause
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
@@ -641,14 +642,16 @@ class EventsStore(
LEFT JOIN rejections USING (event_id)
LEFT JOIN event_json USING (event_id)
WHERE
- prev_event_id IN (%s)
- AND NOT events.outlier
+ NOT events.outlier
AND rejections.event_id IS NULL
- """ % (
- ",".join("?" for _ in batch),
+ AND
+ """
+
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "prev_event_id", batch
)
- txn.execute(sql, batch)
+ txn.execute(sql + clause, args)
results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
@@ -695,13 +698,15 @@ class EventsStore(
LEFT JOIN rejections USING (event_id)
LEFT JOIN event_json USING (event_id)
WHERE
- event_id IN (%s)
- AND NOT events.outlier
- """ % (
- ",".join("?" for _ in to_recursively_check),
+ NOT events.outlier
+ AND
+ """
+
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "event_id", to_recursively_check
)
- txn.execute(sql, to_recursively_check)
+ txn.execute(sql + clause, args)
to_recursively_check = []
for event_id, prev_event_id, metadata, rejected in txn:
@@ -1543,10 +1548,14 @@ class EventsStore(
" FROM events as e"
" LEFT JOIN rejections as rej USING (event_id)"
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
- " WHERE e.event_id IN (%s)"
- ) % (",".join(["?"] * len(ev_map)),)
+ " WHERE "
+ )
+
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "e.event_id", list(ev_map)
+ )
- txn.execute(sql, list(ev_map))
+ txn.execute(sql + clause, args)
rows = self.cursor_to_dict(txn)
for row in rows:
event = ev_map[row["event_id"]]
@@ -2249,11 +2258,12 @@ class EventsStore(
sql = """
SELECT DISTINCT state_group FROM event_to_state_groups
LEFT JOIN events_to_purge AS ep USING (event_id)
- WHERE state_group IN (%s) AND ep.event_id IS NULL
- """ % (
- ",".join("?" for _ in current_search),
+ WHERE ep.event_id IS NULL AND
+ """
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "state_group", current_search
)
- txn.execute(sql, list(current_search))
+ txn.execute(sql + clause, list(args))
referenced = set(sg for sg, in txn)
referenced_groups |= referenced
diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py
index 5717baf48c..31ea6f917f 100644
--- a/synapse/storage/events_bg_updates.py
+++ b/synapse/storage/events_bg_updates.py
@@ -21,6 +21,7 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.storage._base import make_in_list_sql_clause
from synapse.storage.background_updates import BackgroundUpdateStore
logger = logging.getLogger(__name__)
@@ -71,6 +72,19 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
"redactions_received_ts", self._redactions_received_ts
)
+        # This index gets deleted in the `event_fix_redactions_bytes` update
+ self.register_background_index_update(
+ "event_fix_redactions_bytes_create_index",
+ index_name="redactions_censored_redacts",
+ table="redactions",
+ columns=["redacts"],
+ where_clause="have_censored",
+ )
+
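+        # Both this handler and the index above are only scheduled by the
+        # Postgres-specific delta in 56/redaction_censor3_fix_update.sql.postgres.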
+ self.register_background_update_handler(
+ "event_fix_redactions_bytes", self._event_fix_redactions_bytes
+ )
+
@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
@@ -312,12 +326,13 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
INNER JOIN event_json USING (event_id)
LEFT JOIN rejections USING (event_id)
WHERE
- prev_event_id IN (%s)
- AND NOT events.outlier
- """ % (
- ",".join("?" for _ in to_check),
+ NOT events.outlier
+ AND
+ """
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "prev_event_id", to_check
)
- txn.execute(sql, to_check)
+ txn.execute(sql + clause, list(args))
for prev_event_id, event_id, metadata, rejected in txn:
if event_id in graph:
@@ -458,3 +473,33 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
yield self._end_background_update("redactions_received_ts")
return count
+
+ @defer.inlineCallbacks
+ def _event_fix_redactions_bytes(self, progress, batch_size):
+ """Undoes hex encoded censored redacted event JSON.
+ """
+
+ def _event_fix_redactions_bytes_txn(txn):
+            # This update is quite fast due to the new index.
+ txn.execute(
+ """
+ UPDATE event_json
+ SET
+ json = convert_from(json::bytea, 'utf8')
+ FROM redactions
+ WHERE
+ redactions.have_censored
+ AND event_json.event_id = redactions.redacts
+ AND json NOT LIKE '{%';
+ """
+ )
+
+ txn.execute("DROP INDEX redactions_censored_redacts")
+
+ yield self.runInteraction(
+ "_event_fix_redactions_bytes", _event_fix_redactions_bytes_txn
+ )
+
+ yield self._end_background_update("event_fix_redactions_bytes")
+
+ return 1
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 57ce0304e9..4c4b76bd93 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -31,12 +31,11 @@ from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.logging.context import LoggingContext, PreserveLoggingContext
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.types import get_domain_from_id
from synapse.util import batch_iter
from synapse.util.metrics import Measure
-from ._base import SQLBaseStore
-
logger = logging.getLogger(__name__)
@@ -623,10 +622,14 @@ class EventsWorkerStore(SQLBaseStore):
" rej.reason "
" FROM event_json as e"
" LEFT JOIN rejections as rej USING (event_id)"
- " WHERE e.event_id IN (%s)"
- ) % (",".join(["?"] * len(evs)),)
+ " WHERE "
+ )
- txn.execute(sql, evs)
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "e.event_id", evs
+ )
+
+ txn.execute(sql + clause, args)
for row in txn:
event_id = row[0]
@@ -640,11 +643,11 @@ class EventsWorkerStore(SQLBaseStore):
}
# check for redactions
- redactions_sql = (
- "SELECT event_id, redacts FROM redactions WHERE redacts IN (%s)"
- ) % (",".join(["?"] * len(evs)),)
+ redactions_sql = "SELECT event_id, redacts FROM redactions WHERE "
+
+ clause, args = make_in_list_sql_clause(txn.database_engine, "redacts", evs)
- txn.execute(redactions_sql, evs)
+ txn.execute(redactions_sql + clause, args)
for (redacter, redacted) in txn:
d = event_dict.get(redacted)
@@ -753,10 +756,11 @@ class EventsWorkerStore(SQLBaseStore):
results = set()
def have_seen_events_txn(txn, chunk):
- sql = "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" % (
- ",".join("?" * len(chunk)),
+ sql = "SELECT event_id FROM events as e WHERE "
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "e.event_id", chunk
)
- txn.execute(sql, chunk)
+ txn.execute(sql + clause, args)
for (event_id,) in txn:
results.add(event_id)
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 23b48f6cea..7c2a7da836 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -51,7 +51,7 @@ class FilteringStore(SQLBaseStore):
"SELECT filter_id FROM user_filters "
"WHERE user_id = ? AND filter_json = ?"
)
- txn.execute(sql, (user_localpart, def_json))
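+        # def_json is canonical-JSON bytes; wrap it in bytearray since raw
+        # bytes are now rejected by the Postgres engine.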
+ txn.execute(sql, (user_localpart, bytearray(def_json)))
filter_id_response = txn.fetchone()
if filter_id_response is not None:
return filter_id_response[0]
@@ -68,7 +68,7 @@ class FilteringStore(SQLBaseStore):
"INSERT INTO user_filters (user_id, filter_id, filter_json)"
"VALUES(?, ?, ?)"
)
- txn.execute(sql, (user_localpart, filter_id, def_json))
+ txn.execute(sql, (user_localpart, filter_id, bytearray(def_json)))
return filter_id
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 752e9788a2..3803604be7 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -32,7 +32,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
super(MonthlyActiveUsersStore, self).__init__(None, hs)
self._clock = hs.get_clock()
self.hs = hs
- self.reserved_users = ()
# Do not add more reserved users than the total allowable number
self._new_transaction(
dbconn,
@@ -51,7 +50,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
txn (cursor):
threepids (list[dict]): List of threepid dicts to reserve
"""
- reserved_user_list = []
for tp in threepids:
user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"])
@@ -60,10 +58,8 @@ class MonthlyActiveUsersStore(SQLBaseStore):
is_support = self.is_support_user_txn(txn, user_id)
if not is_support:
self.upsert_monthly_active_user_txn(txn, user_id)
- reserved_user_list.append(user_id)
else:
logger.warning("mau limit reserved threepid %s not found in db" % tp)
- self.reserved_users = tuple(reserved_user_list)
@defer.inlineCallbacks
def reap_monthly_active_users(self):
@@ -74,8 +70,11 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Deferred[]
"""
- def _reap_users(txn):
- # Purge stale users
+ def _reap_users(txn, reserved_users):
+ """
+ Args:
+ reserved_users (tuple): reserved users to preserve
+ """
thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
query_args = [thirty_days_ago]
@@ -83,20 +82,19 @@ class MonthlyActiveUsersStore(SQLBaseStore):
# Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
# when len(reserved_users) == 0. Works fine on sqlite.
- if len(self.reserved_users) > 0:
+ if len(reserved_users) > 0:
-                # questionmarks is a hack to overcome sqlite not supporting
+                # question_marks is a hack to overcome sqlite not supporting
# tuples in 'WHERE IN %s'
- questionmarks = "?" * len(self.reserved_users)
+ question_marks = ",".join("?" * len(reserved_users))
- query_args.extend(self.reserved_users)
- sql = base_sql + """ AND user_id NOT IN ({})""".format(
- ",".join(questionmarks)
- )
+ query_args.extend(reserved_users)
+ sql = base_sql + " AND user_id NOT IN ({})".format(question_marks)
else:
sql = base_sql
txn.execute(sql, query_args)
+ max_mau_value = self.hs.config.max_mau_value
if self.hs.config.limit_usage_by_mau:
# If MAU user count still exceeds the MAU threshold, then delete on
# a least recently active basis.
@@ -106,31 +104,52 @@ class MonthlyActiveUsersStore(SQLBaseStore):
-            # While Postgres does not require 'LIMIT', but also does not support
-            # negative LIMIT values. So there is no way to write it that both can
-            # support
+            # Postgres does not require 'LIMIT', but it also does not support
+            # negative LIMIT values, so there is no way to write the query
+            # such that both engines support it.
- safe_guard = self.hs.config.max_mau_value - len(self.reserved_users)
- # Must be greater than zero for postgres
- safe_guard = safe_guard if safe_guard > 0 else 0
- query_args = [safe_guard]
-
- base_sql = """
- DELETE FROM monthly_active_users
- WHERE user_id NOT IN (
- SELECT user_id FROM monthly_active_users
- ORDER BY timestamp DESC
- LIMIT ?
+ if len(reserved_users) == 0:
+ sql = """
+ DELETE FROM monthly_active_users
+ WHERE user_id NOT IN (
+ SELECT user_id FROM monthly_active_users
+ ORDER BY timestamp DESC
+ LIMIT ?
)
- """
+ """
+ txn.execute(sql, (max_mau_value,))
# Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
# when len(reserved_users) == 0. Works fine on sqlite.
- if len(self.reserved_users) > 0:
- query_args.extend(self.reserved_users)
- sql = base_sql + """ AND user_id NOT IN ({})""".format(
- ",".join(questionmarks)
- )
else:
- sql = base_sql
- txn.execute(sql, query_args)
+ # Must be >= 0 for postgres
+ num_of_non_reserved_users_to_remove = max(
+ max_mau_value - len(reserved_users), 0
+ )
+
+                # It is important to filter reserved users twice to guard
+                # against the case where a reserved user is present in the
+                # SELECT, meaning that a legitimate MAU is deleted.
+ sql = """
+ DELETE FROM monthly_active_users
+ WHERE user_id NOT IN (
+ SELECT user_id FROM monthly_active_users
+ WHERE user_id NOT IN ({})
+ ORDER BY timestamp DESC
+ LIMIT ?
+ )
+ AND user_id NOT IN ({})
+ """.format(
+ question_marks, question_marks
+ )
+
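+                # The args must line up with the placeholders: inner NOT IN,
+                # then the LIMIT, then the outer NOT IN.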
+ query_args = [
+ *reserved_users,
+ num_of_non_reserved_users_to_remove,
+ *reserved_users,
+ ]
- yield self.runInteraction("reap_monthly_active_users", _reap_users)
+ txn.execute(sql, query_args)
+
+ reserved_users = yield self.get_registered_reserved_users()
+ yield self.runInteraction(
+ "reap_monthly_active_users", _reap_users, reserved_users
+ )
# It seems poor to invalidate the whole cache, Postgres supports
# 'Returning' which would allow me to invalidate only the
# specific users, but sqlite has no way to do this and instead
@@ -159,21 +178,25 @@ class MonthlyActiveUsersStore(SQLBaseStore):
return self.runInteraction("count_users", _count_users)
@defer.inlineCallbacks
- def get_registered_reserved_users_count(self):
- """Of the reserved threepids defined in config, how many are associated
+ def get_registered_reserved_users(self):
+ """Of the reserved threepids defined in config, which are associated
with registered users?
Returns:
- Defered[int]: Number of real reserved users
+            Deferred[list]: Real reserved users
"""
- count = 0
- for tp in self.hs.config.mau_limits_reserved_threepids:
+ users = []
+
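+        # Cap at max_mau_value: we never report more reserved users than
+        # the MAU limit allows.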
+ for tp in self.hs.config.mau_limits_reserved_threepids[
+ : self.hs.config.max_mau_value
+ ]:
user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
tp["medium"], tp["address"]
)
if user_id:
- count = count + 1
- return count
+ users.append(user_id)
+
+ return users
@defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 5db6f2d84a..3a641f538b 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -18,11 +18,10 @@ from collections import namedtuple
from twisted.internet import defer
from synapse.api.constants import PresenceState
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.util import batch_iter
from synapse.util.caches.descriptors import cached, cachedList
-from ._base import SQLBaseStore
-
class UserPresenceState(
namedtuple(
@@ -119,14 +118,13 @@ class PresenceStore(SQLBaseStore):
)
# Delete old rows to stop database from getting really big
- sql = (
- "DELETE FROM presence_stream WHERE" " stream_id < ?" " AND user_id IN (%s)"
- )
+ sql = "DELETE FROM presence_stream WHERE stream_id < ? AND "
for states in batch_iter(presence_states, 50):
- args = [stream_id]
- args.extend(s.user_id for s in states)
- txn.execute(sql % (",".join("?" for _ in states),), args)
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "user_id", [s.user_id for s in states]
+ )
+ txn.execute(sql + clause, [stream_id] + list(args))
def get_all_presence_updates(self, last_id, current_id):
if last_id == current_id:
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 3e0e834a62..b12e80440a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -241,7 +241,7 @@ class PusherStore(PusherWorkerStore):
"device_display_name": device_display_name,
"ts": pushkey_ts,
"lang": lang,
- "data": encode_canonical_json(data),
+ "data": bytearray(encode_canonical_json(data)),
"last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag,
"id": stream_id,
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 290ddb30e8..0c24430f28 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -21,12 +21,11 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import SQLBaseStore
-from .util.id_generators import StreamIdGenerator
-
logger = logging.getLogger(__name__)
@@ -217,24 +216,26 @@ class ReceiptsWorkerStore(SQLBaseStore):
def f(txn):
if from_key:
- sql = (
- "SELECT * FROM receipts_linearized WHERE"
- " room_id IN (%s) AND stream_id > ? AND stream_id <= ?"
- ) % (",".join(["?"] * len(room_ids)))
- args = list(room_ids)
- args.extend([from_key, to_key])
+ sql = """
+ SELECT * FROM receipts_linearized WHERE
+ stream_id > ? AND stream_id <= ? AND
+ """
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "room_id", room_ids
+ )
- txn.execute(sql, args)
+ txn.execute(sql + clause, [from_key, to_key] + list(args))
else:
- sql = (
- "SELECT * FROM receipts_linearized WHERE"
- " room_id IN (%s) AND stream_id <= ?"
- ) % (",".join(["?"] * len(room_ids)))
+ sql = """
+ SELECT * FROM receipts_linearized WHERE
+ stream_id <= ? AND
+ """
- args = list(room_ids)
- args.append(to_key)
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "room_id", room_ids
+ )
- txn.execute(sql, args)
+ txn.execute(sql + clause, [to_key] + list(args))
return self.cursor_to_dict(txn)
@@ -433,13 +434,19 @@ class ReceiptsStore(ReceiptsWorkerStore):
-        # we need to points in graph -> linearized form.
+        # we need to map points in the graph to linearized form.
# TODO: Make this better.
def graph_to_linear(txn):
- query = (
- "SELECT event_id WHERE room_id = ? AND stream_ordering IN ("
- " SELECT max(stream_ordering) WHERE event_id IN (%s)"
- ")"
- ) % (",".join(["?"] * len(event_ids)))
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "event_id", event_ids
+ )
+
+ sql = """
+ SELECT event_id WHERE room_id = ? AND stream_ordering IN (
+ SELECT max(stream_ordering) WHERE %s
+ )
+ """ % (
+ clause,
+ )
- txn.execute(query, [room_id] + event_ids)
+ txn.execute(sql, [room_id] + list(args))
rows = txn.fetchall()
if rows:
return rows[0][0]
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 4e606a8380..ff63487823 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import LoggingTransaction
+from synapse.storage._base import LoggingTransaction, make_in_list_sql_clause
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.events_worker import EventsWorkerStore
@@ -372,6 +372,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
results = []
if membership_list:
if self._current_state_events_membership_up_to_date:
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "c.membership", membership_list
+ )
sql = """
SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering
FROM current_state_events AS c
@@ -379,11 +382,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
WHERE
c.type = 'm.room.member'
AND state_key = ?
- AND c.membership IN (%s)
+ AND %s
""" % (
- ",".join("?" * len(membership_list))
+ clause,
)
else:
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "m.membership", membership_list
+ )
sql = """
SELECT room_id, e.sender, m.membership, event_id, e.stream_ordering
FROM current_state_events AS c
@@ -392,12 +398,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
WHERE
c.type = 'm.room.member'
AND state_key = ?
- AND m.membership IN (%s)
+ AND %s
""" % (
- ",".join("?" * len(membership_list))
+ clause,
)
- txn.execute(sql, (user_id, *membership_list))
+ txn.execute(sql, (user_id, *args))
results = [RoomsForUser(**r) for r in self.cursor_to_dict(txn)]
if do_invite:
diff --git a/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres b/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres
index f7bcc5e2f2..67471f3ef5 100644
--- a/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres
+++ b/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres
@@ -15,12 +15,11 @@
-- There was a bug where we may have updated censored redactions as bytes,
--- which can (somehow) cause json to be inserted hex encoded. This goes and
--- undoes any such hex encoded JSON.
-UPDATE event_json SET json = convert_from(json::bytea, 'utf8')
-WHERE event_id IN (
- SELECT event_json.event_id
- FROM event_json
- INNER JOIN redactions ON (event_json.event_id = redacts)
- WHERE have_censored AND json NOT LIKE '{%'
-);
+-- which can (somehow) cause json to be inserted hex encoded. These updates go
+-- and undo any such hex encoded JSON.
+
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('event_fix_redactions_bytes_create_index', '{}');
+
+INSERT into background_updates (update_name, progress_json, depends_on)
+ VALUES ('event_fix_redactions_bytes', '{}', 'event_fix_redactions_bytes_create_index');
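+
+-- The depends_on column ensures the index exists before the rewrite runs.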
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 6ba4190f1a..7695bf09fc 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -24,6 +24,7 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import SynapseError
+from synapse.storage._base import make_in_list_sql_clause
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from .background_updates import BackgroundUpdateStore
@@ -385,8 +386,10 @@ class SearchStore(SearchBackgroundUpdateStore):
# Make sure we don't explode because the person is in too many rooms.
# We filter the results below regardless.
if len(room_ids) < 500:
- clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),))
- args.extend(room_ids)
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "room_id", room_ids
+ )
+ clauses = [clause]
local_clauses = []
for key in keys:
@@ -492,8 +495,10 @@ class SearchStore(SearchBackgroundUpdateStore):
# Make sure we don't explode because the person is in too many rooms.
# We filter the results below regardless.
if len(room_ids) < 500:
- clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),))
- args.extend(room_ids)
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "room_id", room_ids
+ )
+ clauses = [clause]
local_clauses = []
for key in keys:
diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py
index 05cabc2282..aa4f0da5f0 100644
--- a/synapse/storage/user_erasure_store.py
+++ b/synapse/storage/user_erasure_store.py
@@ -56,15 +56,15 @@ class UserErasureWorkerStore(SQLBaseStore):
# iterate it multiple times, and (b) avoiding duplicates.
user_ids = tuple(set(user_ids))
- def _get_erased_users(txn):
- txn.execute(
- "SELECT user_id FROM erased_users WHERE user_id IN (%s)"
- % (",".join("?" * len(user_ids))),
- user_ids,
- )
- return set(r[0] for r in txn)
-
- erased_users = yield self.runInteraction("are_users_erased", _get_erased_users)
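+        # _simple_select_many_batch chunks the lookup and builds the IN
+        # clause via make_in_list_sql_clause (see storage/_base.py).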
+ rows = yield self._simple_select_many_batch(
+ table="erased_users",
+ column="user_id",
+ iterable=user_ids,
+ retcols=("user_id",),
+ desc="are_users_erased",
+ )
+ erased_users = set(row["user_id"] for row in rows)
+
res = dict((u, u in erased_users) for u in user_ids)
return res
|