diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index c840da834c..3d676e7d8b 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -24,6 +24,7 @@ from synapse.http.servlet import (
)
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
+from synapse.util.logcontext import preserve_fn
from synapse.types import ThirdPartyInstanceID
import functools
@@ -79,6 +80,7 @@ class Authenticator(object):
def __init__(self, hs):
self.keyring = hs.get_keyring()
self.server_name = hs.hostname
+ self.store = hs.get_datastore()
# A method just so we can pass 'self' as the authenticator to the Servlets
@defer.inlineCallbacks
@@ -138,6 +140,13 @@ class Authenticator(object):
logger.info("Request from %s", origin)
request.authenticated_entity = origin
+ # If we get a valid signed request from the other side, it's probably
+ # alive
+ retry_timings = yield self.store.get_destination_retry_timings(origin)
+ if retry_timings and retry_timings["retry_last_ts"]:
+ logger.info("Marking origin %r as up", origin)
+ preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
+
defer.returnValue(origin)
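
A self-contained sketch of the "mark origin as up" behaviour added above, with a plain dict standing in for the datastore (the dict and helper names here are illustrative, not Synapse APIs, and the write happens synchronously rather than via preserve_fn):

retry_timings = {}  # destination -> {"retry_last_ts": ..., "retry_interval": ...}

def get_destination_retry_timings(origin):
    return retry_timings.get(origin)

def set_destination_retry_timings(origin, retry_last_ts, retry_interval):
    retry_timings[origin] = {
        "retry_last_ts": retry_last_ts,
        "retry_interval": retry_interval,
    }

def on_authenticated_request(origin):
    # A valid signed request means the origin is probably alive, so clear any
    # backoff state that marks it as down.
    timings = get_destination_retry_timings(origin)
    if timings and timings["retry_last_ts"]:
        set_destination_retry_timings(origin, 0, 0)

set_destination_retry_timings("remote.example.org", 1490000000000, 60000)
on_authenticated_request("remote.example.org")
assert retry_timings["remote.example.org"]["retry_last_ts"] == 0
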
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c22f65ce5d..982cda3edf 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -17,6 +17,7 @@ from synapse.api.constants import EventTypes
from synapse.util import stringutils
from synapse.util.async import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.retryutils import NotRetryingDestination
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id, RoomStreamToken
from twisted.internet import defer
@@ -425,12 +426,38 @@ class DeviceListEduUpdater(object):
# This can happen since we batch updates
return
+ # Given a list of updates, we check if we need to resync. This
+ # happens if we've missed updates.
resync = yield self._need_to_do_resync(user_id, pending_updates)
if resync:
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
- result = yield self.federation.query_user_devices(origin, user_id)
+ try:
+ result = yield self.federation.query_user_devices(origin, user_id)
+ except NotRetryingDestination:
+ # TODO: Remember that we are now out of sync and try again
+ # later
+ logger.warn(
+ "Failed to handle device list update for %s,"
+ " we're not retrying the remote",
+ user_id,
+ )
+ # We abort on exceptions rather than accepting the update
+ # as otherwise synapse will 'forget' that its device list
+ # is out of date. If we bail then we will retry the resync
+ # next time we get a device list update for this user_id.
+ # This makes it more likely that the device lists will
+ # eventually become consistent.
+ return
+ except Exception:
+ # TODO: Remember that we are now out of sync and try again
+ # later
+ logger.exception(
+ "Failed to handle device list update for %s", user_id
+ )
+ return
+
stream_id = result["stream_id"]
devices = result["devices"]
yield self.store.update_remote_device_list_cache(
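
An illustrative model (function names are hypothetical, not Synapse's API) of the error-handling policy above: on any failure to fetch the full device list, bail out without recording the pending updates, so the next incoming update for the user triggers another resync attempt and the cached list eventually becomes consistent.

class NotRetryingDestination(Exception):
    pass

def handle_device_list_update(user_id, query_user_devices, apply_result, log):
    try:
        result = query_user_devices(user_id)
    except NotRetryingDestination:
        # The remote is in backoff; leave the cached list marked as stale so
        # the next incoming update triggers another resync attempt.
        log("Not retrying remote for %s" % (user_id,))
        return False
    except Exception:
        log("Failed to resync device list for %s" % (user_id,))
        return False
    apply_result(user_id, result)
    return True
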
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2af9849ed0..52d97dfbf3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -380,13 +380,6 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- # if we're receiving valid events from an origin,
- # it's probably a good idea to mark it as not in retry-state
- # for sending (although this is a bit of a leap)
- retry_timings = yield self.store.get_destination_retry_timings(origin)
- if retry_timings and retry_timings["retry_last_ts"]:
- self.store.set_destination_retry_timings(origin, 0, 0)
-
room = yield self.store.get_room(event.room_id)
if not room:
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index cb13874ccf..f943ff640f 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -20,6 +20,7 @@ from twisted.internet import defer
from .push_rule_evaluator import PushRuleEvaluatorForEvent
from synapse.api.constants import EventTypes
+from synapse.visibility import filter_events_for_clients_context
logger = logging.getLogger(__name__)
@@ -66,6 +67,17 @@ class BulkPushRuleEvaluator:
def action_for_event_by_user(self, event, context):
actions_by_user = {}
+ # None of these users can be peeking since this list of users comes
+ # from the set of users in the room, so we know for sure they're all
+ # actually in the room.
+ user_tuples = [
+ (u, False) for u in self.rules_by_user.keys()
+ ]
+
+ filtered_by_user = yield filter_events_for_clients_context(
+ self.store, user_tuples, [event], {event.event_id: context}
+ )
+
room_members = yield self.store.get_joined_users_from_context(
event, context
)
@@ -75,14 +87,6 @@ class BulkPushRuleEvaluator:
condition_cache = {}
for uid, rules in self.rules_by_user.items():
- if event.sender == uid:
- continue
-
- if not event.is_state():
- is_ignored = yield self.store.is_ignored_by(event.sender, uid)
- if is_ignored:
- continue
-
display_name = None
profile_info = room_members.get(uid)
if profile_info:
@@ -94,6 +98,13 @@ class BulkPushRuleEvaluator:
if event.type == EventTypes.Member and event.state_key == uid:
display_name = event.content.get("displayname", None)
+ filtered = filtered_by_user[uid]
+ if len(filtered) == 0:
+ continue
+
+ if filtered[0].sender == uid:
+ continue
+
for rule in rules:
if 'enabled' in rule and not rule['enabled']:
continue
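
A hedged sketch of the evaluation flow above, using plain dicts and callables rather than Synapse's stores and events: users whose filtered view of the event is empty, or who would only see their own event, contribute no push actions.

def actions_for_event(event, rules_by_user, filtered_by_user, evaluate_rule):
    actions_by_user = {}
    for uid, rules in rules_by_user.items():
        filtered = filtered_by_user.get(uid, [])
        if len(filtered) == 0:
            continue  # the user cannot see this event at all
        if filtered[0]["sender"] == uid:
            continue  # don't notify users about their own events
        for rule in rules:
            if "enabled" in rule and not rule["enabled"]:
                continue
            if evaluate_rule(uid, rule, event):
                actions_by_user[uid] = rule.get("actions", [])
                break
    return actions_by_user
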
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index ff14e54c11..aa84ffc2b0 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -308,16 +308,3 @@ class AccountDataStore(SQLBaseStore):
" WHERE stream_id < ?"
)
txn.execute(update_max_id_sql, (next_id, next_id))
-
- @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
- def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
- ignored_account_data = yield self.get_global_account_data_by_type_for_user(
- "m.ignored_user_list", ignorer_user_id,
- on_invalidate=cache_context.invalidate,
- )
- if not ignored_account_data:
- defer.returnValue(False)
-
- defer.returnValue(
- ignored_user_id in ignored_account_data.get("ignored_users", {})
- )
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index b01f0046e9..747d2df622 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -33,6 +33,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
+ max_entries=5000,
)
super(ClientIpStore, self).__init__(hs)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index c8d5f5ba8b..d9936c88bb 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -18,7 +18,7 @@ import ujson as json
from twisted.internet import defer
from synapse.api.errors import StoreError
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, Cache
from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
@@ -29,6 +29,14 @@ class DeviceStore(SQLBaseStore):
def __init__(self, hs):
super(DeviceStore, self).__init__(hs)
+ # Map of (user_id, device_id) -> bool. If there is an entry, that implies
+ # the device exists.
+ self.device_id_exists_cache = Cache(
+ name="device_id_exists",
+ keylen=2,
+ max_entries=10000,
+ )
+
self._clock.looping_call(
self._prune_old_outbound_device_pokes, 60 * 60 * 1000
)
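
The hunks below check this cache before inserting, prefill it after a successful insert, and invalidate it on delete. A standalone model of that lifecycle, using a plain dict rather than Synapse's Cache class:

device_id_exists_cache = {}  # (user_id, device_id) -> True
devices = set()

def store_device(user_id, device_id):
    key = (user_id, device_id)
    if device_id_exists_cache.get(key):
        return False  # known to exist already; skip the insert
    inserted = key not in devices
    devices.add(key)
    device_id_exists_cache[key] = True
    return inserted

def delete_device(user_id, device_id):
    devices.discard((user_id, device_id))
    # Invalidate so the cache never claims a deleted device still exists.
    device_id_exists_cache.pop((user_id, device_id), None)

assert store_device("@alice:example.org", "DEVICEID") is True
assert store_device("@alice:example.org", "DEVICEID") is False
delete_device("@alice:example.org", "DEVICEID")
assert store_device("@alice:example.org", "DEVICEID") is True
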
@@ -54,6 +62,10 @@ class DeviceStore(SQLBaseStore):
defer.Deferred: boolean whether the device was inserted or an
existing device existed with that ID.
"""
+ key = (user_id, device_id)
+ if self.device_id_exists_cache.get(key, None):
+ defer.returnValue(False)
+
try:
inserted = yield self._simple_insert(
"devices",
@@ -65,6 +77,7 @@ class DeviceStore(SQLBaseStore):
desc="store_device",
or_ignore=True,
)
+ self.device_id_exists_cache.prefill(key, True)
defer.returnValue(inserted)
except Exception as e:
logger.error("store_device with device_id=%s(%r) user_id=%s(%r)"
@@ -93,6 +106,7 @@ class DeviceStore(SQLBaseStore):
desc="get_device",
)
+ @defer.inlineCallbacks
def delete_device(self, user_id, device_id):
"""Delete a device.
@@ -102,12 +116,15 @@ class DeviceStore(SQLBaseStore):
Returns:
defer.Deferred
"""
- return self._simple_delete_one(
+ yield self._simple_delete_one(
table="devices",
keyvalues={"user_id": user_id, "device_id": device_id},
desc="delete_device",
)
+ self.device_id_exists_cache.invalidate((user_id, device_id))
+
+ @defer.inlineCallbacks
def delete_devices(self, user_id, device_ids):
"""Deletes several devices.
@@ -117,13 +134,15 @@ class DeviceStore(SQLBaseStore):
Returns:
defer.Deferred
"""
- return self._simple_delete_many(
+ yield self._simple_delete_many(
table="devices",
column="device_id",
iterable=device_ids,
keyvalues={"user_id": user_id},
desc="delete_devices",
)
+ for device_id in device_ids:
+ self.device_id_exists_cache.invalidate((user_id, device_id))
def update_device(self, user_id, device_id, new_display_name=None):
"""Update a device.
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 7cbc1470fd..c96dae352d 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -15,6 +15,7 @@
from twisted.internet import defer
from synapse.api.errors import SynapseError
+from synapse.util.caches.descriptors import cached
from canonicaljson import encode_canonical_json
import ujson as json
@@ -177,10 +178,14 @@ class EndToEndKeyStore(SQLBaseStore):
for algorithm, key_id, json_bytes in new_keys
],
)
+ txn.call_after(
+ self.count_e2e_one_time_keys.invalidate, (user_id, device_id,)
+ )
yield self.runInteraction(
"add_e2e_one_time_keys_insert", _add_e2e_one_time_keys
)
+ @cached(max_entries=10000)
def count_e2e_one_time_keys(self, user_id, device_id):
""" Count the number of one time keys the server has for a device
Returns:
@@ -225,6 +230,9 @@ class EndToEndKeyStore(SQLBaseStore):
)
for user_id, device_id, algorithm, key_id in delete:
txn.execute(sql, (user_id, device_id, algorithm, key_id))
+ txn.call_after(
+ self.count_e2e_one_time_keys.invalidate, (user_id, device_id,)
+ )
return result
return self.runInteraction(
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys
@@ -242,3 +250,4 @@ class EndToEndKeyStore(SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
desc="delete_e2e_one_time_keys_by_device"
)
+ self.count_e2e_one_time_keys.invalidate((user_id, device_id,))
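
The same "cached count, invalidated on every write" shape, sketched with functools.lru_cache instead of Synapse's @cached descriptor (lru_cache only offers whole-cache clearing, which is coarser than the per-key invalidation used above):

import functools

one_time_keys = {}  # (user_id, device_id) -> {algorithm: count}

@functools.lru_cache(maxsize=10000)
def count_one_time_keys(user_id, device_id):
    return dict(one_time_keys.get((user_id, device_id), {}))

def add_one_time_key(user_id, device_id, algorithm):
    counts = one_time_keys.setdefault((user_id, device_id), {})
    counts[algorithm] = counts.get(algorithm, 0) + 1
    # Any write must drop the cached count, otherwise readers see stale data.
    count_one_time_keys.cache_clear()

add_one_time_key("@bob:example.org", "DEVICEID", "signed_curve25519")
assert count_one_time_keys("@bob:example.org", "DEVICEID") == {"signed_curve25519": 1}
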
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 98707d40ee..2ab44ceaa7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1343,11 +1343,26 @@ class EventsStore(SQLBaseStore):
def _invalidate_get_event_cache(self, event_id):
self._get_event_cache.invalidate((event_id,))
- def _get_events_from_cache(self, events, allow_rejected):
+ def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
+ """Fetch events from the caches
+
+ Args:
+ events (list(str)): list of event_ids to fetch
+ allow_rejected (bool): Whether to return events that were rejected
+ update_metrics (bool): Whether to update the cache hit ratio metrics
+
+ Returns:
+ dict of event_id -> _EventCacheEntry for each event_id in cache. If
+ allow_rejected is `False` then rejected events will still have an
+ entry, but it will be `None`
+ """
event_map = {}
for event_id in events:
- ret = self._get_event_cache.get((event_id,), None)
+ ret = self._get_event_cache.get(
+ (event_id,), None,
+ update_metrics=update_metrics,
+ )
if not ret:
continue
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 353a135c4e..0a819d32c5 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -16,6 +16,7 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from synapse.push.baserules import list_with_base_rules
+from synapse.api.constants import EventTypes
from twisted.internet import defer
import logging
@@ -184,11 +185,23 @@ class PushRuleStore(SQLBaseStore):
if uid in local_users_in_room:
user_ids.add(uid)
+ forgotten = yield self.who_forgot_in_room(
+ event.room_id, on_invalidate=cache_context.invalidate,
+ )
+
+ for row in forgotten:
+ user_id = row["user_id"]
+ event_id = row["event_id"]
+
+ mem_id = current_state_ids.get((EventTypes.Member, user_id), None)
+ if event_id == mem_id:
+ user_ids.discard(user_id)
+
rules_by_user = yield self.bulk_get_push_rules(
user_ids, on_invalidate=cache_context.invalidate,
)
- rules_by_user = {k: v for k, v in rules_by_user.iteritems() if v is not None}
+ rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
defer.returnValue(rules_by_user)
@@ -398,8 +411,7 @@ class PushRuleStore(SQLBaseStore):
with self._push_rules_stream_id_gen.get_next() as ids:
stream_id, event_stream_ordering = ids
yield self.runInteraction(
- "delete_push_rule", delete_push_rule_txn, stream_id,
- event_stream_ordering,
+ "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering
)
@defer.inlineCallbacks
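
A sketch (plain data, hypothetical shapes) of the "forgotten" filtering above: a user is dropped from push-rule evaluation only if the membership event they forgot is still their current membership event in the room's state.

def discard_forgotten_users(user_ids, forgotten_rows, current_state_ids):
    user_ids = set(user_ids)
    for row in forgotten_rows:
        user_id = row["user_id"]
        event_id = row["event_id"]
        mem_id = current_state_ids.get(("m.room.member", user_id))
        if event_id == mem_id:
            user_ids.discard(user_id)
    return user_ids

state = {("m.room.member", "@carol:example.org"): "$membership_event"}
forgotten = [{"user_id": "@carol:example.org", "event_id": "$membership_event"}]
assert discard_forgotten_users({"@carol:example.org"}, forgotten, state) == set()
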
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index ad3c9b06d9..2fa20bd87c 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -421,9 +421,13 @@ class RoomMemberStore(SQLBaseStore):
# We check if we have any of the member event ids in the event cache
# before we ask the DB
+ # We don't update the event cache hit ratio as it completely throws off
+ # the hit ratio counts. After all, we don't populate the cache if we
+ # miss it here.
event_map = self._get_events_from_cache(
member_event_ids,
allow_rejected=False,
+ update_metrics=False,
)
missing_member_event_ids = []
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index aa182eeac7..48dcbafeef 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -96,7 +96,7 @@ class Cache(object):
"Cache objects can only be accessed from the main thread"
)
- def get(self, key, default=_CacheSentinel, callback=None):
+ def get(self, key, default=_CacheSentinel, callback=None, update_metrics=True):
"""Looks the key up in the caches.
Args:
@@ -104,6 +104,7 @@ class Cache(object):
default: What is returned if key is not in the caches. If not
specified then function throws KeyError instead
callback(fn): Gets called when the entry in the cache is invalidated
+ update_metrics (bool): whether to update the cache hit rate metrics
Returns:
Either a Deferred or the raw result
@@ -113,7 +114,8 @@ class Cache(object):
if val is not _CacheSentinel:
if val.sequence == self.sequence:
val.callbacks.update(callbacks)
- self.metrics.inc_hits()
+ if update_metrics:
+ self.metrics.inc_hits()
return val.deferred
val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
@@ -121,7 +123,8 @@ class Cache(object):
self.metrics.inc_hits()
return val
- self.metrics.inc_misses()
+ if update_metrics:
+ self.metrics.inc_misses()
if default is _CacheSentinel:
raise KeyError()
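
A minimal model of the update_metrics flag above (not Synapse's Cache class): speculative lookups can opt out of the hit/miss counters so they do not distort the cache's reported hit ratio.

class MeteredCache:
    def __init__(self):
        self._data = {}
        self.hits = 0
        self.misses = 0

    def get(self, key, default=None, update_metrics=True):
        if key in self._data:
            if update_metrics:
                self.hits += 1
            return self._data[key]
        if update_metrics:
            self.misses += 1
        return default

cache = MeteredCache()
cache.get("missing")                        # counted as a miss
cache.get("missing", update_metrics=False)  # not counted
assert (cache.hits, cache.misses) == (0, 1)
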
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 5590b866ed..c4dd9ae2c7 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -189,6 +189,25 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
@defer.inlineCallbacks
+def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context):
+ user_ids = set(u[0] for u in user_tuples)
+ event_id_to_state = {}
+ for event_id, context in event_id_to_context.items():
+ state = yield store.get_events([
+ e_id
+ for key, e_id in context.current_state_ids.iteritems()
+ if key == (EventTypes.RoomHistoryVisibility, "")
+ or (key[0] == EventTypes.Member and key[1] in user_ids)
+ ])
+ event_id_to_state[event_id] = state
+
+ res = yield filter_events_for_clients(
+ store, user_tuples, events, event_id_to_state
+ )
+ defer.returnValue(res)
+
+
+@defer.inlineCallbacks
def filter_events_for_client(store, user_id, events, is_peeking=False):
"""
Check which events a user is allowed to see
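
A rough sketch of the state-reduction step in filter_events_for_clients_context above, with plain dicts standing in for Synapse's stores and contexts: only the room history visibility event and the membership events of the users being filtered are pulled out of each context's state before delegating to filter_events_for_clients.

def relevant_state_event_ids(current_state_ids, user_ids):
    return [
        e_id
        for key, e_id in current_state_ids.items()
        if key == ("m.room.history_visibility", "")
        or (key[0] == "m.room.member" and key[1] in user_ids)
    ]

state_ids = {
    ("m.room.history_visibility", ""): "$vis",
    ("m.room.member", "@dave:example.org"): "$dave_join",
    ("m.room.member", "@eve:example.org"): "$eve_join",
}
assert sorted(relevant_state_event_ids(state_ids, {"@dave:example.org"})) == [
    "$dave_join", "$vis",
]
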