diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 86a333a919..e7f6ea7286 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -498,7 +498,7 @@ class DataStore(
)
count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn)
retval = {"users": users, "total": count}
- defer.returnValue(retval)
+ return retval
def search_users(self, term):
"""Function to search users list for one or more users with
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a7c93efa46..489ce82fae 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -513,7 +513,7 @@ class SQLBaseStore(object):
after_callback(*after_args, **after_kwargs)
raise
- defer.returnValue(result)
+ return result
@defer.inlineCallbacks
def runWithConnection(self, func, *args, **kwargs):
@@ -553,7 +553,7 @@ class SQLBaseStore(object):
with PreserveLoggingContext():
result = yield self._db_pool.runWithConnection(inner_func, *args, **kwargs)
- defer.returnValue(result)
+ return result
@staticmethod
def cursor_to_dict(cursor):
@@ -615,8 +615,8 @@ class SQLBaseStore(object):
# a cursor after we receive an error from the db.
if not or_ignore:
raise
- defer.returnValue(False)
- defer.returnValue(True)
+ return False
+ return True
@staticmethod
def _simple_insert_txn(txn, table, values):
@@ -708,7 +708,7 @@ class SQLBaseStore(object):
insertion_values,
lock=lock,
)
- defer.returnValue(result)
+ return result
except self.database_engine.module.IntegrityError as e:
attempts += 1
if attempts >= 5:
@@ -1121,7 +1121,7 @@ class SQLBaseStore(object):
results = []
if not iterable:
- defer.returnValue(results)
+ return results
# iterables can not be sliced, so convert it to a list first
it_list = list(iterable)
@@ -1142,7 +1142,7 @@ class SQLBaseStore(object):
results.extend(rows)
- defer.returnValue(results)
+ return results
@classmethod
def _simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcols):
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 8394389073..9fa5b4f3d6 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -111,9 +111,9 @@ class AccountDataWorkerStore(SQLBaseStore):
)
if result:
- defer.returnValue(json.loads(result))
+ return json.loads(result)
else:
- defer.returnValue(None)
+ return None
@cached(num_args=2)
def get_account_data_for_room(self, user_id, room_id):
@@ -264,11 +264,9 @@ class AccountDataWorkerStore(SQLBaseStore):
on_invalidate=cache_context.invalidate,
)
if not ignored_account_data:
- defer.returnValue(False)
+ return False
- defer.returnValue(
- ignored_user_id in ignored_account_data.get("ignored_users", {})
- )
+ return ignored_user_id in ignored_account_data.get("ignored_users", {})
class AccountDataStore(AccountDataWorkerStore):
@@ -332,7 +330,7 @@ class AccountDataStore(AccountDataWorkerStore):
)
result = self._account_data_id_gen.get_current_token()
- defer.returnValue(result)
+ return result
@defer.inlineCallbacks
def add_account_data_for_user(self, user_id, account_data_type, content):
@@ -373,7 +371,7 @@ class AccountDataStore(AccountDataWorkerStore):
)
result = self._account_data_id_gen.get_current_token()
- defer.returnValue(result)
+ return result
def _update_max_stream_id(self, next_id):
"""Update the max stream_id
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index eb329ebd8b..05d9c05c3f 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -145,7 +145,7 @@ class ApplicationServiceTransactionWorkerStore(
for service in as_list:
if service.id == res["as_id"]:
services.append(service)
- defer.returnValue(services)
+ return services
@defer.inlineCallbacks
def get_appservice_state(self, service):
@@ -164,9 +164,9 @@ class ApplicationServiceTransactionWorkerStore(
desc="get_appservice_state",
)
if result:
- defer.returnValue(result.get("state"))
+ return result.get("state")
return
- defer.returnValue(None)
+ return None
def set_appservice_state(self, service, state):
"""Set the application service state.
@@ -298,15 +298,13 @@ class ApplicationServiceTransactionWorkerStore(
)
if not entry:
- defer.returnValue(None)
+ return None
event_ids = json.loads(entry["event_ids"])
events = yield self.get_events_as_list(event_ids)
- defer.returnValue(
- AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
- )
+ return AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
def _get_last_txn(self, txn, service_id):
txn.execute(
@@ -360,7 +358,7 @@ class ApplicationServiceTransactionWorkerStore(
events = yield self.get_events_as_list(event_ids)
- defer.returnValue((upper_bound, events))
+ return (upper_bound, events)
class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 50f913a414..e5f0668f09 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -115,7 +115,7 @@ class BackgroundUpdateStore(SQLBaseStore):
" Unscheduling background update task."
)
self._all_done = True
- defer.returnValue(None)
+ return None
@defer.inlineCallbacks
def has_completed_background_updates(self):
@@ -127,11 +127,11 @@ class BackgroundUpdateStore(SQLBaseStore):
# if we've previously determined that there is nothing left to do, that
# is easy
if self._all_done:
- defer.returnValue(True)
+ return True
# obviously, if we have things in our queue, we're not done.
if self._background_update_queue:
- defer.returnValue(False)
+ return False
# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
@@ -144,9 +144,9 @@ class BackgroundUpdateStore(SQLBaseStore):
)
if not updates:
self._all_done = True
- defer.returnValue(True)
+ return True
- defer.returnValue(False)
+ return False
@defer.inlineCallbacks
def do_next_background_update(self, desired_duration_ms):
@@ -173,14 +173,14 @@ class BackgroundUpdateStore(SQLBaseStore):
if not self._background_update_queue:
# no work left to do
- defer.returnValue(None)
+ return None
# pop from the front, and add back to the back
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)
res = yield self._do_background_update(update_name, desired_duration_ms)
- defer.returnValue(res)
+ return res
@defer.inlineCallbacks
def _do_background_update(self, update_name, desired_duration_ms):
@@ -231,7 +231,7 @@ class BackgroundUpdateStore(SQLBaseStore):
performance.update(items_updated, duration_ms)
- defer.returnValue(len(self._background_update_performance))
+ return len(self._background_update_performance)
def register_background_update_handler(self, update_name, update_handler):
"""Register a handler for doing a background update.
@@ -266,7 +266,7 @@ class BackgroundUpdateStore(SQLBaseStore):
@defer.inlineCallbacks
def noop_update(progress, batch_size):
yield self._end_background_update(update_name)
- defer.returnValue(1)
+ return 1
self.register_background_update_handler(update_name, noop_update)
@@ -370,7 +370,7 @@ class BackgroundUpdateStore(SQLBaseStore):
logger.info("Adding index %s to %s", index_name, table)
yield self.runWithConnection(runner)
yield self._end_background_update(update_name)
- defer.returnValue(1)
+ return 1
self.register_background_update_handler(update_name, updater)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index bda68de5be..6db8c54077 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -104,7 +104,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
yield self.runWithConnection(f)
yield self._end_background_update("user_ips_drop_nonunique_index")
- defer.returnValue(1)
+ return 1
@defer.inlineCallbacks
def _analyze_user_ip(self, progress, batch_size):
@@ -121,7 +121,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
yield self._end_background_update("user_ips_analyze")
- defer.returnValue(1)
+ return 1
@defer.inlineCallbacks
def _remove_user_ip_dupes(self, progress, batch_size):
@@ -291,7 +291,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
if last:
yield self._end_background_update("user_ips_remove_dupes")
- defer.returnValue(batch_size)
+ return batch_size
@defer.inlineCallbacks
def insert_client_ip(
@@ -401,7 +401,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
"device_id": did,
"last_seen": last_seen,
}
- defer.returnValue(ret)
+ return ret
@classmethod
def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
@@ -461,14 +461,12 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"]))
for row in rows
)
- defer.returnValue(
- list(
- {
- "access_token": access_token,
- "ip": ip,
- "user_agent": user_agent,
- "last_seen": last_seen,
- }
- for (access_token, ip), (user_agent, last_seen) in iteritems(results)
- )
+ return list(
+ {
+ "access_token": access_token,
+ "ip": ip,
+ "user_agent": user_agent,
+ "last_seen": last_seen,
+ }
+ for (access_token, ip), (user_agent, last_seen) in iteritems(results)
)
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 4ea0deea4f..79bb0ea46d 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -92,7 +92,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
user_id, last_deleted_stream_id
)
if not has_changed:
- defer.returnValue(0)
+ return 0
def delete_messages_for_device_txn(txn):
sql = (
@@ -115,7 +115,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
last_deleted_stream_id, up_to_stream_id
)
- defer.returnValue(count)
+ return count
def get_new_device_msgs_for_remote(
self, destination, last_stream_id, current_stream_id, limit
@@ -263,7 +263,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
destination, stream_id
)
- defer.returnValue(self._device_inbox_id_gen.get_current_token())
+ return self._device_inbox_id_gen.get_current_token()
@defer.inlineCallbacks
def add_messages_from_remote_to_device_inbox(
@@ -312,7 +312,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
for user_id in local_messages_by_user_then_device.keys():
self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
- defer.returnValue(stream_id)
+ return stream_id
def _add_messages_to_local_device_inbox_txn(
self, txn, stream_id, messages_by_user_then_device
@@ -426,4 +426,4 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)
- defer.returnValue(1)
+ return 1
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d2b113a4e7..8f72d92895 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -71,7 +71,7 @@ class DeviceWorkerStore(SQLBaseStore):
desc="get_devices_by_user",
)
- defer.returnValue({d["device_id"]: d for d in devices})
+ return {d["device_id"]: d for d in devices}
@defer.inlineCallbacks
def get_devices_by_remote(self, destination, from_stream_id, limit):
@@ -88,7 +88,7 @@ class DeviceWorkerStore(SQLBaseStore):
destination, int(from_stream_id)
)
if not has_changed:
- defer.returnValue((now_stream_id, []))
+ return (now_stream_id, [])
# We retrieve n+1 devices from the list of outbound pokes where n is
# our outbound device update limit. We then check if the very last
@@ -111,7 +111,7 @@ class DeviceWorkerStore(SQLBaseStore):
# Return an empty list if there are no updates
if not updates:
- defer.returnValue((now_stream_id, []))
+ return (now_stream_id, [])
# if we have exceeded the limit, we need to exclude any results with the
# same stream_id as the last row.
@@ -147,13 +147,13 @@ class DeviceWorkerStore(SQLBaseStore):
# skip that stream_id and return an empty list, and continue with the next
# stream_id next time.
if not query_map:
- defer.returnValue((stream_id_cutoff, []))
+ return (stream_id_cutoff, [])
results = yield self._get_device_update_edus_by_remote(
destination, from_stream_id, query_map
)
- defer.returnValue((now_stream_id, results))
+ return (now_stream_id, results)
def _get_devices_by_remote_txn(
self, txn, destination, from_stream_id, now_stream_id, limit
@@ -232,7 +232,7 @@ class DeviceWorkerStore(SQLBaseStore):
results.append(result)
- defer.returnValue(results)
+ return results
def _get_last_device_update_for_remote_user(
self, destination, user_id, from_stream_id
@@ -330,7 +330,7 @@ class DeviceWorkerStore(SQLBaseStore):
else:
results[user_id] = yield self._get_cached_devices_for_user(user_id)
- defer.returnValue((user_ids_not_in_cache, results))
+ return (user_ids_not_in_cache, results)
@cachedInlineCallbacks(num_args=2, tree=True)
def _get_cached_user_device(self, user_id, device_id):
@@ -340,7 +340,7 @@ class DeviceWorkerStore(SQLBaseStore):
retcol="content",
desc="_get_cached_user_device",
)
- defer.returnValue(db_to_json(content))
+ return db_to_json(content)
@cachedInlineCallbacks()
def _get_cached_devices_for_user(self, user_id):
@@ -350,9 +350,9 @@ class DeviceWorkerStore(SQLBaseStore):
retcols=("device_id", "content"),
desc="_get_cached_devices_for_user",
)
- defer.returnValue(
- {device["device_id"]: db_to_json(device["content"]) for device in devices}
- )
+ return {
+ device["device_id"]: db_to_json(device["content"]) for device in devices
+ }
def get_devices_with_keys_by_user(self, user_id):
"""Get all devices (with any device keys) for a user
@@ -482,7 +482,7 @@ class DeviceWorkerStore(SQLBaseStore):
results = {user_id: None for user_id in user_ids}
results.update({row["user_id"]: row["stream_id"] for row in rows})
- defer.returnValue(results)
+ return results
class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
@@ -543,7 +543,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"""
key = (user_id, device_id)
if self.device_id_exists_cache.get(key, None):
- defer.returnValue(False)
+ return False
try:
inserted = yield self._simple_insert(
@@ -557,7 +557,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
or_ignore=True,
)
self.device_id_exists_cache.prefill(key, True)
- defer.returnValue(inserted)
+ return inserted
except Exception as e:
logger.error(
"store_device with device_id=%s(%r) user_id=%s(%r)"
@@ -780,7 +780,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
hosts,
stream_id,
)
- defer.returnValue(stream_id)
+ return stream_id
def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id):
now = self._clock.time_msec()
@@ -889,4 +889,4 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
yield self.runWithConnection(f)
yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
- defer.returnValue(1)
+ return 1
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 201bbd430c..e966a73f3d 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -46,7 +46,7 @@ class DirectoryWorkerStore(SQLBaseStore):
)
if not room_id:
- defer.returnValue(None)
+ return None
return
servers = yield self._simple_select_onecol(
@@ -57,10 +57,10 @@ class DirectoryWorkerStore(SQLBaseStore):
)
if not servers:
- defer.returnValue(None)
+ return None
return
- defer.returnValue(RoomAliasMapping(room_id, room_alias.to_string(), servers))
+ return RoomAliasMapping(room_id, room_alias.to_string(), servers)
def get_room_alias_creator(self, room_alias):
return self._simple_select_one_onecol(
@@ -125,7 +125,7 @@ class DirectoryStore(DirectoryWorkerStore):
raise SynapseError(
409, "Room alias %s already exists" % room_alias.to_string()
)
- defer.returnValue(ret)
+ return ret
@defer.inlineCallbacks
def delete_room_alias(self, room_alias):
@@ -133,7 +133,7 @@ class DirectoryStore(DirectoryWorkerStore):
"delete_room_alias", self._delete_room_alias_txn, room_alias
)
- defer.returnValue(room_id)
+ return room_id
def _delete_room_alias_txn(self, txn, room_alias):
txn.execute(
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index f40ef2ab64..99128f2df7 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -61,7 +61,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
row["session_data"] = json.loads(row["session_data"])
- defer.returnValue(row)
+ return row
@defer.inlineCallbacks
def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
@@ -118,7 +118,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
try:
version = int(version)
except ValueError:
- defer.returnValue({"rooms": {}})
+ return {"rooms": {}}
keyvalues = {"user_id": user_id, "version": version}
if room_id:
@@ -151,7 +151,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"session_data": json.loads(row["session_data"]),
}
- defer.returnValue(sessions)
+ return sessions
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 2fabb9e2cb..1e07474e70 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -41,7 +41,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
dict containing "key_json", "device_display_name".
"""
if not query_list:
- defer.returnValue({})
+ return {}
results = yield self.runInteraction(
"get_e2e_device_keys",
@@ -55,7 +55,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
for device_id, device_info in iteritems(device_keys):
device_info["keys"] = db_to_json(device_info.pop("key_json"))
- defer.returnValue(results)
+ return results
def _get_e2e_device_keys_txn(
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
@@ -130,9 +130,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
desc="add_e2e_one_time_keys_check",
)
- defer.returnValue(
- {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
- )
+ return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
@defer.inlineCallbacks
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index cb4478342f..4f500d893e 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -131,9 +131,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
)
if not rows:
- defer.returnValue(0)
+ return 0
else:
- defer.returnValue(max(row["depth"] for row in rows))
+ return max(row["depth"] for row in rows)
def _get_oldest_events_in_room_txn(self, txn, room_id):
return self._simple_select_onecol_txn(
@@ -169,7 +169,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# make sure that we don't completely ignore the older events.
res = res[0:5] + random.sample(res[5:], 5)
- defer.returnValue(res)
+ return res
def get_latest_event_ids_and_hashes_in_room(self, room_id):
"""
@@ -411,7 +411,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
limit,
)
events = yield self.get_events_as_list(ids)
- defer.returnValue(events)
+ return events
def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit):
@@ -463,7 +463,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
desc="get_successor_events",
)
- defer.returnValue([row["event_id"] for row in rows])
+ return [row["event_id"] for row in rows]
class EventFederationStore(EventFederationWorkerStore):
@@ -654,4 +654,4 @@ class EventFederationStore(EventFederationWorkerStore):
if not result:
yield self._end_background_update(self.EVENT_AUTH_STATE_ONLY)
- defer.returnValue(batch_size)
+ return batch_size
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index dcfb67e029..22025effbc 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -100,7 +100,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
user_id,
last_read_event_id,
)
- defer.returnValue(ret)
+ return ret
def _get_unread_counts_by_receipt_txn(
self, txn, room_id, user_id, last_read_event_id
@@ -178,7 +178,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
return [r[0] for r in txn]
ret = yield self.runInteraction("get_push_action_users_in_range", f)
- defer.returnValue(ret)
+ return ret
@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range_for_http(
@@ -279,7 +279,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# Take only up to the limit. We have to stop at the limit because
# one of the subqueries may have hit the limit.
- defer.returnValue(notifs[:limit])
+ return notifs[:limit]
@defer.inlineCallbacks
def get_unread_push_actions_for_user_in_range_for_email(
@@ -380,7 +380,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
notifs.sort(key=lambda r: -(r["received_ts"] or 0))
# Now return the first `limit`
- defer.returnValue(notifs[:limit])
+ return notifs[:limit]
def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
"""A fast check to see if there might be something to push for the
@@ -477,7 +477,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
keyvalues={"event_id": event_id},
desc="remove_push_actions_from_staging",
)
- defer.returnValue(res)
+ return res
except Exception:
# this method is called from an exception handler, so propagating
# another exception here really isn't helpful - there's nothing
@@ -732,7 +732,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
push_actions = yield self.runInteraction("get_push_actions_for_user", f)
for pa in push_actions:
pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
- defer.returnValue(push_actions)
+ return push_actions
@defer.inlineCallbacks
def get_time_of_last_push_action_before(self, stream_ordering):
@@ -749,7 +749,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
return txn.fetchone()
result = yield self.runInteraction("get_time_of_last_push_action_before", f)
- defer.returnValue(result[0] if result else None)
+ return result[0] if result else None
@defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self):
@@ -758,7 +758,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
return txn.fetchone()
result = yield self.runInteraction("get_latest_push_action_stream_ordering", f)
- defer.returnValue(result[0] or 0)
+ return result[0] or 0
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b70457bfc6..88c0180116 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -223,7 +223,7 @@ def _retry_on_integrity_error(func):
except self.database_engine.module.IntegrityError:
logger.exception("IntegrityError, retrying.")
res = yield func(self, *args, delete_existing=True, **kwargs)
- defer.returnValue(res)
+ return res
return f
@@ -309,7 +309,7 @@ class EventsStore(
max_persisted_id = yield self._stream_id_gen.get_current_token()
- defer.returnValue(max_persisted_id)
+ return max_persisted_id
@defer.inlineCallbacks
@log_function
@@ -334,7 +334,7 @@ class EventsStore(
yield make_deferred_yieldable(deferred)
max_persisted_id = yield self._stream_id_gen.get_current_token()
- defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
+ return (event.internal_metadata.stream_ordering, max_persisted_id)
def _maybe_start_persisting(self, room_id):
@defer.inlineCallbacks
@@ -595,7 +595,7 @@ class EventsStore(
stale = latest_event_ids & result
stale_forward_extremities_counter.observe(len(stale))
- defer.returnValue(result)
+ return result
@defer.inlineCallbacks
def _get_events_which_are_prevs(self, event_ids):
@@ -633,7 +633,7 @@ class EventsStore(
"_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)
- defer.returnValue(results)
+ return results
@defer.inlineCallbacks
def _get_prevs_before_rejected(self, event_ids):
@@ -695,7 +695,7 @@ class EventsStore(
"_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)
- defer.returnValue(existing_prevs)
+ return existing_prevs
@defer.inlineCallbacks
def _get_new_state_after_events(
@@ -796,7 +796,7 @@ class EventsStore(
# If they old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
- defer.returnValue((None, None))
+ return (None, None)
if len(new_state_groups) == 1 and len(old_state_groups) == 1:
# If we're going from one state group to another, lets check if
@@ -813,7 +813,7 @@ class EventsStore(
# the current state in memory then lets also return that,
# but it doesn't matter if we don't.
new_state = state_groups_map.get(new_state_group)
- defer.returnValue((new_state, delta_ids))
+ return (new_state, delta_ids)
# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
@@ -825,7 +825,7 @@ class EventsStore(
if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
- defer.returnValue((state_groups_map[new_state_groups.pop()], None))
+ return (state_groups_map[new_state_groups.pop()], None)
# Ok, we need to defer to the state handler to resolve our state sets.
@@ -854,7 +854,7 @@ class EventsStore(
state_res_store=StateResolutionStore(self),
)
- defer.returnValue((res.state, None))
+ return (res.state, None)
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
@@ -877,7 +877,7 @@ class EventsStore(
if ev_id != existing_state.get(key)
}
- defer.returnValue((to_delete, to_insert))
+ return (to_delete, to_insert)
@log_function
def _persist_events_txn(
@@ -1564,7 +1564,7 @@ class EventsStore(
return count
ret = yield self.runInteraction("count_messages", _count_messages)
- defer.returnValue(ret)
+ return ret
@defer.inlineCallbacks
def count_daily_sent_messages(self):
@@ -1585,7 +1585,7 @@ class EventsStore(
return count
ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
- defer.returnValue(ret)
+ return ret
@defer.inlineCallbacks
def count_daily_active_rooms(self):
@@ -1600,7 +1600,7 @@ class EventsStore(
return count
ret = yield self.runInteraction("count_daily_active_rooms", _count)
- defer.returnValue(ret)
+ return ret
def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
@@ -2183,7 +2183,7 @@ class EventsStore(
"""
to_1, so_1 = yield self._get_event_ordering(event_id1)
to_2, so_2 = yield self._get_event_ordering(event_id2)
- defer.returnValue((to_1, so_1) > (to_2, so_2))
+ return (to_1, so_1) > (to_2, so_2)
@cachedInlineCallbacks(max_entries=5000)
def _get_event_ordering(self, event_id):
@@ -2197,9 +2197,7 @@ class EventsStore(
if not res:
raise SynapseError(404, "Could not find event %s" % (event_id,))
- defer.returnValue(
- (int(res["topological_ordering"]), int(res["stream_ordering"]))
- )
+ return (int(res["topological_ordering"]), int(res["stream_ordering"]))
def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
def get_all_updated_current_state_deltas_txn(txn):
diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py
index 1ce21d190c..6587f31e2b 100644
--- a/synapse/storage/events_bg_updates.py
+++ b/synapse/storage/events_bg_updates.py
@@ -135,7 +135,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
if not result:
yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
- defer.returnValue(result)
+ return result
@defer.inlineCallbacks
def _background_reindex_origin_server_ts(self, progress, batch_size):
@@ -212,7 +212,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
if not result:
yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
- defer.returnValue(result)
+ return result
@defer.inlineCallbacks
def _cleanup_extremities_bg_update(self, progress, batch_size):
@@ -396,4 +396,4 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
"_cleanup_extremities_bg_update_drop_table", _drop_table_txn
)
- defer.returnValue(num_handled)
+ return num_handled
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 858fc755a1..79680ee856 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -139,8 +139,11 @@ class EventsWorkerStore(SQLBaseStore):
If there is a mismatch, behave as per allow_none.
Returns:
- Deferred : A FrozenEvent.
+ Deferred[EventBase|None]
"""
+ if not isinstance(event_id, str):
+ raise TypeError("Invalid event event_id %r" % (event_id,))
+
events = yield self.get_events_as_list(
[event_id],
check_redacted=check_redacted,
@@ -157,7 +160,7 @@ class EventsWorkerStore(SQLBaseStore):
if event is None and not allow_none:
raise NotFoundError("Could not find event %s" % (event_id,))
- defer.returnValue(event)
+ return event
@defer.inlineCallbacks
def get_events(
@@ -187,7 +190,7 @@ class EventsWorkerStore(SQLBaseStore):
allow_rejected=allow_rejected,
)
- defer.returnValue({e.event_id: e for e in events})
+ return {e.event_id: e for e in events}
@defer.inlineCallbacks
def get_events_as_list(
@@ -217,7 +220,7 @@ class EventsWorkerStore(SQLBaseStore):
"""
if not event_ids:
- defer.returnValue([])
+ return []
# there may be duplicates so we cast the list to a set
event_entry_map = yield self._get_events_from_cache_or_db(
@@ -268,6 +271,14 @@ class EventsWorkerStore(SQLBaseStore):
)
continue
+ if original_event.room_id != entry.event.room_id:
+ logger.info(
+ "Withholding redaction %s of event %s from a different room",
+ event_id,
+ redacted_event_id,
+ )
+ continue
+
if entry.event.internal_metadata.need_to_check_redaction():
original_domain = get_domain_from_id(original_event.sender)
redaction_domain = get_domain_from_id(entry.event.sender)
@@ -305,7 +316,7 @@ class EventsWorkerStore(SQLBaseStore):
event.unsigned["prev_content"] = prev.content
event.unsigned["prev_sender"] = prev.sender
- defer.returnValue(events)
+ return events
@defer.inlineCallbacks
def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
@@ -452,7 +463,7 @@ class EventsWorkerStore(SQLBaseStore):
without having to create a new transaction for each request for events.
"""
if not events:
- defer.returnValue({})
+ return {}
events_d = defer.Deferred()
with self._event_fetch_lock:
@@ -496,7 +507,7 @@ class EventsWorkerStore(SQLBaseStore):
)
)
- defer.returnValue({e.event.event_id: e for e in res if e})
+ return {e.event.event_id: e for e in res if e}
def _fetch_event_rows(self, txn, event_ids):
"""Fetch event rows from the database
@@ -609,7 +620,7 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
- defer.returnValue(cache_entry)
+ return cache_entry
@defer.inlineCallbacks
def _maybe_redact_event_row(self, original_ev, redactions):
@@ -629,6 +640,10 @@ class EventsWorkerStore(SQLBaseStore):
# we choose to ignore redactions of m.room.create events.
return None
+ if original_ev.type == "m.room.redaction":
+ # ... and redaction events
+ return None
+
redaction_map = yield self._get_events_from_cache_or_db(redactions)
for redaction_id in redactions:
@@ -636,9 +651,21 @@ class EventsWorkerStore(SQLBaseStore):
if not redaction_entry:
# we don't have the redaction event, or the redaction event was not
# authorized.
+ logger.debug(
+ "%s was redacted by %s but redaction not found/authed",
+ original_ev.event_id,
+ redaction_id,
+ )
continue
redaction_event = redaction_entry.event
+ if redaction_event.room_id != original_ev.room_id:
+ logger.debug(
+ "%s was redacted by %s but redaction was in a different room!",
+ original_ev.event_id,
+ redaction_id,
+ )
+ continue
# Starting in room version v3, some redactions need to be
# rechecked if we didn't have the redacted event at the
@@ -650,8 +677,15 @@ class EventsWorkerStore(SQLBaseStore):
redaction_event.internal_metadata.recheck_redaction = False
else:
# Senders don't match, so the event isn't actually redacted
+ logger.debug(
+ "%s was redacted by %s but the senders don't match",
+ original_ev.event_id,
+ redaction_id,
+ )
continue
+ logger.debug("Redacting %s due to %s", original_ev.event_id, redaction_id)
+
# we found a good redaction event. Redact!
redacted_event = prune_event(original_ev)
redacted_event.unsigned["redacted_by"] = redaction_id
@@ -679,7 +713,7 @@ class EventsWorkerStore(SQLBaseStore):
desc="have_events_in_timeline",
)
- defer.returnValue(set(r["event_id"] for r in rows))
+ return set(r["event_id"] for r in rows)
@defer.inlineCallbacks
def have_seen_events(self, event_ids):
@@ -705,7 +739,7 @@ class EventsWorkerStore(SQLBaseStore):
input_iterator = iter(event_ids)
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), []):
yield self.runInteraction("have_seen_events", have_seen_events_txn, chunk)
- defer.returnValue(results)
+ return results
def get_seen_events_with_rejections(self, event_ids):
"""Given a list of event ids, check if we rejected them.
@@ -816,4 +850,4 @@ class EventsWorkerStore(SQLBaseStore):
# it.
complexity_v1 = round(state_events / 500, 2)
- defer.returnValue({"v1": complexity_v1})
+ return {"v1": complexity_v1}
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index b195dc66a0..23b48f6cea 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -15,8 +15,6 @@
from canonicaljson import encode_canonical_json
-from twisted.internet import defer
-
from synapse.api.errors import Codes, SynapseError
from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -41,7 +39,7 @@ class FilteringStore(SQLBaseStore):
desc="get_user_filter",
)
- defer.returnValue(db_to_json(def_json))
+ return db_to_json(def_json)
def add_user_filter(self, user_localpart, user_filter):
def_json = encode_canonical_json(user_filter)
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index 73e6fc6de2..15b01c6958 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -307,15 +307,13 @@ class GroupServerStore(SQLBaseStore):
desc="get_group_categories",
)
- defer.returnValue(
- {
- row["category_id"]: {
- "is_public": row["is_public"],
- "profile": json.loads(row["profile"]),
- }
- for row in rows
+ return {
+ row["category_id"]: {
+ "is_public": row["is_public"],
+ "profile": json.loads(row["profile"]),
}
- )
+ for row in rows
+ }
@defer.inlineCallbacks
def get_group_category(self, group_id, category_id):
@@ -328,7 +326,7 @@ class GroupServerStore(SQLBaseStore):
category["profile"] = json.loads(category["profile"])
- defer.returnValue(category)
+ return category
def upsert_group_category(self, group_id, category_id, profile, is_public):
"""Add/update room category for group
@@ -370,15 +368,13 @@ class GroupServerStore(SQLBaseStore):
desc="get_group_roles",
)
- defer.returnValue(
- {
- row["role_id"]: {
- "is_public": row["is_public"],
- "profile": json.loads(row["profile"]),
- }
- for row in rows
+ return {
+ row["role_id"]: {
+ "is_public": row["is_public"],
+ "profile": json.loads(row["profile"]),
}
- )
+ for row in rows
+ }
@defer.inlineCallbacks
def get_group_role(self, group_id, role_id):
@@ -391,7 +387,7 @@ class GroupServerStore(SQLBaseStore):
role["profile"] = json.loads(role["profile"])
- defer.returnValue(role)
+ return role
def upsert_group_role(self, group_id, role_id, profile, is_public):
"""Add/remove user role
@@ -960,7 +956,7 @@ class GroupServerStore(SQLBaseStore):
_register_user_group_membership_txn,
next_id,
)
- defer.returnValue(res)
+ return res
@defer.inlineCallbacks
def create_group(
@@ -1057,9 +1053,9 @@ class GroupServerStore(SQLBaseStore):
now = int(self._clock.time_msec())
if row and now < row["valid_until_ms"]:
- defer.returnValue(json.loads(row["attestation_json"]))
+ return json.loads(row["attestation_json"])
- defer.returnValue(None)
+ return None
def get_joined_groups(self, user_id):
return self._simple_select_onecol(
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 081564360f..752e9788a2 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -173,7 +173,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
)
if user_id:
count = count + 1
- defer.returnValue(count)
+ return count
@defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 42ec8c6bb8..1a0f2d5768 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -90,9 +90,7 @@ class PresenceStore(SQLBaseStore):
presence_states,
)
- defer.returnValue(
- (stream_orderings[-1], self._presence_id_gen.get_current_token())
- )
+ return (stream_orderings[-1], self._presence_id_gen.get_current_token())
def _update_presence_txn(self, txn, stream_orderings, presence_states):
for stream_id, state in zip(stream_orderings, presence_states):
@@ -180,7 +178,7 @@ class PresenceStore(SQLBaseStore):
for row in rows:
row["currently_active"] = bool(row["currently_active"])
- defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows})
+ return {row["user_id"]: UserPresenceState(**row) for row in rows}
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 0ff392bdb4..8a5d8e9b18 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -34,15 +34,13 @@ class ProfileWorkerStore(SQLBaseStore):
except StoreError as e:
if e.code == 404:
# no match
- defer.returnValue(ProfileInfo(None, None))
+ return ProfileInfo(None, None)
return
else:
raise
- defer.returnValue(
- ProfileInfo(
- avatar_url=profile["avatar_url"], display_name=profile["displayname"]
- )
+ return ProfileInfo(
+ avatar_url=profile["avatar_url"], display_name=profile["displayname"]
)
def get_profile_displayname(self, user_localpart):
@@ -168,7 +166,7 @@ class ProfileStore(ProfileWorkerStore):
)
if res:
- defer.returnValue(True)
+ return True
res = yield self._simple_select_one_onecol(
table="group_invites",
@@ -179,4 +177,4 @@ class ProfileStore(ProfileWorkerStore):
)
if res:
- defer.returnValue(True)
+ return True
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 98cec8c82b..a6517c4cf3 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -120,7 +120,7 @@ class PushRulesWorkerStore(
rules = _load_rules(rows, enabled_map)
- defer.returnValue(rules)
+ return rules
@cachedInlineCallbacks(max_entries=5000)
def get_push_rules_enabled_for_user(self, user_id):
@@ -130,9 +130,7 @@ class PushRulesWorkerStore(
retcols=("user_name", "rule_id", "enabled"),
desc="get_push_rules_enabled_for_user",
)
- defer.returnValue(
- {r["rule_id"]: False if r["enabled"] == 0 else True for r in results}
- )
+ return {r["rule_id"]: False if r["enabled"] == 0 else True for r in results}
def have_push_rules_changed_for_user(self, user_id, last_id):
if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
@@ -160,7 +158,7 @@ class PushRulesWorkerStore(
)
def bulk_get_push_rules(self, user_ids):
if not user_ids:
- defer.returnValue({})
+ return {}
results = {user_id: [] for user_id in user_ids}
@@ -182,7 +180,7 @@ class PushRulesWorkerStore(
for user_id, rules in results.items():
results[user_id] = _load_rules(rules, enabled_map_by_user.get(user_id, {}))
- defer.returnValue(results)
+ return results
@defer.inlineCallbacks
def move_push_rule_from_room_to_room(self, new_room_id, user_id, rule):
@@ -253,7 +251,7 @@ class PushRulesWorkerStore(
result = yield self._bulk_get_push_rules_for_room(
event.room_id, state_group, current_state_ids, event=event
)
- defer.returnValue(result)
+ return result
@cachedInlineCallbacks(num_args=2, cache_context=True)
def _bulk_get_push_rules_for_room(
@@ -312,7 +310,7 @@ class PushRulesWorkerStore(
rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
- defer.returnValue(rules_by_user)
+ return rules_by_user
@cachedList(
cached_method_name="get_push_rules_enabled_for_user",
@@ -322,7 +320,7 @@ class PushRulesWorkerStore(
)
def bulk_get_push_rules_enabled(self, user_ids):
if not user_ids:
- defer.returnValue({})
+ return {}
results = {user_id: {} for user_id in user_ids}
@@ -336,7 +334,7 @@ class PushRulesWorkerStore(
for row in rows:
enabled = bool(row["enabled"])
results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
- defer.returnValue(results)
+ return results
class PushRuleStore(PushRulesWorkerStore):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index cfe0a94330..be3d4d9ded 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -63,7 +63,7 @@ class PusherWorkerStore(SQLBaseStore):
ret = yield self._simple_select_one_onecol(
"pushers", {"user_name": user_id}, "id", allow_none=True
)
- defer.returnValue(ret is not None)
+ return ret is not None
def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
return self.get_pushers_by({"app_id": app_id, "pushkey": pushkey})
@@ -95,7 +95,7 @@ class PusherWorkerStore(SQLBaseStore):
],
desc="get_pushers_by",
)
- defer.returnValue(self._decode_pushers_rows(ret))
+ return self._decode_pushers_rows(ret)
@defer.inlineCallbacks
def get_all_pushers(self):
@@ -106,7 +106,7 @@ class PusherWorkerStore(SQLBaseStore):
return self._decode_pushers_rows(rows)
rows = yield self.runInteraction("get_all_pushers", get_pushers)
- defer.returnValue(rows)
+ return rows
def get_all_updated_pushers(self, last_id, current_id, limit):
if last_id == current_id:
@@ -205,7 +205,7 @@ class PusherWorkerStore(SQLBaseStore):
result = {user_id: False for user_id in user_ids}
result.update({r["user_name"]: True for r in rows})
- defer.returnValue(result)
+ return result
class PusherStore(PusherWorkerStore):
@@ -343,7 +343,7 @@ class PusherStore(PusherWorkerStore):
"throttle_ms": row["throttle_ms"],
}
- defer.returnValue(params_by_room)
+ return params_by_room
@defer.inlineCallbacks
def set_throttle_params(self, pusher_id, room_id, params):
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index b477da12b1..6aa6d98ebb 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
@cachedInlineCallbacks()
def get_users_with_read_receipts_in_room(self, room_id):
receipts = yield self.get_receipts_for_room(room_id, "m.read")
- defer.returnValue(set(r["user_id"] for r in receipts))
+ return set(r["user_id"] for r in receipts)
@cached(num_args=2)
def get_receipts_for_room(self, room_id, receipt_type):
@@ -92,7 +92,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
desc="get_receipts_for_user",
)
- defer.returnValue({row["room_id"]: row["event_id"] for row in rows})
+ return {row["room_id"]: row["event_id"] for row in rows}
@defer.inlineCallbacks
def get_receipts_for_user_with_orderings(self, user_id, receipt_type):
@@ -110,16 +110,14 @@ class ReceiptsWorkerStore(SQLBaseStore):
return txn.fetchall()
rows = yield self.runInteraction("get_receipts_for_user_with_orderings", f)
- defer.returnValue(
- {
- row[0]: {
- "event_id": row[1],
- "topological_ordering": row[2],
- "stream_ordering": row[3],
- }
- for row in rows
+ return {
+ row[0]: {
+ "event_id": row[1],
+ "topological_ordering": row[2],
+ "stream_ordering": row[3],
}
- )
+ for row in rows
+ }
@defer.inlineCallbacks
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
@@ -147,7 +145,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
room_ids, to_key, from_key=from_key
)
- defer.returnValue([ev for res in results.values() for ev in res])
+ return [ev for res in results.values() for ev in res]
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients.
@@ -197,7 +195,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
rows = yield self.runInteraction("get_linearized_receipts_for_room", f)
if not rows:
- defer.returnValue([])
+ return []
content = {}
for row in rows:
@@ -205,9 +203,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
row["user_id"]
] = json.loads(row["data"])
- defer.returnValue(
- [{"type": "m.receipt", "room_id": room_id, "content": content}]
- )
+ return [{"type": "m.receipt", "room_id": room_id, "content": content}]
@cachedList(
cached_method_name="_get_linearized_receipts_for_room",
@@ -217,7 +213,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
)
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
if not room_ids:
- defer.returnValue({})
+ return {}
def f(txn):
if from_key:
@@ -264,7 +260,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
room_id: [results[room_id]] if room_id in results else []
for room_id in room_ids
}
- defer.returnValue(results)
+ return results
def get_all_updated_receipts(self, last_id, current_id, limit=None):
if last_id == current_id:
@@ -468,7 +464,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
)
if event_ts is None:
- defer.returnValue(None)
+ return None
now = self._clock.time_msec()
logger.debug(
@@ -482,7 +478,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
max_persisted_id = self._receipts_id_gen.get_current_token()
- defer.returnValue((stream_id, max_persisted_id))
+ return (stream_id, max_persisted_id)
def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, data):
return self.runInteraction(
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 8b2c2a97ab..999c10a308 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -75,12 +75,12 @@ class RegistrationWorkerStore(SQLBaseStore):
info = yield self.get_user_by_id(user_id)
if not info:
- defer.returnValue(False)
+ return False
now = self.clock.time_msec()
trial_duration_ms = self.config.mau_trial_days * 24 * 60 * 60 * 1000
is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms
- defer.returnValue(is_trial)
+ return is_trial
@cached()
def get_user_by_access_token(self, token):
@@ -115,7 +115,7 @@ class RegistrationWorkerStore(SQLBaseStore):
allow_none=True,
desc="get_expiration_ts_for_user",
)
- defer.returnValue(res)
+ return res
@defer.inlineCallbacks
def set_account_validity_for_user(
@@ -190,7 +190,7 @@ class RegistrationWorkerStore(SQLBaseStore):
desc="get_user_from_renewal_token",
)
- defer.returnValue(res)
+ return res
@defer.inlineCallbacks
def get_renewal_token_for_user(self, user_id):
@@ -209,7 +209,7 @@ class RegistrationWorkerStore(SQLBaseStore):
desc="get_renewal_token_for_user",
)
- defer.returnValue(res)
+ return res
@defer.inlineCallbacks
def get_users_expiring_soon(self):
@@ -237,7 +237,7 @@ class RegistrationWorkerStore(SQLBaseStore):
self.config.account_validity.renew_at,
)
- defer.returnValue(res)
+ return res
@defer.inlineCallbacks
def set_renewal_mail_status(self, user_id, email_sent):
@@ -280,7 +280,7 @@ class RegistrationWorkerStore(SQLBaseStore):
desc="is_server_admin",
)
- defer.returnValue(res if res else False)
+ return res if res else False
def _query_for_auth(self, txn, token):
sql = (
@@ -311,7 +311,7 @@ class RegistrationWorkerStore(SQLBaseStore):
res = yield self.runInteraction(
"is_support_user", self.is_support_user_txn, user_id
)
- defer.returnValue(res)
+ return res
def is_support_user_txn(self, txn, user_id):
res = self._simple_select_one_onecol_txn(
@@ -349,7 +349,7 @@ class RegistrationWorkerStore(SQLBaseStore):
return 0
ret = yield self.runInteraction("count_users", _count_users)
- defer.returnValue(ret)
+ return ret
def count_daily_user_type(self):
"""
@@ -395,7 +395,7 @@ class RegistrationWorkerStore(SQLBaseStore):
return count
ret = yield self.runInteraction("count_users", _count_users)
- defer.returnValue(ret)
+ return ret
@defer.inlineCallbacks
def find_next_generated_user_id_localpart(self):
@@ -425,7 +425,7 @@ class RegistrationWorkerStore(SQLBaseStore):
if i not in found:
return i
- defer.returnValue(
+ return (
(
yield self.runInteraction(
"find_next_generated_user_id", _find_next_generated_user_id
@@ -447,7 +447,7 @@ class RegistrationWorkerStore(SQLBaseStore):
user_id = yield self.runInteraction(
"get_user_id_by_threepid", self.get_user_id_by_threepid_txn, medium, address
)
- defer.returnValue(user_id)
+ return user_id
def get_user_id_by_threepid_txn(self, txn, medium, address):
"""Returns user id from threepid
@@ -487,7 +487,7 @@ class RegistrationWorkerStore(SQLBaseStore):
["medium", "address", "validated_at", "added_at"],
"user_get_threepids",
)
- defer.returnValue(ret)
+ return ret
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
@@ -677,7 +677,7 @@ class RegistrationStore(
if end:
yield self._end_background_update("users_set_deactivated_flag")
- defer.returnValue(batch_size)
+ return batch_size
@defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id, valid_until_ms):
@@ -957,7 +957,7 @@ class RegistrationStore(
desc="is_guest",
)
- defer.returnValue(res if res else False)
+ return res if res else False
def add_user_pending_deactivation(self, user_id):
"""
@@ -1024,7 +1024,7 @@ class RegistrationStore(
yield self._end_background_update("user_threepids_grandfather")
- defer.returnValue(1)
+ return 1
def get_threepid_validation_session(
self, medium, client_secret, address=None, sid=None, validated=True
@@ -1337,4 +1337,4 @@ class RegistrationStore(
)
# Convert the integer into a boolean.
- defer.returnValue(res == 1)
+ return res == 1
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 9954bc094f..fcb5f2f23a 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -17,8 +17,6 @@ import logging
import attr
-from twisted.internet import defer
-
from synapse.api.constants import RelationTypes
from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore
@@ -363,7 +361,7 @@ class RelationsWorkerStore(SQLBaseStore):
return
edit_event = yield self.get_event(edit_id, allow_none=True)
- defer.returnValue(edit_event)
+ return edit_event
def has_user_annotated_event(self, parent_id, event_type, aggregation_key, sender):
"""Check if a user has already annotated an event with the same key
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index fe9d79d792..bc606292b8 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -193,14 +193,12 @@ class RoomWorkerStore(SQLBaseStore):
)
if row:
- defer.returnValue(
- RatelimitOverride(
- messages_per_second=row["messages_per_second"],
- burst_count=row["burst_count"],
- )
+ return RatelimitOverride(
+ messages_per_second=row["messages_per_second"],
+ burst_count=row["burst_count"],
)
else:
- defer.returnValue(None)
+ return None
class RoomStore(RoomWorkerStore, SearchStore):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index d0fe3a7f78..e60409ed73 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -108,7 +108,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
room_id, on_invalidate=cache_context.invalidate
)
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
- defer.returnValue(hosts)
+ return hosts
@cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
@@ -267,8 +267,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
invites = yield self.get_invited_rooms_for_user(user_id)
for invite in invites:
if invite.room_id == room_id:
- defer.returnValue(invite)
- defer.returnValue(None)
+ return invite
+ return None
@defer.inlineCallbacks
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
@@ -375,11 +375,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
rooms = yield self.get_rooms_for_user_where_membership_is(
user_id, membership_list=[Membership.JOIN]
)
- defer.returnValue(
- frozenset(
- GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
- for r in rooms
- )
+ return frozenset(
+ GetRoomsForUserWithStreamOrdering(r.room_id, r.stream_ordering)
+ for r in rooms
)
@defer.inlineCallbacks
@@ -389,7 +387,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
rooms = yield self.get_rooms_for_user_with_stream_ordering(
user_id, on_invalidate=on_invalidate
)
- defer.returnValue(frozenset(r.room_id for r in rooms))
+ return frozenset(r.room_id for r in rooms)
@cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
def get_users_who_share_room_with_user(self, user_id, cache_context):
@@ -406,7 +404,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
user_who_share_room.update(user_ids)
- defer.returnValue(user_who_share_room)
+ return user_who_share_room
@defer.inlineCallbacks
def get_joined_users_from_context(self, event, context):
@@ -422,7 +420,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
result = yield self._get_joined_users_from_context(
event.room_id, state_group, current_state_ids, event=event, context=context
)
- defer.returnValue(result)
+ return result
def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group
@@ -536,7 +534,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
avatar_url=to_ascii(event.content.get("avatar_url", None)),
)
- defer.returnValue(users_in_room)
+ return users_in_room
@cachedInlineCallbacks(max_entries=10000)
def is_host_joined(self, room_id, host):
@@ -561,14 +559,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
rows = yield self._execute("is_host_joined", None, sql, room_id, like_clause)
if not rows:
- defer.returnValue(False)
+ return False
user_id = rows[0][0]
if get_domain_from_id(user_id) != host:
# This can only happen if the host name has something funky in it
raise Exception("Invalid host name")
- defer.returnValue(True)
+ return True
@cachedInlineCallbacks()
def was_host_joined(self, room_id, host):
@@ -601,14 +599,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
rows = yield self._execute("was_host_joined", None, sql, room_id, like_clause)
if not rows:
- defer.returnValue(False)
+ return False
user_id = rows[0][0]
if get_domain_from_id(user_id) != host:
# This can only happen if the host name has something funky in it
raise Exception("Invalid host name")
- defer.returnValue(True)
+ return True
def get_joined_hosts(self, room_id, state_entry):
state_group = state_entry.state_group
@@ -635,7 +633,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
cache = self._get_joined_hosts_cache(room_id)
joined_hosts = yield cache.get_destinations(state_entry)
- defer.returnValue(joined_hosts)
+ return joined_hosts
@cached(max_entries=10000)
def _get_joined_hosts_cache(self, room_id):
@@ -665,7 +663,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return rows[0][0]
count = yield self.runInteraction("did_forget_membership", f)
- defer.returnValue(count == 0)
+ return count == 0
@cached()
def get_forgotten_rooms_for_user(self, user_id):
@@ -684,6 +682,11 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# to see if any have subsequently been updated. This is done so that
# we can use a partial index on `forgotten = 1` on the assumption
# that few users will actually forget many rooms.
+ #
+ # Note that a room is considered "forgotten" if *all* membership
+ # events for that user and room have the forgotten field set (as
+ # when a user forgets a room we update all rows for that user and
+ # room, not just the current one).
sql = """
SELECT room_id, (
SELECT count(*) FROM room_memberships
@@ -918,7 +921,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
if not result:
yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
- defer.returnValue(result)
+ return result
@defer.inlineCallbacks
def _background_current_state_membership(self, progress, batch_size):
@@ -943,10 +946,10 @@ class RoomMemberStore(RoomMemberWorkerStore):
next_room, = row
sql = """
- UPDATE current_state_events AS c
+ UPDATE current_state_events
SET membership = (
SELECT membership FROM room_memberships
- WHERE event_id = c.event_id
+ WHERE event_id = current_state_events.event_id
)
WHERE room_id = ?
"""
@@ -976,7 +979,7 @@ class RoomMemberStore(RoomMemberWorkerStore):
if finished:
yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
- defer.returnValue(row_count)
+ return row_count
class _JoinedHostsCache(object):
@@ -1004,7 +1007,7 @@ class _JoinedHostsCache(object):
state_entry(synapse.state._StateCacheEntry)
"""
if state_entry.state_group == self.state_group:
- defer.returnValue(frozenset(self.hosts_to_joined_users))
+ return frozenset(self.hosts_to_joined_users)
with (yield self.linearizer.queue(())):
if state_entry.state_group == self.state_group:
@@ -1041,7 +1044,7 @@ class _JoinedHostsCache(object):
else:
self.state_group = object()
self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users))
- defer.returnValue(frozenset(self.hosts_to_joined_users))
+ return frozenset(self.hosts_to_joined_users)
def __len__(self):
return self._len
diff --git a/synapse/storage/schema/delta/56/room_membership_idx.sql b/synapse/storage/schema/delta/56/room_membership_idx.sql
index fc0b498843..92ab1f5e65 100644
--- a/synapse/storage/schema/delta/56/room_membership_idx.sql
+++ b/synapse/storage/schema/delta/56/room_membership_idx.sql
@@ -13,13 +13,6 @@
* limitations under the License.
*/
--- We add membership to current state so that we don't need to join against
--- room_memberships, which can be surprisingly costly (we do such queries
--- very frequently).
--- This will be null for non-membership events and the content.membership key
--- for membership events. (Will also be null for membership events until the
--- background update job has finished).
-
-- Adds an index on room_memberships for fetching all forgotten rooms for a user
INSERT INTO background_updates (update_name, progress_json) VALUES
('room_membership_forgotten_idx', '{}');
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index f3b1cec933..df87ab6a6d 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -166,7 +166,7 @@ class SearchStore(BackgroundUpdateStore):
if not result:
yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
- defer.returnValue(result)
+ return result
@defer.inlineCallbacks
def _background_reindex_gin_search(self, progress, batch_size):
@@ -209,7 +209,7 @@ class SearchStore(BackgroundUpdateStore):
yield self.runWithConnection(create_index)
yield self._end_background_update(self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME)
- defer.returnValue(1)
+ return 1
@defer.inlineCallbacks
def _background_reindex_search_order(self, progress, batch_size):
@@ -287,7 +287,7 @@ class SearchStore(BackgroundUpdateStore):
if not finished:
yield self._end_background_update(self.EVENT_SEARCH_ORDER_UPDATE_NAME)
- defer.returnValue(num_rows)
+ return num_rows
def store_event_search_txn(self, txn, event, key, value):
"""Add event to the search table
@@ -454,17 +454,15 @@ class SearchStore(BackgroundUpdateStore):
count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)
- defer.returnValue(
- {
- "results": [
- {"event": event_map[r["event_id"]], "rank": r["rank"]}
- for r in results
- if r["event_id"] in event_map
- ],
- "highlights": highlights,
- "count": count,
- }
- )
+ return {
+ "results": [
+ {"event": event_map[r["event_id"]], "rank": r["rank"]}
+ for r in results
+ if r["event_id"] in event_map
+ ],
+ "highlights": highlights,
+ "count": count,
+ }
@defer.inlineCallbacks
def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
@@ -599,22 +597,20 @@ class SearchStore(BackgroundUpdateStore):
count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)
- defer.returnValue(
- {
- "results": [
- {
- "event": event_map[r["event_id"]],
- "rank": r["rank"],
- "pagination_token": "%s,%s"
- % (r["origin_server_ts"], r["stream_ordering"]),
- }
- for r in results
- if r["event_id"] in event_map
- ],
- "highlights": highlights,
- "count": count,
- }
- )
+ return {
+ "results": [
+ {
+ "event": event_map[r["event_id"]],
+ "rank": r["rank"],
+ "pagination_token": "%s,%s"
+ % (r["origin_server_ts"], r["stream_ordering"]),
+ }
+ for r in results
+ if r["event_id"] in event_map
+ ],
+ "highlights": highlights,
+ "count": count,
+ }
def _find_highlights_in_postgres(self, search_query, events):
"""Given a list of events and a search term, return a list of words
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 6bd81e84ad..fb83218f90 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -59,7 +59,7 @@ class SignatureWorkerStore(SQLBaseStore):
for e_id, h in hashes.items()
}
- defer.returnValue(list(hashes.items()))
+ return list(hashes.items())
def _get_event_reference_hashes_txn(self, txn, event_id):
"""Get all the hashes for a given PDU.
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a35289876d..1980a87108 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -422,7 +422,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# Retrieve the room's create event
create_event = yield self.get_create_event_for_room(room_id)
- defer.returnValue(create_event.content.get("room_version", "1"))
+ return create_event.content.get("room_version", "1")
@defer.inlineCallbacks
def get_room_predecessor(self, room_id):
@@ -442,7 +442,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
create_event = yield self.get_create_event_for_room(room_id)
# Return predecessor if present
- defer.returnValue(create_event.content.get("predecessor", None))
+ return create_event.content.get("predecessor", None)
@defer.inlineCallbacks
def get_create_event_for_room(self, room_id):
@@ -466,7 +466,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# Retrieve the room's create event and return
create_event = yield self.get_event(create_id)
- defer.returnValue(create_event)
+ return create_event
@cached(max_entries=100000, iterable=True)
def get_current_state_ids(self, room_id):
@@ -563,7 +563,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
if not event:
return
- defer.returnValue(event.content.get("canonical_alias"))
+ return event.content.get("canonical_alias")
@cached(max_entries=10000, iterable=True)
def get_state_group_delta(self, state_group):
@@ -613,14 +613,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if not event_ids:
- defer.returnValue({})
+ return {}
event_to_groups = yield self._get_state_group_for_events(event_ids)
groups = set(itervalues(event_to_groups))
group_to_state = yield self._get_state_for_groups(groups)
- defer.returnValue(group_to_state)
+ return group_to_state
@defer.inlineCallbacks
def get_state_ids_for_group(self, state_group):
@@ -634,7 +634,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
"""
group_to_state = yield self._get_state_for_groups((state_group,))
- defer.returnValue(group_to_state[state_group])
+ return group_to_state[state_group]
@defer.inlineCallbacks
def get_state_groups(self, room_id, event_ids):
@@ -645,7 +645,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
dict of state_group_id -> list of state events.
"""
if not event_ids:
- defer.returnValue({})
+ return {}
group_to_ids = yield self.get_state_groups_ids(room_id, event_ids)
@@ -658,16 +658,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
get_prev_content=False,
)
- defer.returnValue(
- {
- group: [
- state_event_map[v]
- for v in itervalues(event_id_map)
- if v in state_event_map
- ]
- for group, event_id_map in iteritems(group_to_ids)
- }
- )
+ return {
+ group: [
+ state_event_map[v]
+ for v in itervalues(event_id_map)
+ if v in state_event_map
+ ]
+ for group, event_id_map in iteritems(group_to_ids)
+ }
@defer.inlineCallbacks
def _get_state_groups_from_groups(self, groups, state_filter):
@@ -694,7 +692,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
results.update(res)
- defer.returnValue(results)
+ return results
def _get_state_groups_from_groups_txn(
self, txn, groups, state_filter=StateFilter.all()
@@ -829,7 +827,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
for event_id, group in iteritems(event_to_groups)
}
- defer.returnValue({event: event_to_state[event] for event in event_ids})
+ return {event: event_to_state[event] for event in event_ids}
@defer.inlineCallbacks
def get_state_ids_for_events(self, event_ids, state_filter=StateFilter.all()):
@@ -855,7 +853,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
for event_id, group in iteritems(event_to_groups)
}
- defer.returnValue({event: event_to_state[event] for event in event_ids})
+ return {event: event_to_state[event] for event in event_ids}
@defer.inlineCallbacks
def get_state_for_event(self, event_id, state_filter=StateFilter.all()):
@@ -871,7 +869,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
A deferred dict from (type, state_key) -> state_event
"""
state_map = yield self.get_state_for_events([event_id], state_filter)
- defer.returnValue(state_map[event_id])
+ return state_map[event_id]
@defer.inlineCallbacks
def get_state_ids_for_event(self, event_id, state_filter=StateFilter.all()):
@@ -887,7 +885,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
A deferred dict from (type, state_key) -> state_event
"""
state_map = yield self.get_state_ids_for_events([event_id], state_filter)
- defer.returnValue(state_map[event_id])
+ return state_map[event_id]
@cached(max_entries=50000)
def _get_state_group_for_event(self, event_id):
@@ -917,7 +915,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
desc="_get_state_group_for_events",
)
- defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
+ return {row["event_id"]: row["state_group"] for row in rows}
def _get_state_for_group_using_cache(self, cache, group, state_filter):
"""Checks if group is in cache. See `_get_state_for_groups`
@@ -997,7 +995,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
incomplete_groups = incomplete_groups_m | incomplete_groups_nm
if not incomplete_groups:
- defer.returnValue(state)
+ return state
cache_sequence_nm = self._state_group_cache.sequence
cache_sequence_m = self._state_group_members_cache.sequence
@@ -1024,7 +1022,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# everything we need from the database anyway.
state[group] = state_filter.filter_state(group_state_dict)
- defer.returnValue(state)
+ return state
def _get_state_for_groups_using_cache(self, groups, cache, state_filter):
"""Gets the state at each of a list of state groups, optionally
@@ -1498,7 +1496,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME
)
- defer.returnValue(result * BATCH_SIZE_SCALE_FACTOR)
+ return result * BATCH_SIZE_SCALE_FACTOR
@defer.inlineCallbacks
def _background_index_state(self, progress, batch_size):
@@ -1528,4 +1526,4 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME)
- defer.returnValue(1)
+ return 1
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 1cec84ee2e..e13efed417 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -66,7 +66,7 @@ class StatsStore(StateDeltasStore):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_createtables")
- defer.returnValue(1)
+ return 1
# Get all the rooms that we want to process.
def _make_staging_area(txn):
@@ -120,7 +120,7 @@ class StatsStore(StateDeltasStore):
self.get_earliest_token_for_room_stats.invalidate_all()
yield self._end_background_update("populate_stats_createtables")
- defer.returnValue(1)
+ return 1
@defer.inlineCallbacks
def _populate_stats_cleanup(self, progress, batch_size):
@@ -129,7 +129,7 @@ class StatsStore(StateDeltasStore):
"""
if not self.stats_enabled:
yield self._end_background_update("populate_stats_cleanup")
- defer.returnValue(1)
+ return 1
position = yield self._simple_select_one_onecol(
TEMP_TABLE + "_position", None, "position"
@@ -143,14 +143,14 @@ class StatsStore(StateDeltasStore):
yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
yield self._end_background_update("populate_stats_cleanup")
- defer.returnValue(1)
+ return 1
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms")
- defer.returnValue(1)
+ return 1
# If we don't have progress filed, delete everything.
if not progress:
@@ -186,7 +186,7 @@ class StatsStore(StateDeltasStore):
# No more rooms -- complete the transaction.
if not rooms_to_work_on:
yield self._end_background_update("populate_stats_process_rooms")
- defer.returnValue(1)
+ return 1
logger.info(
"Processing the next %d rooms of %d remaining",
@@ -211,16 +211,18 @@ class StatsStore(StateDeltasStore):
avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
+ event_ids = [
+ join_rules_id,
+ history_visibility_id,
+ encryption_id,
+ name_id,
+ topic_id,
+ avatar_id,
+ canonical_alias_id,
+ ]
+
state_events = yield self.get_events(
- [
- join_rules_id,
- history_visibility_id,
- encryption_id,
- name_id,
- topic_id,
- avatar_id,
- canonical_alias_id,
- ]
+ [ev for ev in event_ids if ev is not None]
)
def _get_or_none(event_id, arg):
@@ -303,9 +305,9 @@ class StatsStore(StateDeltasStore):
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
- defer.returnValue(processed_event_count)
+ return processed_event_count
- defer.returnValue(processed_event_count)
+ return processed_event_count
def delete_all_stats(self):
"""
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a0465484df..856c2ee8d8 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -300,7 +300,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
if not room_ids:
- defer.returnValue({})
+ return {}
results = {}
room_ids = list(room_ids)
@@ -323,7 +323,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
results.update(dict(zip(rm_ids, res)))
- defer.returnValue(results)
+ return results
def get_rooms_that_changed(self, room_ids, from_key):
"""Given a list of rooms and a token, return rooms where there may have
@@ -364,7 +364,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
the chunk of events returned.
"""
if from_key == to_key:
- defer.returnValue(([], from_key))
+ return ([], from_key)
from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream
@@ -374,7 +374,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
if not has_changed:
- defer.returnValue(([], from_key))
+ return ([], from_key)
def f(txn):
sql = (
@@ -407,7 +407,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# get.
key = from_key
- defer.returnValue((ret, key))
+ return (ret, key)
@defer.inlineCallbacks
def get_membership_changes_for_user(self, user_id, from_key, to_key):
@@ -415,14 +415,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
to_id = RoomStreamToken.parse_stream_token(to_key).stream
if from_key == to_key:
- defer.returnValue([])
+ return []
if from_id:
has_changed = self._membership_stream_cache.has_entity_changed(
user_id, int(from_id)
)
if not has_changed:
- defer.returnValue([])
+ return []
def f(txn):
sql = (
@@ -447,7 +447,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._set_before_and_after(ret, rows, topo_order=False)
- defer.returnValue(ret)
+ return ret
@defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token):
@@ -477,7 +477,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._set_before_and_after(events, rows)
- defer.returnValue((events, token))
+ return (events, token)
@defer.inlineCallbacks
def get_recent_event_ids_for_room(self, room_id, limit, end_token):
@@ -496,7 +496,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"""
# Allow a zero limit here, and no-op.
if limit == 0:
- defer.returnValue(([], end_token))
+ return ([], end_token)
end_token = RoomStreamToken.parse(end_token)
@@ -511,7 +511,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# We want to return the results in ascending order.
rows.reverse()
- defer.returnValue((rows, token))
+ return (rows, token)
def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering
@@ -549,12 +549,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"""
token = yield self.get_room_max_stream_ordering()
if room_id is None:
- defer.returnValue("s%d" % (token,))
+ return "s%d" % (token,)
else:
topo = yield self.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, room_id
)
- defer.returnValue("t%d-%d" % (topo, token))
+ return "t%d-%d" % (topo, token)
def get_stream_token_for_event(self, event_id):
"""The stream token for an event
@@ -674,14 +674,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[e for e in results["after"]["event_ids"]], get_prev_content=True
)
- defer.returnValue(
- {
- "events_before": events_before,
- "events_after": events_after,
- "start": results["before"]["token"],
- "end": results["after"]["token"],
- }
- )
+ return {
+ "events_before": events_before,
+ "events_after": events_after,
+ "start": results["before"]["token"],
+ "end": results["after"]["token"],
+ }
def _get_events_around_txn(
self, txn, room_id, event_id, before_limit, after_limit, event_filter
@@ -785,7 +783,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
events = yield self.get_events_as_list(event_ids)
- defer.returnValue((upper_bound, events))
+ return (upper_bound, events)
def get_federation_out_pos(self, typ):
return self._simple_select_one_onecol(
@@ -939,7 +937,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._set_before_and_after(events, rows)
- defer.returnValue((events, token))
+ return (events, token)
class StreamStore(StreamWorkerStore):
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index e88f8ea35f..20dd6bd53d 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -66,7 +66,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
room_id string, tag string and content string.
"""
if last_id == current_id:
- defer.returnValue([])
+ return []
def get_all_updated_tags_txn(txn):
sql = (
@@ -107,7 +107,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
)
results.extend(tags)
- defer.returnValue(results)
+ return results
@defer.inlineCallbacks
def get_updated_tags(self, user_id, stream_id):
@@ -135,7 +135,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
user_id, int(stream_id)
)
if not changed:
- defer.returnValue({})
+ return {}
room_ids = yield self.runInteraction("get_updated_tags", get_updated_tags_txn)
@@ -145,7 +145,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
for room_id in room_ids:
results[room_id] = tags_by_room.get(room_id, {})
- defer.returnValue(results)
+ return results
def get_tags_for_room(self, user_id, room_id):
"""Get all the tags for the given room
@@ -194,7 +194,7 @@ class TagsStore(TagsWorkerStore):
self.get_tags_for_user.invalidate((user_id,))
result = self._account_data_id_gen.get_current_token()
- defer.returnValue(result)
+ return result
@defer.inlineCallbacks
def remove_tag_from_room(self, user_id, room_id, tag):
@@ -217,7 +217,7 @@ class TagsStore(TagsWorkerStore):
self.get_tags_for_user.invalidate((user_id,))
result = self._account_data_id_gen.get_current_token()
- defer.returnValue(result)
+ return result
def _update_revision_txn(self, txn, user_id, room_id, next_id):
"""Update the latest revision of the tags for the given user and room.
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index c585cf6cf7..b3c3bf55bc 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -147,7 +147,7 @@ class TransactionStore(SQLBaseStore):
result = self._destination_retry_cache.get(destination, SENTINEL)
if result is not SENTINEL:
- defer.returnValue(result)
+ return result
result = yield self.runInteraction(
"get_destination_retry_timings",
@@ -158,7 +158,7 @@ class TransactionStore(SQLBaseStore):
# We don't hugely care about race conditions between getting and
# invalidating the cache, since we time out fairly quickly anyway.
self._destination_retry_cache[destination] = result
- defer.returnValue(result)
+ return result
def _get_destination_retry_timings(self, txn, destination):
result = self._simple_select_one_txn(
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 7fd16fe65e..b5188d9bee 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -109,7 +109,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
yield self._end_background_update("populate_user_directory_createtables")
- defer.returnValue(1)
+ return 1
@defer.inlineCallbacks
def _populate_user_directory_cleanup(self, progress, batch_size):
@@ -131,7 +131,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
)
yield self._end_background_update("populate_user_directory_cleanup")
- defer.returnValue(1)
+ return 1
@defer.inlineCallbacks
def _populate_user_directory_process_rooms(self, progress, batch_size):
@@ -177,7 +177,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
# No more rooms -- complete the transaction.
if not rooms_to_work_on:
yield self._end_background_update("populate_user_directory_process_rooms")
- defer.returnValue(1)
+ return 1
logger.info(
"Processing the next %d rooms of %d remaining"
@@ -257,9 +257,9 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
- defer.returnValue(processed_event_count)
+ return processed_event_count
- defer.returnValue(processed_event_count)
+ return processed_event_count
@defer.inlineCallbacks
def _populate_user_directory_process_users(self, progress, batch_size):
@@ -268,7 +268,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
"""
if not self.hs.config.user_directory_search_all_users:
yield self._end_background_update("populate_user_directory_process_users")
- defer.returnValue(1)
+ return 1
def _get_next_batch(txn):
sql = "SELECT user_id FROM %s LIMIT %s" % (
@@ -298,7 +298,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
# No more users -- complete the transaction.
if not users_to_work_on:
yield self._end_background_update("populate_user_directory_process_users")
- defer.returnValue(1)
+ return 1
logger.info(
"Processing the next %d users of %d remaining"
@@ -322,7 +322,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
progress,
)
- defer.returnValue(len(users_to_work_on))
+ return len(users_to_work_on)
@defer.inlineCallbacks
def is_room_world_readable_or_publicly_joinable(self, room_id):
@@ -344,16 +344,16 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
join_rule_ev = yield self.get_event(join_rules_id, allow_none=True)
if join_rule_ev:
if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC:
- defer.returnValue(True)
+ return True
hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, ""))
if hist_vis_id:
hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True)
if hist_vis_ev:
if hist_vis_ev.content.get("history_visibility") == "world_readable":
- defer.returnValue(True)
+ return True
- defer.returnValue(False)
+ return False
def update_profile_in_user_dir(self, user_id, display_name, avatar_url):
"""
@@ -499,7 +499,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
user_ids = set(user_ids_share_pub)
user_ids.update(user_ids_share_priv)
- defer.returnValue(user_ids)
+ return user_ids
def add_users_who_share_private_room(self, room_id, user_id_tuples):
"""Insert entries into the users_who_share_private_rooms table. The first
@@ -609,7 +609,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
users = set(pub_rows)
users.update(rows)
- defer.returnValue(list(users))
+ return list(users)
@defer.inlineCallbacks
def get_rooms_in_common_for_users(self, user_id, other_user_id):
@@ -635,7 +635,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
"get_rooms_in_common_for_users", None, sql, user_id, other_user_id
)
- defer.returnValue([room_id for room_id, in rows])
+ return [room_id for room_id, in rows]
def delete_all_from_user_dir(self):
"""Delete the entire user directory
@@ -782,7 +782,7 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
limited = len(results) > limit
- defer.returnValue({"limited": limited, "results": results})
+ return {"limited": limited, "results": results}
def _parse_query_sqlite(search_term):
diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py
index 1815fdc0dd..05cabc2282 100644
--- a/synapse/storage/user_erasure_store.py
+++ b/synapse/storage/user_erasure_store.py
@@ -12,9 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import operator
-from twisted.internet import defer
+import operator
from synapse.storage._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedList
@@ -67,7 +66,7 @@ class UserErasureWorkerStore(SQLBaseStore):
erased_users = yield self.runInteraction("are_users_erased", _get_erased_users)
res = dict((u, u in erased_users) for u in user_ids)
- defer.returnValue(res)
+ return res
class UserErasureStore(UserErasureWorkerStore):
|