summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/presence.py18
-rw-r--r--synapse/handlers/user_directory.py4
-rw-r--r--synapse/http/client.py26
-rw-r--r--synapse/storage/databases/main/__init__.py2
-rw-r--r--synapse/storage/databases/main/registration.py7
-rw-r--r--synapse/storage/databases/main/user_directory.py8
6 files changed, 34 insertions, 31 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py

index 7ba22d511f..fb85b19770 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py
@@ -349,10 +349,13 @@ class PresenceHandler(BasePresenceHandler): [self.user_to_current_state[user_id] for user_id in unpersisted] ) - async def _update_states(self, new_states): + async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None: """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state should be sent to clients/servers. + + Args: + new_states: The new user presence state updates to process. """ now = self.clock.time_msec() @@ -368,7 +371,7 @@ class PresenceHandler(BasePresenceHandler): new_states_dict = {} for new_state in new_states: new_states_dict[new_state.user_id] = new_state - new_state = new_states_dict.values() + new_states = new_states_dict.values() for new_state in new_states: user_id = new_state.user_id @@ -657,17 +660,6 @@ class PresenceHandler(BasePresenceHandler): self._push_to_remotes(states) - async def notify_for_states(self, state, stream_id): - parties = await get_interested_parties(self.store, [state]) - room_ids_to_states, users_to_states = parties - - self.notifier.on_new_event( - "presence_key", - stream_id, - rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states], - ) - def _push_to_remotes(self, states): """Sends state updates to remote servers. diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 3dfb0a26c2..1a8340000a 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py
@@ -143,6 +143,10 @@ class UserDirectoryHandler(StateDeltasHandler): if self.pos is None: self.pos = await self.store.get_user_directory_stream_pos() + # If still None then the initial background update hasn't happened yet. + if self.pos is None: + return None + # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "user_dir_delta"): diff --git a/synapse/http/client.py b/synapse/http/client.py
index d6e06967c8..a910548f1e 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py
@@ -56,7 +56,7 @@ from twisted.web.client import ( ) from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers -from twisted.web.iweb import IAgent, IBodyProducer, IResponse +from twisted.web.iweb import UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri @@ -406,6 +406,9 @@ class SimpleHttpClient: agent=self.agent, data=body_producer, headers=headers, + # Avoid buffering the body in treq since we do not reuse + # response bodies. + unbuffered=True, **self._extra_treq_args, ) # type: defer.Deferred @@ -700,18 +703,6 @@ class SimpleHttpClient: resp_headers = dict(response.headers.getAllRawHeaders()) - if ( - b"Content-Length" in resp_headers - and max_size - and int(resp_headers[b"Content-Length"][0]) > max_size - ): - logger.warning("Requested URL is too large > %r bytes" % (max_size,)) - raise SynapseError( - 502, - "Requested file is too large > %r bytes" % (max_size,), - Codes.TOO_LARGE, - ) - if response.code > 299: logger.warning("Got %d when downloading %s" % (response.code, url)) raise SynapseError(502, "Got error %d" % (response.code,), Codes.UNKNOWN) @@ -778,7 +769,9 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): # in the meantime. if self.max_size is not None and self.length >= self.max_size: self.deferred.errback(BodyExceededMaxSize()) - self.transport.loseConnection() + # Close the connection (forcefully) since all the data will get + # discarded anyway. + self.transport.abortConnection() def connectionLost(self, reason: Failure) -> None: # If the maximum size was already exceeded, there's nothing to do. @@ -812,6 +805,11 @@ def read_body_with_max_size( Returns: A Deferred which resolves to the length of the read body. """ + # If the Content-Length header gives a size larger than the maximum allowed + # size, do not bother downloading the body. + if max_size is not None and response.length != UNKNOWN_LENGTH: + if response.length > max_size: + return defer.fail(BodyExceededMaxSize()) d = defer.Deferred() response.deliverBody(_ReadBodyWithMaxSizeProtocol(stream, d, max_size)) diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 5d0845588c..70b49854cf 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py
@@ -340,7 +340,7 @@ class DataStore( count = txn.fetchone()[0] sql = ( - "SELECT name, user_type, is_guest, admin, deactivated, displayname, avatar_url " + "SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, displayname, avatar_url " + sql_base + " ORDER BY u.name LIMIT ? OFFSET ?" ) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 915b656b7a..25d8dcb6ab 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -119,6 +119,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): "creation_ts", "user_type", "deactivated", + "shadow_banned", ], allow_none=True, desc="get_user_by_id", @@ -475,23 +476,25 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """ def set_shadow_banned_txn(txn): + user_id = user.to_string() self.db_pool.simple_update_one_txn( txn, table="users", - keyvalues={"name": user.to_string()}, + keyvalues={"name": user_id}, updatevalues={"shadow_banned": shadow_banned}, ) # In order for this to apply immediately, clear the cache for this user. tokens = self.db_pool.simple_select_onecol_txn( txn, table="access_tokens", - keyvalues={"user_id": user.to_string()}, + keyvalues={"user_id": user_id}, retcol="token", ) for token in tokens: self._invalidate_cache_and_stream( txn, self.get_user_by_access_token, (token,) ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 467738285f..02ee15676c 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py
@@ -712,7 +712,13 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): return {row["room_id"] for row in rows} - async def get_user_directory_stream_pos(self) -> int: + async def get_user_directory_stream_pos(self) -> Optional[int]: + """ + Get the stream ID of the user directory stream. + + Returns: + The stream token or None if the initial background update hasn't happened yet. + """ return await self.db_pool.simple_select_one_onecol( table="user_directory_stream_pos", keyvalues={},