diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7ba22d511f..fb85b19770 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -349,10 +349,13 @@ class PresenceHandler(BasePresenceHandler):
[self.user_to_current_state[user_id] for user_id in unpersisted]
)
- async def _update_states(self, new_states):
+ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
"""Updates presence of users. Sets the appropriate timeouts. Pokes
the notifier and federation if and only if the changed presence state
should be sent to clients/servers.
+
+ Args:
+ new_states: The new user presence state updates to process.
"""
now = self.clock.time_msec()
@@ -368,7 +371,7 @@ class PresenceHandler(BasePresenceHandler):
new_states_dict = {}
for new_state in new_states:
new_states_dict[new_state.user_id] = new_state
- new_state = new_states_dict.values()
+ new_states = new_states_dict.values()
for new_state in new_states:
user_id = new_state.user_id
@@ -657,17 +660,6 @@ class PresenceHandler(BasePresenceHandler):
self._push_to_remotes(states)
- async def notify_for_states(self, state, stream_id):
- parties = await get_interested_parties(self.store, [state])
- room_ids_to_states, users_to_states = parties
-
- self.notifier.on_new_event(
- "presence_key",
- stream_id,
- rooms=room_ids_to_states.keys(),
- users=[UserID.from_string(u) for u in users_to_states],
- )
-
def _push_to_remotes(self, states):
"""Sends state updates to remote servers.
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 3dfb0a26c2..1a8340000a 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -143,6 +143,10 @@ class UserDirectoryHandler(StateDeltasHandler):
if self.pos is None:
self.pos = await self.store.get_user_directory_stream_pos()
+ # If still None then the initial background update hasn't happened yet.
+ if self.pos is None:
+ return None
+
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "user_dir_delta"):
diff --git a/synapse/http/client.py b/synapse/http/client.py
index d6e06967c8..a910548f1e 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -56,7 +56,7 @@ from twisted.web.client import (
)
from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
-from twisted.web.iweb import IAgent, IBodyProducer, IResponse
+from twisted.web.iweb import UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
@@ -406,6 +406,9 @@ class SimpleHttpClient:
agent=self.agent,
data=body_producer,
headers=headers,
+ # Avoid buffering the body in treq since we do not reuse
+ # response bodies.
+ unbuffered=True,
**self._extra_treq_args,
) # type: defer.Deferred
@@ -700,18 +703,6 @@ class SimpleHttpClient:
resp_headers = dict(response.headers.getAllRawHeaders())
- if (
- b"Content-Length" in resp_headers
- and max_size
- and int(resp_headers[b"Content-Length"][0]) > max_size
- ):
- logger.warning("Requested URL is too large > %r bytes" % (max_size,))
- raise SynapseError(
- 502,
- "Requested file is too large > %r bytes" % (max_size,),
- Codes.TOO_LARGE,
- )
-
if response.code > 299:
logger.warning("Got %d when downloading %s" % (response.code, url))
raise SynapseError(502, "Got error %d" % (response.code,), Codes.UNKNOWN)
@@ -778,7 +769,9 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
# in the meantime.
if self.max_size is not None and self.length >= self.max_size:
self.deferred.errback(BodyExceededMaxSize())
- self.transport.loseConnection()
+ # Close the connection (forcefully) since all the data will get
+ # discarded anyway.
+ self.transport.abortConnection()
def connectionLost(self, reason: Failure) -> None:
# If the maximum size was already exceeded, there's nothing to do.
@@ -812,6 +805,11 @@ def read_body_with_max_size(
Returns:
A Deferred which resolves to the length of the read body.
"""
+ # If the Content-Length header gives a size larger than the maximum allowed
+ # size, do not bother downloading the body.
+ if max_size is not None and response.length != UNKNOWN_LENGTH:
+ if response.length > max_size:
+ return defer.fail(BodyExceededMaxSize())
d = defer.Deferred()
response.deliverBody(_ReadBodyWithMaxSizeProtocol(stream, d, max_size))
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 5d0845588c..70b49854cf 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -340,7 +340,7 @@ class DataStore(
count = txn.fetchone()[0]
sql = (
- "SELECT name, user_type, is_guest, admin, deactivated, displayname, avatar_url "
+ "SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, displayname, avatar_url "
+ sql_base
+ " ORDER BY u.name LIMIT ? OFFSET ?"
)
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 915b656b7a..25d8dcb6ab 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -119,6 +119,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"creation_ts",
"user_type",
"deactivated",
+ "shadow_banned",
],
allow_none=True,
desc="get_user_by_id",
@@ -475,23 +476,25 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"""
def set_shadow_banned_txn(txn):
+ user_id = user.to_string()
self.db_pool.simple_update_one_txn(
txn,
table="users",
- keyvalues={"name": user.to_string()},
+ keyvalues={"name": user_id},
updatevalues={"shadow_banned": shadow_banned},
)
# In order for this to apply immediately, clear the cache for this user.
tokens = self.db_pool.simple_select_onecol_txn(
txn,
table="access_tokens",
- keyvalues={"user_id": user.to_string()},
+ keyvalues={"user_id": user_id},
retcol="token",
)
for token in tokens:
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (token,)
)
+ self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 467738285f..02ee15676c 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -712,7 +712,13 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
return {row["room_id"] for row in rows}
- async def get_user_directory_stream_pos(self) -> int:
+ async def get_user_directory_stream_pos(self) -> Optional[int]:
+ """
+ Get the stream ID of the user directory stream.
+
+ Returns:
+ The stream token or None if the initial background update hasn't happened yet.
+ """
return await self.db_pool.simple_select_one_onecol(
table="user_directory_stream_pos",
keyvalues={},
|