diff --git a/.gitignore b/.gitignore
index af36c00cfa..9bb5bdd647 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,6 +21,7 @@ _trial_temp*/
/.python-version
/*.signing.key
/env/
+/.venv*/
/homeserver*.yaml
/logs
/media_store/
diff --git a/changelog.d/8544.feature b/changelog.d/8544.feature
new file mode 100644
index 0000000000..542993110b
--- /dev/null
+++ b/changelog.d/8544.feature
@@ -0,0 +1 @@
+Allow running background tasks in a separate worker process.
diff --git a/changelog.d/8562.misc b/changelog.d/8562.misc
new file mode 100644
index 0000000000..ebdbddb500
--- /dev/null
+++ b/changelog.d/8562.misc
@@ -0,0 +1 @@
+Add type annotations for `LruCache`.
diff --git a/changelog.d/8563.misc b/changelog.d/8563.misc
new file mode 100644
index 0000000000..eeba8e5fee
--- /dev/null
+++ b/changelog.d/8563.misc
@@ -0,0 +1 @@
+Replace `DeferredCache` with the lighter-weight `LruCache` where possible.
diff --git a/changelog.d/8566.misc b/changelog.d/8566.misc
new file mode 100644
index 0000000000..453cf48ffa
--- /dev/null
+++ b/changelog.d/8566.misc
@@ -0,0 +1 @@
+Add virtualenv-generated folders to `.gitignore`.
\ No newline at end of file
diff --git a/changelog.d/8571.misc b/changelog.d/8571.misc
new file mode 100644
index 0000000000..f6a65057e0
--- /dev/null
+++ b/changelog.d/8571.misc
@@ -0,0 +1 @@
+Fix `synmark` benchmark runner.
diff --git a/changelog.d/8577.misc b/changelog.d/8577.misc
new file mode 100644
index 0000000000..75fe563a02
--- /dev/null
+++ b/changelog.d/8577.misc
@@ -0,0 +1 @@
+Adjust a protocol-type definition to fit `sqlite3` assertions.
\ No newline at end of file
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 668e0e4314..2d9ecb0a16 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -69,7 +69,9 @@ class Auth:
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
- self.token_cache = LruCache(10000, "token_cache")
+ self.token_cache = LruCache(
+ 10000, "token_cache"
+ ) # type: LruCache[str, Tuple[str, bool]]
self._auth_blocking = AuthBlocking(self.hs)
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 4c95b149c5..2ce9e444ab 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -16,7 +16,7 @@
import logging
import re
-from typing import Any, Dict, List, Optional, Pattern, Union
+from typing import Any, Dict, List, Optional, Pattern, Tuple, Union
from synapse.events import EventBase
from synapse.types import UserID
@@ -173,19 +173,21 @@ class PushRuleEvaluatorForEvent:
# Similar to _glob_matches, but do not treat display_name as a glob.
r = regex_cache.get((display_name, False, True), None)
if not r:
- r = re.escape(display_name)
- r = _re_word_boundary(r)
- r = re.compile(r, flags=re.IGNORECASE)
+ r1 = re.escape(display_name)
+ r1 = _re_word_boundary(r1)
+ r = re.compile(r1, flags=re.IGNORECASE)
regex_cache[(display_name, False, True)] = r
- return r.search(body)
+ return bool(r.search(body))
def _get_value(self, dotted_key: str) -> Optional[str]:
return self._value_cache.get(dotted_key, None)
# Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches
-regex_cache = LruCache(50000, "regex_push_cache")
+regex_cache = LruCache(
+ 50000, "regex_push_cache"
+) # type: LruCache[Tuple[str, bool, bool], Pattern]
def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
@@ -203,7 +205,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
if not r:
r = _glob_to_re(glob, word_boundary)
regex_cache[(glob, True, word_boundary)] = r
- return r.search(value)
+ return bool(r.search(value))
except re.error:
logger.warning("Failed to parse glob to regex: %r", glob)
return False
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 4b0ea0cc01..0f5b7adef7 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -15,7 +15,7 @@
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY
-from synapse.util.caches.deferred_cache import DeferredCache
+from synapse.util.caches.lrucache import LruCache
from ._base import BaseSlavedStore
@@ -24,9 +24,9 @@ class SlavedClientIpStore(BaseSlavedStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
- self.client_ip_last_seen = DeferredCache(
- name="client_ip_last_seen", keylen=4, max_entries=50000
- ) # type: DeferredCache[tuple, int]
+ self.client_ip_last_seen = LruCache(
+ cache_name="client_ip_last_seen", keylen=4, max_size=50000
+ ) # type: LruCache[tuple, int]
async def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
now = int(self._clock.time_msec())
@@ -41,7 +41,7 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
- self.client_ip_last_seen.prefill(key, now)
+ self.client_ip_last_seen.set(key, now)
self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ab49d227de..2b196ded1b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -76,14 +76,16 @@ class SQLBaseStore(metaclass=ABCMeta):
"""
try:
- if key is None:
- getattr(self, cache_name).invalidate_all()
- else:
- getattr(self, cache_name).invalidate(tuple(key))
+ cache = getattr(self, cache_name)
except AttributeError:
# We probably haven't pulled in the cache in this worker,
# which is fine.
- pass
+ return
+
+ if key is None:
+ cache.invalidate_all()
+ else:
+ cache.invalidate(tuple(key))
def db_to_json(db_content):
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 763722d6bc..0217e63108 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -160,7 +160,7 @@ class LoggingDatabaseConnection:
self.conn.__enter__()
return self
- def __exit__(self, exc_type, exc_value, traceback) -> bool:
+ def __exit__(self, exc_type, exc_value, traceback) -> Optional[bool]:
return self.conn.__exit__(exc_type, exc_value, traceback)
# Proxy through any unknown lookups to the DB conn class.
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 9e66e6648a..339bd691a4 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -19,7 +19,7 @@ from typing import Dict, Optional, Tuple
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
-from synapse.util.caches.deferred_cache import DeferredCache
+from synapse.util.caches.lrucache import LruCache
logger = logging.getLogger(__name__)
@@ -410,8 +410,8 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
class ClientIpStore(ClientIpWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
- self.client_ip_last_seen = DeferredCache(
- name="client_ip_last_seen", keylen=4, max_entries=50000
+ self.client_ip_last_seen = LruCache(
+ cache_name="client_ip_last_seen", keylen=4, max_size=50000
)
super().__init__(database, db_conn, hs)
@@ -442,7 +442,7 @@ class ClientIpStore(ClientIpWorkerStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
- self.client_ip_last_seen.prefill(key, now)
+ self.client_ip_last_seen.set(key, now)
self._batch_row_update[key] = (user_agent, device_id, now)
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index e662a20d24..dfb4f87b8f 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -34,8 +34,8 @@ from synapse.storage.database import (
)
from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key
from synapse.util import json_decoder, json_encoder
-from synapse.util.caches.deferred_cache import DeferredCache
from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
from synapse.util.stringutils import shortstr
@@ -1005,8 +1005,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Map of (user_id, device_id) -> bool. If there is an entry that implies
# the device exists.
- self.device_id_exists_cache = DeferredCache(
- name="device_id_exists", keylen=2, max_entries=10000
+ self.device_id_exists_cache = LruCache(
+ cache_name="device_id_exists", keylen=2, max_size=10000
)
async def store_device(
@@ -1052,7 +1052,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
)
if hidden:
raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN)
- self.device_id_exists_cache.prefill(key, True)
+ self.device_id_exists_cache.set(key, True)
return inserted
except StoreError:
raise
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ba3b1769b0..87808c1483 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1051,9 +1051,7 @@ class PersistEventsStore:
def prefill():
for cache_entry in to_prefill:
- self.store._get_event_cache.prefill(
- (cache_entry[0].event_id,), cache_entry
- )
+ self.store._get_event_cache.set((cache_entry[0].event_id,), cache_entry)
txn.call_after(prefill)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index ff150f0be7..c342df2a8b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -42,8 +42,8 @@ from synapse.storage.database import DatabasePool
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.types import Collection, get_domain_from_id
-from synapse.util.caches.deferred_cache import DeferredCache
from synapse.util.caches.descriptors import cached
+from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
@@ -137,7 +137,7 @@ class EventsWorkerStore(SQLBaseStore):
db_conn, "events", "stream_ordering", step=-1
)
- if not hs.config.worker.worker_app:
+ if hs.config.run_background_tasks:
# We periodically clean out old transaction ID mappings
self._clock.looping_call(
run_as_background_process,
@@ -146,11 +146,10 @@ class EventsWorkerStore(SQLBaseStore):
self._cleanup_old_transaction_ids,
)
- self._get_event_cache = DeferredCache(
- "*getEvent*",
+ self._get_event_cache = LruCache(
+ cache_name="*getEvent*",
keylen=3,
- max_entries=hs.config.caches.event_cache_size,
- apply_cache_factor_from_config=False,
+ max_size=hs.config.caches.event_cache_size,
)
self._event_fetch_lock = threading.Condition()
@@ -749,7 +748,7 @@ class EventsWorkerStore(SQLBaseStore):
event=original_ev, redacted_event=redacted_event
)
- self._get_event_cache.prefill((event_id,), cache_entry)
+ self._get_event_cache.set((event_id,), cache_entry)
result_map[event_id] = cache_entry
return result_map
diff --git a/synapse/storage/types.py b/synapse/storage/types.py
index 970bb1b9da..9cadcba18f 100644
--- a/synapse/storage/types.py
+++ b/synapse/storage/types.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Any, Iterable, Iterator, List, Tuple
+from typing import Any, Iterable, Iterator, List, Optional, Tuple
from typing_extensions import Protocol
@@ -65,5 +65,5 @@ class Connection(Protocol):
def __enter__(self) -> "Connection":
...
- def __exit__(self, exc_type, exc_value, traceback) -> bool:
+ def __exit__(self, exc_type, exc_value, traceback) -> Optional[bool]:
...
diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index 91fdc8142d..4026e1f8fa 100644
--- a/synapse/util/caches/deferred_cache.py
+++ b/synapse/util/caches/deferred_cache.py
@@ -98,7 +98,7 @@ class DeferredCache(Generic[KT, VT]):
size_callback=(lambda d: len(d)) if iterable else None,
metrics_collection_callback=metrics_cb,
apply_cache_factor_from_config=apply_cache_factor_from_config,
- )
+ ) # type: LruCache[KT, VT]
self.thread = None # type: Optional[threading.Thread]
@@ -240,11 +240,12 @@ class DeferredCache(Generic[KT, VT]):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError("The cache key must be a tuple not %r" % (type(key),))
+ key = cast(KT, key)
self.cache.del_multi(key)
# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, as above
- entry_dict = self._pending_deferred_cache.pop(cast(KT, key), None)
+ entry_dict = self._pending_deferred_cache.pop(key, None)
if entry_dict is not None:
for entry in iterate_tree_cache_entry(entry_dict):
entry.invalidate()
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index 8b426c005b..588d2d49f2 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -12,10 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import enum
import logging
import threading
from collections import namedtuple
+from typing import Any
from synapse.util.caches.lrucache import LruCache
@@ -38,23 +39,26 @@ class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "va
return len(self.value)
+class _Sentinel(enum.Enum):
+ # defining a sentinel in this way allows mypy to correctly handle the
+ # type of a dictionary lookup.
+ sentinel = object()
+
+
class DictionaryCache:
"""Caches key -> dictionary lookups, supporting caching partial dicts, i.e.
fetching a subset of dictionary keys for a particular key.
"""
def __init__(self, name, max_entries=1000):
- self.cache = LruCache(max_size=max_entries, cache_name=name, size_callback=len)
+ self.cache = LruCache(
+ max_size=max_entries, cache_name=name, size_callback=len
+ ) # type: LruCache[Any, DictionaryEntry]
self.name = name
self.sequence = 0
self.thread = None
- class Sentinel:
- __slots__ = []
-
- self.sentinel = Sentinel()
-
def check_thread(self):
expected_thread = self.thread
if expected_thread is None:
@@ -76,8 +80,8 @@ class DictionaryCache:
Returns:
DictionaryEntry
"""
- entry = self.cache.get(key, self.sentinel)
- if entry is not self.sentinel:
+ entry = self.cache.get(key, _Sentinel.sentinel)
+ if entry is not _Sentinel.sentinel:
if dict_keys is None:
return DictionaryEntry(
entry.full, entry.known_absent, dict(entry.value)
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index e4804f79e0..3b471d8fd3 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -15,12 +15,35 @@
import threading
from functools import wraps
-from typing import Callable, Optional, Type, Union
+from typing import (
+ Any,
+ Callable,
+ Generic,
+ Iterable,
+ Optional,
+ Type,
+ TypeVar,
+ Union,
+ cast,
+ overload,
+)
+
+from typing_extensions import Literal
from synapse.config import cache as cache_config
from synapse.util.caches import CacheMetric, register_cache
from synapse.util.caches.treecache import TreeCache
+# Function type: the type used for invalidation callbacks
+FT = TypeVar("FT", bound=Callable[..., Any])
+
+# Key and Value type for the cache
+KT = TypeVar("KT")
+VT = TypeVar("VT")
+
+# a general type var, distinct from either KT or VT
+T = TypeVar("T")
+
def enumerate_leaves(node, depth):
if depth == 0:
@@ -42,7 +65,7 @@ class _Node:
self.callbacks = callbacks
-class LruCache:
+class LruCache(Generic[KT, VT]):
"""
Least-recently-used cache, supporting prometheus metrics and invalidation callbacks.
@@ -128,13 +151,13 @@ class LruCache:
if metrics:
metrics.inc_evictions(evicted_len)
- def synchronized(f):
+ def synchronized(f: FT) -> FT:
@wraps(f)
def inner(*args, **kwargs):
with lock:
return f(*args, **kwargs)
- return inner
+ return cast(FT, inner)
cached_cache_len = [0]
if size_callback is not None:
@@ -188,8 +211,31 @@ class LruCache:
node.callbacks.clear()
return deleted_len
+ @overload
+ def cache_get(
+ key: KT,
+ default: Literal[None] = None,
+ callbacks: Iterable[Callable[[], None]] = ...,
+ update_metrics: bool = ...,
+ ) -> Optional[VT]:
+ ...
+
+ @overload
+ def cache_get(
+ key: KT,
+ default: T,
+ callbacks: Iterable[Callable[[], None]] = ...,
+ update_metrics: bool = ...,
+ ) -> Union[T, VT]:
+ ...
+
@synchronized
- def cache_get(key, default=None, callbacks=[], update_metrics=True):
+ def cache_get(
+ key: KT,
+ default: Optional[T] = None,
+ callbacks: Iterable[Callable[[], None]] = [],
+ update_metrics: bool = True,
+ ):
node = cache.get(key, None)
if node is not None:
move_node_to_front(node)
@@ -203,7 +249,7 @@ class LruCache:
return default
@synchronized
- def cache_set(key, value, callbacks=[]):
+ def cache_set(key: KT, value: VT, callbacks: Iterable[Callable[[], None]] = []):
node = cache.get(key, None)
if node is not None:
# We sometimes store large objects, e.g. dicts, which cause
@@ -232,7 +278,7 @@ class LruCache:
evict()
@synchronized
- def cache_set_default(key, value):
+ def cache_set_default(key: KT, value: VT) -> VT:
node = cache.get(key, None)
if node is not None:
return node.value
@@ -241,8 +287,16 @@ class LruCache:
evict()
return value
+ @overload
+ def cache_pop(key: KT, default: Literal[None] = None) -> Optional[VT]:
+ ...
+
+ @overload
+ def cache_pop(key: KT, default: T) -> Union[T, VT]:
+ ...
+
@synchronized
- def cache_pop(key, default=None):
+ def cache_pop(key: KT, default: Optional[T] = None):
node = cache.get(key, None)
if node:
delete_node(node)
@@ -252,18 +306,18 @@ class LruCache:
return default
@synchronized
- def cache_del_multi(key):
+ def cache_del_multi(key: KT) -> None:
"""
This will only work if constructed with cache_type=TreeCache
"""
popped = cache.pop(key)
if popped is None:
return
- for leaf in enumerate_leaves(popped, keylen - len(key)):
+ for leaf in enumerate_leaves(popped, keylen - len(cast(tuple, key))):
delete_node(leaf)
@synchronized
- def cache_clear():
+ def cache_clear() -> None:
list_root.next_node = list_root
list_root.prev_node = list_root
for node in cache.values():
@@ -274,7 +328,7 @@ class LruCache:
cached_cache_len[0] = 0
@synchronized
- def cache_contains(key):
+ def cache_contains(key: KT) -> bool:
return key in cache
self.sentinel = object()
@@ -283,6 +337,9 @@ class LruCache:
self.set = cache_set
self.setdefault = cache_set_default
self.pop = cache_pop
+ # `invalidate` is exposed for consistency with DeferredCache, so that it can be
+ # invalidated by the cache invalidation replication stream.
+ self.invalidate = cache_pop
if cache_type is TreeCache:
self.del_multi = cache_del_multi
self.len = synchronized(cache_len)
diff --git a/synmark/__init__.py b/synmark/__init__.py
index 53698bd5ab..9ec72c1973 100644
--- a/synmark/__init__.py
+++ b/synmark/__init__.py
@@ -41,7 +41,7 @@ async def make_homeserver(reactor, config=None):
config_obj = HomeServerConfig()
config_obj.parse_config_dict(config, "", "")
- hs = await setup_test_homeserver(
+ hs = setup_test_homeserver(
cleanup_tasks.append, config=config_obj, reactor=reactor, clock=clock
)
stor = hs.get_datastore()
|