diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b6a9ce4f38..54631b4ee2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -274,22 +274,25 @@ class PresenceHandler(BasePresenceHandler):
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
- # Start a LoopingCall in 30s that fires every 5s.
- # The initial delay is to allow disconnected clients a chance to
- # reconnect before we treat them as offline.
- def run_timeout_handler():
- return run_as_background_process(
- "handle_presence_timeouts", self._handle_timeouts
- )
-
- self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000)
+ if self._presence_enabled:
+ # Start a LoopingCall in 30s that fires every 5s.
+ # The initial delay is to allow disconnected clients a chance to
+ # reconnect before we treat them as offline.
+ def run_timeout_handler():
+ return run_as_background_process(
+ "handle_presence_timeouts", self._handle_timeouts
+ )
- def run_persister():
- return run_as_background_process(
- "persist_presence_changes", self._persist_unpersisted_changes
+ self.clock.call_later(
+ 30, self.clock.looping_call, run_timeout_handler, 5000
)
- self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
+ def run_persister():
+ return run_as_background_process(
+ "persist_presence_changes", self._persist_unpersisted_changes
+ )
+
+ self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
LaterGauge(
"synapse_handlers_presence_wheel_timer_size",
@@ -299,7 +302,7 @@ class PresenceHandler(BasePresenceHandler):
)
# Used to handle sending of presence to newly joined users/servers
- if hs.config.use_presence:
+ if self._presence_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
# Presence is best effort and quickly heals itself, so lets just always
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b45b179fed..9059382246 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -291,9 +291,8 @@ class SyncHandler:
user_id = sync_config.user.to_string()
await self.auth.check_auth_blocking(requester=requester)
- res = await self.response_cache.wrap_conditional(
+ res = await self.response_cache.wrap(
sync_config.request_key,
- lambda result: since_token != result.next_batch,
self._wait_for_sync_for_user,
sync_config,
since_token,
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 74219cb05e..6b608ebc9b 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -39,6 +39,11 @@ class PusherWorkerStore(SQLBaseStore):
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
)
+ self.db_pool.updates.register_background_update_handler(
+ "remove_deactivated_pushers",
+ self._remove_deactivated_pushers,
+ )
+
def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
"""JSON-decode the data in the rows returned from the `pushers` table
@@ -284,6 +289,54 @@ class PusherWorkerStore(SQLBaseStore):
lock=False,
)
+ async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
+ """A background update that deletes all pushers for deactivated users.
+
+ Note that we don't proacively tell the pusherpool that we've deleted
+ these (just because its a bit off a faff to do from here), but they will
+ get cleaned up at the next restart
+ """
+
+ last_user = progress.get("last_user", "")
+
+ def _delete_pushers(txn) -> int:
+
+ sql = """
+ SELECT name FROM users
+ WHERE deactivated = ? and name > ?
+ ORDER BY name ASC
+ LIMIT ?
+ """
+
+ txn.execute(sql, (1, last_user, batch_size))
+ users = [row[0] for row in txn]
+
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table="pushers",
+ column="user_name",
+ iterable=users,
+ keyvalues={},
+ )
+
+ if users:
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "remove_deactivated_pushers", {"last_user": users[-1]}
+ )
+
+ return len(users)
+
+ number_deleted = await self.db_pool.runInteraction(
+ "_remove_deactivated_pushers", _delete_pushers
+ )
+
+ if number_deleted < batch_size:
+ await self.db_pool.updates._end_background_update(
+ "remove_deactivated_pushers"
+ )
+
+ return number_deleted
+
class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self) -> int:
diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
index 20ba4abca3..0ec6764150 100644
--- a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
+++ b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
@@ -14,8 +14,7 @@
*/
--- We may not have deleted all pushers for deactivated accounts. Do so now.
---
--- Note: We don't bother updating the `deleted_pushers` table as it's just use
--- to stop pushers on workers, and that will happen when they get next restarted.
-DELETE FROM pushers WHERE user_name IN (SELECT name FROM users WHERE deactivated = 1);
+-- We may not have deleted all pushers for deactivated accounts, so we set up a
+-- background job to delete them.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (5908, 'remove_deactivated_pushers', '{}');
diff --git a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql b/synapse/storage/databases/main/schema/delta/59/09rejected_events_metadata.sql
index 9c95646281..cc9b267c7d 100644
--- a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql
+++ b/synapse/storage/databases/main/schema/delta/59/09rejected_events_metadata.sql
@@ -13,5 +13,14 @@
* limitations under the License.
*/
+-- This originally was in 58/, but landed after 59/ was created, and so some
+-- servers running develop didn't run this delta. Running it again should be
+-- safe.
+--
+-- We first delete any in progress `rejected_events_metadata` background update,
+-- to ensure that we don't conflict when trying to insert the new one. (We could
+-- alternatively do an ON CONFLICT DO NOTHING, but that syntax isn't supported
+-- by older SQLite versions. Plus, this should be a rare case).
+DELETE FROM background_updates WHERE update_name = 'rejected_events_metadata';
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(5828, 'rejected_events_metadata', '{}');
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 53f85195a7..32228f42ee 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, Set, TypeVar
+from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar
from twisted.internet import defer
@@ -40,7 +40,6 @@ class ResponseCache(Generic[T]):
def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0):
# Requests that haven't finished yet.
self.pending_result_cache = {} # type: Dict[T, ObservableDeferred]
- self.pending_conditionals = {} # type: Dict[T, Set[Callable[[Any], bool]]]
self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000.0
@@ -102,11 +101,7 @@ class ResponseCache(Generic[T]):
self.pending_result_cache[key] = result
def remove(r):
- should_cache = all(
- func(r) for func in self.pending_conditionals.pop(key, [])
- )
-
- if self.timeout_sec and should_cache:
+ if self.timeout_sec:
self.clock.call_later(
self.timeout_sec, self.pending_result_cache.pop, key, None
)
@@ -117,31 +112,6 @@ class ResponseCache(Generic[T]):
result.addBoth(remove)
return result.observe()
- def add_conditional(self, key: T, conditional: Callable[[Any], bool]):
- self.pending_conditionals.setdefault(key, set()).add(conditional)
-
- def wrap_conditional(
- self,
- key: T,
- should_cache: Callable[[Any], bool],
- callback: "Callable[..., Any]",
- *args: Any,
- **kwargs: Any
- ) -> defer.Deferred:
- """The same as wrap(), but adds a conditional to the final execution.
-
- When the final execution completes, *all* conditionals need to return True for it to properly cache,
- else it'll not be cached in a timed fashion.
- """
-
- # See if there's already a result on this key that hasn't yet completed. Due to the single-threaded nature of
- # python, adding a key immediately in the same execution thread will not cause a race condition.
- result = self.get(key)
- if not result or isinstance(result, defer.Deferred) and not result.called:
- self.add_conditional(key, should_cache)
-
- return self.wrap(key, callback, *args, **kwargs)
-
def wrap(
self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any
) -> defer.Deferred:
|