diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 71316f7d09..6b0ca80087 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -279,23 +279,35 @@ class DataStore(
"""
Counts the number of users who used this homeserver in the last 24 hours.
"""
+ yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
+ return self.runInteraction("count_daily_users", self._count_users, yesterday)
- def _count_users(txn):
- yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
-
- sql = """
- SELECT COALESCE(count(*), 0) FROM (
- SELECT user_id FROM user_ips
- WHERE last_seen > ?
- GROUP BY user_id
- ) u
- """
-
- txn.execute(sql, (yesterday,))
- count, = txn.fetchone()
- return count
+ def count_monthly_users(self):
+ """
+ Counts the number of users who used this homeserver in the last 30 days.
+ Note this method is intended for phonehome metrics only and is different
+ from the mau figure in synapse.storage.monthly_active_users which,
+ amongst other things, includes a 3 day grace period before a user counts.
+ """
+ thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+ return self.runInteraction(
+ "count_monthly_users", self._count_users, thirty_days_ago
+ )
- return self.runInteraction("count_users", _count_users)
+ def _count_users(self, txn, time_from):
+ """
+ Returns number of users seen in the past time_from period
+ """
+ sql = """
+ SELECT COALESCE(count(*), 0) FROM (
+ SELECT user_id FROM user_ips
+ WHERE last_seen > ?
+ GROUP BY user_id
+ ) u
+ """
+ txn.execute(sql, (time_from,))
+ count, = txn.fetchone()
+ return count
def count_r30_users(self):
"""
@@ -347,7 +359,7 @@ class DataStore(
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
for row in txn:
- if row[0] == 'unknown':
+ if row[0] == "unknown":
pass
results[row[0]] = row[1]
@@ -374,7 +386,7 @@ class DataStore(
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
count, = txn.fetchone()
- results['all'] = count
+ results["all"] = count
return results
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ae891aa332..29589853c6 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -38,6 +38,14 @@ from synapse.util.caches.descriptors import Cache
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.stringutils import exception_to_unicode
+# import a function which will return a monotonic time, in seconds
+try:
+ # on python 3, use time.monotonic, since time.clock can go backwards
+ from time import monotonic as monotonic_time
+except ImportError:
+ # ... but python 2 doesn't have it
+ from time import clock as monotonic_time
+
logger = logging.getLogger(__name__)
try:
@@ -167,22 +175,22 @@ class PerformanceCounters(object):
self.current_counters = {}
self.previous_counters = {}
- def update(self, key, start_time, end_time=None):
- if end_time is None:
- end_time = time.time()
- duration = end_time - start_time
+ def update(self, key, duration_secs):
count, cum_time = self.current_counters.get(key, (0, 0))
count += 1
- cum_time += duration
+ cum_time += duration_secs
self.current_counters[key] = (count, cum_time)
- return end_time
- def interval(self, interval_duration, limit=3):
+ def interval(self, interval_duration_secs, limit=3):
counters = []
for name, (count, cum_time) in iteritems(self.current_counters):
prev_count, prev_time = self.previous_counters.get(name, (0, 0))
counters.append(
- ((cum_time - prev_time) / interval_duration, count - prev_count, name)
+ (
+ (cum_time - prev_time) / interval_duration_secs,
+ count - prev_count,
+ name,
+ )
)
self.previous_counters = dict(self.current_counters)
@@ -213,7 +221,6 @@ class SQLBaseStore(object):
# is running in mainline, and we have some nice monitoring frontends
# to watch it
self._txn_perf_counters = PerformanceCounters()
- self._get_event_counters = PerformanceCounters()
self._get_event_cache = Cache(
"*getEvent*", keylen=3, max_entries=hs.config.event_cache_size
@@ -299,12 +306,12 @@ class SQLBaseStore(object):
def select_users_with_no_expiration_date_txn(txn):
"""Retrieves the list of registered users with no expiration date from the
- database.
+ database, filtering out deactivated users.
"""
sql = (
"SELECT users.name FROM users"
" LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
- " WHERE account_validity.user_id is NULL;"
+ " WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
)
txn.execute(sql, [])
@@ -312,9 +319,7 @@ class SQLBaseStore(object):
if res:
for user in res:
self.set_expiration_date_for_user_txn(
- txn,
- user["name"],
- use_delta=True,
+ txn, user["name"], use_delta=True
)
yield self.runInteraction(
@@ -352,32 +357,24 @@ class SQLBaseStore(object):
)
def start_profiling(self):
- self._previous_loop_ts = self._clock.time_msec()
+ self._previous_loop_ts = monotonic_time()
def loop():
curr = self._current_txn_total_time
prev = self._previous_txn_total_time
self._previous_txn_total_time = curr
- time_now = self._clock.time_msec()
+ time_now = monotonic_time()
time_then = self._previous_loop_ts
self._previous_loop_ts = time_now
- ratio = (curr - prev) / (time_now - time_then)
+ duration = time_now - time_then
+ ratio = (curr - prev) / duration
- top_three_counters = self._txn_perf_counters.interval(
- time_now - time_then, limit=3
- )
-
- top_3_event_counters = self._get_event_counters.interval(
- time_now - time_then, limit=3
- )
+ top_three_counters = self._txn_perf_counters.interval(duration, limit=3)
perf_logger.info(
- "Total database time: %.3f%% {%s} {%s}",
- ratio * 100,
- top_three_counters,
- top_3_event_counters,
+ "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters
)
self._clock.looping_call(loop, 10000)
@@ -385,7 +382,7 @@ class SQLBaseStore(object):
def _new_transaction(
self, conn, desc, after_callbacks, exception_callbacks, func, *args, **kwargs
):
- start = time.time()
+ start = monotonic_time()
txn_id = self._TXN_ID
# We don't really need these to be unique, so lets stop it from
@@ -451,7 +448,7 @@ class SQLBaseStore(object):
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
finally:
- end = time.time()
+ end = monotonic_time()
duration = end - start
LoggingContext.current_context().add_database_transaction(duration)
@@ -459,7 +456,7 @@ class SQLBaseStore(object):
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
self._current_txn_total_time += duration
- self._txn_perf_counters.update(desc, start, end)
+ self._txn_perf_counters.update(desc, duration)
sql_txn_timer.labels(desc).observe(duration)
@defer.inlineCallbacks
@@ -525,11 +522,11 @@ class SQLBaseStore(object):
)
parent_context = None
- start_time = time.time()
+ start_time = monotonic_time()
def inner_func(conn, *args, **kwargs):
with LoggingContext("runWithConnection", parent_context) as context:
- sched_duration_sec = time.time() - start_time
+ sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)
@@ -1667,7 +1664,7 @@ def db_to_json(db_content):
# Decode it to a Unicode string before feeding it to json.loads, so we
# consistenty get a Unicode-containing object out.
if isinstance(db_content, (bytes, bytearray)):
- db_content = db_content.decode('utf8')
+ db_content = db_content.decode("utf8")
try:
return json.loads(db_content)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index b8b8273f73..50f913a414 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -169,7 +169,7 @@ class BackgroundUpdateStore(SQLBaseStore):
in_flight = set(update["update_name"] for update in updates)
for update in updates:
if update["depends_on"] not in in_flight:
- self._background_update_queue.append(update['update_name'])
+ self._background_update_queue.append(update["update_name"])
if not self._background_update_queue:
# no work left to do
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d102e07372..3413a46675 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -149,9 +149,7 @@ class DeviceWorkerStore(SQLBaseStore):
defer.returnValue((stream_id_cutoff, []))
results = yield self._get_device_update_edus_by_remote(
- destination,
- from_stream_id,
- query_map,
+ destination, from_stream_id, query_map
)
defer.returnValue((now_stream_id, results))
@@ -182,9 +180,7 @@ class DeviceWorkerStore(SQLBaseStore):
return list(txn)
@defer.inlineCallbacks
- def _get_device_update_edus_by_remote(
- self, destination, from_stream_id, query_map,
- ):
+ def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_map):
"""Returns a list of device update EDUs as well as E2EE keys
Args:
@@ -210,7 +206,7 @@ class DeviceWorkerStore(SQLBaseStore):
# The prev_id for the first row is always the last row before
# `from_stream_id`
prev_id = yield self._get_last_device_update_for_remote_user(
- destination, user_id, from_stream_id,
+ destination, user_id, from_stream_id
)
for device_id, device in iteritems(user_devices):
stream_id = query_map[(user_id, device_id)]
@@ -238,7 +234,7 @@ class DeviceWorkerStore(SQLBaseStore):
defer.returnValue(results)
def _get_last_device_update_for_remote_user(
- self, destination, user_id, from_stream_id,
+ self, destination, user_id, from_stream_id
):
def f(txn):
prev_sent_id_sql = """
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index 521936e3b0..f40ef2ab64 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -87,10 +87,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
},
values={
"version": version,
- "first_message_index": room_key['first_message_index'],
- "forwarded_count": room_key['forwarded_count'],
- "is_verified": room_key['is_verified'],
- "session_data": json.dumps(room_key['session_data']),
+ "first_message_index": room_key["first_message_index"],
+ "forwarded_count": room_key["forwarded_count"],
+ "is_verified": room_key["is_verified"],
+ "session_data": json.dumps(room_key["session_data"]),
},
lock=False,
)
@@ -118,13 +118,13 @@ class EndToEndRoomKeyStore(SQLBaseStore):
try:
version = int(version)
except ValueError:
- defer.returnValue({'rooms': {}})
+ defer.returnValue({"rooms": {}})
keyvalues = {"user_id": user_id, "version": version}
if room_id:
- keyvalues['room_id'] = room_id
+ keyvalues["room_id"] = room_id
if session_id:
- keyvalues['session_id'] = session_id
+ keyvalues["session_id"] = session_id
rows = yield self._simple_select_list(
table="e2e_room_keys",
@@ -141,10 +141,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
desc="get_e2e_room_keys",
)
- sessions = {'rooms': {}}
+ sessions = {"rooms": {}}
for row in rows:
- room_entry = sessions['rooms'].setdefault(row['room_id'], {"sessions": {}})
- room_entry['sessions'][row['session_id']] = {
+ room_entry = sessions["rooms"].setdefault(row["room_id"], {"sessions": {}})
+ room_entry["sessions"][row["session_id"]] = {
"first_message_index": row["first_message_index"],
"forwarded_count": row["forwarded_count"],
"is_verified": row["is_verified"],
@@ -174,9 +174,9 @@ class EndToEndRoomKeyStore(SQLBaseStore):
keyvalues = {"user_id": user_id, "version": int(version)}
if room_id:
- keyvalues['room_id'] = room_id
+ keyvalues["room_id"] = room_id
if session_id:
- keyvalues['session_id'] = session_id
+ keyvalues["session_id"] = session_id
yield self._simple_delete(
table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys"
@@ -191,7 +191,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
)
row = txn.fetchone()
if not row:
- raise StoreError(404, 'No current backup version')
+ raise StoreError(404, "No current backup version")
return row[0]
def get_e2e_room_keys_version_info(self, user_id, version=None):
@@ -255,7 +255,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
)
current_version = txn.fetchone()[0]
if current_version is None:
- current_version = '0'
+ current_version = "0"
new_version = str(int(current_version) + 1)
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 1b97ee74e3..289b6bc281 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -45,6 +45,10 @@ class PostgresEngine(object):
# together. For example, version 8.1.5 will be returned as 80105
self._version = db_conn.server_version
+ # Are we on a supported PostgreSQL version?
+ if self._version < 90500:
+ raise RuntimeError("Synapse requires PostgreSQL 9.5+ or above.")
+
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
@@ -64,9 +68,9 @@ class PostgresEngine(object):
@property
def can_native_upsert(self):
"""
- Can we use native UPSERTs? This requires PostgreSQL 9.5+.
+ Can we use native UPSERTs?
"""
- return self._version >= 90500
+ return True
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 933bcf42c2..e9b9caa49a 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -85,7 +85,7 @@ class Sqlite3Engine(object):
def _parse_match_info(buf):
bufsize = len(buf)
- return [struct.unpack('@I', buf[i : i + 4])[0] for i in range(0, bufsize, 4)]
+ return [struct.unpack("@I", buf[i : i + 4])[0] for i in range(0, bufsize, 4)]
def _rank(raw_match_info):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 09e39c2c28..cb4478342f 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -190,6 +190,34 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
room_id,
)
+ def get_rooms_with_many_extremities(self, min_count, limit):
+ """Get the top rooms with at least N extremities.
+
+ Args:
+ min_count (int): The minimum number of extremities
+ limit (int): The maximum number of rooms to return.
+
+ Returns:
+ Deferred[list]: At most `limit` room IDs that have at least
+ `min_count` extremities, sorted by extremity count.
+ """
+
+ def _get_rooms_with_many_extremities_txn(txn):
+ sql = """
+ SELECT room_id FROM event_forward_extremities
+ GROUP BY room_id
+ HAVING count(*) > ?
+ ORDER BY count(*) DESC
+ LIMIT ?
+ """
+
+ txn.execute(sql, (min_count, limit))
+ return [room_id for room_id, in txn]
+
+ return self.runInteraction(
+ "get_rooms_with_many_extremities", _get_rooms_with_many_extremities_txn
+ )
+
@cached(max_entries=5000, iterable=True)
def get_latest_event_ids_in_room(self, room_id):
return self._simple_select_onecol(
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index a729f3e067..eca77069fd 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -277,7 +277,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# contain results from the first query, correctly ordered, followed
# by results from the second query, but we want them all ordered
# by stream_ordering, oldest first.
- notifs.sort(key=lambda r: r['stream_ordering'])
+ notifs.sort(key=lambda r: r["stream_ordering"])
# Take only up to the limit. We have to stop at the limit because
# one of the subqueries may have hit the limit.
@@ -379,7 +379,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# contain results from the first query, correctly ordered, followed
# by results from the second query, but we want them all ordered
# by received_ts (most recent first)
- notifs.sort(key=lambda r: -(r['received_ts'] or 0))
+ notifs.sort(key=lambda r: -(r["received_ts"] or 0))
# Now return the first `limit`
defer.returnValue(notifs[:limit])
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b9..fefba39ea1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,14 +17,14 @@
import itertools
import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems, text_type
from six.moves import range
from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -73,6 +74,21 @@ state_delta_reuse_delta_counter = Counter(
"synapse_storage_events_state_delta_reuse_delta", ""
)
+# The number of forward extremities for each new event.
+forward_extremities_counter = Histogram(
+ "synapse_storage_events_forward_extremities_persisted",
+ "Number of forward extremities for each new event",
+ buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
+)
+
+# The number of stale forward extremities for each new event. Stale extremities
+# are those that were in the previous set of extremities as well as the new.
+stale_forward_extremities_counter = Histogram(
+ "synapse_storage_events_stale_forward_extremities_persisted",
+ "Number of unchanged forward extremities for each new event",
+ buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
+)
+
def encode_json(json_object):
"""
@@ -220,13 +236,39 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):
-
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
+ # Collect metrics on the number of forward extremities that exist.
+ # Counter of number of extremities to count
+ self._current_forward_extremities_amount = c_counter()
+
+ BucketCollector(
+ "synapse_forward_extremities",
+ lambda: self._current_forward_extremities_amount,
+ buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
+ )
+
+ # Read the extrems every 60 minutes
+ hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+ @defer.inlineCallbacks
+ def _read_forward_extremities(self):
+ def fetch(txn):
+ txn.execute(
+ """
+ select count(*) c from event_forward_extremities
+ group by room_id
+ """
+ )
+ return txn.fetchall()
+
+ res = yield self.runInteraction("read_forward_extremities", fetch)
+ self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
@@ -514,6 +556,8 @@ class EventsStore(
and not event.internal_metadata.is_soft_failed()
]
+ latest_event_ids = set(latest_event_ids)
+
# start with the existing forward extremities
result = set(latest_event_ids)
@@ -537,6 +581,13 @@ class EventsStore(
)
result.difference_update(existing_prevs)
+ # We only update metrics for events that change forward extremities
+ # (e.g. we ignore backfill/outliers/etc)
+ if result != latest_event_ids:
+ forward_extremities_counter.observe(len(result))
+ stale = latest_event_ids & result
+ stale_forward_extremities_counter.observe(len(stale))
+
defer.returnValue(result)
@defer.inlineCallbacks
@@ -568,17 +619,11 @@ class EventsStore(
)
txn.execute(sql, batch)
- results.extend(
- r[0]
- for r in txn
- if not json.loads(r[1]).get("soft_failed")
- )
+ results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_events_which_are_prevs",
- _get_events_which_are_prevs_txn,
- chunk,
+ "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)
defer.returnValue(results)
@@ -640,9 +685,7 @@ class EventsStore(
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_prevs_before_rejected",
- _get_prevs_before_rejected_txn,
- chunk,
+ "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)
defer.returnValue(existing_prevs)
diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py
index 75c1935bf3..1ce21d190c 100644
--- a/synapse/storage/events_bg_updates.py
+++ b/synapse/storage/events_bg_updates.py
@@ -64,8 +64,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
)
self.register_background_update_handler(
- self.DELETE_SOFT_FAILED_EXTREMITIES,
- self._cleanup_extremities_bg_update,
+ self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
)
@defer.inlineCallbacks
@@ -269,7 +268,8 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
LEFT JOIN events USING (event_id)
LEFT JOIN event_json USING (event_id)
LEFT JOIN rejections USING (event_id)
- """, (batch_size,)
+ """,
+ (batch_size,),
)
for prev_event_id, event_id, metadata, rejected, outlier in txn:
@@ -364,13 +364,12 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
column="event_id",
iterable=to_delete,
keyvalues={},
- retcols=("room_id",)
+ retcols=("room_id",),
)
room_ids = set(row["room_id"] for row in rows)
for room_id in room_ids:
txn.call_after(
- self.get_latest_event_ids_in_room.invalidate,
- (room_id,)
+ self.get_latest_event_ids_in_room.invalidate, (room_id,)
)
self._simple_delete_many_txn(
@@ -384,7 +383,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
return len(original_set)
num_handled = yield self.runInteraction(
- "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
+ "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn
)
if not num_handled:
@@ -394,8 +393,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
txn.execute("DROP TABLE _extremities_to_check")
yield self.runInteraction(
- "_cleanup_extremities_bg_update_drop_table",
- _drop_table_txn,
+ "_cleanup_extremities_bg_update_drop_table", _drop_table_txn
)
defer.returnValue(num_handled)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index cc7df5cf14..6d680d405a 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -27,7 +27,6 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import NotFoundError
from synapse.api.room_versions import EventFormatVersions
from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401
-# these are only included to make the type annotations work
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -111,8 +110,7 @@ class EventsWorkerStore(SQLBaseStore):
return ts
return self.runInteraction(
- "get_approximate_received_ts",
- _get_approximate_received_ts_txn,
+ "get_approximate_received_ts", _get_approximate_received_ts_txn
)
@defer.inlineCallbacks
@@ -677,7 +675,8 @@ class EventsWorkerStore(SQLBaseStore):
"""
return self.runInteraction(
"get_total_state_event_counts",
- self._get_total_state_event_counts_txn, room_id
+ self._get_total_state_event_counts_txn,
+ room_id,
)
def _get_current_state_event_counts_txn(self, txn, room_id):
@@ -701,7 +700,8 @@ class EventsWorkerStore(SQLBaseStore):
"""
return self.runInteraction(
"get_current_state_event_counts",
- self._get_current_state_event_counts_txn, room_id
+ self._get_current_state_event_counts_txn,
+ room_id,
)
@defer.inlineCallbacks
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index dce6a43ac1..73e6fc6de2 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -1179,11 +1179,7 @@ class GroupServerStore(SQLBaseStore):
for table in tables:
self._simple_delete_txn(
- txn,
- table=table,
- keyvalues={"group_id": group_id},
+ txn, table=table, keyvalues={"group_id": group_id}
)
- return self.runInteraction(
- "delete_group", _delete_group_txn
- )
+ return self.runInteraction("delete_group", _delete_group_txn)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index e3655ad8d7..e72f89e446 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -131,7 +131,7 @@ class KeyStore(SQLBaseStore):
def _invalidate(res):
f = self._get_server_verify_key.invalidate
for i in invalidations:
- f((i, ))
+ f((i,))
return res
return self.runInteraction(
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 3ecf47e7a7..6b1238ce4a 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -22,11 +22,11 @@ class MediaRepositoryStore(BackgroundUpdateStore):
super(MediaRepositoryStore, self).__init__(db_conn, hs)
self.register_background_index_update(
- update_name='local_media_repository_url_idx',
- index_name='local_media_repository_url_idx',
- table='local_media_repository',
- columns=['created_ts'],
- where_clause='url_cache IS NOT NULL',
+ update_name="local_media_repository_url_idx",
+ index_name="local_media_repository_url_idx",
+ table="local_media_repository",
+ columns=["created_ts"],
+ where_clause="url_cache IS NOT NULL",
)
def get_local_media(self, media_id):
@@ -108,12 +108,12 @@ class MediaRepositoryStore(BackgroundUpdateStore):
return dict(
zip(
(
- 'response_code',
- 'etag',
- 'expires_ts',
- 'og',
- 'media_id',
- 'download_ts',
+ "response_code",
+ "etag",
+ "expires_ts",
+ "og",
+ "media_id",
+ "download_ts",
),
row,
)
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 8aa8abc470..081564360f 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -86,11 +86,11 @@ class MonthlyActiveUsersStore(SQLBaseStore):
if len(self.reserved_users) > 0:
# questionmarks is a hack to overcome sqlite not supporting
# tuples in 'WHERE IN %s'
- questionmarks = '?' * len(self.reserved_users)
+ questionmarks = "?" * len(self.reserved_users)
query_args.extend(self.reserved_users)
sql = base_sql + """ AND user_id NOT IN ({})""".format(
- ','.join(questionmarks)
+ ",".join(questionmarks)
)
else:
sql = base_sql
@@ -124,7 +124,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
if len(self.reserved_users) > 0:
query_args.extend(self.reserved_users)
sql = base_sql + """ AND user_id NOT IN ({})""".format(
- ','.join(questionmarks)
+ ",".join(questionmarks)
)
else:
sql = base_sql
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index f2c1bed487..7c4e1dc7ec 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -133,7 +133,7 @@ def _setup_new_database(cur, database_engine):
if ver <= SCHEMA_VERSION:
valid_dirs.append((ver, abs_path))
else:
- logger.warn("Unexpected entry in 'full_schemas': %s", filename)
+ logger.debug("Ignoring entry '%s' in 'full_schemas'", filename)
if not valid_dirs:
raise PrepareDatabaseException(
@@ -146,9 +146,10 @@ def _setup_new_database(cur, database_engine):
directory_entries = os.listdir(sql_dir)
- for filename in sorted(fnmatch.filter(directory_entries, "*.sql") + fnmatch.filter(
- directory_entries, "*.sql." + specific
- )):
+ for filename in sorted(
+ fnmatch.filter(directory_entries, "*.sql")
+ + fnmatch.filter(directory_entries, "*.sql." + specific)
+ ):
sql_loc = os.path.join(sql_dir, filename)
logger.debug("Applying schema %s", sql_loc)
executescript(cur, sql_loc)
@@ -313,7 +314,7 @@ def _apply_module_schemas(txn, database_engine, config):
application config
"""
for (mod, _config) in config.password_providers:
- if not hasattr(mod, 'get_db_schema_files'):
+ if not hasattr(mod, "get_db_schema_files"):
continue
modname = ".".join((mod.__module__, mod.__name__))
_apply_module_schema_files(
@@ -343,7 +344,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
continue
root_name, ext = os.path.splitext(name)
- if ext != '.sql':
+ if ext != ".sql":
raise PrepareDatabaseException(
"only .sql files are currently supported for module schemas"
)
@@ -407,7 +408,7 @@ def get_statements(f):
def executescript(txn, schema_path):
- with open(schema_path, 'r') as f:
+ with open(schema_path, "r") as f:
for statement in get_statements(f):
txn.execute(statement)
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index aeec2f57c4..0ff392bdb4 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -41,7 +41,7 @@ class ProfileWorkerStore(SQLBaseStore):
defer.returnValue(
ProfileInfo(
- avatar_url=profile['avatar_url'], display_name=profile['displayname']
+ avatar_url=profile["avatar_url"], display_name=profile["displayname"]
)
)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 9e406baafa..98cec8c82b 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -46,12 +46,12 @@ def _load_rules(rawrules, enabled_map):
rules = list(list_with_base_rules(ruleslist))
for i, rule in enumerate(rules):
- rule_id = rule['rule_id']
+ rule_id = rule["rule_id"]
if rule_id in enabled_map:
- if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+ if rule.get("enabled", True) != bool(enabled_map[rule_id]):
# Rules are cached across users.
rule = dict(rule)
- rule['enabled'] = bool(enabled_map[rule_id])
+ rule["enabled"] = bool(enabled_map[rule_id])
rules[i] = rule
return rules
@@ -126,12 +126,12 @@ class PushRulesWorkerStore(
def get_push_rules_enabled_for_user(self, user_id):
results = yield self._simple_select_list(
table="push_rules_enable",
- keyvalues={'user_name': user_id},
+ keyvalues={"user_name": user_id},
retcols=("user_name", "rule_id", "enabled"),
desc="get_push_rules_enabled_for_user",
)
defer.returnValue(
- {r['rule_id']: False if r['enabled'] == 0 else True for r in results}
+ {r["rule_id"]: False if r["enabled"] == 0 else True for r in results}
)
def have_push_rules_changed_for_user(self, user_id, last_id):
@@ -175,7 +175,7 @@ class PushRulesWorkerStore(
rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
for row in rows:
- results.setdefault(row['user_name'], []).append(row)
+ results.setdefault(row["user_name"], []).append(row)
enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids)
@@ -194,7 +194,7 @@ class PushRulesWorkerStore(
rule (Dict): A push rule.
"""
# Create new rule id
- rule_id_scope = '/'.join(rule["rule_id"].split('/')[:-1])
+ rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1])
new_rule_id = rule_id_scope + "/" + new_room_id
# Change room id in each condition
@@ -334,8 +334,8 @@ class PushRulesWorkerStore(
desc="bulk_get_push_rules_enabled",
)
for row in rows:
- enabled = bool(row['enabled'])
- results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
+ enabled = bool(row["enabled"])
+ results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
defer.returnValue(results)
@@ -568,7 +568,7 @@ class PushRuleStore(PushRulesWorkerStore):
def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
self._simple_delete_one_txn(
- txn, "push_rules", {'user_name': user_id, 'rule_id': rule_id}
+ txn, "push_rules", {"user_name": user_id, "rule_id": rule_id}
)
self._insert_push_rules_update_txn(
@@ -605,9 +605,9 @@ class PushRuleStore(PushRulesWorkerStore):
self._simple_upsert_txn(
txn,
"push_rules_enable",
- {'user_name': user_id, 'rule_id': rule_id},
- {'enabled': 1 if enabled else 0},
- {'id': new_id},
+ {"user_name": user_id, "rule_id": rule_id},
+ {"enabled": 1 if enabled else 0},
+ {"id": new_id},
)
self._insert_push_rules_update_txn(
@@ -645,8 +645,8 @@ class PushRuleStore(PushRulesWorkerStore):
self._simple_update_one_txn(
txn,
"push_rules",
- {'user_name': user_id, 'rule_id': rule_id},
- {'actions': actions_json},
+ {"user_name": user_id, "rule_id": rule_id},
+ {"actions": actions_json},
)
self._insert_push_rules_update_txn(
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 1567e1df48..cfe0a94330 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -37,24 +37,24 @@ else:
class PusherWorkerStore(SQLBaseStore):
def _decode_pushers_rows(self, rows):
for r in rows:
- dataJson = r['data']
- r['data'] = None
+ dataJson = r["data"]
+ r["data"] = None
try:
if isinstance(dataJson, db_binary_type):
dataJson = str(dataJson).decode("UTF8")
- r['data'] = json.loads(dataJson)
+ r["data"] = json.loads(dataJson)
except Exception as e:
logger.warn(
"Invalid JSON in data for pusher %d: %s, %s",
- r['id'],
+ r["id"],
dataJson,
e.args[0],
)
pass
- if isinstance(r['pushkey'], db_binary_type):
- r['pushkey'] = str(r['pushkey']).decode("UTF8")
+ if isinstance(r["pushkey"], db_binary_type):
+ r["pushkey"] = str(r["pushkey"]).decode("UTF8")
return rows
@@ -195,15 +195,15 @@ class PusherWorkerStore(SQLBaseStore):
)
def get_if_users_have_pushers(self, user_ids):
rows = yield self._simple_select_many_batch(
- table='pushers',
- column='user_name',
+ table="pushers",
+ column="user_name",
iterable=user_ids,
- retcols=['user_name'],
- desc='get_if_users_have_pushers',
+ retcols=["user_name"],
+ desc="get_if_users_have_pushers",
)
result = {user_id: False for user_id in user_ids}
- result.update({r['user_name']: True for r in rows})
+ result.update({r["user_name"]: True for r in rows})
defer.returnValue(result)
@@ -299,8 +299,8 @@ class PusherStore(PusherWorkerStore):
):
yield self._simple_update_one(
"pushers",
- {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'last_stream_ordering': last_stream_ordering},
+ {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ {"last_stream_ordering": last_stream_ordering},
desc="update_pusher_last_stream_ordering",
)
@@ -310,10 +310,10 @@ class PusherStore(PusherWorkerStore):
):
yield self._simple_update_one(
"pushers",
- {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
+ {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
{
- 'last_stream_ordering': last_stream_ordering,
- 'last_success': last_success,
+ "last_stream_ordering": last_stream_ordering,
+ "last_success": last_success,
},
desc="update_pusher_last_stream_ordering_and_success",
)
@@ -322,8 +322,8 @@ class PusherStore(PusherWorkerStore):
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
yield self._simple_update_one(
"pushers",
- {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
- {'failing_since': failing_since},
+ {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ {"failing_since": failing_since},
desc="update_pusher_failing_since",
)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index a1647e50a1..b477da12b1 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
@cachedInlineCallbacks()
def get_users_with_read_receipts_in_room(self, room_id):
receipts = yield self.get_receipts_for_room(room_id, "m.read")
- defer.returnValue(set(r['user_id'] for r in receipts))
+ defer.returnValue(set(r["user_id"] for r in receipts))
@cached(num_args=2)
def get_receipts_for_room(self, room_id, receipt_type):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 1dd1182e82..983ce13291 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import re
from six import iterkeys
@@ -31,6 +32,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
+logger = logging.getLogger(__name__)
+
class RegistrationWorkerStore(SQLBaseStore):
def __init__(self, db_conn, hs):
@@ -113,8 +116,9 @@ class RegistrationWorkerStore(SQLBaseStore):
defer.returnValue(res)
@defer.inlineCallbacks
- def set_account_validity_for_user(self, user_id, expiration_ts, email_sent,
- renewal_token=None):
+ def set_account_validity_for_user(
+ self, user_id, expiration_ts, email_sent, renewal_token=None
+ ):
"""Updates the account validity properties of the given account, with the
given values.
@@ -128,6 +132,7 @@ class RegistrationWorkerStore(SQLBaseStore):
renewal_token (str): Renewal token the user can use to extend the validity
of their account. Defaults to no token.
"""
+
def set_account_validity_for_user_txn(txn):
self._simple_update_txn(
txn=txn,
@@ -140,12 +145,11 @@ class RegistrationWorkerStore(SQLBaseStore):
},
)
self._invalidate_cache_and_stream(
- txn, self.get_expiration_ts_for_user, (user_id,),
+ txn, self.get_expiration_ts_for_user, (user_id,)
)
yield self.runInteraction(
- "set_account_validity_for_user",
- set_account_validity_for_user_txn,
+ "set_account_validity_for_user", set_account_validity_for_user_txn
)
@defer.inlineCallbacks
@@ -214,6 +218,7 @@ class RegistrationWorkerStore(SQLBaseStore):
Returns:
Deferred: Resolves to a list[dict[user_id (str), expiration_ts_ms (int)]]
"""
+
def select_users_txn(txn, now_ms, renew_at):
sql = (
"SELECT user_id, expiration_ts_ms FROM account_validity"
@@ -226,7 +231,8 @@ class RegistrationWorkerStore(SQLBaseStore):
res = yield self.runInteraction(
"get_users_expiring_soon",
select_users_txn,
- self.clock.time_msec(), self.config.account_validity.renew_at,
+ self.clock.time_msec(),
+ self.config.account_validity.renew_at,
)
defer.returnValue(res)
@@ -249,6 +255,20 @@ class RegistrationWorkerStore(SQLBaseStore):
)
@defer.inlineCallbacks
+ def delete_account_validity_for_user(self, user_id):
+ """Deletes the entry for the given user in the account validity table, removing
+ their expiration date and renewal token.
+
+ Args:
+ user_id (str): ID of the user to remove from the account validity table.
+ """
+ yield self._simple_delete_one(
+ table="account_validity",
+ keyvalues={"user_id": user_id},
+ desc="delete_account_validity_for_user",
+ )
+
+ @defer.inlineCallbacks
def is_server_admin(self, user):
res = yield self._simple_select_one_onecol(
table="users",
@@ -352,7 +372,7 @@ class RegistrationWorkerStore(SQLBaseStore):
WHERE creation_ts > ?
) AS t GROUP BY user_type
"""
- results = {'native': 0, 'guest': 0, 'bridged': 0}
+ results = {"native": 0, "guest": 0, "bridged": 0}
txn.execute(sql, (yesterday,))
for row in txn:
results[row[0]] = row[1]
@@ -418,7 +438,7 @@ class RegistrationWorkerStore(SQLBaseStore):
{"medium": medium, "address": address},
["guest_access_token"],
True,
- 'get_3pid_guest_access_token',
+ "get_3pid_guest_access_token",
)
if ret:
defer.returnValue(ret["guest_access_token"])
@@ -455,11 +475,11 @@ class RegistrationWorkerStore(SQLBaseStore):
txn,
"user_threepids",
{"medium": medium, "address": address},
- ['user_id'],
+ ["user_id"],
True,
)
if ret:
- return ret['user_id']
+ return ret["user_id"]
return None
@defer.inlineCallbacks
@@ -475,8 +495,8 @@ class RegistrationWorkerStore(SQLBaseStore):
ret = yield self._simple_select_list(
"user_threepids",
{"user_id": user_id},
- ['medium', 'address', 'validated_at', 'added_at'],
- 'user_get_threepids',
+ ["medium", "address", "validated_at", "added_at"],
+ "user_get_threepids",
)
defer.returnValue(ret)
@@ -555,11 +575,7 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
return self._simple_select_onecol(
table="user_threepid_id_server",
- keyvalues={
- "user_id": user_id,
- "medium": medium,
- "address": address,
- },
+ keyvalues={"user_id": user_id, "medium": medium, "address": address},
retcol="id_server",
desc="get_id_servers_user_bound",
)
@@ -595,15 +611,80 @@ class RegistrationStore(
self.register_noop_background_update("refresh_tokens_device_index")
self.register_background_update_handler(
- "user_threepids_grandfather", self._bg_user_threepids_grandfather,
+ "user_threepids_grandfather", self._bg_user_threepids_grandfather
+ )
+
+ self.register_background_update_handler(
+ "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag
)
# Create a background job for culling expired 3PID validity tokens
hs.get_clock().looping_call(
- self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS,
+ self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
)
@defer.inlineCallbacks
+ def _backgroud_update_set_deactivated_flag(self, progress, batch_size):
+ """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
+ for each of them.
+ """
+
+ last_user = progress.get("user_id", "")
+
+ def _backgroud_update_set_deactivated_flag_txn(txn):
+ txn.execute(
+ """
+ SELECT
+ users.name,
+ COUNT(access_tokens.token) AS count_tokens,
+ COUNT(user_threepids.address) AS count_threepids
+ FROM users
+ LEFT JOIN access_tokens ON (access_tokens.user_id = users.name)
+ LEFT JOIN user_threepids ON (user_threepids.user_id = users.name)
+ WHERE (users.password_hash IS NULL OR users.password_hash = '')
+ AND (users.appservice_id IS NULL OR users.appservice_id = '')
+ AND users.is_guest = 0
+ AND users.name > ?
+ GROUP BY users.name
+ ORDER BY users.name ASC
+ LIMIT ?;
+ """,
+ (last_user, batch_size),
+ )
+
+ rows = self.cursor_to_dict(txn)
+
+ if not rows:
+ return True
+
+ rows_processed_nb = 0
+
+ for user in rows:
+ if not user["count_tokens"] and not user["count_threepids"]:
+ self.set_user_deactivated_status_txn(txn, user["name"], True)
+ rows_processed_nb += 1
+
+ logger.info("Marked %d rows as deactivated", rows_processed_nb)
+
+ self._background_update_progress_txn(
+ txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
+ )
+
+ if batch_size > len(rows):
+ return True
+ else:
+ return False
+
+ end = yield self.runInteraction(
+ "users_set_deactivated_flag", _backgroud_update_set_deactivated_flag_txn
+ )
+
+ if end:
+ yield self._end_background_update("users_set_deactivated_flag")
+
+ defer.returnValue(batch_size)
+
+ @defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id=None):
"""Adds an access token for the given user.
@@ -768,7 +849,7 @@ class RegistrationStore(
def user_set_password_hash_txn(txn):
self._simple_update_one_txn(
- txn, 'users', {'name': user_id}, {'password_hash': password_hash}
+ txn, "users", {"name": user_id}, {"password_hash": password_hash}
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
@@ -789,9 +870,9 @@ class RegistrationStore(
def f(txn):
self._simple_update_one_txn(
txn,
- table='users',
- keyvalues={'name': user_id},
- updatevalues={'consent_version': consent_version},
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"consent_version": consent_version},
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
@@ -813,9 +894,9 @@ class RegistrationStore(
def f(txn):
self._simple_update_one_txn(
txn,
- table='users',
- keyvalues={'name': user_id},
- updatevalues={'consent_server_notice_sent': consent_version},
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"consent_server_notice_sent": consent_version},
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
@@ -985,7 +1066,7 @@ class RegistrationStore(
if id_servers:
yield self.runInteraction(
- "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn,
+ "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
)
yield self._end_background_update("user_threepids_grandfather")
@@ -993,12 +1074,7 @@ class RegistrationStore(
defer.returnValue(1)
def get_threepid_validation_session(
- self,
- medium,
- client_secret,
- address=None,
- sid=None,
- validated=True,
+ self, medium, client_secret, address=None, sid=None, validated=True
):
"""Gets a session_id and last_send_attempt (if available) for a
client_secret/medium/(address|session_id) combo
@@ -1018,23 +1094,22 @@ class RegistrationStore(
latest session_id and send_attempt count for this 3PID.
Otherwise None if there hasn't been a previous attempt
"""
- keyvalues = {
- "medium": medium,
- "client_secret": client_secret,
- }
+ keyvalues = {"medium": medium, "client_secret": client_secret}
if address:
keyvalues["address"] = address
if sid:
keyvalues["session_id"] = sid
- assert(address or sid)
+ assert address or sid
def get_threepid_validation_session_txn(txn):
sql = """
SELECT address, session_id, medium, client_secret,
last_send_attempt, validated_at
FROM threepid_validation_session WHERE %s
- """ % (" AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),)
+ """ % (
+ " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
+ )
if validated is not None:
sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
@@ -1049,17 +1124,10 @@ class RegistrationStore(
return rows[0]
return self.runInteraction(
- "get_threepid_validation_session",
- get_threepid_validation_session_txn,
+ "get_threepid_validation_session", get_threepid_validation_session_txn
)
- def validate_threepid_session(
- self,
- session_id,
- client_secret,
- token,
- current_ts,
- ):
+ def validate_threepid_session(self, session_id, client_secret, token, current_ts):
"""Attempt to validate a threepid session using a token
Args:
@@ -1091,7 +1159,7 @@ class RegistrationStore(
if retrieved_client_secret != client_secret:
raise ThreepidValidationError(
- 400, "This client_secret does not match the provided session_id",
+ 400, "This client_secret does not match the provided session_id"
)
row = self._simple_select_one_txn(
@@ -1104,7 +1172,7 @@ class RegistrationStore(
if not row:
raise ThreepidValidationError(
- 400, "Validation token not found or has expired",
+ 400, "Validation token not found or has expired"
)
expires = row["expires"]
next_link = row["next_link"]
@@ -1115,7 +1183,7 @@ class RegistrationStore(
if expires <= current_ts:
raise ThreepidValidationError(
- 400, "This token has expired. Please request a new one",
+ 400, "This token has expired. Please request a new one"
)
# Looks good. Validate the session
@@ -1130,8 +1198,7 @@ class RegistrationStore(
# Return next_link if it exists
return self.runInteraction(
- "validate_threepid_session_txn",
- validate_threepid_session_txn,
+ "validate_threepid_session_txn", validate_threepid_session_txn
)
def upsert_threepid_validation_session(
@@ -1198,6 +1265,7 @@ class RegistrationStore(
token_expires (int): The timestamp for which after the token
will no longer be valid
"""
+
def start_or_continue_validation_session_txn(txn):
# Create or update a validation session
self._simple_upsert_txn(
@@ -1231,6 +1299,7 @@ class RegistrationStore(
def cull_expired_threepid_validation_tokens(self):
"""Remove threepid validation tokens with expiry dates that have passed"""
+
def cull_expired_threepid_validation_tokens_txn(txn, ts):
sql = """
DELETE FROM threepid_validation_token WHERE
@@ -1252,6 +1321,7 @@ class RegistrationStore(
Args:
session_id (str): The ID of the session to delete
"""
+
def delete_threepid_session_txn(txn):
self._simple_delete_txn(
txn,
@@ -1265,6 +1335,53 @@ class RegistrationStore(
)
return self.runInteraction(
- "delete_threepid_session",
- delete_threepid_session_txn,
+ "delete_threepid_session", delete_threepid_session_txn
+ )
+
+ def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
+ self._simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"deactivated": 1 if deactivated else 0},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_deactivated_status, (user_id,)
)
+
+ @defer.inlineCallbacks
+ def set_user_deactivated_status(self, user_id, deactivated):
+ """Set the `deactivated` property for the provided user to the provided value.
+
+ Args:
+ user_id (str): The ID of the user to set the status for.
+ deactivated (bool): The value to set for `deactivated`.
+ """
+
+ yield self.runInteraction(
+ "set_user_deactivated_status",
+ self.set_user_deactivated_status_txn,
+ user_id,
+ deactivated,
+ )
+
+ @cachedInlineCallbacks()
+ def get_user_deactivated_status(self, user_id):
+ """Retrieve the value for the `deactivated` property for the provided user.
+
+ Args:
+ user_id (str): The ID of the user to retrieve the status for.
+
+ Returns:
+ defer.Deferred(bool): The requested value.
+ """
+
+ res = yield self._simple_select_one_onecol(
+ table="users",
+ keyvalues={"name": user_id},
+ retcol="deactivated",
+ desc="get_user_deactivated_status",
+ )
+
+ # Convert the integer into a boolean.
+ defer.returnValue(res == 1)
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 4c83800cca..1b01934c19 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -468,9 +468,5 @@ class RelationsStore(RelationsWorkerStore):
"""
self._simple_delete_txn(
- txn,
- table="event_relations",
- keyvalues={
- "event_id": redacted_event_id,
- }
+ txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7617913326..8004aeb909 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -420,7 +420,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
table="room_memberships",
column="event_id",
iterable=missing_member_event_ids,
- retcols=('user_id', 'display_name', 'avatar_url'),
+ retcols=("user_id", "display_name", "avatar_url"),
keyvalues={"membership": Membership.JOIN},
batch_size=500,
desc="_get_joined_users_from_context",
@@ -448,7 +448,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cachedInlineCallbacks(max_entries=10000)
def is_host_joined(self, room_id, host):
- if '%' in host or '_' in host:
+ if "%" in host or "_" in host:
raise Exception("Invalid host name")
sql = """
@@ -490,7 +490,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
Deferred: Resolves to True if the host is/was in the room, otherwise
False.
"""
- if '%' in host or '_' in host:
+ if "%" in host or "_" in host:
raise Exception("Invalid host name")
sql = """
@@ -723,7 +723,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
room_id = row["room_id"]
try:
event_json = json.loads(row["json"])
- content = event_json['content']
+ content = event_json["content"]
except Exception:
continue
diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py
index 147496a38b..3edfcfd783 100644
--- a/synapse/storage/schema/delta/20/pushers.py
+++ b/synapse/storage/schema/delta/20/pushers.py
@@ -29,7 +29,8 @@ logger = logging.getLogger(__name__)
def run_create(cur, database_engine, *args, **kwargs):
logger.info("Porting pushers table...")
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE IF NOT EXISTS pushers2 (
id BIGINT PRIMARY KEY,
user_name TEXT NOT NULL,
@@ -48,27 +49,34 @@ def run_create(cur, database_engine, *args, **kwargs):
failing_since BIGINT,
UNIQUE (app_id, pushkey, user_name)
)
- """)
- cur.execute("""SELECT
+ """
+ )
+ cur.execute(
+ """SELECT
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_token, last_success,
failing_since
FROM pushers
- """)
+ """
+ )
count = 0
for row in cur.fetchall():
row = list(row)
row[8] = bytes(row[8]).decode("utf-8")
row[11] = bytes(row[11]).decode("utf-8")
- cur.execute(database_engine.convert_param_style("""
+ cur.execute(
+ database_engine.convert_param_style(
+ """
INSERT into pushers2 (
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_token, last_success,
failing_since
- ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
- row
+ ) values (%s)"""
+ % (",".join(["?" for _ in range(len(row))]))
+ ),
+ row,
)
count += 1
cur.execute("DROP TABLE pushers")
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index ef7ec34346..9b95411fb6 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -40,9 +40,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
logger.warning("Could not get app_service_config_files from config")
pass
- appservices = load_appservices(
- config.server_name, config_files
- )
+ appservices = load_appservices(config.server_name, config_files)
owned = {}
@@ -53,20 +51,19 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
if user_id in owned.keys():
logger.error(
"user_id %s was owned by more than one application"
- " service (IDs %s and %s); assigning arbitrarily to %s" %
- (user_id, owned[user_id], appservice.id, owned[user_id])
+ " service (IDs %s and %s); assigning arbitrarily to %s"
+ % (user_id, owned[user_id], appservice.id, owned[user_id])
)
owned.setdefault(appservice.id, []).append(user_id)
for as_id, user_ids in owned.items():
n = 100
- user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
+ user_chunks = (user_ids[i : i + 100] for i in range(0, len(user_ids), n))
for chunk in user_chunks:
cur.execute(
database_engine.convert_param_style(
- "UPDATE users SET appservice_id = ? WHERE name IN (%s)" % (
- ",".join("?" for _ in chunk),
- )
+ "UPDATE users SET appservice_id = ? WHERE name IN (%s)"
+ % (",".join("?" for _ in chunk),)
),
- [as_id] + chunk
+ [as_id] + chunk,
)
diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py
index 93367fa09e..9bb504aad5 100644
--- a/synapse/storage/schema/delta/31/pushers.py
+++ b/synapse/storage/schema/delta/31/pushers.py
@@ -24,12 +24,13 @@ logger = logging.getLogger(__name__)
def token_to_stream_ordering(token):
- return int(token[1:].split('_')[0])
+ return int(token[1:].split("_")[0])
def run_create(cur, database_engine, *args, **kwargs):
logger.info("Porting pushers table, delta 31...")
- cur.execute("""
+ cur.execute(
+ """
CREATE TABLE IF NOT EXISTS pushers2 (
id BIGINT PRIMARY KEY,
user_name TEXT NOT NULL,
@@ -48,26 +49,33 @@ def run_create(cur, database_engine, *args, **kwargs):
failing_since BIGINT,
UNIQUE (app_id, pushkey, user_name)
)
- """)
- cur.execute("""SELECT
+ """
+ )
+ cur.execute(
+ """SELECT
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_token, last_success,
failing_since
FROM pushers
- """)
+ """
+ )
count = 0
for row in cur.fetchall():
row = list(row)
row[12] = token_to_stream_ordering(row[12])
- cur.execute(database_engine.convert_param_style("""
+ cur.execute(
+ database_engine.convert_param_style(
+ """
INSERT into pushers2 (
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_stream_ordering, last_success,
failing_since
- ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
- row
+ ) values (%s)"""
+ % (",".join(["?" for _ in range(len(row))]))
+ ),
+ row,
)
count += 1
cur.execute("DROP TABLE pushers")
diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py
index 9754d3ccfb..a26057dfb6 100644
--- a/synapse/storage/schema/delta/33/remote_media_ts.py
+++ b/synapse/storage/schema/delta/33/remote_media_ts.py
@@ -26,5 +26,5 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
database_engine.convert_param_style(
"UPDATE remote_media_cache SET last_access_ts = ?"
),
- (int(time.time() * 1000),)
+ (int(time.time() * 1000),),
)
diff --git a/synapse/storage/schema/delta/47/state_group_seq.py b/synapse/storage/schema/delta/47/state_group_seq.py
index f6766501d2..9fd1ccf6f7 100644
--- a/synapse/storage/schema/delta/47/state_group_seq.py
+++ b/synapse/storage/schema/delta/47/state_group_seq.py
@@ -27,10 +27,7 @@ def run_create(cur, database_engine, *args, **kwargs):
else:
start_val = row[0] + 1
- cur.execute(
- "CREATE SEQUENCE state_group_id_seq START WITH %s",
- (start_val, ),
- )
+ cur.execute("CREATE SEQUENCE state_group_id_seq START WITH %s", (start_val,))
def run_upgrade(*args, **kwargs):
diff --git a/synapse/storage/schema/delta/48/group_unique_indexes.py b/synapse/storage/schema/delta/48/group_unique_indexes.py
index 2233af87d7..49f5f2c003 100644
--- a/synapse/storage/schema/delta/48/group_unique_indexes.py
+++ b/synapse/storage/schema/delta/48/group_unique_indexes.py
@@ -38,16 +38,22 @@ def run_create(cur, database_engine, *args, **kwargs):
rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
# remove duplicates from group_users & group_invites tables
- cur.execute("""
+ cur.execute(
+ """
DELETE FROM group_users WHERE %s NOT IN (
SELECT min(%s) FROM group_users GROUP BY group_id, user_id
);
- """ % (rowid, rowid))
- cur.execute("""
+ """
+ % (rowid, rowid)
+ )
+ cur.execute(
+ """
DELETE FROM group_invites WHERE %s NOT IN (
SELECT min(%s) FROM group_invites GROUP BY group_id, user_id
);
- """ % (rowid, rowid))
+ """
+ % (rowid, rowid)
+ )
for statement in get_statements(FIX_INDEXES.splitlines()):
cur.execute(statement)
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
index 6dd467b6c5..b1684a8441 100644
--- a/synapse/storage/schema/delta/50/make_event_content_nullable.py
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -65,14 +65,18 @@ def run_create(cur, database_engine, *args, **kwargs):
def run_upgrade(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine):
- cur.execute("""
+ cur.execute(
+ """
ALTER TABLE events ALTER COLUMN content DROP NOT NULL;
- """)
+ """
+ )
return
# sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html
- cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'")
+ cur.execute(
+ "SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'"
+ )
(oldsql,) = cur.fetchone()
sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
@@ -86,7 +90,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
cur.execute("PRAGMA writable_schema=ON")
cur.execute(
"UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
- (sql, ),
+ (sql,),
)
cur.execute("PRAGMA schema_version=%i" % (oldver + 1,))
cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/schema/delta/55/users_alter_deactivated.sql b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
new file mode 100644
index 0000000000..dabdde489b
--- /dev/null
+++ b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
@@ -0,0 +1,19 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD deactivated SMALLINT DEFAULT 0 NOT NULL;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('users_set_deactivated_flag', '{}');
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index ff49eaae02..f3b1cec933 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -31,8 +31,8 @@ from .background_updates import BackgroundUpdateStore
logger = logging.getLogger(__name__)
SearchEntry = namedtuple(
- 'SearchEntry',
- ['key', 'value', 'event_id', 'room_id', 'stream_ordering', 'origin_server_ts'],
+ "SearchEntry",
+ ["key", "value", "event_id", "room_id", "stream_ordering", "origin_server_ts"],
)
@@ -216,7 +216,7 @@ class SearchStore(BackgroundUpdateStore):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
- have_added_index = progress['have_added_indexes']
+ have_added_index = progress["have_added_indexes"]
if not have_added_index:
@@ -341,29 +341,7 @@ class SearchStore(BackgroundUpdateStore):
for entry in entries
)
- # inserts to a GIN index are normally batched up into a pending
- # list, and then all committed together once the list gets to a
- # certain size. The trouble with that is that postgres (pre-9.5)
- # uses work_mem to determine the length of the list, and work_mem
- # is typically very large.
- #
- # We therefore reduce work_mem while we do the insert.
- #
- # (postgres 9.5 uses the separate gin_pending_list_limit setting,
- # so doesn't suffer the same problem, but changing work_mem will
- # be harmless)
- #
- # Note that we don't need to worry about restoring it on
- # exception, because exceptions will cause the transaction to be
- # rolled back, including the effects of the SET command.
- #
- # Also: we use SET rather than SET LOCAL because there's lots of
- # other stuff going on in this transaction, which want to have the
- # normal work_mem setting.
-
- txn.execute("SET work_mem='256kB'")
txn.executemany(sql, args)
- txn.execute("RESET work_mem")
elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index ff266b09b0..1cec84ee2e 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -71,7 +71,8 @@ class StatsStore(StateDeltasStore):
# Get all the rooms that we want to process.
def _make_staging_area(txn):
# Create the temporary tables
- stmts = get_statements("""
+ stmts = get_statements(
+ """
-- We just recreate the table, we'll be reinserting the
-- correct entries again later anyway.
DROP TABLE IF EXISTS {temp}_rooms;
@@ -85,7 +86,10 @@ class StatsStore(StateDeltasStore):
ON {temp}_rooms(events);
CREATE INDEX {temp}_rooms_id
ON {temp}_rooms(room_id);
- """.format(temp=TEMP_TABLE).splitlines())
+ """.format(
+ temp=TEMP_TABLE
+ ).splitlines()
+ )
for statement in stmts:
txn.execute(statement)
@@ -105,7 +109,9 @@ class StatsStore(StateDeltasStore):
LEFT JOIN room_stats_earliest_token AS t USING (room_id)
WHERE t.room_id IS NULL
GROUP BY c.room_id
- """ % (TEMP_TABLE,)
+ """ % (
+ TEMP_TABLE,
+ )
txn.execute(sql)
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
@@ -184,7 +190,8 @@ class StatsStore(StateDeltasStore):
logger.info(
"Processing the next %d rooms of %d remaining",
- len(rooms_to_work_on), progress["remaining"],
+ len(rooms_to_work_on),
+ progress["remaining"],
)
# Number of state events we've processed by going through each room
@@ -204,10 +211,17 @@ class StatsStore(StateDeltasStore):
avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
- state_events = yield self.get_events([
- join_rules_id, history_visibility_id, encryption_id, name_id,
- topic_id, avatar_id, canonical_alias_id,
- ])
+ state_events = yield self.get_events(
+ [
+ join_rules_id,
+ history_visibility_id,
+ encryption_id,
+ name_id,
+ topic_id,
+ avatar_id,
+ canonical_alias_id,
+ ]
+ )
def _get_or_none(event_id, arg):
event = state_events.get(event_id)
@@ -271,7 +285,7 @@ class StatsStore(StateDeltasStore):
# We've finished a room. Delete it from the table.
self._simple_delete_one_txn(
- txn, TEMP_TABLE + "_rooms", {"room_id": room_id},
+ txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
)
yield self.runInteraction("update_room_stats", _fetch_data)
@@ -338,7 +352,7 @@ class StatsStore(StateDeltasStore):
"name",
"topic",
"avatar",
- "canonical_alias"
+ "canonical_alias",
):
field = fields.get(col)
if field and "\0" in field:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 6f7f65d96b..d9482a3843 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -65,7 +65,7 @@ _EventDictReturn = namedtuple(
def generate_pagination_where_clause(
- direction, column_names, from_token, to_token, engine,
+ direction, column_names, from_token, to_token, engine
):
"""Creates an SQL expression to bound the columns by the pagination
tokens.
@@ -153,7 +153,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine):
str
"""
- assert(bound in (">", "<", ">=", "<="))
+ assert bound in (">", "<", ">=", "<=")
name1, name2 = column_names
val1, val2 = values
@@ -169,11 +169,7 @@ def _make_generic_sql_bound(bound, column_names, values, engine):
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
# use the later form when running against postgres.
- return "((%d,%d) %s (%s,%s))" % (
- val1, val2,
- bound,
- name1, name2,
- )
+ return "((%d,%d) %s (%s,%s))" % (val1, val2, bound, name1, name2)
# We want to generate queries of e.g. the form:
#
@@ -276,7 +272,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_stream_for_rooms(
- self, room_ids, from_key, to_key, limit=0, order='DESC'
+ self, room_ids, from_key, to_key, limit=0, order="DESC"
):
"""Get new room events in stream ordering since `from_key`.
@@ -346,7 +342,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_stream_for_room(
- self, room_id, from_key, to_key, limit=0, order='DESC'
+ self, room_id, from_key, to_key, limit=0, order="DESC"
):
"""Get new room events in stream ordering since `from_key`.
@@ -395,8 +391,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows = yield self.runInteraction("get_room_events_stream_for_room", f)
- ret = yield self.get_events_as_list([
- r.event_id for r in rows], get_prev_content=True,
+ ret = yield self.get_events_as_list(
+ [r.event_id for r in rows], get_prev_content=True
)
self._set_before_and_after(ret, rows, topo_order=from_id is None)
@@ -446,7 +442,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows = yield self.runInteraction("get_membership_changes_for_user", f)
ret = yield self.get_events_as_list(
- [r.event_id for r in rows], get_prev_content=True,
+ [r.event_id for r in rows], get_prev_content=True
)
self._set_before_and_after(ret, rows, topo_order=False)
@@ -725,7 +721,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn,
room_id,
before_token,
- direction='b',
+ direction="b",
limit=before_limit,
event_filter=event_filter,
)
@@ -735,7 +731,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn,
room_id,
after_token,
- direction='f',
+ direction="f",
limit=after_limit,
event_filter=event_filter,
)
@@ -816,7 +812,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
room_id,
from_token,
to_token=None,
- direction='b',
+ direction="b",
limit=-1,
event_filter=None,
):
@@ -846,7 +842,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
args = [False, room_id]
- if direction == 'b':
+ if direction == "b":
order = "DESC"
else:
order = "ASC"
@@ -882,7 +878,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if rows:
topo = rows[-1].topological_ordering
toke = rows[-1].stream_ordering
- if direction == 'b':
+ if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
@@ -898,7 +894,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def paginate_room_events(
- self, room_id, from_key, to_key=None, direction='b', limit=-1, event_filter=None
+ self, room_id, from_key, to_key=None, direction="b", limit=-1, event_filter=None
):
"""Returns list of events before or after a given token.
|