diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index 4ca13011e5..c644b4dfc5 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -191,7 +191,7 @@ def _context_info_cb(ssl_connection, where, ret):
# ... we further assume that SSLClientConnectionCreator has set the
# '_synapse_tls_verifier' attribute to a ConnectionVerifier object.
tls_protocol._synapse_tls_verifier.verify_context_info_cb(ssl_connection, where)
- except: # noqa: E722, taken from the twisted implementation
+ except BaseException: # taken from the twisted implementation
logger.exception("Error during info_callback")
f = Failure()
tls_protocol.failVerification(f)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index af85fe0a1e..89df9a619b 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -480,6 +480,8 @@ class PerDestinationQueue:
# the other sending servers are up).
if new_pdus:
room_catchup_pdus = new_pdus
+ else:
+ room_catchup_pdus = [pdu]
logger.info(
"Catching up rooms to %s: %r", self._destination, pdu.room_id
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index 1a7ea4fa96..03cf3c2b8e 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -689,7 +689,7 @@ def run_in_background(f, *args, **kwargs) -> defer.Deferred:
current = current_context()
try:
res = f(*args, **kwargs)
- except: # noqa: E722
+ except Exception:
# the assumption here is that the caller doesn't want to be disturbed
# by synchronous exceptions, so let's turn them into Failures.
return defer.fail()
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index f1ba529a2d..94590e7b45 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -670,7 +670,7 @@ class DatabasePool:
for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
- except: # noqa: E722, as we reraise the exception this is fine.
+ except Exception:
for after_callback, after_args, after_kwargs in exception_callbacks:
after_callback(*after_args, **after_kwargs)
raise
@@ -1906,6 +1906,7 @@ class DatabasePool:
retcols: Iterable[str],
filters: Optional[Dict[str, Any]] = None,
keyvalues: Optional[Dict[str, Any]] = None,
+ exclude_keyvalues: Optional[Dict[str, Any]] = None,
order_direction: str = "ASC",
) -> List[Dict[str, Any]]:
"""
@@ -1929,7 +1930,10 @@ class DatabasePool:
apply a WHERE ? LIKE ? clause.
keyvalues:
column names and values to select the rows with, or None to not
- apply a WHERE clause.
+ apply a WHERE key = value clause.
+ exclude_keyvalues:
+ column names and values to exclude rows with, or None to not
+ apply a WHERE key != value clause.
order_direction: Whether the results should be ordered "ASC" or "DESC".
Returns:
@@ -1938,7 +1942,7 @@ class DatabasePool:
if order_direction not in ["ASC", "DESC"]:
raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
- where_clause = "WHERE " if filters or keyvalues else ""
+ where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
arg_list = [] # type: List[Any]
if filters:
where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
@@ -1947,6 +1951,9 @@ class DatabasePool:
if keyvalues:
where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
arg_list += list(keyvalues.values())
+ if exclude_keyvalues:
+ where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues)
+ arg_list += list(exclude_keyvalues.values())
sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
", ".join(retcols),
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 29edab34d4..0ff693a310 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import List, Tuple
+from typing import Dict, List, Tuple
from synapse.api.presence import UserPresenceState
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
@@ -157,5 +157,63 @@ class PresenceStore(SQLBaseStore):
return {row["user_id"]: UserPresenceState(**row) for row in rows}
+ async def get_presence_for_all_users(
+ self,
+ include_offline: bool = True,
+ ) -> Dict[str, UserPresenceState]:
+ """Retrieve the current presence state for all users.
+
+ Note that the presence_stream table is culled frequently, so it should only
+ contain the latest presence state for each user.
+
+ Args:
+ include_offline: Whether to include offline presence states
+
+ Returns:
+ A dict of user IDs to their current UserPresenceState.
+ """
+ users_to_state = {}
+
+ exclude_keyvalues = None
+ if not include_offline:
+ # Exclude offline presence state
+ exclude_keyvalues = {"state": "offline"}
+
+ # This may be a very heavy database query.
+ # We paginate in order to not block a database connection.
+ limit = 100
+ offset = 0
+ while True:
+ rows = await self.db_pool.runInteraction(
+ "get_presence_for_all_users",
+ self.db_pool.simple_select_list_paginate_txn,
+ "presence_stream",
+ orderby="stream_id",
+ start=offset,
+ limit=limit,
+ exclude_keyvalues=exclude_keyvalues,
+ retcols=(
+ "user_id",
+ "state",
+ "last_active_ts",
+ "last_federation_update_ts",
+ "last_user_sync_ts",
+ "status_msg",
+ "currently_active",
+ ),
+ order_direction="ASC",
+ )
+
+ for row in rows:
+ users_to_state[row["user_id"]] = UserPresenceState(**row)
+
+ # We've run out of updates to query
+ if len(rows) < limit:
+ break
+
+ offset += limit
+
+ return users_to_state
+
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index f33c115844..c3b2d981ea 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -496,7 +496,7 @@ def timeout_deferred(
try:
deferred.cancel()
- except: # noqa: E722, if we throw any exception it'll break time outs
+ except Exception: # if we throw any exception it'll break time outs
logger.exception("Canceller failed during timeout")
# the cancel() call should have set off a chain of errbacks which
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index e676c2cac4..f968706334 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -116,7 +116,7 @@ def register_cache(
"""
if resizable:
if not resize_callback:
- resize_callback = getattr(cache, "set_cache_factor")
+ resize_callback = cache.set_cache_factor # type: ignore
add_resizable_cache(cache_name, resize_callback)
metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
|